MQTT 生产者(异步)代码解读

发布时间 2023-06-20 09:36:11作者: eiSouthBoy

一、问题引入

MQTT使用也有一段时间了,包括同步和异步的使用。

这里根据官方案例和本人的理解,记录以下学习过程。

二、解决过程

简要介绍编写 MQTT Producer的消息发布(异步)过程:

  • 第1步:创建客户端
LIBMQTT_API int MQTTAsync_create(MQTTAsync* handle, const char* serverURI, const char* clientId,
		int persistence_type, void* persistence_context);
  • 第2步:设置异步回调函数
LIBMQTT_API int MQTTAsync_setCallbacks(MQTTAsync handle, void* context, MQTTAsync_connectionLost* cl,
									 MQTTAsync_messageArrived* ma, MQTTAsync_deliveryComplete* dc);
  • 第3步:设置客户端与服务器的连接选项属性
// (异步)连接属性初始化宏定义
#define MQTTAsync_connectOptions_initializer { {'M', 'Q', 'T', 'C'}, 8, 60, 1, 65535, NULL, NULL, NULL, 30, 0,\
NULL, NULL, NULL, NULL, 0, NULL, MQTTVERSION_DEFAULT, 0, 1, 60, {0, NULL}, 0, NULL, NULL, NULL, NULL, NULL, NULL, NULL}

MQTTAsync_connectOptions conn_opts = MQTTAsync_connectOptions_initializer;
  • 第4步:连接服务器
LIBMQTT_API int MQTTAsync_connect(MQTTAsync handle, const MQTTAsync_connectOptions* options);
  • 第5步:发布响应选项设置
#define MQTTAsync_responseOptions_initializer { {'M', 'Q', 'T', 'R'}, 1, NULL, NULL, 0, 0, NULL, NULL, \
MQTTProperties_initializer, MQTTSubscribe_options_initializer, 0, NULL}

MQTTAsync_responseOptions pub_opts = MQTTAsync_responseOptions_initializer;

// 发布选项设置
pub_opts.onSuccess = onSend;
pub_opts.onFailure = onSendFailure;
pub_opts.context = client;
  • 第6步:设置消息发布选项属性
#define MQTTAsync_message_initializer { {'M', 'Q', 'T', 'M'}, 1, 0, NULL, 0, 0, 0, 0, MQTTProperties_initializer }

MQTTAsync_message pubmsg = MQTTAsync_message_initializer;

pubmsg.qos = 1;
pubmsg.payload = payload;
pubmsg.payloadlen = len;
pubmsg.retained = 1;
// pubmsg其他设置
  • 第7步:发布消息
LIBMQTT_API int MQTTAsync_sendMessage(MQTTAsync handle, const char* destinationName, 
        const MQTTAsync_message* msg, MQTTAsync_responseOptions* response);
  • 第8步:释放内存
LIBMQTT_API void MQTTAsync_destroy(MQTTAsync* handle);

2-1 数据类型的声明

MQTT 生产者的同步和异步的函数API不同,结构体声明也有所不同。

异步模式下,所有的结构体和函数声明都加上前缀:MQTTAsync

异步连接选项MQTTAsync_connectOptions结构体成员(注意:和同步中的重复部分不再例举):

成员函数 范围 功能
MQTTAsync_onSuccess* onSuccess 如果连接成功完成,这个指向回调函数的指针会被调用
MQTTAsync_onFailure* onFailure 如果连接失败,这个指向回调函数的指针会被调用
int automaticReconnect 在连接丢失的情况下自动重连
int minRetryInterval 最小重试间隔时长
int maxRetryInterval 最大重试间隔时长
MQTTProperties *connectProperties MQTT v5.0的连接属性
MQTTProperties *willProperties MQTT v5.0的遗嘱属性
MQTTAsync_onSuccess5* onSuccess5 如果连接完成,这个指向回调函数的指针会被调用
MQTTAsync_onFailure5* onFailure5 如果连接失败,这个指向回调函数的指针会被调用

异步消息结构体MQTTAsync_message和同步消息结构体MQTTClient_message完全相同
MQTTAsync_message结构体的成员:

成员名称 取值范围 功能
char struct_id[4] 结构体识别序号
int struct_version 0~1 结构体号码
int payloadlen 消息长度
void* payload 消息
int qos 0~2 消息质量
int retained True or False 消息保留标志位
int dup True or False 消息副本标志,仅在接收QOS=1的消息时有效
int msgid 消息标识符
MQTTProperties properties MQTT v5.0关联的消息属性

异步消息发布响应选项MQTTAsync_responseOptions结构体成员:

成员函数 范围 功能
char struct_id[4] 结构体识别序号
int struct_version 0~8 结构体号码
MQTTAsync_onSuccess* onSuccess 如果连接成功完成,这个指向回调函数的指针会被调用
MQTTAsync_onFailure* onFailure 如果连接失败,这个指向回调函数的指针会被调用
void* context
MQTTAsync_token token 从回调函数返回的令牌,可被用于跟踪请求的状态
MQTTAsync_onSuccess5* onSuccess5 如果连接完成,这个指向回调函数的指针会被调用
MQTTAsync_onFailure5* onFailure5 如果连接失败,这个指向回调函数的指针会被调用
MQTTProperties properties MQTT v5.0的输入属性
MQTTSubscribe_options subscribeOptions MQTT v5.0的订阅选项,仅在订阅时才能使用
int subscribeOptionsCount MQTT v5.0的订阅选项计数,仅在订阅多个才能使用
MQTTSubscribe_options* subscribeOptionsList MQTT v5.0的订阅选项数组,仅在订阅多个才能使用

2-2 回调函数

设置回调函数的原型:

LIBMQTT_API int MQTTAsync_setCallbacks(MQTTAsync handle, void* context, MQTTAsync_connectionLost* cl,
  								 MQTTAsync_messageArrived* ma, MQTTAsync_deliveryComplete* dc);

其中包含3个回调函数:

  • MQTTAsync_connectionLost* cl :用于处理是否断开连接(如果该值被设置位NULL,那么客户端将无法处理异常断开的情况)
    其原型:typedef void MQTTAsync_connectionLost(void* context, char* cause);

实例:

void connlost(void *context, char *cause)
{
	MQTTAsync client = (MQTTAsync)context;
	MQTTAsync_connectOptions conn_opts = MQTTAsync_connectOptions_initializer;
	int rc;

	printf("\nConnection lost\n");
	printf("     cause: %s\n", cause);

	printf("Reconnecting\n");
	conn_opts.keepAliveInterval = 20;
	conn_opts.cleansession = 1;
	if ((rc = MQTTAsync_connect(client, &conn_opts)) != MQTTASYNC_SUCCESS)
	{
		printf("Failed to start connect, return code %d\n", rc);
		finished = 1;
	}
}
  • MQTTAsync_messageArrived* ma :用于处理订阅主题的消息是否到达,这里为生产者无需额外处理。(该值不能设置为NULL,若这时为NULL会直接报错)
    其原型:typedef int MQTTAsync_messageArrived(void* context, char* topicName, int topicLen, MQTTAsync_message* message);

  • MQTTAsync_deliveryComplete* dc :用于处理消息是否分发完成(若不想检查是否分发成功,可以设置为NULL)
    其原型:typedef void MQTTAsync_deliveryComplete(void* context, MQTTAsync_token token);

三、反思总结

使用异步发布消息,可以提高发布效率,经过测试:100 条/秒是可以达到的。

四、参考引用