EMQX简单入门

发布时间 2023-08-27 18:03:26作者: 为什么不是这样呢

最近项目上使用了mqtt协议来传输数据,之前没了解过,故简单学习下,本文作为学习记录以便之后复习使用。

1.什么是MQTT

MQTT 是一种基于发布/订阅模式轻量级消息传输协议,专门针对低带宽不稳定网络环境的物联网应用而设计,可以用极少的代码为联网设备提供实时可靠的消息服务。MQTT 协议广泛应用于物联网、移动互联网、智能硬件、车联网、智慧城市、远程医疗、电力、石油与能源等领域按照MQTT协议作者的说法,MQTT协议必须具备这几点。

  1. 简单容易实现
  2. 支持 QoS(设备网络环境复杂)
  3. 轻量且省带宽(因为那时候带宽很贵)
  4. 数据无关(不关心 Payload 数据格式)
  5. 有持续地会话感知能力(时刻知道设备是否在线

而MQTT协议凭借着轻量高效、可靠的消息传递、海量连接支持、安全的双向通信等优点已成为物联网行业的首选协议。

2.MQTT工作原理

2.1 基础概念

MQTT客户端:任何运行 MQTT客户端库的应用或设备都是 MQTT 客户端。例如,使用 MQTT 的即时通讯应用是客户端,使用 MQTT 上报数据的各种传感器是客户端,各种 MQTT测试工具也是客户端。

MQTT Broker: MQTT Broker 是负责处理客户端请求的关键组件,包括建立连接、断开连接、订阅和取消订阅等操作,同时还负责消息的转发。(而EMQX最为优秀的mqtt broker是首选的学习对象。本文也是基于emqx来进行学习介绍的。)

发布订阅模式:发布-订阅模式与客户端-服务器模式的不同之处在于,它将发送消息的客户端(发布者)和接收消息的客户端(订阅者)进行了解耦。发布者和订阅者之间无需建立直接连接,而是通过 MQTT Broker 来负责消息的路由和分发。

主题:MQTT 协议根据主题来转发消息。主题通过 / 来区分层级,MQTT 主题支持以下两种通配符:+ 和 #。1.  +:表示单层通配符,例如 a/+ 匹配 a/x 或 a/y。 2.  #:表示多层通配符,例如 a/# 匹配 a/xa/b/c/d【注意:通配符主题只能用于订阅,不能用于发布。】

QoS(Quality of Service):

MQTT 提供了三种服务质量(QoS),在不同网络环境下保证消息的可靠性。

  • QoS 0:消息最多传送一次。如果当前客户端不可用,它将丢失这条消息。
  • QoS 1:消息至少传送一次。
  • QoS 2:消息只传送一次。

 

2.2 工作流程

  1. 客户端使用 TCP/IP 协议与 Broker 建立连接,可以选择使用 TLS/SSL 加密来实现安全通信。客户端提供认证信息,并指定会话类型(Clean Session 或 Persistent Session)。
  2. 客户端既可以向特定主题发布消息,也可以订阅主题以接收消息。当客户端发布消息时,它会将消息发送给 MQTT Broker;而当客户端订阅消息时,它会接收与订阅主题相关的消息。
  3. MQTT Broker 接收发布的消息,并将这些消息转发给订阅了对应主题的客户端。它根据 QoS 等级确保消息可靠传递,并根据会话类型为断开连接的客户端存储消息。

 3.MQTT简单使用

EMQX优势的地方在于提供了Web形式的客户端(MQTTX)以及在线试用的Broker,这对于学习MQTT的朋友提供了极大的便利。

本文使用docker形式部署本地的emqx。部署步骤如下:

  1. docker pull emqx/emqx:5.1.6
  2. docker run -d --name emqx -p 1883:1883 -p 8083:8083 -p 8084:8084 -p 8883:8883 -p 18083:18083 emqx/emqx:5.1.6
  3. 浏览器输入你的IP:18083,默认账户密码为admin/public,首次登录需改密码。

页面打开的是EMQX 提供了一个内置的管理控制台,即 EMQX Dashboard。方便用户通过 Web 页面就能轻松管理和监控 EMQX 集群,并配置和使用所需的各项功能。这也是EMQX的优势。

在EMQX Dashboard中进入客户端认证页面,添加基于password_based方式认证机制,添加时也可以选择存储认证信息的数据源,使用非常方便。【可选】

下面已Python客户端(paho-mqtt)为例演示数据接收与发送,Broker使用本地部署的,当然也可已使用EMQX提供的试用Broker。直接上代码。

import random
import time

from paho.mqtt import client as mqtt_client


broker = 'IP'
port = 8083
topic = "/python/mqtt"
client_id = f'python-mqtt-{random.randint(0, 1000)}'
username = '账户'
password = '密码'


def connect_mqtt() -> mqtt_client:
    def on_connect(client, userdata, flags, rc):
        if rc == 0:
            print("Connected to MQTT Broker!")
        else:
            print("Failed to connect, return code %d\n", rc)

    client = mqtt_client.Client(client_id, transport="websockets")  # 使用websockets方式连接,默认端口8083
    client.username_pw_set(username=username, password=password)
    client.on_connect = on_connect
    client.connect(host=broker, port=port)
    return client


def publish(client: mqtt_client):
    msg_count = 0
    while True:
        time.sleep(1)
        msg = f"messages: {msg_count}"
        result = client.publish(topic, msg)
        status = result[0]
        if status == 0:
            print(f"Send {msg} to topic {topic}")
        else:
            print(f"Failed to send message to topic {topic}")
        msg_count += 1


def run():
    client = connect_mqtt()
    client.loop_start()
    publish(client)
import random

from paho.mqtt import client as mqtt_client

broker = 'IP'
port = 8083
topic = "/python/mqtt"
client_id = f'python-mqtt-{random.randint(0, 1000)}'
username = '账户'
password = '密码'


def connect_mqtt() -> mqtt_client:
    def on_connect(client, userdata, flags, rc):
        if rc == 0:
            print("Connected to MQTT Broker!")
        else:
            print("Failed to connect, return code %d\n", rc)

    client = mqtt_client.Client(client_id, transport="websockets")
    client.username_pw_set(username=username, password=password)
    client.on_connect = on_connect
    client.connect(host=broker, port=port)
    return client


def subscribe(client: mqtt_client):
    def on_message(client, userdata, msg):
        print(msg)
        print(f"Received {msg.payload.decode()} from {msg.topic} topic")

    client.subscribe(topic)
    client.on_message = on_message


def run():
    client = connect_mqtt()
    subscribe(client)
    client.loop_forever()

运行脚本后可以在控制台看到实时的看到Broker的使用情况,简直太方便了。