rocketmq记录

发布时间 2023-10-19 19:43:27作者: wyy1

Rocketmq  生产者、消费者


maven引用
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-client</artifactId>
<version>4.7.0</version>
</dependency>
<!--一个好用的工具包,可以不引入-->
<dependency>
<groupId>cn.hutool</groupId>
<artifactId>hutool-all</artifactId>
<version>5.3.0</version>
</dependency>

生产者实现

1、NamesrvAddr参数在多个节点时,用英文分号分隔,例: 192.168.9.58:9876;192.168.9.59:9876

import cn.hutool.core.util.RandomUtil;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.client.producer.SendStatus;
import org.apache.rocketmq.common.message.Message;

public class Producer {
public static void main(String[] args) throws Exception {
DefaultMQProducer producer = new DefaultMQProducer(“ProducerGroupName”);
producer.setNamesrvAddr(“127.0.0.1:9876”);
//发送超时时间,默认3000 单位ms
producer.setSendMsgTimeout(5000);
producer.start();

try {
Message msg = new Message("TestTopic",// topic
"177", // tag 可以为空,用以简单的筛选。
RandomUtil.randomString(8), // key 可以为空,可用以查询。
("test" + RandomUtil.randomString(8)).getBytes()); // body ,我常将对象转json再获取byte[] 进行传输。
SendResult send = producer.send(msg);
if (send.getSendStatus().equals(SendStatus.SEND_OK)) {
//发送成功处理
}else {
//发送失败处理
}
} catch (Exception e) {
e.printStackTrace();
}
producer.shutdown();
}
}

多线程加批量生产者模拟实现

1、批量发送时,topic必须为同一个,否则发送会报异常。

2、批量发送相较于单条发送速度提升很大。

import cn.hutool.core.util.RandomUtil;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.client.producer.SendStatus;
import org.apache.rocketmq.common.message.Message;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;

public class Producer {
public static void main(String[] args) throws Exception {
DefaultMQProducer producer = new DefaultMQProducer(“ProducerGroupName”);
producer.setNamesrvAddr(“127.0.0.1:9876”);
//发送超时时间,默认3000 单位ms
producer.setSendMsgTimeout(5000);
producer.start();

int threadCount = 20;
int forCount = 100000;
CountDownLatch latch = new CountDownLatch(threadCount);
long start = System.currentTimeMillis();
for (int i = 0; i < threadCount; i++) {
new Thread(() -> {
try {
List<Message> list = new ArrayList<>();
for (int j = 0; j < forCount; j++) {
try {
Message msg = new Message("TestTopic",// topic
"177", // tag
RandomUtil.randomString(8), // key
("test" + RandomUtil.randomString(8)).getBytes()); // body
list.add(msg);
if (list.size() >= 100) {
SendResult send = producer.send(list);
if (send.getSendStatus().equals(SendStatus.SEND_OK)) {
//发送成功处理
list.clear();
}else {
//发送失败处理
}
}
} catch (Exception e) {
//发送失败处理
e.printStackTrace();
}
}
if (list.size() > 0) {
SendResult send = producer.send(list);
if (!send.getSendStatus().equals(SendStatus.SEND_OK)) {
System.out.println(send);
}
list.clear();
}
} catch (Exception e) {
e.printStackTrace();
} finally {
latch.countDown();
}
}).start();
}
latch.await();
long hs = System.currentTimeMillis() - start;
System.out.println(hs);

long speed = (threadCount * forCount) / (hs >= 0 ? 1 : hs / 1000);
System.out.println("速度" + speed);
//正式环境不要发完就就shutdown,要在应用退出时再shutdown。
producer.shutdown();
}
}

push消费者
import cn.hutool.core.lang.Console;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;

public class PushConsumer {
public static void main(String[] args) throws Exception {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(“PushConsumerGroupName”);
consumer.setNamesrvAddr(“127.0.0.1:9876”);
//一个GroupName第一次消费时的位置
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
consumer.setConsumeThreadMin(20);
consumer.setConsumeThreadMax(20);
//要消费的topic,可使用tag进行简单过滤
consumer.subscribe(“TestTopic”, “*”);
//一次最大消费的条数
consumer.setConsumeMessageBatchMaxSize(100);
//消费模式,广播或者集群,默认集群。
consumer.setMessageModel(MessageModel.CLUSTERING);
//在同一jvm中 需要启动两个同一GroupName的情况需要这个参数不一样。
consumer.setInstanceName(“InstanceName”);
//配置消息监听
consumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> {
try {
//业务处理
msgs.forEach(msg -> {
Console.log(msg);
});
} catch (Exception e) {
System.err.println(“接收异常” + e);
return ConsumeConcurrentlyStatus.RECONSUME_LATER;
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
});
consumer.start();
System.out.println(“Consumer Started.”);
}
}

 

pull消费者
从4.6之后,提供了DefaultLitePullConsumer 大大简化了pull的操作。以下为新实现,4.6之前的版本不支持。

import cn.hutool.core.collection.CollUtil;
import cn.hutool.core.lang.Console;
import org.apache.rocketmq.client.consumer.DefaultLitePullConsumer;
import org.apache.rocketmq.common.message.MessageExt;

import java.util.List;

public class PullConsumer {
private static boolean runFlag = true;
public static void main(String[] args) throws Exception {
DefaultLitePullConsumer consumer = new DefaultLitePullConsumer(“PullConsumerGroupName”);
consumer.setNamesrvAddr(“127.0.0.1:9876”);
//要消费的topic,可使用tag进行简单过滤
consumer.subscribe(“TestTopic”, “*”);
//一次最大消费的条数
consumer.setPullBatchSize(100);
//无消息时,最大阻塞时间。默认5000 单位ms
consumer.setPollTimeoutMillis(5000);
consumer.start();
while (runFlag){
try {
//拉取消息,无消息时会阻塞
List msgs = consumer.poll();
if (CollUtil.isEmpty(msgs)){
continue;
}
//业务处理
msgs.forEach(msg-> Console.log(new String(msg.getBody())));
//同步消费位置。不执行该方法,应用重启会存在重复消费。
consumer.commitSync();
}catch (Exception e){
e.printStackTrace();
}
}
consumer.shutdown();
}
}