本文介紹通過數據解析任務解析并存儲設備上報的ProtoBuf格式數據的完整流程。
前提條件
已創建產品和設備,獲取設備證書(ProductKey、DeviceName和DeviceSecret)。具體操作,請參見創建產品和單個創建設備。
已完成開發環境安裝與配置。
本文示例使用Python Link SDK開發設備上報ProtoBuf格式數據,使用Linux操作系統(Ubuntu 20.04 64-bit)作為開發環境。Python安裝以及SDK配置,請參見環境要求與配置。
本示例操作步驟以普通用戶權限為例。如果您在操作過程中涉及到管理員權限才能執行的操作,可嘗試使用sudo
命令執行。
安裝ProtoBuf
登錄已準備的開發環境。
執行以下命令,安裝ProtoBuf工具。
pip install protobuf
依此執行以下命令,下載ProtoBuf編譯器文件包,并解壓縮。
說明編譯器文件包版本需要與安裝的ProtoBuf版本一致。您可根據上一步安裝返回的ProtoBuf版本號,查找對應版本的編譯器文件包。具體信息,請參見ProtoBuf版本。
wget https://github.com/protocolbuffers/protobuf/releases/download/v24.2/protoc-24.2-linux-x86_64.zip unzip protoc-24.2-linux-x86_64.zip
解壓文件如下:
執行以下命令查看PIP安裝路徑。
which pip
本示例返回:
/usr/bin/pip
執行以下命令,將
bin/protoc
文件復制到PIP安裝路徑目錄下。cp bin/protoc /usr/bin
執行以下命令,返回如圖所示信息,表示編譯器安裝成功。
protoc
為示例數據生成.desc和.data文件
本示例以設備通過自定義Topic上報ProtoBuf格式數據為例,通過.desc
文件讀取對應的二進制數據.data
文件,最終解析輸出JSON格式數據,以便數據解析任務后續處理和存儲。
本示例解析的JSON格式數據如下:
數據解析任務不支持解析數組類型數據。
{
"pname": "test",
"pid": 1234,
"pemail": "test@example.com"
}
在Linux操作系統中,使用已安裝的ProtoBuf,生成對應文件。
根據Proto協議編寫
.proto
文件,解析JSON數據示例中字段。執行以下命令,創建并打開
book.proto
文件。vim book.proto
輸入以下內容后,保存并退出。
syntax = "proto3"; package tutorial; message Person { optional string pname = 1; optional int32 pid = 2; optional string pemail = 3; } message AddressBook { repeated Person people = 1; }
執行以下命令,生成對應的
book.desc
文件。protoc --descriptor_set_out=./book.desc ./book.proto
執行以下命令生成一個可調用的Python類文件
book_pb2.py
。protoc --python_out=. book.proto
根據定義的
book.proto
文件創建文件person.py
,使用person.SerializeToString()
將數據序列化轉化為二進制數據,并寫進person.data
文件。執行以下命令,創建并打開
person.py
文件。vim person.py
輸入以下內容后,保存并退出。
import book_pb2 person = book_pb2.Person() person.pid = 1234 person.pname = "test" person.pemail = "test@example.com" with open('person.data', "wb") as f: f.write(person.SerializeToString())
執行以下命令,運行
person.py
,生成二進制數據文件person.data
。python3 person.py
將生成的
book.desc
和person.data
下載到本地保存。
配置數據解析任務
本示例將設備上報的ProtoBuf格式數據進行解析后存儲到自定義存儲表。
開發設備接入并上報數據
返回Linux操作系統,開發設備接入物聯網平臺,并上報ProtoBuf格式數據。
執行以下命令,創建并打開設備程序文件
mqtt_pub.py
。cd ~ vim mqtt_pub.py
輸入以下代碼后,保存并退出。
說明代碼中
product_key
、device_name
、device_secret
及lk.publish_topic
的/gs*****/device1/user/update
中ProductKey值,都需替換為設備證書真實值。import sys from linkkit import linkkit import logging import os.path # config log __log_format = '%(asctime)s-%(process)d-%(thread)d - %(name)s:%(module)s:%(funcName)s - %(levelname)s - %(message)s' logging.basicConfig(format=__log_format) lk = linkkit.LinkKit( host_name="cn-shanghai", product_key="gs*****", device_name="device1", device_secret="06aec**************728f56" ) lk.enable_logger(logging.DEBUG) def on_device_dynamic_register(rc, value, userdata): if rc == 0: print("dynamic register device success, value:" + value) else: print("dynamic register device fail, message:" + value) def on_connect(session_flag, rc, userdata): print("on_connect:%d,rc:%d" % (session_flag, rc)) pass def on_disconnect(rc, userdata): print("on_disconnect:rc:%d,userdata:" % rc) def on_topic_message(topic, payload, qos, userdata): print("on_topic_message:" + topic + " payload:" + str(payload) + " qos:" + str(qos)) pass def on_subscribe_topic(mid, granted_qos, userdata): print("on_subscribe_topic mid:%d, granted_qos:%s" % (mid, str(','.join('%s' % it for it in granted_qos)))) pass def on_unsubscribe_topic(mid, userdata): print("on_unsubscribe_topic mid:%d" % mid) pass def on_publish_topic(mid, userdata): print("on_publish_topic mid:%d" % mid) def read_into_buffer(filename): buf = bytearray(os.path.getsize(filename)) with open(filename, 'rb') as f: f.readinto(buf) return buf lk.on_device_dynamic_register = on_device_dynamic_register lk.on_connect = on_connect lk.on_disconnect = on_disconnect lk.on_topic_message = on_topic_message lk.on_subscribe_topic = on_subscribe_topic lk.on_unsubscribe_topic = on_unsubscribe_topic lk.on_publish_topic = on_publish_topic lk.config_device_info("Eth|03ACDEFF0032|Eth|03ACDEFF0031") lk.connect_async() lk.start_worker_loop() buf = read_into_buffer('person.data') print(buf) while True: try: msg = input() except KeyboardInterrupt: sys.exit() else: if msg == "1": lk.disconnect() elif msg == "2": lk.connect_async() elif msg == "3": rc, mid = lk.subscribe_topic([(lk.to_full_topic("user/receive/from/jiexi"), 1)]) if rc == 0: print("subscribe multiple topics success:%r, mid:%r" % (rc, mid)) else: print("subscribe multiple topics fail:%d" % rc) elif msg == "4": rc, mid = lk.publish_topic("/gs*****/device1/user/update", buf) if rc == 0: print("publish topic success:%r, mid:%r" % (rc, mid)) else: print("publish topic fail:%d" % rc) else: sys.exit()
執行以下命令,確認激活VirtualEnvironments。
cd work_dir source test_env/bin/activate
執行以下命令,運行設備程序。
cd ~ python3 mqtt_pub.py
設備讀取到的數據如下:
bytearray(b'\n\x04test\x10\xd2\t\x1a\x10test@example.com')
輸入
4
,觸發設備上報消息。4 2023-09-01 11:26:46,149-****** - Paho:client:_easy_log - DEBUG - Sending PUBLISH (d0, q1, r0, m1), 'b'/gs*****/device1/user/update'', ... (43 bytes) publish topic success:0, mid:1 2023-09-01 11:26:46,172-****** - Paho:client:_easy_log - DEBUG - Received PUBACK (Mid: 1) 2023-09-01 11:26:46,172-****** - linkkit:linkkit:debug - DEBUG - post_message :'on_publish' 2023-09-01 11:26:46,172-****** - linkkit:linkkit:debug - DEBUG - post_message success 2023-09-01 11:26:46,173-****** - linkkit:linkkit:debug - DEBUG - thread runnable pop cmd:'on_publish' 2023-09-01 11:26:46,173-****** - linkkit:linkkit:debug - DEBUG - __on_internal_publish message:1 on_publish_topic mid:1
查看數據解析結果
返回物聯網平臺控制臺實例下,在 頁面的云端運行日志頁簽,可查看到設備上線并上報數據。
在 頁面的 頁簽,單擊testProtoc表右側操作列的數據預覽,可查看到數據解析任務存儲的數據。