使用MASA Stack+.Net 从零开始搭建IoT平台 第八章 指令下发

发布时间 2023-10-18 17:24:16作者: sunday866


指令下发-RPC式调用

我们需要控制IoT设备,就需要通过MQTT向设备发送指令,这个功能我们可以通过如下方式实现:

通过向MQTT写入数据的方式实现向设备发送指令,设备回复的结果同样写入MQTT,我们可以通过绑定一个RequestID来获取下发对应的回复结果。但是这样做是异步的。

现在各大IoT平台向设备发送指令基本都是类似RPC式调用,向设备发送指令只需要定义好消息体和超时时间,然后就可以通过类似“同步”的方式获取设备返回的结果或者下发超时的结果。

完整流程如下:

  1. 业务系统调用IoT core Api 通过http下发控制指令。
  2. IoT core调用EMQX的发布接口向mqtt写入数据。
  3. 设备通过mqtt的特定Topic订阅到指令数据。
  4. 设备执行对应操作后,将结果通过向mqtt发布消息的方式写入EMQX。
  5. EMQX通过配置规则使用Hook的方式将结果回复给IoT core Api。
  6. 如果没有超时则IoT core Api将结果写入HTTP 的响应体内,完成HTTP的请求响应。

如果设备没有在规定超时时间内回复,IoT core Api会将超时错误写入HTTP响应体,返回给业务系统。
这样业务系统只需要一次HTTP请求就可以获取到设备的执行结果,我们可以使用这种方式来执行一些时效性比较高的操作。

主题规划

我们为RPC式调用指定一个发布主题
rpc/${productKey}/${deviceName}/${requestId}

再指定一个回复主题
rpc_resp/${productKey}/${deviceName}/${requestId}

等待设备回复

业务系统调用接口下发RPC式指令后,我们需要将下发的内容记录下来,然后等待设备回复后将回复内容写入HTTP的响应体内,完成这次HTTP调用。
我们依然采用influxDB来记录下发和上报的日志,方便日后查询。

流程如下:

  1. 调用接口下发指令后,我们将指令内容记录到influxDB。并通过循环查询RequestId的方式等待设备返回,获取到结果之后返回成功,并记录成功结果(篇幅问题本文没有实现记录HTTP调用结果功能)。
  2. 接收到设备对RPC式调用的回复后,将回复内容写入InfluxDB,并关联RequestId。
  3. 超时时间内没有获取到RequestId对应的回复信息,则返回错误,并记录错误结果。

这样我们就能准确记录每次下发的指令和设备回复的信息,以及每次http请求的最终结果。

服务端实现

一、发布指令到MQTT

1、定义请求体

public class RpcMessageRequest
{
    /// <summary>
    /// 设备名称
    /// </summary>
    public string? DeviceName { get; set; }

    /// <summary>
    /// 请求ID
    /// </summary>
    public Guid RequestId { get; set; } 
    /// <summary>
    /// 产品ID
    /// </summary>

    public Guid ProductId { get; set; } = Guid.Parse("c85ef7e5-2e43-4bd2-a939-07fe5ea3f459");

    /// <summary>
    /// 消息类型
    /// </summary>
    public MessageType MessageType { get; set; } = MessageType.Down;

    /// <summary>
    /// 消息ID 来自EMQX
    /// </summary>
    public string MessageId { get; set; }

    /// <summary>
    /// 消息体
    /// </summary>
    public string MessageData { get; set; }

    /// <summary>
    /// 超时时间默认5s
    /// </summary>
    public int Timeout { get; set; } = 5;
}

2、在 MqttHandler 添加 PublishToMqttAsync 方法用于向EMQX发布数据,
发布数据我们使用EMQX提供的publish Api,相关参数与响应内容,请参考官方文档

https://www.emqx.io/docs/zh/v5.0/admin/api-docs.html#tag/Publish

/// <summary>
/// 向EMQX发布消息
/// </summary>
/// <returns></returns>
public async Task<EmqxBaseResponse> PublishToMqttAsync(PublishMessageRequest request)
{
    var url = $"{_appSettings.MqttSetting.Url}/api/v5/publish";
    var response = await url.WithBasicAuth(_appSettings.MqttSetting.ApiKey, _appSettings.MqttSetting.SecretKey).AllowAnyHttpStatu().PostJsonAsync(request);
    if (response.StatusCode is (int)HttpStatusCode.OK //200
        or (int)HttpStatusCode.BadRequest //400
        or (int)HttpStatusCode.ServiceUnavailable //503
        or (int)HttpStatusCode.Accepted) //202
    {
        return await response.GetJsonAsync<EmqxBaseResponse>();
    }
    else
    {
        throw new UserFriendlyException(await response.GetStringAsync());
    }
}

二、将下发日志写入InfluxDB

1、定义InfluxDB 的 Measurement RPCMessage

 [InfluxDB.Client.Core.Measurement("RPCMessage")]
 public class RPCMessage
 {
     /// <summary>
     /// 设备名称
     /// </summary>
     [Column("DeviceName", IsTag = true)] public string? DeviceName { get; set; }

     /// <summary>
     /// 产品ID
     /// </summary>
     [Column("ProductId", IsTag = true)]
     public Guid ProductId { get; set; } = Guid.Parse("c85ef7e5-2e43-4bd2-a939-07fe5ea3f459");

     /// <summary>
     /// 消息类型
     /// </summary>
     [Column("MessageType", IsTag = true)] public MessageType MessageType { get; set; }

     /// <summary>
     /// 请求ID
     /// </summary>
     [Column("RequestId", IsTag = true)] public Guid RequestId { get; set; }

     /// <summary>
     /// 消息ID
     /// </summary>
     [Column("MessageId", IsTag = true)] public string MessageId { get; set; }

     /// <summary>
     /// 消息体
     /// </summary>
     [Column("MessageData")] public string? MessageData { get; set; }

     /// <summary>
     /// 时间戳
     /// </summary>
     [JsonProperty(propertyName: "Ts")]
     [Column(IsTimestamp = true)] public long Timestamp { get; set; }
 }

 public enum MessageType
 {
     Up, //回复
     Down, //下发
     Other //其他
 }

2、在DeviceHandler添加 WriteRpcMessageLog 方法用于向InfluxDb写入日志

        /// <summary>
        /// 写入RPC日志
        /// </summary>
        /// <param name="request"></param>
        /// <returns></returns>
        private bool WriteRpcMessageLog(RpcMessageRequest request)
        {
            var message = new RPCMessage
            {
                DeviceName = request.DeviceName, 
                ProductId = request.ProductId, 
                MessageType = request.MessageType, 
                RequestId = request.RequestId,
                MessageId = request.MessageId,
                MessageData = request.MessageData,
                Timestamp = Convert.ToInt64((DateTime.UtcNow - new DateTime(1970, 1, 1, 0, 0, 0, 0)).TotalMilliseconds)
            };

            //记录下发指令
            return _timeSeriesDbClient.WriteMeasurement(message);
        }

成功写入EMQX后会获取到消息的全局唯一id(MessageID)。

三、从InfluxDb获取设备响应消息

1、在TimeSeriesDbClient中添加 GetRpcMessageResultAsync 方法用于从InfluxDb获取设备回复的消息

        /// <summary>
        /// 从influxDb获取设备回复的消息
        /// </summary>
        /// <param name="option"></param>
        /// <returns></returns>
        public async Task<(string messageId, string deviceResonse)> GetRpcMessageResultAsync(GetRpcMessageOption option)
        {
            var query =
                $@"from(bucket: ""{_bucket}"")
                    |> range(start: {option.UTCStartDateTimeStr},stop:{option.UTCStopDateTimeStr})                                                           
                    |> filter(fn: (r) => r._measurement == ""RPCMessage"" 
                    and r.MessageType ==""Up""
                    and r.RequestId == ""{option.RequestId}"")
                    |>last()";

            var tables = await _client.GetQueryApi().QueryAsync(query, _org);

            var fluxRecords = tables.SelectMany(table => table.Records);

            if (fluxRecords.Any())
            {
                return new ValueTuple<string, string>(fluxRecords.First().GetValueByKey("MessageId").ToString(),
                          fluxRecords.First().GetValue().ToString());
            }

            return new ValueTuple<string, string>(string.Empty, string.Empty);
        }

MessageType = UP 代表我们获取设备上报的数据,RequestId为我们下发时的请求ID,我们使用RequestId保证一次完整下发与上报的对应关系。

2、根据RequestId 在Timeout超时时间内循环检查InfluxDB是否存在设备上报数据

       /// <summary>
       /// 获取设备回复
       /// </summary>
       /// <param name="option"></param>
       /// <returns></returns>
       private async Task<RpcMessageResponse> GetRpcMessageResponseAsync(GetRpcMessageOption option)
       {
           (string messageId, string deviceResonse) messageInfo = new();
           for (int i = 0; i < option.Timeout * 10; i++) //100ms查询一次
           {
               messageInfo = await _timeSeriesDbClient.GetRpcMessageResultAsync(option);
               if (!string.IsNullOrEmpty(messageInfo.deviceResonse))
               {
                   break; //查询到设备返回消息就停止
               }
               await Task.Delay(100);
           }

           var result = new RpcMessageResponse()
           {
               RequestId = option.RequestId,
               Success = false,
               ErrorMessage = "Cmd Timeout",
               DeviceResponse = string.Empty,
               MessageId = string.Empty,
           };

           if (!string.IsNullOrEmpty(messageInfo.deviceResonse))
           {
               var rpcMessageResponse = JsonConvert.DeserializeObject<RpcMessageResponse>(messageInfo.deviceResonse);
               result.Success = rpcMessageResponse.Success;
               result.ErrorMessage = rpcMessageResponse.ErrorMessage;
               result.DeviceResponse = messageInfo.deviceResonse;
               result.MessageId = messageInfo.messageId;
               result.RequestId = option.RequestId;
           }

           return result;
       }

每隔100ms检查一次,如果超时没有找到返回:Cmd Timeout

四、整合RPC下发业务

1、在DeviceHandler中添加 PublishAndGetResponseAsync 方法将上述流程整合

        /// <summary>
        /// 发布指令并等待设备返回
        /// </summary>
        /// <param name="request"></param>
        /// <returns></returns>
        /// <exception cref="UserFriendlyException"></exception>
        public async Task<RpcMessageResponse> PublishAndGetResponseAsync(RpcMessageRequest request)
        {
            var emqxBaseResponse = await _mqttHandler.PublishToMqttAsync(new PublishMessageRequest
            {
                Topic = $"rpc/{request.ProductId}/{request.DeviceName}/{request.RequestId}",
                Payload = request.MessageData
            });
            if (string.IsNullOrEmpty(emqxBaseResponse.Id)) //发布失败
            {
                throw new UserFriendlyException($"reason_code:{emqxBaseResponse.reason_code},Code:{emqxBaseResponse.Code},Message:{emqxBaseResponse.Message}");
            }
            request.MessageId = emqxBaseResponse.Id;
            request.MessageType = MessageType.Down;
            //写入influxDb 日志
            var writeSucceeded = WriteRpcMessageLog(request);
            if (writeSucceeded)
            {
                //获取设备返回数据
                return await GetRpcMessageResponseAsync(new GetRpcMessageOption
                {
                    RequestId = request.RequestId,
                    StartDateTime = DateTime.Now.AddMinutes(-5),
                    Timeout = request.Timeout,
                    StopDateTime = DateTime.Now.AddMinutes(+5)
                });
            }

            throw new UserFriendlyException("Write inflxDB error!");
        }

2、添加Web Api用于业务调用
在DeviceMqttController中添加方法SendRpcMessageAsync 用于业务系统调用

        /// <summary>
        /// 发送RPC式调用,并同步等待设备返回
        /// </summary>
        /// <param name="request"></param>
        /// <returns></returns>
        [HttpPost("SendRpcMessage")]
        public async Task<RpcMessageResponse> SendRpcMessageAsync(SendRpcMessageRequest request)
        {
            return await _deviceHandler.PublishAndGetResponseAsync(new RpcMessageRequest
            {
                DeviceName = request.DeviceName,
                RequestId = Guid.NewGuid(),
                ProductId = request.ProductId,
                MessageType = MessageType.Up,
                MessageData = request.MessageData,
                Timeout = request.Timeout
            });
        }

RequestId 是业务端请求时产生的,RequestId 会被带入rpc Topic中,设备需要回复以该RequestId 结尾的rpc_resp Topic完成一次完整的请求和回复过程。MessageID是来自于EMQX Publish接口发布成功后的接口返回。

五、接收设备回复消息

接收设备回复的消息是通过在EMQX配置规则,通过Hook的方式获取的,所以我们需要添加一个方法让EMQX 的Hook调用。

1、添加请求体

    public class RespondRpcMessageRequest
    {
        /// <summary>
        /// Topic
        /// </summary>
        public string Topic { get; set; }

        /// <summary>
        /// 消息体
        /// </summary>
        public string Payload { get; set; }

        /// <summary>
        /// 消息Id(来自EMQX)
        /// </summary>
        public string MessageId { get; set; }
    }

2、添加RespondToRpc接口用于Hook

        /// <summary>
        /// 设备响应RPC请求
        /// </summary>
        /// <param name="request"></param>
        /// <returns></returns>
        [HttpPost("RespondToRpc")]
        public async Task<bool> RespondToRpcAsync(RespondRpcMessageRequest request)
        {
            var infoArr = request.Topic.Split("/");
            var result = _deviceHandler.RespondToRpc(new RpcMessageRequest
            {
                DeviceName = infoArr[2],
                RequestId = Guid.Parse(infoArr[3]),
                ProductId = Guid.Parse(infoArr[1]),
                MessageType = MessageType.Up,
                MessageId = request.MessageId,
                MessageData = request.Payload,
            });
            return await Task.FromResult(result);
        }

DeviceName 、RequestId 、ProductId 我们都从设备回复的Topic中获取。获取之后我们写入InfluxDB,使上面的GetRpcMessageResultAsync方法可以及时获取。

配置EMQX规则

通过配置EMQX规则,实现设备通过回复rpc_resp时,调用上面的RespondToRpcAsync接口
我们在EMQX管理界面->集成->规则中新建规则RespondToRpcMessage

SELECT
  *,json_encode(payload) as payloadStr
FROM
  "rpc_resp/#"

这里我们使用json_encode函数将payload转换成字符串payloadStr,因为设备上报的是Json,RespondToRpcAsync方法接收消息体的是string类型。

然后在右侧"添加动作",选择HTTP服务,动作选择"使用数据桥接转发",配置我们的接口和请求体

注意:这里${payloadStr}不需要加双引号。

测试

我们在Swagger中调用SendRpcMessage接口,为了方便演示,这里超时时间我设置为30s,方便在MQTTX模拟器中手动操作回复动作。

我们连接上EMQX后在订阅中添加对rpc Topic的订阅

Topic:"rpc/c85ef7e5-2e43-4bd2-a939-07fe5ea3f459/284202304230001/#" 其中c85ef7e5-2e43-4bd2-a939-07fe5ea3f459为产品id,284202304230001为设备名称,# 用于匹配RequestID

我们调用接口下发后可以看到MQTTX订阅到了RPC式调用的指令

我们快速修改MQTTX模拟设备上报的Topic为 rpc_resp/c85ef7e5-2e43-4bd2-a939-07fe5ea3f459/284202304230001/3c0131a8-6eff-48c4-b703-1f681b9ec7ff,其中末尾的3c0131a8-6eff-48c4-b703-1f681b9ec7ff为RequestID

我们回复JSON格式消息"{"Success": true,"DeviceResponse":"{"DevicePower":"ON"}"}",可以看到,接口"同步"获取到了设备的响应

总结

至此,我们完成了一次完整对设备的RPC式调用,可以通过该方式控制设备。
这里只实现了最基础的功能,生产环境中还需要做很多工作,
例如下发指令需要先鉴权,需要检查设备是否存在、提供设备不存在或者不在线的错误提示等,而且应该将每次调用 SendRpcMessageAsync 接口返回的内容也记录到InfluxDB中,通过RequestId,与这一次完整请求关联,方便业务查询。
介于篇幅问题,我也没有将功能整合到UI中,我会在之后的内容中将这部分内容整合,并作为单独的功能展示出来。

完整代码在这里:https://github.com/sunday866/MASA.IoT-Training-Demos