生产端消息确认
tx机制
tx机制叫做事务机制,RabbitMQ中有三个与tx机制的方法:
txSelect()、txCommit()、txRollback()
channel.txSelect()用于将当前channel设置成transaction模式channel.txCommit()提交事务channel.txRollback()回滚事务
使用tx机制,首先要通过txSelect方法开启事务,然后发布消息给broker服务器,如果txCommit提交成功,则说明消息成功被broker接收;如果在txCommit执行之前broker异常崩溃或者由于其他原因抛出异常,这个时候可以捕获异常,通过
txRollback回滚事务
// 创建名称为hello的队列channel.QueueDeclare("hello", false, false, false, null);for (int i = 0; i < 5; i++){// 构建消息数据包byte[] body = Encoding.UTF8.GetBytes(i.ToString());try{// 开启事务机制channel.TxSelect();// 消息发送channel.BasicPublish(exchange: "",routingKey: "hello",basicProperties: null,body: body);// 事务提交channel.TxCommit();}catch (Exception ex){channel.TxRollback();Assert.Fail(ex.Message);}}
Confirm模式
C#的RabbitMQ API中,有三个与Confirm相关的方法:
ConfirmSelect()、WaitForConfirms()、WaitForConfirmOrDie
channel.ConfirmSelect()表示开启Confirm模式channel.WaitForConfirms()等待所有消息确认,如果所有的消息都被服务端成功接收返回true,只要有一条没有被成功接收就返回falsechannel.WaitForConfirmsOrDie()和WaitForConfirms作用类型,也是等待所有消息确认。区别在于该方法没有返回值(Void),如果有任意一条消息没有被成功接收,该方法会立即抛出OperationInterrupedException类型异常
channel.QueueDeclare(queue: "hello",durable: false,exclusive: false,autoDelete: false,arguments: null);string message = "hello world";byte[] messageBody = Encoding.UTF8.GetBytes(message);// 开启Confirm模式channel.ConfirmSelect();// 消息发送channel.BasicPublish(exchange: "",routingKey: "hello",basicProperties: null,body: messageBody);// WaitForConfirms确认消息(可以同时确认多条消息)是否发送成功if (channel.WaitForConfirms()){Console.WriteLine($"Message发送成功");}else{Assert.Fail();}
消费端消息确认
自动确认
当RabbbitMQ将消息发送给消费者后,消费者接收到消息后,不等待消息处理结束,立即自动回送一个确认回执。自动确认的用法十分简单,设置消费方法的参数autoAck为true即可
EventingBasicConsumer consumer = new EventingBasicConsumer(channel);consumer.Received += (model, ea) =>{string message =Encoding.UTF8.GetString(ea.Body.ToArray());};channel.BasicConsume(queue: "hello", autoAck: true, consumer: consumer);
可能存在的问题
- 丢失数据:Broker会在接收到确认回执时删除消息,如果消费者接收到消息并返回了确认回执,然后这个消费者在处理消息时挂了,那么这条消息就再也找不回来了
- 只要队列不空,RabbitMQ会源源不断的把消息推送给客户端,而不管客户端能否消费的完,如果其中一个消费端消费的较慢,会极大的浪费性能
手动确认(BasicAck)
消费从队列中获取消息后,服务器会将该消息处于不可用状态,等待消费者反馈。
Resume方法的参数autoAck设置为false,然后在消费端使用代码channel.BasicAck()/BasicReject()等方法来确认和拒绝消息即可实现手动确认
EventingBasicConsumer consumer = new EventingBasicConsumer(channel);consumer.Received += (model, ea) =>{string message =Encoding.wUTF8.GetString(ea.Body.ToArray());// 手动ackchannel.BasicAck(deliveryTag: ea.DeliveryTag,multiple: false);};channel.BasicConsume(queue: "hello",autoAck: false,consumer: consumer);
改为手动确认方式只需改两处
- 开启监听时将
autoAck参数改为false - 消息消费成功后返回确认
这段代码中,先处理消息,完成后,再做ack响应,失败就不做ack响应,这样消息会储存在MQ的Unacked消息里,不会丢失,看起来没啥问题,但是如果其中一条消息在处理时抛出了异常,将导致后续所有消息都会无法消费
消息拒绝
BasicNack()
与
BasicReject()不同的是同时支持多个消息,可以nack该消费者先前接收未ack的所有消息
EventingBasicConsumer consumer =new EventingBasicConsumer(channel);consumer.Received += (model, ea) =>{string message =Encoding.UTF8.GetString(ea.Body.ToArray());try{/* 消费到某条消息时出错* 导致Broker无法拿到正常回执信息引发后续消息都无法被正常消费* 如果MQ没得到ack响应,这些消息会堆积在Unacked消息里,不会丢弃,直至客户端断开重连时,才变回ready* 如果Consumer客户端不断开连接,这些Unacked消息,永远不会变回ready状态* Unacked消息多了,占用内存越来越大,就会异常*/MessageConsumer(ea);channel.BasicAck(deliveryTag: ea.DeliveryTag,multiple: false);}catch (Exception ex){// 出错了,发nack,并通知MQ把消息塞回的队列头部(不是尾部)channel.BasicNack(deliveryTag: ea.DeliveryTag,multiple: false,requeue: true);}};channel.BasicConsume(queue: "hello",autoAck: false,consumer: consumer);
这里将代码调整为消费正常就ack,不正常就nack,并等下一次重新消费。看起来没问题,但是如果某条消息在消费时又抛出异常,该消息将会被Nack机制重新扔回队列头部,下一步又消费这条会出异常的消息,又出错,塞回队列……进入死循环,所以要谨慎使用Nack机制。这里可以在catch中记录错误日志依旧使用ack确认消费
BasicReject()
消费端告诉服务器这个消息拒绝接收,不处理,可以设置是否放回到队列中还是丢掉(只能一次拒绝一个消息)
MessagePublisher("hello", $"1");MessagePublisher("hello", $"2");MessagePublisher("hello", $"3");channel.QueueDeclare(queue: "hello",durable: false,exclusive: false,autoDelete: false,arguments: null);EventingBasicConsumer consumer =new EventingBasicConsumer(channel);channel.BasicQos(0, 1, false);consumer.Received += (model, ea) =>{string message =Encoding.UTF8.GetString(ea.Body.ToArray());if (message == "2"){Console.WriteLine($"Message:{message}");channel.BasicAck(deliveryTag: ea.DeliveryTag,multiple: false);}else{Console.WriteLine($"拒绝处理");/* BasicReject用于拒绝消息requeue参数指定了拒绝后是否重新放回queue一次只能拒绝一条消息设置为true: 消息会被重新仍回queue中设置为false:消息将被丢弃*/channel.BasicReject(deliveryTag: ea.DeliveryTag,requeue: true);}};channel.BasicConsume(queue: "hello",autoAck: false,consumer: consumer);
BasicRecover()
路由不成功的消息可以使用
recovery重新发送到队列中,参数是是否requeue,true则重新入队列,并且尽可能的将之前recover的消息投递给其他消费者消费,而不是自己再次消费。false则消息会重新被投递给自己
消息持久化/优先级
Persistent
| 参数 | 重启RabbitMQ |
|---|---|
| exchange.durable=fasle/queue.durable=false | exchange/queue将会被丢弃 |
| exchange.durable=fasle | exchange将会被丢弃 |
| queue.durable=fasle | queue将会被丢弃 |
| exchange.durable=fasle/queue.durable=true | exchange将会被丢弃,queue虽然会存在,但队列内消息会全部丢失 |
| exchange.durable=true/queue.durable=true | exchange/queue会存在,但队列内消息会全部丢失 |
| exchange.durable=true&&queue.durable=true/消息发布时(persistent=true) | 消息真正的持久化 |
for (int i = 0; i < 100; i++){byte[] messageBody = Encoding.UTF8.GetBytes(i.ToString());// 设置消息持久化var props = channel.CreateBasicProperties();props.Persistent = true;// 消息发送channel.BasicPublish(exchange: "TestExchange",routingKey: "",basicProperties: props,body: messageBody);}
Priority
queue是先进先出的,即先发送的消息,先被消费。但是在具体业务中可能会遇到要提前处理某些消息的需求,如一个常见的需求:普通客户的消息按先进先出的顺序处理,Vip客户的消息要提前处理。消息实现优先级控制的实现方式是:首先在声明queue是设置队列的
x-max-priority属性,然后在publish消息时,设置消息的优先级等级即可
生产者
// 声明交换机exchangchannel.ExchangeDeclare(exchange: "myexchange",type: ExchangeType.Fanout,durable: true,autoDelete: false,arguments: null);// 声明队列queuechannel.QueueDeclare(queue: "myqueue",durable: true,exclusive: false,autoDelete: false,arguments: new Dictionary<string, object>() {//队列优先级最高为10,不加x-max-priority的话,消息发布时设置了消息的优先级也不会生效{"x-max-priority",10 }});// 绑定exchange和queuechannel.QueueBind(queue: "myqueue", exchange: "myexchange", routingKey: "mykey");Console.WriteLine("生产者准备就绪....");// 测试数据string[] msgs = { "vip1", "hello1", "hello2", "hello3", "vip5" };// 设置消息优先级var props = channel.CreateBasicProperties();foreach (string msg in msgs){// vip开头的消息,优先级设置为9if (msg.StartsWith("vip")){props.Priority = 9;channel.BasicPublish(exchange: "myexchange",routingKey: "mykey",basicProperties: props,body: Encoding.UTF8.GetBytes(msg));}// 其他消息优先级为1else{props.Priority = 1;channel.BasicPublish(exchange: "myexchange",routingKey: "mykey",basicProperties: props,body: Encoding.UTF8.GetBytes(msg));}
消费端
EventingBasicConsumer consumer =new EventingBasicConsumer(channel);// 绑定消息接收后的事件委托consumer.Received += (model, ea) =>{string message =Encoding.UTF8.GetString(ea.Body.ToArray());Console.WriteLine($"Message:{message}");Assert.IsNotNull(message);};channel.BasicConsume(queue: "myqueue",autoAck: true,consumer: consumer);

