快速使用MQTT的Java SDK收發(fā)消息(終端和云端消息收發(fā))
本文介紹如何快速使用云消息隊列 MQTT 版的Java SDK實現(xiàn)MQTT終端和云端服務(wù)的消息收發(fā)。
前提條件
背景信息
接入點說明
終端和云端服務(wù)與云消息隊列 MQTT 版通信時,需要在各自的SDK代碼中設(shè)置云消息隊列 MQTT 版實例的接入點信息,通過接入點和云消息隊列 MQTT 版服務(wù)端連接。
終端SDK接入點格式
使用終端SDK接入云消息隊列 MQTT 版時,需要填寫的接入點格式如下:
公網(wǎng)接入點:
MQTT實例ID.mqtt.aliyuncs.com
VPC 接入點:
MQTT實例ID-internal-vpc.mqtt.aliyuncs.com
終端SDK接入點也可以直接在云消息隊列 MQTT 版控制臺實例詳情頁面的接入點頁簽中查看。
云端SDK接入點格式
使用云端SDK接入云消息隊列 MQTT 版時,需要填寫的接入點格式如下:
重要僅實例地域?qū)儆谥袊鴥?nèi)地的實例支持云端SDK接入。
公網(wǎng)接入點:
MQTT實例ID-server-internet.mqtt.aliyuncs.com
VPC 接入點:
MQTT實例ID-server-internal.mqtt.aliyuncs.com
MQTT實例ID可在云消息隊列 MQTT 版控制臺實例詳情頁面的基礎(chǔ)信息區(qū)域查看。
終端SDK接入點和云端SDK接入點同時支持公網(wǎng)接入點和VPC 接入點。公網(wǎng)接入點為本地公網(wǎng)環(huán)境訪問的IP地址,一般用于物聯(lián)網(wǎng)和移動互聯(lián)網(wǎng)場景中;VPC 接入點為云上私網(wǎng)訪問的IP地址,一般用于云端應(yīng)用接入云消息隊列 MQTT 版。
SDK使用接入點連接服務(wù)時務(wù)必使用域名接入,不得直接使用域名背后的IP地址直接連接,因為IP地址隨時會變化。在以下使用情況中出現(xiàn)的問題云消息隊列 MQTT 版產(chǎn)品方概不負責(zé):
終端或云端不使用域名接入而是使用IP地址接入,產(chǎn)品方更新了域名解析導(dǎo)致原有IP地址失效。
終端或云端網(wǎng)絡(luò)側(cè)對IP地址設(shè)置網(wǎng)絡(luò)防火墻策略,產(chǎn)品方更新了域名解析后新IP地址被您的防火墻策略攔截。
調(diào)用終端SDK發(fā)送消息
- 下載第三方的開源Java SDK。下載地址為Eclipse Paho Java Client。
下載終端SDK的Demo示例作為您代碼開發(fā)的參考。下載地址為mqtt-java-demo。
- 解壓該Demo工程包至您指定的文件夾。
在IntelliJ IDEA中,導(dǎo)入解壓后的文件以創(chuàng)建相應(yīng)的工程,并確認pom.xml中已包含以下依賴。
<dependencies> <dependency> <groupId>commons-codec</groupId> <artifactId>commons-codec</artifactId> <version>1.10</version> </dependency> <dependency> <groupId>org.eclipse.paho</groupId> <artifactId>org.eclipse.paho.client.mqttv3</artifactId> <version>1.2.2</version> </dependency> <dependency> <groupId>org.apache.httpcomponents</groupId> <artifactId>httpclient</artifactId> <version>4.5.2</version> </dependency> <dependency> <groupId>com.alibaba</groupId> <artifactId>fastjson</artifactId> <version>1.2.83</version> </dependency> <dependency> <groupId>com.aliyun</groupId> <artifactId>aliyun-java-sdk-onsmqtt</artifactId> <version>1.0.3</version> </dependency> <dependency> <groupId>com.aliyun</groupId> <artifactId>aliyun-java-sdk-core</artifactId> <version>4.5.0</version> </dependency> </dependencies>
在MQ4IoTProducerDemo.java類中,按代碼注釋說明填寫相應(yīng)參數(shù),主要涉及您已在創(chuàng)建資源中所創(chuàng)建的MQTT資源信息。然后執(zhí)行Main函數(shù)運行代碼完成消息發(fā)送。
示例代碼如下。
說明在使用示例代碼前,需要配置環(huán)境變量,通過環(huán)境變量讀取訪問憑證。關(guān)于配置環(huán)境變量的方法,請參見配置訪問憑證。
云消息隊列 MQTT 版的AccessKey ID和AccessKey Secret的環(huán)境變量名稱分別為MQTT_AK_ENV和MQTT_SK_ENV。
package com.aliyun.openservices.lmq.example.demo; import com.aliyun.openservices.lmq.example.util.ConnectionOptionWrapper; import java.util.concurrent.ExecutorService; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken; import org.eclipse.paho.client.mqttv3.MqttCallbackExtended; import org.eclipse.paho.client.mqttv3.MqttClient; import org.eclipse.paho.client.mqttv3.MqttException; import org.eclipse.paho.client.mqttv3.MqttMessage; import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence; public class MQ4IoTProducerDemo { public static void main(String[] args) throws Exception { /** * 您創(chuàng)建的云消息隊列 MQTT 版的實例ID。 */ String instanceId = "XXXXX"; /** * 設(shè)置終端SDK的接入點,進入云消息隊列 MQTT 版控制臺實例詳情頁面的接入點頁簽查看。 * 接入點地址必須填寫分配的域名,不得使用IP地址直接連接,否則可能會導(dǎo)致客戶端異常。 */ String endPoint = "XXXXX.mqtt.aliyuncs.com"; /** * AccessKey ID,阿里云身份驗證,在阿里云RAM控制臺創(chuàng)建。 * 阿里云賬號AccessKey擁有所有API的訪問權(quán)限,建議您使用RAM用戶進行API訪問或日常運維。 * 強烈建議不要把AccessKey ID和AccessKey Secret保存到工程代碼里,否則可能導(dǎo)致AccessKey泄露,威脅您賬號下所有資源的安全。 * 本示例以將AccessKey 和 AccessKeySecret 保存在環(huán)境變量為例說明。 */ String accessKey = System.getenv("MQTT_AK_ENV"); /** * AccessKey Secret,阿里云身份驗證,在阿里云RAM控制臺創(chuàng)建。僅在簽名鑒權(quán)模式下需要設(shè)置。 */ String secretKey = System.getenv("MQTT_SK_ENV"); /** * MQTT客戶端ID,由業(yè)務(wù)系統(tǒng)分配,需要保證每個TCP連接都不一樣,保證全局唯一,如果不同的客戶端對象(TCP連接)使用了相同的clientId會導(dǎo)致連接異常斷開。 * clientId由兩部分組成,格式為GroupID@@@DeviceID,其中GroupID在云消息隊列 MQTT 版控制臺創(chuàng)建,DeviceID由業(yè)務(wù)方自己設(shè)置,clientId總長度不得超過64個字符。 */ String clientId = "GID_XXXXX@@@XXXXX"; /** * 云消息隊列 MQTT 版消息的一級Topic,需要在控制臺創(chuàng)建才能使用。 * 如果使用了沒有創(chuàng)建或者沒有被授權(quán)的Topic會導(dǎo)致鑒權(quán)失敗,服務(wù)端會斷開客戶端連接。 */ final String parentTopic = "XXXXX"; /** * 云消息隊列 MQTT 版支持子級Topic,用來做自定義的過濾,此處為示例,可以填寫任意字符串。 * 需要注意的是,完整的Topic長度不得超過128個字符。 */ final String mq4IotTopic = parentTopic + "/" + "testMq4Iot"; /** * QoS參數(shù)代表傳輸質(zhì)量,可選0,1,2。詳細信息,請參見名詞解釋。 */ final int qosLevel = 0; ConnectionOptionWrapper connectionOptionWrapper = new ConnectionOptionWrapper(instanceId, accessKey, secretKey, clientId); final MemoryPersistence memoryPersistence = new MemoryPersistence(); /** * 客戶端協(xié)議和端口。客戶端使用的協(xié)議和端口必須匹配,如果是SSL加密則設(shè)置ssl://endpoint:8883。 */ final MqttClient mqttClient = new MqttClient("tcp://" + endPoint + ":1883", clientId, memoryPersistence); /** * 設(shè)置客戶端發(fā)送超時時間,防止無限阻塞。 */ mqttClient.setTimeToWait(5000); final ExecutorService executorService = new ThreadPoolExecutor(1, 1, 0, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()); mqttClient.setCallback(new MqttCallbackExtended() { @Override public void connectComplete(boolean reconnect, String serverURI) { /** * 客戶端連接成功后就需要盡快訂閱需要的Topic。 */ System.out.println("connect success"); } @Override public void connectionLost(Throwable throwable) { throwable.printStackTrace(); } @Override public void messageArrived(String s, MqttMessage mqttMessage) throws Exception { /** * 消費消息的回調(diào)接口,需要確保該接口不拋異常,該接口運行返回即代表消息消費成功。 * 消費消息需要保證在規(guī)定時間內(nèi)完成,如果消費耗時超過服務(wù)端約定的超時時間,對于可靠傳輸?shù)哪J剑?wù)端可能會重試推送,業(yè)務(wù)需要做好冪等去重處理。 */ System.out.println( "receive msg from topic " + s + " , body is " + new String(mqttMessage.getPayload())); } @Override public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) { System.out.println("send msg succeed topic is : " + iMqttDeliveryToken.getTopics()[0]); } }); mqttClient.connect(connectionOptionWrapper.getMqttConnectOptions()); for (int i = 0; i < 10; i++) { MqttMessage message = new MqttMessage("hello mq4Iot pub sub msg".getBytes()); message.setQos(qosLevel); /** * 發(fā)送普通消息時,Topic必須和接收方訂閱的Topic一致,或者符合通配符匹配規(guī)則。 */ mqttClient.publish(mq4IotTopic, message); /** * 云消息隊列 MQTT 版支持點對點消息,即如果發(fā)送方明確知道該消息只需要給特定的一個設(shè)備接收,且知道對端的clientId,則可以直接發(fā)送點對點消息。 * 點對點消息不需要經(jīng)過訂閱關(guān)系匹配,可以簡化訂閱方的邏輯。點對點消息的Topic格式規(guī)范是 {{parentTopic}}/p2p/{{targetClientId}}。 */ String receiverId = "xxx"; final String p2pSendTopic = parentTopic + "/p2p/" + receiverId; message = new MqttMessage("hello mq4Iot p2p msg".getBytes()); message.setQos(qosLevel); mqttClient.publish(p2pSendTopic, message); } Thread.sleep(Long.MAX_VALUE); } }
調(diào)用云端SDK接收消息
下載云消息隊列 MQTT 版提供的云端SDK。下載地址為云端SDK版本說明。
下載云端SDK的Demo示例做為您代碼開發(fā)的參考。下載地址為mqtt-server-sdk-demo。
- 解壓該Demo工程包至您指定的文件夾。
在IntelliJ IDEA中,導(dǎo)入解壓后的文件以創(chuàng)建相應(yīng)的工程,并確認pom.xml中已包含以下依賴。
<dependencies> <dependency> <groupId>com.alibaba.mqtt</groupId> <artifactId>server-sdk</artifactId> <version>1.0.0.Final</version> </dependency> <dependency> <groupId>com.alibaba</groupId> <artifactId>fastjson</artifactId> <version>1.2.83</version> </dependency> </dependencies>
在MQTTConsumerDemo.java類中,按代碼注釋說明填寫相應(yīng)參數(shù),主要涉及您已在創(chuàng)建資源中所創(chuàng)建好的MQTT資源信息。然后執(zhí)行Main函數(shù)運行代碼完成消息接收。
示例代碼如下。
說明在使用示例代碼前,需要配置環(huán)境變量,通過環(huán)境變量讀取訪問憑證。關(guān)于配置環(huán)境變量的方法,請參見配置訪問憑證。
云消息隊列 MQTT 版的AccessKey ID和AccessKey Secret的環(huán)境變量名稱分別為MQTT_AK_ENV和MQTT_SK_ENV。
package com.aliyun.openservices.lmq.example; import com.alibaba.fastjson.JSONObject; import com.alibaba.mqtt.server.ServerConsumer; import com.alibaba.mqtt.server.callback.MessageListener; import com.alibaba.mqtt.server.config.ChannelConfig; import com.alibaba.mqtt.server.config.ConsumerConfig; import com.alibaba.mqtt.server.model.MessageProperties; public class MQTTConsumerDemo { public static void main(String[] args) throws Exception { /** * 設(shè)置云端SDK的接入點,請參見接入點說明中的云端SDK接入點格式。 * 接入點地址必須填寫分配的域名,不得使用IP地址直接連接,否則可能會導(dǎo)致服務(wù)端異常。 */ String domain = "post-cn-jaj3h8i****.mqtt.aliyuncs.com"; /** * 使用的協(xié)議和端口必須匹配,該參數(shù)值固定為5672。 */ int port = "5672"; /** * 您創(chuàng)建的云消息隊列 MQTT 版的實例ID。 */ String instanceId = "post-cn-jaj3h8i****"; /** * AccessKey ID,阿里云身份驗證,在阿里云RAM控制臺創(chuàng)建。 * 阿里云賬號AccessKey擁有所有API的訪問權(quán)限,建議您使用RAM用戶進行API訪問或日常運維。 * 強烈建議不要把AccessKey ID和AccessKey Secret保存到工程代碼里,否則可能導(dǎo)致AccessKey泄露,威脅您賬號下所有資源的安全。 * 本示例以將AccessKey 和 AccessKeySecret 保存在環(huán)境變量為例說明。 */ String accessKey = System.getenv("MQTT_AK_ENV"); /** * AccessKey Secret,阿里云身份驗證,在阿里云RAM控制臺創(chuàng)建。僅在簽名鑒權(quán)模式下需要設(shè)置。 */ String secretKey = System.getenv("MQTT_SK_ENV"); /** * 云消息隊列 MQTT 版消息的一級Topic,需要在控制臺創(chuàng)建才能使用。 * 由于云端SDK訂閱消息一般用于云上應(yīng)用進行消息匯總和分析等場景,因此,云端SDK訂閱消息不支持設(shè)置子級Topic。 * 如果使用了沒有創(chuàng)建或者沒有被授權(quán)的Topic會導(dǎo)致鑒權(quán)失敗,服務(wù)端會斷開客戶端連接。 */ String firstTopic = "firstTopic"; ChannelConfig channelConfig = new ChannelConfig(); channelConfig.setDomain(domain); channelConfig.setPort(port); channelConfig.setInstanceId(instanceId); channelConfig.setAccessKey(accessKey); channelConfig.setSecretKey(secretKey); ServerConsumer serverConsumer = new ServerConsumer(channelConfig, new ConsumerConfig()); serverConsumer.start(); serverConsumer.subscribeTopic(firstTopic, new MessageListener() { @Override public void process(String msgId, MessageProperties messageProperties, byte[] payload) { System.out.println("Receive:" + msgId + "," + JSONObject.toJSONString(messageProperties) + "," + new String(payload)); } }); } }
說明云端SDK消息發(fā)送的示例代碼,請參見MQTTProducerDemo.java。