.NetCore 使用 RabbitMQ (交换机/队列/消息持久化+mq高级特性+死信队列+延迟队列)

发布时间 2023-03-28 16:48:29作者: 95后的码农

一、安装mq

2、创建公共项目Commons用于提供者和消费者引用,nuget安装 RabbitMQ.Client,添加一个帮助类:

public class RabbitMQHelper
    {

        //连接mq
        public static IConnection GetMQConnection()
        {
            var factory = new ConnectionFactory
            {
                HostName = "127.0.0.1",  //mq的ip(我自己虚拟机上的)
                Port = 5672, //端口
                UserName = "guoyingjian",  //账户
                Password = "guoyingjian",  //密码
                VirtualHost = "/" //虚拟机 
            };
            return factory.CreateConnection();  //返回连接
        }
    }
View Code

二、实操

rabbitmq消息队列有几种模式:

1、简单模式

一个提供者,一个消费者,是有序的,消费者只有一个,吞吐量低,工作基本不用,用来学习了解一下还是可以的

2、工作模式

根据队列名发消息,但有多个消费者,无序的,吞吐量高,1和2工作中基本不用,因为他们没有使用自定义交换机,练练手明白就行了。

生产者代码:

using RabbitMQ.Client;

        /// <summary>
        /// MQ 工作队列模式发消息
        /// </summary>
        /// <returns></returns>
        public void SendWorkerMQ()
        {
            //最基础的是点对点的队列模式,他的优势是有序的,

            //下面这个工作队列是无序的
            #region 工作队列模式
            string queueName = "WorkQueue";
            using (var connection = RabbitMQHelper.GetMQConnection())
            {
                //创建通信管道
                using (var channel = connection.CreateModel())
                {
                    //创建队列
                    channel.QueueDeclare(queueName, false, false, false, null);
                    for (int i = 1; i <= 30; i++)
                    {
                        string message = "hello mq" + i;
                        var body = Encoding.UTF8.GetBytes(message);
                        //发送消息到mq,如没绑定交换机,将使用默认交换机路由
                        channel.BasicPublish(exchange: "", routingKey: queueName, null, body);
                        Console.WriteLine("send normal message" + i);
                    }
                }
            }
            #endregion
        }
View Code

消费者代码:

//工作队列接收消息(多个消费者,默认轮循)
        public static void ReceiveMessage()
        {
            string queueName = "WorkQueue";//队列名称与提供者一致
            var connection = RabbitMQHelper.GetMQConnection();

            //创建管道
            var channel = connection.CreateModel();
            channel.QueueDeclare(queueName, false, false, false, null);
            var consumer = new EventingBasicConsumer(channel);

            //prefetchCount:1意思是当前worker在当前消息未消费确认时,不会再往这个worker中再次发送(可以根据不通服务器负载能力来分配)
            channel.BasicQos(prefetchSize: 0, prefetchCount: 1, global: false);

            //消息处理的事件
            consumer.Received += (model, ea) =>
            {
                //业务逻辑处理
                var message = Encoding.UTF8.GetString(ea.Body.ToArray());
                Console.WriteLine($"normal received => {message}");
            };

            //消费消息
            channel.BasicConsume(queueName, true, consumer);

        }
View Code

下面是工作中使用交换机的4种模式:

3、fanout扇形模式(发布订阅)


该类型通常叫作广播类型。fanout类型的Exchange不处理Routing key,而是会将发送给Exchange的消息,路由到所有与它绑定的Queue上。比如现在有一个fanout类型的Exchange,它下面绑定了三个Queue,Routing key分别是ORDER/GOODS/STOCK:

 

 

然后向该Exchange中发送一条消息,消息的Routing key随便填一个值abc(不填也行),如果这个Exchange的路由与这三个Queue绑定,则三个Queue都应该会收到消息

生产者代码:

/// <summary>
/// MQ 扇形交换机模式发消息
/// </summary>
/// <returns></returns>
[HttpGet("SendFanoutWorkerMQ")]
public void SendFanoutWorkerMQ()
{
#region 使用扇形交换机模式
using (var connection = RabbitMQHelper.GetMQConnection())
{
//创建通信管道
using (var channel = connection.CreateModel())
{
string exchangeName = "fanout_exchange";//fanout只提供交换机名称即可

var properties = channel.CreateBasicProperties();
properties.Persistent = true; //设置数据的持久化,保证mq服务挂掉之后消息的安全性

for (int i = 1; i <= 10; i++)
{
var body = Encoding.UTF8.GetBytes("hello mq" + i);
//这里绑定了交换机,那么就会发送到这个交换机所有绑定过的队列中
channel.BasicPublish(exchange: exchangeName, routingKey: "", properties, body);
}
}
}
#endregion
}
View Code

 

消费者代码:

<summary>
/// 扇形模式队列消费消息
/// 注意先启动一次消费端会创建交换机、队列、绑定。如果不启动则消息丢失。也可以在生产端做这些创建和绑定
/// </summary>
public static void FanoutReceiveMessage()
{

var connection = RabbitMQHelper.GetMQConnection();
//创建管道
var channel = connection.CreateModel();

//创建交换机
channel.ExchangeDeclare(exchange: "fanout_exchange", type: "fanout");
//创建队列
string queueName1 = "fanoutWorkQueue1";//队列名称
string queueName2 = "fanoutWorkQueue2";
string queueName3 = "fanoutWorkQueue3";
channel.QueueDeclare(queue: queueName1,//队列名
durable: false,//是否持久化
exclusive: false,//排它性
autoDelete: false,//一旦客户端连接断开则自动删除queue
arguments: null);//如果安装了队列优先级插件则可以设置优先级
channel.QueueDeclare(queueName2, false, false, false, null);
channel.QueueDeclare(queueName3, false, false, false, null);

//多个队列绑定到fanout_exchange交换机(似发布订阅)
channel.QueueBind(queue: queueName1, exchange: "fanout_exchange", routingKey: "");
channel.QueueBind(queue: queueName2, exchange: "fanout_exchange", routingKey: "");
channel.QueueBind(queue: queueName3, exchange: "fanout_exchange", routingKey: "");

//声明消费者
var consumer = new EventingBasicConsumer(channel);

//对消费端进行限流:
//首先第一步,我们既然要使用消费端限流,我们需要关闭自动 ack,将 autoAck 设置为 channel.basicConsume(queueName, false, consumer);
//第二步我们来设置具体的限流大小以及数量。channel.basicQos(0, 15, false);
//第三步在消费者的 handleDelivery 消费方法中手动 ack,并且设置批量处理 ack 回应为 channel.basicAck(envelope.getDeliveryTag(), true);
//prefetchCount:1意思是当前worker在当前消息未消费确认时,不会再往这个worker中再次发送(可以根据不通服务器负载能力来分配)
//channel.BasicQos(prefetchSize: 0, prefetchCount: 1, global: false);

//消费者处理的事件
consumer.Received += (model, ea) =>
{
//业务逻辑处理
var message = Encoding.UTF8.GetString(ea.Body.ToArray());
Console.WriteLine($"normal received => {message}");
};

//消费消息
channel.BasicConsume(queueName2, //队列名
autoAck: true, //确认消费(删除)
consumer: consumer);

}
View Code

4、direct路由模式也叫定向模式

direct类型的Exchange会将消息转发到指定Routing key的Queue上,Routing key的解析规则为精确匹配。也就是只有当producer发送的消息的Routing key与消费端的某个Routing key相等时,消息才会被分发到对应的Queue上。比如现在有一个direct类型的Exchange,它下面绑定了三个Queue,Routing key分别是ORDER/GOODS/STOCK:

 

 

然后向该Exchange中发送一条消息,消息的Routing key是ORDER,那只有Routing key是ORDER的队列有一条消息。(与fanout区别:fanout根据已绑定的交换机的队列发送消息。direct当然也得绑定交换机,只不过再精确匹配到routingkey相等的队列发送消息)
生产者代码:

 /// <summary>
        /// MQ 直接交换机模式发消息(指定routingKey发送)
        /// </summary>
        /// <returns></returns>
        [HttpGet("SendDirectWorkerMQ")]

        public void SendDirectWorkerMQ()
        {
            #region 使用直接交换机模式
            using (var connection = RabbitMQHelper.GetMQConnection())
            {
                //创建通信管道
                using (var channel = connection.CreateModel())
                {
                    //direct只提供交换机名称和routingkey即可,消费端只消费routingkey相匹配的
                    string exchangeName = "direct_exchange";

                    var properties = channel.CreateBasicProperties();
                    properties.Persistent = true; //设置数据的持久化,保证mq服务挂掉之后消息的安全性

                    for (int i = 1; i <= 10; i++)
                    {
                        var body = Encoding.UTF8.GetBytes("hello mq" + i + "yellow");
                        //这里绑定了交换机,同时绑定了routekey,就会发送到routekey是yellow的队列中
                        channel.BasicPublish(exchange: exchangeName, routingKey: "yellow", properties, body);

                    }
                }
            }
            #endregion
        }
View Code

消费者代码:

/// <summary>
        /// 直接模式队列消费消息
        /// 注意先启动一次消费端会创建交换机、队列、绑定。如果不启动则消息丢失。也可以在生产端做这些创建和绑定
        /// </summary>
        public static void DirectReceiveMessage()
        {
            var connection = RabbitMQHelper.GetMQConnection();
            //创建管道
            var channel = connection.CreateModel();

            //创建交换机
            channel.ExchangeDeclare(exchange: "direct_exchange", type: "direct");
            //创建队列
            string queueName1 = "directWorkQueue1";//队列名称
            string queueName2 = "directWorkQueue2";
            string queueName3 = "directWorkQueue3";
            channel.QueueDeclare(queue: queueName1,//队列名
                                               durable: false,//是否持久化
                                               exclusive: false,//排它性
                                               autoDelete: false,//一旦客户端连接断开则自动删除queue
                                               arguments: null);//如果安装了队列优先级插件则可以设置优先级
            channel.QueueDeclare(queueName2, false, false, false, null);
            channel.QueueDeclare(queueName3, false, false, false, null);

            //多个队列绑定到fanout_exchange交换机(似发布订阅)
            channel.QueueBind(queue: queueName1, exchange: "direct_exchange", routingKey: "red");
            channel.QueueBind(queue: queueName2, exchange: "direct_exchange", routingKey: "yellow");
            channel.QueueBind(queue: queueName3, exchange: "direct_exchange", routingKey: "green");

            //声明消费者
            var consumer = new EventingBasicConsumer(channel);

            //对消费端进行限流:
            //首先第一步,我们既然要使用消费端限流,我们需要关闭自动 ack,将 autoAck 设置为 falsechannel.basicConsume(queueName, false, consumer);
            //第二步我们来设置具体的限流大小以及数量。channel.basicQos(0, 15, false);
            //第三步在消费者的 handleDelivery 消费方法中手动 ack,并且设置批量处理 ack 回应为 truechannel.basicAck(envelope.getDeliveryTag(), true);
            //prefetchCount:1意思是当前worker在当前消息未消费确认时,不会再往这个worker中再次发送(可以根据不通服务器负载能力来分配)
            //channel.BasicQos(prefetchSize: 0, prefetchCount: 1, global: false);

            //消费者处理的事件
            consumer.Received += (model, ea) =>
            {
                //业务逻辑处理
                var message = Encoding.UTF8.GetString(ea.Body.ToArray());
                Console.WriteLine($"normal received => {message},routingkey:{ea.RoutingKey}");

                //消费完成后需要手动手动签收消息,如果不写该代码就容易导致重复消费问题
                //可以降低每次签收性能损耗。参数multiple:false就是单个手动签收,true就是批量签收,比如消费30条消息后再确认签收
                channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false);
            };

            //消息的签收模式
            //手动签收:保证正确消费,不会丢消息(基于客户端而已)
            //自动签收:容易丟失消息
            channel.BasicConsume(queueName1, //消费队列2的消息
                autoAck: false, //代表要手动签收,因可能会出现确认签收了然后宕机了导致没有执行事件,造成消息丢失。解决方案:手动签收操作写在了队列事件完成后。
                consumer: consumer);

        }
View Code

5、topic主题模式也叫通配符模式(路由模式的一种)

根据通配符模糊匹配,将消息交给符合routing pattern(路由模式)的队列。

它与direct相比,都是可以根据routingkey把消息路由到不同的队列。只不过topic类型exchange可以让队列在绑定routingkey的时候使用通配符。

routingkey一般都是有一个或多个单词组成,多个单词以“.”分割,例如:“item.insert”。通配符匹配规则“#”可以匹配一个或多个单词,“*”只能匹配1个单词,例如:“item.#”可以匹配“item.insert.asd”或者“item.insert”,“item.*”只能匹配到“item.insert”。

 

 生产者代码:

public void SendTopicWorkerMQ()
        {
            #region 使用topic交换机模式
            using (var connection = RabbitMQHelper.GetMQConnection())
            {
                //创建通信管道
                using (var channel = connection.CreateModel())
                {
                    //topic只提供交换机名称和routingkey即可,消费端只消费与routingkey通配符匹配的
                    string exchangeName = "topic_exchange";

                    

                    string routingKey1 = "user.america";
                    string routingKey2 = "user.china";
                    string routingKey3 = "user.china.beijing";
                    string routingKey4 = "user.china.beijing.changping";

                    var properties = channel.CreateBasicProperties();
                    properties.Persistent = true; //设置数据的持久化,保证mq服务挂掉之后消息的安全性
                    for (int i = 1; i <= 10; i++)
                    {
                        var body = Encoding.UTF8.GetBytes("hello mq" + i + "topic");
                        //传4个不同的routingkey过去,消费者会根据通配符匹配并消费(好像不能在生产者写通配符)
                        channel.BasicPublish(exchange: exchangeName, routingKey: routingKey1, properties , body);
                        channel.BasicPublish(exchange: exchangeName, routingKey: routingKey2, properties , body);
                        channel.BasicPublish(exchange: exchangeName, routingKey: routingKey3, properties , body);
                        channel.BasicPublish(exchange: exchangeName, routingKey: routingKey4, properties , body);
                    }
                }
            }
            #endregion
        }
View Code

消费者代码:

/// <summary>
        /// 主题模式队列消费消息
        /// 注意先启动一次消费端会创建交换机、队列、绑定。如果不启动则消息丢失。也可以在生产端做这些创建和绑定
        /// </summary>
        public static void TopicReceiveMessage()
        {
            var connection = RabbitMQHelper.GetMQConnection();
            //创建管道
            var channel = connection.CreateModel();

            //创建交换机
            channel.ExchangeDeclare(exchange: "topic_exchange", type: "topic");
            string exchangeName = "topic_exchange";
            //创建队列
            string queueName1 = "topicWorkQueue1";//队列名称
            string queueName2 = "topicWorkQueue2";
            string queueName3 = "topicWorkQueue3";
            channel.QueueDeclare(queue: queueName1,//队列名
                                               durable: false,//是否持久化
                                               exclusive: false,//排它性
                                               autoDelete: false,//一旦客户端连接断开则自动删除queue
                                               arguments: null);//如果安装了队列优先级插件则可以设置优先级
            channel.QueueDeclare(queueName2, false, false, false, null);
            channel.QueueDeclare(queueName3, false, false, false, null);

            //多个队列绑定到fanout_exchange交换机
            channel.QueueBind(queue: queueName1, exchange: exchangeName, routingKey: "user.*.*");//匹配例如:user.a.b
            channel.QueueBind(queue: queueName2, exchange: exchangeName, routingKey: "user.*");  //匹配例如:user.a
            channel.QueueBind(queue: queueName3, exchange: exchangeName, routingKey: "user.#");  //匹配例如:user...... (user. 后面是啥都行)

            //声明消费者
            var consumer = new EventingBasicConsumer(channel);

            //对消费端进行限流:
            //首先第一步,我们既然要使用消费端限流,我们需要关闭自动 ack,将 autoAck 设置为 channel.basicConsume(queueName, false, consumer);
            //第二步我们来设置具体的限流大小以及数量。channel.basicQos(0, 15, false);
            //第三步在消费者的 handleDelivery 消费方法中手动 ack,并且设置批量处理 ack 回应为 channel.basicAck(envelope.getDeliveryTag(), true);
            //prefetchCount:1意思是当前worker在当前消息未消费确认时,不会再往这个worker中再次发送(可以根据不通服务器负载能力来分配)
            //channel.BasicQos(prefetchSize: 0, prefetchCount: 1, global: false);

            //消费者处理的事件
            consumer.Received += (model, ea) =>
            {
                //业务逻辑处理
                var message = Encoding.UTF8.GetString(ea.Body.ToArray());
                Console.WriteLine($"normal received => {message},routingkey:{ea.RoutingKey}");

                //消费完成后需要手动手动签收消息,如果不写该代码就容易导致重复消费问题
                //可以降低每次签收性能损耗。参数multiple:false就是单个手动签收,true就是批量签收,比如消费30条消息后再确认签收
                channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false);
            };

            //消息的签收模式
            //手动签收:保证正确消费,不会丢消息(基于客户端而已)
            //自动签收:容易丟失消息
            channel.BasicConsume(queueName2, //消费队列2的消息(可以手动替换其他队列消费)
                autoAck: false, //代表要手动签收,因可能会出现确认签收了然后宕机了导致没有执行事件,造成消息丢失。解决方案:手动签收操作写在了队列事件完成后。
                consumer: consumer);

        }
View Code

6、header 参数匹配模式

header模式与routing不同的地方在于,header模式取消routingkey,使用header中的 key/value(键值对)匹配队列,Headers 类型的交换器性能会很差,所以这种类型不常用。

以上注意:Exchange(交换机):只负责转发消息,不具备存储消息的能力,因此如果没有任何队列与Exchange绑定,或者没有符合路由规则的队列,那么消息将会丢失!!