using RabbitMQ.Client; using System.Text; //创建rabbit mq连接基础设置 var factory = new ConnectionFactory() { HostName = "127.0.0.1", UserName = "admin", Password = "admin", Port = 5672, VirtualHost = "/" }; //创建rabbit mq连接 using (var connection = factory.CreateConnection()) { //创建信道/通道 using (var channel = connection.CreateModel()) { //声明创建队列queue或者交换机exchange channel.QueueDeclare(queue: "mymessage", durable: true, exclusive: false, autoDelete: false, arguments: null); //channel.ExchangeDeclare(exchange: "example.exchange", ExchangeType.Topic, durable: true, autoDelete: true, arguments: null); //设置消息持久化 var properties = channel.CreateBasicProperties(); properties.Persistent = true; //开启消息确认(生产者发送至MQ,确保此次生产成功发送)。 channel.ConfirmSelect(); Console.WriteLine("输入需要传输的消息,输入Exit退出"); var message = Console.ReadLine(); while (message != "Exit") { //Console.WriteLine("输入RouteKey"); //var routekey = Console.ReadLine(); var body = Encoding.UTF8.GetBytes(message); //将生产信息发布。 channel.BasicPublish(exchange: "", routingKey: "mymessage", basicProperties: properties, body: body); //等待消息确认 bool flag = channel.WaitForConfirms(); if (flag) { Console.WriteLine(" 发送消息 {0} 成功!", message); } else { Console.WriteLine(" 发送消息 {0} 错误!", message); } message = Console.ReadLine(); } } } Console.WriteLine("按回车退出"); Console.ReadLine();
消费者/订阅者代码:
using RabbitMQ.Client; using RabbitMQ.Client.Events; using System.Reflection; using System.Text; //创建rabbit mq连接基础设置 var factory = new ConnectionFactory() { HostName = "127.0.0.1", UserName = "admin", Password = "admin", Port = 5672, VirtualHost = "/" }; using (var connection = factory.CreateConnection()) { //创建信道/通道 using (var channel = connection.CreateModel()) { //声明创建队列queue或者交换机exchange //channel.ExchangeDeclare(exchange: "example.exchange", type: ExchangeType.Topic, durable: true, autoDelete: true); //绑定交换机。 //channel.QueueBind(queue: "mymessage", exchange: "example.exchange", routingKey: "#.log"); //声明创建队列queue或者交换机exchange channel.QueueDeclare(queue: "mymessage", durable: true, exclusive: false, autoDelete: false, arguments: null); //设置每个消费者公平分发 channel.BasicQos(0, 1, false); //定义时间消费者异步操作。 var consumer = new EventingBasicConsumer(channel); consumer.Received += (model, ea) => { //读取生产传递body参数。 var body = ea.Body; var message = Encoding.UTF8.GetString(body.ToArray()); //执行逻辑。 int dots = message.Length; Thread.Sleep(dots * 1000); //返回响应事件标识,//释放rabbit mq内存 channel.BasicAck(ea.DeliveryTag, false); Console.WriteLine("收到消息 {0}", message); }; //执行消费:autoAck默认不进行自动返回,使用channel.BasicAck方法在异步消费完成后再次返回标识。 channel.BasicConsume(queue: "mymessage", autoAck: false, consumer: consumer); Console.WriteLine("按回车退出"); Console.ReadLine(); } }
源码下载地址:RabbitMQ_demo.zip