RocketMq发送消息之批量消息

发布时间 2023-09-24 13:21:27作者: 自学Java笔记本

概述

批量发送消息能显著提高传递小消息的性能。限制是这些批量消息应该有相同的topic,相同的waitStoreMsgOK,而且不能是延时消息。此外,这一批消息的总大小不应超过4MB

发送批量消息

如果您每次只发送不超过4MB的消息,则很容易使用批处理,样例如下:

String topic = "BatchTest";
List<Message> messages = new ArrayList<>();
messages.add(new Message(topic, "TagA", "OrderID001", "Hello world 0".getBytes()));
messages.add(new Message(topic, "TagA", "OrderID002", "Hello world 1".getBytes()));
messages.add(new Message(topic, "TagA", "OrderID003", "Hello world 2".getBytes()));
try {
   producer.send(messages);
} catch (Exception e) {
   e.printStackTrace();
   //处理error
}

消息列表分割

复杂度只有当你发送大批量时才会增长,你可能不确定它是否超过了大小限制(4MB)。这时候你最好把你的消息列表分割一下:

/**
 * 消息分割,在rocketmq中,一次性发送消息的长度不可超过4mb,此时我们需要进行切割,确保消息长度小于4mb
 */
public class ListSplitter implements Iterator<List<Message>> { 
    // 定义消息的大小
    private final int SIZE_LIMIT = 1024 * 1024 * 4;
    private final List<Message> messages;
    // 表示索引
    private int currIndex;
    public ListSplitter(List<Message> messages) { 
        this.messages = messages;
    }
    
    @Override 
    public boolean hasNext() {
        return currIndex < messages.size(); 
    }
    
    @Override 
    public List<Message> next() { 
        // 开始的索引
        int startIndex = getStartIndex();
        int nextIndex = startIndex;
        // 大小
        int totalSize = 0;
        for (; nextIndex < messages.size(); nextIndex++) {
            Message message = messages.get(nextIndex); 
            // 计算消息大小
            int tmpSize = calcMessageSize(message);
            if (tmpSize + totalSize > SIZE_LIMIT) {
                break; 
            } else {
                totalSize += tmpSize; 
            }
        }
        List<Message> subList = messages.subList(startIndex, nextIndex); 
        currIndex = nextIndex;
        return subList;
    }
    private int getStartIndex() {
        Message currMessage = messages.get(currIndex); 
        int tmpSize = calcMessageSize(currMessage); 
        while(tmpSize > SIZE_LIMIT) {
            currIndex += 1;
            Message message = messages.get(currIndex); 
            tmpSize = calcMessageSize(message);
        }
        return currIndex; 
    }

    /**
     * 消息大小的计算
     * @param message 消息
     * @return 长度
     */
    private int calcMessageSize(Message message) {
        // 消息大小 = 主题的长度 + 消息体长度 + 日志开销
        int tmpSize = message.getTopic().length() + message.getBody().length; 
        Map<String, String> properties = message.getProperties();
        for (Map.Entry<String, String> entry : properties.entrySet()) {
            tmpSize += entry.getKey().length() + entry.getValue().length(); 
        }
        tmpSize = tmpSize + 20; // 增加⽇日志的开销20字节
        return tmpSize;
    }
}

然后在发送的时候通过迭代器发送:

 // 启动Producer实例
        producer.start();
        // 构建一个消息列表
        List<Message> messageList = new ArrayList<>();
        messageList.add(new Message("batch","tag1",("发送批量消息【"+"1】").getBytes()));
        messageList.add(new Message("batch","tag1",("发送批量消息【"+"2】").getBytes()));
        messageList.add(new Message("batch","tag1",("发送批量消息【"+"3】").getBytes()));

        // 发送消息到一个Broker
        //把大的消息分裂成若干个小的消息
        ListSplitter splitter = new ListSplitter(messageList);
        while (splitter.hasNext()) {
            try {
                List<Message>  listItem = splitter.next();
                SendResult sendResult = producer.send(listItem);
                // 通过sendResult返回消息是否成功送达
                System.out.printf("%s%n", sendResult);
            } catch (Exception e) {
                e.printStackTrace();
                //处理error
            }
        }