一、新建工程 rocketmq-springboot-producer-demo 以及 rocketmq-springboot-consumer-demo
任何第三方库和SpringBoot进行整合,都是这三步
- 改pom (导入这个组件的依赖)
- 写yml (写这个组件的一些配置)
-
二、修改pom.xml,添加依赖
完整依赖如下,截止至 2020.7.13号,rocketmq-spring-boot-starter的最新版为2.1.0 ```xml <?xml version=”1.0” encoding=”UTF-8”?> <project xmlns=”http://maven.apache.org/POM/4.0.0“
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<artifactId>mq-demo</artifactId><groupId>cn.spectrumrpc</groupId><version>1.0-SNAPSHOT</version>
4.0.0 rocketmq-springboot-demo org.springframework.boot spring-boot-starter 2.2.2.RELEASE org.apache.rocketmq rocketmq-spring-boot-starter 2.1.0
<a name="eusr0"></a>## 三、修改application.yml```yamlrocketmq:name-server: 127.0.0.1:9876producer:group: default-group
四、编写测试用例
在SpringBoot中,都是采用xxxTemplate来进行封装。
所以采用RocketMQTemplate 来发送消息
4.1 同步发送消息
@Autowiredprivate RocketMQTemplate rocketMQTemplate;public void sendSyncMessage() {rocketMQTemplate.syncSend("syncTopic","hello,rocketmq-springboot");}
4.2 异步发送消息
public void sendAsyncMessage() {rocketMQTemplate.asyncSend("asyncTopic", "hello,async", new SendCallback() {@Overridepublic void onSuccess(SendResult sendResult) {System.out.println("onSuccess:" + sendResult);}@Overridepublic void onException(Throwable throwable) {throwable.printStackTrace();}});}
发送结果:
onSuccess:SendResult [sendStatus=SEND_OK, msgId=FE80000000000000E8AD87FFFE95B346000018B4AAC2457020CC0000, offsetMsgId=AC11000100002A9F000000000000F735, messageQueue=MessageQueue [topic=asyncTopic, brokerName=broker-a, queueId=3], queueOffset=0]
4.3 单向发送消息
public void sendOneWay() {rocketMQTemplate.sendOneWay("oneWayTopic", "onewayMessage");}
4.4 消费端消费消息
通过 @RocketMQMessageListener 注解,并继承 RocketMQListener,就可以监听 生产者发送的消息
其中 onMessage 即为业务处理的方法。
@Component@RocketMQMessageListener(topic = "asyncTopic",consumerGroup = "springboot-mq-consumer-1")public class MQConsumer implements RocketMQListener<String> {@Overridepublic void onMessage(String s) {System.out.println("info = " + s);}}
@RocketMQMessageListener 中有许多的属性,最常用的为 topic:指定消费的主题, consumerGroup指定当前消费者所在的组(同一个消费组)。
其中rocketmq的消费者有两种消费模式,负载均衡和轮询。在 @RocketMQMessageListener 注解中,是通过messageModel这个属性进行配置的,如下。(其余的更多属性,在后续的案例中再进行解释)
// 广播@RocketMQMessageListener(topic = "asyncTopic",consumerGroup = "springboot-mq-consumer-1",messageModel = MessageModel.BROADCASTING)// 轮循@RocketMQMessageListener(topic = "asyncTopic",consumerGroup = "springboot-mq-consumer-1",messageModel = MessageModel.CLUSTERING)
4.5 发送顺序消息
生产者代码
调用带Orderly的api即可
public void sendOrderly() {List<Order> orders = Order.buildOrders();for (Order order : orders) {rocketMQTemplate.syncSendOrderly("orderTopic",order, String.valueOf(order.getOrderId()));}}
如果期望异步发送消息,采用
public void sendOrderly() {List<Order> orders = Order.buildOrders();for (Order order : orders) {rocketMQTemplate.asyncSendOrderly("orderTopic", order, String.valueOf(order.getOrderId()), new SendCallback() {@Overridepublic void onSuccess(SendResult sendResult) {System.out.println("onSuccess, result = " + sendResult);}@Overridepublic void onException(Throwable throwable) {throwable.printStackTrace();}});}}
消费者代码
消费端,需要在@RocketMQMessageListener注解上,修改一个属性,标明这个消费者是顺序消费,否则,还是会乱序消费。 需要指定 consumeMode = ConsumeMode.ORDERLY,指明顺序消费,指定这个,相当于 MessageListenerOrderly 来监听
@Component@RocketMQMessageListener(topic = "orderTopic",consumerGroup = "orderly-consumer", consumeMode = ConsumeMode.ORDERLY)public class OrderlyConsumer implements RocketMQListener<Order> {@Overridepublic void onMessage(Order order) {System.out.println("receive order: " + order + "\t currentThread" + Thread.currentThread().getName());}}
测试结果:
4.6 发送延时消息
生产者:
使用如下的api,第四个参数就是延时的级别
public SendResult syncSend(String destination, Message<?> message,long timeout, int delayLevel){}
具体的示例如下:
public void sendDelayMessage() {Message<String> message = MessageBuilder.withPayload("delayMessage").build();rocketMQTemplate.syncSend("delayTopic", message, 1000, 3);}
消费者:
消费者端和之前没有任何差别。
@Component@RocketMQMessageListener(topic = "delayTopic",consumerGroup = "delay-consumer")public class DelayConsumer implements RocketMQListener<MessageExt> {@Overridepublic void onMessage(MessageExt message) {System.out.println("currentTime = " + System.currentTimeMillis() + "");System.out.println("recevice message = " + message + "");}}
4.7 批量发送消息
生产者:
public void batchMessage() {List<Message> messages = new ArrayList<>();for (int i = 0; i < 10; i++) {messages.add(MessageBuilder.withPayload("batchMessage " + i).build());}rocketMQTemplate.syncSend("batchTopic", messages, 1000);}
4.8 过滤消息
生产者:
public void filterMessage(){for (int i = 0; i < 10; i++) {HashMap<String, Object> map = new HashMap<>();map.put("a", i);MessageHeaders messageHeaders = new MessageHeaders(map);Message<String> message = MessageBuilder.createMessage("filter message " + i, messageHeaders);rocketMQTemplate.syncSend("filterTopic", message);}}
消费者:
主要添加 @RocketMQMessageListener 的 selectorType 为 SelectorType.SQL92。
selectorExpression 为自己想要过滤的SQL语法,例如此例子中,添加的属性为a,判断a的范围。
@Component@RocketMQMessageListener(topic = "filterTopic",consumerGroup = "filter-consumer",selectorType = SelectorType.SQL92, selectorExpression = "a between 0 and 3")public class FilterConsumer implements RocketMQListener<String> {@Overridepublic void onMessage(String s) {System.out.println("on message, message body: " + s);}}
4.9 事务消息
五、RocketMQMessageListener 的属性解释
@Target({ElementType.TYPE})@Retention(RetentionPolicy.RUNTIME)@Documentedpublic @interface RocketMQMessageListener {// rocketmq 的 namesrv地址String NAME_SERVER_PLACEHOLDER = "${rocketmq.name-server:}";String ACCESS_KEY_PLACEHOLDER = "${rocketmq.consumer.access-key:}";String SECRET_KEY_PLACEHOLDER = "${rocketmq.consumer.secret-key:}";String TRACE_TOPIC_PLACEHOLDER = "${rocketmq.consumer.customized-trace-topic:}";String ACCESS_CHANNEL_PLACEHOLDER = "${rocketmq.access-channel:}";// 当前消费者的所在组String consumerGroup();// 当前消费者所消费的主题String topic();// 消费者过滤消息的方式,默认为TAG,还有SQL92这种类型SelectorType selectorType() default SelectorType.TAG;// 消费者过滤消息的表达式,默认是*,即所有消息都拿过来,// 结合上默认的TAG的话,默认就是所有TAG都拿String selectorExpression() default "*";// 消费者的默认,默认为并发争抢消费,// 还有另一种模式是ConsumeMode.ORDERLY,即顺序消费ConsumeMode consumeMode() default ConsumeMode.CONCURRENTLY;// 消息的模式, 默认是负载均衡模式,即多个客户端一起完成这个消息的消费// 还有另一种模式是 MessageModel.BROADCASTING 即订阅模式,所有人看到所有消息MessageModel messageModel() default MessageModel.CLUSTERING;// 消费者的最大线程数int consumeThreadMax() default 64;// 消费者的超时时间,默认30Slong consumeTimeout() default 30000L;String accessKey() default "${rocketmq.consumer.access-key:}";String secretKey() default "${rocketmq.consumer.secret-key:}";boolean enableMsgTrace() default true;String customizedTraceTopic() default "${rocketmq.consumer.customized-trace-topic:}";String nameServer() default "${rocketmq.name-server:}";String accessChannel() default "${rocketmq.access-channel:}";}
