c#使用RabbitMQ.Client源码

发布时间 2023-03-22 21:10:49作者: 真的是小鹏友
生产者代码:
using RabbitMQ.Client;
using System.Text;

//创建rabbit mq连接基础设置
var factory = new ConnectionFactory()
{
    HostName = "127.0.0.1",
    UserName = "admin",
    Password = "admin",
    Port = 5672,
    VirtualHost = "/"
};
//创建rabbit mq连接
using (var connection = factory.CreateConnection())
{
    //创建信道/通道
    using (var channel = connection.CreateModel())
    {
        //声明创建队列queue或者交换机exchange
        channel.QueueDeclare(queue: "mymessage", durable: true, exclusive: false, autoDelete: false, arguments: null);
        //channel.ExchangeDeclare(exchange: "example.exchange", ExchangeType.Topic, durable: true, autoDelete: true, arguments: null);
        
        //设置消息持久化
        var properties = channel.CreateBasicProperties();
        properties.Persistent = true;

        //开启消息确认(生产者发送至MQ,确保此次生产成功发送)。
        channel.ConfirmSelect();
        Console.WriteLine("输入需要传输的消息,输入Exit退出");
        var message = Console.ReadLine();
        while (message != "Exit")
        {
            //Console.WriteLine("输入RouteKey");
            //var routekey = Console.ReadLine();
            var body = Encoding.UTF8.GetBytes(message);

            //将生产信息发布。
            channel.BasicPublish(exchange: "",
                                 routingKey: "mymessage",
                                 basicProperties: properties,
                                 body: body);
            //等待消息确认
            bool flag = channel.WaitForConfirms();
            if (flag)
            {
                Console.WriteLine(" 发送消息 {0} 成功!", message);
            }
            else
            {
                Console.WriteLine(" 发送消息 {0} 错误!", message);
            }
            message = Console.ReadLine();
        }
    }
}
Console.WriteLine("按回车退出");
Console.ReadLine();

消费者/订阅者代码:

using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using System.Reflection;
using System.Text;

//创建rabbit mq连接基础设置
var factory = new ConnectionFactory()
{
    HostName = "127.0.0.1",
    UserName = "admin",
    Password = "admin",
    Port = 5672,
    VirtualHost = "/"
};
using (var connection = factory.CreateConnection())
{
    //创建信道/通道
    using (var channel = connection.CreateModel())
    {
        //声明创建队列queue或者交换机exchange
        //channel.ExchangeDeclare(exchange: "example.exchange", type: ExchangeType.Topic, durable: true, autoDelete: true);
        //绑定交换机。
        //channel.QueueBind(queue: "mymessage", exchange: "example.exchange", routingKey: "#.log");
        //声明创建队列queue或者交换机exchange
        channel.QueueDeclare(queue: "mymessage", durable: true, exclusive: false, autoDelete: false, arguments: null);
        //设置每个消费者公平分发
        channel.BasicQos(0, 1, false);

        //定义时间消费者异步操作。
        var consumer = new EventingBasicConsumer(channel);
        consumer.Received += (model, ea) =>
        {
            //读取生产传递body参数。
            var body = ea.Body;
            var message = Encoding.UTF8.GetString(body.ToArray());

            //执行逻辑。
            int dots = message.Length;
            Thread.Sleep(dots * 1000);

            //返回响应事件标识,//释放rabbit mq内存
            channel.BasicAck(ea.DeliveryTag, false);
            Console.WriteLine("收到消息 {0}", message);
        };

        //执行消费:autoAck默认不进行自动返回,使用channel.BasicAck方法在异步消费完成后再次返回标识。
        channel.BasicConsume(queue: "mymessage",
                             autoAck: false,
                             consumer: consumer);

        Console.WriteLine("按回车退出");
        Console.ReadLine();
    }
}

 源码下载地址:RabbitMQ_demo.zip