# Start a detached Ubuntu 22.04 container named "ubuntu2204-rocketmq".
#   -v       mounts the host directory /home/cl/docker/ubuntu2204-rocketmq at /home
#   --restart always  restarts the container automatically
#   -p 9876  matches the port used for namesrvAddr elsewhere in these notes
#   -p 10911 / 10909  presumably the broker ports (RocketMQ defaults) -- confirm
# Each line must end with a backslash so the shell treats the whole thing as
# ONE command; without them every option line is executed on its own.
sudo docker run -itd --name ubuntu2204-rocketmq \
  -v /home/cl/docker/ubuntu2204-rocketmq:/home \
  --restart always \
  -p 9876:9876 \
  -p 10911:10911 \
  -p 10909:10909 \
  ubuntu:22.04
# Run the RocketMQ dashboard image detached; host port 18080 is mapped to
# container port 8080, and the name server address is passed via JAVA_OPTS.
sudo docker run -d \
  --name rocketmq-dashboard \
  -e "JAVA_OPTS=-Drocketmq.namesrv.addr=192.168.50.228:9876" \
  -p 18080:8080 \
  -t apacherocketmq/rocketmq-dashboard:latest
关闭 Linux 防火墙,方便测试。
修改 $MQ_HOME/conf/broker.conf
# Broker configuration ($MQ_HOME/conf/broker.conf).
# Cluster and broker identity.
brokerClusterName = DefaultCluster
brokerName = broker-a
# NOTE(review): brokerId 0 conventionally denotes the master -- confirm against the RocketMQ docs.
brokerId = 0
# NOTE(review): presumably the hour of day (04:00) at which expired files are deleted -- confirm.
deleteWhen = 04
# NOTE(review): presumably the retention time in hours -- confirm.
fileReservedTime = 48
brokerRole = ASYNC_MASTER
flushDiskType = ASYNC_FLUSH
# Let the broker create topics automatically (convenient for testing).
autoCreateTopicEnable=true
# Address of the name server this broker uses.
namesrvAddr=localhost:9876
# Placeholder: replace the value below with the server's IP address.
brokerIP1=服务器IP地址
声明了自动创建 topic 和指定了 namesrvAddr
启动 mqnamesrv 和 mqbroker
# Start the name server and the broker in the background from the RocketMQ
# install directory. The relative bin/ and conf/ paths below depend on the cd.
# ${MQ_HOME:?...} stops with an error when MQ_HOME is unset or empty; a bare
# `cd $MQ_HOME` would silently change to $HOME in that case. Quoting also keeps
# a path containing spaces intact.
cd "${MQ_HOME:?MQ_HOME must point to the RocketMQ install directory}"
nohup sh bin/mqnamesrv &
nohup sh bin/mqbroker -c conf/broker.conf &
Java 程序
pom.xml
<!-- RocketMQ Java client used by the producer/consumer examples. -->
<dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-client</artifactId>
    <version>5.1.0</version>
</dependency>
192.168.1.146 为安装服务器的 IP。
/**
 * Minimal producer example: connects to the name server, sends a single
 * message to topic "TestTopic2" and shuts the producer down.
 */
public class ProducerExample {
    /**
     * Sends one message and prints the send result.
     *
     * @param args unused
     * @throws MQClientException    propagated from the producer's start/send calls
     * @throws MQBrokerException    propagated from the send call
     * @throws RemotingException    propagated from the send call
     * @throws InterruptedException propagated from the send call
     */
    public static void main(String[] args) throws MQClientException, MQBrokerException, RemotingException, InterruptedException {
        // The constructor argument is the producer group name.
        DefaultMQProducer producer = new DefaultMQProducer("DefaultCluster");
        producer.setNamesrvAddr("192.168.1.146:9876");
        producer.start();
        try {
            // Encode explicitly as UTF-8 so the payload does not depend on the
            // platform default charset of the machine running the producer.
            byte[] payload = "test".getBytes(java.nio.charset.StandardCharsets.UTF_8);
            Message message = new Message("TestTopic2", "tags", payload);
            SendResult sendResult = producer.send(message);
            System.out.println(sendResult);
        } finally {
            // Always release the producer, even when send() throws.
            producer.shutdown();
        }
    }
}
/**
 * Minimal push-consumer example: subscribes to every tag of topic
 * "TestTopic2" and prints the body of each received message.
 */
public class ConsumerExample {
    /**
     * Configures the consumer, installs an orderly message listener and starts it.
     *
     * @param args unused
     * @throws MQClientException propagated from the subscribe/start calls
     */
    public static void main(String[] args) throws MQClientException {
        // The constructor argument is the consumer group name.
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("DefaultCluster");
        consumer.setNamesrvAddr("192.168.1.146:9876");
        consumer.subscribe("TestTopic2", "*");

        // A typed variable selects the orderly listener interface explicitly,
        // so no cast is needed on the lambda.
        MessageListenerOrderly listener = (messages, context) -> {
            for (MessageExt messageExt : messages) {
                byte[] body = messageExt.getBody();
                // Decode explicitly as UTF-8 instead of the platform default charset.
                System.out.println(" ==> " + new String(body, java.nio.charset.StandardCharsets.UTF_8));
            }
            return ConsumeOrderlyStatus.SUCCESS;
        };
        consumer.setMessageListener(listener);

        consumer.start();
    }
}