RocketMQ原生API使用
1:项目搭建
**maven
**
<dependency><groupId>org.apache.rocketmq</groupId><artifactId>rocketmq-client</artifactId><version>4.7.1</version></dependency>
其中源码中 example项目有详细的测试代码
生产者消费者需要依赖的是NameServer
多个nameServer分号分隔
2、RocketMQ的编程模型
生产者
- 创建生产者 producer,制定生产者组名
- 制定nameServer地址
- producer启动
- 创建消息对象,制定topic, tag ,消息体
- 发送消息
-
消费者
创建consumer,制定消费组名
- 制定nameServer地址
- 订阅topic, tag
- 设置回调函数,处理消息
- 启动消费者
3、RocketMQ的消息样例
基本样例
发送方式
1:同步发送
DefaultMQProducer producer = new DefaultMQProducer("ProducerGroupName");producer.setNamesrvAddr("192.168.232.128:9876");producer.setNamesrvAddr("localhost:9876");producer.start();for (int i = 0; i < 20; i++) {try {Message msg = new Message("lite_pull_consumer_test","TagA","OrderID188","Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));//同步传递消息,消息会发给集群中的一个Broker节点。SendResult sendResult = producer.send(msg);System.out.printf("%s%n", sendResult);//返回结果,执行下面的code} catch (Exception e) {e.printStackTrace();}}producer.shutdown();
2:异步发送
DefaultMQProducer producer = new DefaultMQProducer("Jodie_Daily_test");producer.setNamesrvAddr("localhost:9876");producer.start();producer.setRetryTimesWhenSendAsyncFailed(0);int messageCount = 100;//由于是异步发送,这里引入一个countDownLatch,//保证所有Producer发送消息的回调方法都执行完了再停止Producer服务。final CountDownLatch countDownLatch = new CountDownLatch(messageCount);for (int i = 0; i < messageCount; i++) {try {final int index = i;Message msg = new Message("Jodie_topic_1023","TagA","OrderID188","Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));producer.send(msg, new SendCallback() {// 成功的回调@Overridepublic void onSuccess(SendResult sendResult) {countDownLatch.countDown();System.out.printf("%-10d OK %s %n", index, sendResult.getMsgId());}//异常的回调@Overridepublic void onException(Throwable e) {countDownLatch.countDown();System.out.printf("%-10d Exception %s %n", index, e);e.printStackTrace();}});} catch (Exception e) {e.printStackTrace();}}countDownLatch.await(5, TimeUnit.SECONDS);producer.shutdown();
3:单向发送
没有结果,没有回调,只管发送出去
DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");producer.setNamesrvAddr("localhost:9876");producer.start();for (int i = 0; i < 100; i++) {//Create a message instance, specifying topic, tag and message body.Message msg = new Message("TopicTest" ,"TagA" ,("Hello RocketMQ " +i).getBytes(RemotingHelper.DEFAULT_CHARSET));//Call send message to deliver message to one of brokers.producer.sendOneway(msg);}//Wait for sending to completeThread.sleep(5000);producer.shutdown();
消费模式:
1:主动拉模式
//已经删除了,不推荐了DefaultMQPullConsumer consumer = new DefaultMQPullConsumer("please_rename_unique_group_name_5");consumer.setNamesrvAddr("127.0.0.1:9876");consumer.start();Set<MessageQueue> mqs = consumer.fetchSubscribeMessageQueues("Jodie_topic_1023");for (MessageQueue mq : mqs) {System.out.printf("Consume from the queue: %s%n", mq);SINGLE_MQ:while (true) {try {PullResult pullResult =consumer.pullBlockIfNotFound(mq, null, getMessageQueueOffset(mq), 32);System.out.printf("%s%n", pullResult);putMessageQueueOffset(mq, pullResult.getNextBeginOffset());switch (pullResult.getPullStatus()) {case FOUND:break;case NO_MATCHED_MSG:break;case NO_NEW_MSG:break SINGLE_MQ;case OFFSET_ILLEGAL:break;default:break;}} catch (Exception e) {e.printStackTrace();}}}consumer.shutdown();
2:等待broker推模式push
实际是pull模式封装的
//也已经过期了,替换的类是DefaultLitePullConsumerImplDefaultMQPushConsumer consumer = new DefaultMQPushConsumer("CID_JODIE_1");consumer.setNamesrvAddr("localhost:9876");consumer.subscribe("lite_pull_consumer_test", "*");//consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);//wrong time format 2017_0422_221800//consumer.setConsumeTimestamp("20181109221800");consumer.registerMessageListener(new MessageListenerConcurrently() {@Overridepublic ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}});consumer.start();System.out.printf("Consumer Started.%n");
消息类型
1:顺序消息
只保证局部有序,不能保证圈住有序
发送者默认会round robin轮询把消息发送到不同的MessageQueue(分区队列)
消费者也会从messageQueue上拉取,只有一组有序的消息发在同一个MessageQueue才能利用MessageQueue
先进先出保证有序
brokerz中一个队列内的消息是保证有序的
消费者:会从多个消息队列上取消息,所以多个队列上的消息是无序的。
消费者要有序,需要一个队列一个队列的取数据。
消费者注入MessageListenerOrderly 会通过锁队列,一个一个的取消息
MessageListenerConcurrently 不会锁队列,只会随机取一批无法保证顺序
try {MQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");producer.start();String[] tags = new String[] {"TagA", "TagB", "TagC", "TagD", "TagE"};for (int i = 0; i < 100; i++) {int orderId = i % 10;Message msg =new Message("TopicTest", tags[i % tags.length], "KEY" + i,("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET));SendResult sendResult = producer.send(msg, new MessageQueueSelector() {@Overridepublic MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {Integer id = (Integer) arg;int index = id % mqs.size();return mqs.get(index);}}, orderId);System.out.printf("%s%n", sendResult);}producer.shutdown();} catch (MQClientException | RemotingException | MQBrokerException | InterruptedException e) {e.printStackTrace();}
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name_3");consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);consumer.subscribe("TopicTest", "TagA || TagC || TagD");consumer.registerMessageListener(new MessageListenerOrderly() {AtomicLong consumeTimes = new AtomicLong(0);@Overridepublic ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {context.setAutoCommit(true);System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);this.consumeTimes.incrementAndGet();if ((this.consumeTimes.get() % 2) == 0) {return ConsumeOrderlyStatus.SUCCESS;} else if ((this.consumeTimes.get() % 3) == 0) {return ConsumeOrderlyStatus.ROLLBACK;} else if ((this.consumeTimes.get() % 4) == 0) {return ConsumeOrderlyStatus.COMMIT;} else if ((this.consumeTimes.get() % 5) == 0) {context.setSuspendCurrentQueueTimeMillis(3000);return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT;}return ConsumeOrderlyStatus.SUCCESS;}});consumer.start();System.out.printf("Consumer Started.%n");
2:广播消息
默认是集群状态MessageModel.CLUSTERING: 一个消息只会被消费组中的一个实例消费
广播模式MessageModel.BROADCASTING:消费组里的所有实例都会消费这个消息
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name_1");consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);consumer.setMessageModel(MessageModel.BROADCASTING);consumer.subscribe("TopicTest", "TagA || TagC || TagD");consumer.registerMessageListener(new MessageListenerConcurrently() {@Overridepublic ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,ConsumeConcurrentlyContext context) {System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}});consumer.start();System.out.printf("Broadcast Consumer Started.%n");
3:延迟消息
是等一会
message.setDelayTimeLevel(3); 3:是隔离级别
分别对应:
messageDelayLevel=1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h
开源版本一共18个隔离级别,底层实现是18个队列实现的
public static void main(String[] args) throws Exception {// Instantiate a producer to send scheduled messagesDefaultMQProducer producer = new DefaultMQProducer("ExampleProducerGroup");// Launch producerproducer.start();int totalMessagesToSend = 100;for (int i = 0; i < totalMessagesToSend; i++) {Message message = new Message("TestTopic", ("Hello scheduled message " + i).getBytes());// This message will be delivered to consumer 10 seconds later.message.setDelayTimeLevel(3);// Send the messageproducer.send(message);}// Shutdown producer after use.producer.shutdown();}
4:批量消息
把多个消息一次发送出去,减少IO
如果批量消息大于1M就不要一个批次了,可以拆分
实际情况是不要大于4M,需要是同一个topic的消息,等
DefaultMQProducer producer = new DefaultMQProducer("BatchProducerGroupName");producer.start();//If you just send messages of no more than 1MiB at a time, it is easy to use batch//Messages of the same batch should have: same topic, same waitStoreMsgOK and no schedule supportString topic = "BatchTest";List<Message> messages = new ArrayList<>();messages.add(new Message(topic, "Tag", "OrderID001", "Hello world 0".getBytes()));messages.add(new Message(topic, "Tag", "OrderID002", "Hello world 1".getBytes()));messages.add(new Message(topic, "Tag", "OrderID003", "Hello world 2".getBytes()));producer.send(messages);
5:过滤消息
tag过滤
生产者:
DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");producer.start();String[] tags = new String[] {"TagA", "TagB", "TagC"};for (int i = 0; i < 60; i++) {Message msg = new Message("TagFilterTest",tags[i % tags.length],"Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));SendResult sendResult = producer.send(msg);System.out.printf("%s%n", sendResult);}producer.shutdown();
消费者:
RocketMQ最佳实践 ,一个应用可以用一个Topic,不同的业务用tag区分
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name");//订阅 tagA tagC的消息consumer.subscribe("TagFilterTest", "TagA || TagC");consumer.registerMessageListener(new MessageListenerConcurrently() {@Overridepublic ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,ConsumeConcurrentlyContext context) {System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}});consumer.start();System.out.printf("Consumer Started.%n");
SQL过滤
s生产者,没啥区别的
DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");producer.start();String[] tags = new String[] {"TagA", "TagB", "TagC"};for (int i = 0; i < 10; i++) {Message msg = new Message("SqlFilterTest",tags[i % tags.length],("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET));msg.putUserProperty("a", String.valueOf(i));SendResult sendResult = producer.send(msg);System.out.printf("%s%n", sendResult);}producer.shutdown();
消费者
使用MessageSelector.bySql
按照SQL92标准,
SQL可以使用的tage和生产者加入的属性
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name");// Don't forget to set enablePropertyFilter=true in brokerconsumer.subscribe("SqlFilterTest",MessageSelector.bySql("(TAGS is not null and TAGS in ('TagA', 'TagB'))" +"and (a is not null and a between 0 and 3)"));consumer.registerMessageListener(new MessageListenerConcurrently() {@Overridepublic ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,ConsumeConcurrentlyContext context) {System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}});consumer.start();System.out.printf("Consumer Started.%n");
SQL92语法
- 数值比较,比如:>,>=,<,<=,BETWEEN,=;
- 字符比较,比如:=,<>,IN;
- IS NULL 或者 IS NOT NULL;
- 逻辑符号 AND,OR,NOT;
常量支持类型:
- 数值,比如:123,3.1415;
- 字符,比如:‘abc’,必须用单引号包裹起来;
- NULL,特殊的常量
- 布尔值,TRUE 或 FALSE
只有推模式的消费者可以使用SQL过滤。拉模式是用不了的。
6:事务消息

这是事务消息被分成了两个事务:发送时,消费时
只能保证发送消息与本地事务的两个操作的原子性
类: TransactionMQProducer,TransactionListener
TransactionListener transactionListener = new TransactionListenerImpl();TransactionMQProducer producer = new TransactionMQProducer("please_rename_unique_group_name");producer.setNamesrvAddr("127.0.0.1:9876");ExecutorService executorService = new ThreadPoolExecutor(2, 5, 100, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(2000), new ThreadFactory() {@Overridepublic Thread newThread(Runnable r) {Thread thread = new Thread(r);thread.setName("client-transaction-msg-check-thread");return thread;}});producer.setExecutorService(executorService);producer.setTransactionListener(transactionListener);producer.start();String[] tags = new String[] {"TagA", "TagB", "TagC", "TagD", "TagE"};for (int i = 0; i < 10; i++) {try {Message msg =new Message("TestTopic", tags[i % tags.length], "KEY" + i,("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET));SendResult sendResult = producer.sendMessageInTransaction(msg, null);System.out.printf("%s%n", sendResult);Thread.sleep(10);} catch (MQClientException | UnsupportedEncodingException e) {e.printStackTrace();}}for (int i = 0; i < 100000; i++) {Thread.sleep(1000);}producer.shutdown();
需要有个监听的实现
class TransactionListenerImpl implements TransactionListener {private AtomicInteger transactionIndex = new AtomicInteger(0);private ConcurrentHashMap<String, Integer> localTrans = new ConcurrentHashMap<>();//half消息发送成功后回调此方法,执行本地事务@Overridepublic LocalTransactionState executeLocalTransaction(Message msg, Object arg) {String tags = msg.getTags();//do 本地事务,本地数据库的操作boolean isSuccess = true;//本地事务crud成功,就提交消息if(isSuccess){//会立刻被消费者消费return LocalTransactionState.COMMIT_MESSAGE;}else if(!isSuccess){//如果本地事务失败,就回滚mq消息return LocalTransactionState.ROLLBACK_MESSAGE;}else{return LocalTransactionState.UNKNOW;}}//COMMIT:提交 ROLLBACK:回滚 UNKNOW:回调@Overridepublic LocalTransactionState checkLocalTransaction(MessageExt msg) {//回调,查询本地事务的状态吧,算是对上面的一种补充Integer status = localTrans.get(msg.getTransactionId());if (null != status) {switch (status) {case 0:return LocalTransactionState.UNKNOW;case 1:return LocalTransactionState.COMMIT_MESSAGE;case 2:return LocalTransactionState.ROLLBACK_MESSAGE;default:return LocalTransactionState.COMMIT_MESSAGE;}}return LocalTransactionState.COMMIT_MESSAGE;}}
不支持延迟消息,不支持批量消息
单个消息检查的次数是15次,可以在broker tansactionCheckMax配置
默认超过max会丢弃,也是重写AbstractTransactionCheckListener改变行为
实际检查的次数会在messge保持的用户属性MessageConst.PROPERTY_TRANSACTION_CHECK_TIMES
配置文件中transactionMsgTimeout 指定特定时间后被检查
可以在用户属性CHECK_IMMUNITY_TIME_IN_SECONDS 修改时间限制
BrokerConfig.transactionTimeOut参数配置,默认6秒,可在broker.conf配置
msg.putUserProperty(MessageConst.PROPERTY_CHECK_IMMUNITY_TIME_IN_SECONDS, “10000”);
设置未为10s
事务消息可能不止一次被检查消费
建议使用双重写入机制
事务机制的顺序图:
在发送half办消息时:其实是吧消息存入 RMQ_SYS_TRANS_HALF_TOPIC 这个Topic
对消费者不可见,最终提交才会转存到对应的topic
4:ACL权限控制
文档说明: docs/cn/acl/user_guide.md
主要在broker.conf配置,
打开acl标志:aclEnable=true
plain_acl.yml进行权限配置,热加载不需要重启
全局白名单,不受ACL控制
#通常需要将主从架构中的所有节点加进来
globalWhiteRemoteAddresses:
- 10.10.103.
- 192.168.0.
accounts:
#第一个账户
- accessKey: RocketMQ
secretKey: 12345678
whiteRemoteAddress:
admin: false
defaultTopicPerm: DENY #默认Topic访问策略是拒绝
defaultGroupPerm: SUB #默认Group访问策略是只允许订阅
topicPerms:
- topicA=DENY #topicA拒绝
- topicB=PUB|SUB #topicB允许发布和订阅消息
- topicC=SUB #topicC只允许订阅
groupPerms:
# the group should convert to retry topic
- groupA=DENY
- groupB=PUB|SUB
- groupC=SUB
#第二个账户,只要是来自192.168.1.的IP,就可以访问所有资源
- accessKey: rocketmq2
secretKey: 12345678
whiteRemoteAddress: 192.168.1.
# if it is admin, it could access all resources
admin: true
DefaultMQProducer producer = new DefaultMQProducer("ProducerGroupName", getAclRPCHook());producer.setNamesrvAddr("127.0.0.1:9876");producer.start();producer.shutdown();//设置用户和密码static RPCHook getAclRPCHook() {return new AclClientRPCHook(new SessionCredentials(ACL_ACCESS_KEY,ACL_SECRET_KEY));}
