硬件Hi3861移植MQTT

发布时间 2023-04-15 17:28:56作者: choumei

摘要:本文简单介绍如何移植MQTT

参考https://www.easemob.com/news/7286

https://ost.51cto.com/posts/10201

http://www.taodudu.cc/news/show-4296838.html

适合群体:适用于润和Hi3861开发板文中所有代码仓库:https://gitee.com/qidiyun/hihope-3861-smart-home-kit

MQTT工作模式接入华为云示例教程

https://www.cnblogs.com/hpiot/p/16971305.html

11.1 MQTT介绍

MQTT 是当前最主流的物联网通信协议,需要物联网云平台,例如华为云、阿里云、移动OneNET都支持mqtt。而Hi3861则是一款专为IoT应用场景打造的芯片。本节主要讲如何在鸿蒙系统中通过移植第3方软件包 paho mqtt去实现MQTT协议功能,最后会给出测试验证。为后续的物联网项目打好基础。

友情预告,本节内容较多,源码也贴出来了,大家最好先看一遍,然后再操作一次。

已经移植好的MQTT源码:https://gitee.com/qidiyun/harmony_mqtt

11.2 MQTT移植

如果不想自己移植,可以跳过本节。MQTT 全称为 Message Queuing Telemetry Transport(消息队列遥测传输),是一种基于发布/订阅范式的二进制“轻量级”消息协议,由 IBM 公司发布,是针对网络受限和嵌入式设备而设计的一种数据传输协议。MQTT 最大的优点在于,可以以极少的代码和有限的带宽,为连接远程设备提供实时可靠的消息服务。作为一种低开销、低带宽占用的即时通讯协议,它在物联网、小型设备、移动应用等方面有较广泛的应用。MQTT 模型如图所示。

更多MQTT协议的介绍见这篇文章: MQTT 协议开发入门image.png

1. 下载paho mqtt软件包,添加到鸿蒙代码中

paho mqtt-c 是基于C语言实现的MQTT客户端,非常适合用在嵌入式设备上。首先下载源码:

https://github.com/eclipse/paho.mqtt.embedded-c

下载之后解压,会得到这么一个文件夹:

image.png如何在鸿蒙系统中移植 Paho-MQTT 实现MQTT协议-鸿蒙HarmonyOS技术社区

我们在鸿蒙系统源码的 third_party 文件夹下创建一个 pahomqtt 文件夹,然后把解压后的所有文件都拷贝到 pahomqtt 文件夹下

下一步,我们在pahomqtt 文件夹下面新建BUILD.gn文件,用来构建编译。其内容如下:

import("//build/lite/config/component/lite_component.gni")
import("//build/lite/ndk/ndk.gni")

# Include directories exported to every target that lists this config in
# public_configs (both pahomqtt libraries below do).
config("pahomqtt_config") {
  include_dirs = [
      "MQTTPacket/src",
      "MQTTClient-C/src",
      "MQTTClient-C/src/liteOS",
      "//third_party/iot_link/network/mqtt/mqtt_al",
      "//third_party/iot_link/inc",
      "//vendor/hisi/hi3861/hi3861/third_party/lwip_sack/include",
      "//kernel/liteos_m/components/cmsis/2.0",
  ]
}
# Warning suppressions for the third-party sources.
# NOTE(review): these assignments sit at file scope, after the closing brace of
# config("pahomqtt_config"); confirm they are actually applied to the targets
# below (they may need to live inside the config block) -- TODO verify.
  cflags = [ "-Wno-unused-variable" ]
  cflags += [ "-Wno-unused-but-set-variable" ]
  cflags += [ "-Wno-unused-parameter" ]
  cflags += [ "-Wno-sign-compare" ]
  cflags += [ "-Wno-unused-function" ]
  cflags += [ "-Wno-return-type" ]
# Source list shared by the static and shared library targets.
pahomqtt_sources = [
"MQTTClient-C/src/liteOS/MQTTLiteOS.c",
"MQTTClient-C/src/MQTTClient.c",

"MQTTPacket/src/MQTTConnectClient.c",
"MQTTPacket/src/MQTTConnectServer.c",
"MQTTPacket/src/MQTTDeserializePublish.c",
"MQTTPacket/src/MQTTFormat.c",
"MQTTPacket/src/MQTTPacket.c",
"MQTTPacket/src/MQTTSerializePublish.c",
"MQTTPacket/src/MQTTSubscribeClient.c",
"MQTTPacket/src/MQTTSubscribeServer.c",
"MQTTPacket/src/MQTTUnsubscribeClient.c",
"MQTTPacket/src/MQTTUnsubscribeServer.c",
"MQTTPacket/samples/transport.c",
]

# Static-library flavour of paho MQTT.
lite_library("pahomqtt_static") {
  target_type = "static_library"
  sources = pahomqtt_sources
  public_configs = [ ":pahomqtt_config" ]
}

# Shared-library flavour of paho MQTT.
lite_library("pahomqtt_shared") {
  target_type = "shared_library"
  sources = pahomqtt_sources
  public_configs = [ ":pahomqtt_config" ]
}

# NDK packaging: depends on the static library when board_name is
# "hi3861v100", otherwise on the shared library (with a ".so" extension).
ndk_lib("pahomqtt_ndk") {
  if (board_name != "hi3861v100") {
      lib_extension = ".so"
      deps = [
          ":pahomqtt_shared"
      ]
  } else {
      deps = [
          ":pahomqtt_static"
      ]
  }
  head_files = [
      "//third_party/pahomqtt"
  ]
}

 

2. 移植

我们使用到的是MQTTClient-C的代码,该代码支持多线程。

(1)创建 liteOS 文件夹。MQTT 已经提供了 Linux 和 FreeRTOS 的移植,这里我们参考它们,新建文件夹:third_party\pahomqtt\MQTTClient-C\src\liteOS,里面存放两个文件:MQTTLiteOS.c 和 MQTTLiteOS.h

内容如下:MQTTLiteOS.c

/*******************************************************************************
* Copyright (c) 2014, 2015 IBM Corp.
*
* All rights reserved. This program and the accompanying materials
* are made available under the terms of the Eclipse Public License v1.0
* and Eclipse Distribution License v1.0 which accompany this distribution.
*
* The Eclipse Public License is available at
*   http://www.eclipse.org/legal/epl-v10.html
* and the Eclipse Distribution License is available at
*   http://www.eclipse.org/org/documents/edl-v10.php.
*
* Contributors:
*   Allan Stockdill-Mander - initial API and implementation and/or initial documentation
*   Ian Craggs - convert to FreeRTOS
*******************************************************************************/

#include "MQTTLiteOS.h"


/**
 * Start a new CMSIS-RTOS2 thread that runs fn(arg).
 *
 * The thread is named "Hi3861", gets a 2048-byte stack and inherits the
 * priority of the calling thread.
 *
 * @param thread  receives the new thread handle in thread->task (may be NULL)
 * @param fn      thread entry point
 * @param arg     argument handed to fn
 * @return the thread handle returned by osThreadNew() cast to int
 *         (0 when thread creation failed)
 */
int ThreadStart(Thread* thread, void (*fn)(void*), void* arg)
{
    /* Zero the whole attribute block so that members not set explicitly
     * below do not carry stack garbage into osThreadNew(). */
    osThreadAttr_t attr = {0};
    osThreadId_t id;

    attr.name = "Hi3861";
    attr.attr_bits = 0U;
    attr.cb_mem = NULL;
    attr.cb_size = 0U;
    attr.stack_mem = NULL;
    attr.stack_size = 2048;
    attr.priority = osThreadGetPriority(osThreadGetId());

    id = osThreadNew((osThreadFunc_t)fn, arg, &attr);

    /* Remember the handle for the caller instead of discarding it. */
    if (thread != NULL) {
        thread->task = id;
    }

    return (int)id;
}

/* Reset the timer: its end time becomes {0 s, 0 us}. */
void TimerInit(Timer* timer)
{
    struct timeval zero = {0, 0};

    timer->end_time = zero;
}

/*
 * Report whether the timer's end time has been reached.
 * Returns 1 when (end_time - now) is zero or negative, otherwise 0.
 */
char TimerIsExpired(Timer* timer)
{
    struct timeval current;
    struct timeval remaining;

    gettimeofday(&current, NULL);
    timersub(&timer->end_time, &current, &remaining);

    if (remaining.tv_sec < 0) {
        return 1;
    }
    if (remaining.tv_sec == 0 && remaining.tv_usec <= 0) {
        return 1;
    }
    return 0;
}


/* Arm the timer to expire `timeout` milliseconds from the current time. */
void TimerCountdownMS(Timer* timer, unsigned int timeout)
{
    struct timeval start;
    struct timeval delta;

    gettimeofday(&start, NULL);

    /* Split the millisecond count into whole seconds and microseconds. */
    delta.tv_sec = timeout / 1000;
    delta.tv_usec = (timeout % 1000) * 1000;

    timeradd(&start, &delta, &timer->end_time);
}


/* Arm the timer to expire `timeout` seconds from the current time. */
void TimerCountdown(Timer* timer, unsigned int timeout)
{
    struct timeval start;
    struct timeval delta;

    gettimeofday(&start, NULL);

    delta.tv_sec = timeout;
    delta.tv_usec = 0;

    timeradd(&start, &delta, &timer->end_time);
}


/*
 * Milliseconds remaining until the timer's end time.
 * Returns 0 when the seconds part of (end_time - now) is negative.
 */
int TimerLeftMS(Timer* timer)
{
    struct timeval current;
    struct timeval remaining;

    gettimeofday(&current, NULL);
    timersub(&timer->end_time, &current, &remaining);

    if (remaining.tv_sec < 0) {
        return 0;
    }
    return remaining.tv_sec * 1000 + remaining.tv_usec / 1000;
}




/* Create the semaphore backing this mutex via osSemaphoreNew(1, 1, NULL). */
void MqttMutexInit(Mutex* mutex)
{
    osSemaphoreId_t handle = osSemaphoreNew(1, 1, NULL);

    mutex->sem = handle;
}

/*
 * Acquire the mutex's semaphore, waiting with LOS_WAIT_FOREVER.
 * Returns the osSemaphoreAcquire() status.
 */
int MqttMutexLock(Mutex* mutex)
{
    osSemaphoreId_t handle = mutex->sem;

    return osSemaphoreAcquire(handle, LOS_WAIT_FOREVER);
}

/* Release the mutex's semaphore; returns the osSemaphoreRelease() status. */
int MqttMutexUnlock(Mutex* mutex)
{
    osSemaphoreId_t handle = mutex->sem;

    return osSemaphoreRelease(handle);
}

/*
 * Read exactly `len` bytes from the socket into `buffer`, using `timeout_ms`
 * as the socket receive timeout.
 *
 * Returns the number of bytes read; this is less than `len` when recv()
 * reports EAGAIN/EWOULDBLOCK before everything arrived, 0 when recv()
 * returned 0, and -1 on any other recv() error.
 */
int linux_read(Network* n, unsigned char* buffer, int len, int timeout_ms)
{
    struct timeval tmo;
    int total = 0;

    tmo.tv_sec = timeout_ms / 1000;
    tmo.tv_usec = (timeout_ms % 1000) * 1000;

    /* A zero or negative timeout is replaced by 100 microseconds. */
    if (tmo.tv_sec < 0 || (tmo.tv_sec == 0 && tmo.tv_usec <= 0)) {
        tmo.tv_sec = 0;
        tmo.tv_usec = 100;
    }

    setsockopt(n->my_socket, SOL_SOCKET, SO_RCVTIMEO, (char *)&tmo, sizeof(struct timeval));

    while (total < len) {
        int got = recv(n->my_socket, &buffer[total], (size_t)(len - total), 0);

        if (got == -1) {
            /* A timeout keeps what was read so far; other errors give -1. */
            if (errno != EAGAIN && errno != EWOULDBLOCK) {
                total = -1;
            }
            break;
        }
        if (got == 0) {
            total = 0;
            break;
        }
        total += got;
    }

    return total;
}


/*
 * Send `len` bytes from `buffer` on the socket, using `timeout_ms` as the
 * socket send timeout.
 *
 * Returns the result of send(): the number of bytes sent, or -1 on error.
 */
int linux_write(Network* n, unsigned char* buffer, int len, int timeout_ms)
{
    struct timeval tv;
    int rc;

    /* Split the timeout into seconds and microseconds so that tv_usec stays
     * below one second (the same conversion linux_read() performs). */
    tv.tv_sec = timeout_ms / 1000;
    tv.tv_usec = (timeout_ms % 1000) * 1000;

    setsockopt(n->my_socket, SOL_SOCKET, SO_SNDTIMEO, (char *)&tv, sizeof(struct timeval));

    rc = send(n->my_socket, buffer, len, 0);
    return rc;
}


/* Install linux_read/linux_write as the I/O callbacks and set the socket to 0. */
void NetworkInit(Network* n)
{
    n->mqttwrite = linux_write;
    n->mqttread = linux_read;
    n->my_socket = 0;
}


/*
 * Resolve `addr`, open a TCP socket and connect it to `addr`:`port`.
 *
 * Only IPv4 results of the lookup are used. On success the connected socket
 * is stored in n->my_socket.
 *
 * Returns 0 on success; the getaddrinfo() error code when resolution fails;
 * -1 when no IPv4 address was found, the socket could not be created or
 * connect() failed. When connect() fails the socket is closed again and
 * n->my_socket is set to -1.
 */
int NetworkConnect(Network* n, char* addr, int port)
{
    struct sockaddr_in address;
    struct addrinfo hints;
    struct addrinfo *result = NULL;
    struct addrinfo *res;
    int rc;

    memset(&address, 0, sizeof(address));
    memset(&hints, 0, sizeof(hints));
    hints.ai_family = AF_UNSPEC;
    hints.ai_socktype = SOCK_STREAM;
    hints.ai_protocol = IPPROTO_TCP;

    rc = getaddrinfo(addr, NULL, &hints, &result);
    if (rc != 0) {
        return rc;
    }

    /* Walk the list with a separate cursor so that `result` keeps pointing
     * at the head; freeaddrinfo() must be given the head to release every
     * entry. */
    for (res = result; res != NULL; res = res->ai_next) {
        if (res->ai_family == AF_INET) {
            break;
        }
    }

    if (res != NULL) {
        address.sin_family = AF_INET;
        address.sin_port = htons(port);
        address.sin_addr = ((struct sockaddr_in*)(res->ai_addr))->sin_addr;
    } else {
        rc = -1;
    }

    freeaddrinfo(result);

    if (rc != 0) {
        return rc;
    }

    n->my_socket = socket(AF_INET, SOCK_STREAM, 0);
    if (n->my_socket == -1) {
        return -1;
    }

    rc = connect(n->my_socket, (struct sockaddr*)&address, sizeof(address));
    if (rc != 0) {
        /* Do not leak the descriptor when the connection attempt fails. */
        close(n->my_socket);
        n->my_socket = -1;
    }

    return rc;
}


/* Close the socket held in n->my_socket. */
void NetworkDisconnect(Network* n)
{
    int fd = n->my_socket;

    close(fd);
}

MQTTLiteOS.h

/*******************************************************************************
* Copyright (c) 2014, 2015 IBM Corp.
*
* All rights reserved. This program and the accompanying materials
* are made available under the terms of the Eclipse Public License v1.0
* and Eclipse Distribution License v1.0 which accompany this distribution.
*
* The Eclipse Public License is available at
*   http://www.eclipse.org/legal/epl-v10.html
* and the Eclipse Distribution License is available at
*   http://www.eclipse.org/org/documents/edl-v10.php.
*
* Contributors:
*   Allan Stockdill-Mander - initial API and implementation and/or initial documentation
*******************************************************************************/

#if !defined(MQTTLiteOS_H)
#define MQTTLiteOS_H


#include <sys/types.h>

#if !defined(SOCKET_ERROR)
/** error in socket operation */
#define SOCKET_ERROR -1
#endif

#if defined(WIN32)
/* default on Windows is 64 - increase to make Linux and Windows the same */
#define FD_SETSIZE 1024
#include <winsock2.h>
#include <ws2tcpip.h>
#define MAXHOSTNAMELEN 256
#define EAGAIN WSAEWOULDBLOCK
#define EINTR WSAEINTR
#define EINVAL WSAEINVAL
#define EINPROGRESS WSAEINPROGRESS
#define EWOULDBLOCK WSAEWOULDBLOCK
#define ENOTCONN WSAENOTCONN
#define ECONNRESET WSAECONNRESET
#define ioctl ioctlsocket
#define socklen_t int
#else
#define INVALID_SOCKET SOCKET_ERROR
#include <sys/socket.h>
#include <sys/param.h>
#include <sys/time.h>
#include <netinet/in.h>
#include <netinet/tcp.h>
#include <arpa/inet.h>
#include <netdb.h>
#include <stdio.h>
#include <unistd.h>
#include <errno.h>
#include <fcntl.h>
#include <string.h>
#include <stdlib.h>
#endif

#if defined(WIN32)
#include <Iphlpapi.h>
#else
#include <sys/ioctl.h>
#include <net/if.h>
#endif

#include "ohos_init.h"
#include "cmsis_os2.h"

#include "lwip/ip_addr.h"
#include "lwip/netifapi.h"

#include "lwip/sockets.h"

/* NOTE(review): presumably selects the multi-threaded code paths of
 * MQTTClient.c -- confirm against that file. */
#define MQTT_TASK

/* Thread handle wrapper around a CMSIS-RTOS2 thread id. */
typedef struct Thread
{
osThreadId_t task;
} Thread;

/* Start a thread running fn(arg); see MQTTLiteOS.c for the attributes used. */
int ThreadStart(Thread*, void (*fn)(void*), void* arg);

/* Countdown timer: stores the absolute time at which it expires. */
typedef struct Timer
{
struct timeval end_time;
} Timer;

/* Mutex implemented on top of a CMSIS-RTOS2 semaphore. */
typedef struct Mutex
{
osSemaphoreId_t sem;
} Mutex;

/* Mutex operations (create, acquire, release). */
void MqttMutexInit(Mutex*);
int MqttMutexLock(Mutex*);
int MqttMutexUnlock(Mutex*);

/* Timer operations: reset, expiry test, arm in ms / s, remaining ms. */
void TimerInit(Timer*);
char TimerIsExpired(Timer*);
void TimerCountdownMS(Timer*, unsigned int);
void TimerCountdown(Timer*, unsigned int);
int TimerLeftMS(Timer*);

/* Network transport: a socket plus the read/write callbacks operating on it.
 * Callback arguments are (network, buffer, length, timeout in ms). */
typedef struct Network
{
int my_socket;
int (*mqttread) (struct Network*, unsigned char*, int, int);
int (*mqttwrite) (struct Network*, unsigned char*, int, int);
} Network;

/* Socket-based implementations of the read/write callbacks. */
int linux_read(Network*, unsigned char*, int, int);
int linux_write(Network*, unsigned char*, int, int);

/* Initialise the callbacks, connect to host:port, and close the socket. */
void NetworkInit(Network*);
int NetworkConnect(Network*, char*, int);
void NetworkDisconnect(Network*);
#endif
  • 修改编译错误

错误日志截取如下

[OHOS ERROR] riscv32-unknown-elf-ld: ohos/libs/libpahomqtt_static.a(libpahomqtt_static.MQTTLiteOS.o): in function `MutexInit':
[OHOS ERROR] MQTTLiteOS.c:(.text.MutexInit+0x0): multiple definition of `MutexInit'; ohos/libs/libdiscovery.a(libdiscovery.os_adapter.o):os_adapter.c:(.text.MutexInit+0x0): first defined here
[OHOS ERROR] riscv32-unknown-elf-ld: ohos/libs/libpahomqtt_static.a(libpahomqtt_static.MQTTLiteOS.o): in function `MutexLock':
[OHOS ERROR] MQTTLiteOS.c:(.text.MutexLock+0x0): multiple definition of `MutexLock'; ohos/libs/libdiscovery.a(libdiscovery.os_adapter.o):os_adapter.c:(.text.MutexLock+0x0): first defined here
[OHOS ERROR] riscv32-unknown-elf-ld: ohos/libs/libpahomqtt_static.a(libpahomqtt_static.MQTTLiteOS.o): in function `MutexUnlock':
[OHOS ERROR] MQTTLiteOS.c:(.text.MutexUnlock+0x0): multiple definition of `MutexUnlock'; ohos/libs/libdiscovery.a(libdiscovery.os_adapter.o):os_adapter.c:(.text.MutexUnlock+0x0): first defined here
[OHOS ERROR] scons: *** [output/bin/Hi3861_wifiiot_app.out] Error 1
[OHOS ERROR] BUILD FAILED!!!!
[OHOS ERROR] Failed building output/bin/Hi3861_wifiiot_app.out: Error 1
[OHOS ERROR] you can check build log in /home/soon/ohos300_iot/out/hispark_pegasus/wifiiot_hispark_pegasus/build.log
[OHOS ERROR] command: "/home/soon/ohostool_101/ninja/ninja -w dupbuild=warn -C /home/soon/ohos300_iot/out/hispark_pegasus/wifiiot_hispark_pegasus" failed
[OHOS ERROR] return code: 1
[OHOS ERROR] execution path: /home/soon/ohos300_iot
soon@soon-u20:~/ohos300_iot $

我的修改方式是将 third_party\pahomqtt\MQTTClient-C\src\MQTTClient.c、third_party\pahomqtt\MQTTClient-C\src\liteOS\MQTTLiteOS.c、third_party\pahomqtt\MQTTClient-C\src\liteOS\MQTTLiteOS.h 这三个文件中所有的

MutexInit(Mutex*);MutexLock(Mutex*);MutexUnlock(Mutex*);

对应替换为

MqttMutexInit(Mutex*);MqttMutexLock(Mutex*);MqttMutexUnlock(Mutex*);

这样子就能正常编译通过了

 

至此我们移植基本结束

 

11.3 测试代码

测试代码比较好写。主要是3个文件,内容我都贴出来了:

(1)BUILD.gn文件内容:

# Copyright (c) 2020 Huawei Device Co., Ltd.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# MQTT test application: the WiFi/AT entry code plus the MQTT test itself.
static_library("mqtt_test_at") {
  sources = [
      "mqtt_test.c",
      "at_entry.c"
  ]
  include_dirs = [
    "//utils/native/lite/include",
    "//kernel/liteos_m/components/cmsis/2.0",
    "//base/iot_hardware/interfaces/kits/wifiiot_lite",
    "//vendor/hisi/hi3861/hi3861/third_party/lwip_sack/include",
    "//foundation/communication/interfaces/kits/wifi_lite/wifiservice",
    "//third_party/pahomqtt/MQTTPacket/src",
    "//third_party/pahomqtt/MQTTPacket/samples",

    # Location of the AT command headers; two entries are listed because the
    # path differs between source trees. Forward slashes are required here:
    # GN does not treat a backslash as a path separator.
    "//vendor/hisi/hi3861/hi3861/components/at/src",
    "//device/hisilicon/hispark_pegasus/sdk_liteos/components/at/src/",
  ]

  # Depends on the paho MQTT static library.
  deps = [
      "//third_party/pahomqtt:pahomqtt_static",
  ]
}

(2)at_entry.c文件主要是进行热点连接,因为我不会写代码就用不同人写的代码拼接在一起!!!

#include <stdio.h>
#include <unistd.h>
#include "ohos_init.h"
#include "cmsis_os2.h"
#include <unistd.h>
#include <at.h>
#include <hi_at.h>
#include "hi_wifi_api.h"
#include "mqtt_test.h"

#include <stdio.h>

#include <unistd.h>

#include "ohos_init.h"
#include "cmsis_os2.h"

#include <unistd.h>
#include "hi_wifi_api.h"
//#include "wifi_sta.h"
#include "lwip/ip_addr.h"
#include "lwip/netifapi.h"

#include "lwip/sockets.h"



/* Thread entry point: runs the MQTT test; the argument is not used. */
void mqtt_test_thread(void * argv) {
    (void)argv;
    mqtt_test();
}
/*
 * AT command handler: starts the MQTT test in its own thread
 * (4096-byte stack, priority 36), emits the AT "OK" response and
 * returns HI_ERR_SUCCESS.
 *
 * NOTE(review): "OK"/HI_ERR_SUCCESS are reported even when the thread could
 * not be created; only a diagnostic line is printed in that case.
 */
hi_u32 at_exe_mqtt_test_cmd(void) {
    /* Zero the whole attribute block so that members not set explicitly
     * below do not carry stack garbage into osThreadNew(). */
    osThreadAttr_t attr = {0};

    attr.name = "wifi_config_thread";
    attr.attr_bits = 0U;
    attr.cb_mem = NULL;
    attr.cb_size = 0U;
    attr.stack_mem = NULL;
    attr.stack_size = 4096;
    attr.priority = 36;

    if (osThreadNew((osThreadFunc_t)mqtt_test_thread, NULL, &attr) == NULL) {
        printf("[mqtt_test] Failed to create mqtt_test thread!\n");
    }

    AT_RESPONSE_OK;
    return HI_ERR_SUCCESS;
}
/*
 * AT command table registered by AtExampleEntry(): one entry, "+MQTTTEST",
 * whose last handler slot is at_exe_mqtt_test_cmd.
 * NOTE(review): the 9 presumably is the length of "+MQTTTEST" and the three
 * HI_NULL slots the unused handler kinds -- confirm against at.h.
 */
const at_cmd_func g_at_mqtt_func_tbl[] = {
    { "+MQTTTEST", 9, HI_NULL, HI_NULL, HI_NULL, (at_call_back_func)at_exe_mqtt_test_cmd },
};
/* Register every entry of g_at_mqtt_func_tbl with the AT command framework. */
void AtExampleEntry(void) {
    const size_t entry_count = sizeof(g_at_mqtt_func_tbl) / sizeof(g_at_mqtt_func_tbl[0]);

    hi_at_register_cmd(g_at_mqtt_func_tbl, entry_count);
}
// SYS_RUN(AtExampleEntry);



/* VAP / user resource counts for hi_wifi_init().
 * NOTE(review): hi_wifi_init() is not called by the active code in this file
 * (the system performs WiFi init at boot), so these may be unused -- confirm. */
#define APP_INIT_VAP_NUM   2
#define APP_INIT_USR_NUM   2

/* Set to 1 by wifi_wpa_event_cb() on HI_WIFI_EVT_CONNECTED; polled by
 * wifi_sta_task(). */
int wifi_ok_flg = 0;

/* Network interface of the WiFi STA, looked up in hi_wifi_start_sta(). */
static struct netif *g_lwip_netif = NULL;

/* Reset the interface's IPv4 address, netmask and gateway to 0.0.0.0.
 * A NULL interface is reported on the console and otherwise ignored. */
void hi_sta_reset_addr(struct netif *pst_lwip_netif)
{
    ip4_addr_t zero_gw;
    ip4_addr_t zero_ip;
    ip4_addr_t zero_mask;

    if (pst_lwip_netif != NULL) {
        IP4_ADDR(&zero_gw, 0, 0, 0, 0);
        IP4_ADDR(&zero_ip, 0, 0, 0, 0);
        IP4_ADDR(&zero_mask, 0, 0, 0, 0);

        netifapi_netif_set_addr(pst_lwip_netif, &zero_ip, &zero_mask, &zero_gw);
    } else {
        printf("hisi_reset_addr::Null param of netdev\r\n");
    }
}

void wifi_wpa_event_cb(const hi_wifi_event *hisi_event)
{
  if (hisi_event == NULL)
      return;

  switch (hisi_event->event) {
      case HI_WIFI_EVT_SCAN_DONE:
          printf("WiFi: Scan results available\n");
          break;
      case HI_WIFI_EVT_CONNECTED:
          printf("WiFi: Connected\n");
          netifapi_dhcp_start(g_lwip_netif);
          wifi_ok_flg = 1;
          break;
      case HI_WIFI_EVT_DISCONNECTED:
          printf("WiFi: Disconnected\n");
          netifapi_dhcp_stop(g_lwip_netif);
          hi_sta_reset_addr(g_lwip_netif);
          break;
      case HI_WIFI_EVT_WPS_TIMEOUT:
          printf("WiFi: wps is timeout\n");
          break;
      default:
          break;
  }
}

/*
 * Ask the WiFi driver to associate with the hard-coded access point
 * (WPA2-PSK).
 *
 * Returns 0 when the connect request was accepted, -1 when copying the
 * credentials or hi_wifi_sta_connect() failed.
 *
 * NOTE(review): the SSID and password are compiled in; replace them with
 * your own access point settings.
 */
int hi_wifi_start_connect(void)
{
    /* Lengths are derived from the literals so they cannot drift apart. */
    static const char ap_ssid[] = "MERCURY_8956";
    static const char ap_key[] = "ceshi123";
    hi_wifi_assoc_request assoc_req = {0};

    /* Copy the SSID (without the terminating NUL; the struct is zeroed). */
    if (memcpy_s(assoc_req.ssid, HI_WIFI_MAX_SSID_LEN + 1, ap_ssid, sizeof(ap_ssid) - 1) != EOK) {
        return -1;
    }

    /* Access point security mode. */
    assoc_req.auth = HI_WIFI_SECURITY_WPA2PSK;

    /* Copy the password with the same bounds-checked routine as the SSID.
     * Assumes assoc_req.key is an array member, so sizeof gives its capacity
     * -- TODO confirm against hi_wifi_api.h. */
    if (memcpy_s(assoc_req.key, sizeof(assoc_req.key), ap_key, sizeof(ap_key) - 1) != EOK) {
        return -1;
    }

    if (hi_wifi_sta_connect(&assoc_req) != HISI_OK) {
        return -1;
    }

    return 0;
}

int hi_wifi_start_sta(void)
{
  int ret;
  char ifname[WIFI_IFNAME_MAX_SIZE + 1] = {0};
  int len = sizeof(ifname);
  const unsigned char wifi_vap_res_num = APP_INIT_VAP_NUM;
  const unsigned char wifi_user_res_num = APP_INIT_USR_NUM;
  unsigned int num = WIFI_SCAN_AP_LIMIT;

  //这里不需要重复进行WiFi init,因为系统启动后就自己会做WiFi init
#if 0
  printf("_______>>>>>>>>>> %s %d \r\n", __FILE__, __LINE__);
  ret = hi_wifi_init(wifi_vap_res_num, wifi_user_res_num);
  if (ret != HISI_OK) {
      return -1;
  }
#endif
  ret = hi_wifi_sta_start(ifname, &len);
  if (ret != HISI_OK) {
      return -1;
  }

  /* register call back function to receive wifi event, etc scan results event,
    * connected event, disconnected event.
    */
  ret = hi_wifi_register_event_callback(wifi_wpa_event_cb);
  if (ret != HISI_OK) {
      printf("register wifi event callback failed\n");
  }

  /* acquire netif for IP operation */
  g_lwip_netif = netifapi_netif_find(ifname);
  if (g_lwip_netif == NULL) {
      printf("%s: get netif failed\n", __FUNCTION__);
      return -1;
  }

  /* 开始扫描附件的WiFi热点 */
  ret = hi_wifi_sta_scan();
  if (ret != HISI_OK) {
      return -1;
  }

  sleep(5);   /* sleep 5s, waiting for scan result. */

  hi_wifi_ap_info *pst_results = malloc(sizeof(hi_wifi_ap_info) * WIFI_SCAN_AP_LIMIT);
  if (pst_results == NULL) {
      return -1;
  }

  //把扫描到的热点结果存储起来
  ret = hi_wifi_sta_scan_results(pst_results, &num);
  if (ret != HISI_OK) {
      free(pst_results);
      return -1;
  }

  //打印扫描到的所有热点
  for (unsigned int loop = 0; (loop < num) && (loop < WIFI_SCAN_AP_LIMIT); loop++) {
      printf("SSID: %s\n", pst_results[loop].ssid);
  }
  free(pst_results);

  /* 开始接入热点 */
  ret = hi_wifi_start_connect();
  if (ret != 0) {
      return -1;
  }
  return 0;
}


/*
 * Thread body: connect to the access point, wait until the WiFi "connected"
 * event has been seen, wait another 2 s, then register the MQTT AT command.
 * The MQTT test itself is started later through that AT command.
 */
void wifi_sta_task(void *arg)
{
  (void)arg;

  /* Connect to the access point. If this fails no "connected" event is
   * expected, so stop here instead of polling wifi_ok_flg forever. */
  if (hi_wifi_start_sta() != 0) {
      printf("[wifi_sta_demo] Failed to start WiFi STA!\n");
      return;
  }

  /* wifi_ok_flg is raised by wifi_wpa_event_cb(); poll it every 30 ms. */
  while (wifi_ok_flg == 0) {
      usleep(30000);
  }

  usleep(2000000);

  AtExampleEntry();
}



/*
 * Application entry: create the "wifi_sta_demo" thread (4096-byte stack,
 * priority 26) that runs wifi_sta_task(). A creation failure is only logged.
 */
void wifi_sta_entry(void)
{
  /* Zero the whole attribute block so that members not set explicitly
   * below do not carry stack garbage into osThreadNew(). */
  osThreadAttr_t attr = {0};

  attr.name = "wifi_sta_demo";
  attr.attr_bits = 0U;
  attr.cb_mem = NULL;
  attr.cb_size = 0U;
  attr.stack_mem = NULL;
  attr.stack_size = 4096;
  attr.priority = 26;

  if (osThreadNew((osThreadFunc_t)wifi_sta_task, NULL, &attr) == NULL) {
      printf("[wifi_sta_demo] Falied to create wifi_sta_demo!\n");
  }
}


SYS_RUN(wifi_sta_entry);

 

(3)mqtt_test.c 文件则是编写了一个简单的MQTT测试代码

其中测试用的 MQTT 服务器填的是我自己的服务器 IP,大家也可以改成自己的。

#include <stdio.h> 
#include <unistd.h>
#include "ohos_init.h"
#include "cmsis_os2.h"
#include <unistd.h>
#include "hi_wifi_api.h"
  //#include "wifi_sta.h"
#include "lwip/ip_addr.h"
#include "lwip/netifapi.h"
#include "lwip/sockets.h"
#include "MQTTPacket.h"
#include "transport.h"
/* Set to non-zero to make the receive/publish loop in mqtt_connect() stop. */
int toStop = 0;

/*
 * Minimal MQTT round trip over the paho MQTTPacket API:
 * open the transport, send CONNECT and wait for CONNACK, subscribe to
 * "substopic" (QoS 0) and wait for SUBACK, then loop printing every PUBLISH
 * that arrives and publishing "hello HarmonyOS" to "pubtopic" until toStop
 * is set, and finally send DISCONNECT.
 *
 * Returns the negative transport_open() result when the transport cannot be
 * opened, -1 when the handshake, the subscription or a send fails, and 0
 * after a clean disconnect. The transport is closed on every path that
 * opened it.
 *
 * NOTE(review): broker address, client id, user name and password are
 * compiled in; replace them with the values of your own broker.
 */
int mqtt_connect(void) {
    MQTTPacket_connectData data = MQTTPacket_connectData_initializer;
    MQTTString topicString = MQTTString_initializer;
    unsigned char buf[200];
    int buflen = sizeof(buf);
    int msgid = 1;
    int req_qos = 0;
    char * payload = "hello HarmonyOS";
    int payloadlen = (int)strlen(payload);
    char * host = "121.36.42.100";
    //char *host = "192.168.1.102";
    int port = 1883;
    int result = -1;
    int len = 0;
    int mysock = 0;

    mysock = transport_open(host, port);
    if (mysock < 0) {
        return mysock;
    }
    printf("Sending to hostname %s port %d\n", host, port);

    data.clientID.cstring = "6414651136eaa44b9c900dd0_Hi3861_0_0_2023032014";
    data.keepAliveInterval = 50;
    data.cleansession = 1;
    data.username.cstring = "6414651136eaa44b9c900dd0_Hi3861";
    data.password.cstring = "154221f09e84c79275133c99ada5db847e033553352be6df01bac87780e4cdef";

    len = MQTTSerialize_connect(buf, buflen, &data);
    if (len <= 0 || transport_sendPacketBuffer(mysock, buf, len) < 0) {
        goto exit;
    }

    /* wait for connack */
    if (MQTTPacket_read(buf, buflen, transport_getdata) == CONNACK) {
        /* Initialised so the diagnostic below never prints an indeterminate
         * value when deserialisation fails. */
        unsigned char sessionPresent = 0;
        unsigned char connack_rc = 0;

        if (MQTTDeserialize_connack(&sessionPresent, &connack_rc, buf, buflen) != 1 || connack_rc != 0) {
            printf("Unable to connect, return code %d\n", connack_rc);
            goto exit;
        }
    } else {
        goto exit;
    }

    /* subscribe */
    topicString.cstring = "substopic";
    len = MQTTSerialize_subscribe(buf, buflen, 0, msgid, 1, &topicString, &req_qos);
    if (len <= 0 || transport_sendPacketBuffer(mysock, buf, len) < 0) {
        goto exit;
    }

    /* wait for suback */
    if (MQTTPacket_read(buf, buflen, transport_getdata) == SUBACK) {
        unsigned short submsgid = 0;
        int subcount = 0;
        int granted_qos = -1;

        /* Only trust granted_qos when the SUBACK was decoded successfully. */
        if (MQTTDeserialize_suback(&submsgid, 1, &subcount, &granted_qos, buf, buflen) != 1 || granted_qos != 0) {
            printf("granted qos != 0, %d\n", granted_qos);
            goto exit;
        }
    } else {
        goto exit;
    }

    /* loop getting msgs on subscribed topic */
    topicString.cstring = "pubtopic";
    while (!toStop) {
        /* transport_getdata() has a built-in 1 second timeout, your mileage will vary */
        if (MQTTPacket_read(buf, buflen, transport_getdata) == PUBLISH) {
            unsigned char dup = 0;
            int qos = 0;
            unsigned char retained = 0;
            unsigned short recv_msgid = 0;
            int payloadlen_in = 0;
            unsigned char* payload_in = NULL;
            MQTTString receivedTopic;

            /* Print the payload only when the packet decoded correctly. */
            if (MQTTDeserialize_publish(&dup, &qos, &retained, &recv_msgid, &receivedTopic,
                                        &payload_in, &payloadlen_in, buf, buflen) == 1) {
                printf("message arrived %.*s\n", payloadlen_in, payload_in);
            }
        }

        printf("publishing reading\n");
        len = MQTTSerialize_publish(buf, buflen, 0, 0, 0, 0, topicString, (unsigned char*)payload, payloadlen);
        if (len <= 0 || transport_sendPacketBuffer(mysock, buf, len) < 0) {
            goto exit;
        }
    }

    printf("disconnecting\n");
    len = MQTTSerialize_disconnect(buf, buflen);
    if (len > 0 && transport_sendPacketBuffer(mysock, buf, len) >= 0) {
        result = 0;
    }

exit:
    transport_close(mysock);
    return result;
}
/* Entry point of the MQTT test: runs mqtt_connect() and ignores its result. */
void mqtt_test(void) {
    (void)mqtt_connect();
}

mqtt_test.h

/*
* Copyright (c) 2020 HiSilicon (Shanghai) Technologies CO., LIMITED.
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
*     http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#ifndef __MQTT_TEST_H__
#define __MQTT_TEST_H__

/* Run the MQTT test defined in mqtt_test.c (it calls mqtt_connect()). */
void mqtt_test(void);

#endif /* __MQTT_TEST_H__ */

到这里就完成了代码部分,可以开始编译了。

 

11.4 实验

这里我们需要先下载一个 Windows电脑端的 MQTT客户端,这样我们就可以用电脑订阅开发板的MQTT主题信息了。

电脑版的mqtt客户端下载链接: https://repo.eclipse.org/content/repositories/paho-releases/org/eclipse/paho/org.eclipse.paho.ui.app/1.1.1/

我们选择这一个:image.png弄完后打开软件,按图操作:image.png

此时我们去查看电脑端的 MQTT 客户端软件,可以看到右边已经接收到 MQTT 信息了,主题为 ohospub,消息内容为 openharmony,说明实验成功。(注:上文 mqtt_test.c 中实际发布的主题是 pubtopic、消息内容是 hello HarmonyOS,订阅的主题是 substopic,请以自己代码中的设置为准。)

电脑发送主题为ohossub,内容为123456,查看串口打印,可以看到也收到了数据image.png

 

原文地址:https://blog.51cto.com/u_14640655/4921576