生产者确认
事务机制 :发送消息前,开启事务(channel.txSelect()),然后发送消息,如果发送过程中出现什么异常,事务就会回滚(channel.txRollback()),如果发送成功则提交事务(channel.txCommit())。然而,这种方式有个缺点:吞吐量下降
RabbitMQ提供了事务机制保证消息投递,RabbitMQ客户端中与事务机制相关的方法有三个:
- channel.txSelect : 将当前的channel通道设置为事务模式;
- channel.txCommit :用于提交事务;
- channel.txRollback :用于事务回滚;
但是使用事务会大大降低RabbitMQ的性能,在一些较小的吞吐量情况下,也可以采用事务方式,具体情况视各自的系统来决定,这里仅以一段代码来让大家了解事务的机制
try {channel.txSelect();channel.basicPublish(exchange , routingKey ,MessageProperties.PERSISTENT_TEXT_PLAIN , msg.getBytes());int result = 1 / 0 ;channel.txCommit();}catch (Exception e) {e.printStackTrace();channel.txRollback();}
生产者确认机制:
- 生产者将Channel设置成Confirm模式,当设置Confirm模式后所有在该信道上面发布的消息都会被指派一个唯一的ID(从1开始,ID在同个Channel范围是唯一的),一旦消息被投递到所有匹配的队列之后Broker就会发送一个确认给生产者(包含消息的唯一ID),这就使得生产者知道消息已经正确到达目的队列了;
- 如果消息和队列是可持久化的,那么确认消息会在消息写入磁盘之后发出;
- RabbitMQ回调消息的deliveryTag包含了确认消息的ID,此外RabbitMQ也可以设置channel.basicAck 方法中的multiple参数,表示到这个序号之前的所有消息都己经得到了处理;稍后介绍handleNack 和 handleAck两个方法我们再举个说明;
- confirm的机制是异步的,如果消息成功发送,会返回ack消息供异步处理,如果消息发送失败发生异常,也会返回nack消息,confirm的时间没有明确说明,并且同一个消息只会被confirm一次;
接下来介绍两种confirm方法
- 批量confirm方法 : 每发送一批消息后,调用channel.waitForConfirms方法,等待服务器的确认返回;
先看代码样例,注意看注释
//开启confirm模式channel.confirmSelect();//模拟发送50条消息for(int i =0;i<1000;i++){String message = "Hello World RabbitMQ";//发送消息channel.basicPublish(EXCHANGE_NAME,"",MessageProperties.PERSISTENT_TEXT_PLAIN,message.getBytes());//每发送2条判断一次是否回复if(i%2==0){//waitForConfirms可以换成带有时间参数的方法waitForConfirms(Long mills)指定等待响应时间if(channel.waitForConfirms()){System.out.println("Message send success.");}}}
批量的方法从数量级上降低了confirm的性能消耗,提高了效率,但是批量confmn方式的问题在于遇到RabbitMQ服务端返回Basic.Nack 需要重发批量消息而导致的性能降低
- 异步confirm方法(推荐) :提供一个回调方法,服务端确认了一条或者多条消息后客户端会回调这个方法进行处理;
依旧还是先看代码:生产者
import com.rabbitmq.client.Channel;import com.rabbitmq.client.ConfirmListener;import com.rabbitmq.client.Connection;import com.rabbitmq.client.ConnectionFactory;public class ConfirmProducer {public static void main(String[] args) throws Exception {//1 创建ConnectionFactoryConnectionFactory connectionFactory = new ConnectionFactory();connectionFactory.setHost("localhost");connectionFactory.setPort(5672);connectionFactory.setVirtualHost("huang");connectionFactory.setUsername("guest");connectionFactory.setPassword("guest");//2 创建ConnectionConnection connection = connectionFactory.newConnection();//3 创建ChannelChannel channel = connection.createChannel();//4 指定我们的消息投递模式: 消息的确认模式channel.confirmSelect();//5 声明交换机 以及 路由KEYString exchangeName = "test_confirm_exchange";String queueName = "test_confirm_queue";//指定类型为topicString exchangeType = "topic";String routingKey = "confirm.send";//表示声明了一个交换机channel.exchangeDeclare(exchangeName, exchangeType, true, false, false, null);//表示声明了一个队列channel.queueDeclare(queueName, true, false, false, null);//建立一个绑定关系:channel.queueBind(queueName, exchangeName, routingKey);//6 发送一条消息String msg = "Test Confirm Message";channel.basicPublish(exchangeName, routingKey, null, msg.getBytes());//7 添加确认监听channel.addConfirmListener(new ConfirmListener(){@Overridepublic void handleNack(long deliveryTag, boolean multiple) throws IOException {System.err.println("收到NACK应答");}@Overridepublic void handleAck(long deliveryTag, boolean multiple) throws IOException {System.err.println("收到ACK应答");}});}}
消费者:
public class ConfirmConsumer {public static void main(String[] args) throws Exception {//1 创建ConnectionFactoryConnectionFactory connectionFactory = new ConnectionFactory() ;connectionFactory.setHost("192.168.1.28");connectionFactory.setPort(5672);connectionFactory.setVirtualHost("/");connectionFactory.setUsername("toher");connectionFactory.setPassword("toher888");//2 创建ConnectionConnection connection = connectionFactory.newConnection();//3 创建ChannelChannel channel = connection.createChannel();//4 声明String exchangeName = "test_confirm_exchange";//指定类型为topicString exchangeType = "topic";String queueName = "test_confirm_queue";//因为*号代表匹配一个单词,生产者中routingKey3将匹配不到String routingKey = "confirm.*";//表示声明了一个交换机channel.exchangeDeclare(exchangeName, exchangeType, true, false, false, null);//表示声明了一个队列channel.queueDeclare(queueName, true, false, false, null);//建立一个绑定关系:channel.queueBind(queueName, exchangeName, routingKey);//5 创建消费者Consumer consumer = new DefaultConsumer(channel){@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body)throws IOException {String msg = new String(body, "UTF-8");System.out.println("消费端:" + msg);}};//参数:队列名称、是否自动ACK、Consumerchannel.basicConsume(queueName, true, consumer);}}
运行效果:

从上面代码我们可以看到有重写了ConfirmListener两个方法:handleNack 和 handleAck,分别用来处理RabbitMQ 回传的Basic.Nack和Basic.Ack;
它们都有两个参数:
- long deliveryTag : 前面介绍确认消息的ID
- boolean multiple : multiple 是否批量 如果是True 则将比该deliveryTag小的所有数据都移除 否则只移除该条;
我们简单的用一个数组来说明 [1,2,3,4]存储着4条消息ID , 此时确认消息返回的是 deliveryTag = 3 ,multiple = true那么RabbitMQ会通知我们小于ID3的消息得到确认了,如果multiple = false, 就通知我们ID3的确认了
我们再用修改一下上面的代码看一下
//声明一个用来记录消息唯一ID的有序集合SortedSetfinal SortedSet<Long> confirmSet = Collections.synchronizedSortedSet(new TreeSet<Long>());//开启confirm模式channel.confirmSelect();//异步监听方法 处理ack与nack方法channel.addConfirmListener(new ConfirmListener() {//处理ack multiple 是否批量 如果是批量 则将比该条小的所有数据都移除 否则只移除该条public void handleAck(long deliveryTag, boolean multiple) throws IOException {if (multiple) {confirmSet.headSet(deliveryTag).clear();} else {confirmSet.remove(deliveryTag);}}//处理nack 与ack相同public void handleNack(long deliveryTag, boolean multiple) throws IOException {System.out.println("There is Nack, SeqNo: " + deliveryTag + ", multiple: " + multiple);if (multiple) {confirmSet.headSet(deliveryTag).clear();} else {confirmSet.remove(deliveryTag);}}});
以上代码按照每一个comfirm的通道维护一个集合,每发送一条数据,集合增加一个元素,每异步响应一条ack或者nack的数据,集合删除一条。SortedSet是一个有序的集合,它的有序是值大小的有序,不是插入时间的有序。JDK中waitForConfirms()方法也是使用了SortedSet集合
[
](https://blog.csdn.net/lhmyy521125/article/details/88064322)
消息确认回调
配置yml
spring:rabbitmq:username: guestpassword: guesthost: localhostport: 5672#消息确认配置项publisher-returns: true #确认消息已发送到队列(Queue)publisher-confirm-type: correlated #确认消息已发送到交换机(Exchange)
配置相关的消息确认回调函数 ```java import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.amqp.core.Message; import org.springframework.amqp.rabbit.connection.ConnectionFactory; import org.springframework.amqp.rabbit.connection.CorrelationData; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration;
@Configuration public class RabbitConfig { private Logger logger = LoggerFactory.getLogger(RabbitConfig.class);
// 创建一个模版,绑定的是connectionFactory这个工厂@Beanpublic RabbitTemplate createRabbitTemplate(CachingConnectionFactory connectionFactory){RabbitTemplate rabbitTemplate = new RabbitTemplate();rabbitTemplate.setConnectionFactory(connectionFactory);//设置开启Mandatory,才能触发回调函数,无论消息推送结果怎么样都强制调用回调函数rabbitTemplate.setMandatory(true);// 消息只要被 rabbitmq broker 接收到就会触发 confirmCallback 回调 。rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {@Overridepublic void confirm(CorrelationData correlationData, boolean ack, String cause) {logger.info("ConfirmCallback:"+"相关数据:"+correlationData+","+"确认情况:"+ack+","+"原因:"+cause);}});// 消息未能投递到目标 queue 里将触发回调 returnCallbackrabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() {@Overridepublic void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {logger.info("ReturnCallback:"+"消息:"+message+","+"回应码:"+replyCode+","+"回应信息:"+replyText+","+"交换机:"+exchange+","+"路由键:"+routingKey);}});return rabbitTemplate;}
}
两个回调函数ConfirmCallback 、 RetrunCallback在什么情况会触发1. 消息推送到server,但是在server里找不到交换机写个测试接口,把消息推送到名为‘non-existent-exchange’的交换机上(这个交换机是没有创建没有配置的)```shell@GetMapping("/TestMessageAck")public String TestMessageAck() {String messageId = String.valueOf(UUID.randomUUID());String messageData = "message: non-existent-exchange test message ";String createTime = LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"));Map<String, Object> map = new HashMap<>();map.put("messageId", messageId);map.put("messageData", messageData);map.put("createTime", createTime);rabbitTemplate.convertAndSend("non-existent-exchange", "TestDirectRouting", map);return "ok";}
调用接口,查看控制台输出情况(原因里面有说,没有找到交换机’non-existent-exchange’)
结论:这种情况触发的是ConfirmCallback 回调函数
- 消息推送到server,找到交换机了,但是没找到队列
这种情况就是需要新增一个交换机,但是不给这个交换机绑定队列,我来简单地在DirectRabitConfig里面新增一个直连交换机,名叫‘lonelyDirectExchange’,但没给它做任何绑定配置操作
@BeanDirectExchange lonelyDirectExchange() {return new DirectExchange("lonelyDirectExchange");}
写个测试接口,把消息推送到名为lonelyDirectExchange的交换机上(这个交换机是没有任何队列配置的)
@GetMapping("/TestMessageAck2")public String TestMessageAck2() {String messageId = String.valueOf(UUID.randomUUID());String messageData = "message: lonelyDirectExchange test message ";String createTime = LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"));Map<String, Object> map = new HashMap<>();map.put("messageId", messageId);map.put("messageData", messageData);map.put("createTime", createTime);rabbitTemplate.convertAndSend("lonelyDirectExchange", "TestDirectRouting", map);return "ok";}
调用接口,查看项目的控制台输出情况
可以看到这种情况,两个函数都被调用了;
这种情况下,消息是推送成功到服务器了的,所以ConfirmCallback对消息确认情况是true;
而在RetrunCallback回调函数的打印参数里面可以看到,消息是推送到了交换机成功了,但是在路由分发给队列的时候,找不到队列,所以报了错误 NO_ROUTE 。
结论:这种情况触发的是 ConfirmCallback和RetrunCallback两个回调函数。
- 消息推送到sever,交换机和队列啥都没找到
这种情况其实一看就觉得跟①很像,没错 ,③和①情况回调是一致的,所以不做结果说明了。
结论: 这种情况触发的是 ConfirmCallback 回调函数。
- 消息推送成功
那么测试下,按照正常调用之前消息推送的接口就行,可以看到控制台输出:
结论: 这种情况触发的是 ConfirmCallback 回调函数。
消息确认机制
和生产者的消息确认机制不同,因为消息接收本来就是在监听消息,符合条件的消息就会消费下来。
所以,消息接收的确认机制主要存在三种模式:
- 自动确认
这也是默认的消息确认情况。 AcknowledgeMode.NONE
RabbitMQ成功将消息发出(即将消息成功写入TCP Socket)中立即认为本次投递已经被正确处理,不管消费者端是否成功处理本次投递。
所以这种情况如果消费端消费逻辑抛出异常,也就是消费端没有处理成功这条消息,那么就相当于丢失了消息。
一般这种情况我们都是使用try catch捕捉异常后,打印日志用于追踪数据,这样找出对应数据再做后续处理。
- 手动确认 , 这个比较关键,也是我们配置接收消息确认机制时,多数选择的模式。
消费者收到消息后,手动调用basic.ack/basic.nack/basic.reject后,RabbitMQ收到这些消息后,才认为本次投递成功。
- basic.ack用于肯定确认
- basic.nack用于否定确认(注意:这是AMQP 0-9-1的RabbitMQ扩展)
- basic.reject用于否定确认,但与basic.nack相比有一个限制:一次只能拒绝单条消息
消费者端以上的3个方法都表示消息已经被正确投递,但是basic.ack表示消息已经被正确处理。而basic.nack,basic.reject表示没有被正确处理
reject
着重讲下reject,因为有时候一些场景是需要重新入列的。
channel.basicReject(deliveryTag, true); 拒绝消费当前消息,如果第二参数传入true,就是将数据重新丢回队列里,那么下次还会消费这消息。设置false,就是告诉服务器,我已经知道这条消息数据了,因为一些原因拒绝它,而且服务器也把这个消息丢掉就行。 下次不想再消费这条消息了。
使用拒绝后重新入列这个确认模式要谨慎,因为一般都是出现异常的时候,catch异常再拒绝入列,选择是否重入列。
但是如果使用不当会导致一些每次都被你重入列的消息一直消费-入列-消费-入列这样循环,会导致消息积压。
nack
这个也是相当于设置不消费某条消息。
channel.basicNack(deliveryTag, false, true);
- 第一个参数依然是当前消息到的数据的唯一id;
- 第二个参数是指是否针对多条消息;如果是true,也就是说一次性针对当前通道的消息的tagID小于当前这条消息的,都拒绝确认。
- 第三个参数是指是否重新入列,也就是指不确认的消息是否重新丢回到队列里面去。
同样使用不确认后重新入列这个确认模式要谨慎,因为这里也可能因为考虑不周出现消息一直被重新丢回去的情况,导致积压。
实践
在消费者项目里,改为手动确认模式
修改yml文件,添加一下配置
spring:rabbitmq:listener:direct:acknowledge-mode: manualsimple:acknowledge-mode: manual
2.添加配置文件
@Configurationpublic class RabbitConfig {private Logger logger = LoggerFactory.getLogger(RabbitConfig.class);// 手动确认信息// 方法一@Beanpublic SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(CachingConnectionFactory connectionFactory) {SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();factory.setAcknowledgeMode(AcknowledgeMode.MANUAL);factory.setConnectionFactory(connectionFactory);return factory;}// 方法二@Beanpublic DirectRabbitListenerContainerFactory rabbitListenerContainerFactory(CachingConnectionFactory connectionFactory){DirectRabbitListenerContainerFactory factory = new DirectRabbitListenerContainerFactory();factory.setAcknowledgeMode(AcknowledgeMode.MANUAL);factory.setConnectionFactory(connectionFactory);return factory;}}
在消费者项目里,新建MessageListenerConfig.java上添加代码相关的配置代码 ```java package com.example.demo.rabbit.config;
import com.example.demo.rabbit.consumer.MyAckReceiver; import org.springframework.amqp.core.AcknowledgeMode; import org.springframework.amqp.rabbit.connection.CachingConnectionFactory; import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration;
@Configuration public class MessageListenerConfig {
@Autowiredprivate CachingConnectionFactory connectionFactory;@Beanpublic SimpleMessageListenerContainer simpleMessageListenerContainer() {SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory);container.setConcurrentConsumers(1);container.setMaxConcurrentConsumers(1);// RabbitMQ默认是自动确认,这里改为手动确认消息container.setAcknowledgeMode(AcknowledgeMode.MANUAL);//设置一个队列container.setQueueNames("TestDirectQueue");//如果同时设置多个如下: 前提是队列都是必须已经创建存在的// container.setQueueNames("TestDirectQueue","TestDirectQueue2","TestDirectQueue3");//另一种设置队列的方法,如果使用这种情况,那么要设置多个,就使用addQueues//container.setQueues(new Queue("TestDirectQueue",true));//container.addQueues(new Queue("TestDirectQueue2",true));//container.addQueues(new Queue("TestDirectQueue3",true));return container;}
}
- 手动确认消息监听类,需要手动确认```javaimport com.rabbitmq.client.Channel;import org.slf4j.Logger;import org.slf4j.LoggerFactory;import org.springframework.amqp.core.Message;import org.springframework.amqp.core.MessageProperties;import org.springframework.amqp.rabbit.annotation.RabbitHandler;import org.springframework.amqp.rabbit.annotation.RabbitListener;import org.springframework.amqp.rabbit.core.RabbitTemplate;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.stereotype.Component;import java.io.IOException;@Componentpublic class Consumer {private Logger logger = LoggerFactory.getLogger(Consumer.class);@RabbitHandler@RabbitListener(queues = "queue-direct")public void direct1(String msg, Channel channel, Message message) throws IOException {logger.info("consume direct 1:"+msg);MessageProperties messageProperties= message.getMessageProperties();long deliveryTag = messageProperties.getDeliveryTag();try {logger.info("message from exchange:"+messageProperties.getReceivedExchange()+",queue:"+messageProperties.getConsumerQueue()+",routingKey:"+messageProperties.getReceivedRoutingKey());//第二个参数,手动确认可以被批处理,当该参数为 true 时,则可以一次性确认 delivery_tag 小于等于传入值的所有消息channel.basicAck(deliveryTag,false);}catch (Exception e){// 第二个参数,true会重新放回队列,所以需要自己根据业务逻辑判断什么时候使用拒绝channel.basicReject(deliveryTag, false);logger.error(e.getMessage());}}}
