RocketMQ源码

1627035152656-8be0d5ef-c592-4a4d-a078-a3c42ad99c97.png

4.7.1版本

将源码导入IDEA后,需要先对源码进行编译。编译指令 clean install -Dmaven.test.skip=true

1626951137662-695b78b3-fb1e-477e-8b6d-041f8b0d3d32.png

  • broker:这个里面存放的就是RocketMQ的Broker相关的代码,这里的代码可以用来启动Broker进程
  • client:这个里面就是RocketMQ的Producer、Consumer这些客户端的代码,生产消息、消费消息的代码都在里面
  • common:这里放的是一些公共的代码
  • dev:这里放的是开发相关的一些信息
  • distribution:这里放的就是用来部署RocketMQ的一些东西,比如bin目录 ,conf目录,等等
  • example:这里放的是RocketMQ的一些例子 ,抄代码的地方
  • filter:这里放的是RocketMQ的一些过滤器的东西
  • logappender和logging:这里放的是RocketMQ的日志打印相关的东西
  • namesvr:这里放的就是NameServer的源码
  • openmessaging:这是开放消息标准,这个可以先忽略
  • remoting:这个很重要,这里放的是RocketMQ的远程网络通信模块的代码,基于netty实现的
  • srvutil:这里放的是一些工具类
  • store:这个也很重要,这里放的是消息在Broker上进行存储相关的一些源码
  • style、test、tools:这里放的是checkstyle代码检查的东西,一些测试相关的类,还有就是tools里放的一些命令行监控工具类

这些模块有些东西还是要关注的。例如docs文件夹下的文档,以及各个模块下都有非常丰富 的junit测试代码,这些都是非常有用的。

方法论

很多人写的源码解析的书籍或者技术博客,往往是站在自己已经理解源码之 后的角度去写的。分析源码的时候,是先分析一个模块的源码,再分析一个模块的源码,接着分析下一个模块的源码。如果是已经读懂这个技术的源码的人,是能看懂这本书的,但是如果是初次看这个技术源码的大多数人, 按照这种顺序来,是很难理解的!要分析RocketMQ源码的话,我们是直接就没头没脑的去翻看里面的源码吗?错误的,

正确的做法,应该是尝试**在Intellij IDEA中去启动RocketMQ,然后你就可以在源码中打一些断 点,去观察RocketMQ源码的运行过程,而且在这个过程中,还需要从RocketMQ实际运行和使用的角度,去观察他的源码运行的流程**

用场景来驱动源码的分析,RocketMQ使用的时候,第一个步骤一定是先启动NameServer,那么我们就先来分析NameServer启动这块的源码,然后第二个步骤一定是启动Broker,那么我们再来分析Broker启动的流程。

接着Broker启动之后,必然会把自己注册到NameServer上去,那我们接着分析Broker注册到NameServer这部分源码,然后 Broker必然会跟NameServer保持一个心跳,那我们继续分析Broker的心跳的源码。

包括我们的客户端发送消息到Broker,Broker的主从同步,实际上我们都可以用这种方式在源码里打断点,然后在Intellij IDEA中启动 和运行RocketMQ,来观察各种场景下的源码运行流程。

所以我们首先肯定要先能在Intellij IDEA中启动和调试RocketMQ的源码,接着才能进一步继续去分析他的源码运行流程。完全**按照我们平时使用RocketMQ的各种场景来进行源码的分析,在一个场景中把各种 源码串联起来分析**

核心流程按照由大到小,由粗到细的方式几条主线。各种高级特性有更深入的理解。

对有些有争议的问题,带着问题来源码中找答案是最好的。

例如我们经常有人讨论NameServer全部挂了之后,生产者和消费者是否能够用他本地的缓存继续工作一段时 间? 这样的一些问题,看过源码之后是不是有更清晰的了解?

可以以业务线的方式来逐步解读。

启动NameServer以及本地调试源码

在Intellij IDEA中对NameServer启动类配置环境变量

在上面那个界面的左上角有一个+号,我们可以点一下,然后选择Application,此时会出现一个新的配置模板

,配置模板此时是没有名字的,我们在Name中输入NamesrvStartup,Main class可以选择 broker 模块下的NamesrvStartup类,Use classpath of module中可以选择 namesrv 这个module ,也可以直接启动报错之后就有默认配置了:

1626951469837-7fd9ac82-e817-475b-b35a-43c0a3cf272f.png

/Users/deltaqin/workspace/myworkspace2021/code2021/mq-learn/run-rokcet-mq/rocketmq-nameserver

1626951677389-840498de-8395-43a0-a4e9-02a7360b4949.png

在rocketmq运行目录中创建需要的目录结构以及拷贝配置文件

创建:/Users/deltaqin/workspace/myworkspace2021/code2021/mq-learn/run-rokcet-mq/rocketmq-nameserver

1626951779829-e7999421-f7b8-4ebc-aac6-17ee0a40df20.png

把RocketMQ源码目录中的distrbution目录下的broker.conf、logback_namesvr.xml两个配置文件拷贝到刚才新建的conf目 录中去,接着就需要修改这两个配置文件。

1626951975169-dd0a0e0d-4ce4-4cba-9ac7-256ea2bc693c.png

logback_namesvr.xml这个文件,修改里面的日志的目录,修改为你的rocketmq运行目录中的logs目录。里面有很多的 ${user.home},你直接把这些${user.home}全部替换为你的rocketmq运行目录就可以了。

修改broker.conf文件

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
brokerClusterName = DefaultCluster
brokerName = broker-a
brokerId = 0


# 这是nameserver的地址
namesrvAddr=127.0.0.1:9876
deleteWhen = 04
fileReservedTime = 48
rokerRole = ASYNC_MASTER
flushDiskType = ASYNC_FLUSH
# 这是存储路径,你设置为你的rocketmq运行目录的store子目录
storePathRootDir=/Users/deltaqin/workspace/myworkspace2021/code2021/mq-learn/run-rokcet-mq/rocketmq-nameserver/store
# 这是commitLog的存储路径
storePathCommitLog=/Users/deltaqin/workspace/myworkspace2021/code2021/mq-learn/run-rokcet-mq/rocketmq-nameserver/store/commitlog
# consume queue文件的存储路径
storePathConsumeQueue=/Users/deltaqin/workspace/myworkspace2021/code2021/mq-learn/run-rokcet-mq/rocketmq-nameserver/store/consumequeue
# 消息索引文件的存储路径
storePathIndex=/Users/deltaqin/workspace/myworkspace2021/code2021/mq-learn/run-rokcet-mq/rocketmq-nameserver/store/index
# checkpoint文件的存储路径
storeCheckpoint=/Users/deltaqin/workspace/myworkspace2021/code2021/mq-learn/run-rokcet-mq/rocketmq-nameserver/store/checkpoint
# abort文件的存储路径
abortFile=/Users/deltaqin/workspace/myworkspace2021/code2021/mq-learn/run-rokcet-mq/rocketmq-nameserver/abort
# 设置topic会自动创建
autoCreateTopicEnable=true

启动NameServer

Debug NamesvrStartup.main()了,就可以用debug模式去启动 NameServer了,他会自动找到ROCKETMQ_HOME环境变量,这个目录就是你的运行目录,里面有conf、logs、store几个目录。

他会读取conf里的配置文件,所有的日志都会打印在logs目录里,然后数据都会写在store目录里,启动成功之后,在Intellij IDEA的命 令行里就会看到下面的提示。

1626952464907-17d32af1-bf4d-4f15-9e2e-62435de81cc5.png

启动Broker以及本地调试源码

对Intellij IDEA中的broker模块进行配置

首先在Program arguments里,给Broker启动的时候指定一个配置文件存放地址:-c 你的rocketmq运行目录/conf/broker.conf,接着我们需要配置环境变量,也就是ROCKETMQ_HOME,此时我们可以在Environment Variables里面添加一个ROCKETMQ_HOME 环境变量,他的值就是我们的rocketmq运行目录就可以了,就是里面有conf、store、logs几个目录的

这个时候Broker启动会收到一个-c以及配置文件的参数,而 且他知道环境变量ROCKETMQ_HOME,知道运行目录是哪个,接着他就会基于这个配置文件来启动,同时在这个运行目录中存储数据,包括写入日志。

broker配置文件的内容

broker.conf 主要是配置了NameServer的地址,然后配置了Broker的数据存储路径,包括commitlog文件、consume queue文 件、index文件、checkpoint文件的存储路径,

所以只要我们基于上述的broker配置文件来启动broker,那么他就会跟指定的nameserver来进行通信,然后在指定的目录里存放各种 数据文件,包括在运行目录的logs目录里写入他自己的日志。

distribution里,有一个logback-broker.xml,需要把这个拷贝到运行目录的conf 目录中去,然后修改里面的地址,把${user.hom}都修改为你的rocketmq运行目录。

使用debug模式启动Broker

接着我们就可以使用debug模式启动BrokerStartup类了,右击他点击Debug BrokerStartup.main(),就可以启动他。

然后我们在rocketmq运行目录下的logs中,会找到一个子目录是rocketmqlogs,里面有一个broker.log,就可以看到Broker的启动日 志了

1626953623622-a0caae74-b219-43e9-ade0-2d3ec5ef4dbf.png

这就说明Broker已经启动成功了

1626953693918-6763c75b-fb8b-4a4d-b2eb-9774d6d01731.png

基于本地运行的RocketMQ进行消息的生产与消费

1
2
3
4
git clone https://github.com/apache/rocketmq-externals.git
cd rocketmq-externals/rocketmq-console
mvn package -DskipTests
java -jar rocketmq-console-ng-2.0.0.jar --server.port=8080 --rocketmq.config.namesrvAddr=127.0.0.1:9876

或者下载下来使用idea导入直接打包:

http://127.0.0.1:8080/#/

1626954584308-4d98670b-0c22-446b-ae71-c6d8b56dc28e.png

新建topic:

1626954629612-1d3b603b-a0dd-4b20-ba90-32cedfe5e3b8.png

发送消息:

1626954773784-1a99a339-ecb1-4653-95f0-9ae233d3fc6c.png

1626954742122-b34a4360-1054-4360-9bb4-3f4e6c2477e0.png

接收消息:

1626954814155-0ccaac1f-878f-40d6-bcd7-35044d31df3c.png

NameServer启动场景

  • 一是维护Broker的服务地址并进行及时的更新。
  • 二是给Producer和Consumer提供服务获取Broker列表。

1627730457153-1c28f406-d246-46fa-a87a-e8e4e1ce86d9.png

1627014435596-ccb2d8b1-2521-46b8-99b3-c01ecb0643ae.png

后续Broker启动的时候,都是要向NameServer注册的,然后Producer发送消息的时候,需要从 NameServer获取Broker机器信息,才能发送消息到Broker去。

脚本启动–》NamesrvStartup

那么NameServer启动的时候,是通过哪个脚本来启动的呢?

基于rocketmq-master源码中的distribution/bin目录中的mqnamesrv这个脚本来启动 的,在这个脚本中有极为关键的一行命令用于启动NameServer进程,如下。

1626956192036-f0f7a9cb-ee99-4962-bd60-a178a32511f4.png

sh ${ROCKETMQ_HOME}/bin/runserver.sh org.apache.rocketmq.namesrv.NamesrvStartup $@

runserver.sh脚本启动了NamesrvStartup这个Java 类,那么runserver.sh这个脚本中最为关键的启动NamesrvStartup类的命令是什么呢,如下

1626956268258-1d702bb4-7cac-43f4-8071-8096cf83b9d1.png

简化:

java -server -Xms4g -Xmx4g -Xmn2g org.apache.rocketmq.namesrv.NamesrvStartup

通过java命令 + 一个有main()方法的类,就是会启动一个JVM进程,通过这个JVM进程来执行NamesrvStartup类中的main() 方法,这个main()方法里就完成了NameServer启动的所有流程和工作

使用mqnamesrv脚本启动NameServer的时候,本质就是基于java命令启动了一个JVM进程,执行 NamesrvStartup类中的main()方法,完成NameServer启动的全部流程和逻辑,同时启动NameServer这个JVM进程的时 候,有一大堆的默认JVM参数,你当然可以在这里修改这些JVM参数,甚至进行优化。

NameServer是如何通过脚本来启动的,往往源码分析都是从他的启动脚本开始分析的。

1627729957462-335ebbe9-f549-4453-968f-d37bcc786853.png

重点–NamesrvController

NamesrvController controller = createNamesrvController(args);

整个NameServer的核心就是一个NamesrvController对象。这个controller对象就跟java Web开发中 的Controller功能类似,都是**响应客户端请求的。**

在创建NamesrvController对象时,有两个关键的配置

  • NamesrvConfig 这个是NameServer自己运行需要的配置信息。
  • NettyServerConfig 包含Netty服务端的配置参数,默认占用了9876端口。可以在配置文件中覆盖。

然后在启动服务时,启动几个重要组件:

  • RemotingServer 这个就是用来响应请求的。
  • 还有一个定时任务会定时扫描不活动的Broker。这个Broker管理是通过routeInfoManager这个功 能组件。

在关闭服务时,关闭了四个东西

  • RemotingServer
  • remotingExecutor
  • Netty服务线程池;
  • scheduledExecutorService 定时任务;
  • fileWatchService 这个是用来跟踪TLS配置的。这是跟权限相关的,我们暂不关注。

NamesrvController是如何被创建出来的?

阅读源码的一个技巧:哪些需要细看,哪些可以暂时先跳过,在阅读源码的时候,有些源码 是要细看的,但是有些源码你可以大致猜测一下他的作用,就直接略过去了,抓住真正的重点去看!

createNamesrvController()方法,进入之后,刚开始就有一段让人看不太懂的代码,有的人喜欢钻牛角尖的,直接去分析上面代码中的一些细节,比如看看 ServerUtil.buildCommandlineOptions(new Options())是在干什么,或者看看ServerUtil.parseCmdLine()是在干什 么,那你就误入迷途了。因为很明显代码**并不存在什么核心逻辑**,你从他的代码的字面意思就可以大致猜测出来,他里面包含了很多 CommandLine相关的字眼,那么顾名思义,这就是一段跟命令行参数相关的代码!就是解析一下我们传递进去的一些命令 行参数而已!

接着创建了NamesrvConfig和NettyServerConfig两个关键的配置类!

通过nettyServerConfig.setListenPort(9876)这行代码就可以发现,NameServer他默认固定的监听请求的 端口号就是9876,因为他直接在代码里写死了这个端口号了,所以NettyServer应该就是监听了9876这个端口号,来 接收Broker和客户端的请求的!

NameServer的核心配置到底是如何进行解析的?

在启动NameServer的时候,用-c选项带上了一个配置文件的 地址,然后此时他启动的时候,运行到上面的代码,就会把你配置文件里的配置,放入两个核心配置类里去。 比如你有一个配置文件是:nameserver.properties,里面有一个配置是serverWorkerThreads=16,那么就会读取出来这个配置,然后覆盖到NettyServerConfig里去!

NameServer启动的时候后,刚开始就是在初始化和解析 NameServerConfig、NettyServerConfig相关的配置信息,但是一般情况下,我们其实不会特意设置什么配置,所以 他这里一般都是用默认配置的!

跟NameServer启动日志配合起来看

其实我们知道NameServer刚启动就会初始化和解析一些核心配置信息,尤其是NettyServer的一些网络配置信息,然后初始化完毕配置信息之后,他就会打印这些配置信息,

NameServer的启动日志,通过分析源码以及其中的日志打印

Broker启动场景

Broker配置与启动

Broker是整个RocketMQ的业务核心,所有消息存储、转发这些最为重要的业务都是在Broker中进行处 理的。

而Broker的内部架构,有点类似于JavaWeb开发的MVC架构。有Controller负责响应请求,各种Service 组件负责具体业务,然后还有负责消息存盘的功能模块则类似于Dao。

通过Broker的启动过程,观察总结出Broker的内部结构。

1627731087364-ca5264c2-9bb3-412c-9590-bfaa125e017b.png

Broker启动的入口在BrokerStartup这个类,可以从他的main方法开始调试。 启动过程关键点: 重点也是围绕一个BrokerController对象,先创建,然后再启动。

  • 首先:在BrokerStartup.createBrokerController方法中可以看到Broker的几个核心配置:
    • BrokerConfig、
    • NettyServerConfig :Netty服务端占用了10911端口。同样也可以在配置文件中覆盖。
    • NettyClientConfig、
    • MessageStoreConfig
  • 然后:在BrokerController.start方法可以看到启动了一大堆Broker的核心服务
    • this.messageStore.start();启动核心的消息存储组件
    • this.remotingServer.start(); this.fastRemotingServer.start(); 启动两个Netty服务
    • this.brokerOuterAPI.start();启动客户端,往外发请求 BrokerController.this.registerBrokerAll: 向NameServer注册心跳。
    • this.brokerStatsManager.start(); this.brokerFastFailure.start();这也是一些负责具体业务的功能组件

我们现在不需要了解这些核心组件的具体功能,只要有个大概,Broker中有一大堆的功能组件负责具体 的业务。后面等到分析具体业务时再去深入每个服务的细节。

1627015612271-d51d17c9-bbaf-4835-b571-82b8c159719b.png

Broker的注册

**注意是一个定时的任务,broker向nameserver注册和心跳,**不是第一次注册就看做是心跳。

NameServer会维护Broker的路由列表,并对路由列表进行实时更新。

1627022960792-cb21c8b3-9420-4b13-81d9-bdbf264e4caa.png

1627731252936-940a2707-d123-49ab-a45f-6758227975d7.png

在NameServer中也会启动一个定时任务,扫描不活动的Broker。具体观察 NamesrvController.initialize方法

BrokerOuter API是如何发送注册请求的?

1627023118589-8d45d9d0-c493-4a73-91a0-b5ae0cd52736.png

NameServer是如何检测心跳的

开启了一个定时任务,一个120s的定时任务,检测当前的列表里面的过期的broker移除出去,

1627023263413-0d09fcd0-c134-4ce2-83f0-dd2549489b6d.png

内部逻辑:

1627023239716-c7486201-b776-44bc-abaa-4298347ff629.png

客户端Producer的启动和初始化

  • Producer刚启动初始化的时候,就会去拉取每个Topic的路由数据呢?还是等你第一次往一个Topic发送消息的时候再拉取路由数据呢
    • 不可能是刚初始化启动的时候就拉取Topic的路由数据,因为你刚开始启动的时候,不知道要发送消息到哪个Topic去啊!
    • 一定是在你第一次发送消息到Topic的时候,才会去拉取一个Topic的路由数据,包括这个Topic有几个 MessageQueue,每个MessageQueue在哪个Broker上,然后从中选择一个MessageQueue,跟那台Broker建立网 络连接,发送消息过去。
  • Producer发送消息必然要跟Broker建立网络,这个是在Producer刚启动的时候就立马跟 所有的Broker建立网络连接吗?不是的,因为此时你也不知道你要跟哪个Broker进行通信。
    • 所以其实很多核心的逻辑,包括Topic路由数据拉取,MessageQueue选择,以及跟Broker建立网络连接,通过网络 连接发送消息到Broker去,这些逻辑都是在Producer发送消息的时候才会有。

所以我们根本没有必要对Producer的初始化过程做太过于详细的分析。其实初始化的过程极为的复杂,但是我们却真的不用过于的深究,因为其实比如拉取Topic的路由数据,选择 MessageQueue,跟Broker构建长连接,发送消息过去,这些核心的逻辑,都是封装在发送消息的方法中的。

Producer发消息场景

Producer有两种:

  • 一种是普通发送者:DefaultMQProducer。这个只需要构建一个Netty客户端,往Broker发送消息 就行了。注意,异步回调只是在Producer接收到Broker的响应后自行调整流程,不需要提供Netty 服务。
  • 另一种是事务消息发送者: TransactionMQProducer。这个需要构建一个Netty客户端,往 Broker发送消息。同时也要构建Netty服务端,供Broker回查本地事务状态。

只关注DefaultMQProducer的整个过程。

整个Producer的流程,大致分两个步骤

  • start方法,进行一大堆的准备工作
  • 各种各样的send方法,进行消息发送。

那我们重点关注以下几个问题:

  • 首先 Broker的核心启动流程:在mQClientFactory的start方法中,启动了生产者的一大堆重要服务。然后在DefaultMQProducerImpl的start方法中,又回到了生产者的mqClientFactory的启动过程,这中间有服务状态的管理。

  • 其次 Borker路由信息的管理: Producer需要拉取Broker列表,然后跟Broker建立连接等等很多 核心的流程,其实都是在发送消息时建立的。因为在启动时,还不知道要拉取哪个Topic的Broker列表 呢。所以对于这个问题,我们关注的重点,不应该是start方法,而是send方法。

    • 而对NameServer的地址管理,则是散布在启动和发送的多个过程当中,并且NameServer地址可以通 过一个Http服务来获取。
    • Send方法中,首先需要获得Topic的路由信息。这会从本地缓存中获取,如果本地缓存中没有,就从 NameServer中去申请。核心在 org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl#tryToFindTopicPublishInfo方 法

从NameServer拉取topic路由信息

1627025163081-d675723d-f62c-491a-9103-f09b0c868f95.png

1627731905209-b7883cc3-584f-4a47-a8b6-cfd52c133616.png

Producer根据负载均衡算法选择messageQueue从而确定一个Broker机器

Topic是一个逻辑上的概念,一个Topic的数据往往是分布式存储在多台Broker机器上的,因此 Topic本质是由多个MessageQueue组成的。每个MessageQueue都可以在不同的Broker机器上,当然也可能一个 Topic 的多个MessageQueue在一个Broker机器上。

Producer的负载均衡策略,默认会把消息平均的发送到所 有MessageQueue里的。

获取路由信息后,会**选出一个MessageQueue去发送消息。这个选MessageQueue的方法就是一个索 引自增然后取模的方式。只要你知道了要发送消息到哪个MessageQueue上去,然后就知道这个MessageQueue在哪台Broker机器上,接着 就跟那台Broker机器建立连接**,发送消息给他就可以了。

在发送Netty请求时,实际上是指定的MessageQueue,而不是Topic。Topic只是用来找 MessageQueue。

然后根据MessageQueue再找所在的Broker,往Broker发送请求。

org.apache.rocketmq.client.latency.MQFaultStrategy#selectOneMessageQueue

1627732208660-17bfd696-eb70-4a43-883a-18ab844c93d5.png

1627026456592-0c859c07-7390-49dc-b01f-00392711bb9a.png

Producer 跟Broker建立网络连接发送消息到Broker

1627026551733-ae6dd072-0d0f-4b87-bbf6-dca7e24770e5.png

消息存储到磁盘

  • commitLog:消息存储目录
  • config:运行期间一些配置信息
  • consumerqueue:消息消费队列存储目录
  • index:消息索引文件存储目录
  • abort:如果存在改文件寿命Broker非正常关闭
  • checkpoint:文件检查点,存储CommitLog文件最后一次刷盘时间戳、consumerquueue最后一 次刷盘时间,index索引文件最后一次刷盘时间戳。

单个消息存储文件、 消息消费队列文件、Hash索引文件长度固定以便使用内存映射机制进行文件的读写操作。

RocketMQ组织文件以文件的起始偏移量来命令文件,这样根据偏移量能快速定位到真实的物理文件。

RocketMQ基于内存映射文件机制提供了同步刷盘和异步刷盘两种机制,异步刷盘是指在消息存储时先追加到内存映 射文件,然后启动专门的刷盘线程定时将内存中的文件数据刷写到磁盘。

CommitLog,消息存储文件,**RocketMQ为了保证消息发送的高吞吐量,采用单一文件存储所有主题消 息,保证消息存储是完全的顺序写,但这样给文件读取带来了不便,为此RocketMQ为了方便消息消费 构建了消息消费队列文件,基于主题与队列进行组织,同时RocketMQ为消息实现了Hash索引,可以为 消息设置索引键,根据所以能够快速从CommitLog文件中检索消息。**当消息达到CommitLog后,会通过ReputMessageService线程接近实时地将消息转发给消息消费队列 文件与索引文件。

为了安全起见,RocketMQ引入abort文件,记录Broker的停机是否是正常关闭还是 异常关闭,在重启Broker时为了保证CommitLog文件,消息消费队列文件与Hash索引文件的正确性, 分别采用不同策略来恢复文件。

RocketMQ不会永久存储消息文件、消息消费队列文件,而是启动文件过期机制并在磁盘空间不足或者 默认凌晨4点删除过期文件,文件保存72小时并且在删除文件时并不会判断该消息文件上的消息是否被 消费。

Broker获取到一条消息之后,他是如何存储这条消息的?

messageStore就是负责消息存储的核心组件。

CommitLog的doAppend方法就是Broker写入消息的实际入口。这个方法最终会把消息追加到 MappedFile映射的一块内存里,并没有直接写入磁盘。写入消息的过程是串行的,一次只会允许一个 线程写入。

Broker通过Netty网络服务器获取到 一条消息,接着就会把这条消息写入到一个CommitLog文件里去,一个Broker机器上就只有一个CommitLog文件,所有Topic的消息 都会写入到一个文件里去

1627029922764-5c1f0ecb-cda2-4e26-8104-c2a01cf4cf03.png

在Broker上写入消息到CommitLog文件的时候,会申请一个putMessageLock锁,都是串行的,不会让你并发的写入,并发写入文件必然会 有数据错乱的问题,下面是源码片段。

1627029955492-d94f1bad-bd76-43f2-bd08-b3710f4376be.png

对消息做出一通处理,包括设置消息的存储时间、创建全局唯一的消息ID、计算消息的总长度,然后会走 一段很关键的源码,把消息写入到MappedFile

1627030126862-557e35e0-032c-4fd6-bbc2-bd5005123b89.png

cb.doAppend()这行代码,这行代码其实是把消息追加到MappedFile映射的一块内存里去,并没有 直接刷入磁盘

1627030149405-ce9aab8d-9337-4907-a624-9ea92cd01087.png

什么时候才会把内存里的数据刷入磁盘,其实要看我们配置的刷盘策略,另外就是不 管是同步刷盘还是异步刷盘,假设你配置了主从同步,一旦你写入完消息到CommitLog之后,接下来都会进行主从同 步复制的。

分发ConsumeQueue和IndexFile,一条消息写入CommitLog文件之后,如何实时更新索引文件

**当我们把消息写入到CommitLog之后,在DefaultMessageStore的start方法中,有一个后台线程**reputMessageService 每隔1毫秒就会 去拉取CommitLog中最新更新的一批消息,然后分别转发到ConsumeQueue和IndexFile里去,这就是他底层的实现 原理。

并且,如果服务异常宕机,会造成CommitLog和ConsumeQueue、IndexFile文件不一致,有消息写入CommitLog后,没有分发到索引文件,这样消息就丢失了。DefaultMappedStore的load方法提供了 恢复索引文件的方法,入口在load方法。

1627031453852-28989502-a477-4b9e-9664-e968d8c22f16.png

在DefaultMessageStore的start()方法里,在里面就是启动了这个ReputMessageService 线程。start()方法就是在Broker启动的时候调用的,所以相当于是Broker启动就会启动这个线 程。

1627028829486-e9d86e55-f6b3-4134-97f7-be0cec24ee37.png

执行:

1627028918716-830837aa-ea35-4858-8265-1a96af0027de.png

Broker启动的时候会开启一个线程 ReputMessageService,他会把CommitLog更新事件转发出去,然后 让任务处理器去更新ConsumeQueue和IndexFile

1627028981322-1060f9a8-35d1-4407-826b-834f796a49cd.png

doreput方法

1627029158511-8ebfd160-c645-4ce5-bced-4589e7163253.png

CommitLogDispatcher的实现类有两个,分别是CommitLogDispatcherBuildConsumeQueue 和CommitLogDispatcherBuildIndex,他们俩分别会负责把消息转发到ConsumeQueue和IndexFile

1627029183066-8c89b66b-dc21-419b-96df-f89c0aaa3e88.png

1627029348134-c317e546-cb5f-4a34-b2f9-e1e8d5716305.png

consumeQueue写入:找到当前Topic的 messageQueueId对应的一个ConsumeQueue文件,一个MessageQueue会对应多个ConsumeQueue文件,找到一个即可,然后消息写入其中。

1627029372809-2681080e-a9ed-4234-be19-8c4ae5c529d0.png

IndexFile的写入逻辑,其实也很简单,无非就是在IndexFile里去构建对应的索引罢了

实现同步刷盘以及异步刷盘两种策略的?

入口:CommitLog.putMessage -> CommitLog.handleDiskFlush

1627030149405-ce9aab8d-9337-4907-a624-9ea92cd01087.png

其中主要涉及到是否开启了对外内存。TransientStorePoolEnable。如果开启了堆外内存,会在启动 时申请一个跟CommitLog文件大小一致的堆外内存,这部分内存就可以确保不会被交换到虚拟内存 中。

1627031445809-bc01ebda-ba6c-4ffd-a083-b872e345105e.png

数据直接写入CommitLog,而且直接进入的 是MappedFile映射的一块内存,不是直接进入磁盘,同时有一个后台线程会把CommitLog里更新的数据给写入到 ConsumeQueue和IndexFile里去

同步

根据你配置的两种不同的刷盘策略分别处理的,我们先看第一种,就是同步刷盘的策略是如何处理的。

1627030572119-b0b956c5-e08a-4898-9b0a-f2aefee0525f.png

1627030596624-acab8268-bc9e-404a-9856-14c37fd10eca.png

唤醒线程之后执行刷盘

1627030629620-13bf9b4a-4673-44d3-b8d7-0e0675494e2e.png

CommitLog.this.mappedFileQueue.flush_**(0)**_;

这个MappedByteBuffer就是JDK NIO包下的API,他的force()方法就是强迫把你写入内存的数据刷入到磁盘文件里 去,到此就是同步刷盘成功了。