本文介紹如何通過事件總線EventBridge將云消息隊列 MQTT 版的數據推送到函數計算。
前提條件
您已完成以下操作:
事件總線EventBridge
函數計算
云消息隊列 RocketMQ 版
云消息隊列 MQTT 版
注意事項
事件總線EventBridge不支持直接從云消息隊列 MQTT 版的Topic拉取事件。您可以通過云消息隊列 MQTT 版的數據流出功能,將數據流轉到云消息隊列 RocketMQ 版的Topic中,并通過添加事件總線EventBridge的自定義事件源云消息隊列 RocketMQ 版,實現云消息隊列 MQTT 版與事件總線EventBridge的集成。
步驟一:創建數據流出規則
- 登錄云消息隊列 MQTT 版控制臺,并在左側導航欄單擊實例列表。
- 在頂部菜單欄選擇目標地域,然后在實例列表中單擊實例名稱進入實例詳情頁面。
- 在左側導航欄單擊規則管理,然后在頁面左上角,單擊創建規則。
- 在創建規則頁面完成以下操作。
- 在配置基本信息配置向導頁面,填寫規則的基本信息,然后單擊下一步。
參數 取值示例 說明 規則ID 111111 規則的全局唯一標識,說明如下: - 只能包含字母、數字、短劃線(-)和下劃線(_),至少包含一個字母或數字。
- 名稱長度限制在3~64字符之間,長于64字符將被自動截取。
- 創建后無法更新。
描述 migrate from rocketmq 對規則的描述。 狀態 啟用 是否啟用當前規則,取值說明如下: - 啟用
- 停用
規則類型 數據流出 創建的規則類型,取值說明如下: - 數據流出:用于將云消息隊列 MQTT 版的數據導出至其他阿里云產品。詳細信息,請參見跨云產品的數據流出。
- 數據流入:用于將其他阿里云產品的數據導入至云消息隊列 MQTT 版。詳細信息,請參見跨云產品數據流入。
- 上下線通知:用于將獲取的云消息隊列 MQTT 版客戶端上下線事件數據導出至其他阿里云產品。詳細信息,請參見MQTT客戶端上下線事件數據流出。
- 在配置規則源配置向導頁面,配置數據源,然后單擊下一步。
參數 取值示例 說明 Topic TopicA 指定您需導出數據的源Topic,即云消息隊列 MQTT 版的Topic。 - 在配置規則目標配置向導頁面,配置數據的流轉目標,然后單擊創建。
參數 取值示例 說明 目標服務類型 消息隊列 RocketMQ 版 指定您需將源Topic的數據轉發至的目標云產品。 說明 當前僅支持云消息隊列 RocketMQ 版。RocketMQ 實例 MQ_INST_13801563067*****_BbyOD2jQ 指定目標云產品的實例ID,即云消息隊列 RocketMQ 版的實例ID。 說明 僅支持選擇和云消息隊列 MQTT 版實例為同一地域的云產品實例。Topic TopicB 指定目標云產品的資源鍵值,即云消息隊列 RocketMQ 版的Topic。源Topic的數據將流轉至TopicB。
您可以在規則管理的規則列表查看到剛創建的數據流出規則。 - 在配置基本信息配置向導頁面,填寫規則的基本信息,然后單擊下一步。
步驟二:創建自定義事件源
- 登錄事件總線EventBridge控制臺。
- 在左側導航欄,單擊事件總線。
- 在頂部菜單欄,選擇地域。
- 在事件總線頁面,單擊已創建的自定義事件總線。
- 在左側導航欄,單擊事件源。
- 在事件源頁面,單擊添加事件源。
- 在添加自定義事件源面板,輸入名稱和描述,事件提供方選擇消息隊列 RocketMQ 版,并選擇已創建的云消息隊列 RocketMQ 版的資源信息等,然后單擊確定。
步驟三:創建事件規則
重要 目標服務和事件規則必須處于同一地域。
- 登錄事件總線EventBridge控制臺,在左側導航欄,單擊事件總線。
- 在頂部菜單欄,選擇地域,在事件總線頁面,單擊目標總線名稱。
- 在左側導航欄,單擊事件規則,然后單擊創建規則。
- 在創建規則頁面,完成以下操作。
- 在配置基本信息配置向導,在名稱文本框輸入規則名稱,在描述文本框輸入規則的描述,然后單擊下一步。
- 在配置事件模式配置向導,事件源類型選擇自定義事件源,事件源選擇步驟一添加的自定義事件源,在事件模式內容代碼框輸入事件模式,然后單擊下一步。
如需了解更多信息,請參見事件模式。
- 在配置事件目標配置向導,配置事件目標,然后單擊創建。
步驟四:發布事件
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallbackExtended;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
/**
* 本代碼提供簽名鑒權模式下MQ4IOT客戶端發送消息到MQ4IOT客戶端的示例,其中初始化參數請根據實際情況修改。
* 簽名模式即使用阿里云賬號系統提供的AccessKey和SecretKey對每個客戶端計算出一個獨立的簽名供客戶端識別使用。
* 對于實際業務場景使用過程中,考慮到私鑰SecretKey的隱私性,可以將簽名過程放在受信任的環境完成。
*
* 完整demo工程,參考https://github.com/AliwareMQ/lmq-demo。
*/
public class MQ4IoTProducerDemo {
public static void main(String[] args) throws Exception {
/**
* MQ4IOT實例ID,購買后控制臺獲取。
*/
String instanceId = "XXXXX";
/**
* 接入點地址,購買MQ4IOT實例,且配置完成后即可獲取,接入點地址必須填寫分配的域名,不得使用IP地址直接連接,否則可能會導致客戶端異常。
*/
String endPoint = "XXXXX.mqtt.aliyuncs.com";
/**
* 賬號accesskey,從賬號系統控制臺獲取。
*/
String accessKey = System.getenv("ALIBABA_CLOUD_ACCESS_KEY_ID");
/**
* 賬號secretKey,從賬號系統控制臺獲取,僅在Signature鑒權模式下需要設置。
*/
String secretKey = System.getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET");
/**
* MQ4IOT Client Id,由業務系統分配,需要保證每個TCP連接都不一樣,保證全局唯一,如果不同的客戶端對象(TCP連接)使用了相同的Client Id會導致連接異常斷開。
* Client Id 由兩部分組成,格式為GroupID@@@DeviceId,其中groupId在MQ4IOT控制臺申請,DeviceId由業務方自己設置,Client Id總長度不得超過64個字符。
*/
String clientId = "GID_XXXXX@@@XXXXX";
/**
* MQ4IOT 消息的一級Topic,需要在控制臺申請才能使用。
* 如果使用了沒有申請或者沒有被授權的Topic會導致鑒權失敗,服務端會斷開客戶端連接。
*/
final String parentTopic = "XXXXX";
/**
* MQ4IOT支持子級Topic,用來做自定義的過濾,此處為示意,可以填寫任何字符串。
*/
final String mq4IotTopic = parentTopic + "/" + "testMq4Iot";
/**
* QoS參數代表傳輸質量,可選0,1,2。
*/
final int qosLevel = 0;
ConnectionOptionWrapper connectionOptionWrapper = new ConnectionOptionWrapper(instanceId, accessKey, secretKey, clientId);
final MemoryPersistence memoryPersistence = new MemoryPersistence();
/**
* 客戶端使用的協議和端口必須匹配。
* 如果SSL加密則設置ssl://endpoint:8883。
*/
final MqttClient mqttClient = new MqttClient("tcp://" + endPoint + ":1883", clientId, memoryPersistence);
/**
* 客戶端設置好發送超時時間,防止無限阻塞。
*/
mqttClient.setTimeToWait(5000);
final ExecutorService executorService = new ThreadPoolExecutor(1, 1, 0, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
mqttClient.setCallback(new MqttCallbackExtended() {
@Override
public void connectComplete(boolean reconnect, String serverURI) {
/**
* 客戶端連接成功后就需要盡快訂閱需要的Topic。
*/
System.out.println("connect success");
}
@Override
public void connectionLost(Throwable throwable) {
throwable.printStackTrace();
}
@Override
public void messageArrived(String s, MqttMessage mqttMessage) throws Exception {
/**
* 消費消息的回調接口,需要確保該接口不拋異常,該接口運行返回即代表消息消費成功。
* 消費消息需要保證在規定時間內完成,如果消費耗時超過服務端約定的超時時間,對于可靠傳輸的模式,服務端可能會重試推送,業務需要做好冪等去重處理。超時時間約定參考限制。
*/
System.out.println(
"receive msg from topic " + s + " , body is " + new String(mqttMessage.getPayload()));
}
@Override
public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {
System.out.println("send msg succeed topic is : " + iMqttDeliveryToken.getTopics()[0]);
}
});
mqttClient.connect(connectionOptionWrapper.getMqttConnectOptions());
for (int i = 0; i < 10; i++) {
MqttMessage message = new MqttMessage("hello mq4Iot pub sub msg".getBytes());
message.setQos(qosLevel);
/**
* 發送普通消息時,Topic必須和接收方訂閱的Topic一致,或者符合通配符匹配規則。
*/
mqttClient.publish(mq4IotTopic, message);
/**
* MQ4IoT支持點對點消息,即如果發送方明確知道該消息只需要給特定的一個設備接收,且知道對端的Client Id,則可以直接發送點對點消息。
* 點對點消息不需要經過訂閱關系匹配,可以簡化訂閱方的邏輯。點對點消息的Topic格式規范是{{parentTopic}}/p2p/{{targetClientId}}。
*/
String receiverId = "xxx";
final String p2pSendTopic = parentTopic + "/p2p/" + receiverId;
message = new MqttMessage("hello mq4Iot p2p msg".getBytes());
message.setQos(qosLevel);
mqttClient.publish(p2pSendTopic, message);
}
Thread.sleep(Long.MAX_VALUE);
}
}
結果驗證
您可以在函數計算控制臺使用表盤解讀數據指標。
- 登錄函數計算控制臺。
- 在左側導航欄,單擊服務及函數。
- 在頂部菜單欄,選擇地域。
- 在服務列表頁面,找到目標服務,在其右側操作列單擊函數管理。
- 在函數管理頁面,找到目標函數,單擊目標函數名稱。
- 在函數詳情頁面,單擊調用日志頁簽,查看日志。
FC Invoke Start RequestId: c2be67a7-fh1a-9619-ei4c-3c04gcf6**** 2020-11-19T11:11:34.161Z c2be67a7-fh1a-9619-ei4c-3c04gcf6c**** [verbose] Receive Event v2 ==> The event comes from aliyun.ui,event type is ui:Created:PostObject. 2020-11-19T11:11:34.167Z c2be67a7-fh1a-9619-ei4c-3c04gcf6c**** FC Invoke End RequestId: c2be67a7-fh1a-9619-ei4c-3c04gcf6c****
常見問題
事件發布失敗,我該如何定位問題?
如果事件發布失敗,您可以查看事件軌跡,在事件軌跡頁面的事件投遞區域查看投遞詳情,獲取投遞響應。針對不同投遞響應提示,采取相應的解決措施。
發布到函數計算的事件發布失敗,且投遞響應為[500]ConnectErrorconnectiontimedout,我該如何處理?
文檔內容是否對您有幫助?