MQTT數(shù)據(jù)流入規(guī)則的實現(xiàn)
如果您的云端應用需要使用云消息隊列 RocketMQ 版產品的某些功能,例如順序消息特性、事務消息特性等,您可以通過消息流入或流出規(guī)則將云消息隊列 MQTT 版和云消息隊列 RocketMQ 版數(shù)據(jù)進行流轉。本文介紹如何將云消息隊列 RocketMQ 版產品的數(shù)據(jù)導入云消息隊列 MQTT 版。
背景信息
云消息隊列 MQTT 版支持云端SDK,云上應用可直接通過云端SDK接入云消息隊列 MQTT 版服務端進行消息收發(fā)。云端SDK使用,請參見云端開發(fā)概述。
同時云消息隊列 MQTT 版支持和其他云產品進行互通,當前支持的云產品有云消息隊列 RocketMQ 版。
本文以公網(wǎng)環(huán)境中的Java SDK為例說明如何將云消息隊列 RocketMQ 版數(shù)據(jù)流入至云消息隊列 MQTT 版。
網(wǎng)絡訪問
- 公網(wǎng)接入點為本地公網(wǎng)環(huán)境訪問的IP地址,一般用于物聯(lián)網(wǎng)和移動互聯(lián)網(wǎng)場景中;
- VPC 接入點為云上私網(wǎng)訪問的IP地址,一般用于云端應用接入云消息隊列 MQTT 版。
- 客戶端不使用域名接入而是使用IP地址接入,產品方更新了域名解析導致原有IP地址失效。
- 客戶端網(wǎng)絡對IP地址設置網(wǎng)絡防火墻策略,產品方更新了域名解析后新IP地址被您的防火墻策略攔截。
前提條件
- 安裝IDE。您可以使用IntelliJ IDEA或者Eclipse,本文以IntelliJ IDEA為例。
- 下載安裝JDK。
已創(chuàng)建云消息隊列 MQTT 版實例、Topic和Group ID,具體操作,請參見創(chuàng)建資源。
已創(chuàng)建云消息隊列 RocketMQ 版實例、Topic和Group ID,具體操作,請參見步驟二:創(chuàng)建資源。
云消息隊列 MQTT 版數(shù)據(jù)流入規(guī)則僅支持云消息隊列 RocketMQ 版4.x系列實例。
云消息隊列 MQTT 版數(shù)據(jù)流入規(guī)則不能跨地域使用,因此,云消息隊列 MQTT 版和云消息隊列 RocketMQ 版的資源都必須創(chuàng)建在同一地域。
1.創(chuàng)建數(shù)據(jù)流入規(guī)則
登錄云消息隊列 MQTT 版控制臺,并在左側導航欄單擊實例列表。
在頂部菜單欄選擇目標地域,然后在實例列表中單擊實例名稱進入實例詳情頁面。
在左側導航欄單擊規(guī)則管理,然后在頁面左上角,單擊創(chuàng)建規(guī)則。
在創(chuàng)建規(guī)則頁面完成以下操作。
配置基本信息。輸入規(guī)則ID,選擇數(shù)據(jù)流入的規(guī)則類型。
配置規(guī)則源。選擇已經創(chuàng)建好的云消息隊列 RocketMQ 版的實例和Topic。
配置規(guī)則目標。選擇已經創(chuàng)建好的云消息隊列 MQTT 版的Topic。
2.準備測試代碼
2.1下載示例代碼
下載mqtt-java-demo,并解壓該Demo工程包至您指定的文件夾。
在解壓的Demo工程中找到lmq-java-demo文件夾,將此文件夾導入IntelliJ IDEA,并確認pom.xml中已包含以下依賴。
<dependencies> <dependency> <groupId>org.bouncycastle</groupId> <artifactId>bcprov-jdk15on</artifactId> <version>1.70</version> </dependency> <dependency> <groupId>commons-codec</groupId> <artifactId>commons-codec</artifactId> <version>1.10</version> </dependency> <dependency> <groupId>org.eclipse.paho</groupId> <artifactId>org.eclipse.paho.client.mqttv3</artifactId> <version>1.2.2</version> </dependency> <dependency> <groupId>org.apache.httpcomponents</groupId> <artifactId>httpclient</artifactId> <version>4.5.2</version> </dependency> <dependency> <groupId>com.alibaba</groupId> <artifactId>fastjson</artifactId> <version>1.2.83</version> </dependency> <dependency> <groupId>com.aliyun.openservices</groupId> <artifactId>ons-client</artifactId> <version>1.8.5.Final</version> </dependency> <dependency> <groupId>com.aliyun</groupId> <artifactId>aliyun-java-sdk-onsmqtt</artifactId> <version>1.0.3</version> </dependency> <dependency> <groupId>com.aliyun</groupId> <artifactId>aliyun-java-sdk-core</artifactId> <version>4.5.0</version> </dependency> </dependencies>
配置訪問憑證。
獲取AccessKey信息。獲取方式,請參見創(chuàng)建AccessKey。
配置環(huán)境變量。云消息隊列 MQTT 版的AccessKey ID和AccessKey Secret的環(huán)境變量名稱分別為MQTT_AK_ENV和MQTT_SK_ENV。關于配置環(huán)境變量的方法,請參見配置訪問憑證。
2.2收發(fā)消息代碼
在RocketMQSendMessageToMQ4IoT.java
類中包含了發(fā)送RocketMQ消息和使用MQTT消費的代碼,按代碼注釋說明填寫云消息隊列 RocketMQ 版和云消息隊列 MQTT 版資源的參數(shù)。
測試中可以將其中發(fā)送P2P消息的相關代碼注釋掉,示例代碼如下。
import com.aliyun.openservices.lmq.example.util.ConnectionOptionWrapper;
import com.aliyun.openservices.ons.api.Message;
import com.aliyun.openservices.ons.api.ONSFactory;
import com.aliyun.openservices.ons.api.Producer;
import com.aliyun.openservices.ons.api.PropertyKeyConst;
import com.aliyun.openservices.ons.api.SendResult;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallbackExtended;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
public class RocketMQSendMessageToMQ4IoT {
public static void main(String[] args) throws Exception {
/**
* 初始化云消息隊列 RocketMQ 版發(fā)送客戶端,實際業(yè)務中一般部署在服務端應用中。
*/
Properties properties = new Properties();
/**
* 設置云消息隊列 RocketMQ 版Group ID,在云消息隊列 RocketMQ 版控制臺創(chuàng)建。
*/
properties.setProperty(PropertyKeyConst.GROUP_ID, "GID-XXXXX");
/**
* AccessKey ID,阿里云身份驗證,在阿里云RAM控制臺創(chuàng)建。
* 阿里云賬號AccessKey擁有所有API的訪問權限,建議您使用RAM用戶進行API訪問或日常運維。
* 強烈建議不要把AccessKey ID和AccessKey Secret保存到工程代碼里,否則可能導致AccessKey泄露,威脅您賬號下所有資源的安全。
* 本示例以將AccessKey 和 AccessKeySecret 保存在環(huán)境變量為例說明。
*/
properties.put(PropertyKeyConst.AccessKey, System.getenv("MQTT_AK_ENV"));
/**
* AccessKey Secret,阿里云身份驗證,在阿里云RAM控制臺創(chuàng)建。僅在簽名鑒權模式下需要設置。
*/
properties.put(PropertyKeyConst.SecretKey, System.getenv("MQTT_SK_ENV"));
/**
* 設置TCP接入點,該接入點為云消息隊列 RocketMQ 版實例的接入點。進入云消息隊列 RocketMQ 版控制臺實例詳情頁面獲取。
*/
properties.put(PropertyKeyConst.NAMESRV_ADDR, "XXXX");
/**
* 設置云消息隊列 RocketMQ 版的Topic,在云消息隊列 RocketMQ 版控制臺創(chuàng)建。
* 云消息隊列 RocketMQ 版和云消息隊列 MQTT 版配合使用時,RocketMQ客戶端僅操作一級Topic。
*/
final String parentTopic = "XXXXX";
Producer producer = ONSFactory.createProducer(properties);
producer.start();
//////////////////////////////////////////////////////////////////////////////////////////////////////////////////
/**
* 初始化云消息隊列 MQTT 版接收客戶端,實際業(yè)務中云消息隊列 MQTT 版一般部署在移動終端環(huán)境。
*/
/**
* 您在控制臺創(chuàng)建的云消息隊列 MQTT 版的實例ID。
*/
String instanceId = "XXXXX";
/**
* 設置接入點,進入云消息隊列 MQTT 版控制臺實例詳情頁面獲取。
*/
String endPoint = "XXXXXX.mqtt.aliyuncs.com";
/**
* AccessKey ID,阿里云身份驗證,在阿里云RAM控制臺創(chuàng)建。
* 阿里云賬號AccessKey擁有所有API的訪問權限,建議您使用RAM用戶進行API訪問或日常運維。
* 強烈建議不要把AccessKey ID和AccessKey Secret保存到工程代碼里,否則可能導致AccessKey泄露,威脅您賬號下所有資源的安全。
* 本示例以將AccessKey 和 AccessKeySecret 保存在環(huán)境變量為例說明。
*/
String accessKey = System.getenv("MQTT_AK_ENV");
/**
* AccessKey Secret,阿里云身份驗證,在阿里云RAM控制臺創(chuàng)建。僅在簽名鑒權模式下需要設置。
*/
String secretKey = System.getenv("MQTT_SK_ENV");
/**
* MQTT客戶端ID,由業(yè)務系統(tǒng)分配,需要保證每個TCP連接都不一樣,保證全局唯一,如果不同的客戶端對象(TCP連接)使用了相同的clientId會導致連接異常斷開。
* clientId由兩部分組成,格式為GroupID@@@DeviceID,其中GroupID在云消息隊列 MQTT 版控制臺創(chuàng)建,DeviceID由業(yè)務方自己設置,clientId總長度不得超過64個字符。
*/
String clientId = "GID_XXXX@@@XXXXX";
/**
* 云消息隊列 MQTT 版支持子級Topic,用來做自定義的過濾,此處為示例,可以填寫任何字符串。
* 需要注意的是,完整的Topic長度不得超過128個字符。
*/
final String subTopic = "/testMq4Iot";
final String mq4IotTopic = parentTopic + subTopic;
/**
* QoS參數(shù)代表傳輸質量,可選0,1,2。詳細信息,請參見名詞解釋。
*/
final int qosLevel = 0;
ConnectionOptionWrapper connectionOptionWrapper = new ConnectionOptionWrapper(instanceId, accessKey, secretKey, clientId);
final MemoryPersistence memoryPersistence = new MemoryPersistence();
/**
* 客戶端協(xié)議和端口。客戶端使用的協(xié)議和端口必須匹配,如果是SSL加密則設置ssl://endpoint:8883。
*/
final MqttClient mqttClient = new MqttClient("tcp://" + endPoint + ":1883", clientId, memoryPersistence);
/**
* 設置客戶端發(fā)送超時時間,防止無限阻塞。
*/
mqttClient.setTimeToWait(5000);
final ExecutorService executorService = new ThreadPoolExecutor(1, 1, 0, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
mqttClient.setCallback(new MqttCallbackExtended() {
@Override
public void connectComplete(boolean reconnect, String serverURI) {
/**
* 客戶端連接成功后就需要盡快訂閱需要的Topic。
*/
System.out.println("connect success");
executorService.submit(new Runnable() {
@Override
public void run() {
try {
final String topicFilter[] = {mq4IotTopic};
final int[] qos = {qosLevel};
mqttClient.subscribe(topicFilter, qos);
} catch (MqttException e) {
e.printStackTrace();
}
}
});
}
@Override
public void connectionLost(Throwable throwable) {
throwable.printStackTrace();
}
@Override
public void messageArrived(String s, MqttMessage mqttMessage) throws Exception {
/**
* 消費消息的回調接口,需要確保該接口不拋異常,該接口運行返回即代表消息消費成功。
* 消費消息需要保證在規(guī)定時間內完成,如果消費耗時超過服務端約定的超時時間,對于可靠傳輸?shù)哪J剑斩丝赡軙卦囃扑停瑯I(yè)務需要做好冪等去重處理。
*/
System.out.println(
"receive msg from topic " + s + " , body is " + new String(mqttMessage.getPayload()));
}
@Override
public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {
System.out.println("send msg succeed topic is : " + iMqttDeliveryToken.getTopics()[0]);
}
});
mqttClient.connect(connectionOptionWrapper.getMqttConnectOptions());
for (int i = 0; i < 10; i++) {
/**
* 使用RocketMQ客戶端發(fā)消息給MQTT客戶端時,Topic指定為一級父Topic,Tag指定為MQ2MQTT。
*/
Message msg = new Message(parentTopic, "MQ2MQTT", "hello mq send mqtt msg".getBytes());
/**
* 使用RocketMQ客戶端發(fā)消息給MQTT客戶端時,可以通過MqttSecondTopic屬性設置MQTT的子級Topic屬性。
*/
msg.putUserProperties(PropertyKeyConst.MqttSecondTopic, subTopic);
SendResult result = producer.send(msg);
System.out.println(result);
// /**
// * 發(fā)送P2P消息,設置子級Topic。
// */
// msg.putUserProperties(PropertyKeyConst.MqttSecondTopic, "/p2p/" + clientId);
// result = producer.send(msg);
// System.out.println(result);
}
Thread.sleep(Long.MAX_VALUE);
}
}
3.結果驗證
執(zhí)行RocketMQSendMessageToMQ4IoT.java
類中的Main函數(shù)運行代碼。可以根據(jù)下面操作驗證消息的發(fā)送和消費情況。
代碼驗證
如下圖所示,RocketMQ消息已經成功發(fā)送,MQTT客戶端也已經成功消費。
控制臺驗證
查詢消息發(fā)送情況。在云消息隊列 RocketMQ 版控制臺消息查詢頁面,根據(jù)Topic和Message ID查詢到消息已經成功發(fā)送,如下圖所示。
查詢消息消費情況。在云消息隊列 MQTT 版控制臺消息軌跡查詢頁面,根據(jù)Message ID查詢消息已經被消費,如下圖所示。