乘风破浪,遇见最佳跨平台跨终端框架.Net Core/.Net生态 - 对接MQTT和它的代理们

发布时间 2023-05-29 00:55:19作者: TaylorShi

什么是MQTT

https://mqtt.org

image

MQTT(Message Queuing Telemetry Transport)是一种轻量级的通信协议,专门用于物联网(IoT)设备之间的通信。它是基于发布/订阅(publish/subscribe)模式的协议,通过中间代理(broker)进行消息的传输

MQTT在物联网应用中被广泛使用,特别是在传感器网络、远程监控、物联网平台等领域。它提供了一种简单、高效、可靠的方式来实现设备之间的通信和数据传输。

基本概念

  • 发布/订阅模式:MQTT使用发布/订阅模式,其中发布者(Publisher)将消息发布到特定的主题(Topic),而订阅者(Subscriber)可以选择订阅感兴趣的主题。这种模式可以实现一对多的消息传递。

  • 主题(Topic):主题是MQTT中消息的分类标识。发布者发布消息到特定的主题上,而订阅者则订阅感兴趣的主题以接收相关消息。

  • 代理(Broker)代理是MQTT通信的中间服务器,负责接收发布者发送的消息,并将其传递给相应的订阅者。代理还可以处理订阅和取消订阅请求,并维护客户端之间的连接。

服务质量等级(QoS)

服务质量等级(QoS, Quality of Service):MQTT提供了不同的服务质量等级,用于确保消息的可靠性和传输效率。

QoS级别包括

  • QoS 0(最多一次):消息最多传输一次,不保证消息的可靠性。
  • QoS 1(至少一次):消息至少传输一次,确保消息到达代理,但可能会出现重复消息。
  • QoS 2(仅一次):消息仅传输一次,确保消息只被传递一次,确保最高的可靠性。

特点

  • 轻量级:MQTT被设计为一种轻量级的协议,具有较小的网络开销和资源需求。这使得它非常适合在受限的网络环境和资源受限的设备上使用。

  • 支持异步通信:MQTT允许发布者和订阅者进行异步通信,不需要两者同时在线。发布者可以发布消息,而订阅者可以在其上线后接收到消息。

和消息队列的区别

MQTT是一种轻量级的通信协议,专门用于物联网设备之间的通信。它基于发布/订阅模式,通过中间代理(Broker)进行消息的传输。MQTT适用于需要实时传输数据、低带宽消耗和资源受限的环境,例如传感器网络和物联网应用。

消息队列是一种通信模式,用于在分布式系统中进行异步消息传递。它通常基于消息队列服务(Message Queue Service),提供消息的存储、转发和交付等功能。消息队列可以用于解耦消息的发送者和接收者,实现异步通信、流量控制和削峰填谷等功能。它适用于处理大量消息和高并发的情况,例如应用程序之间的解耦、任务调度和事件驱动架构等。

MQTT和消息队列之间的一些区别

  • 通信模式:MQTT是基于发布/订阅模式的通信协议,允许多个订阅者订阅特定主题的消息。消息队列通常是基于点对点或发布/订阅模式的,消息的发送者将消息发送到队列中,然后接收者从队列中接收消息。

  • 应用场景:MQTT主要用于物联网设备之间的实时通信,例如传感器数据的实时传输和设备控制。消息队列更适用于解耦和异步通信的场景,例如大规模分布式系统、任务处理和事件驱动架构。

  • 传输特点:MQTT是一种轻量级协议,具有较小的网络开销和资源需求。它通常用于低带宽环境和资源受限的设备。消息队列通常提供持久化、消息确认和重试等特性,以确保消息的可靠性和可用性。

  • 扩展性和性能:MQTT适用于大规模的设备连接,但在处理大量并发连接时性能可能受到限制。消息队列通常具有良好的扩展性和高吞吐量,能够处理大量的消息和并发请求。

MQTT适用于物联网设备之间的实时通信,而消息队列适用于解耦和异步通信的场景。选择使用哪种机制取决于具体的应用需求、通信模式和性能要求。在某些情况下,MQTT和消息队列也可以结合使用,根据实际情况选择合适的通信方式。

MQTT的代理(Broker)

MQTT协议支持多种不同的代理(broker)实现,官方已有一些推荐,以下是一些常见的MQTT broker及其优缺点:

Mosquitto

https://mosquitto.org

https://test.mosquitto.org

Eclipse Mosquitto是一个开源的(EPL/EDL许可)消息代理,实现了MQTT协议的5.0、3.1.1和3.1版本。Mosquitto是轻量级的,适合在所有设备上使用,从低功率的单板计算机到完整的服务器。

image

  • 优点:Mosquitto是一个开源的、轻量级的MQTT broker,易于安装和配置。它具有较低的资源消耗,适用于嵌入式设备和资源受限的环境。Mosquitto还提供了可扩展性和安全性的特性。
  • 缺点:在处理大量并发连接时,Mosquitto的性能可能受到限制。

HiveMQ

https://www.hivemq.com

HiveMQ是一个MQTT代理,它从一开始就以最大的可扩展性和企业级的安全性为目标。它有原生的网络套接字支持和一个开源的插件SDK来扩展其功能或将其与其他组件集成。

image

  • 优点:HiveMQ是一个功能强大的商业MQTT broker,具有良好的可扩展性和性能。它支持高并发连接和大规模部署,并提供了可靠性和可用性方面的高级功能。HiveMQ还提供了全面的安全特性和集成选项。
  • 缺点:HiveMQ是商业产品,需要付费许可证才能使用其高级功能。

EMQX

https://www.emqx.com/en/products/emqx

EMQX是一个完全开源、高度可扩展、高度可用的分布式MQTT消息代理,用于物联网、M2M和移动应用,可以处理数千万个并发客户端。
从3.0版本开始,EMQX完全支持MQTT V5.0协议规范,并向后兼容MQTT V3.1和V3.1.1,以及其他通信协议,如MQTT-SN、CoAP、LwM2M、WebSocket和STOMP。EMQX的3.0版本可以在一个集群上扩展到1000多万个并发的MQTT连接。

image

  • 优点:EMQ X是一个开源的、高度可扩展的MQTT broker,具有出色的性能和可靠性。它支持海量并发连接和分布式部署,并提供了高级的安全特性和集群功能。EMQ X还支持多种协议和插件扩展。
  • 缺点:配置和管理EMQ X可能需要一定的学习成本,对于初学者来说可能有一定的复杂性。

RabbitMQ

https://www.rabbitmq.com

RabbitMQ是一个AMQP消息代理 - 有一个MQTT插件(捆绑在3.x版本以上)。

image

  • 优点:RabbitMQ是一个通用的消息代理,支持多种协议,其中包括MQTT。它具有成熟的可靠性和可用性特性,支持高吞吐量和可扩展性。RabbitMQ还提供了丰富的功能和灵活的路由规则。
  • 缺点:相比于专门为MQTT设计的代理,RabbitMQ在MQTT方面的性能可能较低

MQTTnet

https://github.com/dotnet/MQTTnet

MQTTnet是一个高性能的.NET库,用于基于MQTT的通信。它提供了一个MQTT客户端和一个MQTT服务器(Broker),并支持MQTT协议的第5版。它与大多数支持的.NET框架版本和CPU架构兼容。

image

  • 优点

    • 开源且活跃的社区支持:MQTTnet是一个开源项目,拥有活跃的社区支持。这意味着你可以从社区中获取更新、修复bug和功能扩展,以及与其他开发者分享经验和解决方案。

    • 跨平台支持:MQTTnet是基于C#语言开发的,可以在多个平台上使用,包括Windows、Linux和.NET Core等。这使得它具有广泛的应用领域和灵活性。

  • 简单易用:MQTTnet提供了简单而直观的API,使得在.NET平台上实现MQTT通信变得简单易用。它提供了连接管理、发布/订阅功能以及QoS支持等核心功能。

    • 高度可配置:MQTTnet允许你根据需求进行灵活的配置。你可以设置连接选项、消息保留策略、QoS级别、消息处理回调等,以满足不同的通信需求。
  • 缺点

    • C#语言限制:由于MQTTnet是基于C#语言的,因此它在某些嵌入式设备或特定平台上的可用性可能会受到限制。

    • 可能存在性能瓶颈:尽管MQTTnet已经努力优化性能,但在处理大量并发连接或高吞吐量时,仍可能面临性能瓶颈的挑战。这需要根据具体应用场景进行测试和优化。

    • 依赖于第三方库:MQTTnet可能依赖于其他第三方库,这可能增加项目的复杂性和维护的难度。需要注意对依赖库的版本控制和更新。

这些仅是一些常见的MQTT broker实现,还有其他许多可选择的实现,每个实现都有其自己的特点和适用场景。选择合适的MQTT broker取决于你的具体需求,包括预期的并发连接数、性能要求、可扩展性需求、安全性需求以及预算等因素。

Mosquitto安装

Windows安装包

image

image
image
image

安装之后,它会自动创建一个mosquitto的服务,默认是没有自动开启的,我们可以手动打开下。

image

image

这时候这个代理就运行起来了。

Docker运行

https://hub.docker.com/_/eclipse-mosquitto

执行如下命令即可:

docker run -d --name mosquitto --restart unless-stopped -p 1883:1883 -p 9001:9001 eclipse-mosquitto:2.0.15

image

调试MQTT

MQTT.fx

https://softblade.de/en/welcome/

MQTTX

MQTTX是一款开源的跨平台桌面客户端,它简单易用且提供全面的MQTT 5.0功能、特性测试,可运行在macOS, Linux和Windows上。同时,它还提供了命令行及浏览器版本,满足不同场景下的MQTT测试需求。

https://mqttx.app

https://github.com/emqx/MQTTX

https://github.com/emqx/MQTTX/releases

image

image

对接MQTT

通过MQTTnet对接MQTT

Nuget包

https://github.com/dotnet/MQTTnet

https://www.nuget.org/packages/MQTTnet

dotnet add package MQTTnet

连接

连接代码

var mqttFactory = new MqttFactory();

using (var mqttClient = mqttFactory.CreateMqttClient())
{
    // Use builder classes where possible in this project.
    var mqttClientOptions = new MqttClientOptionsBuilder().WithTcpServer("127.0.0.1").Build();

    // This will throw an exception if the server is not available.
    // The result from this message returns additional data which was sent 
    // from the server. Please refer to the MQTT protocol specification for details.
    var response = await mqttClient.ConnectAsync(mqttClientOptions, CancellationToken.None);

    Console.WriteLine("The MQTT client is connected.");

    //response.DumpToConsole();

    // Send a clean disconnect to the server by calling _DisconnectAsync_. Without this the TCP connection
    // gets dropped and the server will handle this as a non clean disconnect (see MQTT spec for details).
    var mqttClientDisconnectOptions = mqttFactory.CreateClientDisconnectOptionsBuilder().Build();

    await mqttClient.DisconnectAsync(mqttClientDisconnectOptions, CancellationToken.None);
}

订阅

var mqttFactory = new MqttFactory();

using (var mqttClient = mqttFactory.CreateMqttClient())
{
    var mqttClientOptions = new MqttClientOptionsBuilder().WithTcpServer("broker.hivemq.com").Build();

    // Setup message handling before connecting so that queued messages
    // are also handled properly. When there is no event handler attached all
    // received messages get lost.
    mqttClient.ApplicationMessageReceivedAsync += e =>
    {
        Console.WriteLine("Received application message.");
        e.DumpToConsole();

        return Task.CompletedTask;
    };

    await mqttClient.ConnectAsync(mqttClientOptions, CancellationToken.None);

    var mqttSubscribeOptions = mqttFactory.CreateSubscribeOptionsBuilder()
        .WithTopicFilter(
            f =>
            {
                f.WithTopic("mqttnet/samples/topic/2");
            })
        .Build();

    await mqttClient.SubscribeAsync(mqttSubscribeOptions, CancellationToken.None);

    Console.WriteLine("MQTT client subscribed to topic.");

    Console.WriteLine("Press enter to exit.");
    Console.ReadLine();
}

发送

var mqttFactory = new MqttFactory();

using (var mqttClient = mqttFactory.CreateMqttClient())
{
    var mqttClientOptions = new MqttClientOptionsBuilder()
        .WithTcpServer("broker.hivemq.com")
        .Build();

    await mqttClient.ConnectAsync(mqttClientOptions, CancellationToken.None);

    var applicationMessage = new MqttApplicationMessageBuilder()
        .WithTopic("samples/temperature/living_room")
        .WithPayload("19.5")
        .Build();

    await mqttClient.PublishAsync(applicationMessage, CancellationToken.None);

    await mqttClient.DisconnectAsync();
    
    Console.WriteLine("MQTT application message is published.");
}

参考