使用 Spring boot 启动的 amqp 包实现手动配置队列、交换机、并使用延迟队列来实现一个回调策略功能
boot 版本:2.4.4
依赖包为
implementation 'org.springframework.boot:spring-boot-starter-amqp'
背景
之前用 Spring cloud stream-rabbit,在大部分的场景下,简单配置配置就可以使用了,比较方便,没有怎么用过原生的,如今不在 cloud 环境下,仅仅是 boot 场景下,使用方式又不一样了
本次要实现的需求有:
- 同项目发送消息到 mq,同项目再自己消费 mq 消息
- 实现延迟队列,来实现 http 回调重试逻辑
这里同项目也要丢到 mq 中的考虑:生成消息可能远远大于消费消息的速度,另外数据库中未保存消息,一方面使用 mq 来充当持久化机制,保证项目重启数据不丢失,另一方面充当齿轮保证消费消息的稳定性
同项目产生消息和消费消息
这里演示的配置是最简单的,直接是队列,没有交换机
首先配置 mq 的数据源
spring:rabbitmq:host: 127.0.0.1port: 5672username: guestpassword: guestlistener:simple:prefetch: 2
listener.simple.prefetch :每个消费线程获取消息的数量(每个使用者可以处理的未确认消息的最大数量),默认是好像是 250,大概上它的作用
产生消息
@Autowiredprivate RabbitTemplate rabbitTemplate;rabbitTemplate.convertAndSend(Names.QUEUE, JSON.toJSONString(params));
消费消息
public static final String QUEUE = "send.mail";/*** <pre>* 并发消费者数量,m-n* m: 最小消费者数量* n:最大消费者数量* </pre>*/public static final String CONCURRENCY = "3-7";/**声明一个队列*/@Beanpublic Queue sendMail() {return new Queue(QUEUE);}// 监听队列配置@RabbitListener(queues = QUEUE, // 监听队列concurrency = CONCURRENCY, // 配置消费者数量,也就是并发线程数量ackMode = "AUTO" // ack 模式,其他可选枚举看这个 listener 的注释,上面写得有)public void process(String message) {NoticeMailRequest request = JSON.parseObject(message, NoticeMailRequest.class);log.debug("队列开始处理:{}", request);try {// 处理消息} catch (Exception e) {// 看你自己的业务,如果有异常不捕获,会被 aop 判定为消息处理失败,不会 ack}}
延迟队列
实现思路
延迟队列实现的思路:
- 创建一个正常的队列,但是要设置过期时间,绑定到 死信队列的交换机上
- 创建一个死信队列
- 程序只消费死信队列
这样一来,当一个消息进入到正常队列中,当消息超时之后,就会自动进入死信队列,就可以被程序消费,达到延迟消费的效果,但是需要知道: RabbitMQ 的 ttl 消息生存时间,可以在队列上设置,还可以在消息上设置,但是强烈建议设置在队列上
比如:a 消息是超时 1 分钟,b 消息是超时 30 秒,a 先进队列,b 后进队列,那么你会发现,就算 30 秒到了,b 消息也不会进入到死信队列中,而是等待 a 消息出来之后,跟着出来的
本次要实现的延迟队列功能背景如下:接受第三方系统调用 API 后,结果会通过 HTTP 方式异步回调会第三方系统,要保证在如下的规则中回调
- 有回调结果时:立即回调给第三方系统
- 如果回调第三方系统异常,每次重试间隔时间翻倍;假如所有回调都是错误的,那么将在第:0 、1、2、4、8、16、32、64 分钟时,进行回调操作,也就是说,加上正常回调的那一次,每一次的通知回调,最多调用 8 次,总共用时 64 分钟
关于回调策略问题,你可以自己定制,使用一些小算法进行,下面是具体的实现思路:
- 创建一个正常的队列:用来接收第一次的正常回调,目的是:及时回调,而不是都要等待 1 分钟后再回调
- 创建一个延迟队列,也就是将消息存活期设置成 1 分钟,目的是:当正常队列回调失败后,就投递到该队列中
- 创建一个死信队列
- 延迟队列绑定该死信队列的交换机
- 程序也消费该死信队列,如果还是回调失败,则手动将消息投递到延迟队列中
在死信队列中的处理需要注意:由于延迟队列设置成 1 分钟过期一次,那么也就是说,死信队列中的消费者,一分钟会消费到消息,这个时候还需要根据回调的时间间隔策略,做一定的判定,如果没有到达间隔,则再次投递回延迟队列中,因为所有消息都是 1 分钟过期,也就保证了一分钟后,只要消费者足够,就能如期消费到消息
具体代码
首先声明 3 个队列的交换机和队列,与绑定关系
package cn.mrcode.httpcallback;import org.springframework.amqp.core.Binding;import org.springframework.amqp.core.BindingBuilder;import org.springframework.amqp.core.DirectExchange;import org.springframework.amqp.core.Queue;import org.springframework.context.annotation.Bean;import org.springframework.context.annotation.Configuration;import java.util.HashMap;import java.util.Map;/*** @author mrcode*/@Configurationpublic class HttpCallbackMqConfig {//普通队列名称public static final String NORMAL_QUEUE = "http.callback";//普通交换机名称public static final String NORMAL_EXCHANGE = "http.callback";//延迟队列名称public static final String DELAY_QUEUE = "http.callback.delay";//延迟交换机名称public static final String DELAY_EXCHANGE = "http.callback.delay";// 延迟间隔,单位秒public static final Integer DELAY_INTERVAL = 10 * 6;//死信队列名称public static final String DEAD_QUEUE = "http.callback.dlq";//死信交换机名称public static final String DEAD_EXCHANGE = "http.callback.dlx";/*** 普通:队列** @return*/@Beanpublic Queue normalQueue() {return new Queue(NORMAL_QUEUE, true, false, false, null);}/*** 普通:交换机** @return*/@Beanpublic DirectExchange normalExchange() {//交换机名 是否持久化 是否自动删除return new DirectExchange(NORMAL_EXCHANGE, true, false);}/*** 普通:绑定交换机和队列** @return*/@Beanpublic Binding normalBinding() {return BindingBuilder.bind(normalQueue()).to(normalExchange()).with("");}/*** 延迟:队列** @return*/@Beanpublic Queue delayQueue() {Map<String, Object> map = new HashMap<>();//绑定死信交换机map.put("x-dead-letter-exchange", DEAD_EXCHANGE);//绑定 key// map.put("x-dead-letter-routing-key", DeadKey);// 设置超时时间, 单位毫秒map.put("x-message-ttl", DELAY_INTERVAL * 1000); // 1 分钟//设置队列长度// map.put("x-max-length",5);//队列名 是否持久化 是否排他 是否自动删除 其他参数return new Queue(DELAY_QUEUE, true, false, false, map);}/*** 延迟:交换机** @return*/@Beanpublic DirectExchange delayExchange() {//交换机名 是否持久化 是否自动删除return new DirectExchange(DELAY_EXCHANGE, true, false);}/*** 延迟:绑定交换机和队列** @return*/@Beanpublic Binding delayBinding() {return BindingBuilder.bind(delayQueue()).to(delayExchange()).with("");}/*** 死信:队列** @return*/@Beanpublic Queue deadQueue() {//队列名 是否持久化 是否排他 是否自动删除 其他参数return new Queue(DEAD_QUEUE, true, false, false, null);}/*** 死信交换机** @return*/@Beanpublic DirectExchange deadExchange() {//交换机名 是否持久化 是否自动删除return new DirectExchange(DEAD_EXCHANGE, true, false);}/*** 死信:绑定** @return*/@Beanpublic Binding deadBinding() {return BindingBuilder.bind(deadQueue()).to(deadExchange()).with("");}}
回调处理逻辑
package cn.mrcode.httpcallback;import com.alibaba.fastjson.JSON;import com.alibaba.fastjson.JSONObject;import cn.mrcode.BaseMqResponse;import cn.mrcode.NoticeMailRequest;import org.springframework.amqp.core.Queue;import org.springframework.amqp.rabbit.annotation.Exchange;import org.springframework.amqp.rabbit.annotation.QueueBinding;import org.springframework.amqp.rabbit.annotation.RabbitListener;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.context.annotation.Bean;import org.springframework.stereotype.Component;import java.util.Date;import cn.hutool.http.HttpRequest;import cn.hutool.http.HttpUtil;import lombok.extern.slf4j.Slf4j;/*** @author mrcode*/@Component@Slf4jpublic class HttpCallbackReceiver {/*** 最大重试次数,每次重试间隔时间翻倍; 也就是说, 64 分钟内会回调 7 次*/public static final int MAX_RETRY_COUNT = 7;/*** <pre>* 并发消费者数量,m-n* m: 最小消费者数量* n:最大消费者数量* </pre>*/public static final String CONCURRENCY = "3-7";@Autowiredprivate NoticeHttpCallbacklMqService noticeHttpCallbacklMqService;/**正常消费,立即回调一次*/@RabbitListener(queues = HttpCallbackMqConfig.NORMAL_QUEUE,concurrency = CONCURRENCY,ackMode = "AUTO")public void process(String message) {log.info("首次回调:{}", JSONObject.toJSONString(message));HttpCallbackMqRequest request = JSON.parseObject(message, HttpCallbackMqRequest.class);try {final String url = request.getUrl();final String dadaJson = request.getDataJson();push(url, dadaJson);} catch (Exception e) {log.error("首次回调异常,进入延迟队列:message:{},Exception={} ", message, e.getMessage());request.setRetryCount(0);request.setLastRetryTime(new Date());noticeHttpCallbacklMqService.delay(request);}}private void push(String url, String dadaJson) {String response = HttpRequest.post(url).body(dadaJson).timeout(1000 * 5).execute().body();}/**处理死信消息、处理间隔时间逻辑*/@RabbitListener(queues = HttpCallbackMqConfig.DEAD_QUEUE,concurrency = CONCURRENCY,ackMode = "AUTO")public void deadProcess(String message) {HttpCallbackMqRequest request = JSON.parseObject(message, HttpCallbackMqRequest.class);final String url = request.getUrl();final String dadaJson = request.getDataJson();final Integer retryCount = request.getRetryCount() + 1;final Integer reEnterCount = request.getReEnterCount() + 1;try {// 下一次执行的次数:1,2,4,8 ...int nextExecCount = (int) Math.pow(2, request.getRetryCount());if (nextExecCount == reEnterCount) {log.info("重试回调处理信息:{}", JSONObject.toJSONString(message));push(url, dadaJson);} else {request.setReEnterCount(reEnterCount);noticeHttpCallbacklMqService.delay(request);}} catch (Exception e) {log.error("重试回调异常,当前重试次数{}/{}:message:{},Exception={} ", retryCount, MAX_RETRY_COUNT, message, e.getMessage());// 超过重试次数,则不再继续,直接丢弃if (retryCount >= MAX_RETRY_COUNT) {return;}request.setRetryCount(retryCount);request.setReEnterCount(reEnterCount);noticeHttpCallbacklMqService.delay(request);}}public static void main(String[] args) {for (int i = 0; i < 10; i++) {System.out.println((int) Math.pow(2, i));}}}
辅助投递消息的工具服务
package cn.mrcode.httpcallback;import com.alibaba.fastjson.JSON;import org.springframework.amqp.rabbit.core.RabbitTemplate;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.stereotype.Service;import java.util.Date;import lombok.extern.slf4j.Slf4j;/*** @author mrcode*/@Service@Slf4jpublic class NoticeHttpCallbacklMqService {@Autowiredprivate RabbitTemplate rabbitTemplate;/*** 投递到普通队列** @param callbackDestination* @param toJSONString*/public void send(String callbackDestination, String toJSONString) {final HttpCallbackMqRequest request = new HttpCallbackMqRequest();request.setDataJson(toJSONString);request.setTime(new Date());request.setUrl(callbackDestination);rabbitTemplate.convertAndSend(HttpCallbackMqConfig.NORMAL_QUEUE,"",JSON.toJSONString(request));}/*** 投递到延迟队列:一分钟后自动过期转移到死信队列** @param request*/public void delay(HttpCallbackMqRequest request) {rabbitTemplate.convertAndSend(HttpCallbackMqConfig.DELAY_QUEUE,"",JSON.toJSONString(request));}/*** 重新入延迟队列** @param request*/public void reDelay(HttpCallbackMqRequest request) {rabbitTemplate.convertAndSend(HttpCallbackMqConfig.DELAY_QUEUE,"",JSON.toJSONString(request));}}
包装原始响应消息的消息体
package cn.mrcode.httpcallback;import cn.mrcode.BaseMqRequest;import cn.mrcode.BaseMqResponse;import java.util.Date;import lombok.Data;import lombok.EqualsAndHashCode;import lombok.ToString;/*** <pre>* 注意,本类不提供结果响应* 如果回调失败,直接丢弃* </pre>*/@Data@ToString@EqualsAndHashCode(callSuper = true)public class HttpCallbackMqRequest extends BaseMqRequest {/*** 要调用的 地址,只支持 POST JSON 方式*/private String url;/*** 要提交的 json 信息*/private String dataJson;// 当前重试次数private Integer retryCount = 0;// 当前重新排队次数private Integer reEnterCount = 0;// 上一次重试时间private Date lastRetryTime;@Overridepublic BaseMqResponse newInstance() {return null;}}@Data@ApiModelpublic abstract class BaseMqRequest {@ApiModelProperty("投递结果回调地址,必须是 POST URL, 接受 JSON 类型,如果回调失败,那么将会在 64 分钟内持续回调 7 次,直至成功,请在 5 秒内给出响应,否则会超时 ")private String callbackDestination;@ApiModelProperty("消息产生时间,格式 2020-03-05 02:06:07")@NotNullprivate Date time;@ApiModelProperty("自定义消息 ID,如果有,则在回调时原样返回,这里你可以自由发挥,不仅仅只能传递一个简单的 ID 字符串")private String cid;@ApiModelProperty(value = "简单的系统 ID", hidden = true)private String id;public BaseMqResponse buildBaseResponse() {BaseMqResponse response = newInstance();if (response == null) {return null;}response.setCid(cid);response.setId(id);response.setTime(new Date());response.setSuccess(true);return response;}public abstract BaseMqResponse newInstance();@Overridepublic String toString() {return"id='" + id + '\'' +", cid='" + cid + '\'' +", time=" + time + '\'' +"callbackDestination='" + callbackDestination + '\'';}}
获取队列中消息数量
记得有一篇文章说过,使用消息队列不只是会投递、消费消息,最重要的一个指标是消息积累数量,需要监控,如果堆积严重,需要进行改进,那么就是如何通过程序获取到队列中消息数量
先配置 RabbitAdmin ,通过它来的 API 来获取到消息数量
import org.springframework.amqp.rabbit.connection.ConnectionFactory;import org.springframework.amqp.rabbit.core.RabbitAdmin;import org.springframework.context.annotation.Bean;import org.springframework.context.annotation.Configuration;@Configurationpublic class RabbitMqConfig {@Beanpublic RabbitAdmin rabbitAdmin(ConnectionFactory connectionFactory) {RabbitAdmin rabbitAdmin = new RabbitAdmin(connectionFactory);rabbitAdmin.setAutoStartup(true); // 服务启动时候开启自动启动return rabbitAdmin;}}
获取指定队列消息数量
import org.springframework.amqp.core.QueueInformation;@Autowiredprivate RabbitAdmin rabbitAdmin;@GetMapping("/queue-monitor")@ApiOperation(value = "队列监控", notes = "获取消息队列中的消息堆积数量等信息,请不要大量并发调用该接口")public Result<QueueInformation> queueMonitor() {final QueueInformation queueInfo = rabbitAdmin.getQueueInfo(Names.QUEUE);return ResultHelper.ok(queueInfo);}
该对象能获取到的信息有:队列名称、消息数量、消费者数量
"name": "send.mail","messageCount": 0,"consumerCount": 3
