1、引入 rabbitmq 依赖包
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency>
2、修改 application.properties 配置
spring.rabbitmq.host=127.0.0.1spring.rabbitmq.port=5672spring.rabbitmq.username=rootspring.rabbitmq.password=root@mqspring.rabbitmq.virtual-host=/# 发送者开启 confirm 确认机制# 监听消息是否 到达 exchangespring.rabbitmq.publisher-confirms=true# 发送者开启 return 确认机制#监听消息是否 没有到达 queuespring.rabbitmq.publisher-returns=true##################################################### 设置消费端手动 ackspring.rabbitmq.listener.simple.acknowledge-mode=manual# 是否支持重试#spring.rabbitmq.listener.simple.retry.enabled=true
3、定义交换机和队列
@Configurationpublic class QueueConfig {/*** 确认队列* @return*/@Bean(name = "confirmTestQueue")public Queue confirmTestQueue() {return new Queue("confirm_test_queue", true, false, false);}/*** 交换机* @return*/@Bean(name = "confirmTestExchange")public FanoutExchange confirmTestExchange() {return new FanoutExchange("confirmTestExchange");}/*** 测试绑定队列和广播交换机* @param confirmTestExchange* @param confirmTestQueue* @return*/@Beanpublic Binding confirmTestFanoutExchangeAndQueue(@Qualifier("confirmTestExchange") FanoutExchange confirmTestExchange,@Qualifier("confirmTestQueue") Queue confirmTestQueue) {return BindingBuilder.bind(confirmTestQueue).to(confirmTestExchange);}}
4、回调确认配置
4.1、什么是确认
发送消息确认:用来确认生产者 producer 将消息发送到 broker ,broker 上的交换机 exchange 再投递给队列 queue 的过程中,消息是否成功投递。
- 消息从 producer 到 rabbitmq broker 有一个 confirmCallback 确认模式。
- 消息从 exchange 到 queue 投递失败有一个 returnCallback 退回模式。
4.2、 ConfirmCallback 确认模式
@Slf4jpublic class ConfirmCallbackService implements RabbitTemplate.ConfirmCallback {/*** correlationData:对象内部只有一个 id 属性,用来表示当前消息的唯一性。* ack:消息投递到 broker 的状态,true 表示成功。* cause:表示投递失败的原因。* 消息没有到达 exchange 会回调*/@Overridepublic void confirm(CorrelationData correlationData, boolean ack, String cause) {if (!ack) {log.error("消息发送异常!");} else {log.info("发送者已经收到确认,correlationData={} ,ack={}, cause={}", correlationData.getId(), ack, cause);}}}
4.3、ReturnCallback 退回模式
//旧版本 RabbitTemplate.ReturnCallback@Slf4jpublic class ReturnCallbackService implements RabbitTemplate.ReturnsCallback {/*** 如果消息未能投递到目标 queue 里将触发回调 returnCallback ,* 一旦向 queue 投递消息未成功,* 这里一般会记录下当前消息的详细投递数据,方便后续做重发或者补偿等操作。* @param returnedMessage*/@Overridepublic void returnedMessage(ReturnedMessage returnedMessage) {log.info("returnedMessage ===> replyCode={} ,replyText={} ,exchange={} ,routingKey={}", returnedMessage.getReplyCode(), returnedMessage.getReplyText(), returnedMessage.getMessage(), returnedMessage.getRoutingKey());}}
4.4、配置 RabbitTemplate
@Beanpublic RabbitTemplate rabbitTemplate() {RabbitTemplate template = new RabbitTemplate(connectionFactory);//消息投递到 交换机 确认回调template.setConfirmCallback(confirmCallbackService());//消息投递到 队列 确认回调template.setReturnsCallback(returnCallbackService());return template;}
4.5、配置消息过期(可选)
设置消息的过期时间
rabbitTemplate.convertAndSend(exchange, routingKey, msg, new MessagePostProcessor() {@Overridepublic Message postProcessMessage(Message message) throws AmqpException {MessageProperties messageProperties = message.getMessageProperties();//设置单个消息过期messageProperties.setExpiration();return message;}});
设置队列过期(超时之后不进入死信)
@Beanpublic Queue newMerchantQueue(){Map<String,Object> args = new HashMap<>(3);//消息过期后,进入到死信交换机args.put("x-dead-letter-exchange","lock_merchant_dead_exchange");//消息过期后,进入到死信交换机的路由keyargs.put("x-dead-letter-routing-key","lock_merchant_routing_key");//过期时间,单位毫秒args.put("x-message-ttl",10000);return QueueBuilder.durable("new_merchant_queue").withArguments(args).build();}
5、消息监听
@Slf4j@Component//监听队列@RabbitListener(queues = "confirm_test_queue")public class ReceiverMessage1 {@RabbitHandlerpublic void processHandler(String msg, Channel channel, Message message) throws IOException {try {log.info("小富收到消息:{}", msg);//TODO 具体业务//手动确认channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);} catch (Exception e) {if (message.getMessageProperties().getRedelivered()) {log.error("消息已重复处理失败,拒绝再次接收...");// 拒绝消息channel.basicReject(message.getMessageProperties().getDeliveryTag(), false);} else {log.error("消息即将再次返回队列处理...");channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);}}}}
6、踩坑日志
6.1、不消息确认
开启消息确认机制,消费消息别忘了 channel.basicAck,否则消息会一直存在,导致重复消费。
6.2、消息无限投递
开启消息确认机制,发生异常后将消息重新投入队列, channel.basicNack 是从重新入队列头部。
channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);
解决方案
- 先将消息进行应答,重新发送消息到队列,此时,消息是入队尾
- 优化设置了消息重试次数,达到了重试上限以后,手动确认,队列删除此消息,并将消息持久化入 MySQL 并推送报警,进行人工处理和定时任务做补偿。
6.3、重复消费
借助 MySQL、或者 redis 将消息持久化,通过再消息中的唯一性属性校验。
7、延迟队列实现
7.1、方式
- 使用死信队列
- 使用插件 ,需要安装插件
rabbitmq-plugins enable rabbitmq_delayed_message_exchange
7.2、死信队列实现
7.2.1、介绍
- RabbitMQ的延迟队列基于消息的存活时间TTL(Time To Live)和死信交换机DLE(Dead Letter Exchanges)实现
- TTL:RabbitMQ可以对队列和消息各自设置存活时间,规则是两者中较小的值,即队列无消费者连接的消息过期时间,或者消息在队列中一直未被消费的过期时间
- DLE:过期的消息通过绑定的死信交换机,路由到指定的死信队列,消费者实际上消费的是死信队列上的消息
进入死信队列的情况
- 消息被拒绝(basic.reject / basic.nack),并且requeue = false
- 消息TTL过期。TTL:Time To Live的简称,即过期时间。RabbitMQ可以对消息和队列设置TTL。
-
7.2.2、声明队列
```java /**
- 创建死信交换机
- @return */ @Bean public Exchange lockMerchantDeadExchange(){ return new TopicExchange(“lock_merchant_dead_exchange”,true,false); }
/**
- 创建死信队列
- @return */ @Bean public Queue lockMerchantDeadQueue(){ return QueueBuilder.durable(“lock_merchant_dead_queue”).build(); }
/**
- 绑定死信交换机和死信队列
@return */ @Bean public Binding lockMerchantBinding(){
return new Binding(“lock_merchant_dead_queue”
,Binding.DestinationType.QUEUE,"lock_merchant_dead_exchange","lock_merchant_routing_key",null);
}
/**
* 创建普通交换机* @return*/@Beanpublic Exchange newMerchantExchange(){return new TopicExchange("new_merchant_exchange",true,false);}/*** 创建普通队列* @return*/@Beanpublic Queue newMerchantQueue(){Map<String,Object> args = new HashMap<>(3);//消息过期后,进入到死信交换机args.put("x-dead-letter-exchange","lock_merchant_dead_exchange");//消息过期后,进入到死信交换机的路由keyargs.put("x-dead-letter-routing-key","lock_merchant_routing_key");//过期时间,单位毫秒args.put("x-message-ttl",10000);return QueueBuilder.durable("new_merchant_queue").withArguments(args).build();}/*** 绑定交换机和队列* @return*/@Beanpublic Binding newMerchantBinding(){return new Binding("new_merchant_queue",Binding.DestinationType.QUEUE,"new_merchant_exchange","new_merchant_routing_key",null);}
<a name="yACOr"></a>##### 7.2.3、监听队列```java@Slf4j@Component//监听队列@RabbitListener(queues = "lock_merchant_dead_queue")public class DelayReceiverMessageHandler {@RabbitHandlerpublic void processHandler(String msg, Channel channel, Message message) throws IOException {try {log.info("小富收到消息:{}", msg);//TODO 具体业务log.info("队列延迟的时间 {} ",message.getMessageProperties().getDelay());log.info("接受时间 {}", LocalDateTime.now().format(DateTimeFormatter.ISO_LOCAL_DATE_TIME));//手动确认channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);} catch (Exception e) {if (message.getMessageProperties().getRedelivered()) {log.error("消息已重复处理失败,拒绝再次接收...");// 拒绝消息channel.basicReject(message.getMessageProperties().getDeliveryTag(), false);} else {log.error("消息即将再次返回队列处理...");channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);}}}}
7.2.4、发送
rabbitTemplate.convertAndSend(exchange, routingKey, msg, new MessagePostProcessor() {@Overridepublic Message postProcessMessage(Message message) throws AmqpException {MessageProperties messageProperties = message.getMessageProperties();//设置单个消息过期(优先级高于队列的过期时间配置)messageProperties.setExpiration("6000");return message;}});
