1 引言
Stream就是在消息队列的基础上,对其进行封装,让咱们更方便的去操作MQ消息队列。
2 Stream快速入门
启动RabbitMQ
消费者-导入依赖
<dependency><groupId>org.springframework.cloud</groupId><artifactId>spring-cloud-starter-stream-rabbit</artifactId></dependency>
消费者-配置文件
spring:# 连接RabbitMQrabbitmq:host: 192.168.199.109port: 5672username: testpassword: testvirtual-host: /test
消费者-监听的队列
public interface StreamClient {@Input("myMessage")SubscribableChannel input();}//-------------------------------------------------@Component@EnableBinding(StreamClient.class)public class StreamReceiver {@StreamListener("myMessage")public void msg(Object msg){System.out.println("接收到消息: " + msg);}}
生产者-导入依赖
<dependency><groupId>org.springframework.cloud</groupId><artifactId>spring-cloud-starter-stream-rabbit</artifactId></dependency>
生产者-配置文件
spring:# 连接RabbitMQrabbitmq:host: 192.168.199.109port: 5672username: testpassword: testvirtual-host: /test
生产者-发布消息
public interface StreamClient {@Output("myMessage")MessageChannel output();}//---------------------------------------------- 在启动类中添加注解 @EnableBinding(StreamClient.class)@Autowiredprivate StreamClient streamClient;@GetMapping("/send")public String send(){streamClient.output().send(MessageBuilder.withPayload("Hello Stream!!").build());return "消息发送成功!!";}
3 Stream重复消费问题
只需要添加一个配置,指定消费者组
spring:cloud:stream:bindings:myMessage: # 队列名称group: customer # 消费者组
4 Stream的消费者手动ack
编写配置
spring:cloud:stream:# 实现手动ACKrabbit:bindings:myMessage:consumer:acknowledgeMode: MANUAL
修改消费端方法
@StreamListener("myMessage")public void msg(Object msg,@Header(name = AmqpHeaders.CHANNEL) Channel channel,@Header(name = AmqpHeaders.DELIVERY_TAG) Long deliveryTag) throws IOException {System.out.println("接收到消息: " + msg);channel.basicAck(deliveryTag,false);}
