rocketmq 随笔

发布时间 2023-06-30 16:47:39作者: LiuChengloong

sudo docker run -itd --name ubuntu2204-rocketmq
-v /home/cl/docker/ubuntu2204-rocketmq:/home
--restart always
-p 9876:9876
-p 10911:10911
-p 10909:10909
ubuntu:22.04

sudo docker run -d --name rocketmq-dashboard -e "JAVA_OPTS=-Drocketmq.namesrv.addr=192.168.50.228:9876" -p 18080:8080 -t apacherocketmq/rocketmq-dashboard:latest

关闭 Linux 防火墙,方便测试。

修改 $MQ_HOME/conf/broker.conf

brokerClusterName = DefaultCluster
brokerName = broker-a
brokerId = 0
deleteWhen = 04
fileReservedTime = 48
brokerRole = ASYNC_MASTER
flushDiskType = ASYNC_FLUSH

autoCreateTopicEnable=true
namesrvAddr=localhost:9876
brokerIP1=服务器IP地址

声明了自动创建 toplic 和指定了 namesrvAddr

启动 mqnamesrv 和 mqbroker

cd $MQ_HOME

nohup sh bin/mqnamesrv &

nohup sh bin/mqbroker -c conf/broker.conf &

Java 程序
pom.xml

<dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-client</artifactId>
    <version>5.1.0</version>
</dependency>

192.168.1.146 为安装服务器 IP

public class ProducerExample {
    public static void main(String[] args) throws MQClientException, MQBrokerException, RemotingException, InterruptedException {
        DefaultMQProducer producer = new DefaultMQProducer("DefaultCluster");
        producer.setNamesrvAddr("192.168.1.146:9876");
        producer.start();

        Message message = new Message("TestTopic2", "tags", "test".getBytes());
        SendResult send = producer.send(message);

        producer.shutdown();
    }
}


public class ConsumerExample {
    public static void main(String[] args) throws MQClientException {

        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("DefaultCluster");
        consumer.setNamesrvAddr("192.168.1.146:9876");
        consumer.subscribe("TestTopic2", "*");

        consumer.setMessageListener((MessageListenerOrderly) (list, consumeOrderlyContext) -> {
            for (MessageExt messageExt : list) {
                byte[] body = messageExt.getBody();
                System.out.println(" ==> " + new String(body));
            }
            return ConsumeOrderlyStatus.SUCCESS;
        });

        consumer.start();
    }
}