日本熟妇hd丰满老熟妇,中文字幕一区二区三区在线不卡 ,亚洲成片在线观看,免费女同在线一区二区

數(shù)據(jù)AMQP方式推送

AMQP(Advanced Message Queuing Protocol,高級消息隊列協(xié)議)轉(zhuǎn)儲功能適用于生活物聯(lián)網(wǎng)平臺與企業(yè)服務(wù)器之間的消息流轉(zhuǎn)。通過集成和使用AMQP SDK,即可實現(xiàn)身份認證、消息接收的能力。我們推薦使用AMQP的方式推送設(shè)備數(shù)據(jù)(如設(shè)備狀態(tài)數(shù)據(jù)、設(shè)備控制記錄等),用戶信息數(shù)據(jù)等。

前提條件

開啟設(shè)備數(shù)據(jù)同步,并配置要同步數(shù)據(jù)的產(chǎn)品。詳細參見設(shè)置數(shù)據(jù)同步數(shù)據(jù)同步

說明

當(dāng)開啟數(shù)據(jù)同步后,集成AMQP客戶端SDK來訂閱數(shù)據(jù)。如果通過控制臺關(guān)閉數(shù)據(jù)同步再開啟時,客戶端SDK需要重新進行連接流程,否則無法正常接收數(shù)據(jù)。如果切換不同的AMQP客戶端,需要把之前的客戶端斷開,再連接新的客戶端。否則新客戶端無法正常接收數(shù)據(jù)。

AMQP SDK使用

  1. 引入依賴。

    AMQP SDK為開源SDK。如果您使用Java開發(fā)語言,推薦使用Apache Qpid JMS客戶端。在項目中添加Maven依賴,Maven信息如下。

    說明

    目前生活物聯(lián)網(wǎng)平臺僅支持Java語言開發(fā),暫未提供其他語言版本,請以生活物聯(lián)網(wǎng)平臺官網(wǎng)為主。

    <!-- amqp 1.0 qpid client -->
     <dependency>
       <groupId>org.apache.qpid</groupId>
       <artifactId>qpid-jms-client</artifactId>
       <version>0.47.0</version>
     </dependency>
     <!-- util for base64-->
     <dependency>
       <groupId>commons-codec</groupId>
      <artifactId>commons-codec</artifactId>
      <version>1.10</version>
    </dependency>           
  2. 認證身份信息。

    • 身份認證需要使用AppKey和AppSecret,該信息可以從控制臺中獲取。獲取身份信息

    • 認證身份信息需要使用EndPoint、AppKey和AppSecret用于鑒權(quán)。

      其中,EndPoint是連接節(jié)點,具體取值如下表所示。

      區(qū)域

      End Point

      中國內(nèi)地

      amqps://ilop.iot-amqp.cn-shanghai.aliyuncs.com:5671

      新加坡

      amqps://ilop.iot-amqp.ap-southeast-1.aliyuncs.com:5671

      美國(弗吉尼亞)

      amqps://ilop.iot-amqp.us-east-1.aliyuncs.com:5671

      德國(法蘭克福)

      amqps://ilop.iot-amqp.eu-central-1.aliyuncs.com:5671

  3. 接收云端消息。

    首先需要創(chuàng)建消息接收的客戶端對象client,并傳入上面身份認證的profile信息。當(dāng)消息接收的客戶端和服務(wù)端建立連接后,服務(wù)端會立即向消息接收的客戶端推送已訂閱的消息,因此建立連接時需要提供默認消息接收的回調(diào)接口,用于處理云端推送的消息。

    完整代碼示例如下。

    import java.net.URI;
    import java.util.Hashtable;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.LinkedBlockingQueue;
    import java.util.concurrent.ThreadPoolExecutor;
    import java.util.concurrent.TimeUnit;
    import javax.crypto.Mac;
    import javax.crypto.spec.SecretKeySpec;
    import javax.jms.Connection;
    import javax.jms.ConnectionFactory;
    import javax.jms.Destination;
    import javax.jms.Message;
    import javax.jms.MessageConsumer;
    import javax.jms.MessageListener;
    import javax.jms.MessageProducer;
    import javax.jms.Session;
    import javax.naming.Context;
    import javax.naming.InitialContext;
    import org.apache.commons.codec.binary.Base64;
    import org.apache.qpid.jms.JmsConnection;
    import org.apache.qpid.jms.JmsConnectionListener;
    import org.apache.qpid.jms.message.JmsInboundMessageDispatch;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    
    public class AmqpJavaClientDemo {
    
        private final static Logger logger = LoggerFactory.getLogger(AmqpJavaClientDemo.class);
    
        //業(yè)務(wù)處理異步線程池,線程池參數(shù)可以根據(jù)您的業(yè)務(wù)特點調(diào)整,或者您也可以用其他異步方式處理接收到的消息。
        private final static ExecutorService executorService = new ThreadPoolExecutor(
            Runtime.getRuntime().availableProcessors(),
            Runtime.getRuntime().availableProcessors() * 2, 60, TimeUnit.SECONDS,
            new LinkedBlockingQueue<>(50000));
    
        public static void main(String[] args) throws Exception {
    
            String appKey = "${YourAppkey}"; //在控制臺自有App,單擊密鑰對應(yīng)的查看,可顯示App的App Key和App Secret 創(chuàng)建App時系統(tǒng)
            String appSecret = "${YourAppSecret}";
            String consumerGroupId = "${YourAppkey}";
            long random = xxxxx;
            //建議使用機器UUID、MAC地址、IP等唯一標(biāo)識等作為clientId。便于您區(qū)分識別不同的客戶端。
            String clientId = "${YourClientId}";
    
            String userName = clientId + "|authMode=appkey"                 
                    + ",signMethod=" + "SHA256"
                    + ",random=" + random
                    + ",appKey=" + appKey
                    + ",groupId=" + consumerGroupId + "|";
            String signContent = "random=" + random;
            String password = doSign(signContent, appSecret, "HmacSHA256");
            String connectionUrlTemplate = "failover:(${AMQPEndPointUrl}?amqp.idleTimeout=80000)"
                    + "?failover.maxReconnectAttempts=10&failover.reconnectDelay=30";
    
            Hashtable<String, String> hashtable = new Hashtable<>();
            hashtable.put("connectionfactory.SBCF",connectionUrlTemplate);
            hashtable.put("queue.QUEUE", "default");
            hashtable.put(Context.INITIAL_CONTEXT_FACTORY, "org.apache.qpid.jms.jndi.JmsInitialContextFactory");
            Context context = new InitialContext(hashtable);
            ConnectionFactory cf = (ConnectionFactory)context.lookup("SBCF");
            Destination queue = (Destination)context.lookup("QUEUE");
            // 創(chuàng)建連接。
            Connection connection = cf.createConnection(userName, password);
            ((JmsConnection) connection).addConnectionListener(myJmsConnectionListener);
            // 創(chuàng)建會話。
            // Session.CLIENT_ACKNOWLEDGE: 收到消息后,需要手動調(diào)用message.acknowledge()。
            // Session.AUTO_ACKNOWLEDGE: SDK自動ACK(推薦)。
            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            connection.start();
            // 創(chuàng)建Receiver連接。
            MessageConsumer consumer = session.createConsumer(queue);
            consumer.setMessageListener(messageListener);
        }
    
        private static MessageListener messageListener = new MessageListener() {
            @Override
            public void onMessage(Message message) {
                try {
                    //1.收到消息之后一定要ACK。
                    // 推薦做法:創(chuàng)建Session選擇Session.AUTO_ACKNOWLEDGE,這里會自動ACK。
                    // 其他做法:創(chuàng)建Session選擇Session.CLIENT_ACKNOWLEDGE,這里一定要調(diào)message.acknowledge()來ACK。
                    // message.acknowledge();
                    //2.建議異步處理收到的消息,確保onMessage函數(shù)里沒有耗時邏輯。
                    // 如果業(yè)務(wù)處理耗時過程過長阻塞住線程,可能會影響SDK收到消息后的正常回調(diào)。
                    executorService.submit(() -> processMessage(message));
                } catch (Exception e) {
                    logger.error("submit task occurs exception ", e);
                }
            }
        };
    
        /**
         * 在這里處理您收到消息后的具體業(yè)務(wù)邏輯。
         */
        private static void processMessage(Message message) {
            try {
                byte[] body = message.getBody(byte[].class);
                String content = new String(body);
                String topic = message.getStringProperty("topic");
                String messageId = message.getStringProperty("messageId");
                logger.info("receive message"
                    + ", topic = " + topic
                    + ", messageId = " + messageId
                    + ", content = " + content);
            } catch (Exception e) {
                logger.error("processMessage occurs error ", e);
            }
        }
    
        private static JmsConnectionListener myJmsConnectionListener = new JmsConnectionListener() {
            /**
             * 連接成功建立。
             */
            @Override
            public void onConnectionEstablished(URI remoteURI) {
                logger.info("onConnectionEstablished, remoteUri:{}", remoteURI);
            }
    
            /**
             * 嘗試過最大重試次數(shù)之后,最終連接失敗。
             */
            @Override
            public void onConnectionFailure(Throwable error) {
                logger.error("onConnectionFailure, {}", error.getMessage());
            }
    
            /**
             * 連接中斷。
             */
            @Override
            public void onConnectionInterrupted(URI remoteURI) {
                logger.info("onConnectionInterrupted, remoteUri:{}", remoteURI);
            }
    
            /**
             * 連接中斷后又自動重連上。
             */
            @Override
            public void onConnectionRestored(URI remoteURI) {
                logger.info("onConnectionRestored, remoteUri:{}", remoteURI);
            }
    
            @Override
            public void onInboundMessage(JmsInboundMessageDispatch envelope) {}
    
            @Override
            public void onSessionClosed(Session session, Throwable cause) {}
    
            @Override
            public void onConsumerClosed(MessageConsumer consumer, Throwable cause) {}
    
            @Override
            public void onProducerClosed(MessageProducer producer, Throwable cause) {}
        };
    
        /**
         * 計算簽名,password組裝方法,請參見AMQP客戶端接入說明文檔。
         */
        private static String doSign(String toSignString, String secret, String signMethod) throws Exception {
            SecretKeySpec signingKey = new SecretKeySpec(secret.getBytes(), signMethod);
            Mac mac = Mac.getInstance(signMethod);
            mac.init(signingKey);
            byte[] rawHmac = mac.doFinal(toSignString.getBytes());
            return Hex.encodeHexString(rawHmac);
        }
    }
    說明

    消息推送失敗時,平臺會重新推送,重試策略如下。

    • 如果對端不在線或未回復(fù)ack消息,則會造成消息堆積,堆積的消息轉(zhuǎn)為離線消息。

    • 離線消息每隔1min重試推送一次(每次推送10條)。對端如果成功接收了消息,則重試策略會繼續(xù)推送剩余的離線消息(推送失敗的消息,下一次繼續(xù)推送)。

    • 離線消息最多會保存 1 天,如果 1 天后仍然無法推送成功,則會被刪除。

    • 離線消息會進入單獨的隊列,不會影響后續(xù)消息的實時推送。

消息格式

  • 物的屬性變更消息

    topic:/${productKey}/${deviceName}/thing/event/property/post

    消息字段說明如下。

    參數(shù)

    子參數(shù)

    子參數(shù)

    類型

    含義

    deviceType

    String

    設(shè)備所屬品類

    gmtCreate

    Long

    數(shù)據(jù)流轉(zhuǎn)消息產(chǎn)生時間,自1970-1-1起流逝的毫秒值

    iotId

    String

    設(shè)備的唯一ID

    productKey

    String

    設(shè)備所屬產(chǎn)品的唯一標(biāo)識符

    deviceName

    String

    設(shè)備名稱

    items

    JSON

    變更的狀態(tài)列表

    attribute

    String

    發(fā)生變更的屬性,具體取值由具體情況確定

    value

    具體數(shù)據(jù)類型由具體情況確定

    變更值

    time

    Long

    設(shè)備屬性發(fā)生變化的時間,自1970-1-1起流逝的毫秒值

    消息示例如下。

    {
        "deviceType": "SmartDoor",
        "iotId": "Xzf15db9xxxxxxxxWR001046b400",
        "productKey": "a17xxxxTYNA",
        "gmtCreate": 153xxxx145304,
        "deviceName": "Xzf15xxxxucTHBgUo6WR",
        "items": {
            "WIFI_Rx_Rate": {
                "value": 74274,
                "time": 1534299145344
            }
        }
    }  
  • 物的事件變更消息

    topic:/${productKey}/${deviceName}/thing/event/{tsl.event.identifier}/post

    消息字段說明如下。

    參數(shù)

    子參數(shù)

    類型

    含義

    deviceType

    String

    設(shè)備所屬品類

    iotId

    String

    設(shè)備的唯一ID

    productKey

    String

    設(shè)備所屬產(chǎn)品的唯一標(biāo)識符

    deviceName

    String

    設(shè)備名稱

    identifier

    String

    事件標(biāo)識符,對應(yīng)事件的identifier

    name

    String

    事件名稱

    type

    String

    事件類型

    time

    Long

    設(shè)備上報value對應(yīng)的時間,自1970-1-1起流逝的毫秒值

    value

    JSON

    變更的事件屬性列表:key-value鍵值對

    key

    String

    屬性key

    value

    具體數(shù)據(jù)類型由具體情況確定

    屬性取值

    消息示例如下。

    {
        "deviceType": "SmartDoor",
        "identifier": "Doorxxxxication",
        "iotId": "Xzf15db9xxxxxxxxx01046b400",
        "name": "開門通知",
        "time": 1534319108982,
        "type": "info",
        "productKey": "a17xxxxTYNA",
        "deviceName": "Xzf15xxxxucTHBgUo6WR",
        "value": {
            "KeyID": "x8xxxxxkDY",
            "LockType": 3
        }
    }
  • 設(shè)備服務(wù)返回消息

    topic:/${productKey}/${deviceName}/thing/downlink/reply/message

    消息字段說明如下。

    參數(shù)

    類型

    含義

    gmtCreate

    Long

    數(shù)據(jù)流轉(zhuǎn)消息產(chǎn)生時間,自1970-1-1起流逝的毫秒值

    iotId

    String

    設(shè)備的唯一ID

    productKey

    String

    設(shè)備所屬產(chǎn)品的唯一標(biāo)識符

    deviceName

    String

    設(shè)備名稱

    requestId

    String

    阿里云產(chǎn)生和設(shè)備通信的信息ID

    code

    Integer

    調(diào)用的結(jié)果信息

    message

    String

    結(jié)果信息說明

    topic

    String

    服務(wù)調(diào)用下行時使用的topic

    data

    Object

    設(shè)備返回的結(jié)果,非透傳之間返回設(shè)備結(jié)果,透傳則需要經(jīng)過腳本轉(zhuǎn)換

    消息示例如下。

    {
      "gmtCreate": 151xxxx39881,
      "iotId": "4z819VQHxxxxxxxxxxxx7ee200",
      "productKey": "p1gxxxxBd",
      "deviceName": "xxxxxxxxxx",
      "requestId": "1234",
      "code": 200,
      "message": "success",
      "topic": "/sys/p1gxxxxeUBd/xxxxxxxxxx/thing/service/property/set",
      "data": {}
    }           
  • 物的狀態(tài)變更消息

    為了提高消息有效性,設(shè)備上下線過于頻繁時,會對消息進行篩檢。

    topic:/as/mqtt/status/{pk}/{dn}

    消息字段說明如下。

    參數(shù)

    類型

    含義

    status

    String

    設(shè)備狀態(tài)。online:上線。offline:離線。

    iotId

    String

    設(shè)備在平臺內(nèi)的唯一標(biāo)識。

    offlineReasonCode

    Integer

    設(shè)備下線時,返回的錯誤碼。詳細說明,請參見設(shè)備行為錯誤碼

    productKey

    String

    設(shè)備所屬產(chǎn)品的唯一標(biāo)識。

    deviceName

    String

    設(shè)備名稱。

    lastTime

    String

    該參數(shù)為歷史存量字段,已無實際意義。

    utcLastTime

    String

    time

    String

    設(shè)備上、下線的時間。

    utcTime

    String

    設(shè)備上、下線的UTC時間。

    clientIp

    String

    設(shè)備公網(wǎng)出口IP。

    消息示例如下。

    {
        "status":"offline",
        "iotId":"4z819VQHk6VSLmmBJfrf00107e****",
        "offlineReasonCode":427,
        "productKey":"al12345****",
        "deviceName":"deviceName1234",
        "time":"2018-08-31 15:32:28.205",
        "utcTime":"2018-08-31T07:32:28.205Z",
        "lastTime":"2018-08-31 15:32:28.195",
        "utcLastTime":"2018-08-31T07:32:28.195Z",
        "clientIp":"192.0.2.1"
    } 
  • 用戶綁定變更消息

    用戶綁定/解綁設(shè)備產(chǎn)生的回流消息,用于同步用戶與設(shè)備的綁定、解綁。

    topic:/${productKey}/${deviceName}/thing/awss/enrollee/user

    消息字段說明如下。

    參數(shù)

    子參數(shù)

    類型

    含義

    bind

    bool

    true-綁定;false-解綁

    productKey

    String

    設(shè)備所屬產(chǎn)品的唯一標(biāo)識符

    deviceName

    String

    設(shè)備名稱

    iotId

    String

    設(shè)備的唯一ID

    messageCreateTime

    Long

    消息創(chuàng)建時間

    identityInfos

    list

    用戶信息列表

    identityId

    String

    用戶身份ID

    scopeId

    String

    隔離ID

    tenantId

    String

    租戶ID

    owned

    Integer

    擁有標(biāo)記

    • 0:分享者

    • 1:擁有者

    params

    Map

    擴展參數(shù)(暫未使用)

    {
      "bind":true,
      "productKey": "123xxxx569",
      "deviceName": "deviceNamexxxx34",
      "iotId": "",
      "messageCreateTime": 151xxxx9881,
      "identityInfos":[
         {
           "identityId":"50xxxxxxxxxxxx62060259",
           "scopeId":"",
           "tenantId":"1D89B5xxxxxxxxxxxxxxxx861678FF",
           "owned":1
         }
      ],
      "params":{
      }
    }