- 1: Spring Boot integration
- 2: Spring Cloud Stream integration
- Code
1: Spring Boot integration
Pay attention to versions: releases differ considerably, which matters if the code was written against rocketmq-spring-boot-starter:2.0.4.

    <dependencies>
        <dependency>
            <groupId>org.apache.rocketmq</groupId>
            <artifactId>rocketmq-spring-boot-starter</artifactId>
            <version>2.1.1</version>
            <exclusions>
                <exclusion>
                    <groupId>org.springframework.boot</groupId>
                    <artifactId>spring-boot-starter</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>org.springframework</groupId>
                    <artifactId>spring-core</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>org.springframework</groupId>
                    <artifactId>spring-webmvc</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
            <version>2.1.6.RELEASE</version>
        </dependency>
        <dependency>
            <groupId>io.springfox</groupId>
            <artifactId>springfox-swagger-ui</artifactId>
            <version>2.9.2</version>
        </dependency>
        <dependency>
            <groupId>io.springfox</groupId>
            <artifactId>springfox-swagger2</artifactId>
            <version>2.9.2</version>
        </dependency>
    </dependencies>

rocketmq-spring-boot-starter:2.1.1 pulls in Spring Boot 2.0.5.RELEASE, so the Spring Boot dependency is upgraded here.
Configuration file:

    # NameServer address
    rocketmq.name-server=192.168.232.128:9876
    # Default producer group
    rocketmq.producer.group=springBootGroup

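Producer

    import javax.annotation.Resource;
    import org.apache.rocketmq.client.producer.SendResult;
    import org.apache.rocketmq.spring.core.RocketMQTemplate;
    import org.springframework.messaging.Message;
    import org.springframework.messaging.support.MessageBuilder;
    import org.springframework.stereotype.Component;

    @Component
    public class SpringProducer {
        @Resource
        private RocketMQTemplate rocketMQTemplate;

        // Example: send a normal message
        public void sendMessage(String topic, String msg) {
            this.rocketMQTemplate.convertAndSend(topic, msg);
        }

        // Example: send transactional messages, cycling through five tags
        public void sendMessageInTransaction(String topic, String msg) throws InterruptedException {
            String[] tags = new String[] {"TagA", "TagB", "TagC", "TagD", "TagE"};
            for (int i = 0; i < 10; i++) {
                Message<String> message = MessageBuilder.withPayload(msg).build();
                // The tag is part of the destination, in the form Topic:Tag
                String destination = topic + ":" + tags[i % tags.length];
                // The destination is also passed as the arg handed to the transaction listener
                SendResult sendResult = rocketMQTemplate.sendMessageInTransaction(destination, message, destination);
                System.out.printf("%s%n", sendResult);
                Thread.sleep(10);
            }
        }
    }
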
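Consumer

    import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
    import org.apache.rocketmq.spring.core.RocketMQListener;
    import org.springframework.stereotype.Component;

    @Component
    @RocketMQMessageListener(consumerGroup = "MyConsumerGroup", topic = "TestTopic")
    public class SpringConsumer implements RocketMQListener<String> {
        @Override
        public void onMessage(String message) {
            System.out.println("Received message : " + message);
        }
    }
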
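Listener annotation
RocketMQMessageListener
- selectorType and selectorExpression customize message filtering
- consumeMode selects concurrent or orderly consumption
- messageModel selects broadcasting or clustering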
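
To make the filtering attributes more concrete, here is a minimal plain-Java sketch of how a TAG selector expression such as `TagA || TagB` (the kind of value passed to selectorExpression) picks messages. The class name `TagSelector` and its methods are hypothetical and only model the behavior; this is not the RocketMQ implementation, and it assumes `*` or an empty expression means "accept every tag".

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.Set;
import java.util.stream.Collectors;

/** Illustrative model of TAG-based selection (hypothetical class, not library code). */
final class TagSelector {
    private final boolean acceptAll;   // true when the expression is "*" or empty
    private final Set<String> tags;    // tags listed in the expression

    TagSelector(String expression) {
        String expr = expression == null ? "" : expression.trim();
        this.acceptAll = expr.isEmpty() || "*".equals(expr);
        this.tags = acceptAll
                ? Collections.<String>emptySet()
                : Arrays.stream(expr.split("\\|\\|"))   // tags are separated by "||"
                        .map(String::trim)
                        .filter(t -> !t.isEmpty())
                        .collect(Collectors.toSet());
    }

    /** Returns true if a message carrying this tag would be delivered. */
    boolean accepts(String tag) {
        if (acceptAll) {
            return true;
        }
        return tag != null && tags.contains(tag);
    }
}
```

With this model, `new TagSelector("TagA || TagB")` accepts messages tagged TagA or TagB and rejects everything else, including untagged messages.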
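Transaction listener

    import java.util.concurrent.ConcurrentHashMap;
    import org.apache.commons.lang3.StringUtils; // assumed: the commons-lang3 StringUtils
    import org.apache.rocketmq.spring.annotation.RocketMQTransactionListener;
    import org.apache.rocketmq.spring.core.RocketMQLocalTransactionListener;
    import org.apache.rocketmq.spring.core.RocketMQLocalTransactionState;
    import org.apache.rocketmq.spring.support.RocketMQUtil;
    import org.springframework.messaging.Message;
    import org.springframework.messaging.converter.StringMessageConverter;

    @RocketMQTransactionListener(rocketMQTemplateBeanName = "rocketMQTemplate")
    public class MyTransactionImpl implements RocketMQLocalTransactionListener {
        private ConcurrentHashMap<Object, String> localTrans = new ConcurrentHashMap<>();

        @Override
        public RocketMQLocalTransactionState executeLocalTransaction(Message msg, Object arg) {
            Object id = msg.getHeaders().get("id");
            // arg is the destination (Topic:Tag) passed by the producer
            String destination = arg.toString();
            localTrans.put(id, destination);
            // Convert to the native RocketMQ message to read its tags
            org.apache.rocketmq.common.message.Message message =
                    RocketMQUtil.convertToRocketMessage(new StringMessageConverter(), "UTF-8", destination, msg);
            String tags = message.getTags();
            if (StringUtils.contains(tags, "TagA")) {
                return RocketMQLocalTransactionState.COMMIT;
            } else if (StringUtils.contains(tags, "TagB")) {
                return RocketMQLocalTransactionState.ROLLBACK;
            } else {
                return RocketMQLocalTransactionState.UNKNOWN;
            }
        }

        @Override
        public RocketMQLocalTransactionState checkLocalTransaction(Message msg) {
            // The Spring Boot message object has no transactionId property, unlike the native API.
            // String destination = localTrans.get(msg.getTransactionId());
            return RocketMQLocalTransactionState.COMMIT;
        }
    }
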
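
The listener above always answers COMMIT when the broker checks back on a transaction. As a rough illustration of what a check could do instead, the following plain-Java sketch records the outcome of each local transaction under a business key and returns it on check. Everything here is hypothetical (the class `LocalTransactionStore`, its `State` enum, and the idea of a string business key); it only mirrors the tag-to-state rule used in the listener and does not call any RocketMQ API.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical helper: remembers the outcome of each local transaction by key. */
final class LocalTransactionStore {
    enum State { COMMIT, ROLLBACK, UNKNOWN }

    private final Map<String, State> states = new ConcurrentHashMap<>();

    /** Same rule as the listener: TagA commits, TagB rolls back, anything else is unknown. */
    static State decide(String tags) {
        if (tags == null) {
            return State.UNKNOWN;
        }
        if (tags.contains("TagA")) {
            return State.COMMIT;
        }
        if (tags.contains("TagB")) {
            return State.ROLLBACK;
        }
        return State.UNKNOWN;
    }

    /** Called when the half message is sent: decide and remember the state. */
    State execute(String key, String tags) {
        State state = decide(tags);
        states.put(key, state);
        return state;
    }

    /** Called once the local transaction finally completes. */
    void resolve(String key, State finalState) {
        states.put(key, finalState);
    }

    /** Called on check-back: unknown keys stay UNKNOWN so the check is retried later. */
    State check(String key) {
        return states.getOrDefault(key, State.UNKNOWN);
    }
}
```

In a real service the state would normally live in a database rather than in memory, so that a check-back still works after a restart.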
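Summary
- Once the org.apache.rocketmq:rocketmq-spring-boot-starter dependency is added, a Spring Boot application can talk to RocketMQ through the built-in RocketMQTemplate. All related properties start with rocketmq. The full set of configuration options is in the class org.apache.rocketmq.spring.autoconfigure.RocketMQProperties.
- The Message object in the Spring Boot dependency and the Message object in rocketmq-client are two different classes, and they are very easy to mix up. For example, the TAG property of the rocketmq-client Message does not exist on the Spring Boot Message. The tag has moved into the send destination, where it is specified together with the topic in the form Topic:Tag.
- One last reminder: always watch the versions. rocketmq-spring-boot-starter usually lags slightly behind RocketMQ releases, and mismatched versions cause many strange problems. Apache maintains an official rocketmq-spring sample at https://github.com/apache/rocketmq-spring.git, which is a good reference when versions change.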
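
The Topic:Tag convention from the summary can be sketched in a few lines of plain Java. The `Destination` class below is a hypothetical helper, not part of rocketmq-spring; it assumes that the destination is split at the first colon and that a missing tag is represented by an empty string.

```java
/** Illustrative helper for the "Topic:Tag" destination convention (hypothetical class). */
final class Destination {
    final String topic;
    final String tags;

    private Destination(String topic, String tags) {
        this.topic = topic;
        this.tags = tags;
    }

    /** Builds "Topic:Tag", or just "Topic" when no tag is given. */
    static String of(String topic, String tag) {
        return (tag == null || tag.isEmpty()) ? topic : topic + ":" + tag;
    }

    /** Splits a destination at the first colon; tags is empty when absent. */
    static Destination parse(String destination) {
        String[] parts = destination.split(":", 2);
        return new Destination(parts[0], parts.length > 1 ? parts[1] : "");
    }
}
```

For example, `Destination.of("TestTopic", "TagA")` yields the string `TestTopic:TagA`, which is the shape of the destination the producer above builds by hand.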
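2: Spring Cloud Stream integration
Official site:
Sample:
https://github.com/spring-cloud/spring-cloud-stream-samples/
A unified message-driven framework provided by the Spring community.
Goal: a single programming model for connecting to any MQ middleware.
Currently integrated: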
Maven dependencies

    <dependencies>
        <dependency>
            <groupId>org.apache.rocketmq</groupId>
            <artifactId>rocketmq-client</artifactId>
            <version>4.7.1</version>
        </dependency>
        <dependency>
            <groupId>org.apache.rocketmq</groupId>
            <artifactId>rocketmq-acl</artifactId>
            <version>4.7.1</version>
        </dependency>
        <dependency>
            <groupId>com.alibaba.cloud</groupId>
            <artifactId>spring-cloud-starter-stream-rocketmq</artifactId>
            <version>2.2.3.RELEASE</version>
            <exclusions>
                <exclusion>
                    <groupId>org.apache.rocketmq</groupId>
                    <artifactId>rocketmq-client</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>org.apache.rocketmq</groupId>
                    <artifactId>rocketmq-acl</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
            <version>2.3.3.RELEASE</version>
        </dependency>
    </dependencies>

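Application class:

    import org.springframework.boot.SpringApplication;
    import org.springframework.boot.autoconfigure.SpringBootApplication;
    import org.springframework.cloud.stream.annotation.EnableBinding;
    import org.springframework.cloud.stream.messaging.Sink;
    import org.springframework.cloud.stream.messaging.Source;

    // Bind the Source (output) and Sink (input) channels
    @EnableBinding({Source.class, Sink.class})
    @SpringBootApplication
    public class ScRocketMQApplication {
        public static void main(String[] args) {
            SpringApplication.run(ScRocketMQApplication.class, args);
        }
    }
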
Configuration file:

    # Generic Spring Cloud Stream settings start with spring.cloud.stream
    spring.cloud.stream.bindings.input.destination=TestTopic
    spring.cloud.stream.bindings.input.group=scGroup
    spring.cloud.stream.bindings.output.destination=TestTopic
    # RocketMQ-specific settings start with spring.cloud.stream.rocketmq
    spring.cloud.stream.rocketmq.binder.nameserver=192.168.232.128:9876;192.168.232.129:9876

Each binding corresponds to one message channel. The input binding is defined in Sink.class and is the consumer side.
The output binding is defined in Source.class and is the producer side.
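Producer:

    import java.util.HashMap;
    import java.util.Map;
    import javax.annotation.Resource;
    import org.apache.rocketmq.common.message.MessageConst;
    import org.springframework.cloud.stream.messaging.Source;
    import org.springframework.messaging.Message;
    import org.springframework.messaging.MessageHeaders;
    import org.springframework.messaging.support.MessageBuilder;
    import org.springframework.stereotype.Component;

    @Component
    public class ScProducer {
        @Resource
        private Source source;

        public void sendMessage(String msg) {
            // Pass the RocketMQ tag through a message header
            Map<String, Object> headers = new HashMap<>();
            headers.put(MessageConst.PROPERTY_TAGS, "testTag");
            MessageHeaders messageHeaders = new MessageHeaders(headers);
            Message<String> message = MessageBuilder.createMessage(msg, messageHeaders);
            this.source.output().send(message);
        }
    }
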
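Consumer

    import org.springframework.cloud.stream.annotation.StreamListener;
    import org.springframework.cloud.stream.messaging.Sink;
    import org.springframework.stereotype.Component;

    @Component
    public class ScConsumer {
        // Receives every message that arrives on the input binding
        @StreamListener(Sink.INPUT)
        public void onMessage(String message) {
            System.out.println("received message:" + message + " from binding:" + Sink.INPUT);
        }
    }
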
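Summary
- Use it with caution for RocketMQ. How well it works with Kafka and RabbitMQ is not yet clear to me.
- It isolates the concrete messaging implementation, which makes it easy to switch the underlying middleware.
- Few middleware products are wrapped so far. The latest spring-cloud-starter-stream-rocketmq release at the time of writing, 2.2.3.RELEASE, still bundles rocketmq-client 4.4.0, which is a very large gap, and there is little documentation covering RocketMQ.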
Code
rocketMQ-demo.7z
