将mqtt的消息存储至mysql数据库

Step1. mqtt消息注册及处理

使用python来做:

import paho.mqtt.client as mqtt
import mqtt_msghub as mqtt_msghub # mqtt payload is dealing here...

# MQTT broker connection settings
# NOTE(review): the address and the placeholder credentials are hard-coded;
# presumably real deployments should load them from configuration - confirm.

broker = '192.168.0.16'
port = 1883
#topic = 'sensor/shake/measure/1'
username = "xxxxx"
password = "xxxxx"

# Connection callback
def on_connect(client, userdata, flags, rc):
    """Report the outcome of a connection attempt on stdout.

    Only ``rc`` is inspected: 0 prints the success message, any other value
    prints the failure message followed by the code. The other parameters
    are accepted but unused.
    """
    if rc != 0:
        print("连接失败,错误码:" + str(rc))
        return
    print("连接成功")

# Message-received callback
def on_message(client, userdata, msg):
    """Log the topic of an incoming message and hand it to the message hub.

    The payload is forwarded as raw bytes and is deliberately not printed:
    some payloads are binary and cannot be decoded into text.
    """
    topic = msg.topic
    print("收到消息:")
    print("主题:" + topic)
    mqtt_msghub.mqtt_dealmsg(client, topic, msg.payload)
    

# Create the MQTT client object
# NOTE(review): paho-mqtt 2.x is understood to expect a callback API version
# argument here - confirm against the installed paho-mqtt version.
client = mqtt.Client()

# Install the connection callback
client.on_connect = on_connect

# Install the message-received callback
client.on_message = on_message

# Set the credentials, then connect to the MQTT broker
client.username_pw_set(username, password)
client.connect(broker, port)

# Subscribe to the topics
# NOTE(review): the subscriptions are issued once here rather than from
# on_connect; presumably they are not re-issued after an automatic
# reconnect - confirm against the broker/session settings.
mqtt_msghub.mqtt_topic_register(client) # implemented in the mqtt_msghub module

# Block here and dispatch incoming messages; interrupt with Ctrl+C to exit
client.loop_forever()

Step1.1 看看topic注册及消息处理部分

from xml.dom import registerDOMImplementation
import payload_type_exchange as typeExchange
import paho.mqtt.client as mqtt
import table_common_table_crud as dbhelper
import gp_mysql_server as gpmysql #mysql.

# Maps an MQTT topic template to a database table-name template.
# The trailing "%d" of a topic template is replaced by the "+" wildcard when
# subscribing, and the integer found in that position of a received topic is
# formatted into the table-name template.
mqtt_topics = {
    "sensor/shake/xx/%d": "d_shake_xxx_ch%02d",
    "sensor/shake/yyy/%d": "d_shake_yyy_ch%02d",
}

def mqtt_topic_register(client):
    """Subscribe ``client`` to every topic template in ``mqtt_topics``.

    Each ``%d`` placeholder in a template is turned into the ``+`` wildcard
    before subscribing, and every subscribed topic is printed.
    """
    for template in mqtt_topics:
        wildcard_topic = template.replace('%d', '+')
        client.subscribe(wildcard_topic)
        print("mqtt register:[%s]" % wildcard_topic)

def mqtt_get_topic_related_db_table(client, topic):
    """Resolve the database table that stores messages of ``topic``.

    Args:
        client: unused; kept so existing callers keep working.
        topic: the concrete topic string of a received message.

    Returns:
        ``(0, table_name)`` when the topic is known, ``(-1, "")`` otherwise.
        An exact key match in ``mqtt_topics`` returns the mapped value as-is.
        Otherwise the topic is compared, minus its last level, with each
        template minus its last level; on a match the last level is parsed
        as an integer and formatted into the table-name template.
    """
    if topic in mqtt_topics:
        return (0, mqtt_topics[topic])

    *topic_prefix, channel_text = topic.split("/")
    for template, table_template in mqtt_topics.items():
        if template.split("/")[:-1] != topic_prefix:
            continue
        try:
            channel = int(channel_text)
        except ValueError:
            # The "+" wildcard also matches non-numeric levels; treat such a
            # topic as unknown instead of raising inside the MQTT callback.
            return (-1, "")
        return (0, table_template % channel)
    return (-1, "")

def getdbtable_macrotype(dbtablename):
    """Return the ``_``-separated parts of a table name without the last part.

    For example ``"d_shake_envelope_ch01"`` gives
    ``["d", "shake", "envelope"]``.
    """
    *class_parts, _suffix = dbtablename.split("_")
    return class_parts
    

def mqtt_dealmsg(client, topic, payload):
    """Store one received MQTT message in its database table, if it has one.

    The message is dropped silently when the topic maps to no table or when
    the payload cannot be converted into a field dictionary.

    Args:
        client: the MQTT client, passed through to the topic lookup.
        topic: topic string of the received message.
        payload: raw message payload.
    """
    # Check whether this message needs to be saved to the database at all.
    ret, dbtable = mqtt_get_topic_related_db_table(client, topic)
    if ret != 0:
        return

    # Build the {column: value} dictionary used for the INSERT.
    ret, db_dict = payload_to_dictOfDbField(payload, dbtable)
    if ret != 0:
        return

    # Hand the row to the generic SQL insert helper.
    dbhelper.insert_data(gpmysql.gpDbConn(), dbtable, db_dict)

def payload_to_dictOfDbField(payload, dbtable):
    """Convert a raw payload into the field dictionary for ``dbtable``.

    The converter is chosen from the table name with its last ``_`` part
    removed (see ``getdbtable_macrotype``).

    Returns:
        ``(0, field_dict)`` for a recognised table class, otherwise
        ``(-1, "unknow payload")`` after printing a notice.
    """
    dbtable_class = getdbtable_macrotype(dbtable)
    if dbtable_class == "d_shake_envelope".split("_"):
        # Each payload type has its own independent conversion function.
        return (0, typeExchange.payload_to_d_shake_envelope(payload))

    print("unknown payload:%s, ommited to save to DB!" % dbtable)
    return (-1, "unknow payload")

Step2 通用的写mySql的辅助函数

代码中包含有两类payload的数据库入库接口,对于json格式,比较容易处理。顶多做一个名称映射表。

对于二进制格式,需要先将二进制流解析为结构化的数据,然后才能入库。由于数据库表的一行记录由“字段名 → 值”组成,最合适的载体是 Python 的字典(dict)。

这里只给出了CRUD中的C和R,U和D的代码可依此类推。

def json_to_mysql_insert(json_obj, table_name):
    """Build an ``INSERT`` statement with the values inlined as SQL literals.

    Args:
        json_obj: a JSON-serialisable mapping of column name to value.
        table_name: target table; inserted verbatim, so it must come from
            trusted code.

    Returns:
        The SQL text. ``None`` values become ``NULL``; every other value is
        converted with ``str()`` and emitted as a single-quoted literal with
        backslashes and single quotes escaped.

    Note:
        Column names are not escaped. Where a DB-API cursor is available,
        a parameterised statement (see ``dictionary_to_mysql_insert``) is
        the safer choice.
    """
    # Round-trip through JSON to normalise the input into plain dict/str/number
    # values (json.loads, not json.load: the input is a string, not a file).
    data = json.loads(json.dumps(json_obj))
    columns = ','.join(data.keys())
    values = ','.join(_sql_literal(value) for value in data.values())
    return "INSERT INTO {} ({}) VALUES ({})".format(table_name, columns, values)


def _sql_literal(value):
    """Render one value as a MySQL literal: ``NULL`` or a quoted string."""
    if value is None:
        return "NULL"
    escaped = str(value).replace("\\", "\\\\").replace("'", "''")
    return "'" + escaped + "'"

def dictionary_to_mysql_insert(dictionary, table_name):
    """Build a parameterised ``INSERT`` statement for ``dictionary``'s keys.

    The keys become the column list and one ``%s`` placeholder is emitted
    per key; the values themselves are not placed in the statement.
    """
    column_list = ','.join(dictionary)
    placeholder_list = ','.join('%s' for _ in dictionary)
    template = "INSERT INTO {} ({}) VALUES ({})"
    return template.format(table_name, column_list, placeholder_list)

# Query helper
def _to_json_safe(item):
    """Convert one column value into something ``json.dumps`` accepts."""
    if isinstance(item, datetime.datetime):
        # Render datetimes as "YYYY-MM-DD HH:MM:SS" text.
        return item.strftime('%Y-%m-%d %H:%M:%S')
    if isinstance(item, bytes):
        # Only the first 8 bytes are kept, as a hex string.
        return binascii.hexlify(item[:8]).decode()
    if isinstance(item, decimal.Decimal):
        return item.to_eng_string()
    return item


def query_data(conn, tablename):
    """Read every row of ``tablename`` and return them as a JSON string.

    Args:
        conn: a database connection whose ``cursor()`` result works as a
            context manager and offers ``execute`` / ``fetchall``.
        tablename: table to read; concatenated into the SQL text, so it
            must come from trusted code (identifiers cannot be bound as
            query parameters).

    Returns:
        A JSON array with one inner array of column values per row.
        datetime values become ``"YYYY-MM-DD HH:MM:SS"`` strings, bytes
        become the hex of their first 8 bytes, and Decimal values become
        strings. Rows may be dicts or plain sequences such as tuples.
    """
    # Leaving the with-block releases the cursor, so no explicit close() is
    # needed (assumes the cursor's context manager closes it, as PyMySQL's does).
    with conn.cursor() as cursor:
        cursor.execute("SELECT * FROM " + tablename)
        rows = cursor.fetchall()

    records = []
    for row in rows:
        # Dict rows carry the values under their column names; sequence rows
        # are already the values in column order.
        values = row.values() if isinstance(row, dict) else row
        records.append([_to_json_safe(value) for value in values])
    return json.dumps(records)

Step2.1 原始二进制流到Dictionary的转换:

这里没有定义结构体,而是直接用 struct.unpack_from 按固定偏移量逐个解析各字段。

from asyncio.windows_events import NULL
from multiprocessing.sharedctypes import Value
from os import name
from pickle import BINBYTES
from sqlite3 import SQLITE_BUSY_SNAPSHOT
import pymysql
import json
import gp_mysql_server as gpmysql
import datetime;
import binascii;
import decimal;
import struct;
from typing import MutableSequence

# Sample count of one frame; the raw sample block that follows the header
# occupies 2 * PT_OF_SENSOR_SHAKE_SAMPLE bytes.
PT_OF_SENSOR_SHAKE_SAMPLE = 2048

# Fixed 40-byte little-endian header, parsed without padding:
# version high (u8), version low (u8), serial number (u32), 2-byte type tag,
# 20-byte sample-time text, scale (u32), centre frequency (f32), band (f32).
_ENVELOPE_HEADER = struct.Struct("<BBI2s20sIff")


def payload_to_d_shake_envelope(payload):
    """Decode a binary shake-envelope payload into a field dictionary.

    Layout: the 40-byte header described by ``_ENVELOPE_HEADER``, then
    ``2 * PT_OF_SENSOR_SHAKE_SAMPLE`` bytes of raw sample data, then four
    little-endian float32 measures.

    Args:
        payload: the raw message bytes.

    Returns:
        A dict with the keys ``ver``, ``type``, ``time``, ``scale``, ``fs``,
        ``band``, ``bin_data``, ``rms``, ``ppk``, ``kurtossis``, ``margin``
        and ``sn``, in that insertion order. ``bin_data`` is the undecoded
        sample block.

    Raises:
        struct.error: if the payload is too short for the header or for the
            trailing measures.
        UnicodeDecodeError: if the type tag or time text is not valid UTF-8.
    """
    (ver_high, ver_low, sn, type_raw, time_raw,
     scale, freq_center, freq_band) = _ENVELOPE_HEADER.unpack_from(payload, 0)

    data_start = _ENVELOPE_HEADER.size
    data_end = data_start + 2 * PT_OF_SENSOR_SHAKE_SAMPLE
    rms, ppk, kurtosis, margin = struct.unpack_from("<4f", payload, data_end)

    return {
        # e.g. ver_high=1, ver_low=0 gives "1.0"
        "ver": "{}.{}".format(ver_high, ver_low),
        "type": type_raw.decode('utf-8'),
        # Strip the NUL padding of the fixed-width field. '\x00' is the NUL
        # character; the former 'x00' stripped the letters "x" and "0".
        "time": time_raw.decode('utf-8').rstrip('\x00'),
        "scale": scale,
        "fs": freq_center,
        "band": freq_band,
        "bin_data": payload[data_start:data_end],
        "rms": rms,
        "ppk": ppk,
        "kurtossis": kurtosis,
        "margin": margin,
        "sn": sn,
    }