RocketMQ简单入门

发布时间 2023-10-02 14:22:11作者: strongmore

服务端安装及配置

docker安装

docker pull rocketmqinc/rocketmq:4.4.0

指定版本号是为了后面确定配置文件的路径

启动namesrv

docker run -d -p 9876:9876 --name rocketmq-nameservice -e MAX_POSSIBLE_HEAP=100000000 rocketmqinc/rocketmq:4.4.0 sh mqnamesrv

运行成功执行mqnamesrv脚本

启动broker

broker.conf

brokerClusterName = DefaultCluster
brokerName = broker-a
brokerId = 0
deleteWhen = 04
fileReservedTime = 48
brokerRole = ASYNC_MASTER
flushDiskType = ASYNC_FLUSH
# 如果是本地程序调用云主机 mq,这个需要设置成 云主机 IP
# 如果Docker环境需要设置成宿主机IP
brokerIP1 = 42.192.20.119
# 开启acl权限控制
aclEnable=true

plain_acl.yml

globalWhiteRemoteAddresses:
accounts:
- accessKey: admin
  secretKey: szz123
  whiteRemoteAddress:
  admin: true
docker run -d -p 10911:10911 -p 10909:10909 -v  /root/test_rocketmq/broker.conf:/opt/rocketmq-4.4.0/conf/broker.conf -v /root/test_rocketmq/plain_acl.yml:/opt/rocketmq-4.4.0/conf/plain_acl.yml --name rocketmq-broker --link rocketmq-nameservice:namesrv -e "NAMESRV_ADDR=namesrv:9876" -e "MAX_POSSIBLE_HEAP=200000000" rocketmqinc/rocketmq:4.4.0 sh mqbroker -c /opt/rocketmq-4.4.0/conf/broker.conf

--link可以用来链接2个容器,使得源容器(被链接的容器)和接收容器(主动去链接的容器)之间可以互相通信,并且接收容器可以获取源容器的一些数据,如源容器的环境变量。

启动broker遇到的问题

java.lang.Class cannot be cast to org.apache.rocketmq.acl.AccessValidator

登录用户名accessKey,长度必须大于6个字符。登录密码secretKey,长度也必须大于6个字符。

客户端报错

CODE: 1  DESC: org.apache.rocketmq.acl.common.AclException: [10015:signature-failed] unable to calculate a request signature. error=[10015:signature-failed] unable to calculate a request signature. 
error=Algorithm HmacSHA1 not available, org.apache.rocketmq.acl.common.AclSigner.signAndBase64Encode(AclSigner.java:84)

缺少sunjce_provider.jar

docker exec -it -u root rocketmq-broker /bin/bash # 必须以root权限进入
find / -name sunjce_provider.jar
cp xxx/sunjce_provider.jar /opt/rocketmq-4.4.0/lib/ # 如果没有以root权限进入,这个就没有权限操作
exit
docker restart rocketmq-broker

又遇到了其他问题

CODE: 1  DESC: java.lang.NullPointerException, org.apache.rocketmq.acl.plain.PlainPermissionLoader.checkPerm(PlainPermissionLoader.java:131)

可能是因为rocketmq4.4.0版本对acl功能的支持还不完善 具体issue

安装更新的版本

docker run -d -p 9876:9876 --name rocketmq-nameservice -e MAX_POSSIBLE_HEAP=100000000 apache/rocketmq:4.9.2 sh mqnamesrv
docker run -d -p 10911:10911 -p 10909:10909 -v  /root/test_rocketmq/broker.conf:/home/rocketmq/rocketmq-4.9.2/conf/broker.conf -v /root/test_rocketmq/plain_acl.yml:/home/rocketmq/rocketmq-4.9.2/conf/plain_acl.yml --name rocketmq-broker --link rocketmq-nameservice:namesrv -e "NAMESRV_ADDR=namesrv:9876" -e "MAX_POSSIBLE_HEAP=200000000" apache/rocketmq:4.9.2 sh mqbroker -c /home/rocketmq/rocketmq-4.9.2/conf/broker.conf

注意,这个镜像的rocketmq目录和上面的不一致,运行过程也没有上面出现的那些问题。

新版本broker启动失败

/home/rocketmq/rocketmq-4.9.2/bin/runbroker.sh: line 156:    28 Killed                  $JAVA ${JAVA_OPT} $@

原因:内存设置太大

docker run -d -p 10911:10911 -p 10909:10909  -e "JAVA_OPT_EXT=-Xmx128m -Xms128m" -v  /root/test_rocketmq/broker.conf:/home/rocketmq/rocketmq-4.9.2/conf/broker.conf -v /root/test_rocketmq/plain_acl.yml:/home/rocketmq/rocketmq-4.9.2/conf/plain_acl.yml --name rocketmq-broker --link rocketmq-nameservice:namesrv -e "NAMESRV_ADDR=namesrv:9876" -e "MAX_POSSIBLE_HEAP=200000000" apache/rocketmq:4.9.2 sh mqbroker -c /home/rocketmq/rocketmq-4.9.2/conf/broker.conf

注意是 JAVA_OPT_EXT

安装控制台

docker pull styletang/rocketmq-console-ng
docker run -d -p 9999:8080 -e "JAVA_OPTS=-Drocketmq.config.namesrvAddr=ip:9876 -Drocketmq.config.isVIPChannel=false -Drocketmq.config.accessKey=my_admin -Drocketmq.config.secretKey=szz123456" --name rocketmq-console styletang/rocketmq-console-ng

通过 http://ip:9999 访问,注意,此镜像不支持acl权限控制,可能使用的rocketmq版本太低,所以使用其他镜像

docker pull apacherocketmq/rocketmq-dashboard
docker run -d -p 9999:8080 -e "JAVA_OPTS=-Drocketmq.config.namesrvAddr=ip:9876 -Drocketmq.config.isVIPChannel=false -Drocketmq.config.accessKey=my_admin -Drocketmq.config.secretKey=szz123456" --name rocketmq-console apacherocketmq/rocketmq-dashboard

也是通过 http://ip:9999 访问

客户端访问

使用java客户端发送及消费消息

<dependency>
  <groupId>org.apache.rocketmq</groupId>
  <artifactId>rocketmq-client</artifactId>
  <version>4.9.4</version>
</dependency>
<dependency>
  <groupId>org.apache.rocketmq</groupId>
  <artifactId>rocketmq-acl</artifactId>
  <version>4.9.4</version>
</dependency>
点击查看代码
import org.apache.rocketmq.acl.common.AclClientRPCHook;
import org.apache.rocketmq.acl.common.SessionCredentials;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.consumer.listener.MessageListenerOrderly;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.MQProducer;
import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;
import org.apache.rocketmq.remoting.RPCHook;

import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.concurrent.TimeUnit;

public class TestRocketMq1 {
    public static void main(String[] args) throws Exception {
        testSyncSendMessage();
//        testAsyncSendMessage()
//        testSyncSendDelayMessage();
//        testConcurrentlyConsumerMessage();
        TimeUnit.SECONDS.sleep(3);
        testOrderlyConsumerMessage();
    }

    /**
     * 并发消费(非顺序)
     *
     * @throws Exception
     */
    private static void testConcurrentlyConsumerMessage() throws Exception {
        DefaultMQPushConsumer consumer = createConsumer();
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
                for (MessageExt msg : msgs) {
                    String content = new String(msg.getBody(), StandardCharsets.UTF_8);
                    System.out.println("content:" + content + "," + msg);
                }
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        consumer.start();
    }

    /**
     * 顺序消费 先发送的消息先消费
     *
     * @throws Exception
     */
    private static void testOrderlyConsumerMessage() throws Exception {
        DefaultMQPushConsumer consumer = createConsumer();
        consumer.registerMessageListener(new MessageListenerOrderly() {
            @Override
            public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {
                for (MessageExt msg : msgs) {
                    String content = new String(msg.getBody(), StandardCharsets.UTF_8);
                    System.out.println("content:" + content + "," + msg);
                }
                return ConsumeOrderlyStatus.SUCCESS;
            }
        });
        consumer.start();
    }

    /**
     * 发送同步消息
     *
     * @throws Exception
     */
    private static void testSyncSendMessage() throws Exception {
        MQProducer producer = createProducer();
        producer.start();
        //第一个参数是topic,这是主题
        //第二个参数是tags,这是可选参数,用于消费端过滤消息
        //第三个参数是keys,这也是可选参数,如果有多个,用空格隔开。RocketMQ可以根据这些key快速检索到消息,相当于
        //消息的索引,可以设置为消息的唯一编号(主  键)。
        Message msg = new Message("test_topic", "TagA", "6666", "RocketMQ Test message".getBytes());
        //SendResult是发送结果的封装,包括消息状态,消息id,选择的队列等等,只要不抛异常,就代表发送成功
        SendResult sendResult = producer.send(msg);
        System.out.println("sync sendResult: " + sendResult);
        producer.shutdown();
    }

    /**
     * 发送同步延迟消息
     *
     * @throws Exception
     */
    private static void testSyncSendDelayMessage() throws Exception {
        MQProducer producer = createProducer();
        producer.start();
        //第一个参数是topic,这是主题
        //第二个参数是tags,这是可选参数,用于消费端过滤消息
        //第三个参数是keys,这也是可选参数,如果有多个,用空格隔开。RocketMQ可以根据这些key快速检索到消息,相当于
        //消息的索引,可以设置为消息的唯一编号(主  键)。
        Message msg = new Message("test_topic", "TagA", "6666", "RocketMQ Test message".getBytes());
        //delayLevel=1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h
        //设置消息延迟级别等于0时,则该消息为非延迟消息。
        //设置消息延迟级别大于等于1并且小于等于18时,消息延迟特定时间,如:设置消息延迟级别等于1,则延迟1s;设置消息延迟级别等于2,则延迟5s,以此类推。
        //设置消息延迟级别大于18时,则该消息延迟级别为18,如:设置消息延迟级别等于20,则延迟2h。
        //设置了此延迟级别就是延迟消息了
        msg.setDelayTimeLevel(2);
        //SendResult是发送结果的封装,包括消息状态,消息id,选择的队列等等,只要不抛异常,就代表发送成功
        SendResult sendResult = producer.send(msg);
        System.out.println("sync sendResult: " + sendResult);
        producer.shutdown();
    }

    /**
     * 发送异步消息
     *
     * @throws Exception
     */
    private static void testAsyncSendMessage() throws Exception {
        MQProducer producer = createProducer();
        producer.start();
        //第一个参数是topic,这是主题
        //第二个参数是tags,这是可选参数,用于消费端过滤消息
        //第三个参数是keys,这也是可选参数,如果有多个,用空格隔开。RocketMQ可以根据这些key快速检索到消息,相当于
        //消息的索引,可以设置为消息的唯一编号(主  键)。
        Message msg = new Message("test_topic", "TagA", "6666", "RocketMQ Async message".getBytes());
        producer.send(msg, new SendCallback() {
            @Override
            public void onSuccess(SendResult sendResult) {
                System.out.println("async sendResult: " + sendResult);
            }

            @Override
            public void onException(Throwable e) {
                System.out.println("async sendResult: " + e);
            }
        });
        //必须等回调结果之后才能shutdown
        TimeUnit.SECONDS.sleep(5);
        producer.shutdown();
    }

    private static MQProducer createProducer() {

        DefaultMQProducer producer = new DefaultMQProducer("test_producer_group", createRpcHook());
        //生产者需用通过NameServer获取所有broker的路由信息,多个用分号隔开,这个跟Redis哨兵一样
        producer.setNamesrvAddr("42.192.20.119:9876");
        return producer;
    }

    private static DefaultMQPushConsumer createConsumer() throws Exception {
        //消费者组
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(createRpcHook());
        consumer.setConsumerGroup("test_consumer_group");
        //消费者从NameServer拿到topic的queue所在的Broker地址,多个用分号隔开
        consumer.setNamesrvAddr("42.192.20.119:9876");
        //设置Consumer第一次启动是从队列头部开始消费
        //如果非第一次启动,那么按照上次消费的位置继续消费
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
        //subscribe订阅的第一个参数就是topic,第二个参数为生产者发送时候的tags,*代表匹配所有消息,
        //想要接收具体消息时用||隔开,如"TagA||TagB||TagD"
        consumer.subscribe("test_topic", "TagA");
        //Consumer可以用两种模式启动,广播(Broadcast)和集群(Cluster),广播模式下,一条消息会发送给所有Consumer,
        //集群模式下消息只会发送给一个Consumer
        consumer.setMessageModel(MessageModel.CLUSTERING);
        //批量消费,每次拉取10条
        consumer.setConsumeMessageBatchMaxSize(10);
        return consumer;
    }

    private static RPCHook createRpcHook() {
        //账号密码
        String accessKey = "my_admin";
        String secretKey = "szz123456";
        return new AclClientRPCHook(new SessionCredentials(accessKey, secretKey));
    }
}

注意,consumer被关闭之后,就不能接收到消息了

顺序消息

  1. 需要保证顺序的消息要发送到同一个message queue中。其次,一个message queue只能被一个消费者消费,这点是由消息队列的分配机制来保证的。最后,一个消费者内部对一个mq的消费要保证是有序的。我们要做到生产者 - message queue - 消费者之间是一对一对一的关系。

  2. 生产者发送消息的时候,到达Broker应该是有序的。所以对于生产者,不能使用多线程异步发送,而是顺序发送。
    写入Broker的时候,应该是顺序写入的。也就是相同主题的消息应该集中写入,选择同一个Message Queue,而不是分散写入。要达到这个效果很简单,只需要我们在发送的时候传入相同的hashKey,就会选择同一个队列。
    image

  3. 消费者消费的时候只能有一个线程,否则由于消费的速率不同,有可能出现记录到数据库的时候无序。 在Spring Boot中,consumeMode设置为ORDERLY,在Java API中,传入MessageListenerOrderly的实现类即可。

当然顺序消费会带来一些问题:

  1. 遇到消息失败的消息,无法跳过,当前队列消费暂停
  2. 降低了消息处理的性能

整合Spring

  • 主要是RocketMQAutoConfiguration配置类,它会创建DefaultMQProducer,RocketMQTemplate等Bean
  • ListenerContainerConfiguration配置类来查询出所有包含@RocketMQMessageListener注解的类,就是我们定义的消息消费者
  • 将查询到的么一个类封装为DefaultRocketMQListenerContainer类,它会创建DefaultMQPushConsumer,用来实际处理接收到的消息

MQ选型分析

image

参考

RocketMQ扫盲贴及Java API使用精讲
深度好文!RocketMQ高级进阶知识精讲!
RocketMQ-Docker安装