Summary: original source www.iocoder.cn/RocketMQ/bu… 「芋道源码」. Reposting is welcome; please keep this summary. Thanks!


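0. Friendly reminder

Before reading the source code, friends, you should already be thoroughly familiar with the RocketMQ documentation. There is currently very little Chinese documentation for RocketMQ 4, so if your English is not great, I recommend the following material: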

1. Required tools

  • JDK: 1.8+
  • Maven
  • IntelliJ IDEA

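2. Pulling the source code

Fork your own repository from the official one, [github.com/apache/rock…). Why fork? Once we start reading and debugging the source we will probably want to write some comments, and with our own repository we can commit freely.

Use IntelliJ IDEA to pull the code from the forked repository. Once the pull finishes, Maven downloads the dependencies, which may take a while, so be patient.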


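While we wait, let me briefly describe the steps for setting up the debugging environment:

  1. Start the RocketMQ Namesrv
  2. Start the RocketMQ Broker
  3. Start the RocketMQ Producer
  4. Start the RocketMQ Consumer

This is a minimal RocketMQ environment; for now we do not consider Namesrv clusters, Broker clusters, or Consumer clusters.

Also, this article uses RocketMQ version 4.4.0-SNAPSHOT.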

3. Starting the RocketMQ Namesrv

Open the unit test class org.apache.rocketmq.namesrv.NameServerInstanceTest and, using its #startup() method as a reference, write a static #main(String[] args) method:

// NameServerInstanceTest.java

public static void main(String[] args) throws Exception {
    // NamesrvConfig settings
    final NamesrvConfig namesrvConfig = new NamesrvConfig();
    // NettyServerConfig settings
    final NettyServerConfig nettyServerConfig = new NettyServerConfig();
    nettyServerConfig.setListenPort(9876); // set the listen port
    // Create the NamesrvController and start it
    NamesrvController namesrvController = new NamesrvController(namesrvConfig, nettyServerConfig);
    namesrvController.initialize();
    namesrvController.start();
    // Sleep so the process does not exit
    Thread.sleep(DateUtils.MILLIS_PER_DAY);
}


Then right-click and run it, and the RocketMQ Namesrv is up. The output log looks like this:

17:54:03.354 [NettyEventExecutor] INFO  RocketmqRemoting - NettyEventExecutor service started
17:54:03.355 [FileWatchService] INFO  RocketmqCommon - FileWatchService service started

Finally, as an optional step, run telnet 127.0.0.1 9876 on the command line to check whether you can connect to the RocketMQ Namesrv.
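If telnet is not installed, the same check can be sketched in plain Java. This is only an illustration: PortCheck and isReachable are hypothetical names, not part of RocketMQ, and a successful connection only shows that something is listening on the port.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

/** Hypothetical helper (not part of RocketMQ): checks whether a TCP port accepts connections. */
final class PortCheck {

    /** Returns true if a TCP connection to host:port succeeds within timeoutMillis. */
    static boolean isReachable(String host, int port, int timeoutMillis) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMillis);
            return true;
        } catch (IOException e) {
            // Connection refused or timed out: nothing reachable is listening there.
            return false;
        }
    }

    public static void main(String[] args) {
        // 9876 is the Namesrv port set via setListenPort(9876) above.
        System.out.println("Namesrv reachable: " + isReachable("127.0.0.1", 9876, 1000));
    }
}
```

The same call with port 10911 would check the Broker once it has been started.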

4. Starting the RocketMQ Broker

Open the unit test class org.apache.rocketmq.broker.BrokerControllerTest and, using its #testBrokerRestart() method as a reference, write a #main(String[] args) method:

// BrokerControllerTest.java

public static void main(String[] args) throws Exception {
    // Set the version number
    System.setProperty(RemotingCommand.REMOTING_VERSION_KEY, Integer.toString(MQVersion.CURRENT_VERSION));
    // NettyServerConfig settings
    final NettyServerConfig nettyServerConfig = new NettyServerConfig();
    nettyServerConfig.setListenPort(10911);
    // BrokerConfig settings
    final BrokerConfig brokerConfig = new BrokerConfig();
    brokerConfig.setBrokerName("broker-a");
    brokerConfig.setNamesrvAddr("127.0.0.1:9876");
    // MessageStoreConfig settings
    final MessageStoreConfig messageStoreConfig = new MessageStoreConfig();
    messageStoreConfig.setDeleteWhen("04");
    messageStoreConfig.setFileReservedTime(48);
    messageStoreConfig.setFlushDiskType(FlushDiskType.ASYNC_FLUSH);
    messageStoreConfig.setDuplicationEnable(false);

    // BrokerPathConfigHelper.setBrokerConfigPath("/Users/yunai/百度云同步盘/开发/Javascript/Story/incubator-rocketmq/conf/broker.conf");
    // Create the BrokerController and start it
    BrokerController brokerController = new BrokerController(
        brokerConfig,
        nettyServerConfig,
        new NettyClientConfig(),
        messageStoreConfig);
    brokerController.initialize();
    brokerController.start();
    // Print a marker ("你猜" means "take a guess"), then sleep so the process does not exit
    System.out.println("你猜");
    Thread.sleep(DateUtils.MILLIS_PER_DAY);
}


Then right-click and run it, and the RocketMQ Broker is up. The output log looks like this:

你猜
  • Don't be confused. Switch over to the RocketMQ Namesrv console, which has already printed the following log:

    18:17:30.443 [NettyServerCodecThread_5] INFO  RocketmqRemoting - NETTY SERVER PIPELINE: channelRegistered 127.0.0.1:63847
    18:17:30.443 [NettyServerCodecThread_5] INFO  RocketmqRemoting - NETTY SERVER PIPELINE: channelActive, the channel[127.0.0.1:63847]
    18:17:30.457 [RemotingExecutorThread_4] DEBUG RocketmqNamesrv - receive request, 103 127.0.0.1:63847 RemotingCommand [code=103, language=JAVA, version=275, opaque=0, flag(B)=0, remark=null, extFields={brokerId=0, bodyCrc32=1880081823, clusterName=DefaultCluster, brokerAddr=192.168.3.26:10911, haServerAddr=192.168.3.26:10912, compressed=false, brokerName=broker-a}, serializeTypeCurrentRPC=JSON]
    18:17:30.458 [RemotingExecutorThread_4] INFO  RocketmqNamesrv - new topic registered, BenchmarkTest QueueData [brokerName=broker-a, readQueueNums=1024, writeQueueNums=1024, perm=6, topicSynFlag=0]
    18:17:30.458 [RemotingExecutorThread_4] INFO  RocketmqNamesrv - new topic registered, OFFSET_MOVED_EVENT QueueData [brokerName=broker-a, readQueueNums=1, writeQueueNums=1, perm=6, topicSynFlag=0]
    18:17:30.458 [RemotingExecutorThread_4] INFO  RocketmqNamesrv - new topic registered, broker-a QueueData [brokerName=broker-a, readQueueNums=1, writeQueueNums=1, perm=7, topicSynFlag=0]
    18:17:30.458 [RemotingExecutorThread_4] INFO  RocketmqNamesrv - new topic registered, TBW102 QueueData [brokerName=broker-a, readQueueNums=8, writeQueueNums=8, perm=7, topicSynFlag=0]
    18:17:30.458 [RemotingExecutorThread_4] INFO  RocketmqNamesrv - new topic registered, SELF_TEST_TOPIC QueueData [brokerName=broker-a, readQueueNums=1, writeQueueNums=1, perm=6, topicSynFlag=0]
    18:17:30.458 [RemotingExecutorThread_4] INFO  RocketmqNamesrv - new topic registered, DefaultCluster QueueData [brokerName=broker-a, readQueueNums=16, writeQueueNums=16, perm=7, topicSynFlag=0]
    18:17:30.458 [RemotingExecutorThread_4] INFO  RocketmqNamesrv - new broker registered, 192.168.3.26:10911 HAServer: 192.168.3.26:10912
    • Sure enough, the RocketMQ Broker has started and registered itself with the RocketMQ Namesrv.
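The perm=6 and perm=7 values in the log above are bit flags. The sketch below shows one way to read them, assuming the bit layout I recall from RocketMQ's PermName class (read = 4, write = 2, inherit = 1). PermBits is a hypothetical name for illustration, and the constants are worth checking against the source you just pulled.

```java
/** Illustrative re-implementation; the bit values are assumed from RocketMQ's PermName. */
final class PermBits {
    static final int PERM_READ = 1 << 2;    // 4 (assumed)
    static final int PERM_WRITE = 1 << 1;   // 2 (assumed)
    static final int PERM_INHERIT = 1;      // 1 (assumed)

    /** Renders a perm value as a three-character string such as "RW-". */
    static String toPermString(int perm) {
        StringBuilder sb = new StringBuilder("---");
        if ((perm & PERM_READ) != 0) {
            sb.setCharAt(0, 'R');
        }
        if ((perm & PERM_WRITE) != 0) {
            sb.setCharAt(1, 'W');
        }
        if ((perm & PERM_INHERIT) != 0) {
            sb.setCharAt(2, 'X');
        }
        return sb.toString();
    }

    public static void main(String[] args) {
        System.out.println("perm=6 -> " + toPermString(6)); // prints "perm=6 -> RW-"
        System.out.println("perm=7 -> " + toPermString(7)); // prints "perm=7 -> RWX"
    }
}
```

Under that assumption, BenchmarkTest (perm=6) is readable and writable, while TBW102 (perm=7) additionally carries the inherit bit.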

Finally, as an optional step, run telnet 127.0.0.1 10911 on the command line to check whether you can connect to the RocketMQ Broker.

5. Starting the RocketMQ Producer

Open the example class org.apache.rocketmq.example.quickstart.Producer. The code is as follows:

// Producer.java

/**
 * This class demonstrates how to send messages to brokers using provided {@link DefaultMQProducer}.
 */
public class Producer {

    public static void main(String[] args) throws MQClientException, InterruptedException {

        /*
         * Instantiate with a producer group name.
         */
        DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");

        /*
         * Specify name server addresses.
         * <p/>
         *
         * Alternatively, you may specify name server addresses via exporting environmental variable: NAMESRV_ADDR
         * <pre>
         * {@code
         * producer.setNamesrvAddr("name-server1-ip:9876;name-server2-ip:9876");
         * }
         * </pre>
         */

        /*
         * Launch the instance.
         */
        producer.setNamesrvAddr("127.0.0.1:9876"); // <x> added line
        producer.start();

        for (int i = 0; i < 1000; i++) {
            try {

                /*
                 * Create a message instance, specifying topic, tag and message body.
                 */
                Message msg = new Message("TopicTest" /* Topic */,
                    "TagA" /* Tag */,
                    ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */
                );

                /*
                 * Call send message to deliver message to one of brokers.
                 */
                SendResult sendResult = producer.send(msg);

                System.out.printf("%s%n", sendResult);
            } catch (Exception e) {
                e.printStackTrace();
                Thread.sleep(1000);
            }
        }

        /*
         * Shut down once the producer instance is no longer in use.
         */
        producer.shutdown();
    }

}

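  • Note that at the line marked <x> we added producer.setNamesrvAddr("127.0.0.1:9876"), which tells the Producer which RocketMQ Namesrv to use.
  • Some friends may ask: why do we specify the Namesrv rather than a RocketMQ Broker? If that is you, please go back and read the documentation first.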
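As a hint for that question, here is a deliberately tiny toy model of the idea: Brokers register themselves with the Namesrv (as the Namesrv log in section 4 shows), and clients ask the Namesrv where a topic lives. ToyNamesrv and its methods are hypothetical and exist only for illustration; the real protocol is considerably more involved.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Toy model only: this class does not exist in RocketMQ. */
final class ToyNamesrv {
    // topic -> addresses of the Brokers that registered that topic
    private final Map<String, List<String>> routes = new HashMap<>();

    /** Called by a Broker when it starts (compare "new broker registered" in the Namesrv log). */
    void registerBroker(String brokerAddr, List<String> topics) {
        for (String topic : topics) {
            routes.computeIfAbsent(topic, t -> new ArrayList<>()).add(brokerAddr);
        }
    }

    /** Called by a Producer or Consumer to find the Brokers serving a topic. */
    List<String> lookup(String topic) {
        return routes.getOrDefault(topic, Collections.<String>emptyList());
    }

    public static void main(String[] args) {
        ToyNamesrv namesrv = new ToyNamesrv();
        // Address and topic names are taken from the Namesrv log in section 4.
        namesrv.registerBroker("192.168.3.26:10911", Arrays.asList("TBW102", "SELF_TEST_TOPIC"));
        System.out.println(namesrv.lookup("TBW102")); // prints [192.168.3.26:10911]
    }
}
```

Because clients only need to know the Namesrv address, Brokers can be added or moved without changing any Producer or Consumer configuration.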

Then right-click and run it, and the RocketMQ Producer is up. The output log looks like this:

18:22:13.507 [main] DEBUG i.n.u.i.l.InternalLoggerFactory - Using SLF4J as the default logging framework
SendResult [sendStatus=SEND_OK, msgId=C0A8031AE91718B4AAC27A6364050000, offsetMsgId=C0A8031A00002A9F0000000000000000, messageQueue=MessageQueue [topic=TopicTest, brokerName=broker-a, queueId=1], queueOffset=0]
// ... N lines omitted ...
SendResult [sendStatus=SEND_OK, msgId=C0A8031AE91718B4AAC27A6369F603E6, offsetMsgId=C0A8031A00002A9F000000000002BD4A, messageQueue=MessageQueue [topic=TopicTest, brokerName=broker-a, queueId=3], queueOffset=249]
SendResult [sendStatus=SEND_OK, msgId=C0A8031AE91718B4AAC27A6369F703E7, offsetMsgId=C0A8031A00002A9F000000000002BDFE, messageQueue=MessageQueue [topic=TopicTest, brokerName=broker-a, queueId=0], queueOffset=249]
18:22:15.558 [NettyClientSelector_1] INFO  RocketmqRemoting - closeChannel: close the connection to remote address[192.168.3.26:10911] result: true
18:22:15.559 [NettyClientSelector_1] INFO  RocketmqRemoting - closeChannel: close the connection to remote address[127.0.0.1:9876] result: true
18:22:15.560 [NettyClientSelector_1] INFO  RocketmqRemoting - closeChannel: close the connection to remote address[192.168.3.26:10909] result: true

There is no "finally" step this time.
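The offsetMsgId values in the Producer log are not random. As far as I know, for an IPv4 Broker they are 32 hex characters: 8 for the Broker IP, 8 for the port, and 16 for the commit log offset. The sketch below decodes them under that assumption; OffsetMsgIdDecoder is a hypothetical name, and RocketMQ ships its own decoder.

```java
/** Illustrative decoder for the assumed IPv4 layout: 4-byte IP, 4-byte port, 8-byte offset. */
final class OffsetMsgIdDecoder {

    /** Returns the Broker address ("ip:port") encoded in the first 16 hex characters. */
    static String brokerAddress(String offsetMsgId) {
        StringBuilder ip = new StringBuilder();
        for (int i = 0; i < 8; i += 2) {
            if (i > 0) {
                ip.append('.');
            }
            ip.append(Integer.parseInt(offsetMsgId.substring(i, i + 2), 16));
        }
        int port = Integer.parseInt(offsetMsgId.substring(8, 16), 16);
        return ip + ":" + port;
    }

    /** Returns the commit log offset encoded in the last 16 hex characters. */
    static long commitLogOffset(String offsetMsgId) {
        return Long.parseUnsignedLong(offsetMsgId.substring(16, 32), 16);
    }

    public static void main(String[] args) {
        // Last offsetMsgId from the Producer log above.
        String id = "C0A8031A00002A9F000000000002BDFE";
        System.out.println(brokerAddress(id));   // prints 192.168.3.26:10911
        System.out.println(commitLogOffset(id)); // prints 179710
    }
}
```

The decoded address matches the brokerAddr that the Broker registered with the Namesrv in section 4.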

6. Starting the RocketMQ Consumer

Open the example class org.apache.rocketmq.example.quickstart.Consumer. The code is as follows:

// Consumer.java

public class Consumer {

    public static void main(String[] args) throws InterruptedException, MQClientException {

        /*
         * Instantiate with specified consumer group name.
         */
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name_4");

        /*
         * Specify name server addresses.
         * <p/>
         *
         * Alternatively, you may specify name server addresses via exporting environmental variable: NAMESRV_ADDR
         * <pre>
         * {@code
         * consumer.setNamesrvAddr("name-server1-ip:9876;name-server2-ip:9876");
         * }
         * </pre>
         */

        /*
         * Specify where to start in case the specified consumer group is a brand new one.
         */
        consumer.setNamesrvAddr("127.0.0.1:9876"); // <x> added line
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);

        /*
         * Subscribe one or more topics to consume.
         */
        consumer.subscribe("TopicTest", "*");

        /*
         * Register callback to execute on arrival of messages fetched from brokers.
         */
        consumer.registerMessageListener(new MessageListenerConcurrently() {

            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
                ConsumeConcurrentlyContext context) {
                System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });

        /*
         * Launch the consumer instance.
         */
        consumer.start();

        System.out.printf("Consumer Started.%n");
    }

}

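  • Note that at the line marked <x> we again added consumer.setNamesrvAddr("127.0.0.1:9876"), which tells the Consumer which RocketMQ Namesrv to use.
  • Here is another do-or-die question: why do we specify the Namesrv rather than a RocketMQ Broker?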

Then right-click and run it, and the RocketMQ Consumer is up. The output log looks like this:

18:37:12.196 [main] DEBUG i.n.u.i.l.InternalLoggerFactory - Using SLF4J as the default logging framework
Consumer Started.
ConsumeMessageThread_2 Receive New Messages: [MessageExt [queueId=3, storeSize=178, queueOffset=0, sysFlag=0, bornTimestamp=1543054934061, bornHost=/192.168.3.26:64103, storeTimestamp=1543054934065, storeHost=/192.168.3.26:10911, msgId=C0A8031A00002A9F0000000000000164, commitLogOffset=356, bodyCRC=1250039395, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='TopicTest', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=250, CONSUME_START_TIME=1543055832771, UNIQ_KEY=C0A8031AE91718B4AAC27A63642D0002, WAIT=true, TAGS=TagA}, body=[72, 101, 108, 108, 111, 32, 82, 111, 99, 107, 101, 116, 77, 81, 32, 50], transactionId='null'}]] 
ConsumeMessageThread_16 Receive New Messages: [MessageExt [queueId=2, storeSize=179, queueOffset=4, sysFlag=0, bornTimestamp=1543054934102, bornHost=/192.168.3.26:64103, storeTimestamp=1543054934103, storeHost=/192.168.3.26:10911, msgId=C0A8031A00002A9F0000000000000BD9, commitLogOffset=3033, bodyCRC=367242165, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='TopicTest', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=250, CONSUME_START_TIME=1543055832779, UNIQ_KEY=C0A8031AE91718B4AAC27A6364560011, WAIT=true, TAGS=TagA}, body=[72, 101, 108, 108, 111, 32, 82, 111, 99, 107, 101, 116, 77, 81, 32, 49, 55], transactionId='null'}]] 
// ... N lines omitted ...
CONSUME_START_TIME=1543055832779, UNIQ_KEY=C0A8031AE91718B4AAC27A636450000F, WAIT=true, TAGS=TagA}, body=[72, 101, 108, 108, 111, 32, 82, 111, 99, 107, 101, 116, 77, 81, 32, 49, 53], transactionId='null'}]] 

There is no "finally" step this time either.
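The body=[72, 101, ...] arrays in the Consumer log are the raw message bytes. A minimal sketch for turning them back into text, assuming UTF-8 (which, as far as I know, is what RemotingHelper.DEFAULT_CHARSET in the Producer resolves to); BodyDecoder is a hypothetical name:

```java
import java.nio.charset.StandardCharsets;

/** Illustrative helper: decodes a message body printed as a byte array in the log. */
final class BodyDecoder {

    /** Interprets the body bytes as UTF-8 text (assumed to match the Producer's charset). */
    static String decode(byte[] body) {
        return new String(body, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        // Body of the first message in the Consumer log above.
        byte[] body = {72, 101, 108, 108, 111, 32, 82, 111, 99, 107, 101, 116, 77, 81, 32, 50};
        System.out.println(decode(body)); // prints Hello RocketMQ 2
    }
}
```

Inside the real listener the equivalent would be decoding the bytes returned by each MessageExt's getBody() rather than printing the whole msgs list.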

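666. Easter egg

I kept meaning to write this post, and I kept forgetting.

Sure enough, 徐妈 is the chubbiest of all.

As I keep saying: be sure, really sure, to read the provided documentation. Only once you understand the principles can you read the source code well.

Source code is the concrete form of principles.
Principles are the abstract form of code.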
