CH395+EMQX实现MQTT应用(Windows系统)

发布时间 2023-12-11 09:52:32作者: lqlq123
  • MQTT协议

1.MQTT简介

MQTT是一种基于 发布/订阅 模式的轻量级消息协议,工作在TCP/IP协议族上。其最大的优点是用极少量的代码和有限的宽带为设备间提供实时可靠的消息服务。在物联网(IOT)和机器与机器(M2M)等方面有较广泛的应用。

2.MQTT特性

2.1 发布/订阅模式,提供一对多的消息发布,方便消息在传感器间传递,解耦Client/Server的模式,不在需要预先知道对端的存在,不用同时运行

2.2 提供服务质量管理(QoS):

  ①Qos0(至多一次):发送方发送的消息,接收方最多能收到一次。这一级别会发生消息丢失或者重复,消息的发布依赖于底层TCP/IP网络。这一级别可用于如传感器数据上发,丢掉一次无所谓的情况,因为之后会二次发送。

  ②Qos1(至少一次):发送方发送的消息,接收方最少能收到一次,因此消息可能会发生重复。

  ③Qos2(只有一次):确保消息只到达一次。这一级别在网络中开销最高,适用于一些对数据严格的场景。

2.3 MQTT有1字节固定报头,2字节心跳报文,可以最小化传输开销,有效减少网络流量。因此非常时候应用在物联网,传感器数据采集和嵌入式设备信息收集等领域。

3.MQTT实现方式

  3.1 使用MQTT协议通讯时,有三种身份:发布者(publish)、代理(Broker/服务器)、订阅者(Subscribe)。其中,消息的发布者和订阅者都是客户端,消息的代理是服务器,消息的发布者同时可以是订阅者。

  3.2 MQTT传输的消息分为主题(Topic)和负载(Payload)两部分:

   主题:即消息的类型,订阅者订阅某主题后,就会收到该主题的消息内容

   负载:即消息的具体内容

   3.3 MQTT客户端:是指一个使用MQTT协议的应用程序或设备,客户端可以:发布其他客户端可能会订阅的信息;订阅其他客户端发布的信息

    MQTT服务器:位于消息发布者和订阅者之间,服务器可以:接收客户端的连接;接收客户端发布的信息;处理客户端的订阅和退订;向订阅者转发消息。

4.MQTT数据包结构

  4.1 MQTT协议中,一个MQTT数据包由固定头、可变头、消息体三部分组成。

固定头(Fixed header):存在所有MQTT数据包中,表示数据包类型。固定头是不虚的,所有MQTT数据包中必须包含固定头。

可变头(Variable header):部分MQTT数据包有,由数据包类型所决定。

消息体(Payload):存在于部分MQTT数据包中,表示客户端收到的具体内容,也是由数据包类型决定。

  4.2 固定头

固定头包含两部分(如下图),首字节为第一字节,剩余消息报文长度是指当前数据包中剩余内容长度的字节数。

 
 
bit
7 6 5 4 3 2 1 0
Byte1 MQTT数据包类型 数据包类型标识
Byte2 剩余长度
    其中,Byte1中的MQTT数据包类型和标识:
数据包类型(说明) 7-4bit值 数据方向 bit3 bit2 bit1 bit0
保留 0000          
CONNECT  (客户端连接服务器) 0001 Client--->Server 0 0 0 0
CONNACK  (连接确认) 0010 Server--->Client 0 0 0 0
PUBLISH    (发布消息) 0011 Client<-->Server DUP Qos Qos RETAIN
PUBACK     (发布确认Qos1) 0100 Client<-->Server 0 0 0 0
PUBREC     (消息已接收  Qos2一阶段) 0101 Client<-->Server 0 0 0 0
PUBREL     (消息释放  Qos2二阶段) 0110 Client<-->Server 0 0 1 0
PUBCOMP  (发布结束  Qos2三阶段) 0111 Client<-->Server 0 0 0 0
SUBSCRIBE(客户端请求订阅) 1000 Client--->Server 0 0 1 0
SUBACK      (服务端确认订阅) 1001 Server--->Client 0 0 0 0
UNSUBACRIBE(客户端取消订阅) 1010 Client--->Server 0 0 1 0

 

        • 如果bit3 DUP位为1,表示数据包是重复的消息;如果为0则表示该数据包是第一次发布
        • 如果 bit1 和 bit2都为0,则表示Qos0;如果 bit1 为1,则表示Qos1;如果 bit2 为1,则表示Qos2;如果 bit1和 bit2都置1,则表示非法消息,会关闭当前连接。

4.3  可变头和消息体的相关报文解析可以查看官方文档

MQTT官方文档:MQTT Version 3.1.1 (oasis-open.org)

  • EMQX具体操作步骤

  1.EMQX下载: https://packages.emqx.net/emqx-ce/v4.3.22/emqx-windows-4.3.22.zip

  2.下载完成后会生成一个emqx文件夹

3.右击开始图标,选择Windows PowerShell(管理员)

4.①使用cd命令进入eqmx文件夹下的bin;

   ②进入bin文件后使用  .\emqx install ,出现成功则表示安装完成。

   ③使用 .\emqx console,会出现一个弹窗,点击确认即可,之后会打初始化一系列使用到的端口,如果最终成功,会出现EMQX is running now!

     5.打开浏览器输入localhost:18083即可进入EMQX登录界面,初始账号为admin,密码为public。

  6.当我们用395成功连接上EMQX服务器后,在客户端列表中能看到我们设备的信息,点击Client ID下的 admin1,能详细的看到我们设备的ip、端口等信息。

   7.在监视器界面能看到具体数据量,在主题界面能看到设备所发布的主题。

  •  相关代码说明

1.MQTT驱动文件

2. CH395.c

 1 /* CH395相关定义 */
 2 const UINT8 CH395IPAddr[4] = {192,168,1,100};                         /* CH395IP地址 */
 3 const UINT8 CH395GWIPAddr[4] = {192,168,1,1};                        /* CH395网关 */
 4 const UINT8 CH395IPMask[4] = {255,255,255,0};                        /* CH395子网掩码 */
 5 
 6 /* socket 相关定义*/
 7 const UINT8  Socket0DesIP[4] = {192,168,1,24};                      /* Socket 0目的IP地址 */
 8 const UINT16 Socket0DesPort = 1883;                                  /* Socket 0目的端口 */
 9 const UINT16 Socket0SourPort = 4000;                                 /* Socket 0源端口 */
10 
11 u8 publishValid = 0;
12 u16 timeCnt = 0;
13 char *username  = "user2";                         //Device name, unique for each device, available "/" for classification
14 char *password  = "user2";                         //Server login password
15 char *sub_topic = "topic_wch";                       //subscribed session name
16 char *pub_topic = "topic_wch";                       //Published session name
17 int pub_qos = 0;                                   //Publish quality of service
18 int sub_qos = 0;                                   //Subscription quality of service
19 char *payload = "WCHNET MQTT";                     //Publish content
20 //有效负载
21 u8 con_flag  = 0;                                  //Connect MQTT server flag
22 u8 pub_flag  = 0;                                  //Publish session message flag/
23 u8 sub_flag  = 0;                                  //Subscription session flag
24 u8 tout_flag = 0;                                  //time-out flag
25 u16 packetid = 5;                                  //package id

程序使用时,需要

①将Socket0DesIP改为相应电脑的本机IP

②username为用户名称,sub_topic为订阅的主题名,pub_topic为发布消息的主题名,pub_qos为发布消息的服务质量,sub_qos为订阅消息的服务质量,payload为负载。

 1 /*********************************************************************
 2  * @fn      TIM2_Init
 3  *
 4  * @brief   Initializes TIM2.
 5  *
 6  * @return  none
 7  */
 8 void TIM2_Init( void )
 9 {
10     TIM_TimeBaseInitTypeDef  TIM_TimeBaseStructure={0};
11 
12     RCC_APB1PeriphClockCmd(RCC_APB1Periph_TIM2, ENABLE);
13 
14     TIM_TimeBaseStructure.TIM_Period = SystemCoreClock / 1000000 - 1;
15     TIM_TimeBaseStructure.TIM_Prescaler = WCHNETTIMERPERIOD * 1000 - 1;
16     TIM_TimeBaseStructure.TIM_ClockDivision = 0;
17     TIM_TimeBaseStructure.TIM_CounterMode = TIM_CounterMode_Up;
18     TIM_TimeBaseInit(TIM2, &TIM_TimeBaseStructure);
19     TIM_ITConfig(TIM2, TIM_IT_Update ,ENABLE);
20 
21     TIM_Cmd(TIM2, ENABLE);
22     TIM_ClearITPendingBit(TIM2, TIM_IT_Update );
23     NVIC_EnableIRQ(TIM2_IRQn);
24 }
25 
26 void TIM2_IRQHandler(void) __attribute__((interrupt("WCH-Interrupt-fast")));
27 void TIM2_IRQHandler(void)
28 {
29     if(!publishValid){            //Set the publishValid flag every 20s
30         timeCnt += 10;
31         if(timeCnt > 20000){
32             publishValid = 1;
33             timeCnt = 0;
34         }
35     }
36    // WCHNET_TimeIsr(WCHNETTIMERPERIOD);
37     TIM_ClearITPendingBit(TIM2, TIM_IT_Update);
38 }

定时器设置,20秒给MQTT服务器上传一包数据,数据内容即为  WCHNET MQTT。

 1 /**********************************************************************************
 2 * Function Name  : InitCH395InfParam
 3 * Description    : 初始化CH395Inf参数
 4 * Input          : None
 5 * Output         : None
 6 * Return         : None
 7 **********************************************************************************/
 8 void InitCH395InfParam(void)
 9 {
10     memset(&CH395Inf,0,sizeof(CH395Inf));                            /* 将CH395Inf全部清零*/
11     memcpy(CH395Inf.IPAddr,CH395IPAddr,sizeof(CH395IPAddr));         /* 将IP地址写入CH395Inf中 */
12     memcpy(CH395Inf.GWIPAddr,CH395GWIPAddr,sizeof(CH395GWIPAddr));   /* 将网关IP地址写入CH395Inf中 */
13     memcpy(CH395Inf.MASKAddr,CH395IPMask,sizeof(CH395IPMask));       /* 将子网掩码写入CH395Inf中 */
14 }
15 
16 
17 /**********************************************************************************
18 * Function Name  : InitSocketParam
19 * Description    : 初始化socket
20 * Input          : None
21 * Output         : None
22 * Return         : None
23 **********************************************************************************/
24 void InitSocketParam(void)
25 {
26     memset(&SockInf,0,sizeof(SockInf));                              /* 将SockInf[0]全部清零*/
27     memcpy(SockInf.IPAddr,Socket0DesIP,sizeof(Socket0DesIP));        /* 将目的IP地址写入 */
28     SockInf.DesPort = Socket0DesPort;                                /* 目的端口 */
29     SockInf.SourPort = Socket0SourPort;                              /* 源端口 */
30     SockInf.ProtoType = PROTO_TYPE_TCP;                              /* TCP模式 */
31     SockInf.TcpMode = TCP_CLIENT_MODE;
32 }
33 /**********************************************************************************
34 * Function Name  : CH395SocketInitOpen
35 * Description    : 配置CH395 socket 参数,初始化并打开socket
36 * Input          : None
37 * Output         : None
38 * Return         : None
39 **********************************************************************************/
40 void CH395SocketInitOpen(void)
41 {
42     UINT8 i;
43 
44     /* socket 0为TCP 客户端模式 */
45     CH395SetSocketDesIP(0,SockInf.IPAddr);                           /* 设置socket 0目标IP地址 */
46     CH395SetSocketProtType(0,SockInf.ProtoType);                     /* 设置socket 0协议类型 */
47     CH395SetSocketDesPort(0,SockInf.DesPort);                        /* 设置socket 0目的端口 */
48     CH395SetSocketSourPort(0,SockInf.SourPort);                      /* 设置socket 0源端口 */
49 
50     i = CH395OpenSocket(0);                                          /* 打开socket 0 */
51     mStopIfError(i);                                                 /* 检查是否成功 */
52 
53     i = CH395TCPConnect(0);                                          /* TCP连接 */
54     mStopIfError(i);                                                 /* 检查是否成功 */                                              /* 检查是否成功 */
55 
56 }

因为MQTT协议是基于TCP/IP协议,所以初始化395后,需要设置一个socket为tcp模式。

 1 /*********************************************************************
 2  * @fn      MQTT_Connect
 3  *
 4  * @brief   Establish MQTT connection.
 5  *
 6  * @param   username - user name.
 7  *          password - password
 8  *
 9  * @return  none
10  */
11 void MQTT_Connect(char *username, char *password)
12 {
13     MQTTPacket_connectData data = MQTTPacket_connectData_initializer;
14     u32 len;
15     u8 buf[200];
16 
17     data.clientID.cstring = "admin1";
18     data.keepAliveInterval = 2000;
19     data.cleansession = 1;
20     data.username.cstring = username;
21     data.password.cstring = password;
22 
23     len = MQTTSerialize_connect(buf,sizeof(buf),&data);//
24     Transport_SendPacket(buf,len);
25 }

MQTT连接函数

17行 设置设备id,

18行 为心跳间隔,

19行 设置Clean Session标志,当设置为0时,表示创建一个持久对话,在客户端断开连接时,会话仍会保持并保存离线消息,直到会话超时注销。

    当设置成1时,表示创建一个新的临时对话,当客户端断开时,会话自动注销。

(此部分为报文类型CONNECT时,可变头中的相关内容,详细报文内容可以查阅官方手册)

 1 /*********************************************************************
 2  * @fn      MQTT_Subscribe
 3  *
 4  * @brief   MQTT subscribes to a topic.
 5  *
 6  * @param   topic - Topic name to subscribe to.
 7  *          req_qos - quality of service
 8  *
 9  * @return  none
10  */
11 void MQTT_Subscribe( char *topic,int req_qos)
12 {
13     MQTTString topicString = MQTTString_initializer;
14     u32 len;
15     u32 msgid = 1;//
16     u8 buf[200];
17 
18     topicString.cstring = topic;
19     len = MQTTSerialize_subscribe(buf,sizeof(buf),0,msgid,1,&topicString,&req_qos);//
20     Transport_SendPacket(buf,len);//
21 }
22 /*********************************************************************
23  * @fn      MQTT_Unsubscribe
24  *
25  * @brief   MQTT unsubscribe from a topic.
26  *
27  * @param   topic - Topic name to unsubscribe to.
28  *
29  * @return  none
30  */
31 void MQTT_Unsubscribe(char *topic)
32 {
33     MQTTString topicString = MQTTString_initializer;
34     u32 len;
35     u32 msgid = 1;
36     u8 buf[200];
37 
38     topicString.cstring = topic;
39     len = MQTTSerialize_unsubscribe(buf,sizeof(buf),0,msgid,1,&topicString);
40     Transport_SendPacket(buf,len);
41 }
42 /*********************************************************************
43  * @fn      MQTT_Publish
44  *
45  * @brief   MQTT publishes a topic.
46  *
47  * @param   topic - Published topic name.
48  *          qos - quality of service
49  *          payload - data buff
50  *
51  * @return  none
52  */
53 void MQTT_Publish(char *topic, int qos, char *payload)
54 {
55     MQTTString topicString = MQTTString_initializer;
56     u32 payloadlen;
57     u32 len;
58     u8 buf[1024];
59 
60     topicString.cstring = topic;
61     payloadlen = strlen(payload);
62     len = MQTTSerialize_publish(buf,sizeof(buf),0,qos,0,packetid++,topicString,payload,payloadlen);
63     Transport_SendPacket(buf,len);
64 }

上面三个函数分别为订阅主题,取订主题,发布消息的功能函数,直接调用即可。

 1 /**********************************************************************************
 2 * Function Name  : CH395SocketInterrupt
 3 * Description    : CH395 socket 中断,在全局中断中被调用
 4 * Input          : sockindex
 5 * Output         : None
 6 * Return         : None
 7 **********************************************************************************/
 8 void CH395SocketInterrupt(UINT8 sockindex)
 9 {
10     int qos, payloadlen;
11    MQTTString topicName;
12    UINT8  sock_int_socket;
13    UINT8 i;
14    UINT32 len;
15 //   UINT16 tmp;
16    unsigned short packetid;
17    unsigned char retained, dup;
18    unsigned char *payload;
19 
20    sock_int_socket = CH395GetSocketInt(sockindex);                   /* 获取socket 的中断状态 */
21    if(sock_int_socket & SINT_STAT_SENBUF_FREE)                       /* 发送缓冲区空闲,可以继续写入要发送的数据 */
22    {
23    }
24    if(sock_int_socket & SINT_STAT_SEND_OK)                           /* 发送完成中断 */
25    {
26    }
27    if(sock_int_socket & SINT_STAT_RECV)                              /* 接收中断 */
28    {
29        len = CH395GetRecvLength(sockindex);                          /* 获取当前缓冲区内数据长度 */
30      #if CH395_DEBUG
31             printf("receive len = %d\n",len);
32      #endif
33             if(len == 0)return;
34             if(len > 512)len = 512;                                       /* MyBuffer缓冲区长度为512 */
35             CH395GetRecvData(sockindex,len,MyBuffer);                     /* 读取数据 */
36 //            for(tmp =0; tmp < len; tmp++)                                 /* 将所有数据按位取反 */
37 //            {
38 //               MyBuffer[tmp] = ~MyBuffer[tmp];
39 //            }
40             switch(MyBuffer[0] >>4)//判断数据包类型
41             {
42             case CONNACK:
43                 printf("CONNACK\r\n");
44                  con_flag = 1;
45                  MQTT_Subscribe(sub_topic, sub_qos);
46                  break;
47             case PUBLISH:
48                 MQTTDeserialize_publish(&dup,&qos,&retained,&packetid,&topicName,
49                                                     &payload,&payloadlen,MyBuffer,len);
50                 msgDeal(payload, payloadlen);
51                 break;
52 
53             case SUBACK:
54                 sub_flag = 1;
55                 printf("SUBACK\r\n");
56                 break;
57 
58             default:
59 
60                 break;
61             }
62             memset(MyBuffer, 0 ,sizeof(MyBuffer));
63          //   CH395SendData(sockindex,MyBuffer,len);
64 
65    }
66 
67    /*
68    **产生断开连接中断和超时中断时,CH395默认配置是内部主动关闭,用户不需要自己关闭该Socket,如果想配置成不主动关闭Socket需要配置
69    **SOCK_CTRL_FLAG_SOCKET_CLOSE标志位(默认为0),如果该标志为1,CH395内部不对Socket进行关闭处理,用户在连接中断和超时中断时调用
70    **CH395CloseSocket函数对Socket进行关闭,如果不关闭则该Socket一直为连接的状态(事实上已经断开),就不能再去连接了。
71    */
72    if(sock_int_socket & SINT_STAT_CONNECT)                          /* 连接中断,仅在TCP模式下有效*/
73   {
74 //   CH395SetKeepLive(sockindex,1);                               /*打开KEEPALIVE保活定时器*/
75 //   CH395SetTTLNum(sockindex,40);                                /*设置TTL*/
76        MQTT_Connect(username, password);
77        printf("TCP Connect Success\r\n");
78   }
79 
80     if(sock_int_socket & SINT_STAT_DISCONNECT)                        /* 断开中断,仅在TCP模式下有效 */
81     {
82            con_flag = 0;
83             printf("TCP Disconnect\r\n");
84     }
85    if(sock_int_socket & SINT_STAT_TIM_OUT)                           /* 超时中断 */
86    {
87        printf("time out \n");
88        con_flag = 0;
89 //      i = CH395CloseSocket(sockindex);
90 //     mStopIfError(i);
91    }
92 }

在socket中断的接收中断中,判断服务器发给395的数据包的类型,具体协议包类型可以看MQTT协议中的类型表格,在连接中断中发送连接函数,在断开中断和超时中断中将连接标志位置0。

 1     while(1)
 2     {
 3      //   WCHNET_MainTask();
 4         if(CH395_INT_WIRE==RESET)                                    /* 当中断引脚电平变化,进入全局中断 */
 5         {
 6             CH395GlobalInterrupt();
 7         }
 8         if (publishValid == 1) {
 9                     publishValid = 0;
10 
11                     if(con_flag)
12                         {
13 
14                         MQTT_Publish(pub_topic, pub_qos, payload);
15                         }
16         //            if(con_flag) MQTT_Pingreq();                                   //heartbeat packet
17                 }
18     }

在main函数的while循环中,当 publishValid标志为1,则表示20s间隔时间已到,在此条件下如果连接标志 con_flag为1,则上传消息。


 

完整工程链接:

 https://files.cnblogs.com/files/blogs/808422/EXAM_mqtt.zip?t=1702258990&download=true