RabbitMQ的死信队列,延时队列

发布时间 2023-06-28 12:06:02作者: 广州大雄

死信队列简介

RabbitMQ 的死信队列(Dead Letter Queue)是一种特殊的队列,用于存储那些被标记为“死信”的消息。所谓死信即无法被正常消费和处理的消息,通常是由于一些特定的情况或条件导致的,比如过期、重试次数超过限制等。

普通消息成为死信的常见原因有

  • 消息被拒(basic.reject or basic.nack)丢弃消息(requeue=false);
  • 消息过期:当消息的生存周期超过了设置的过期时间,即消息在队列中等待被消费的时间超过了预定的时间。(可以通过设置 x-message-ttl 参数来指定消息的过期时间 或者 可以为每条消息单独设置过期时间。通过在消息的属性中设置 expiration 字段,以毫秒为单位指定消息的过期时间)
  • 队列达到最大长度:当队列中的消息数量超过了设置的最大长度时,新到达的消息无法进入队列,而被视为死信。(创建队列时指定" x-max-length参数设置队列最大消息数量)

死信处理过程

image

消息被拒

class Program
{
	static IConnectionFactory factory = new ConnectionFactory()
	{
		HostName = "192.168.100.2",
		UserName = "uat",
		Password = "137955aaA",
		VirtualHost = "uat_vhost"
	};
	/// <summary>
	/// 生产者
	/// </summary>
	public static void SendMessage()
	{
		//死信交换机
		string dlxexChange = "dlx.exchange";
		//死信队列
		string dlxQueueName = "dlx.queue";

		//消息交换机
		string exchange = "direct.exchange";
		//消息队列
		string queueName = "direct.queue";

		using (var connection = factory.CreateConnection())
		{
			using (var channel = connection.CreateModel())
			{

				//创建死信交换机
				channel.ExchangeDeclare(dlxexChange, type: ExchangeType.Direct, durable: true, autoDelete: false);
				//创建死信队列
				channel.QueueDeclare(dlxQueueName, durable: true, exclusive: false, autoDelete: false);
				//死信队列绑定死信交换机
				channel.QueueBind(dlxQueueName, dlxexChange, routingKey: dlxQueueName);

				// 创建消息交换机
				channel.ExchangeDeclare(exchange, type: ExchangeType.Direct, durable: true, autoDelete: false);
				//创建消息队列,并指定死信队列
				channel.QueueDeclare(queueName, durable: true, exclusive: false, autoDelete: false, arguments:
									new Dictionary<string, object> {
										 { "x-dead-letter-exchange",dlxexChange}, //设置当前队列的DLX(死信交换机)
										 { "x-dead-letter-routing-key",dlxQueueName}, //设置DLX的路由key,DLX会根据该值去找到死信消息存放的队列
									 });
				//消息队列绑定消息交换机
				channel.QueueBind(queueName, exchange, routingKey: queueName);

				string message = "hello rabbitmq message";
				var properties = channel.CreateBasicProperties();
				properties.Persistent = true;
				//发布消息
				channel.BasicPublish(exchange: exchange,
									 routingKey: queueName,
									 basicProperties: properties,
									 body: Encoding.UTF8.GetBytes(message));
				Console.WriteLine($"向队列:{queueName}发送消息:{message}");
			}
		}
	}

	/// <summary>
	/// 消费者
	/// </summary>
	public static void Consumer()
	{
		//死信交换机
		string dlxexChange = "dlx.exchange";
		//死信队列
		string dlxQueueName = "dlx.queue";

		//消息交换机
		string exchange = "direct.exchange";
		//消息队列
		string queueName = "direct.queue";

		var connection = factory.CreateConnection();
		{
			//创建信道
			var channel = connection.CreateModel();
			{

				//创建死信交换机
				channel.ExchangeDeclare(dlxexChange, type: ExchangeType.Direct, durable: true, autoDelete: false);
				//创建死信队列
				channel.QueueDeclare(dlxQueueName, durable: true, exclusive: false, autoDelete: false);
				//死信队列绑定死信交换机
				channel.QueueBind(dlxQueueName, dlxexChange, routingKey: dlxQueueName);

				// 创建消息交换机
				channel.ExchangeDeclare(exchange, type: ExchangeType.Direct, durable: true, autoDelete: false);
				//创建消息队列,并指定死信队列
				channel.QueueDeclare(queueName, durable: true, exclusive: false, autoDelete: false, arguments:
									new Dictionary<string, object> {
										 { "x-dead-letter-exchange",dlxexChange}, //设置当前队列的DLX
										 { "x-dead-letter-routing-key",dlxQueueName}, //设置DLX的路由key,DLX会根据该值去找到死信消息存放的队列
									 });
				//消息队列绑定消息交换机
				channel.QueueBind(queueName, exchange, routingKey: queueName);

				var consumer = new EventingBasicConsumer(channel);
				channel.BasicQos(prefetchSize: 0, prefetchCount: 1, global: true);
				consumer.Received += (model, ea) =>
				{
					//处理业务
					var message = Encoding.UTF8.GetString(ea.Body.ToArray());
					Console.WriteLine($"队列{queueName}消费消息:{message},不做ack确认");
					//不ack(BasicNack),且不把消息放回队列(requeue:false)
					channel.BasicNack(ea.DeliveryTag, false, requeue: false);
				};
				channel.BasicConsume(queueName, autoAck: false, consumer);
			}
		}
	}
	static void Main(string[] args)
	{
		SendMessage();
		Consumer();
		Console.ReadLine();
	}
}

image

不ack(BasicNack),且不把消息放回队列(requeue:false),产生死信队列

image

延时队列(相同的过期时间)

延时队列通常与死信队列结合使用,以实现消息的延迟投递和处理。它的基本思想是给消息队列设置消息的过期时间(TTL),当消息在一定时间内未被消费者处理时,将其标记为过期并发送到死信队列。
image

class Program
{
	static IConnectionFactory factory = new ConnectionFactory()
	{
		HostName = "192.168.100.2",
		UserName = "uat",
		Password = "137955aaA",
		VirtualHost = "uat_vhost"
	};
	/// <summary>
	/// 生产者
	/// </summary>
	public static void SendMessage()
	{
		//死信交换机
		string dlxexChange = "dlx.exchange";
		//死信队列
		string dlxQueueName = "dlx.queue";

		//消息交换机
		string exchange = "direct.exchange";
		//消息队列
		string queueName = "direct.queue.delay";

		using (var connection = factory.CreateConnection())
		{
			using (var channel = connection.CreateModel())
			{

				//创建死信交换机
				channel.ExchangeDeclare(dlxexChange, type: ExchangeType.Direct, durable: true, autoDelete: false);
				//创建死信队列
				channel.QueueDeclare(dlxQueueName, durable: true, exclusive: false, autoDelete: false);
				//死信队列绑定死信交换机
				channel.QueueBind(dlxQueueName, dlxexChange, routingKey: dlxQueueName);

				// 创建消息交换机
				channel.ExchangeDeclare(exchange, type: ExchangeType.Direct, durable: true, autoDelete: false);
				//创建消息队列,并指定死信队列
				channel.QueueDeclare(queueName, durable: true, exclusive: false, autoDelete: false, arguments:
									new Dictionary<string, object> {
										 { "x-dead-letter-exchange",dlxexChange}, //设置当前队列的DLX(死信交换机)
										 { "x-dead-letter-routing-key",dlxQueueName}, //设置DLX的路由key,DLX会根据该值去找到死信消息存放的队列
										 { "x-message-ttl",3000}
									 });
				//消息队列绑定消息交换机
				channel.QueueBind(queueName, exchange, routingKey: queueName);

				string message = "hello rabbitmq message";
				var properties = channel.CreateBasicProperties();
				properties.Persistent = true;
				//发布消息
				channel.BasicPublish(exchange: exchange,
									 routingKey: queueName,
									 basicProperties: properties,
									 body: Encoding.UTF8.GetBytes(message));
				Console.WriteLine($"{DateTime.Now}队列:{queueName}发送消息:{message}");
			}
		}
	}

	/// <summary>
	/// 消费延时队列
	/// </summary>
	public static void Consumer_delay()
	{
		//死信交换机
		string dlxexChange = "dlx.exchange";
		//死信队列
		string dlxQueueName = "dlx.queue";

		var connection = factory.CreateConnection();
		{
			//创建信道
			var channel = connection.CreateModel();
			{

				//创建死信交换机
				channel.ExchangeDeclare(dlxexChange, type: ExchangeType.Direct, durable: true, autoDelete: false);
				//创建死信队列
				channel.QueueDeclare(dlxQueueName, durable: true, exclusive: false, autoDelete: false);
				//死信队列绑定死信交换机
				channel.QueueBind(dlxQueueName, dlxexChange, routingKey: dlxQueueName);

				var consumer = new EventingBasicConsumer(channel);
				channel.BasicQos(prefetchSize: 0, prefetchCount: 1, global: true);
				consumer.Received += (model, ea) =>
				{
					//处理业务
					var message = Encoding.UTF8.GetString(ea.Body.ToArray());
					Console.WriteLine($"{DateTime.Now}队列{dlxQueueName}消费消息:{message}");
					channel.BasicAck(ea.DeliveryTag, false);
				};
				channel.BasicConsume(dlxQueueName, autoAck: false, consumer);
			}
		}
	}
	static void Main(string[] args)
	{
		Task.Factory.StartNew(() =>
		{
			for (int i = 0; i < 10; i++)
			{
				SendMessage();
				System.Threading.Thread.Sleep(1000);
			}
		});

		Consumer_delay();
		Console.ReadLine();
	}
}

image

延时队列(不同的过期时间)

延时队列处理,不同的过期时间的消息,会堵塞后面的消息,导致过期了还没消费。
可以通过rabbitmq_delayed_message_exchange 插件,解决这个问题。通过该插件,可以在 RabbitMQ 中实现延时消息投递和消费。

class Program
{
	static IConnectionFactory factory = new ConnectionFactory()
	{
		HostName = "192.168.100.2",
		UserName = "uat",
		Password = "137955aaA",
		VirtualHost = "uat_vhost"
	};
	/// <summary>
	/// 生产者
	/// </summary>
	public static void SendMessage()
	{
		//延时交换机
		string delayExchange = "dlx.exchange.delayed";
		//延时队列
		string delayQueueName = "dlx.queue.delayed";
		using (var connection = factory.CreateConnection())
		{
			using (var channel = connection.CreateModel())
			{

				//创建延时交换机
				channel.ExchangeDeclare(delayExchange, type: "x-delayed-message", durable: true, autoDelete: false, new Dictionary<string, object> {
					{ "x-delayed-type","direct"}
				});
				//创建死信队列
				channel.QueueDeclare(delayQueueName, durable: true, exclusive: false, autoDelete: false);
				//死信队列绑定死信交换机
				channel.QueueBind(delayQueueName, delayExchange, routingKey: delayQueueName);


				string message = "hello rabbitmq message 10s后处理";
				var properties = channel.CreateBasicProperties();
				properties.Persistent = true;
				properties.Headers = new Dictionary<string, object> { { "x-delay", "10000" } };
				//发布消息
				channel.BasicPublish(exchange: delayExchange,
								 routingKey: delayQueueName,
								 basicProperties: properties,
								 body: Encoding.UTF8.GetBytes(message));
				Console.WriteLine($"{DateTime.Now}队列:{delayQueueName}发送消息:{message}");


				message = "hello rabbitmq message 5s后处理";
				properties = channel.CreateBasicProperties();
				properties.Persistent = true;
				properties.Headers = new Dictionary<string, object> { { "x-delay", "5000" } };
				channel.BasicPublish(exchange: delayExchange,
								routingKey: delayQueueName,
								basicProperties: properties,
								body: Encoding.UTF8.GetBytes(message));
				Console.WriteLine($"{DateTime.Now}队列:{delayQueueName}发送消息:{message}");

			}
		}
	}

	/// <summary>
	/// 消费延时队列
	/// </summary>
	public static void Consumer_delay()
	{
		//死信队列
		string dlxQueueName = "dlx.queue.delayed";

		var connection = factory.CreateConnection();
		{
			//创建信道
			var channel = connection.CreateModel();
			{
				var consumer = new EventingBasicConsumer(channel);
				channel.BasicQos(prefetchSize: 0, prefetchCount: 1, global: true);
				consumer.Received += (model, ea) =>
				{
					//处理业务
					var message = Encoding.UTF8.GetString(ea.Body.ToArray());
					Console.WriteLine($"{DateTime.Now}队列{dlxQueueName}消费消息:{message}");
					System.Threading.Thread.Sleep(20);
					channel.BasicAck(ea.DeliveryTag, false);
				};
				channel.BasicConsume(dlxQueueName, autoAck: false, consumer);
			}
		}
	}
	static void Main(string[] args)
	{
		SendMessage();
		Consumer_delay();
		Console.ReadLine();
	}
}

image