RocketMQ源码(一):源码环境搭建

发布时间 2023-05-08 11:38:58作者: 无虑的小猪

一、源码地址下载

  RocketMQ官网下载地址:https://github.com/apache/rocketmq/tags

 0

  当前搭建的是4.8.0版本的rocketmq,下载zip压缩包至本地,并解压。

  0

  当解压后的RocketMQ源码导入IDEA。 

  0

二、源码环境搭建

1、启动NameServer

1、NameServer启动源码入口

  启动namesrv命令如下:

nohup ./mqnamesrv >../logs/namesrv.log &

  查看mqnamesrv文件详情,发现启动namesrv是通过org.apache.rocketmq.namesrv.NamesrvStartup调起的,NamesrvStartup为namesrv启动源码分析入口。

  

2、执行NamesrvStartup,启动NameServer

  执行NamesrvStartup,启动namesrv报错,原因是未配置RocketMQ的环境变量。

 

2.1、RocketMQ环境变量配置

2.1.1、新建conf文件夹

  在当前RocketMQ源码工程中创建conf文件夹,用于存放distributor子模块相关配置文件。 

 

2.1.2、复制配置文件

  将distribution模块中的logback_namesrv.xml、logback_broker.xml、broker.conf配置文件复制到新创建的conf文件夹下。

 

2.1.3、新建store文件夹

  在conf文件夹下新建store文件夹,用于RocketMQ对消息的持久化存储。

 

2.1.4、修改配置文件详情

  调整conf/broker.conf,broker.conf详情如下

brokerClusterName = DefaultCluster
brokerName = broker-a
brokerId = 0
deleteWhen = 04
fileReservedTime = 48
brokerRole = ASYNC_MASTER
flushDiskType = ASYNC_FLUSH
## 消息持久化存储根路径
storePathRootDir=D:/Source/mq_source/rocketmq/store
## 消息持久化存储路径
storePathCommitLog=D:/Source/mq_source/rocketmq/store/commitlog
namesrvAddr=127.0.0.1:9876
brokerIP1=127.0.0.1
autoCreateTopicEnable=true

   根据实际需要,调整日志输出位置,logback_namesrv.xml详情如下: 

   

   logback_broker.xml详情如下: 

  

2.1.5、配置环境变量

  配置NamesrvStartup启动时的环境变量如下:

  

6、指定配置文件

  指定启动的配置文件,创建的conf配置文件夹下的broker.conf配置。

 

2.2、启动namesrv

  出现如下提示,表示已经成功启动了namesrv。

  

  在日志输出位置查看namesrv.log启动日志详情:

  

2、启动Broker

1、broker启动源码入口

  启动broker命令如下:

nohup ./mqbroker -n ip:port -c ../conf/broker.conf  >../logs/broker.log &

  查看mqbroker文件详情,发现启动broker是通过org.apache.rocketmq.broker.BrokerStartup调起的,BrokerStartup为broker启动源码分析入口。

  

2、执行BrokerStartup,启动broker

2.1、broker子模块out文件夹依赖问题

  启动broker,发现如下报错;

 

  查看broker子模块,发现out文件夹未显示:

  0

  源码文件中的out文件夹是存在的。

  0

  在IDEA中可以设置一些忽略文件,这些文件不被显示,查看IDEA的配置详情: Editor -> File Types,发现out文件夹设置忽略了,移除out文件设置。

  0

  可以看到out文件夹正常显示,报错问题已解决。

  0

2.2、配置环境变量

  0

2.3、启动broker

  查看启动日志,发现未找到指定的文件夹。

  0

  在RocketMQ中,默认获取根路径的配置信息,一般情况下,在默认的根路径没有指定的配置文件。

System.getProperty("user.home")

  通过IDEA启动参数设置,调整user.home属性的值,调整内容如下:

  0

  在D:\Source\mq_source\rocketmq创建store文件夹下,创建commitlog文件夹,重启启动rocketmq。

再次查看启动日志

  0

  IDAE控制台出现如下提示,表示已经成功启动了broker。

  0

3、Producer/Consumer消息发送测试

  启动producer发现,broker没有自动创建Topic,配置文件中的autoCreateTopicEnable=true设置未生效。

  0

  配置broker启动时加载的配置文件,详情如下:

  0

  重启Broker,再次通过producer发送消息,发送详情如下:

  0

  consumer消费结果:

 0

3.1、Producer示例代码

  生产者代码详情

 1 import org.apache.rocketmq.client.producer.DefaultMQProducer;
 2 import org.apache.rocketmq.client.producer.SendResult;
 3 import org.apache.rocketmq.common.message.Message;
 4 import org.apache.rocketmq.remoting.common.RemotingHelper;
 5 
 6 /**
 7  * 同步发送
 8  */
 9 public class SyncProducer {
10     public static void main(String[] args) throws Exception{
11         // 实例化消息生产者Producer
12         DefaultMQProducer producer = new DefaultMQProducer("group_test");
13 
14         try{
15             // 设置NameServer的地址
16             producer.setNamesrvAddr("127.0.0.1:9876");
17             // 启动Producer实例
18             producer.start();
19 
20             for (int i = 0; i < 2; i++) {
21                 // 创建消息,并指定Topic,Tag和消息体
22                 Message msg = new Message("TopicTest",
23                         "TagA",
24                         ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET)
25                 );
26                 // 发送消息到一个Broker
27                 SendResult sendResult = producer.send(msg);
28                 System.out.printf("%s%n", sendResult);
29             }
30         }finally {
31             //如果不再发送消息,关闭Producer实例。
32             producer.shutdown();
33         }
34     }
35 }

3.2、Consumer示例代码

  消费者代码详情:

 1 import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
 2 import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
 3 import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
 4 import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
 5 import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
 6 import org.apache.rocketmq.common.message.MessageExt;
 7 import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;
 8 import java.nio.charset.StandardCharsets;
 9 import java.util.List;
10 
11 /**
12  * 集群模式消费
13  */
14 public class ClusterComuser {
15     public static void main(String[] args) throws Exception {
16         // 实例化消费者,指定组名: group_test
17         DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group_test");
18         // 指定Namesrv地址信息.
19         consumer.setNamesrvAddr("127.0.0.1:9876");
20         // 订阅Topic
21         consumer.subscribe("TopicTest", "*"); //tag  tagA|TagB|TagC
22         // 如果非第一次启动,那么按照上次消费的位置继续消费
23         consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
24         // 默认消费模式 - 集群模式消费
25         consumer.setMessageModel(MessageModel.CLUSTERING);
26 
27         // 注册回调函数,处理消息
28         consumer.registerMessageListener(new MessageListenerConcurrently() {
29             @Override
30             public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
31                                                             ConsumeConcurrentlyContext context) {
32                 try {
33                     for(MessageExt msg : msgs) {
34                         Thread.sleep(500);
35                         System.out.println("收到消息:" + " topic :" + msg.getTopic() + " ,tags : " + msg.getTags() + " ,msg : " + new String(msg.getBody(), StandardCharsets.UTF_8));
36                     }
37                 } catch (Exception e) {
38                     e.printStackTrace();
39                     return ConsumeConcurrentlyStatus.RECONSUME_LATER;
40 
41                 }
42                 return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
43             }
44         });
45         // 启动消息者
46         consumer.start();
47         System.out.printf("Consumer Started.%n");
48     }
49 }