死信队列
producer将消息投递到broker或者直接到queue里,consumer从queue取出消息进行消费,但由于某些时候由于特定的原因导致queue中的某些消息无法被消费,这样的消息没有后续处理就变成了死信,死信交换机是直接交换机。为了保证业务的消息数据不丢失,需要使用到RabbitMQ的死信队列机制,当消息消费发送异常时,将消息投递到死信队列中。死信队列消息来源:1.消息TTL过期2.队列达到最大长度3.消息被否定应答nack
Provider
//普通交换机private static final String NORMAL_EXCHANGE = "normal-exchange";public static void main(String[] args) { //获取连接 Connection connection = RabbitMqUtils.getConnection(); Channel channel = null; try { //获取信道 channel = connection.createChannel(); //发送死信消息 设置TTL时间(毫秒) AMQP.BasicProperties properties = new AMQP.BasicProperties() .builder().expiration("10000").build(); for (int i = 0; i < 10; i++) { channel.basicPublish(NORMAL_EXCHANGE, "key1", properties, ("msg" + i).getBytes()); } } catch (IOException e) { e.printStackTrace(); } finally { RabbitMqUtils.close(connection, channel); }}
ConsumerA
//普通交换机private static final String NORMAL_EXCHANGE = "normal-exchange";//死信交换机private static final String DEAD_EXCHANGE = "dead-exchange";//普通队列private static final String NORMAL_QUEUE = "normal-queue";//死信队列private static final String DEAD_QUEUE = "dead-queue";public static void main(String[] args) { //获取信道 Channel channel = RabbitMqUtils.getChannel(); try { //声明普通交换机和死信交换机 channel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT); channel.exchangeDeclare(DEAD_EXCHANGE, BuiltinExchangeType.DIRECT); //声明普通队列 Map<String, Object> arguments = new HashMap<>(); //1.设置队列长度[超出最大长度的消息投递到死信队列] //arguments.put("x-max-length", 6); //2.指定过期之后死信交换机 arguments.put("x-dead-letter-exchange", DEAD_EXCHANGE); //3.设置死信RoutingKey arguments.put("x-dead-letter-routing-key", "key2"); channel.queueDeclare(NORMAL_QUEUE, false, false, false, arguments); //声明死信队列 channel.queueDeclare(DEAD_QUEUE, false, false, false, null); //绑定交换机和队列 channel.queueBind(NORMAL_QUEUE, NORMAL_EXCHANGE, "key1"); channel.queueBind(DEAD_QUEUE, DEAD_EXCHANGE, "key2"); //消费消息 DeliverCallback deliverCallback = (consumerTag, message) -> { //拒绝msg5消息nack if ("msg5".equals(new String(message.getBody()))) { System.err.println(new String(message.getBody()) + "被拒绝了"); //获取消息的标签进行拒绝,不重新放回普通队列中 channel.basicReject(message.getEnvelope().getDeliveryTag(), false); } else { System.out.println(new String(message.getBody())); //手动确认应答 channel.basicAck(message.getEnvelope().getDeliveryTag(), false); } }; CancelCallback cancelCallback = message -> { }; System.out.println("ConsumerA准备就绪..."); channel.basicConsume(NORMAL_QUEUE, false, deliverCallback, cancelCallback); } catch (IOException e) { e.printStackTrace(); }}
ConsumerB
//死信队列private static final String DEAD_QUEUE = "dead-queue";public static void main(String[] args) { //获取信道 Channel channel = RabbitMqUtils.getChannel(); try { //消费消息 DeliverCallback deliverCallback = (consumerTag, message) -> { System.out.println(new String(message.getBody())); }; CancelCallback cancelCallback = message -> { }; System.out.println("ConsumerA准备就绪..."); channel.basicConsume(DEAD_QUEUE, true, deliverCallback, cancelCallback); } catch (IOException e) { e.printStackTrace(); }}
Spring Boot for RabbitMQ
<dependency> <groupId>org.projectlombok</groupId> <artifactId>lombok</artifactId> <scope>provided</scope></dependency><dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter</artifactId></dependency><dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId></dependency><dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId></dependency>
# 应用名称spring.application.name=spring-boot-rabbitmq# rabbitmq配置spring.rabbitmq.host=47.172.193.131spring.rabbitmq.port=5672spring.rabbitmq.username=adminspring.rabbitmq.password=123
TTL死信队列
队列内部是有序的,最重要的特性就体现在它的延迟属性上,延迟队列中的元素是希望在指定时间到了以后或之前取出和进行处理,延迟队列就是用来存放需要在指定时间被处理的元素队列。#应用场景未支付订单在10分钟后自动取消
TTLQueueConfig
@Configurationpublic class TTLQueueConfig { //普通交换机名称 private static final String X_EXCHANGE = "X"; //死信交换机名称 private static final String Y_DEAD_LETTER_EXCHANGE = "Y"; //普通队列名称 private static final String QUEUE_A = "QA"; private static final String QUEUE_B = "QB"; private static final String QUEUE_C = "QC"; //死信队列名称 private static final String DEAD_LETTER_QUEUE_D = "QD"; //声明普通交换机 @Bean("xExchange") public DirectExchange xExchange() { return new DirectExchange(X_EXCHANGE); } //声明死信交换机 @Bean("yExchange") public DirectExchange yExchange() { return new DirectExchange(Y_DEAD_LETTER_EXCHANGE); } //声明普通队列 @Bean("queueA") public Queue queueA() { Map<String, Object> arguments = new HashMap<>(3); //设置死信交换机 arguments.put("x-dead-letter-exchange", Y_DEAD_LETTER_EXCHANGE); //设置死信RoutingKey arguments.put("x-dead-letter-routing-key", "YD"); //设置过期时间TTL ms arguments.put("x-message-ttl", 10000); return QueueBuilder .durable(QUEUE_A) .withArguments(arguments) .build(); } //声明普通队列 @Bean("queueB") public Queue queueB() { Map<String, Object> arguments = new HashMap<>(3); //设置死信交换机 arguments.put("x-dead-letter-exchange", Y_DEAD_LETTER_EXCHANGE); //设置死信RoutingKey arguments.put("x-dead-letter-routing-key", "YD"); //设置过期时间TTL ms arguments.put("x-message-ttl", 40000); return QueueBuilder .durable(QUEUE_B) .withArguments(arguments) .build(); } //非延迟队列 @Bean("queueC") public Queue queueC() { Map<String, Object> arguments = new HashMap<>(2); //设置死信交换机 arguments.put("x-dead-letter-exchange", Y_DEAD_LETTER_EXCHANGE); //设置死信RoutingKey arguments.put("x-dead-letter-routing-key", "YD"); return QueueBuilder .durable(QUEUE_C) .withArguments(arguments) .build(); } //声明死信队列 @Bean("queueD") public Queue queueD() { return QueueBuilder .durable(DEAD_LETTER_QUEUE_D) .build(); } //绑定 @Bean public Binding queueABindX() { return BindingBuilder.bind(queueA()).to(xExchange()).with("XA"); } @Bean public Binding queueBBindX() { return BindingBuilder.bind(queueB()).to(xExchange()).with("XB"); } @Bean public Binding queueCBindX() { return BindingBuilder.bind(queueC()).to(xExchange()).with("XC"); } @Bean public Binding queueDBindY() { return BindingBuilder.bind(queueD()).to(yExchange()).with("YD"); }}
Provider
@GetMapping("/send")public void send() { log.info("当前时间:{},发送一条消息", new Date().toString()); rabbitTemplate.convertAndSend("X", "XA", "msg-10"); log.info("当前时间:{},发送一条消息", new Date().toString()); rabbitTemplate.convertAndSend("X", "XB", "msg-40");}//在消息生产端设置消息过期时间可以自由控制消息过期时间@GetMapping("/sendForTime/{time}")public void sendForTime(@PathVariable("time") String time) { log.info("当前时间:{},发送一条消息", new Date().toString()); rabbitTemplate.convertAndSend("X", "XC", "msg-" + Integer.parseInt(time) / 1000, msg -> { msg.getMessageProperties().setExpiration(time); return msg; });}
Consumer
@Slf4j@Componentpublic class DeadLetterQueueConsumer { @RabbitListener(queues = "QD") public void receiveD(Message message) { log.info("当前时间{},收到死信队列消息为{}", new Date().toString(), new String(message.getBody())); }}
基于延迟交换机
自定义延迟队列问题在生产端设置消息过期时间很灵活,但如果第一个消息耗时TTL是20秒,第二个消息TTL是2秒,那么第二个消息会在第一个消息进入死信队列后才能进入,会造成消息阻塞问题。
延迟交换机安装
sumv rabbitmq_delayed_message_exchange-3.8.0.ez \/usr/lib/rabbitmq/lib/rabbitmq_server-3.8.8/plugins/rabbitmq-plugins enable rabbitmq_delayed_message_exchange#安装完毕后交换机类型多了一个x-delayed-message
DelayedQueueConfig
@Configurationpublic class DelayedQueueConfig { //延迟交换机名称 private static final String DELAYED_EXCHANGE_NAME = "delayed.exchange"; //队列 private static final String DELAYED_QUEUE_NAME = "delayed.queue"; //routingKey private static final String DELAYED_ROUTING_KEY = "delayed.routingKey"; //声明延迟交换机 - 自定义交换机 @Bean("customExchange") public CustomExchange customExchange() { /* * 1.交换机名称 * 2.交换机类型 * 3.是否需要持久化 * 4.是否需要自动删除 * 5.参数 */ Map<String, Object> arguments = new HashMap<>(1); arguments.put("x-delayed-type", "direct");//延迟类型 return new CustomExchange(DELAYED_EXCHANGE_NAME, "x-delayed-message", false, false, arguments); } //声明队列 @Bean("queue") public Queue queue() { return new Queue(DELAYED_QUEUE_NAME); } //绑定 @Bean public Binding delayedQueueBindingDelayedExchange() { return BindingBuilder.bind(queue()).to(customExchange()).with(DELAYED_ROUTING_KEY).noargs(); }}
Provider
//基于延迟消息交换机的TTL队列@GetMapping("/plugins/{time}")public void plugins(@PathVariable("time") String time) { log.info("当前时间{},发送一条消息", new Date().toString()); rabbitTemplate.convertAndSend("delayed.exchange", "delayed.routingKey", "msg-" + Integer.parseInt(time) / 1000, msg -> { msg.getMessageProperties().setDelay(Integer.parseInt(time)); return msg; });}
Consumer
@Slf4j@Componentpublic class DeadLetterQueueConsumer { @RabbitListener(queues = "QD") public void receiveD(Message message) { log.info("当前时间{},收到死信队列消息为{}", new Date().toString(), new String(message.getBody())); }}