源码搭建
源码地址:https://github.com/apache/rocketmq
关键模块
- broker :broke 启动进程
- client:客户端,包括生产者消费者
- example:示例代码
- namesrv:nameServer
- store:消息存储实现相关
其他在docs文件有相关Junit测试代码,重要
带注解的源码:
rocketmq-all-4.7.1-source-release-带注释.zip
调试时,先在项目目录下创建一个conf目录,
并从distribution拷贝broker.conf和logback_broker.xml和logback_namesrv.xml
1:nameServ启动
配置环境变量

核心问题:
- 维护broker服务地址以及及时更新
- 给producer,consumer 提供获取broker列表
流程
重要类:
NamesrvController
类似web中的conroller,相应客户端请求的NamesrvConfig
- NettyServerConfig
2:broker启动
配置broker.conf文件
配置对应的文件路径
brokerClusterName = DefaultCluster
brokerName = broker-a brokerId = 0 deleteWhen = 04 fileReservedTime = 48 brokerRole = ASYNC_MASTER flushDiskType = ASYNC_FLUSH
自动创建Topic
autoCreateTopicEnable=true
nameServ地址
namesrvAddr=127.0.0.1:9876
存储路径
storePathRootDir=E:\RocketMQ\data\rocketmq\dataDir
commitLog路径
storePathCommitLog=E:\RocketMQ\data\rocketmq\dataDir\commitlog
消息队列存储路径
storePathConsumeQueue=E:\RocketMQ\data\rocketmq\dataDir\consumequeue
消息索引存储路径
storePathIndex=E:\RocketMQ\data\rocketmq\dataDir\index
checkpoint文件路径
storeCheckpoint=E:\RocketMQ\data\rocketmq\dataDir\checkpoint
abort文件存储路径
abortFile=E:\RocketMQ\data\rocketmq\dataDir\abort
启动类:BrokerStartup
启动方法:
BrokerStartup.createBrokerController
管理类:
- BrokerController
Broker核心配置:
BrokerConfig
NettyServerConfig :Netty服务端占用了10911端口。
NettyClientConfig
MessageStoreConfig
this.messageStore.start();启动核心的消息存储组件this.remotingServer.start();this.fastRemotingServer.start(); 启动两个Netty服务this.brokerOuterAPI.start();启动客户端,往外发请求BrokerController.this.registerBrokerAll: 向NameServer注册心跳。this.brokerStatsManager.start();this.brokerFastFailure.start();这也是一些负责具体业务的功能组件
Broker架构图

3:broker注册
方法:
BrokerController.this.registerBrokerAll
public synchronized void registerBrokerAll(final boolean checkOrderConfig, boolean oneway, boolean forceRegister) {//Topic配置相关的东东TopicConfigSerializeWrapper topicConfigWrapper = this.getTopicConfigManager().buildTopicConfigSerializeWrapper();if (!PermName.isWriteable(this.getBrokerConfig().getBrokerPermission())|| !PermName.isReadable(this.getBrokerConfig().getBrokerPermission())) {ConcurrentHashMap<String, TopicConfig> topicConfigTable = new ConcurrentHashMap<String, TopicConfig>();for (TopicConfig topicConfig : topicConfigWrapper.getTopicConfigTable().values()) {TopicConfig tmp =new TopicConfig(topicConfig.getTopicName(), topicConfig.getReadQueueNums(), topicConfig.getWriteQueueNums(),this.brokerConfig.getBrokerPermission());topicConfigTable.put(topicConfig.getTopicName(), tmp);}topicConfigWrapper.setTopicConfigTable(topicConfigTable);}//这里才是比较关键的地方。先判断是否需要注册,然后调用doRegisterBrokerAll方法真正去注册。if (forceRegister || needRegister(this.brokerConfig.getBrokerClusterName(),this.getBrokerAddr(),this.brokerConfig.getBrokerName(),this.brokerConfig.getBrokerId(),this.brokerConfig.getRegisterBrokerTimeoutMills())) {doRegisterBrokerAll(checkOrderConfig, oneway, topicConfigWrapper);}}

4:producer
注意Producer有两种。 一个是普通发送者。这个只需要构建一个Netty客户端。
- 普通的发送者DefaultMQProducer
- 事务的发送者TransactionMQProducer
public void start() throws MQClientException {this.setProducerGroup(withNamespace(this.producerGroup));this.defaultMQProducerImpl.start();if (null != traceDispatcher) {try {traceDispatcher.start(this.getNamesrvAddr(), this.getAccessChannel());} catch (MQClientException e) {log.warn("trace dispatcher start failed ", e);}}}
主线流程
- produccer需要拉去broker列表
- 并不建立连接,
Send方法:
先获取topic路由,可从本地缓存区,没有再去NameServer申请
然后选取一个MessageQueue,轮训采用取模方式
最后netty请求发送消息,到broker后,由commitlog写入commitlog文件

//Producer选择MessageQueue的方法public MessageQueue selectOneMessageQueue(final TopicPublishInfo tpInfo, final String lastBrokerName) {//这个sendLatencyFaultEnable默认是关闭的,Broker故障延迟机制,表示一种发送消息失败后一定时间内不在往同一个Queue重复发送的机制if (this.sendLatencyFaultEnable) {try {//K2 这里可以看到,Producer选择MessageQueue的方法就是自增,然后取模。并且只有这一种方法。int index = tpInfo.getSendWhichQueue().getAndIncrement();for (int i = 0; i < tpInfo.getMessageQueueList().size(); i++) {int pos = Math.abs(index++) % tpInfo.getMessageQueueList().size();if (pos < 0)pos = 0;MessageQueue mq = tpInfo.getMessageQueueList().get(pos);//Broker轮询。尽量将请求平均分配给不同的Brokerif (latencyFaultTolerance.isAvailable(mq.getBrokerName())) {if (null == lastBrokerName || mq.getBrokerName().equals(lastBrokerName))return mq;}}final String notBestBroker = latencyFaultTolerance.pickOneAtLeast();int writeQueueNums = tpInfo.getQueueIdByBroker(notBestBroker);if (writeQueueNums > 0) {//这里计算也还是自增取模final MessageQueue mq = tpInfo.selectOneMessageQueue();if (notBestBroker != null) {mq.setBrokerName(notBestBroker);mq.setQueueId(tpInfo.getSendWhichQueue().getAndIncrement() % writeQueueNums);}return mq;} else {latencyFaultTolerance.remove(notBestBroker);}} catch (Exception e) {log.error("Error occurred when selecting message queue", e);}return tpInfo.selectOneMessageQueue();}return tpInfo.selectOneMessageQueue(lastBrokerName);}
5:消息存储
方法入口: DefaultMessageStore.putMessage
最终存储的文件:
- commitLog:消息存储目录
broker写入消息的实际接口,
会把消息追加到MappedFile内存,不是直接写入磁盘
串行化的 - config:运行期间一些配置信息
- consumerqueue:消息消费队列存储目录
commitlog写入后,后台线程reputMessageService会拉去commitlog的最新消息,
分别转发到ComsumeQueue和indexFile
如果宕机,会存在不一致问题
DefaultMappedStore #load提供恢复方法
文件同步刷盘,异步刷盘
入口CommitLog.putMessage -> CommitLog.handleDiskFlush
是否启动对外内存:TransientStorePoolEnable
如果开启会申请一个commitlog大小文件一致的内存
- index:消息索引文件存储目录
- abort:如果存在改文件寿命Broker非正常关闭
过期文件删除
入口:
DefaultMessageStore.addScheduleTask -> DefaultMessageStore.this.cleanFilesPeriodically()
会检查commitlog,comsumerQueue,超过72小时会删除
- checkpoint:文件检查点,存储CommitLog文件最后一次刷盘时间戳、consumerquueue最后一次刷盘时间,index索引文件最后一次刷盘时间戳。
RocketMQ文件包括:
- commitlog:消息文件
- comsumerQueue:消息消费队列文件
- IndexFile:hash索引文件
- CheckPoint:检测点文件
- abort:关闭异常文件
MappedFile:涉及零拷贝的实现
6:消费者
集群模式
广播模式
推模式:DefaultMQPushConsumerImpl
拉模式
消息顺序:只支持一个队列上的局部消息顺序
ConsumeMessageOrderlyService 使用加锁机制保证
RebalanceImpl:负载均衡,
启动:
DefaultMQPushConsumer.start方法
DefaultMQPushConsumerImpl
客户断mQClientFactory 主要是启动了一大堆的服务
消息拉取
拉模式:PullMessageService
messageQueue:负责拉取消息,
processQueue:拉取的消息存在processQueue
长轮训拉取机制
配置项longPollingEnable=true开启长轮训模式
PullMessageProcessor#processRequest()
case ResponseCode.PULL_NOT_FOUND:if (brokerAllowSuspend && hasSuspendFlag) {long pollingTimeMills = suspendTimeoutMillisLong;//K2 消息长轮询if (!this.brokerController.getBrokerConfig().isLongPollingEnable()) {pollingTimeMills = this.brokerController.getBrokerConfig().getShortPollingTimeMills();}String topic = requestHeader.getTopic();long offset = requestHeader.getQueueOffset();int queueId = requestHeader.getQueueId();//没有拉取到消息,就再创建一个拉取请求PullRequest pullRequest = new PullRequest(request, channel, pollingTimeMills,this.brokerController.getMessageStore().now(), offset, subscriptionData, messageFilter);//将请求放入ManyRequestPull请求队列this.brokerController.getPullRequestHoldService().suspendPullRequest(topic, queueId, pullRequest);response = null;break;}
PullRequestHoldService会每个五秒查看是否有新消息,
NotifyMessageArrivingListener监听机制:
if (dispatchRequest.isSuccess()) {if (size > 0) {//分发CommitLog写入消息DefaultMessageStore.this.doDispatch(dispatchRequest);//K2 长轮询: 如果有消息到了主节点,并且开启了长轮询。if (BrokerRole.SLAVE != DefaultMessageStore.this.getMessageStoreConfig().getBrokerRole()&& DefaultMessageStore.this.brokerConfig.isLongPollingEnable()) {//唤醒NotifyMessageArrivingListener的arriving方法,进行一次请求线程的检查DefaultMessageStore.this.messageArrivingListener.arriving(dispatchRequest.getTopic(),dispatchRequest.getQueueId(), dispatchRequest.getConsumeQueueOffset() + 1,dispatchRequest.getTagsCode(), dispatchRequest.getStoreTimestamp(),dispatchRequest.getBitMap(), dispatchRequest.getPropertiesMap());}
负责均衡
RebalanceService 负责客户端负责聚恒
RebalanceImpl
consumer的allocateMessageQueueStrategy属性来选择。
五种策略:
- AllocateMessageQueueAveragely,最常用,平均分
- AllocateMessageQueueAveragelyByCircle平均轮训分配
是把MessageQueue按照组内消费平均轮训
7:延迟消息
入口scheduleMessageService
start方法 有个CAS锁,只有一个线程进行消息搬运
可以在这个地方进行改写
写入的消息会转入SCHEDULE_TOPIC_XXXX这个Topic
默认是18个级别的队列
入口:CommitLog.putMessage
ScheduleMessageService会每隔1秒钟执行一个executeOnTimeup任务
将消息从延迟队列中写入正常Topic中
入口:ScheduleMessageService#DeliverDelayedMessageTimerTask.executeOnTimeup

ConsumeMessageOrderlyService.
