Ubuntu22.04 安装 MQTT 服务器

发布时间 2023-12-23 12:14:16作者: 路有所思

本周 web 课简单了解了MQTT,在此记录下我的安装使用(用python写个订阅新闻的简单示例)流程,如有错误,欢迎指正!

1. MQTT 简介

MQTT 是一种轻量级的消息传递协议,专为低带宽、高延迟或不可靠的网络环境设计。

1.1 MQTT 的主要特点

  • 轻量级和低带宽:MQTT 协议的头部非常小,这使得它在带宽有限的情况下依然有效。

  • 使用 TCP/IP 进行消息传输:保证了消息的可靠传输。

  • 发布-订阅模型:MQTT 使用发布-订阅消息模式,这意味着发送者(发布者)和接收者(订阅者)不直接进行通信。相反,消息通过主题交换,发布者将消息发布到主题,订阅者订阅它们感兴趣的主题并接收相应的消息。

  • 支持三种消息发布服务质量(QoS)

    • 0(最多一次)
    • 1(至少一次)
    • 2(仅一次)

1.2 核心组件

  • Broker:MQTT 消息代理服务器,负责接收所有消息、过滤消息、决定谁是消息的接收者,以及将消息传输给客户端。

有许多MQTT代理可用于测试和实际应用。有免费的自托管代理,最流行的是 Mosquitto,还有商业代理,如 HiveMQ

Mosquitto 是一个免费的开源MQTT代理,可在Windows和Linux上运行。

  • Client:任何可以与 Broker 建立连接的设备。客户端可以是发布消息的生产者,也可以是订阅消息的消费者。

对于MQTTv3.1.1,在几乎所有编程语言和主要操作系统Linux、Windows、Mac中都有客户端软件,来自Eclipse Paho项目。如:Paho Python客户端

1.3 消息流程

  • 连接:客户端通过 TCP/IP 协议连接到 Broker。
  • 发布(Publish):客户端向特定主题发送消息。
  • 订阅(Subscribe):客户端订阅一个或多个主题,以接收来自这些主题的消息。
  • 断开连接(Disconnect):结束会话时,客户端与 Broker 断开连接。

2. 在 Ubuntu 22.04 上安装 MQTT

2.1 安装 MQTT Broker (Mosquitto)

  • 在终端运行以下命令安装 Mosquitto 服务器和客户端:
sudo apt-get update
sudo apt-get install mosquitto mosquitto-clients

命令执行后将安装 Mosquitto MQTT Broker 以及一些有用的客户端工具

2.2 安装 MQTT 客户端库

  • 如果您还需要安装 MQTT 客户端库,如 Paho-MQTT,以便与 MQTT 服务器交互,可以使用 pip:
pip3 install paho-mqtt

3. MQTT 相关命令

注意:

  • 这些命令都需要管理员权限,因此需要使用 sudo
  • 在不同的 Linux 发行版上,服务管理工具可能有所不同。在较新的系统上,推荐使用systemctl ,而在一些老旧的系统上,可能需要使用 service

3.1 使用 systemctl

推荐用于新版系统,如 Ubuntu 16.04 及以后版本

  • 启动 Mosquitto 服务
sudo systemctl start mosquitto
  • 停止 Mosquitto 服务
sudo systemctl stop mosquitto
  • 重启 Mosquitto 服务
sudo systemctl restart mosquitto
  • 查看 Mosquitto 服务状态
sudo systemctl status mosquitto
  • 设置 Mosquitto 服务在启动时自动运行
sudo systemctl enable mosquitto
  • 禁用 Mosquitto 服务自动启动
sudo systemctl disable mosquitto

3.2 使用 service

老旧系统上的方法

  • 启动 Mosquitto 服务
sudo service mosquitto start
  • 停止 Mosquitto 服务
sudo service mosquitto stop
  • 重启 Mosquitto 服务
sudo service mosquitto restart
  • 查看 Mosquitto 服务状态
sudo service mosquitto status

4. 实现 MQTT 的 python 编程

4.1 需求

  • 实现雅虎RSS生活类新闻的订阅功能
  • 客户端可以通过订阅主题(/rssnews),获取最新的10条新闻消息(只包括新闻标题和时间)

4.2 代码实现

4.2.1 发布者

# mqtt 服务端

import time
from datetime import datetime

import feedparser
import paho.mqtt.client as mqtt
import pytz

# RSS源URL和MQTT Broker地址
BASE_URL = "https://news.yahoo.co.jp/rss/categories/life.xml"
BROKER_ADDRESS = "localhost"
UPDATE_INTERVAL = 60  # 更新间隔时间(秒)


def MQTTPublish():
    # 创建一个 MQTT 客户端实例
    client = mqtt.Client("NewsPublisher")

    # 连接到 MQTT 服务器
    client.connect(BROKER_ADDRESS)

    print("MQTT 客户端已连接到服务器.")

    # 持续运行,定期发布新闻
    while True:
        news_data = get_latest_news(BASE_URL)
        client.publish("/rssnews", news_data)
        print(f"已发布最新新闻到 /rssnews,等待 {UPDATE_INTERVAL} 秒后重新发布.")
        time.sleep(UPDATE_INTERVAL)


# 获取最新的十条新闻
def get_latest_news(url):
    feed = feedparser.parse(url)
    formatted_news_list = ""
    for index, item in enumerate(feed.entries[:10], start=1):
        title = item.title
        pub_date = convert_to_beijing_time(item.published)
        formatted_news_list += f"{index}. 标题: {title}\n发布时间: {pub_date}\n\n"
    return formatted_news_list


# 转换时间格式
def convert_to_beijing_time(gmt_time_str):
    """
    将国际标准时间 (GMT/UTC) 的日期时间字符串转换为北京时间字符串
    :param gmt_time_str:  包含日期时间的国际标准时间 (GMT/UTC) 字符串
    :return: 包含日期时间的北京时间字符串,格式为"年-月-日 时:分"
    """
    # 创建一个时区对象,用于表示中国北京时区
    beijing_tz = pytz.timezone('Asia/Shanghai')

    # 将日期时间字符串解析为 datetime 对象(原始字符串的格式为 RFC1123)
    gmt_time = datetime.strptime(gmt_time_str, "%a, %d %b %Y %H:%M:%S GMT")

    # 使用 astimezone 将时区从 UTC 转换为北京时区
    beijing_datetime = gmt_time.replace(tzinfo=pytz.utc).astimezone(beijing_tz)

    # 使用 strftime 格式化日期时间对象为指定格式
    beijing_time_str = beijing_datetime.strftime('%Y年%m月%d日 %H:%M')

    return beijing_time_str


if __name__ == '__main__':
    MQTTPublish()

4.2.2 接收者

# mqtt 客户端

import paho.mqtt.client as mqtt


# 当客户端成功连接到 MQTT 服务器时调用的回调函数
def on_connect(client, userdata, flags, rc):
    print(f"成功连接到 MQTT 服务器,返回码:{rc}")
    client.subscribe("/rssnews")
    print("已订阅主题 /rssnews")


# 当客户端接收到从 MQTT 服务器发送的消息时调用的回调函数
def on_message(client, userdata, msg):
    print(f"从主题 {msg.topic} 收到消息:")
    print(msg.payload.decode())  # 解码并打印消息内容


# MQTT 服务器地址
broker_address = "localhost"

# 创建一个 MQTT 客户端实例,并设置回调函数
client = mqtt.Client("NewsSubscriber")
client.on_connect = on_connect
client.on_message = on_message

# 连接到 MQTT 服务器
client.connect(broker_address)
print("正在连接到 MQTT 服务器...")

# 开始循环监听消息
client.loop_forever()

4.3 运行效果

  • 发布者

  • 接收者