RabbitMQ延迟队列
简单理解:
交换机在消息达到指定延迟时间后才会发送到指定的队列中,类似定时器的功能。
安装rabbitmq_delayed_message_exchange插件
该插件可以创建延迟交换机,消息在达到指定延迟时间后才会发送到指定的队列中。
下载
wget https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/releases/download/v3.12.0/rabbitmq_delayed_message_exchange-3.12.0.ez
上传到docker的插件目录下
# docker cp 目的主机路径 容器ID:容器内路径
docker cp rabbitmq_delayed_message_exchange-3.12.0.ez rabbitmq:/plugins
# 修改一下文件的所属用户及分组,我这里复制过去后是root了。
# 在容器内执行 chown 命令修改文件的属主
docker exec rabbitmq chown rabbitmq:rabbitmq /plugins/rabbitmq_delayed_message_exchange-3.12.0.ez
设置需要开启的插件
我这里把需要开启的插件放在了容器外部,方便重新重建容器的时候可以生效。
cd /data/rabbitmq/conf
vim enabled_plugins
enabled_plugins文件内容
[rabbitmq_delayed_message_exchange,rabbitmq_management,rabbitmq_prometheus].
也可以进入容器内部执行rabbitmq-plugins enable rabbitmq_delayed_message_exchange
开启插件,不过这样重启会失效。
重启docker容器
docker restart rabbitmq
验证是否安装成功
重启后查看运行的插件
docker exec -it rabbitmq rabbitmq-plugins list
登录http://127.0.0.1:15672/查看新建exchange时有没有type为x-delayed-message
如果存在代表安装成功。
代码实现
MQ交换机和队列结构图
配置交换机和队列
package com.zjw.config;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.CustomExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.HashMap;
import java.util.Map;
/**
* Rabbitmq插件实现延迟队列,RabbitMQ需要安装rabbitmq_delayed_message_exchange插件
*/
@Configuration
public class DelayedQueueConfig {
public static final String DELAYED_QUEUE_NAME = "delayed.queue";
public static final String DELAYED_EXCHANGE_NAME = "delayed.exchange";
public static final String DELAYED_ROUTING_KEY = "delayed.routingKey";
/**
* 创建队列
* @return 队列
*/
@Bean
public Queue delayedQueue(){
return new Queue(DELAYED_QUEUE_NAME);
}
/**
* 创建交换机
* @return 交换机
*/
@Bean
public CustomExchange delayedExchange(){
Map<String, Object> args = new HashMap<>();
args.put("x-delayed-type", "direct");
return new CustomExchange(DELAYED_EXCHANGE_NAME, "x-delayed-message", true,false, args);
}
/**
* 队列和交换机绑定
* @param queue 队列
* @param delayedExchange 交换机
* @return 绑定关系
*/
@Bean
public Binding bindingDelayedQueue(@Qualifier("delayedQueue") Queue queue,
@Qualifier("delayedExchange") CustomExchange delayedExchange){
return BindingBuilder.bind(queue).to(delayedExchange).with(DELAYED_ROUTING_KEY).noargs();
}
}
生产者
package com.zjw.controller;
import lombok.AllArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import java.util.Date;
import static com.zjw.config.DelayedQueueConfig.DELAYED_EXCHANGE_NAME;
import static com.zjw.config.DelayedQueueConfig.DELAYED_ROUTING_KEY;
/**
* 实现发送延迟消息,创建延迟的交换机,消息在达到指定延迟时间后才会发送到指定的队列中
*/
@AllArgsConstructor
@Slf4j
@RestController
@RequestMapping("/delay")
public class SendDelayMsgController {
private RabbitTemplate rabbitTemplate;
/**
* 发送延迟消息
* @param message 消息内容
* @param delayTime 延迟时间
*/
@GetMapping("/{message}/{delayTime}")
public void sendMsg(@PathVariable String message, @PathVariable Integer delayTime) {
log.info("当前时间:{},发送一条时长{}毫秒信息给队列delayed.queue:{}", new Date(), delayTime, message);
rabbitTemplate.convertAndSend(DELAYED_EXCHANGE_NAME, DELAYED_ROUTING_KEY, message, correlationData -> {
//delayTime 单位:ms
correlationData.getMessageProperties().setDelay(delayTime);
return correlationData;
});
}
}
消费者
package com.zjw.consumer;
import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import java.util.Date;
/**
* 消费者,基于插件的延迟消息
*/
@Slf4j
@Component
public class DelayQueueConsumer {
@RabbitListener(queues = {"delayed.queue"})
public void receiveD(Message message, Channel channel){
String msg = new String(message.getBody());
log.info("当前时间:{},收到死信队列的消息:{}",new Date().toString(), msg);
}
}
测试
通过浏览器发送一个延迟10s的消息。
http://localhost:8080/delay/hello/10000
生产者日志
2024-01-11T18:34:15.923+08:00 INFO 19184 --- [nio-8080-exec-7] c.zjw.controller.SendDelayMsgController : 当前时间:Thu Jan 11 18:34:15 CST 2024,发送一条时长10000毫秒信息给队列delayed.queue:hello
消费者日志
2024-01-11T18:34:25.929+08:00 INFO 3368 --- [ntContainer#2-1] com.zjw.consumer.DelayQueueConsumer : 当前时间:Thu Jan 11 18:34:25 CST 2024,收到死信队列的消息:hello
发现消息在10s后被消费者消费了。