RocketMq发送消息之延迟消息

发布时间 2023-09-23 22:02:55作者: 自学Java笔记本

延迟消息

比如电商里,提交了一个订单就可以发送一个延时消息,1h后去检查这个订单的状态,如果还是未付款就取消订单释放库存。

使用限制

对比于rabbitmq中的延迟消息来说,rockermq并不支持任意时间的延迟,需要设置几个固定的延时等级,从1s到2h分别对应着等级1到18级

// org/apache/rocketmq/store/config/MessageStoreConfig.java
private String messageDelayLevel = "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h";

生产者代码

通过 msg.setDelayTimeLevel(2); 设置延迟等级,2 表示5s的延迟

public class producer {
    public static void main(String[] args) throws Exception {
        // 实例化消息生产者Producer,指定一个生产者组
        DefaultMQProducer producer = new DefaultMQProducer("group1");
        // 设置NameServer的地址
        producer.setNamesrvAddr("192.168.78.129:9876;192.168.78.130:9876");
        // 启动Producer实例
        producer.start();
        for (int i = 0; i < 10; i++) {
            // 创建消息,并指定Topic(主题),Tag(消息Tag)和消息体(消息内容)
            Message msg = new Message("DelayTopic",
                    "Tag1",
                    ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET)
            );
            // 设置 延迟时间 , 目前只支持"1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h";对应18个等级
            msg.setDelayTimeLevel(2);// 表示5s
            // 发送消息到一个Broker
            SendResult sendResult = producer.send(msg);
            TimeUnit.SECONDS.sleep(1);
            // 通过sendResult返回消息是否成功送达
            System.out.printf("%s%n", sendResult);
        }
        // 如果不再发送消息,关闭Producer实例。
        producer.shutdown();
    }
}

消费者代码

public class Consumer {
    public static void main(String[] args) throws MQClientException {
        // 1.创建消费者Consumer,指定消费者组名
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group1");
        // 2.指定NameServer地址
        consumer.setNamesrvAddr("192.168.78.129:9876;192.168.78.130:9876");
//        consumer.setNamesrvAddr("127.0.0.1:9876");
        // 3.订阅主题Topic和Tag
        consumer.subscribe("DelayTopic","Tag1");//消费Tag1中的消息,通过 || xxx 可以消费多个消息,通过*可以消费 DelayTopic 下的所有消息
        // 4.设置回调函数,处理消息
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            // 接收消息内容的方法
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
                for (MessageExt msg : msgs) {
                    System.out.println("接收到的消息消息ID:"+msg.getMsgId()+"延迟时间:"+(System.currentTimeMillis()-msg.getStoreTimestamp()));
                }
                // 标记该消息已经被成功消费
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        // 5.启动消费者consumer
        consumer.start();
    }
}

运行结果

image