消息队列 RabbitMQ

发布时间 2023-09-12 15:26:10作者: 不争丶
RabbitMQ 内部结构
  1. 发布者:生产者,消息的发送方。
  2. 连接:网络连接。
  3. Channel:信道,多路复用连接中的一条独立的双向数据流通道。
  4. Exchange:交换器(路由器),负责消息的路由到相应队列。类型:direct、fanout、topic
  5. Binding:队列与交换器间的关联绑定。消费者将关注的队列绑定到指定交换器上,以便Exchange能准确分发消息到指定队列。
  6. Queue:队列,消息的缓冲存储区。
  7. Virtual Host:虚拟主机,虚拟主机提供资源的逻辑分组和分离。包含连接,交换,队列,绑定,用户权限,策略等。
  8. Broker:消息队列的服务器实体。
  9. 消费者:消费者,消息的接收方。

一、安装

1.安装Erlang运行环境    坑:安装完要记得重启
2.安装RabbitMQ

二、消息模型

大致可以分为3类:

  • 基本消费模型(1)
  • Work消息模型(2)
  • 订阅模型(3,4,5)

第6个属于RPC暂时不管。


三、基本消费模型 -邮局

python-one.png
1、生产者 Sending

sending.png

using RabbitMQ.Client;
using System.Text;

//创建连接工厂
ConnectionFactory factory = new ConnectionFactory
{
    UserName = "guest",
    Password = "guest",
    HostName = "localhost"
};

//创建连接
using(var connection = factory.CreateConnection())
{
    //创建通道
    using(var channel = connection.CreateModel())
    {
        //声明一个队列
        channel.QueueDeclare("hello1",false,false,false,null);

        //如果未声明交换机,则队列会自动绑定到默认direct类型的交换器,并以队列的形式作为路由器

        Console.WriteLine("\nRabbitMQ 连接成功,请输入消息,输入exit退出!");

        string input;
        do
        {
            input = Console.ReadLine();

            //构建byte消息数据包
            var sendBytes =Encoding.UTF8.GetBytes(input);

            //发布消息
            channel.BasicPublish("", "hello1", null, sendBytes);
            //exchange: 交换器的名称,指明消息需要发送到哪个交换器中。如果设置为空字符串,则消息会被发送到 Rabbitmo默认的交换器中
            //routingKey: 路由键,交换器根据路由键将消息存储到相应的队列之中。
            // basicProperties: 消息的本国性集,其包14个属性成员,分别有contentlype contentEnoding, headers (Map & lt; String, Object & gt; ) 、deliveryMode、 priority、correlationId、replyTo、 expiration, messageId, timestamp、 type、 userId, appId、 clusterId.
            //byte[] body: 消体 (payload) ,真正需要发送的消息
            // landatory: 没为true时,如果exchange根据自身类型和消息routeke无一符合条的quele,会调用basic.return方法将消息返还给生产者:设为false时,出现上述情形broker会直接将消息扔掉。

        } while (input.Trim().ToLower()!="exit");

    }
}


2、消费者 Receiving
receiving.png
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using System.Text;

//创建连接工厂
ConnectionFactory factory = new ConnectionFactory
{
    UserName = "guest",
    Password = "guest",
    HostName = "localhost"
};

//创建连接
using (var connection =factory.CreateConnection())
{
    //创建通道
    using (var channel=connection.CreateModel())
    {
        //事件基本消费者
        EventingBasicConsumer consumer=new EventingBasicConsumer(channel);

        //接收到消息事件  +=附加到事件上
        consumer.Received += (ch, ea) =>
        {
            var message=Encoding.UTF8.GetString(ea.Body.ToArray());
            Console.WriteLine($"收到消息:{message}");

            //确认该消息已被消费
            channel.BasicAck(ea.DeliveryTag, false);
        };

        //启动消费者,设置为手动应答消息
        channel.BasicConsume("hello1", false, consumer);
        Console.WriteLine("消费者1已启动");
        Console.ReadLine();
    }
}


四、Wock消费模型-工作队列 (循环调度)

python-two.png
此消息模型的好处在于如果你积压了任务,我们只需要添加更多的工作者就可以了。默认情况下,RabbitMQ 会按顺序将每条消息发送给下一个消费者。平均而言,每个消费者都会收到相同数量的消息。这种分发消息的方式称为循环(循环调度)
1、再增加一个消费者
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using System.Text;

//创建连接工厂
ConnectionFactory factory = new ConnectionFactory
{
    UserName = "guest",
    Password = "guest",
    HostName = "localhost"
};

//创建连接
using (var connection = factory.CreateConnection())
{
    //创建通道
    using (var channel = connection.CreateModel())
    {
        //事件基本消费者
        EventingBasicConsumer consumer = new EventingBasicConsumer(channel);

        //接收到消息事件  +=附加到事件上
        consumer.Received += (ch, ea) =>
        {
            var message = Encoding.UTF8.GetString(ea.Body.ToArray());
            Console.WriteLine($"收到消息:{message}");

            //确认该消息已被消费
            channel.BasicAck(ea.DeliveryTag, false);
        };

        //启动消费者,设置为手动应答消息
        channel.BasicConsume("hello1", false, consumer);
        Console.WriteLine("消费者2已启动");
        Console.ReadLine();
    }
}


会出现各种情况:
(1)工作者完成一项任务可能需要几秒钟。如果其中一个消费者开始一项长期任务并且只完成了部分任务而死去会发生什么。
(2)如果消费者在没有发送 ack 的情况下死亡(其通道关闭、连接关闭或 TCP 连接丢失)RabbitMQ将会做什么处理。
(3)当RabbitMQ遇到异常情况退出或崩溃时,我们的任务会怎么办,只能丢失吗?在重新启动RabbitMQ的时候能重新找回来吗?
(4)当我们有两个工人的情况下,当所有奇数消息很重而偶数消息都很轻时,一个工人将一直很忙,另一个工人几乎不做任何工作。这种情况要怎么处理呢。
RabbitMQ都有解决方案
(1、2)消息确认
(3)消息持久性
(4)公平调度

五、订阅模型(三类)


exchanges.png

RabbitMQ 中消息传递模型的核心思想是生产者向交换器发送消息,从不直接向队列发送任何消息。实际上,生产者通常根本不知道消息是否会被传递到任何队列。而交换器,一方面它接收来自生产者的消息,另一方面它将它们推送到队列中,交换必须确切地知道如何处理它收到的消息。

RabbitMQ 给我们提供了3种常用的交换器类型:
direct:定向,把消息交给符合指定routing key 的队列
topic:通配符,把消息交给符合routing pattern(路由模式) 的队列
fanout:广播,将消息交给所有绑定到交换机的队列

Exchange(交换机)只负责转发消息,不具备存储消息的能力,因此如果没有任何队列与Exchange绑定,或者没有符合路由规则的队列,那么消息会丢失!


1、Direct 定向

(明确的路由规则:消费端绑定的队列名称必须和消息发布时指定的路由名称一致)
direct-exchange.png
使用路由键橙色发布到交换的消息 将被路由到队列Q1。
带有黑色 或绿色路由键的消息将发送到Q2。
所有其他消息将被丢弃。
代码:
using RabbitMQ.Client;
using System.Text;

//创建连接工厂
ConnectionFactory factory = new ConnectionFactory
{
    UserName = "guest",
    Password = "guest",
    HostName = "localhost"
};

//创建连接
using (var connection = factory.CreateConnection())
{
    //创建通道
    using (var channel = connection.CreateModel())
    {
        var exchangeName = "direct_exchange";

        #region QueueDeclare方法详解
        //queue:队列的名称。
        //durable:设置是否持久化。为true则设置队列为持久化。持久化的队列会存盘,在服务器重启的时候可以保证不丢失相关信息。
        //exclusive:设置是否排他。为true则设置队列为排他的。
        //autoDelete:设置是否自动删除。为true则设置队列为自动删除。自动删除的前提是:至少有一个消费者连接到这个队列,之后所有与这个队列连接的消费者都断开时,才会自动删除
        //arguments:设置队列的其他一些参数,如x-message-ttl、x-expires、 x - max - length, x - max - length - bytes, x - dead - letter -exchange, x - dead - letter - routing - key、x - max - priority等。

        //声明一个队列hello1
        channel.QueueDeclare("hello1", false, false, false, null);
        //声明一个队列hello2
        channel.QueueDeclare("hello2", false, false, false, null);
        #endregion 

        #region ExchangeDeclare方法详解
        //ExchangeDeclare方法详解
        //exchange:交换器的名称
        //type:交换器的类型,常见的如fanout、direct、topic.
        //durable:设置是否持久化。durable设置为true表示持久化,反之是非持久化。持久化可以将交换器存盘,在服务器重启的时候不会丢失相关信息
        //autoDelete:设置是否自动删除。autoDelete设置为true则表示自动删除。自动删除的前提是至少有一个队列或者交换器与这个交换器绑定,之后所有与这个交换器绑定的队列或者交换器都与此解绑才会删除。
        //internal:设置是否是内置的。如果设置为true,则表示是内置的交换器,客户端程序无法直接发送消息到这个交换器中,只能通过交换器路由到交换器这种方式。
        //argument:其他一些结构化参数
        #endregion

        //创建一个非持久化的,非自动删除的,绑定类型为direct的交换器
        channel.ExchangeDeclare(exchangeName, ExchangeType.Direct, false, false, null);

        //将队列与交换机绑定
        channel.QueueBind("hello1", exchangeName, "orange");//使用路由键(orange)将hello1队列和交换器绑定
        channel.QueueBind("hello2", exchangeName, "green");//使用路由键(green)将hello2队列和交换器绑定
        channel.QueueBind("hello2", exchangeName, "black");//使用路由键(black)将hello2队列和交换器绑定

        Console.WriteLine("\nRabbitMQ 连接成功,请输入消息,输入exit退出!");

        string input;
        do
        {
            input = Console.ReadLine();

            //构建byte消息数据包
            var sendBytes = Encoding.UTF8.GetBytes(input);

            //发布消息
            channel.BasicPublish(exchangeName, "orange", null, sendBytes);

            #region BasicPublish方法详解
            //exchange: 交换器的名称,指明消息需要发送到哪个交换器中。如果设置为空字符串,则消息会被发送到 Rabbitmo默认的交换器中
            //routingKey: 路由键,交换器根据路由键将消息存储到相应的队列之中。
            // basicProperties: 消息的本国性集,其包14个属性成员,分别有contentlype contentEnoding, headers (Map & lt; String, Object & gt; ) 、deliveryMode、 priority、correlationId、replyTo、 expiration, messageId, timestamp、 type、 userId, appId、 clusterId.
            //byte[] body: 消体 (payload) ,真正需要发送的消息
            // landatory: 没为true时,如果exchange根据自身类型和消息routeke无一符合条的quele,会调用basic.return方法将消息返还给生产者:设为false时,出现上述情形broker会直接将消息扔掉。
            #endregion

        } while (input.Trim().ToLower() != "exit");

    }
}

我们启动两个消费者"hello1"与"hello2" 然后同时启动生产者
(1)我们先使用路由键orange发布消息
            //发布消息
            channel.BasicPublish(exchangeName, "orange", null, sendBytes);
由下图 看出 只向队列hello1 发送

(2)我们再使用路由键green发布消息
        //发布消息
      channel.BasicPublish(exchangeName, "green", null, sendBytes);
由下图 看出 只向队列hello2 发送
(3)我们再使用路由键black发布消息
            //发布消息
            channel.BasicPublish(exchangeName, "black", null, sendBytes);
由下图 看出 也只向队列hello2 发送

2、Fanout 广播

(消息广播,将消息分发到exchange上绑定的所有队列上)
image 47.png
fanout的路由机制(广播)如下图,即发送到 fanout 类型exchange的消息都会分发到所有绑定该exchange的队列上去。

由下图可以看出 消费者都会拿到数据

3、Topic 通配符

(模式匹配的路由规则:支持通配符)

python-five.png

Topic Exchange使用起来非常灵活,它可以通过使用通配符(*与#)来进行模糊匹配(跟我们Api中的模糊查询类似),所有发送到Topic Exchange的消息被转发到能和Topic匹配的Queue上
看一下匹配规则:

  • "*" 匹配一个单词
  • "#" 匹配一个或多个单词

举个简单的例子:
"mamba.*" 只可以匹配到"mamba.rabbit" 这个格式的routingkey
"mamba.#" 则可以匹配到"mamba.male.rabbit" 这个格式的routingkey



自己理解 

生产者:
1、创建连接工厂
2、创建连接
3、创建通道
4、创建交换器
5、将队列与交换机绑定
6、发布消息

消费者:
1、创建连接工厂
2、创建连接
3、创建通道
4、事件基本消费者
5、接收到消费事件
6、确定消息已被消费
7、启动消费者,设置为手动应答消息