MQTT:掉线重连 (Reconnect)

发布时间 2023-10-25 15:20:40作者: eiSouthBoy

cleansession对重连的影响

在使用MQTT同步客户端(即 #include "MQTTClient.h" 的同步API)时,若client与broker断开了连接,重连逻辑该如何实现?cleansession对重连逻辑的实现又有什么影响?

分别对cleansession的两种情况进行测试和验证。

注:MQTT Version = MQTT 3.1.1

cleansession=1

当cleansession=1时,client断开连接后,broker中会清除会话的信息。当该client重连后,broker中重新创建新的会话ID,client需要重新订阅主题,从主题消息的最新的位置开始消费。

MQTTClient_subscribe_cleansession1.c

#include "windows.h"
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <signal.h>
#include "MQTTClient.h"

/* Broker URI handed to MQTTClient_create(). */
#define ADDRESS     "tcp://10.8.198.47:1883"
/* Client identifier handed to MQTTClient_create(). */
#define CLIENTID    "ClientSub_Host62"
/* Topic subscribed to in main() and re-subscribed to in connlost(). */
#define TOPIC       "datetime1"
/* QoS level requested in every MQTTClient_subscribe() call. */
#define QOS         1
/* Timeout handed to MQTTClient_disconnect(); presumably milliseconds — confirm against the Paho docs. */
#define TIMEOUT     10000L

/*
 * Main-loop flag: main() keeps sleeping while this is non-zero and
 * sighandler() clears it on SIGINT. An object that is written from a signal
 * handler and read elsewhere must be volatile sig_atomic_t for the access to
 * be well defined, so it is declared that way instead of plain int.
 */
static volatile sig_atomic_t run = 1;
/* Most recent delivery token seen by delivered(); it is only written in this listing, never read. */
volatile MQTTClient_deliveryToken deliveredtoken;

/*
 * SIGINT handler: logs the signal and clears the `run` flag so that the
 * polling loop in main() terminates. Any other signal number is ignored.
 *
 * NOTE(review): printf() is not async-signal-safe on POSIX systems; kept
 * because this sample targets Windows — confirm before porting.
 */
static void sighandler(int signum)
{
	if (signum != SIGINT)
	{
		return;
	}

	printf("catch signal:SIGINT\n");
	run = 0;
}

/*
 * Delivery-complete callback registered through MQTTClient_setCallbacks().
 * Logs the token and records it in the global `deliveredtoken`.
 *
 * context: value registered with MQTTClient_setCallbacks(); unused here.
 * dt:      delivery token being confirmed.
 */
void delivered(void *context, MQTTClient_deliveryToken dt)
{
	(void)context;

	printf("Message with token value %d delivery confirmed\n", dt);
	deliveredtoken = dt;
}

/*
 * Message-arrived callback registered through MQTTClient_setCallbacks().
 * Prints the topic and the payload, then releases both buffers.
 *
 * context:   value registered with MQTTClient_setCallbacks(); unused here.
 * topicName: topic the message was published on; freed before returning.
 * topicLen:  unused here.
 * message:   received message; freed before returning.
 *
 * Always returns 1 (presumably "message handled" — confirm against the
 * Paho MQTTClient_messageArrived documentation).
 */
int msgarrvd(void *context, char *topicName, int topicLen, MQTTClient_message *message)
{
	const char *payload_text = (char*)message->payload;
	int payload_len = message->payloadlen;

	(void)context;
	(void)topicLen;

	printf("Message arrived\n");
	printf("     topic: %s\n", topicName);
	/* The payload is printed with an explicit length rather than as a C string. */
	printf("   message: %.*s\n", payload_len, payload_text);

	MQTTClient_freeMessage(&message);
	MQTTClient_free(topicName);

	return 1;
}

/*
 * Connection-lost callback: blocks until the client has reconnected to the
 * broker and re-subscribed to TOPIC, retrying each step once per second.
 *
 * The reconnect uses cleansession = 1, so the subscription has to be issued
 * again after every successful reconnect.
 *
 * context: value registered with MQTTClient_setCallbacks(); main() passes
 *          the MQTTClient handle itself, so it is cast straight back.
 * cause:   reason for the disconnection. The Paho documentation states it
 *          is currently always NULL, so it must not be passed to "%s"
 *          unchecked.
 */
void connlost(void *context, char *cause)
{
	MQTTClient client = (MQTTClient)context;
	MQTTClient_connectOptions conn_opts = MQTTClient_connectOptions_initializer;

	printf("\nConnection lost\n");
	/* Passing a NULL pointer to %s is undefined behaviour; print a placeholder instead. */
	printf("     cause: %s\n", cause ? cause : "(null)");

	conn_opts.keepAliveInterval = 10;
	conn_opts.cleansession = 1;

	/* Step 1: keep trying to reconnect until the broker accepts the connection. */
	while (MQTTClient_connect(client, &conn_opts) != MQTTCLIENT_SUCCESS)
	{
		printf("Reconnection Failure\n");
		Sleep(1000);
	}
	printf("Reconnection Successful\n");

	/* Step 2: the new session has no subscriptions, so subscribe again. */
	while (MQTTClient_subscribe(client, TOPIC, QOS) != MQTTCLIENT_SUCCESS)
	{
		printf("Resubscribe Failure\n");
		Sleep(1000);
	}
	printf("Resubscribe Successful\n");
}

/*
 * Entry point of the cleansession = 1 subscriber sample.
 *
 * Creates the client, registers the callbacks (passing the client handle as
 * the callback context), connects, subscribes to TOPIC and then idles until
 * SIGINT clears `run`. Waits for a key press before returning.
 *
 * Returns EXIT_SUCCESS on a clean run and EXIT_FAILURE if any step failed.
 *
 * The exit status is tracked separately from `rc` so that a later successful
 * library call cannot mask an earlier failure. A subscribe failure is routed
 * through MQTTClient_disconnect() so the established connection is closed
 * before the handle is destroyed.
 */
int main(int argc, char* argv[])
{
	MQTTClient client;
	MQTTClient_connectOptions conn_opts = MQTTClient_connectOptions_initializer;
	int status = EXIT_SUCCESS;
	int rc;

	(void)argc;
	(void)argv;

	if ((rc = MQTTClient_create(&client, ADDRESS, CLIENTID,
		MQTTCLIENT_PERSISTENCE_NONE, NULL)) != MQTTCLIENT_SUCCESS)
	{
		printf("Failed to create client, return code %d\n", rc);
		status = EXIT_FAILURE;
		goto exit;
	}

	/* The client handle doubles as the callback context so connlost() can reconnect with it. */
	if ((rc = MQTTClient_setCallbacks(client, client, connlost, msgarrvd, delivered)) != MQTTCLIENT_SUCCESS)
	{
		printf("Failed to set callbacks, return code %d\n", rc);
		status = EXIT_FAILURE;
		goto destroy_exit;
	}

	conn_opts.keepAliveInterval = 10;
	conn_opts.cleansession = 1;

	if ((rc = MQTTClient_connect(client, &conn_opts)) != MQTTCLIENT_SUCCESS)
	{
		printf("Failed to connect, return code %d\n", rc);
		status = EXIT_FAILURE;
		goto destroy_exit;
	}

	printf("Subscribing to topic %s for client %s using QoS%d\n", TOPIC, CLIENTID, QOS);
	if ((rc = MQTTClient_subscribe(client, TOPIC, QOS)) != MQTTCLIENT_SUCCESS)
	{
		printf("Failed to subscribe, return code %d\n", rc);
		status = EXIT_FAILURE;
		/* The connection is up at this point, so close it before destroying the handle. */
		goto disconnect_exit;
	}

	/* Idle until Ctrl+C; messages are handled by the library's callbacks. */
	signal(SIGINT, sighandler);
	while (run)
	{
		Sleep(100);
	}

disconnect_exit:
	if ((rc = MQTTClient_disconnect(client, TIMEOUT)) != MQTTCLIENT_SUCCESS)
	{
		printf("Failed to disconnect, return code %d\n", rc);
		status = EXIT_FAILURE;
	}
destroy_exit:
	MQTTClient_destroy(&client);
exit:
	/* Keep the console window open until a key is pressed. */
	getchar();
	return status;
}

cleansession=0

当cleansession=0时,client断开连接后,broker中会保留该掉线会话的信息,例如:订阅的主题、尚未消费的主题消息。当该会话重连时,不需要对掉线前的主题再次订阅。重连成功后,会从掉线前的位置继续消费(即历史消息),历史消息的最大数量取决于broker中队列的大小。EMQX中的队列最多可以保留1000条消息。

MQTTClient_subscribe_cleansession0.c

#include "windows.h"
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <signal.h>
#include "MQTTClient.h"

/* Broker URI handed to MQTTClient_create(). */
#define ADDRESS     "tcp://10.8.198.47:1883"
/* Client identifier handed to MQTTClient_create(). */
#define CLIENTID    "ClientSub_Host62"
/* Topic subscribed to in main(). */
#define TOPIC       "datetime1"
/* QoS level requested in MQTTClient_subscribe(). */
#define QOS         1
/* Timeout handed to MQTTClient_disconnect(); presumably milliseconds — confirm against the Paho docs. */
#define TIMEOUT     10000L

/*
 * Main-loop flag: main() keeps sleeping while this is non-zero and
 * sighandler() clears it on SIGINT. An object that is written from a signal
 * handler and read elsewhere must be volatile sig_atomic_t for the access to
 * be well defined, so it is declared that way instead of plain int.
 */
static volatile sig_atomic_t run = 1;
/* Most recent delivery token seen by delivered(); it is only written in this listing, never read. */
volatile MQTTClient_deliveryToken deliveredtoken;

/*
 * SIGINT handler: logs the signal and clears the `run` flag so that the
 * polling loop in main() terminates. Any other signal number is ignored.
 *
 * NOTE(review): printf() is not async-signal-safe on POSIX systems; kept
 * because this sample targets Windows — confirm before porting.
 */
static void sighandler(int signum)
{
	if (signum != SIGINT)
	{
		return;
	}

	printf("catch signal:SIGINT\n");
	run = 0;
}

/*
 * Delivery-complete callback registered through MQTTClient_setCallbacks().
 * Logs the token and records it in the global `deliveredtoken`.
 *
 * context: value registered with MQTTClient_setCallbacks(); unused here.
 * dt:      delivery token being confirmed.
 */
void delivered(void *context, MQTTClient_deliveryToken dt)
{
	(void)context;

	printf("Message with token value %d delivery confirmed\n", dt);
	deliveredtoken = dt;
}

/*
 * Message-arrived callback registered through MQTTClient_setCallbacks().
 * Prints the topic and the payload, then releases both buffers.
 *
 * context:   value registered with MQTTClient_setCallbacks(); unused here.
 * topicName: topic the message was published on; freed before returning.
 * topicLen:  unused here.
 * message:   received message; freed before returning.
 *
 * Always returns 1 (presumably "message handled" — confirm against the
 * Paho MQTTClient_messageArrived documentation).
 */
int msgarrvd(void *context, char *topicName, int topicLen, MQTTClient_message *message)
{
	const char *payload_text = (char*)message->payload;
	int payload_len = message->payloadlen;

	(void)context;
	(void)topicLen;

	printf("Message arrived\n");
	printf("     topic: %s\n", topicName);
	/* The payload is printed with an explicit length rather than as a C string. */
	printf("   message: %.*s\n", payload_len, payload_text);

	MQTTClient_freeMessage(&message);
	MQTTClient_free(topicName);

	return 1;
}

/*
 * Connection-lost callback: blocks until the client has reconnected to the
 * broker, retrying once per second.
 *
 * The reconnect uses cleansession = 0 and, unlike a clean-session client,
 * issues no new subscribe call afterwards.
 *
 * context: value registered with MQTTClient_setCallbacks(); main() passes
 *          the MQTTClient handle itself, so it is cast straight back.
 * cause:   reason for the disconnection. The Paho documentation states it
 *          is currently always NULL, so it must not be passed to "%s"
 *          unchecked.
 */
void connlost(void *context, char *cause)
{
	MQTTClient pclient = (MQTTClient)context;
	MQTTClient_connectOptions conn_opts = MQTTClient_connectOptions_initializer;

	printf("\nConnection lost\n");
	/* Passing a NULL pointer to %s is undefined behaviour; print a placeholder instead. */
	printf("     cause: %s\n", cause ? cause : "(null)");

	conn_opts.keepAliveInterval = 10;
	conn_opts.cleansession = 0;

	/* Keep trying to reconnect until the broker accepts the connection. */
	while (MQTTClient_connect(pclient, &conn_opts) != MQTTCLIENT_SUCCESS)
	{
		printf("Reconnection Failure\n");
		Sleep(1000);
	}
	printf("Reconnection Successful\n");
}

/*
 * Entry point of the cleansession = 0 subscriber sample.
 *
 * Creates the client, registers the callbacks (passing the client handle as
 * the callback context), connects, subscribes to TOPIC and then idles until
 * SIGINT clears `run`. Waits for a key press before returning.
 *
 * Returns EXIT_SUCCESS on a clean run and EXIT_FAILURE if any step failed.
 *
 * The exit status is tracked separately from `rc` so that a later successful
 * library call cannot mask an earlier failure. A subscribe failure is routed
 * through MQTTClient_disconnect() so the established connection is closed
 * before the handle is destroyed.
 */
int main(int argc, char* argv[])
{
	MQTTClient client;
	MQTTClient_connectOptions conn_opts = MQTTClient_connectOptions_initializer;
	int status = EXIT_SUCCESS;
	int rc;

	(void)argc;
	(void)argv;

	if ((rc = MQTTClient_create(&client, ADDRESS, CLIENTID,
		MQTTCLIENT_PERSISTENCE_NONE, NULL)) != MQTTCLIENT_SUCCESS)
	{
		printf("Failed to create client, return code %d\n", rc);
		status = EXIT_FAILURE;
		goto exit;
	}

	/* The client handle doubles as the callback context so connlost() can reconnect with it. */
	if ((rc = MQTTClient_setCallbacks(client, client, connlost, msgarrvd, delivered)) != MQTTCLIENT_SUCCESS)
	{
		printf("Failed to set callbacks, return code %d\n", rc);
		status = EXIT_FAILURE;
		goto destroy_exit;
	}

	conn_opts.keepAliveInterval = 10;
	conn_opts.cleansession = 0;

	if ((rc = MQTTClient_connect(client, &conn_opts)) != MQTTCLIENT_SUCCESS)
	{
		printf("Failed to connect, return code %d\n", rc);
		status = EXIT_FAILURE;
		goto destroy_exit;
	}

	printf("Subscribing to topic %s for client %s using QoS%d\n", TOPIC, CLIENTID, QOS);
	if ((rc = MQTTClient_subscribe(client, TOPIC, QOS)) != MQTTCLIENT_SUCCESS)
	{
		printf("Failed to subscribe, return code %d\n", rc);
		status = EXIT_FAILURE;
		/* The connection is up at this point, so close it before destroying the handle. */
		goto disconnect_exit;
	}

	/* Idle until Ctrl+C; messages are handled by the library's callbacks. */
	signal(SIGINT, sighandler);
	while (run)
	{
		Sleep(100);
	}

disconnect_exit:
	if ((rc = MQTTClient_disconnect(client, TIMEOUT)) != MQTTCLIENT_SUCCESS)
	{
		printf("Failed to disconnect, return code %d\n", rc);
		status = EXIT_FAILURE;
	}
destroy_exit:
	MQTTClient_destroy(&client);
exit:
	/* Keep the console window open until a key is pressed. */
	getchar();
	return status;
}

反思总结

疑点1

这里看一下连接丢失(会话丢失)回调函数的原型:

/**
 * Callback invoked by the client library when the client loses its
 * connection to the server.
 *
 * The client application must provide an implementation of this function to
 * enable asynchronous notification of the loss of connection. The function
 * is registered with the client library by passing it as an argument to
 * MQTTClient_setCallbacks(). The client application must take appropriate
 * action, such as trying to reconnect or reporting the problem.
 *
 * This function is executed on a separate thread to the one on which the
 * client application is running.
 *
 * @param context A pointer to the <i>context</i> value originally passed to
 * MQTTClient_setCallbacks(), which contains any application-specific context.
 * @param cause The reason for the disconnection.
 * Currently, <i>cause</i> is always set to NULL.
 */
typedef void MQTTClient_connectionLost(void* context, char* cause);

注释中说明了参数: void* context,这个指针指向上下文,通过函数 MQTTClient_setCallbacks() 的第二参数传递进去的。

函数 MQTTClient_setCallbacks() 中传入的所有的回调函数原型的第一个参数都是void* context,那么这里所说的指向上下文是什么意思呢?

context 可以是任意由应用自定义的指针。本文示例中传入的就是 MQTTClient client(MQTT客户端句柄),它起到联系上下文的作用:各个回调函数都可以通过 context 取回该句柄,包括:会话丢失回调函数、消息接收回调函数、消息发送成功回调函数。

疑点2

在 会话丢失回调函数中,为什么要把连接操作放在一个while(1)语句中?

client连接到broker后,会话丢失回调函数被注册,等待会话断开才被激活。若重连没有成功就离开了会话丢失回调函数,那么再也不会进来了,所以一定要保证重连成功才能离开。

在会话丢失后,通过 会话丢失回调函数 重连成功,一段时间后,若会话又一次丢失,会话丢失回调函数还有效吗?

是的,回调函数仍然有效:每次会话丢失时,会话丢失回调函数都会被再次调用。