跳转到内容
主菜单
主菜单
移至侧栏
隐藏
导航
首页
最近更改
随机页面
MediaWiki帮助
默认wiki名称
搜索
搜索
创建账号
登录
个人工具
创建账号
登录
查看“测试robot”的源代码
页面
讨论
简体中文
阅读
查看源代码
查看历史
工具
工具
移至侧栏
隐藏
操作
阅读
查看源代码
查看历史
常规
链入页面
相关更改
特殊页面
页面信息
←
测试robot
因为以下原因,您没有权限编辑本页:
您请求的操作仅限属于这些用户组的用户执行:
用户
、
管理员
您可以查看和复制此页面的源代码。
<div id="mainContent"> <div class="forFlow"> <div id="post_detail"> <div id="topics"> <div class="post"> = SpringCloudStream极简教程 = <div class="postBody"> <div class="blogpost-body" id="cnblogs_post_body"> <div class="current-collection"> <div class="current-collection-title">[[合集 - SpringBoot(20)]] </div> <div aria-roledescription="list" class="current-collection-links">[[1.SpringBoot整合XXLJob2023-10-10]][[2.SpringBoot集成海康网络设备SDK2023-03-24]][[3.SpringBoot程序预装载数据2022-04-28]][[4.SpringBoot整合Nacos自动刷新配置2022-01-28]][[5.SpringBoot集成Shiro2021-07-09]][[6.Spring Boot 统一RESTful接口响应和统一异常处理2020-09-17]][[7.Spring Boot Admin极简教程2020-09-04]][[8.Spring Boot集成Actuator2020-09-01]][[9.Spring Boot日志框架Slf4j+logback2020-09-01]][[10.Spring Boot接口设计2020-07-17]][[11.SpringBoot整合MinIO2023-09-13]][[12.SpringBoot如何缓存方法返回值?2023-10-24]][[13.SpringBoot对象拷贝2023-12-25]][[14.SpringBoot中Bean的条件装配01-26]][[15.SpringBoot使用git-commit-id-maven-plugin打包02-21]][[16.SpringBoot中bean的生命周期04-07]][[17.SpringCloud解决feign调用token丢失问题05-17]][[18.记录一次WhatTheFuck经历05-27]][[19.Guava中的Joiner和Splitter10-10]] <div aria-roledescription="listitem" class="current-collection-item current" data-serial="20">20.SpringCloudStream极简教程12-06 </div> </div> <div class="current-collection-footer current-collection-collapse-button-wrapper"> <div class="current-collection-collapse-button clickable">收起 </div> </div> </div> == 简介 == Spring Cloud Stream 是一个轻量级消息驱动微服务框架,旨在简化与消息中间件(如 Kafka、RabbitMQ 等)的集成,支持消息的发布和订阅模式。它提供了一种基于 Spring 编程模型的方式(即自动依赖注入和强调通过注解来完成功能的封装),使得构建可扩展和灵活的消息驱动应用变得更加简单。 === 特点 === * 消息中间件支持:Spring Cloud Stream 支持多种消息中间件,包括 Kafka、RabbitMQ 等,用户可以通过简单的配置切换不同的消息系统而不需修改业务逻辑代码。 * 绑定模型:Spring Cloud Stream 通过“绑定”抽象层来简化与消息中间件的交互。开发者不需要直接处理底层的消息中间件,而是通过定义“绑定器”来与消息源(如 Kafka、RabbitMQ)进行通信。 * 消息驱动:提供了事件驱动和流处理的支持。 * 简化配置:通过 Spring Boot 的自动配置,Spring Cloud Stream 可以通过简单的属性配置来进行消息系统的连接和消息传递。 * 可扩展性:Spring Cloud Stream 支持开发者使用自定义的消息转换器、处理器等组件,使得消息传递过程能够根据具体业务需求进行灵活定制。 * 与 Spring Cloud 集成:在 Spring.io 中是SpringCloud下的顶级项目,可以与SpringCloud其它项目无缝集成,适用于微服务架构。 == 核心模块 == * Binder:用于实现消息系统的具体接入,例如 Kafka、RabbitMQ 等。 * Channel:消息的通道,通过 @StreamListener 注解来监听通道,接收和处理消息。消息生产者和消费者都是通过Channel来处理消息的。 * Producer & Consumer:生产者和消费者,分别负责消息的发布和订阅。Spring Cloud Stream 提供了注解 @Output 和 @Input 来标注消息通道的生产与消费。 == 最佳实践 == === pom === <pre><code> <dependencyManagement> <dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-dependencies</artifactId> <version>2.7.6</version> <type>pom</type> <scope>import</scope> </dependency> <dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-dependencies</artifactId> <version>2021.0.6</version> <type>pom</type> <scope>import</scope> </dependency> </dependencies> </dependencyManagement> <dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> <version>${spring-boot.version}</version> </dependency> <dependency> <groupId>com.alibaba.cloud</groupId> <artifactId>spring-cloud-starter-alibaba-nacos-discovery</artifactId> <version>2021.0.6.2</version> </dependency> <!-- SpringCloud Alibaba Nacos Config --> <dependency> <groupId>com.alibaba.cloud</groupId> <artifactId>spring-cloud-starter-alibaba-nacos-config</artifactId> <version>2021.0.6.2</version> </dependency> <dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-stream-binder-kafka</artifactId> </dependency> </dependencies> </code> </pre> * 核心依赖:spring-cloud-stream-binder-kafka * 其中已经包含了 spring-kafka 这个依赖,无需重复引入 * 从其依赖关系来看,SpringCloudStream 的实现引用了 SpringIntegration 这个框架,这也是一个比较有趣的框架,是Spring的顶级框架,感兴趣的可以参看 [[SpringIntegration漫谈]] 了解 SpringIntegration 框架的设计立场和实现思路。 === yml === <pre><code> spring: kafka: consumer: max-poll-records: 50 bootstrap-servers: 192.168.1.92:9092 </code> </pre> * max-poll-records:指定消费者每次从 Kafka 拉取(poll)时能够获取的最大消息数量。 * bootstrap-servers:kafka的server端的连接地址。注意需要将kafka-server 的 server.properties 配置文件中的 listeners=PRIVATE://0.0.0.0:9092 并且 advertised.listeners=PRIVATE://192.168.1.92:9092 ,否则无法对外提供服务。 === 定义消息通道 === <pre><code> import org.springframework.cloud.stream.annotation.Input; import org.springframework.cloud.stream.annotation.Output; import org.springframework.messaging.MessageChannel; import org.springframework.messaging.SubscribableChannel; public interface AircraftChannel { /** * kafka topic 名称 */ String TOPIC = KAFKA_TOPIC_NGH_AIRCRAFT; /** * 定义消费者接收消息的通道 * @return */ @Input(AircraftChannel.TOPIC) SubscribableChannel input(); /** * 定义生产者发送消息的通道 * @return */ @Output(AircraftChannel.TOPIC) MessageChannel output(); } </code> </pre> * @Input: 使用input注解指定此方法来处理消息的接收 * @Output:使用output注解指定此方法来处理消息的发送 * 一个频道中可以定义多个input和output === 定义消息生产者 === <pre><code> import com.nghsmart.nghaircraft.channel.AircraftChannel; import lombok.AllArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.springframework.cloud.stream.annotation.EnableBinding; import org.springframework.messaging.support.MessageBuilder; @AllArgsConstructor @Slf4j @EnableBinding(AircraftChannel.class) public class AircraftProducer { private final AircraftChannel aircraftChannel; public void sendMessage(String message) { boolean send = aircraftChannel.output().send(MessageBuilder.withPayload(message).build()); log.info("send message: {}", message); } } </code> </pre> * EnableBinding:此注解修饰的类会被Spring容器管理起来,其导入了@Configuration注解。 * EnableBinding 注解为 AircraftChannel.class 中的接口创建实现类,并通过Spring的自动配置,实现类会对接kafka的adapter,这样就实现了通道和kafkaServer的绑定 * send:通过注入频道并调用频道中output处理器的send方法将消息发送到kafakServer中的特定topic,即AircraftChannel.TOPIC === 定义消息消费者 === <pre><code> import com.nghsmart.nghaircraft.channel.AircraftChannel; import com.nghsmart.nghaircraft.config.RedisTemplateGeneric; import com.nghsmart.nghaircraft.constant.RedisKeyEnum; import lombok.AllArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.springframework.cloud.stream.annotation.EnableBinding; import org.springframework.cloud.stream.annotation.StreamListener; import org.springframework.messaging.Message; @Slf4j @AllArgsConstructor @EnableBinding(AircraftChannel.class) public class AircraftConsumer { @StreamListener(AircraftChannel.TOPIC) public void receiveMessage(Message<String> message) { try { log.debug("AircraftConsumer_Received_message: {}", message.getPayload()); //TODO 解析数据 } catch (Exception e) { log.error("AircraftConsumer_error,msg={}", e.getMessage()); e.printStackTrace(); } } } </code> </pre> * EnableBinding:此注解修饰的类会被Spring容器管理起来,其导入了@Configuration注解。 * EnableBinding 注解为 AircraftChannel.class 中的接口创建实现类,并通过Spring的自动配置,实现类会对接kafka的adapter,这样就实现了通道和kafkaServer的绑定 * StreamListener:通过StreamListener注解为AircraftChannel.TOPIC这个topic创建监听,当kafkaAdapter接收到消息后,将触发回调,调用receiveMessage方法处理消息。 === 定义Http接口 === <blockquote> 通过请求接口,发送消息到 kafka </blockquote> <pre><code> import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.springframework.core.io.DefaultResourceLoader; import org.springframework.core.io.Resource; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RestController; import java.io.BufferedReader; import java.io.IOException; import java.io.InputStream; import java.io.InputStreamReader; @Slf4j @RequiredArgsConstructor @RestController @RequestMapping("/test") public class TestController { private final AircraftProducer aircraftProducer; @GetMapping("/test1") public String test1() { aircraftProducer.sendMessage("test1"); return "test1"; } } </code> </pre> * 新建一个RESTFful接口用于测试消息的发送 * 通过注入AircraftProducer,调用其sendMessage 方法发送消息 * 消息发送出去之后,会被AircraftConsumer监听到,并回调到receiveMessage,可以通过观察log,查看消息的整个生命周期流转。 == 总结 == 本文介绍了 SpringCloudStream 这个框架的作用和相关生态,并编写了相应的代码示例作为 '''最佳实践''' 参考,代码示例会上传到我的代码仓库 [[SpringBoot漫谈]] 中(见引用),欢迎大家浏览、学习、交流。 公众号: '''TechnologyRamble''',欢迎大家关注!!! == 引用 == * [[https://spring.io/projects/spring-cloud-stream#overview]] * [[https://gitee.com/naylor_personal/ramble-spring-boot]] <div> * [[简介]] * [[特点]] * [[核心模块]] * [[最佳实践]] * [[pom]] * [[yml]] * [[定义消息通道]] * [[定义消息生产者]] * [[定义消息消费者]] * [[定义Http接口]] * [[总结]] * [[引用]] </div> __EOF__ <div id="articleSuffix"> <div class="articleSuffix-left"> [[Image:20191223145109.png]] </div> <div class="articleSuffix-right"> '''本文作者:''' [[一颗苹果]] '''本文链接:''' [[https://www.cnblogs.com/Naylor/p/18590377]] '''关于博主:''' 评论和私信会在第一时间回复。或者[[直接私信]]我。 '''版权声明:''' 本博客所有文章除特别声明外,均采用 [[BY-NC-SA]] 许可协议。转载请注明出处! '''声援博主:''' 如果您觉得文章对您有帮助,可以点击文章右下角'''【】'''一下。 </div> </div> </div> <div id="MySignature" role="contentinfo"> <div> 邮箱:cnaylor@163.com </div> <div> 技术交流QQ群:1158377441 </div> <div> 欢迎关注我的微信公众号【TechnologyRamble】,后续博文将在公众号首发: </div> <div> [[Image:o_240521084523_2.png|TechnologyRamble]] </div> </div> <div id="blog_post_info_block" role="contentinfo"> <div id="BlogCollection"> 合集: [[SpringBoot]] , [[极简教程]] </div> <div id="BlogPostCategory"> 分类: [[Spring Cloud]] , [[微服务]] , [[极简教程]] </div> <div id="EntryTag"> 标签: [[SpringCloudStream]] , [[SpringIntegration]] , [[kafka]] </div> <div id="blog_post_info"> <div id="green_channel"> 推荐该文 关注博主关注博主 收藏本文 分享微信 </div> <div id="author_profile"> <div class="author_profile_info" id="author_profile_info"> [[Image:20191223145109.png|20191223145109.png]] <div class="author_profile_info" id="author_profile_detail"> [[Naylor]] [[粉丝 - 64]] [[关注 - 176]] </div> </div> </div> <div id="div_digg"> <div class="diggit" onclick="votePost(18590377,'Digg')"> 0 </div> <div class="buryit" onclick="votePost(18590377,'Bury')"> 0 </div> </div> </div> <div id="post_next_prev"> [[«]] 上一篇: [[使用Nginx搭建流媒体服务器]] </div> </div> </div> <div class="postDesc" style="display: block;">posted @ 2024-12-06 11:41 [[Naylor]] 阅读(71) 评论(0) [[编辑]] </div> </div> </div> </div> <div class="commentform" id="comment_form"> <div id="comment_nav"> <div class="comment-nav-right"> [[刷新页面]][[返回顶部]] </div> </div> <div id="comment_form_container"> <div id="commentform_title"> 发表评论 [[升级成为园子VIP会员]] </div> <div class="commentbox_main comment_textarea"> <div class="commentbox_title"> <div class="commentbox_title_left"> 编辑 预览 </div> </div> <div style="display:none"> cea86a70-79e4-4e09-8cc4-08d8d71b3f30 </div> <div class="commentbox_footer"> 自动补全 </div> </div> [[我的博客]] [Ctrl+Enter快捷键提交] </div> <div id="cnblogs_ch">[[【推荐】100%开源!大型工业跨平台软件C++源码提供,建模,组态!]] [[【推荐】FFA 2024大会视频回放:Apache Flink 的过去、现在及未来]] [[【推荐】抖音旗下AI助手豆包,你的智能百科全书,全免费不限次数]] [[【推荐】轻量又高性能的 SSH 工具 IShell:AI 加持,快人一步]] </div> <div class="under-post-card" id="blog_c1"> [[]] </div> <div id="under_post_card1"> <div class="under-post-card"> '''编辑推荐:''' · [[.NET Core 锁(Lock)底层原理浅谈]] · [[ASP.NET Core 9.0 中新增的MapStaticAssets() 中间件]] · [[Asp.net MVC 中的 Http 管道事件为什么要以 Application_ 开头?]] · [[.NET Core 异步(Async)底层原理浅谈]] · [[什么是.NET的强类型字符串(Strongly typed string)?]] </div> </div> <div class="under-post-card" id="cnblogs_c2"> [[]] </div> <div id="under_post_card2"> <div class="itnews under-post-card"> '''阅读排行:''' · [[一个有趣的插件,让写代码变成打怪升级的游戏]] · [[《熬夜整理》保姆级系列教程-玩转Wireshark抓包神器教程(8)-Wireshark的TCP包详]] · [[构建你的.NET Aspire解决方案]] · [[.NET Core 锁(Lock)底层原理浅谈]] · [[技术项目文档书写规范指南]] </div> </div> <div class="under-post-card" id="HistoryToday"> '''历史上的今天:''' 2019-12-06 [[在ASP.NET 中调用 WebService 服务]] </div> </div> </div> </div>
返回
测试robot
。
开关有限宽度模式