物联网 MQTT 协议

发布时间 2023-12-29 03:29:43作者: moon~light



简介

MQTT(Message Queuing Telemetry Transport 消息队列遥测传输)是基于 TCP 的发布/订阅模式的消息协议,主要用于物联网,适合于硬件性能低、需要可靠消息、需要实时传递的场景

优点

  • 轻量协议:消息头为 2 字节,降低带宽消耗,提升了传输效率,适合于硬件和网络都受限的环境
  • 发布订阅:异步协议,实现了发送者和接收者的解耦,也进一步降低对网络带宽和硬件的要求
  • 推送模式:采用消息实时推送,对于需要快速响应的应用很有用
  • 保留消息:有会话控制,当设备断开重连,可以继续使用原有的会话,可以自动重发离线消息
  • QoS级别:可以灵活调整可靠性,有三个级别,QoS-0 最多一次,QoS-1 至少一次,QoS-2 仅一次

以上优点很适用于物联网,因为物联网差异很大,有很多硬件性能低,网络带宽小,网络可靠性差的场景

Topic

MQTT topic 由一个或多个主题级别组成,每个主题级别由正斜杠分隔

比如 myhome/groundfloor/livingroom/temperature 大小写敏感,第一个字符最好不用斜杠

单级通配符 myhome/groundfloor/+/temperature

多级通配符 myhome/groundfloor/#

订阅可以使用通配符,发布要指定具体 topic

流程

client --> CONNECT --> broker
client <-- CONNACK <-- broker

client --> SUBSCRIBE --> broker
client <-- SUBACK    <-- broker
	
client --> UNSUBSCRIBE --> broker
client <-- UNSUBACK    <-- broker
	
client --> PUBLISH --> broker --> PUBLISH --> client

不同的 QoS 在 publish 的时候流程有点不一样

安装

MQTT 有不同实现,这里以 mosquitto 和 emqx 为例

使用 mosquitto

下载
====
https://mosquitto.org/download/


修改 mosquitto.conf
===================
listener 1883
allow_anonymous true


启动 MQTT
=========
cd C:\Program Files\mosquitto
mosquitto.exe -c mosquitto.conf


publish
=======
mosquitto_pub.exe -h localhost -p 1883 -t <topic> -m <message>


subscribe
=========
mosquitto_sub.exe -h localhost -p 1883 -t <topic>

使用 emqx

emqx 集群,能支持 1 亿级别的设备链接,同时支持每秒百万级别的消息,以及毫米级时延
emqx 支持转发消息到 Kafka


安装
====
wget https://www.emqx.com/zh/downloads/broker/v4.4.18/emqx-4.4.18-otp24.3.4.2-1-ubuntu20.04-amd64.deb
sudo apt install ./emqx-4.4.18-otp24.3.4.2-1-ubuntu20.04-amd64.deb

sudo systemctl start emqx
sudo systemctl stop emqx

cat /etc/emqx/emqx.conf  | grep -v "^##" | grep -v "^$"


版本
====
MQTT 主流版本 3.1, 3.1.1, 5.0
之所以没有 4 据说是因为协议里面标记版本的字节,数值 3 被用来代表版本 3.1,数值 4 被用来代表版本 3.1.1,所以下个版本就直接跳到 5
https://www.jensd.de/wordpress/?p=2667

EMQX 最新版本是 5,而 EMQX 的版本 5 支持 MQTT 3.x 和 5.0 的
https://www.emqx.io/docs/en/latest/


安装客户端
=========
https://mqttx.app/zh

这个客户端可以给所有 MQTT 服务用,不仅是 EMQX 和 mosquitto


web
===
http://localhost:18083/

emqx 提供 web,默认账号密码是 admin/public


消息端口
======
1883: 用于非加密的 MQTT 通信
1884: 用于通过 TLS/SSL 进行安全的 MQTT 通信
11883: 用于 MQTT over WebSocket 的端口, 允许在 Web 浏览器中进行 MQTT 通信


Clean Session 配置
=================
MQTT 3.1.1 使用 Clean Session, 如果是 false 代表会话永久存在
MQTT 5.0   使用 Clean Start 和 Session Expiry Interval, 需要指定会话过期时间

这个不是 emqx 的,是 MQTT 协议的一部分


System topic
============
$SYS/brokers
$SYS/brokers/{node}/
$SYS/brokers/emqx@127.0.0.1/uptime
$SYS/brokers/${node}/clients/${clientid}/connected
$SYS/brokers/${node}/clients/${clientid}/disconnected
$SYS/brokers/emqx@127.0.0.1/clients/client-id-1/connected

用于接收系统消息,比如 connected 消息
{
    "username":"user-1",
    "ts":1698999394480,
    "sockport":1883,
    "proto_ver":5,
    "proto_name":"MQTT",
    "keepalive":60,
    "ipaddress":"127.0.0.1",
    "expiry_interval":0,
    "connected_at":1698999394480,
    "connack":0,
    "clientid":"client-id-1",
    "clean_start":true
}


event topic
===========
/etc/emqx/plugins/emqx_rule_engine.conf

$events/session_subscribed       // subscribe topic 时触发 (需要 acl.conf 配有权限)
$events/session_unsubscribed
$events/message_dropped          // 比如没 subscribe 就发消息,就会触发 message_dropped 事件

用于接收事件,比如 subscribe 事件
{
	"username":"user-1",
	"topic":"mytest/helloworld/#",
	"timestamp":1702004865027,
	"qos":0,
	"peerhost":"127.0.0.1",
	"node":"emqx@127.0.0.1",
	"event":"session.subscribed",
	"clientid":"client-id-1"
}


HTTP API
========
https://www.emqx.io/docs/en/v4.4/advanced/http-api.html#api-endpoints


shared subscription
===================
https://www.emqx.io/docs/en/latest/messaging/mqtt-shared-subscription.html

emqx 支持 group 内部的负载均衡 (类似 Kafka 的 group)
在 subscribe topic 的时候加上前缀 $share/<group-name>/ 就可以

比如多个 client 如果 subscribe 的 topic 都是 $share/group-1/mytest/helloworld/#
实际 subscribe 的是 mytest/helloworld/#
消息会在这些 client 内部做负载均衡

由于 MQTT 是轻量协议,可以有多个实现

代码

# sudo pip3 install paho-mqtt

import paho.mqtt.client as mqtt


def on_connect(client, userdata, flags, rc):
    print("Connected with result code " + str(rc))


def on_subscribe(client, userdata, mid, granted_qos):
    print("Subscribed: " + str(mid) + " " + str(granted_qos))


def on_message(client, userdata, msg):
    print(msg.topic + " " + str(msg.payload))


client = mqtt.Client(client_id="client-id-1", clean_session=False, protocol=mqtt.MQTTv311)

client.on_connect = on_connect
client.on_subscribe = on_subscribe
client.on_message = on_message

client.username_pw_set(username="user-1")

client.connect(host="localhost", port=1883, keepalive=60)
client.subscribe("mytest/helloworld/#")

client.loop_forever()


# 在新窗口
client_2 = mqtt.Client(client_id="client-id-2", clean_session=False, protocol=mqtt.MQTTv311)
client_2.username_pw_set(username="user-2")
client_2.connect(host="localhost", port=1883, keepalive=60)
client_2.publish("mytest/helloworld/topic1", "payload-test", 1)

代码实现也比较简单

和 Kafka 比较

Kafka 没有 MQTT 轻量
Kafka 会持久化数据到文件,再通过 pull 模式处理
Kafka 的可靠相当于只能到 QoS-1
Kafka 通过分区实现更好的扩展性,通过多节点实现可靠性
Kafka 更适合于大数据的处理,支持批量处理

像 EMQX 这样的 MQTT 服务已经和 Kafka 集成,可以设置在服务端收到 MQTT 消息后再丢给内部的 Kafka,这样把消息存下来,后面再处理

这样既能利用 MQTT 可靠地链接外部那些设备量大,但单个设备数据量小,网络又不可靠的终端
又能利用 Kafka 在内部做大数据量的存储,和高吞吐量的处理

和 HTTP 比较

HTTP 是重量级协议,有很多 header 和规制
HTTP 是同步协议,需要等待服务端回复
HTTP 是单向协议,必须是客户端发起情况,服务端无法向物联网设备发起请求
HTTP 可靠性差,失败了需要自己做重发处理,服务器也无法知道客户端收到响应没有