pom文件
<dependency> <groupId>com.aliyun.openservices</groupId> <artifactId>ons-client</artifactId> <version>1.8.4.Final</version></dependency>
配置文件
- 配置一些基础的配置信息
#启动测试之前请替换如下 XXX 为您的配置rocketmq.accessKey=XXXrocketmq.secretKey=XXrocketmq.nameSrvAddr=XXXrocketmq.topic=XXXrocketmq.groupId=XXXrocketmq.tag=*rocketmq.orderTopic=XXXrocketmq.orderGroupId=XXXrocketmq.orderTag=*
- 创建一个配置项实体类,用于读取配置信息
@Configuration@ConfigurationProperties(prefix = "rocketmq")@Datepublic class MqConfig { private String accessKey; private String secretKey; private String nameSrvAddr; private String topic; private String groupId; private String tag; private String orderTopic; private String orderGroupId; private String orderTag; public Properties getMqPropertie() { Properties properties = new Properties(); properties.setProperty(PropertyKeyConst.AccessKey, this.accessKey); properties.setProperty(PropertyKeyConst.SecretKey, this.secretKey); properties.setProperty(PropertyKeyConst.NAMESRV_ADDR, this.nameSrvAddr); return properties; }}
普通类型的生产者和消费者定义
生产者
建立连接
@Configurationpublic class ProducerClient { @Autowired private MqConfig mqConfig; @Bean(initMethod = "start", destroyMethod = "shutdown") public ProducerBean buildProducer() { ProducerBean producer = new ProducerBean(); producer.setProperties(mqConfig.getMqPropertie()); return producer; }}
生产者发送消息
同步
@RunWith(SpringRunner.class)@SpringBootTestpublic class SyncProducerTest { //普通消息的Producer 已经注册到了spring容器中,后面需要使用时可以直接注入到其它类中 @Autowired private ProducerBean producer; @Autowired private MqConfig mqConfig; @Test public void testSend() { //循环发送消息 for (int i = 0; i < 100; i++) { Message msg = new Message( // // Message所属的Topic mqConfig.getTopic(), // Message Tag 可理解为Gmail中的标签,对消息进行再归类,方便Consumer指定过滤条件在MQ服务器过滤 mqConfig.getTag(), // Message Body 可以是任何二进制形式的数据, MQ不做任何干预 // 需要Producer与Consumer协商好一致的序列化和反序列化方式 "Hello MQ".getBytes()); // 设置代表消息的业务关键属性,请尽可能全局唯一 // 以方便您在无法正常收到消息情况下,可通过MQ 控制台查询消息并补发 // 注意:不设置也不会影响消息正常收发 msg.setKey("ORDERID_100"); // 发送消息,只要不抛异常就是成功 try { SendResult sendResult = producer.send(msg); assert sendResult != null; System.out.println(sendResult); } catch (ONSClientException e) { System.out.println("发送失败"); //出现异常意味着发送失败,为了避免消息丢失,建议缓存该消息然后进行重试。 } } }}
异步
@RunWith(SpringRunner.class)@SpringBootTestpublic class AsyncProducerTest { //普通消息的Producer 已经注册到了spring容器中,后面需要使用时可以直接注入到其它类中 @Autowired private ProducerBean producer; @Autowired private MqConfig mqConfig; @Test public void testSend() { //对于使用异步接口,建议设置单独的回调处理线程池,拥有更灵活的配置和监控能力。 //如下构造线程的方式请求队列为无界仅用作示例,有OOM的风险。 //更合理的构造方式请参考阿里巴巴Java开发手册:https://github.com/alibaba/p3c producer.setCallbackExecutor(Executors.newFixedThreadPool(10)); //循环发送消息 for (int i = 0; i < 1; i++) { Message msg = new Message( // // Message所属的Topic mqConfig.getTopic(), // Message Tag 可理解为Gmail中的标签,对消息进行再归类,方便Consumer指定过滤条件在MQ服务器过滤 mqConfig.getTag(), // Message Body 可以是任何二进制形式的数据, MQ不做任何干预 // 需要Producer与Consumer协商好一致的序列化和反序列化方式 "Hello MQ".getBytes()); // 设置代表消息的业务关键属性,请尽可能全局唯一 // 以方便您在无法正常收到消息情况下,可通过MQ 控制台查询消息并补发 // 注意:不设置也不会影响消息正常收发 msg.setKey("ORDERID_100"); // 发送消息,只要不抛异常就是成功 try { producer.sendAsync(msg, new SendCallback() { @Override public void onSuccess(final SendResult sendResult) { assert sendResult != null; System.out.println(sendResult); } @Override public void onException(final OnExceptionContext context) { //出现异常意味着发送失败,为了避免消息丢失,建议缓存该消息然后进行重试。 } }); } catch (ONSClientException e) { System.out.println("发送失败"); //出现异常意味着发送失败,为了避免消息丢失,建议缓存该消息然后进行重试。 } } }}
sql 过滤
@RunWith(SpringRunner.class)@SpringBootTestpublic class SqlProducerTest { //普通消息的Producer 已经注册到了spring容器中,后面需要使用时可以直接注入到其它类中 @Autowired private ProducerBean producer; @Autowired private MqConfig mqConfig; @Test public void testSend() { //循环发送消息 for (int i = 0; i < 100; i++) { String tag; int div = i % 3; if (div == 0) { tag = "TagA"; } else if (div == 1) { tag = "TagB"; } else { tag = "TagC"; } Message msg = new Message( // // Message所属的Topic mqConfig.getTopic(), // Message Tag 可理解为Gmail中的标签,对消息进行再归类,方便Consumer指定过滤条件在MQ服务器过滤 tag, // Message Body 可以是任何二进制形式的数据, MQ不做任何干预 // 需要Producer与Consumer协商好一致的序列化和反序列化方式 "Hello MQ".getBytes()); // 设置代表消息的业务关键属性,请尽可能全局唯一 // 以方便您在无法正常收到消息情况下,可通过MQ 控制台查询消息并补发 // 注意:不设置也不会影响消息正常收发 msg.setKey("ORDERID_100"); // 设置自定义属性,该属性可用于做SQL属性过滤 msg.putUserProperties("a", String.valueOf(i)); // 发送消息,只要不抛异常就是成功 try { SendResult sendResult = producer.send(msg); assert sendResult != null; System.out.println(sendResult); } catch (ONSClientException e) { System.out.println("发送失败"); //出现异常意味着发送失败,为了避免消息丢失,建议缓存该消息然后进行重试。 } } }}
消费者
监听
@Componentpublic class DemoMessageListener implements MessageListener { @Override public Action consume(Message message, ConsumeContext context) { System.out.println("Receive: " + message); try { //do something.. return Action.CommitMessage; } catch (Exception e) { //消费失败 return Action.ReconsumeLater; } }}
普通消费方式
//项目中加上 @Configuration 注解,这样服务启动时consumer也启动了public class ConsumerClient { @Autowired private MqConfig mqConfig; @Autowired private DemoMessageListener messageListener; @Bean(initMethod = "start", destroyMethod = "shutdown") public ConsumerBean buildConsumer() { ConsumerBean consumerBean = new ConsumerBean(); //配置文件 Properties properties = mqConfig.getMqPropertie(); properties.setProperty(PropertyKeyConst.GROUP_ID, mqConfig.getGroupId()); //将消费者线程数固定为20个 20为默认值 properties.setProperty(PropertyKeyConst.ConsumeThreadNums, "20"); consumerBean.setProperties(properties); //订阅关系 Map<Subscription, MessageListener> subscriptionTable = new HashMap<Subscription, MessageListener>(); Subscription subscription = new Subscription(); subscription.setTopic(mqConfig.getTopic()); subscription.setExpression(mqConfig.getTag()); subscriptionTable.put(subscription, messageListener); //订阅多个topic如上面设置 consumerBean.setSubscriptionTable(subscriptionTable); return consumerBean; }}
sql 过滤消费
//正式开发时可以加上 @Configuration 注解,这样服务启动时consumer也启动了//sql92只有mq铂金版才支持public class SqlConsumerClient { @Autowired private MqConfig mqConfig; @Autowired private DemoMessageListener messageListener; @Bean(initMethod = "start", destroyMethod = "shutdown") public ConsumerBean buildSqlConsumer() { ConsumerBean consumerBean = new ConsumerBean(); //配置文件 Properties properties = mqConfig.getMqPropertie(); properties.setProperty(PropertyKeyConst.GROUP_ID, mqConfig.getGroupId()); consumerBean.setProperties(properties); //订阅关系 Map<Subscription, MessageListener> subscriptionTable = new HashMap<Subscription, MessageListener>(); Subscription subscription = new Subscription(); subscription.setTopic(mqConfig.getTopic()); // 表示需要使用SQL来过滤消息 subscription.setType("SQL92"); //需要消息的tag是'TagA'或'TagB'并且自定义属性a(在发送消息的时候通过putUserProperties方法放入)需要在[0,3] //SQL过滤同样可以使用消息的tag作为过滤条件(消息的tag在消息的属性中叫做 TAGS) //SQL过滤同样可以在顺序消费中使用 subscription.setExpression("(TAGS is not null and TAGS in ('TagA', 'TagB')) and (a is not null and a between 0 and 3)"); subscriptionTable.put(subscription, messageListener); //订阅多个topic如上面设置 consumerBean.setSubscriptionTable(subscriptionTable); return consumerBean; }}