Spring Cloud Stream 消息驱动组件帮助我们更快速,更方便,更友好地去构建消息驱动微服务。
Stream解决的痛点问题
MQ消息中间件广泛应用在 应用解耦合、 异步消息处理、 流量削峰等场景中。
但是,不同的MQ消息中间件内部机制、使用方式都有所不同。比如 RabbitMQ中有Exchange(交换机/交换器)的概念,kafka有Topic、Partition分区的概念。MQ消息中间件的差异性不利于我们上层的开发应用,如果系统在做MQ消息中间件的切换时,会比较困难。
Spring Cloud Stream 进行了很好的上层抽象,让我们与具体的消息中间件解耦合,屏蔽掉了底层具体MQ消息中间件的细节差异,类似Hibernate屏蔽掉具体的数据库MySQL/Oracle一样。
目前Spring Cloud Stream支持RabbitMQ和Kafka。
本质上:屏蔽掉了底层不同MQ消息中间件之间的差异,统一了MQ的编程模型,降低了学习、开发、维护MQ的成本。
Stream重要概念
inputs:相当于MQ的消息消费者Consumer;
outputs:相当于MQ的消息生产者Producer;
Binder对象:用来屏蔽底层MQ的细节,负责与具体的消息中间件交互。
应用程序通过inputs或者outputs与Spring Cloud Stream中的Binder对象交互。

Application Core: 应用服务(生产者/消费者)的业务逻辑代码。
inputs:关联消费者(相对应用服务来说,消息输入/消息消费)。
outputs:关联生产者。(相对应用程序来说,消息输出/消息生产)。
Binder:绑定器对象,用于屏蔽底层MQ差异,Stream提供不同的Binder,当需要切换MQ产品时,只需要切换Binder即可,而不需要修改任何应用逻辑(Binder绑定器的实现是框架内置的,Spring Cloud Stream目前支持RabbitMQ、Kafka两种消息队列)。
传统MQ模型与Stream消息驱动模型
传统MQ模型
Stream消息驱动模型
Stream消息通信方式及编程模型
Stream消息通信方式
在Spring Cloud Stream中消息通信方式遵循了发布-订阅模式,当一条消息被投递到消息中间件之后,会通过共享的Topic主题进行广播,消息消费者在订阅的主题中收到消息并触发自身的业务逻辑处理。
Topic:Stream中的抽象概念,用来代表发布共享消息给消费者的地方。 在RabbitMQ中,Topic对应的是Exchange,在Kafka中,Topic对应的是Topic。
Stream编程注解
注解主要是把上述结构图的组成部分上下关联起来,打通通道。
这样生产者的message数据才能进入mq,mq中的数据才能进入消费者。
| 注解 | 描述 |
|---|---|
| @Input 在消费者工程中使用 |
注解标识输入通道。 通过该输入通道接收到的消息进入应用程序。 |
| @Output 在生产者工程中使用 |
注解标识输出通道。 发布的消息通过该通道离开应用程序。 |
| @StreamListener 在消费者工程中使用,监听message的到来 |
监听队列。 用于消费者的队列的消息接收(消息监听) |
| @EnableBinding | 把Channel和Exchange/Topic绑定在一起 |
Stream消息驱动工程开发
cloud-stream-producer-9090 : 作为生产者端发送消息。cloud-stream-consumer-9091 : 作为消费者端接收消息。cloud-stream-consumer-9092 : 作为消费者端接收消息。
生产者工程
(1)新建子工程模块
在 cloud-parent 下新建子module: cloud-stream-producer-9090
(2)引入坐标依赖
<!--eureka client 客户端依赖引⼊--><dependency><groupId>org.springframework.cloud</groupId><artifactId>spring-cloud-starter-netflix-eurekaclient</artifactId></dependency><!--spring cloud stream 依赖(rabbit)--><dependency><groupId>org.springframework.cloud</groupId><artifactId>spring-cloud-starter-stream-rabbit</artifactId></dependency>
(3)application.yml添加配置
server:port: 9090spring:application:name: cloud-stream-producercloud:stream:binders: # 绑定MQ服务信息(此处我们是RabbitMQ)lagouRabbitBinder: # 给Binder定义的名称,⽤于后⾯的关联type: rabbit # MQ类型,如果是Kafka的话,此处配置kafkaenvironment: # MQ环境配置(⽤户名、密码等)spring:rabbitmq:host: localhostport: 5672username: guestpassword: guestbindings: # 关联整合通道和binder对象output: # output是我们定义的通道名称,此处不能乱改destination: lagouExchange # 要使⽤的Exchange名称(消息队列主题名称)content-type: text/plain # application/json # 消息类型设置,⽐如jsonbinder: lagouRabbitBinder # 关联MQ服务eureka:client:serviceUrl: # eureka server的路径defaultZone: http://lagoucloudeurekaservera:8761/eureka/,http://lagoucloudeurekaserverb:8762/eureka/instance:prefer-ip-address: true #使⽤ip注册
(4) 启动类
@SpringBootApplication@EnableDiscoveryClientpublic class StreamProducerApplication9090 {public static void main(String[] args) {SpringApplication.run(StreamProducerApplication9090.class,args);}}
(5) 业务类开发
发送消息接口
public interface IMessageProducer {public void sendMessage(String content);}
发送消息实现类
// Source.class⾥⾯就是对输出通道的定义(这是Spring Cloud Stream内置的通道封装)@EnableBinding(Source.class)public class MessageProducerImpl implements IMessageProducer {// 将MessageChannel的封装对象Source注⼊到这⾥使⽤@Autowiredprivate Source source;@Overridepublic void sendMessage(String content) {// 向mq中发送消息(并不是直接操作mq,应该操作的是spring cloud stream)// 使⽤通道向外发出消息(指的是Source⾥⾯的output通道)source.output().send(MessageBuilder.withPayload(content).build());}}
测试类
SpringBootTest(classes = {StreamProducerApplication9090.class})@RunWith(SpringJUnit4ClassRunner.class)public class MessageProducerTest {@Autowiredprivate IMessageProducer iMessageProducer;@Testpublic void testSendMessage() {iMessageProducer.sendMessage("hello world-lagou101");}}
消费者工程
(1)新建子工程模块
(2)引入坐标依赖
(3)application.yml添加配置
server:port: 9091spring:application:name: cloud-stream-consumercloud:stream:binders: # 绑定MQ服务信息(此处我们是RabbitMQ)lagouRabbitBinder: # 给Binder定义的名称,⽤于后⾯的关联type: rabbit # MQ类型,如果是Kafka的话,此处配置kafkaenvironment: # MQ环境配置(⽤户名、密码等)spring:rabbitmq:host: localhostport: 5672username: guestpassword: guestbindings: # 关联整合通道和binder对象input: # input是我们定义的通道名称,此处不能乱改destination: lagouExchange # 要使⽤的Exchange名称(消息队列主题名称)content-type: text/plain # application/json # 消息类型设置,⽐如jsonbinder: lagouRabbitBinder # 关联MQ服务eureka:client:serviceUrl: # eureka server的路径defaultZone: http://lagoucloudeurekaservera:8761/eureka/,http://lagoucloudeurekaserverb:8762/eureka/instance:prefer-ip-address: true #使⽤ip注册
(4) 消息消费者监听
@EnableBinding(Sink.class)public class MessageConsumerService {@StreamListener(Sink.INPUT)public void recevieMessages(Message<String> message) {System.out.println("=========接收到的消息:" + message);}}
Stream自定义消息通道
Stream内置了两种接口Source和Sink,分别定义了binding为”input”的输入流和”output”的输出流。
可以自定义输入通道和输出通道。
定义接口
interface CustomChannel {String INPUT_LOG = "inputLog";String OUTPUT_LOG = "outputLog";@Input(INPUT_LOG)SubscribableChannel inputLog();@Output(OUTPUT_LOG)MessageChannel outputLog();}
使用
(1)在@EnableBinding注解中,绑定自定义接口
(2)使用@StreamListener做监听时,指定绑定 CustomChannel.INPUT_LOG
(3)服务配置文件补充bindings信息
bindings:inputLog:destination: lagouExchange # 要使⽤的Exchange名称(消息队列主题名称)outputLog:destination: eduExchange
Stream消息分组
业务场景中,希望主题上的一个Message只能被一个消费者端(多实例)消费处理,就需要通过使用消息分组的方式,来防止一个消费者端的多个实例重复消费消息。
在服务消费者端设置 spring.cloud.stream.bindings.input.group 属性,多个消费者实例配置为同一个group名称(同一个group中的消费者只能有一个可以获取到消费并消费)。
