介绍
:::tips 因为延迟队列的需求非常多,所以RabbitMQ官方推出了DelayExchange插件,原生支持延迟队列效果
使用DelayExchange插件需要将一个交换机声明为delayed类型,当我们发送消息到delayExchange时,流程如下:
- 接收消息
- 判断消息是否具备x-delay属性
- 如果消息中有x-delay属性,说明是延迟消息,持久化到硬盘,读取x-delay值作为延迟时间
- 返回routing not found结果给消息发送者
- x-delay时间到期后,重新投递消息到指定队列
:::
安装
下载
:::tips RabbitMQ官方插件社区:社区地址,里面包含各种各样的插件,其中就包括DelayExchange插件:插件地址,对应RabbitMQ 3.8.5以上的版本
下载好之后,将这个文件上传到RabbitMQ容器的插件目录 :::
进入容器
:::tips 上传之后,进入容器内部来安装这个插件 :::
docker exec -it RabbitMQ容器名 bash
开启插件
:::tips 进入容器之后,执行下面的命令来开启插件 :::
rabbitmq-plugins enable rabbitmq_delayed_message_exchange
发送消息
:::tips 消息生产者在发送消息时需要携带x-delay属性,指定消息的延迟时间 :::
@SpringBootTestpublic class MyTest{//注入RabbitTemplate对象@Autowiredprivate RabbitTemplate rabbitTemplate;@Testpublic void test(){//指定交换机名称String exchangeName = "exchange.delay";//指定路由keyString routingKey = "delay";//创建消息Message message = MessageBuilder.withBody("这是一条测试消息".getBytes(StandardCharsets.UTF_8)).setHeader("x-delay", 10000) //设置消息延迟时间,单位:毫秒.build();//发送消息rabbitTemplate.convertAndSend(exchangeName, routingKey, message);}}
接收消息
:::tips 在消息消费者中声明交换机的类型为DelayExchange类型 :::
基于注解方式
//将这个类注册到Spring容器中@Componentpublic class RabbitMqListener{//声明并绑定队列和交换机,同时监听队列中的消息@RabbitListener(bindings = @QueueBinding(value = @Queue(name = "delay.queue"),exchange = @Exchange(name = "exchange.delay", delayed = "true"),key = "delay"))public void listen(String msg){System.out.println("接收到消息:" + msg);}}
基于@Bean方式
@Configurationpublic class XxxConfig{//声明DelayExchange类型的交换机@Beanpublic DirectExchange delayedExchange(){return ExchangeBuilder.directExchange("exchange.delay") //指定交换机的类型和名称.delayed() //指定delay属性为true.durable(true) //设置为持久化队列.build();}}
