传感器采集数据,数据转化格式、传输数据

发布时间 2023-06-07 18:05:14作者: ꧁ʚ星月天空ɞ꧂

1. 项目背景

由于公司之前安装的传感器仅做到了采集数据,但是并没有将数据传输给甲方,甲方设计网站才找的我们将自己的传感器采集的数据进行格式转化给甲方的网站上面

2. 代码实现

# encoding=utf-8
import datetime
import json
import random
import threading
import time

import numpy as np
import paho.mqtt.client as mqtt
import requests
from bitstring import BitStream
import os
from paho.mqtt import client as mqtt_client

collect_thread = None




# sensor_lst = ['27002', '27003', '15174', '15175', '15176', '15177']
# wav_sensor = {}
# latest_time_sensor = {}
# for sensor in sensor_lst:
#     wav_sensor[sensor] = {}
#     latest_time_sensor[sensor] = 0

trans = {'00': 'NUL', '01': 'SOH', '02': 'STX', '03': 'ETX', '04': 'EOT', '05': 'ENQ', '06': 'ACK', '07': 'BEL',
         '08': 'BS', '09': 'HT', '0A': 'LF', '0B': 'VT', '0C': 'FF', '0D': 'CR', '0E': 'SO', '0F': 'SI', '10': 'DLE',
         '11': 'DC1', '12': 'DC2', '13': 'DC3', '14': 'DC4', '15': 'NAK', '16': 'SYN', '17': 'ETB', '18': 'CAN',
         '19': 'EM', '1A': 'SUB', '1B': 'ESC', '1C': 'FS', '1D': 'GS', '1E': 'RS', '1F': 'US', '20': 'SP', '21': '!',
         '22': '"', '23': '#', '24': '$', '25': '%', '26': '&', '27': "'", '28': '(', '29': ')', '2A': '*', '2B': '+',
         '2C': ',', '2D': '--', '2E': '.', '2F': '/', '30': '0', '31': '1', '32': '2', '33': '3', '34': '4', '35': '5',
         '36': '6', '37': '7', '38': '8', '39': '9', '3A': ':', '3B': ';', '3C': '<', '3D': '=', '3E': '>', '3F': '?',
         '40': '@', '41': 'A', '42': 'B', '43': 'C', '44': 'D', '45': 'E', '46': 'F', '47': 'G', '48': 'H', '49': 'I',
         '4A': 'J', '4B': 'K', '4C': 'L', '4D': 'M', '4E': 'N', '4F': 'O', '50': 'P', '51': 'Q', '52': 'R', '53': 'S',
         '54': 'T', '55': 'U', '56': 'V', '57': 'W', '58': 'X', '59': 'Y', '5A': 'Z', '5B': '[', '5C': '\\', '5D': ']',
         '5E': '^', '5F': '_', '60': '`', '61': 'a', '62': 'b', '63': 'c', '64': 'd', '65': 'e', '66': 'f', '67': 'g',
         '68': 'h', '69': 'i', '6A': 'j', '6B': 'k', '6C': 'l', '6D': 'm', '6E': 'n', '6F': 'o', '70': 'p', '71': 'q',
         '72': 'r', '73': 's', '74': 't', '75': 'u', '76': 'v', '77': 'w', '78': 'x', '79': 'y', '7A': 'z', '7B': '{',
         '7C': '|', '7D': '}', '7E': '~', '7F': 'DEL', '84': 'IND', '85': 'NEL', '86': 'SSA', '87': 'ESA', '88': 'HTS',
         '89': 'HTJ', '8A': 'VTS', '8B': 'PLD', '8C': 'PLU', '8D': 'RI', '8E': 'SS2', '8F': 'SS3', '90': 'DCS',
         '91': 'PU1', '92': 'PU2', '93': 'STS', '94': 'CCH', '95': 'MW', '96': 'SPA', '97': 'EPA', '9B': 'CSI',
         '9C': 'ST', '9D': 'OSC', '9E': 'PM', '9F': 'APC', 'A1': '隆', 'A2': '垄', 'A3': '拢', 'A4': '', 'A5': '楼',
         'A6': '', 'A7': '搂', 'A8': '陇', 'A9': '漏', 'AA': '陋', 'AB': '芦', 'AC': '', 'AD': '', 'AE': '', 'AF': '',
         'B0': '掳', 'B1': '卤', 'B2': '虏', 'B3': '鲁', 'B4': '', 'B5': '碌', 'B6': '露', 'B7': '路', 'B8': '',
         'B9': '鹿', 'BA': '潞', 'BB': '禄', 'BC': '录', 'BD': '陆', 'BE': '', 'BF': '驴', 'C0': '脌', 'C1': '脕',
         'C2': '脗',
         'C3': '脙',
         'C4': '脛', 'C5': '脜', 'C6': '脝', 'C7': '脟', 'C8': '脠', 'C9': '脡', 'CA': '脢', 'CB': '脣', 'CC': '脤',
         'CD': '脥', 'CE': '脦', 'CF': '脧', 'D0': '', 'D1': '脩', 'D2': '脪', 'D3': '脫', 'D4': '脭', 'D5': '脮',
         'D6': '脰',
         'D7': 'OE', 'D8': '脴', 'D9': '脵', 'DA': '脷', 'DB': '脹', 'DC': '脺', 'DD': 'Y', 'DE': '', 'DF': '脽',
         'E0': '脿',
         'E1': '谩', 'E2': '芒', 'E3': '茫', 'E4': '盲', 'E5': '氓', 'E6': '忙', 'E7': '莽', 'E8': '猫', 'E9': '茅',
         'EA': '锚',
         'EB': '毛', 'EC': '矛', 'ED': '铆', 'EE': '卯', 'EF': '茂', 'F0': '', 'F1': '帽', 'F2': '貌', 'F3': '贸',
         'F4': '么',
         'F5': '玫', 'F6': '枚', 'F7': 'oe', 'F8': '酶', 'F9': '霉', 'FA': '煤', 'FB': '没', 'FC': '眉', 'FD': '每'}

maps = {'27001': "2#连铸机本体区域2#连铸机液压/润滑系统拉矫机液压站.1#主泵电机ID号27001振动",
        '27002': "2#连铸机本体区域2#连铸机液压/润滑系统拉矫机液压站.1#主泵电机ID号27002振动",
        '27003': "2#连铸机本体区域2#连铸机液压/润滑系统拉矫机液压站.2#主泵电机ID号27003振动",
        '27004': "2#连铸机本体区域2#连铸机液压/润滑系统拉矫机液压站.2#主泵电机ID号27004振动",
        '27005': "2#连铸机本体区域2#连铸机液压/润滑系统拉矫机液压站.3#主泵电机ID号27005振动",
        '27006': "2#连铸机本体区域2#连铸机液压/润滑系统拉矫机液压站.3#主泵电机ID号27006振动",
        '27007': "2#连铸机本体区域2#连铸机液压/润滑系统拉矫机液压站.4#主泵电机ID号27007振动",
        '27008': "2#连铸机本体区域2#连铸机液压/润滑系统拉矫机液压站.4#主泵电机ID号27008振动",
        '27009': "2#连铸机本体区域2#连铸机液压/润滑系统拉矫机液压站.5#主泵电机ID号27009振动",
        '27010': "2#连铸机本体区域2#连铸机液压/润滑系统拉矫机液压站.5#主泵电机ID号27010振动",
        '27011': "2#连铸机本体区域2#连铸机液压/润滑系统拉矫机液压站.1#循环泵电机ID号27011振动",
        '27012': "2#连铸机本体区域2#连铸机液压/润滑系统拉矫机液压站.1#循环泵电机ID号27012振动",
        '27013': "2#连铸机本体区域2#连铸机液压/润滑系统主液压站.1#主泵电机ID号27013振动",
        '27014': "2#连铸机本体区域2#连铸机液压/润滑系统主液压站.1#主泵电机ID号27014振动",
        '27015': "2#连铸机本体区域2#连铸机液压/润滑系统主液压站.2#主泵电机ID号27015振动",
        '27016': "2#连铸机本体区域2#连铸机液压/润滑系统主液压站.2#主泵电机ID号27016振动",
        '27017': "2#连铸机本体区域2#连铸机液压/润滑系统主液压站.3#主泵电机ID号27017振动",
        '27018': "2#连铸机本体区域2#连铸机液压/润滑系统主液压站.3#主泵电机ID号27018振动",
        '27019': "2#连铸机本体区域2#连铸机液压/润滑系统主液压站.4#主泵电机ID号27019振动",
        '27020': "2#连铸机本体区域2#连铸机液压/润滑系统主液压站.4#主泵电机ID号27020振动",
        '27021': "2#连铸机本体区域2#连铸机液压/润滑系统主液压站.1#循环泵电机ID号27021振动",
        '27022': "2#连铸机本体区域2#连铸机液压/润滑系统主液压站.1#循环泵电机ID号27022振动",
        '27023': "2#连铸机本体区域2#连铸机液压/润滑系统主液压站.2#循环泵电机ID号27023振动",
        '27024': "2#连铸机本体区域2#连铸机液压/润滑系统主液压站.2#循环泵电机ID号27024振动",
        '27025': "2#连铸机本体区域2#连铸机冷却系统电搅水阀站.末搅1#水泵电机ID号27025振动",
        '27026': "2#连铸机本体区域2#连铸机冷却系统电搅水阀站.末搅1#水泵电机ID号27026振动",
        '27201': "2#连铸机本体区域2#连铸机冷却系统电搅水阀站.末搅1#水泵电机ID号27201电流",
        '27027': "2#连铸机本体区域2#连铸机冷却系统电搅水阀站.末搅2#水泵电机ID号27027振动",
        '27028': "2#连铸机本体区域2#连铸机冷却系统电搅水阀站.末搅2#水泵电机ID号27028振动",
        '27202': "2#连铸机本体区域2#连铸机冷却系统电搅水阀站.末搅2#水泵电机ID号27202电流",
        '27029': "2#连铸机本体区域2#连铸机冷却系统电搅水阀站.铸搅1#水泵电机ID号27029振动",
        '27030': "2#连铸机本体区域2#连铸机冷却系统电搅水阀站.铸搅1#水泵电机ID号27030振动",
        '27203': "2#连铸机本体区域2#连铸机冷却系统电搅水阀站.铸搅1#水泵电机ID号27203电流",
        '27031': "2#连铸机本体区域2#连铸机冷却系统电搅水阀站.铸搅2#水泵电机ID号27031振动",
        '27032': "2#连铸机本体区域2#连铸机冷却系统电搅水阀站.铸搅2#水泵电机ID号27032振动",
        '27204': "2#连铸机本体区域2#连铸机冷却系统电搅水阀站.铸搅2#水泵电机ID号27204电流",
        '27033': "2#连铸机本体区域2#连铸机冷却系统电搅水阀站.首搅1#水泵电机ID号27033振动",
        '27034': "2#连铸机本体区域2#连铸机冷却系统电搅水阀站.首搅1#水泵电机ID号27034振动",
        '27205': "2#连铸机本体区域2#连铸机冷却系统电搅水阀站.首搅1#水泵电机ID号27205电流",
        '27035': "2#连铸机本体区域2#连铸机冷却系统电搅水阀站.首搅2#水泵电机ID号27035振动",
        '27036': "2#连铸机本体区域2#连铸机冷却系统电搅水阀站.首搅2#水泵电机ID号27036振动",
        '27206': "2#连铸机本体区域2#连铸机冷却系统电搅水阀站.首搅2#水泵电机ID号27206电流",
        '27037': "2#连铸机本体区域2#连铸机冷却系统振动液压站.1#主泵电机ID号27037振动",
        '27038': "2#连铸机本体区域2#连铸机冷却系统振动液压站.1#主泵电机ID号27038振动",
        '27039': "2#连铸机本体区域2#连铸机冷却系统振动液压站.2#主泵电机ID号27039振动",
        '27040': "2#连铸机本体区域2#连铸机冷却系统振动液压站.2#主泵电机ID号27040振动",
        '27065': "2#连铸机本体区域2#连铸机冷却系统振动液压站.1#循环泵电机ID号27065振动",
        '27066': "2#连铸机本体区域2#连铸机冷却系统振动液压站.1#循环泵电机ID号27066振动",
        '27073': "2#连铸机本体区域2#连铸机冷却系统1#蒸排风机.1#蒸排风机电机ID号27073振动",
        '27074': "2#连铸机本体区域2#连铸机冷却系统1#蒸排风机.1#蒸排风机电机ID号27074振动",
        '27071': "2#连铸机本体区域2#连铸机冷却系统2#蒸排风机.2#蒸排风机电机ID号27071振动",
        '27072': "2#连铸机本体区域2#连铸机冷却系统2#蒸排风机.2#蒸排风机电机ID号27072振动",
        '27207': "2#连铸机本体区域2#连铸机液压/润滑系统拉矫机液压站.1#主泵电机ID号27207电流",
        '27208': "2#连铸机本体区域2#连铸机液压/润滑系统拉矫机液压站.2#主泵电机ID号27208电流",
        '27209': "2#连铸机本体区域2#连铸机液压/润滑系统拉矫机液压站.3#主泵电机ID号27209电流",
        '27210': "2#连铸机本体区域2#连铸机液压/润滑系统拉矫机液压站.4#主泵电机ID号27210电流",
        '27211': "2#连铸机本体区域2#连铸机液压/润滑系统拉矫机液压站.5#主泵电机ID号27211电流",
        '27212': "2#连铸机本体区域2#连铸机液压/润滑系统拉矫机液压站.1#循环泵电机ID号27212电流",
        '27213': "2#连铸机本体区域2#连铸机液压/润滑系统主液压站.1#主泵电机ID号27213电流",
        '27214': "2#连铸机本体区域2#连铸机液压/润滑系统主液压站.2#主泵电机ID号27214电流",
        '27215': "2#连铸机本体区域2#连铸机液压/润滑系统主液压站.3#主泵电机ID号27215电流",
        '27216': "2#连铸机本体区域2#连铸机液压/润滑系统主液压站.4#主泵电机ID号27216电流",
        '27217': "2#连铸机本体区域2#连铸机液压/润滑系统主液压站.1#循环泵电机ID号27217电流",
        '27218': "2#连铸机本体区域2#连铸机液压/润滑系统主液压站.2#循环泵电机ID号27218电流",
        '27219': "2#连铸机本体区域2#连铸机冷却系统振动液压站.1#主泵电机ID号27219电流",
        '27220': "2#连铸机本体区域2#连铸机冷却系统振动液压站.2#主泵电机ID号27220电流",
        '27221': "2#连铸机本体区域2#连铸机冷却系统振动液压站.1#循环泵电机ID号27221电流",
        '27075': "2#连铸机本体区域2#连铸机本体系统冲渣装置.1#冲渣泵电机ID号27075振动",
        '27076': "2#连铸机本体区域2#连铸机本体系统冲渣装置.1#冲渣泵电机ID号27076振动",
        '27077': "2#连铸机本体区域2#连铸机本体系统冲渣装置.2#冲渣泵电机ID号27077振动",
        '27078': "2#连铸机本体区域2#连铸机本体系统冲渣装置.2#冲渣泵电机ID号27078振动",
        '27083': "2#连铸机本体区域2#连铸机本体系统1#中间罐车.1#中包车南侧电机ID号27083振动",
        '27084': "2#连铸机本体区域2#连铸机本体系统1#中间罐车.1#中包车南侧电机ID号27084振动",
        '27085': "2#连铸机本体区域2#连铸机本体系统1#中间罐车.1#中包车北侧电机ID号27085振动",
        '27086': "2#连铸机本体区域2#连铸机本体系统1#中间罐车.1#中包车北侧电机ID号27086振动",
        '27087': "2#连铸机本体区域2#连铸机本体系统2#中间罐车.2#中包车南侧电机ID号27087振动",
        '27088': "2#连铸机本体区域2#连铸机本体系统2#中间罐车.2#中包车南侧电机ID号27088振动",
        '27089': "2#连铸机本体区域2#连铸机本体系统2#中间罐车.2#中包车北侧电机ID号27089振动",
        '27090': "2#连铸机本体区域2#连铸机本体系统2#中间罐车.2#中包车北侧电机ID号27090振动",
        '27095': "2#连铸机本体区域2#连铸机本体系统1#中间罐烘烤器.1#中包烘烤器电机ID号27095振动",
        '27096': "2#连铸机本体区域2#连铸机本体系统1#中间罐烘烤器.1#中包烘烤器电机ID号27096振动",
        '27097': "2#连铸机本体区域2#连铸机本体系统2#中间罐烘烤器.2#中包烘烤器电机ID号27097振动",
        '27098': "2#连铸机本体区域2#连铸机本体系统2#中间罐烘烤器.2#中包烘烤器电机ID号27098振动",
        '27144': "2#连铸后道区域缓冷坑电气室风机控制柜+FG1.缓冷坑除尘风机电机ID号27144振动",
        '27176': "2#连铸后道区域缓冷坑电气室风机控制柜+FG1.缓冷坑除尘风机电机ID号27176振动",
        '27067': "2#连铸机本体区域2#连铸机冷却系统二冷水阀站.1#空压机电机ID号27067振动",
        '27068': "2#连铸机本体区域2#连铸机冷却系统二冷水阀站.1#空压机电机ID号27068振动",
        '27069': "2#连铸机本体区域2#连铸机冷却系统二冷水阀站.2#空压机电机ID号27069振动",
        '27070': "2#连铸机本体区域2#连铸机冷却系统二冷水阀站.2#空压机电机ID号27070振动",
        '27091': "2#连铸机本体区域2#连铸机本体系统1#中间罐车.1#中包水口预热风机电机ID号27091振动",
        '27092': "2#连铸机本体区域2#连铸机本体系统1#中间罐车.1#中包水口预热风机电机ID号27092振动",
        '27093': "2#连铸机本体区域2#连铸机本体系统2#中间罐车.2#中包水口预热风机电机ID号27093振动",
        '27094': "2#连铸机本体区域2#连铸机本体系统2#中间罐车.2#中包水口预热风机电机ID号27094振动",
        '27079': "水系统其他装置旋流井离心泵.1#浊环水提升泵电机ID号27079振动",
        '27080': "水系统其他装置旋流井离心泵.1#浊环水提升泵电机ID号27080振动",
        '27081': "水系统其他装置旋流井离心泵.2#浊环水提升泵电机ID号27081振动",
        '27082': "水系统其他装置旋流井离心泵.2#浊环水提升泵电机ID号27082振动",
        '27041': "新建精整区域32+32北侧行车32+32北侧行车:大车配电.北侧32+32t行车大车(西)电机ID号27041振动",
        '27042': "新建精整区域32+32北侧行车32+32北侧行车:大车配电.北侧32+32t行车大车(西)电机ID号27042振动",
        '27043': "新建精整区域32+32北侧行车32+32北侧行车:大车配电.北侧32+32t行车大车(东)电机ID号27043振动",
        '27044': "新建精整区域32+32北侧行车32+32北侧行车:大车配电.北侧32+32t行车大车(东)电机ID号27044振动",
        '27045': "新建精整区域32+32北侧行车32+32北侧行车:主起升配电.北侧32+32t行车主钩电机ID号27045振动",
        '27046': "新建精整区域32+32北侧行车32+32北侧行车:主起升配电.北侧32+32t行车主钩电机ID号27046振动",
        '27047': "新建精整区域32+32北侧行车32+32北侧行车:小车配电.北侧32+32t行车小车电机ID号27047振动",
        '27048': "新建精整区域32+32北侧行车32+32北侧行车:小车配电.北侧32+32t行车小车电机ID号27048振动",
        '27049': "新建精整区域32+32南侧行车32+32南侧行车:大车配电.南侧32+32t行车大车(西)电机ID号27049振动",
        '27050': "新建精整区域32+32南侧行车32+32南侧行车:大车配电.南侧32+32t行车大车(西)电机ID号27050振动",
        '27051': "新建精整区域32+32南侧行车32+32南侧行车:大车配电.南侧32+32t行车大车(东)电机ID号27051振动",
        '27052': "新建精整区域32+32南侧行车32+32南侧行车:大车配电.南侧32+32t行车大车(东)电机ID号27052振动",
        '27053': "新建精整区域32+32南侧行车32+32南侧行车:主起升配电.南侧32+32t行车主钩电机ID号27053振动",
        '27054': "新建精整区域32+32南侧行车32+32南侧行车:主起升配电.南侧32+32t行车主钩电机ID号27054振动",
        '27055': "新建精整区域32+32南侧行车32+32南侧行车:小车配电.南侧32+32t行车小车电机ID号27055振动",
        '27056': "新建精整区域32+32南侧行车32+32南侧行车:小车配电.南侧32+32t行车小车电机ID号27056振动",
        '27057': "新建维修区域新建维修跨行车新建维修跨行车:大车配电.32+5t行车大车(西)电机ID号27057振动",
        '27058': "新建维修区域新建维修跨行车新建维修跨行车:大车配电.32+5t行车大车(西)电机ID号27058振动",
        '27059': "新建维修区域新建维修跨行车新建维修跨行车:大车配电.32+5t行车大车(东)电机ID号27059振动",
        '27060': "新建维修区域新建维修跨行车新建维修跨行车:大车配电.32+5t行车大车(东)电机ID号27060振动",
        '27061': "新建维修区域新建维修跨行车新建维修跨行车:主起升配电.32+5t行车主钩电机ID号27061振动",
        '27062': "新建维修区域新建维修跨行车新建维修跨行车:主起升配电.32+5t行车主钩电机ID号27062振动",
        '27063': "新建维修区域新建维修跨行车新建维修跨行车:付起升配电.32+5t行车付钩电机ID号27063振动",
        '27064': "新建维修区域新建维修跨行车新建维修跨行车:付起升配电.32+5t行车付钩电机ID号27064振动",
        '27101': "水系统其他装置新二冷水泵.连铸机二冷喷淋冷却水泵ID号27101振动",
        '27102': "水系统其他装置新二冷水泵.连铸机二冷喷淋冷却水泵ID号27102振动",
        '27103': "水系统设备冷却水系统连铸结晶器冷却供水泵.连铸结晶器2#冷却水泵ID号27103振动",
        '27104': "水系统设备冷却水系统连铸结晶器冷却供水泵.连铸结晶器2#冷却水泵ID号27104振动",
        '27105': "水系统设备冷却水系统连铸结晶器冷却供水泵.连铸结晶器2#冷却水泵(备用)ID号27105振动",
        '27106': "水系统设备冷却水系统连铸结晶器冷却供水泵.连铸结晶器2#冷却水泵(备用)ID号27106振动",
        '27107': "水系统设备冷却水系统热交换器冷却供水泵.板换冷媒1#冷却水泵ID号27107振动",
        '27108': "水系统设备冷却水系统热交换器冷却供水泵.板换冷媒1#冷却水泵ID号27108振动",
        '27109': "水系统设备冷却水系统热交换器冷却供水泵.板换冷媒2#冷却水泵ID号27109振动",
        '27110': "水系统设备冷却水系统热交换器冷却供水泵.板换冷媒2#冷却水泵ID号27110振动",
        '27111': "水系统设备冷却水系统连铸二冷水及设备直接冷却供水泵.直接冷却水1#供水泵ID号27111振动",
        '27112': "水系统设备冷却水系统连铸二冷水及设备直接冷却供水泵.直接冷却水1#供水泵ID号27112振动",
        '27113': "水系统设备冷却水系统连铸二冷水及设备直接冷却供水泵.直接冷却水2#供水泵ID号27113振动",
        '27114': "水系统设备冷却水系统连铸二冷水及设备直接冷却供水泵.直接冷却水2#供水泵ID号27114振动",
        '27115': "水系统设备冷却水系统连铸设备供水泵.间接冷却水供水泵ID号27115振动",
        '27116': "水系统设备冷却水系统连铸设备供水泵.间接冷却水供水泵ID号27116振动",
        }

electric_quantity = 3.6  # 保存最新特征里的电池电压,用于波形数据上传
temperature = 20

save_wav = True
save_feature = True

broker_send = '127.0.0.1'  # mqtt代理服务器地址
# broker = 'tcp://mqtt.yong-gang.cn:51883'  # mqtt代理服务器地址
port_send = 1883
keepalive_send = 60     # 与代理通信之间允许的最长时间段(以秒为单位)
topic_send = "/mqttbox/点检检修/南京博奥"  # 消息主题
client_id_send = f'python-mqtt-pub-{random.randint(0, 1000)}'  # 客户端id不能重复


def connect_mqtt():
    '''连接mqtt代理服务器'''
    def on_connect(client, userdata, flags, rc):
        '''连接回调函数'''
        # 响应状态码为0表示连接成功
        if rc == 0:
            print("Connected to MQTT OK!")
        else:
            print("Failed to connect, return code %d\n", rc)
    # 连接mqtt代理服务器,并获取连接引用
    client = mqtt_client.Client(client_id_send)
    client.username_pw_set("sbdj","123456")
    client.on_connect = on_connect
    client.connect(broker_send, port_send, keepalive_send)
    return client

def send_publish(client,data):
    '''发布消息'''
    result = client.publish(topic_send, data)
    status = result[0]
    # if status == 0:
    #     print(f"Send `{data}` to topic `{topic_send}`")
    # else:
    #     print(f"Failed to send message to topic {topic_send}")

client_send=connect_mqtt()
client_send.loop_start()

def upload_wav(sensor_id, received_time, wave, sr=5120, electric_quantity=3.6, temperature=20):
    '''
    "datetime": '2021-12-12 12:12:20',
    :param datetime:
    :param wave:
    :param sr:
    :return:
    '''

    global save_wav
    try:
        time.sleep(5)
        # print('=======================kaishishangchuanboxing==========')
        # url = "http://117.78.28.153/api/diagnosis/upload/wavs/"
        url = "http://192.168.110.133:8001/api/diagnosis/upload/wavs/"

        proxy_headers = {
            "content-type": "application/json",
            "SecretID": "LZKJ",
            "SecretKey": "bc367740900a2db7e40d41238cdbc60e"
        }
        data = {
            "sensor__id": sensor_id,
            "datetime": received_time,
            "device_id": '001',
            "manufacture_name": '中国石油',
            "sr": sr,
            "sr_level": 'high',
            "axis": 'z',
            "temperature": temperature,
            "data": wave,
            "unit": 'm/s²',
            'electric_quantity': electric_quantity,
            'rstp_signal_intensity': 0,
            'rssi_signal_intensity': 0
        }
        if save_wav:
            with open('波形.txt', 'w') as f:
                f.write(data.__str__())
            save_wav = False
        res = requests.post(url=url, data=json.dumps(data), headers=proxy_headers)
        print(res)
    except Exception as e:
        print(e)


def upload_feature(sensor_id, received_time, feature_data, datatype):
    global save_feature
    try:

        if datatype == "振动":
            datas = {
                "datetime": received_time,
                "device_id": sensor_id,
                "传感器温度(℃)": feature_data["传感器温度(℃)"],
                "data": feature_data,
                "电池电压(V)": feature_data["电池电压(V)"],
                'rstp_signal_intensity': 0,
                'rssi_signal_intensity': feature_data["rssi_signal_intensity"]
            }
            print(datas)
            try:
                fullname = maps[datas["device_id"]]
                datas["device_id"] = fullname.split(".")[0]
                types = fullname.split(".")[1]
            except Exception as e:
                print(e)
            l = []
            del datas["data"]["传感器温度(℃)"]
            del datas["data"]["电池电压(V)"]
            del datas["data"]["rssi_signal_intensity"]
            print(datas["data"])
            for key, value in datas["data"].items():
                if key in ['x加速度有效值(m/s²)']:
                    d = {}
                    d["type"] = types + "X"
                    d["加速度有效值(m/s²)"] = value

                if key in ['y加速度有效值(m/s²)']:
                    d = {}
                    d["type"] = types + "Y"
                    d["加速度有效值(m/s²)"] = value

                if key in ['z加速度有效值(m/s²)']:
                    d = {}
                    d["type"] = types + "Z"
                    d["加速度有效值(m/s²)"] = value

                if key in ['x速度有效值(mm/s)', 'y速度有效值(mm/s)', 'z速度有效值(mm/s)']:
                    d["速度有效值(mm/s)"] = value
                if key in ['x位移有效值(μm)', 'y位移有效值(μm)', 'z位移有效值(μm)']:
                    d["位移有效值(μm)"] = value
                    l.append(d)
            print(l)
            datas["data"] = l
            print(datas)

        else:
            datas = {
                "datetime": received_time,
                "device_id": sensor_id,
                "data": feature_data,
                "电池电量":feature_data['电池电量(A)'],
                'rstp_signal_intensity': 0,
                'rssi_signal_intensity':-85,
            }
            print(datas)
            try:
                fullname = maps[datas["device_id"]]
                print(fullname)
                print(fullname.split("."))
                datas["device_id"] = fullname.split(".")[0]
                types = fullname.split(".")[1]
            except Exception as e:
                print(e)
            l = []
            del datas["data"]["电池电量(A)"]
            print(datas["data"])
            for key, value in datas["data"].items():
                if key in ['A相电流(A)']:
                    d = {}
                    d["type"] = types + "A"
                    d["电流"] = value
                    l.append(d)

                if key in ['B相电流(A)']:
                    d = {}
                    d["type"] = types + "B"
                    d["电流"] = value
                    l.append(d)

                if key in ['C相电流(A)']:
                    d = {}
                    d["type"] = types + "C"
                    d["电流"] = value
                    l.append(d)

                # if key in ['x速度有效值(mm/s)', 'y速度有效值(mm/s)', 'z速度有效值(mm/s)']:
                #     d["速度有效值(mm/s)"] = value
                # if key in ['x位移有效值(μm)', 'y位移有效值(μm)', 'z位移有效值(μm)']:
                #     d["位移有效值(μm)"] = value
                #     l.append(d)
            print(datas["data"])
            print(l)
            datas["data"] = l
            print(datas)

        send_publish(client_send,json.dumps(datas,ensure_ascii=False))

        # res = requests.post(url=url, data=json.dumps(datas), headers=proxy_headers)
        if save_feature:
            with open('特征.txt', 'w') as f:
                f.write(datas.__str__())
            save_feature = False
        # print('feature', received_time, res.json())
        # print('feature', received_time)
    except Exception as e:
        print(e)


def on_connect(client, userdata, flags, rc):
    if rc == 0:
        print("连接成功")
    else:
        print("连接失败, 返回编码 %d\n", rc)


# 当代理响应订阅请求时被调用
def on_subscribe(client, userdata, mid, granted_qos):
    print("Subscribed: " + str(mid) + " " + str(granted_qos))


def deal_type_1(rd):
    '''
    每五分钟上传一次
    :param data:
    :return:
    '''

    fmt = '''
        int:16,
        int:8,
        int:8,
        14*int:16
        '''

    had_trans = ''
    for i in range(0, len(rd['data']), 2):
        had_trans += trans[rd['data'][i:i + 2]]
    bs = BitStream(hex=had_trans)
    dec_data = bs.unpack(fmt)

    '''
    0 id
    1 数诀类型
    2 字节
    2 多少
    4 x
    5 x
    6 x
    7 y
    8 y
    9 y
    10 z
    11 z
    12 z
    13 温度
    14 频率
    15 电磁


    '''

    feature = {}
    feature["x加速度有效值(m/s²)"] = dec_data[4] / 100
    feature["x速度有效值(mm/s)"] = dec_data[5] / 100
    feature["x位移有效值(μm)"] = dec_data[6] / 10
    feature["y加速度有效值(m/s²)"] = dec_data[7] / 100
    feature["y速度有效值(mm/s)"] = dec_data[8] / 100
    feature["y位移有效值(μm)"] = dec_data[9] / 10
    feature["z加速度有效值(m/s²)"] = dec_data[10] / 100
    feature["z速度有效值(mm/s)"] = dec_data[11] / 100
    feature["z位移有效值(μm)"] = dec_data[12] / 10
    feature["传感器温度(℃)"] = dec_data[13] / 10
    feature["电池电压(V)"] = dec_data[15] / 100
    feature["rssi_signal_intensity"] = rd['rssi']
    sub_time = datetime.datetime.strftime(datetime.datetime.now(), "%Y-%m-%d %H:%M:%S")
    print(feature)
    upload_feature(sensor_id=str(dec_data[0]), received_time=sub_time, feature_data=feature, datatype="振动")

def deal_type_5(rd):
    '''
    每五分钟上传一次
    :param data:
    :return:
    '''

    print("================================")
    fmt = '''
        int:16,
        int:8,
        int:8,
        6*int:16
        '''

    had_trans = ''
    for i in range(0, len(rd['data']), 2):
        had_trans += trans[rd['data'][i:i + 2]]
    bs = BitStream(hex=had_trans)
    dec_data = bs.unpack(fmt)

    '''
    0 id
    1 数据类型
    2 字节
    2 多少
    4 A相电流
    5 B相电流
    6 C相电流
    7 电池电量
    8 校验和
    '''

    feature = {}
    feature["A相电流(A)"] = dec_data[4] / 10
    feature["B相电流(A)"] = dec_data[5] / 10
    feature["C相电流(A)"] = dec_data[6] / 10
    feature["电池电量(A)"] = dec_data[7] / 10
    sub_time = datetime.datetime.strftime(datetime.datetime.now(), "%Y-%m-%d %H:%M:%S")
    upload_feature(sensor_id=str(dec_data[0]), received_time=sub_time, feature_data=feature, datatype="电流")





# 当使用使用publish()发送的消息已经传输到代理时被调用。
def on_publish(client, obj, mid):
    print("OnPublish, mid: " + str(mid))
    '''发布消息'''



# 当收到关于客户订阅的主题的消息时调用。 message是一个描述所有消息参数的MQTTMessage。
def on_message(client, userdata, msg):
    fmt_head = '''
    int:16,
    int:8
    '''

    # if not msg.topic == "AITIOT145":
    #     return None

    rd = eval(msg.payload)
    with open('rd2.txt', 'a+') as f:
        f.write(str(rd))
        f.write('\n')
    had_trans = ''
    for i in range(0, 12, 2):
        had_trans += trans[rd['data'][i:i + 2]]
    bs = BitStream(hex=had_trans)

    sensor_id, data_type = bs.unpack(fmt_head)
    print(sensor_id, data_type)
    if data_type == 1:
        deal_type_1(rd)

    # if data_type == 2:
    #     deal_type_2(rd)

    if data_type==5:
        deal_type_5(rd)


# 当客户端有日志信息时调用
def on_log(client, obj, level, string):
    print("Log:" + string)


def collect():
    print('已开启采集线程')
    client = mqtt.Client()
    client.on_connect = on_connect
    client.on_message = on_message
    # client.connect('114.115.147.171', 1883, 60)  # 600为keepalive的时间间隔
    # topics = [("AITIOT145", 0)]
    # topics = [("/mqttbox/点检检修/南京博奥", 0)]
    topics = [("AITIOT123456789", 0)]
    # client.username_pw_set()
    client.connect('127.0.0.1', 1883, 30)  # 600为keepalive的时间间隔
    # client.connect('192.168.110.133', 1883, 60)  # 600为keepalive的时间间隔
    client.subscribe(topics, qos=2)


    client.loop_forever()  # 保持连接





def watch():
    print('已开启监视线程')
    global collect_thread
    while True:
        global collect_thread
        if not collect_thread.is_alive():
            collect_thread = threading.Thread(target=collect, args=())
            collect_thread.start()
        time.sleep(20 * 60)


# 开启采集线程
collect_thread = threading.Thread(target=collect, args=())
collect_thread.start()

# 一分钟后开启监视线程
time.sleep(60)
watch_thread = threading.Thread(target=watch, args=())
watch_thread.start()

3. 向mqtt服务器推送数据

import time
import json
import random
from paho.mqtt import client as mqtt_client


broker_send = '127.0.0.1'  # mqtt代理服务器地址
# broker = 'tcp://mqtt.yong-gang.cn:51883'  # mqtt代理服务器地址
port_send = 1883
keepalive_send = 60     # 与代理通信之间允许的最长时间段(以秒为单位)
topic_send = "AITIOT123456789"  # 消息主题
client_id_send = f'python-mqtt-pub-{random.randint(0, 1000)}'  # 客户端id不能重复


def connect_mqtt():
    '''连接mqtt代理服务器'''
    def on_connect(client, userdata, flags, rc):
        '''连接回调函数'''
        # 响应状态码为0表示连接成功
        if rc == 0:
            print("Connected to MQTT OK!")
        else:
            print("Failed to connect, return code %d\n", rc)
    # 连接mqtt代理服务器,并获取连接引用
    client = mqtt_client.Client(client_id_send)
    client.on_connect = on_connect
    client.connect(broker_send, port_send, keepalive_send)
    return client

def publish(client):
    '''发布消息'''
    while True:
        '''每隔4秒发布一次服务器信息'''
        time.sleep(4)
        msg = {'type': 'rx', 'datr': 'SF9BW125', 'time': 1684835343142, 'appeui': '0000000000000000', 'deveui': '377b4d2f3eaf3b46', 'uuid': '12345670001', 'frequency': 472.5, 'channel': 5, 'rssi': -52, 'lorasnr': 12.5, 'fport': 12, 'fcnt': 587, 'data': '3661343130353032303031363033443330324435303644333037443330384433'}
        msg= json.dumps(msg)
        result = client.publish(topic_send, msg)
        status = result[0]
        if status == 0:
            print(f"Send `{msg}` to topic `{topic_send}`")
        else:
            print(f"Failed to send message to topic {topic_send}")

def run():
    '''运行发布者'''
    client = connect_mqtt()
    # 运行一个线程来自动调用loop()处理网络事件, 非阻塞
    client.loop_start()
    publish(client)

if __name__ == '__main__':
    run()