日本熟妇hd丰满老熟妇,中文字幕一区二区三区在线不卡 ,亚洲成片在线观看,免费女同在线一区二区

ProtoBuf數據解析示例

本文介紹通過數據解析任務解析并存儲設備上報的ProtoBuf格式數據的完整流程。

前提條件

  • 已創建產品和設備,獲取設備證書(ProductKey、DeviceName和DeviceSecret)。具體操作,請參見創建產品單個創建設備

  • 已完成開發環境安裝與配置。

    本文示例使用Python Link SDK開發設備上報ProtoBuf格式數據,使用Linux操作系統(Ubuntu 20.04 64-bit)作為開發環境。Python安裝以及SDK配置,請參見環境要求與配置。

說明

本示例操作步驟以普通用戶權限為例。如果您在操作過程中涉及到管理員權限才能執行的操作,可嘗試使用sudo命令執行。

安裝ProtoBuf

  1. 登錄已準備的開發環境。

  2. 執行以下命令,安裝ProtoBuf工具。

    pip install protobuf
  3. 依此執行以下命令,下載ProtoBuf編譯器文件包,并解壓縮。

    說明

    編譯器文件包版本需要與安裝的ProtoBuf版本一致。您可根據上一步安裝返回的ProtoBuf版本號,查找對應版本的編譯器文件包。具體信息,請參見ProtoBuf版本。

    wget https://github.com/protocolbuffers/protobuf/releases/download/v24.2/protoc-24.2-linux-x86_64.zip
    unzip protoc-24.2-linux-x86_64.zip

    解壓文件如下:

    image.png
  4. 執行以下命令查看PIP安裝路徑。

    which pip

    本示例返回:

    /usr/bin/pip
  5. 執行以下命令,將bin/protoc文件復制到PIP安裝路徑目錄下。

    cp bin/protoc /usr/bin
  6. 執行以下命令,返回如圖所示信息,表示編譯器安裝成功。

    protoc
    image.png

為示例數據生成.desc和.data文件

本示例以設備通過自定義Topic上報ProtoBuf格式數據為例,通過.desc文件讀取對應的二進制數據.data文件,最終解析輸出JSON格式數據,以便數據解析任務后續處理和存儲。

本示例解析的JSON格式數據如下:

說明

數據解析任務不支持解析數組類型數據。

{
  "pname": "test",
  "pid": 1234,
  "pemail": "test@example.com"
}

在Linux操作系統中,使用已安裝的ProtoBuf,生成對應文件。

  1. 根據Proto協議編寫.proto文件,解析JSON數據示例中字段。

    1. 執行以下命令,創建并打開book.proto文件。

      vim book.proto
    2. 輸入以下內容后,保存并退出。

      syntax = "proto3";
      
      package tutorial;
      
      message Person {
        optional string pname = 1;
        optional int32 pid = 2;
        optional string pemail = 3;
      }
      
      message AddressBook {
        repeated Person people = 1;
      }
  2. 執行以下命令,生成對應的book.desc文件。

    protoc  --descriptor_set_out=./book.desc ./book.proto
  3. 執行以下命令生成一個可調用的Python類文件book_pb2.py。

    protoc --python_out=.  book.proto
  4. 根據定義的book.proto文件創建文件person.py,使用person.SerializeToString()將數據序列化轉化為二進制數據,并寫進person.data文件。

    1. 執行以下命令,創建并打開person.py文件。

      vim person.py
    2. 輸入以下內容后,保存并退出。

      import book_pb2
      person = book_pb2.Person()
      person.pid = 1234
      person.pname = "test"
      person.pemail = "test@example.com"
      with open('person.data', "wb") as f:
          f.write(person.SerializeToString())  
         
    3. 執行以下命令,運行person.py,生成二進制數據文件person.data。

      python3 person.py
  5. 將生成的book.descperson.data下載到本地保存。

配置數據解析任務

本示例將設備上報的ProtoBuf格式數據進行解析后存儲到自定義存儲表。

  1. 物聯網平臺控制臺實例概覽頁面,單擊目標企業版實例卡片。

  2. 創建自定義存儲表表顯示名表標識符設置為testProtoc,其他設置使用默認值或自定義。

  3. 創建數據解析任務:任務名稱設置為testProtoc。

  4. 配置源節點

    1. 數據源類型選擇IoT實例TopicTopic類型選擇自定義Topic,單擊下一步。

      image.png
    2. Topic數據格式選擇ProtoBuf,上傳已生成保存的book.descperson.data文件,單擊校驗解析。

      解析成功,可在解析預覽中查看數據。

      image.png
    3. 單擊保存

  5. 配置數據過濾:如下圖所示,當pid值等于1234時,輸出源節點中字段數據。

    image.png
  6. 配置目標節點:將輸出數據存儲到自定義存儲表testProtoc

    image.png
  7. 啟動數據解析任務。

開發設備接入并上報數據

返回Linux操作系統,開發設備接入物聯網平臺,并上報ProtoBuf格式數據。

  1. 執行以下命令,創建并打開設備程序文件mqtt_pub.py。

    cd ~
    vim mqtt_pub.py
  2. 輸入以下代碼后,保存并退出。

    說明

    代碼中product_key、device_name、device_secretlk.publish_topic/gs*****/device1/user/update中ProductKey值,都需替換為設備證書真實值。

    import sys
    from linkkit import linkkit
    import logging
    import os.path
    
    # config log
    __log_format = '%(asctime)s-%(process)d-%(thread)d - %(name)s:%(module)s:%(funcName)s - %(levelname)s - %(message)s'
    logging.basicConfig(format=__log_format)
    
    lk = linkkit.LinkKit(
        host_name="cn-shanghai",
        product_key="gs*****",
        device_name="device1",
        device_secret="06aec**************728f56"
         )
    lk.enable_logger(logging.DEBUG)
    
    
    def on_device_dynamic_register(rc, value, userdata):
        if rc == 0:
            print("dynamic register device success, value:" + value)
        else:
            print("dynamic register device fail, message:" + value)
    
    
    def on_connect(session_flag, rc, userdata):
        print("on_connect:%d,rc:%d" % (session_flag, rc))
        pass
    
    
    def on_disconnect(rc, userdata):
        print("on_disconnect:rc:%d,userdata:" % rc)
    
    
    def on_topic_message(topic, payload, qos, userdata):
        print("on_topic_message:" + topic + " payload:" + str(payload) + " qos:" + str(qos))
        pass
    
    
    def on_subscribe_topic(mid, granted_qos, userdata):
        print("on_subscribe_topic mid:%d, granted_qos:%s" %
              (mid, str(','.join('%s' % it for it in granted_qos))))
        pass
    
    
    def on_unsubscribe_topic(mid, userdata):
        print("on_unsubscribe_topic mid:%d" % mid)
        pass
    
    
    def on_publish_topic(mid, userdata):
        print("on_publish_topic mid:%d" % mid)
    
    def read_into_buffer(filename):
        buf = bytearray(os.path.getsize(filename))
        with open(filename, 'rb') as f:
            f.readinto(buf)
        return buf
    
                                  
    lk.on_device_dynamic_register = on_device_dynamic_register
    lk.on_connect = on_connect
    lk.on_disconnect = on_disconnect
    lk.on_topic_message = on_topic_message
    lk.on_subscribe_topic = on_subscribe_topic
    lk.on_unsubscribe_topic = on_unsubscribe_topic
    lk.on_publish_topic = on_publish_topic
    
    
    lk.config_device_info("Eth|03ACDEFF0032|Eth|03ACDEFF0031")
    lk.connect_async()
    lk.start_worker_loop()
    buf = read_into_buffer('person.data')
    print(buf)
    
    
    while True:
        try:
            msg = input()
        except KeyboardInterrupt:
            sys.exit()
        else:
            if msg == "1":
                lk.disconnect()
            elif msg == "2":
                lk.connect_async()
            elif msg == "3":
                rc, mid = lk.subscribe_topic([(lk.to_full_topic("user/receive/from/jiexi"), 1)])
               
                if rc == 0:
                    print("subscribe multiple topics success:%r, mid:%r" % (rc, mid))
                else:
                    print("subscribe multiple topics fail:%d" % rc)
            elif msg == "4":
                rc, mid = lk.publish_topic("/gs*****/device1/user/update", buf)
                if rc == 0:
                    print("publish topic success:%r, mid:%r" % (rc, mid))
                else:
                    print("publish topic fail:%d" % rc)
            else:
                sys.exit()
                
  3. 執行以下命令,確認激活VirtualEnvironments。

    cd work_dir
    source test_env/bin/activate
  4. 執行以下命令,運行設備程序。

    cd ~
    python3 mqtt_pub.py

    設備讀取到的數據如下:

    bytearray(b'\n\x04test\x10\xd2\t\x1a\x10test@example.com')
  5. 輸入4,觸發設備上報消息。

    4
    2023-09-01 11:26:46,149-****** - Paho:client:_easy_log - DEBUG - Sending PUBLISH (d0, q1, r0, m1), 'b'/gs*****/device1/user/update'', ... (43 bytes)
    publish topic success:0, mid:1
    2023-09-01 11:26:46,172-****** - Paho:client:_easy_log - DEBUG - Received PUBACK (Mid: 1)
    2023-09-01 11:26:46,172-****** - linkkit:linkkit:debug - DEBUG - post_message :'on_publish' 
    2023-09-01 11:26:46,172-****** - linkkit:linkkit:debug - DEBUG - post_message success
    2023-09-01 11:26:46,173-****** - linkkit:linkkit:debug - DEBUG - thread runnable pop cmd:'on_publish'
    2023-09-01 11:26:46,173-****** - linkkit:linkkit:debug - DEBUG - __on_internal_publish message:1
    on_publish_topic mid:1

查看數據解析結果

  1. 返回物聯網平臺控制臺實例下,在監控運維 > 日志服務頁面的云端運行日志頁簽,可查看到設備上線并上報數據。

    image.png
  2. 數據服務 > 數據存儲頁面的離線存儲 > 自定義存儲表頁簽,單擊testProtoc表右側操作列的數據預覽,可查看到數據解析任務存儲的數據。

    image.png