RabbitMQ延迟队列

发布时间 2024-01-11 18:37:06作者: 雨中遐想

RabbitMQ延迟队列

简单理解:

交换机在消息达到指定延迟时间后才会发送到指定的队列中,类似定时器的功能。

安装rabbitmq_delayed_message_exchange插件

该插件可以创建延迟交换机,消息在达到指定延迟时间后才会发送到指定的队列中。

Rabbit plugins

插件GitHub

下载

wget https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/releases/download/v3.12.0/rabbitmq_delayed_message_exchange-3.12.0.ez

上传到docker的插件目录下

# docker cp 目的主机路径 容器ID:容器内路径
docker cp rabbitmq_delayed_message_exchange-3.12.0.ez rabbitmq:/plugins

# 修改一下文件的所属用户及分组,我这里复制过去后是root了。
# 在容器内执行 chown 命令修改文件的属主
docker exec rabbitmq chown rabbitmq:rabbitmq /plugins/rabbitmq_delayed_message_exchange-3.12.0.ez

设置需要开启的插件

我这里把需要开启的插件放在了容器外部,方便重新重建容器的时候可以生效。

cd /data/rabbitmq/conf
vim enabled_plugins

enabled_plugins文件内容

[rabbitmq_delayed_message_exchange,rabbitmq_management,rabbitmq_prometheus].

也可以进入容器内部执行rabbitmq-plugins enable rabbitmq_delayed_message_exchange开启插件,不过这样重启会失效。

重启docker容器

docker restart rabbitmq

验证是否安装成功

重启后查看运行的插件

docker exec -it rabbitmq rabbitmq-plugins list

登录http://127.0.0.1:15672/查看新建exchange时有没有type为x-delayed-message

如果存在代表安装成功。

代码实现

MQ交换机和队列结构图

配置交换机和队列

package com.zjw.config;

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.CustomExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.HashMap;
import java.util.Map;

/**
 * Rabbitmq插件实现延迟队列,RabbitMQ需要安装rabbitmq_delayed_message_exchange插件
 */
@Configuration
public class DelayedQueueConfig {

    public static final String DELAYED_QUEUE_NAME = "delayed.queue";
    public static final String DELAYED_EXCHANGE_NAME = "delayed.exchange";
    public static final String DELAYED_ROUTING_KEY = "delayed.routingKey";

    /**
     * 创建队列
     * @return 队列
     */
    @Bean
    public Queue delayedQueue(){
        return new Queue(DELAYED_QUEUE_NAME);
    }

    /**
     * 创建交换机
     * @return 交换机
     */
    @Bean
    public CustomExchange delayedExchange(){
        Map<String, Object> args = new HashMap<>();
        args.put("x-delayed-type", "direct");
        return new CustomExchange(DELAYED_EXCHANGE_NAME, "x-delayed-message", true,false, args);
    }

    /**
     *  队列和交换机绑定
     * @param queue 队列
     * @param delayedExchange 交换机
     * @return 绑定关系
     */
    @Bean
    public Binding bindingDelayedQueue(@Qualifier("delayedQueue") Queue queue,
                                       @Qualifier("delayedExchange") CustomExchange delayedExchange){
        return BindingBuilder.bind(queue).to(delayedExchange).with(DELAYED_ROUTING_KEY).noargs();
    }
}

生产者

package com.zjw.controller;

import lombok.AllArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import java.util.Date;

import static com.zjw.config.DelayedQueueConfig.DELAYED_EXCHANGE_NAME;
import static com.zjw.config.DelayedQueueConfig.DELAYED_ROUTING_KEY;

/**
 * 实现发送延迟消息,创建延迟的交换机,消息在达到指定延迟时间后才会发送到指定的队列中
 */
@AllArgsConstructor
@Slf4j
@RestController
@RequestMapping("/delay")
public class SendDelayMsgController {

    private RabbitTemplate rabbitTemplate;

    /**
     * 发送延迟消息
     * @param message 消息内容
     * @param delayTime 延迟时间
     */
    @GetMapping("/{message}/{delayTime}")
    public void sendMsg(@PathVariable String message, @PathVariable Integer delayTime) {
        log.info("当前时间:{},发送一条时长{}毫秒信息给队列delayed.queue:{}", new Date(), delayTime, message);
        rabbitTemplate.convertAndSend(DELAYED_EXCHANGE_NAME, DELAYED_ROUTING_KEY, message, correlationData -> {
            //delayTime 单位:ms
            correlationData.getMessageProperties().setDelay(delayTime);
            return correlationData;
        });
    }
}

消费者

package com.zjw.consumer;

import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

import java.util.Date;

/**
 * 消费者,基于插件的延迟消息
 */
@Slf4j
@Component
public class DelayQueueConsumer {

    @RabbitListener(queues = {"delayed.queue"})
    public void receiveD(Message message, Channel channel){
        String msg = new String(message.getBody());
        log.info("当前时间:{},收到死信队列的消息:{}",new Date().toString(), msg);
    }
}

测试

通过浏览器发送一个延迟10s的消息。

http://localhost:8080/delay/hello/10000

生产者日志

2024-01-11T18:34:15.923+08:00  INFO 19184 --- [nio-8080-exec-7] c.zjw.controller.SendDelayMsgController  : 当前时间:Thu Jan 11 18:34:15 CST 2024,发送一条时长10000毫秒信息给队列delayed.queue:hello

消费者日志

2024-01-11T18:34:25.929+08:00  INFO 3368 --- [ntContainer#2-1] com.zjw.consumer.DelayQueueConsumer      : 当前时间:Thu Jan 11 18:34:25 CST 2024,收到死信队列的消息:hello

发现消息在10s后被消费者消费了。