SpringCloudStream极简教程
1.SpringBoot整合XXLJob2023-10-102.SpringBoot集成海康网络设备SDK2023-03-243.SpringBoot程序预装载数据2022-04-284.SpringBoot整合Nacos自动刷新配置2022-01-285.SpringBoot集成Shiro2021-07-096.Spring Boot 统一RESTful接口响应和统一异常处理2020-09-177.Spring Boot Admin极简教程2020-09-048.Spring Boot集成Actuator2020-09-019.Spring Boot日志框架Slf4j+logback2020-09-0110.Spring Boot接口设计2020-07-1711.SpringBoot整合MinIO2023-09-1312.SpringBoot如何缓存方法返回值?2023-10-2413.SpringBoot对象拷贝2023-12-2514.SpringBoot中Bean的条件装配01-2615.SpringBoot使用git-commit-id-maven-plugin打包02-2116.SpringBoot中bean的生命周期04-0717.SpringCloud解决feign调用token丢失问题05-1718.记录一次WhatTheFuck经历05-2719.Guava中的Joiner和Splitter10-10
20.SpringCloudStream极简教程12-06
简介
Spring Cloud Stream 是一个轻量级消息驱动微服务框架,旨在简化与消息中间件(如 Kafka、RabbitMQ 等)的集成,支持消息的发布和订阅模式。它提供了一种基于 Spring 编程模型的方式(即自动依赖注入和强调通过注解来完成功能的封装),使得构建可扩展和灵活的消息驱动应用变得更加简单。
特点
- 消息中间件支持:Spring Cloud Stream 支持多种消息中间件,包括 Kafka、RabbitMQ 等,用户可以通过简单的配置切换不同的消息系统而不需修改业务逻辑代码。
- 绑定模型:Spring Cloud Stream 通过“绑定”抽象层来简化与消息中间件的交互。开发者不需要直接处理底层的消息中间件,而是通过定义“绑定器”来与消息源(如 Kafka、RabbitMQ)进行通信。
- 消息驱动:提供了事件驱动和流处理的支持。
- 简化配置:通过 Spring Boot 的自动配置,Spring Cloud Stream 可以通过简单的属性配置来进行消息系统的连接和消息传递。
- 可扩展性:Spring Cloud Stream 支持开发者使用自定义的消息转换器、处理器等组件,使得消息传递过程能够根据具体业务需求进行灵活定制。
- 与 Spring Cloud 集成:在 Spring.io 中是SpringCloud下的顶级项目,可以与SpringCloud其它项目无缝集成,适用于微服务架构。
核心模块
- Binder:用于实现消息系统的具体接入,例如 Kafka、RabbitMQ 等。
- Channel:消息的通道,通过 @StreamListener 注解来监听通道,接收和处理消息。消息生产者和消费者都是通过Channel来处理消息的。
- Producer & Consumer:生产者和消费者,分别负责消息的发布和订阅。Spring Cloud Stream 提供了注解 @Output 和 @Input 来标注消息通道的生产与消费。
最佳实践
pom
<code> <dependencyManagement>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-dependencies</artifactId>
<version>2.7.6</version>
<type>pom</type>
<scope>import</scope>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-dependencies</artifactId>
<version>2021.0.6</version>
<type>pom</type>
<scope>import</scope>
</dependency>
</dependencies> </dependencyManagement> <dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
<version>${spring-boot.version}</version>
</dependency>
<dependency>
<groupId>com.alibaba.cloud</groupId>
<artifactId>spring-cloud-starter-alibaba-nacos-discovery</artifactId>
<version>2021.0.6.2</version>
</dependency>
<!-- SpringCloud Alibaba Nacos Config -->
<dependency>
<groupId>com.alibaba.cloud</groupId>
<artifactId>spring-cloud-starter-alibaba-nacos-config</artifactId>
<version>2021.0.6.2</version>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-binder-kafka</artifactId>
</dependency> </dependencies> </code>
- 核心依赖:spring-cloud-stream-binder-kafka
- 其中已经包含了 spring-kafka 这个依赖,无需重复引入
- 从其依赖关系来看,SpringCloudStream 的实现引用了 SpringIntegration 这个框架,这也是一个比较有趣的框架,是Spring的顶级框架,感兴趣的可以参看 SpringIntegration漫谈 了解 SpringIntegration 框架的设计立场和实现思路。
yml
<code> spring:
kafka:
consumer:
max-poll-records: 50
bootstrap-servers: 192.168.1.92:9092 </code>
- max-poll-records:指定消费者每次从 Kafka 拉取(poll)时能够获取的最大消息数量。
- bootstrap-servers:kafka的server端的连接地址。注意需要将kafka-server 的 server.properties 配置文件中的 listeners=PRIVATE://0.0.0.0:9092 并且 advertised.listeners=PRIVATE://192.168.1.92:9092 ,否则无法对外提供服务。
定义消息通道
<code> import org.springframework.cloud.stream.annotation.Input;
import org.springframework.cloud.stream.annotation.Output;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.SubscribableChannel;
public interface AircraftChannel {
/**
* kafka topic 名称
*/
String TOPIC = KAFKA_TOPIC_NGH_AIRCRAFT;
/**
* 定义消费者接收消息的通道
* @return
*/
@Input(AircraftChannel.TOPIC)
SubscribableChannel input();
/**
* 定义生产者发送消息的通道
* @return
*/
@Output(AircraftChannel.TOPIC)
MessageChannel output();
}
</code>
- @Input: 使用input注解指定此方法来处理消息的接收
- @Output:使用output注解指定此方法来处理消息的发送
- 一个频道中可以定义多个input和output
定义消息生产者
<code> import com.nghsmart.nghaircraft.channel.AircraftChannel;
import lombok.AllArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.messaging.support.MessageBuilder;
@AllArgsConstructor @Slf4j @EnableBinding(AircraftChannel.class) public class AircraftProducer {
private final AircraftChannel aircraftChannel;
public void sendMessage(String message) {
boolean send = aircraftChannel.output().send(MessageBuilder.withPayload(message).build());
log.info("send message: {}", message);
}
}
</code>
- EnableBinding:此注解修饰的类会被Spring容器管理起来,其导入了@Configuration注解。
- EnableBinding 注解为 AircraftChannel.class 中的接口创建实现类,并通过Spring的自动配置,实现类会对接kafka的adapter,这样就实现了通道和kafkaServer的绑定
- send:通过注入频道并调用频道中output处理器的send方法将消息发送到kafakServer中的特定topic,即AircraftChannel.TOPIC
定义消息消费者
<code> import com.nghsmart.nghaircraft.channel.AircraftChannel;
import com.nghsmart.nghaircraft.config.RedisTemplateGeneric;
import com.nghsmart.nghaircraft.constant.RedisKeyEnum;
import lombok.AllArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.messaging.Message;
@Slf4j @AllArgsConstructor @EnableBinding(AircraftChannel.class) public class AircraftConsumer {
@StreamListener(AircraftChannel.TOPIC)
public void receiveMessage(Message<String> message) {
try {
log.debug("AircraftConsumer_Received_message: {}", message.getPayload());
//TODO 解析数据
} catch (Exception e) {
log.error("AircraftConsumer_error,msg={}", e.getMessage());
e.printStackTrace();
}
}
}
</code>
- EnableBinding:此注解修饰的类会被Spring容器管理起来,其导入了@Configuration注解。
- EnableBinding 注解为 AircraftChannel.class 中的接口创建实现类,并通过Spring的自动配置,实现类会对接kafka的adapter,这样就实现了通道和kafkaServer的绑定
- StreamListener:通过StreamListener注解为AircraftChannel.TOPIC这个topic创建监听,当kafkaAdapter接收到消息后,将触发回调,调用receiveMessage方法处理消息。
定义Http接口
通过请求接口,发送消息到 kafka
<code> import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.core.io.DefaultResourceLoader;
import org.springframework.core.io.Resource;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
@Slf4j @RequiredArgsConstructor @RestController @RequestMapping("/test") public class TestController {
private final AircraftProducer aircraftProducer;
@GetMapping("/test1")
public String test1() {
aircraftProducer.sendMessage("test1");
return "test1";
}
}
</code>
- 新建一个RESTFful接口用于测试消息的发送
- 通过注入AircraftProducer,调用其sendMessage 方法发送消息
- 消息发送出去之后,会被AircraftConsumer监听到,并回调到receiveMessage,可以通过观察log,查看消息的整个生命周期流转。
总结
本文介绍了 SpringCloudStream 这个框架的作用和相关生态,并编写了相应的代码示例作为 最佳实践 参考,代码示例会上传到我的代码仓库 SpringBoot漫谈 中(见引用),欢迎大家浏览、学习、交流。
公众号: TechnologyRamble,欢迎大家关注!!!
引用
__EOF__
合集:
SpringBoot
, 极简教程
分类:
Spring Cloud
推荐该文 关注博主关注博主 收藏本文 分享微信
0
0
« 上一篇: 使用Nginx搭建流媒体服务器
我的博客
[Ctrl+Enter快捷键提交]
【推荐】FFA 2024大会视频回放:Apache Flink 的过去、现在及未来 【推荐】抖音旗下AI助手豆包,你的智能百科全书,全免费不限次数 【推荐】轻量又高性能的 SSH 工具 IShell:AI 加持,快人一步
· .NET Core 锁(Lock)底层原理浅谈 · ASP.NET Core 9.0 中新增的MapStaticAssets() 中间件 · Asp.net MVC 中的 Http 管道事件为什么要以 Application_ 开头? · .NET Core 异步(Async)底层原理浅谈 · 什么是.NET的强类型字符串(Strongly typed string)?
2019-12-06 在ASP.NET 中调用 WebService 服务