rabbitmq+mqtt+docker-compose搭建MQTT服务器和.netcore 客户端实现 订阅+发布

发布时间 2023-10-26 15:41:09作者: 古兆洋

转自:https://blog.csdn.net/oopxiajun2011/article/details/130658329

1 搭建MQTT服务器

1.1 Dockerfile 内容

1 FROM rabbitmq:3.11.6-management
2 COPY install_rabbitmq_plus.sh /usr/local/
3 RUN  chmod 777 /usr/local/install_rabbitmq_plus.sh 
4 RUN  /bin/sh /usr/local/install_rabbitmq_plus.sh 

1.2 容器中需要安装插件的命令  放在 install_rabbitmq_plus.sh 文件中

1 rabbitmq-plugins enable rabbitmq_management
2 rabbitmq-plugins enable rabbitmq_mqtt
3 rabbitmq-plugins enable rabbitmq_web_mqtt

1.3 构件容器需要用到的yml配置 docker-compose-RabbitMQ.yaml

 1 version: "3.6"
 2 services:
 3   #服务
 4   rabbitmq:
 5     build:          #镜像构建
 6       context:  .   #构建镜像时所在的资源路径
 7       dockerfile: Dockerfile    #构建镜像时需要的dockerfile文件路径
 8     ports:
 9       - 5672:5672
10       - 15672:15672
11       # mqtt端口
12       - 15675:15675
13       - 1883:1883
14     volumes:
15       - ./data/rabbitmq:/var/lib/rabbitmq
16     environment:
17       - TZ=Asia/Shanghai
18       - RABBITMQ_DEFAULT_USER=root
19       - RABBITMQ_DEFAULT_PASS=1234
20     restart: always

1.4 docker-compose 启动RabbitMQ以及MQTT插件服务

docker-compose -f docker-compose-RabbitMQ.yaml up --build  -d

2  .net core 实现

 1  
 2 using MQTTnet;
 3 using MQTTnet.Client;
 4 using MQTTnet.Packets;
 5 using MQTTnet.Protocol;
 6 public class XJ_MQTT
 7     {
 8         public MqttClient GetClient()
 9         {
10             MqttClient mqttClient = new MQTTnet.MqttFactory().CreateMqttClient() as MqttClient;
11             mqttClient.ApplicationMessageReceivedAsync += MqttClient_ApplicationMessageReceivedAsync1;
12             mqttClient.ConnectedAsync += MqttClient_ConnectedAsync;
13             mqttClient.ConnectingAsync += MqttClient_ConnectingAsync;
14             mqttClient.DisconnectedAsync += MqttClient_DisconnectedAsync;
15             mqttClient.InspectPacketAsync += MqttClient_InspectPacketAsync;
16             try
17             {
18                 MqttClientOptionsBuilder optionsBuilder = new MqttClientOptionsBuilder();
19                 optionsBuilder.WithTcpServer("mqtt的服务器IP", 1883);
20                 string id = Guid.NewGuid().ToString("N");
21                 optionsBuilder.WithClientId(id);
22                 optionsBuilder.WithProtocolVersion(MQTTnet.Formatter.MqttProtocolVersion.V311);
23                 optionsBuilder.WithCredentials("root", "1234");
24                 mqttClient.ConnectAsync(optionsBuilder.Build()).Wait();
25             }
26             catch (Exception e)
27             {
28                 Console.WriteLine($"连接到MQTT服务器失败" + Environment.NewLine + e.Message + Environment.NewLine);
29  
30             }
31             return mqttClient;
32         }
33  
34         private Task MqttClient_InspectPacketAsync(MQTTnet.Diagnostics.InspectMqttPacketEventArgs arg)
35         {
36  
37             Console.WriteLine($"MqttClient_InspectPacketAsync :{UTF8Encoding.UTF8.GetString(arg.Buffer)}" + Environment.NewLine);
38             return Task.CompletedTask;
39         }
40  
41         private Task MqttClient_DisconnectedAsync(MqttClientDisconnectedEventArgs arg)
42         {
43             Console.WriteLine("已断开MQTT服务器" + Environment.NewLine);
44             return Task.CompletedTask;
45         }
46  
47         private Task MqttClient_ConnectingAsync(MqttClientConnectingEventArgs arg)
48         {
49  
50             Console.WriteLine("链接MQTT服务器中...." + Environment.NewLine);
51             return Task.CompletedTask;
52         }
53  
54         private Task MqttClient_ConnectedAsync(MqttClientConnectedEventArgs arg)
55         {
56             Console.WriteLine("已连接到MQTT服务器" + Environment.NewLine);
57             return Task.CompletedTask;
58         }
59  
60         private Task MqttClient_ApplicationMessageReceivedAsync1(MqttApplicationMessageReceivedEventArgs arg)
61         {
62             Console.WriteLine($">>ClientId:{arg.ClientId} \r\nTopic:{arg.ApplicationMessage.Topic}\r\nPayload:{Encoding.UTF8.GetString(arg.ApplicationMessage.Payload)}{Environment.NewLine}" + Environment.NewLine);
63             return Task.CompletedTask;
64         }
65  
66     }

2.1 实现发布 

 1 using MQTTnet;
 2 using MQTTnet.Client;
 3 using MQTTnet.Packets;
 4 using MQTTnet.Protocol;
 5  
 6  
 7 var mq = new XJ_MQTT();
 8 var client = mq.GetClient();
 9 while (true)
10 {
11     string input = Console.ReadLine();
12     client.PublishStringAsync("testTpic", "payload" + Guid.NewGuid().ToString("N") + ":" + input);
13 }

2.2 实现订阅

1 using MQTTnet.Packets;
2 var mq = new XJ_MQTT();
3 var client = mq.GetClient();
4 client.SubscribeAsync(new MQTTnet.Client.MqttClientSubscribeOptions()
5 {
6     TopicFilters = new List<MqttTopicFilter>() { new MqttTopicFilter() { Topic = "testTpic" } }
7 });
8 while (true)
9     Console.ReadLine();

3 运行效果