将 MQTT 消息存储至 MySQL 数据库
Step1. MQTT 消息的订阅注册及处理
使用 Python 实现:
import paho.mqtt.client as mqtt
import mqtt_msghub as mqtt_msghub # mqtt payload is dealing here...
# MQTT broker connection settings
broker = '192.168.0.16'
port = 1883
#topic = 'sensor/shake/measure/1'
# Credentials passed to client.username_pw_set() before connecting
username = "xxxxx"
password = "xxxxx"
# Connection callback
def on_connect(client, userdata, flags, rc):
    """Print the outcome of a connection attempt.

    Args:
        client: The client the callback was registered on (unused here).
        userdata: Unused.
        flags: Unused.
        rc: Connection result code; 0 is reported as success, any other
            value is printed as the error code.
    """
    if rc == 0:
        print("连接成功")
    else:
        print("连接失败,错误码:" + str(rc))
# Message-received callback
def on_message(client, userdata, msg):
    """Print the topic of an incoming message and forward it to the message hub.

    Args:
        client: The client that received the message; passed through to
            ``mqtt_msghub.mqtt_dealmsg``.
        userdata: Unused.
        msg: The received message; its ``topic`` and ``payload`` attributes
            are read.
    """
    print("收到消息:")
    print("主题:" + msg.topic)
    # The payload is deliberately not printed: some payloads are binary and
    # cannot be decoded into a str.
    mqtt_msghub.mqtt_dealmsg(client, msg.topic, msg.payload)
# Create the MQTT client object
client = mqtt.Client()


def _on_connect_and_subscribe(client, userdata, flags, rc):
    """Run the regular connect callback, then (re)subscribe on success."""
    on_connect(client, userdata, flags, rc)
    if rc == 0:
        # Topic registration lives in the mqtt_msghub module.
        mqtt_msghub.mqtt_topic_register(client)


# Subscribe from inside the connect callback instead of once after
# connect(): subscriptions made only once are not restored when the client
# reconnects with a clean session, so messages would silently stop arriving.
client.on_connect = _on_connect_and_subscribe
# Callback for received messages
client.on_message = on_message
# Credentials must be set before connecting to the broker
client.username_pw_set(username, password)
client.connect(broker, port)
# Block and dispatch network events; Ctrl+C interrupts and exits
client.loop_forever()
Step1.1 topic 注册及消息处理部分(即上面导入的 mqtt_msghub 模块)
from xml.dom import registerDOMImplementation
import payload_type_exchange as typeExchange
import paho.mqtt.client as mqtt
import table_common_table_crud as dbhelper
import gp_mysql_server as gpmysql #mysql.
# Maps an MQTT topic template to the template of the database table that
# stores its messages. "%d" in the topic is the channel number; it is turned
# into the "+" wildcard when subscribing and formatted into the table name
# (as "%02d") when a message arrives.
mqtt_topics = {
    "sensor/shake/xx/%d": "d_shake_xxx_ch%02d",
    "sensor/shake/yyy/%d": "d_shake_yyy_ch%02d",
}
def mqtt_topic_register(client):
    """Subscribe *client* to every topic template in ``mqtt_topics``.

    Each ``%d`` placeholder in a template is replaced by the ``+`` wildcard
    before subscribing, and the resulting topic filter is printed.

    Args:
        client: Object exposing ``subscribe(topic)``.
    """
    # mqtt_topics is only read here, so no ``global`` declaration is needed.
    for template in mqtt_topics:
        topic = template.replace('%d', '+')
        client.subscribe(topic)
        print("mqtt register:[%s]" % topic)
def mqtt_get_topic_related_db_table(client, topic):
    """Resolve the database table that stores messages of *topic*.

    A topic that is literally a key of ``mqtt_topics`` maps straight to its
    value. Otherwise the topic is matched against the ``.../%d`` templates:
    all segments but the last must be equal, and the last segment is parsed
    as the integer channel number that is formatted into the table template.

    Args:
        client: Unused; kept for interface compatibility.
        topic: Concrete topic of the received message.

    Returns:
        ``(0, table_name)`` on a match, ``(-1, "")`` otherwise.
    """
    if topic in mqtt_topics:
        return (0, mqtt_topics[topic])

    *prefix, last = topic.split("/")
    try:
        channel = int(last)
    except ValueError:
        # The "+" wildcard also delivers topics whose last segment is not a
        # number; treat them as "no table" instead of raising from inside
        # the message callback.
        return (-1, "")

    for template, table_template in mqtt_topics.items():
        *template_prefix, template_last = template.split("/")
        # Only templates ending in the "%d" placeholder can absorb a channel.
        if template_last == "%d" and template_prefix == prefix:
            return (0, table_template % channel)
    return (-1, "")
def getdbtable_macrotype(dbtablename):
    """Return the "_"-separated parts of *dbtablename* without the last one.

    Example: ``"d_shake_envelope_ch01"`` -> ``["d", "shake", "envelope"]``.
    A name without any underscore yields an empty list.
    """
    return dbtablename.split("_")[:-1]
def mqtt_dealmsg(client, topic, payload):
    """Store one received MQTT message in the database, if its topic is mapped.

    The message is dropped silently when the topic has no associated table
    or when the payload cannot be converted for that table.

    Args:
        client: Passed through to the topic lookup.
        topic: Topic of the received message.
        payload: Raw message payload.
    """
    # Check whether this message needs to be saved to the database at all.
    ret, dbtable = mqtt_get_topic_related_db_table(client, topic)
    if ret != 0:
        return

    # Build the column -> value dictionary used by the insert helper.
    ret, db_fields = payload_to_dictOfDbField(payload, dbtable)
    if ret != 0:
        return

    # Hand the row to the SQL insert helper.
    dbhelper.insert_data(gpmysql.gpDbConn(), dbtable, db_fields)
def payload_to_dictOfDbField(payload, dbtable):
    """Convert *payload* into the field dictionary for table *dbtable*.

    The converter is chosen from the table name with its last "_" part
    removed (see ``getdbtable_macrotype``).

    Args:
        payload: Raw message payload.
        dbtable: Name of the destination table.

    Returns:
        ``(0, fields_dict)`` when a converter exists for the table class,
        ``(-1, "unknow payload")`` otherwise (a notice is printed).
    """
    dbtable_class = getdbtable_macrotype(dbtable)
    if dbtable_class == "d_shake_envelope".split("_"):
        # Each payload type has its own independent conversion function.
        return (0, typeExchange.payload_to_d_shake_envelope(payload))

    print("unknown payload:%s, ommited to save to DB!" % dbtable)
    return (-1, "unknow payload")
Step2 通用的写 MySQL 的辅助函数
代码中包含两类 payload 的数据库入库接口。对于 JSON 格式,比较容易处理,顶多做一个名称映射表。
对于二进制格式,需要先将二进制转换为结构化的数据,然后才能入库。因为数据库表是个二维对象(行 × 字段),一行数据最佳的载体是 Python 的 dict(字段名 → 值)。
这里只给出了 CRUD 中的 C 和 R,U 和 D 的代码可以以此类推。
def _sql_literal(value):
    """Render one JSON value as a MySQL literal (quoted string or NULL)."""
    if value is None:
        return "NULL"
    if isinstance(value, bool):
        text = str(int(value))  # MySQL booleans are 1 / 0
    elif isinstance(value, (dict, list)):
        text = json.dumps(value)
    else:
        text = str(value)
    # Escape backslashes and single quotes so the value cannot terminate
    # the surrounding quoted literal.
    return "'" + text.replace("\\", "\\\\").replace("'", "''") + "'"


def json_to_mysql_insert(json_obj, table_name):
    """Build a complete ``INSERT`` statement with the values inlined.

    Args:
        json_obj: A JSON object, given either as a mapping or as JSON text
            (str / bytes). Its keys become the column names.
        table_name: Destination table; inserted into the statement verbatim.

    Returns:
        The SQL text, e.g. ``INSERT INTO t (a,b) VALUES ('1','x')``.

    NOTE: table and column names are not escaped, so they must come from a
    trusted source. Prefer ``dictionary_to_mysql_insert`` with driver-side
    parameter binding whenever values come from outside.
    """
    if isinstance(json_obj, (str, bytes, bytearray)):
        data = json.loads(json_obj)
    else:
        # Round-trip through JSON text to normalise the keys to strings.
        data = json.loads(json.dumps(json_obj))

    columns = ",".join(data.keys())
    values = ",".join(_sql_literal(value) for value in data.values())
    return "INSERT INTO {} ({}) VALUES ({})".format(table_name, columns, values)
def dictionary_to_mysql_insert(dictionary, table_name):
    """Build a parameterised ``INSERT`` statement for *dictionary*.

    The keys become the column list and every value is represented by a
    ``%s`` placeholder, so the actual values have to be supplied separately
    when the statement is executed.

    Args:
        dictionary: Mapping of column name to value (only keys and length
            are used here).
        table_name: Destination table; inserted into the statement verbatim.

    Returns:
        The SQL text, e.g. ``INSERT INTO t (a,b) VALUES (%s,%s)``.
    """
    columns = ','.join(dictionary.keys())
    placeholders = ','.join(['%s'] * len(dictionary))
    return "INSERT INTO {} ({}) VALUES ({})".format(table_name, columns, placeholders)
# Query operation
def _to_json_value(item):
    """Convert one column value into something ``json.dumps`` accepts."""
    if isinstance(item, datetime.datetime):
        # Render datetime objects as "YYYY-MM-DD HH:MM:SS" text.
        return item.strftime('%Y-%m-%d %H:%M:%S')
    if isinstance(item, bytes):
        # Only the first 8 bytes are kept, as a hex string.
        return binascii.hexlify(item[:8]).decode()
    if isinstance(item, decimal.Decimal):
        return item.to_eng_string()
    return item


def query_data(conn, tablename):
    """Read every row of *tablename* and return them as a JSON string.

    Args:
        conn: Database connection whose ``cursor()`` result can be used as a
            context manager (e.g. a PyMySQL connection).
        tablename: Table to read; concatenated into the SQL verbatim, so it
            must come from a trusted source.

    Returns:
        JSON text of a list of rows, each row being a list of column values.
        datetime values become formatted strings, bytes become the hex of
        their first 8 bytes and Decimal values become strings.
    """
    # Leaving the ``with`` block closes the cursor (PyMySQL cursors close
    # themselves in __exit__), so no explicit close() is needed.
    with conn.cursor() as cursor:
        cursor.execute("SELECT * FROM " + tablename)
        rows = cursor.fetchall()

    records = []
    for row in rows:
        # Dict-style rows (DictCursor) and plain sequence rows both work.
        values = row.values() if isinstance(row, dict) else row
        records.append([_to_json_value(value) for value in values])
    return json.dumps(records)
Step2.1 原始二进制流到 dict 的转换:
这里没有定义结构体,而是用 struct.unpack_from 按固定偏移直接解析各字段。
from asyncio.windows_events import NULL
from multiprocessing.sharedctypes import Value
from os import name
from pickle import BINBYTES
from sqlite3 import SQLITE_BUSY_SNAPSHOT
import pymysql
import json
import gp_mysql_server as gpmysql
import datetime;
import binascii;
import decimal;
import struct;
from typing import MutableSequence
# The raw data block inside a payload is 2 * PT_OF_SENSOR_SHAKE_SAMPLE bytes
# long (presumably 2048 sample points of 2 bytes each -- TODO confirm).
PT_OF_SENSOR_SHAKE_SAMPLE = 2048


def payload_to_d_shake_envelope(payload):
    """Parse a binary shake-envelope payload into a dict of database fields.

    Layout read by this function (little-endian, no padding):

        offset  size  field
        0       1     version, high part
        1       1     version, low part
        2       4     sn (uint32)
        6       2     type (UTF-8 text)
        8       20    sample time (UTF-8 text, NUL padded)
        28      4     scale (uint32)
        32      4     centre frequency (float32)
        36      4     frequency band (float32)
        40      4096  raw data block (2 * PT_OF_SENSOR_SHAKE_SAMPLE bytes)
        4136    16    rms, ppk, kurtosis, margin (4 x float32)

    Args:
        payload: Bytes-like object (``bytes`` / ``bytearray``).

    Returns:
        Dict with the keys ``ver``, ``type``, ``time``, ``scale``, ``fs``,
        ``band``, ``bin_data``, ``rms``, ``ppk``, ``kurtossis``, ``margin``
        and ``sn``.

    Raises:
        IndexError, struct.error: If the payload is shorter than the layout.
        UnicodeDecodeError: If a text field is not valid UTF-8.
    """
    data_offset = 40
    data_size = 2 * PT_OF_SENSOR_SHAKE_SAMPLE

    ver_high = payload[0]
    ver_low = payload[1]
    sn = struct.unpack_from("<I", payload, 2)[0]
    type_str = payload[6:8].decode('utf-8')
    # Strip the NUL padding of the fixed-width field. The argument must be
    # the escape '\x00'; a plain 'x00' would strip the characters "x" and
    # "0" and eat trailing zeros of the timestamp.
    time_of_sample = payload[8:28].decode('utf-8').rstrip('\x00')
    scale, freq_center, freq_band = struct.unpack_from("<Iff", payload, 28)
    bin_data = payload[data_offset:data_offset + data_size]
    rms, ppk, kurtosis, margin = struct.unpack_from(
        "<4f", payload, data_offset + data_size
    )

    # Keys are the database column names and are kept exactly as they were
    # (including the spelling "kurtossis").
    return {
        # e.g. ver_high=1, ver_low=0 -> "1.0"
        "ver": "{}.{}".format(ver_high, ver_low),
        "type": type_str,
        "time": time_of_sample,
        "scale": scale,
        "fs": freq_center,
        "band": freq_band,
        "bin_data": bin_data,
        "rms": rms,
        "ppk": ppk,
        "kurtossis": kurtosis,
        "margin": margin,
        "sn": sn,
    }