1.环境准备
1.创建Maven工程,quickstart的就好
2.在自己的机器上启动rocketmq,windows的上篇文章已经讲过
3.maven中导入rocketmq的连接jar包和测试包
<dependency><groupId>junit</groupId><artifactId>junit</artifactId><version>4.13</version></dependency><dependency><groupId>org.apache.rocketmq</groupId><artifactId>rocketmq-client</artifactId><version>4.3.0</version></dependency>
2.RocketMQ整体存储原理
2.1消息存储的相关文件
2.1.1CommitLog文件
①存储消息的主体内容
②一个CommitLog文件大小为1G
③文件的命名为20位的二进制表示(文件位置:${HOME}/store/commitlog/),如下图
④如果第一文件0000**0000文件存满1G后,会生成名称为 :00000000001073741824文件(第二个文件)。
2.1.2ConsumeQueue文件
①消费者要消费消息时,需要到2.1.1中的commitlog文件中寻找数据,那么如何定位消息在commitLog文件的位置呢??
②ConsumeQueue文件中存贮每个消息在commitLog中的物理偏移量,消息的大小和消息Tag的hashCode
③图形展示ConsumeQueue文件和CommitLog文件的关系
2.1.3IndexFile文件
①提供了可以根据时间和key值来查询commitlog消息的索引
②提供的就是另一种查询消息的方式,官方说是个IndexFile实现的是hashMap结构。
③文件名称为 时间戳,按照时间查询就到时间对应的文件查询即可
④按照key查询,那么大概的原理图应该是:
3.RokectMQ相关概念
2.各个概念的图形化
3.一条消息包含的内容
①所属的Topic名称
②Tag标签
③key值
④消息内容
4.一个消息保存到RocketMq的一个过程
①生产者向nameserver请求发送消息
②nameserver将返回brokerserver的一个ip:port
③生产者向这个brokerserver发送消息(包含topic tag key和消息主题)
④这台brokerserver的commitlog文件会记录这个消息
⑤indexfile中会记录这个消息索引。
⑥消息会自动选择Topic下的一个consumequeue,将消息索引放入这个队列中。
⑦broker返回ack
4.RockectMQ的使用
4.1消息发送
发送一个消息需要确定3个问题。
①同步、异步或单项发送。
②发送普通消息、顺序消息还是批量发送(只能同步发)
③消息设不设置属性、设不设置延时。
4.1.1发送同步消息
- 步骤
①创建一个生产者,并指定生产者组
②为生产者指定 nameserver的ip和端口
③启动生产者实例
④创建消息实例
⑤通过生产者将消息发送。
⑥关闭生产者
2.代码
@Testpublic void testSendSyncMsg() throws Exception{//1.创建一个生产者,并指定生产者组DefaultMQProducer producer = new DefaultMQProducer("syncMsgProducerGroup");//2.为生产者指定 nameserver的ip和端口producer.setNamesrvAddr("localhost:9876");//3.启动生产者实例producer.start();//4.创建消息实例Message message = new Message("syncMsgTopic", "Tag", "key1", "Hello RocketMq".getBytes(RemotingHelper.DEFAULT_CHARSET));//5.通过生产者将消息发送。producer.send(message);//6.关闭生产者producer.shutdown();}
4.1.2发送异步消息
- 步骤
①创建一个生产者,并指定生产者组
②为生产者指定 nameserver的ip和端口
③启动生产者实例
④创建消息实例
⑤通过生产者将消息发送,并设置回调方法。
⑥关闭生产者
2.代码
@Testpublic void testSendAsyncMsg() throws Exception{//1.创建一个生产者,并指定生产者组DefaultMQProducer producer = new DefaultMQProducer("aSyncMsgProducerGroup");//2.为生产者指定 nameserver的ip和端口producer.setNamesrvAddr("localhost:9876");//3.启动生产者实例producer.start();//4.创建消息实例Message message = new Message("aSyncMsgTopic", "Tag", "key2", "Hello RocketMq Async".getBytes(RemotingHelper.DEFAULT_CHARSET));//5.通过生产者将消息发送,并设置回调方法producer.send(message, new SendCallback() {//成功时的回调@Overridepublic void onSuccess(SendResult sendResult) {System.out.println("发送异步消息成功了!");System.out.println(sendResult);}//异常时的回调@Overridepublic void onException(Throwable throwable) {System.out.println("发送异步消息失败了");throwable.printStackTrace();}});//这里我可以使用闭锁,但是为了简单直接就睡一会Thread.sleep(8000);//6.关闭生产者producer.shutdown();}
4.1.3发送单向消息
- 步骤
①创建一个生产者,并指定生产者组
②为生产者指定 nameserver的ip和端口
③启动生产者实例
④创建消息实例
⑤通过生产者发送单向消息,没有任何的返回值
⑥关闭生产者
2.代码
@Testpublic void testSendSingleMsg() throws Exception{//1.创建一个生产者,并指定生产者组DefaultMQProducer producer = new DefaultMQProducer("singleMsgProducerGroup");//2.为生产者指定 nameserver的ip和端口producer.setNamesrvAddr("localhost:9876");//3.启动生产者实例producer.start();//4.创建消息实例Message message = new Message("singleMsgTopic", "Tag", "key3", "Hello RocketMq Single".getBytes(RemotingHelper.DEFAULT_CHARSET));//5.通过生产者发送单向消息,没有任何的返回值producer.sendOneway(message);//6.关闭生产者producer.shutdown();}
4.1.4同步顺序消息(异步和单向的不再举例)
1.步骤
①创建生产者实例
②设置nameserver的ip和端口
③启动生产者实例
④创建3条消息
⑤通过producer发送3条消息,并设置序号
⑥关闭生产者
2.代码
@Testpublic void testSendSequenceMsg() throws Exception{//1.创建一个生产者,并指定生产者组DefaultMQProducer producer = new DefaultMQProducer("sequenceMsgProducerGroup");//2.为生产者指定 nameserver的ip和端口producer.setNamesrvAddr("localhost:9876");//3.启动生产者实例producer.start();//4.创建3个消息实例ArrayList<Message> messages = new ArrayList<>();Message message = new Message("sequenceMsgTopic", "Tag", "key1", "Hello RocketMq Sequence".getBytes(RemotingHelper.DEFAULT_CHARSET));Message message1 = new Message("sequenceMsgTopic", "Tag", "key2", "Hello RocketMq Sequence".getBytes(RemotingHelper.DEFAULT_CHARSET));Message message2 = new Message("sequenceMsgTopic", "Tag", "key3", "Hello RocketMq Sequence".getBytes(RemotingHelper.DEFAULT_CHARSET));messages.add(message);messages.add(message1);messages.add(message2);//5.遍历消息并发送在第一消息队列中for(Message msg:messages){SendResult sendResult = producer.send(msg, new MessageQueueSelector() {@Overridepublic MessageQueue select(List<MessageQueue> list, Message message, Object o) {//都发在第一个队列中return list.get(0);}}, messages.indexOf(msg)+1);System.out.println(sendResult);}//6.关闭生产者producer.shutdown();}
3.实际上就是下面这个send使用
// 消息 指定消息队列 指定排序的public SendResult send(Message msg, MessageQueueSelector selector, Object arg)
4.1.5延时消息
1.代码
@Testpublic void testSendDlayMsg() throws Exception{//1.创建一个生产者,并指定生产者组DefaultMQProducer producer = new DefaultMQProducer("delayMsgProducerGroup");//2.为生产者指定 nameserver的ip和端口producer.setNamesrvAddr("localhost:9876");//3.启动生产者实例producer.start();//4.创建消息实例Message message = new Message("delayMsgTopic", "Tag", "key1", "Hello RocketMq Delay".getBytes(RemotingHelper.DEFAULT_CHARSET));//5.为消息设置延时等级 1~18级message.setDelayTimeLevel(3);//6.通过生产者将消息发送。SendResult send = producer.send(message);System.out.println(send);//7.关闭生产者producer.shutdown();}
2.1~18级对应的时间
private String messageDelayLevel = "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h";
4.1.6批量消息(只能以同步方式发送)
1.代码
@Testpublic void testSendBatchMsg()throws Exception{//1.创建一个生产者,并指定生产者组DefaultMQProducer producer = new DefaultMQProducer("batchMsgProducerGroup");//2.为生产者指定 nameserver的ip和端口producer.setNamesrvAddr("localhost:9876");//3.启动生产者实例producer.start();//4.创建消息实例ArrayList<Message> messages = new ArrayList<>();messages.add(new Message("batchMsgTopic", "Tag", "key1", "Hello RocketMq Batch1".getBytes(RemotingHelper.DEFAULT_CHARSET)));messages.add(new Message("batchMsgTopic", "Tag", "key2", "Hello RocketMq Batch2".getBytes(RemotingHelper.DEFAULT_CHARSET)));messages.add(new Message("batchMsgTopic", "Tag", "key3", "Hello RocketMq Batch3".getBytes(RemotingHelper.DEFAULT_CHARSET)));messages.add(new Message("batchMsgTopic", "Tag", "key4", "Hello RocketMq Batch4".getBytes(RemotingHelper.DEFAULT_CHARSET)));//5.发送SendResult send = producer.send(messages);System.out.println(send);//6.关闭生产者producer.shutdown();}
2.注意:批量消息大小要小于 4M
3.当消息多了后使用官方提供的切list的类
public class ListSplitter implements Iterator<List<Message>> {private final int SIZE_LIMIT = 1024 * 1024 * 4;private final List<Message> messages;private int currIndex;public ListSplitter(List<Message> messages) {this.messages = messages;}@Override public boolean hasNext() {return currIndex < messages.size();}@Override public List<Message> next() {int startIndex = getStartIndex();int nextIndex = startIndex;int totalSize = 0;for (; nextIndex < messages.size(); nextIndex++) {Message message = messages.get(nextIndex);int tmpSize = calcMessageSize(message);if (tmpSize + totalSize > SIZE_LIMIT) {break;} else {totalSize += tmpSize;}}List<Message> subList = messages.subList(startIndex, nextIndex);currIndex = nextIndex;return subList;}private int getStartIndex() {Message currMessage = messages.get(currIndex);int tmpSize = calcMessageSize(currMessage);while(tmpSize > SIZE_LIMIT) {currIndex += 1;Message message = messages.get(curIndex);tmpSize = calcMessageSize(message);}return currIndex;}private int calcMessageSize(Message message) {int tmpSize = message.getTopic().length() + message.getBody().length();Map<String, String> properties = message.getProperties();for (Map.Entry<String, String> entry : properties.entrySet()) {tmpSize += entry.getKey().length() + entry.getValue().length();}tmpSize = tmpSize + 20; // 增加⽇日志的开销20字节return tmpSize;}}===============================================================//把大的消息分裂成若干个小的消息ListSplitter splitter = new ListSplitter(messages);while (splitter.hasNext()) {try {List<Message> listItem = splitter.next();producer.send(listItem);} catch (Exception e) {e.printStackTrace();//处理error}}
4.1.7带有属性的消息
@Testpublic void testSendHasPropertyMsg()throws Exception{//1.创建一个生产者,并指定生产者组DefaultMQProducer producer = new DefaultMQProducer("propertyMsgProducerGroup");//2.为生产者指定 nameserver的ip和端口producer.setNamesrvAddr("localhost:9876");//3.启动生产者实例producer.start();//4.创建消息实例Message message = new Message("propertyMsgTopic", "Tag", "key1", "Hello RocketMq Property1".getBytes(RemotingHelper.DEFAULT_CHARSET));//5.设置属性message.putUserProperty("name","JamesLeBron");//6.发送SendResult send = producer.send(message);System.out.println(send);//7.关闭生产者producer.shutdown();}
4.2消费消息
4.2.1集群推动式普通消费
1.集群消费:同一个消费者组下均摊消息
2.推动式:Broker主动向消费者推送消息
3.代码
@Testpublic void testClusterPushCommonConsume() throws Exception{//1.实例化推动式消费模式DefaultMQPushConsumer defaultMQPushConsumer = new DefaultMQPushConsumer("syncConsume");//2.设置消费者消费模式,默认也是ClusteringdefaultMQPushConsumer.setMessageModel(MessageModel.CLUSTERING);//3.设置nameServer的ip和端口和读取的偏移量(默认最后)defaultMQPushConsumer.setNamesrvAddr("localhost:9876");defaultMQPushConsumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);//4.设置订阅的主题和标签defaultMQPushConsumer.subscribe("syncTopic","");//5.注册监听器defaultMQPushConsumer.registerMessageListener(new MessageListenerConcurrently() {@Overridepublic ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {System.out.println(list.size());System.out.println(list);return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}});//6.启动消费者实例defaultMQPushConsumer.start();//7.睡眠Thread.sleep(100000);}
