MQTT接入使用示例
本文以C Link SDK中的Demo文件./mqtt_basic_demo.c
為例,介紹如何調(diào)用Link SDK的API,將MQTT協(xié)議的設(shè)備接入物聯(lián)網(wǎng)平臺并進行消息收發(fā)。
背景信息
MQTT接入的更多信息,請參見概述。
步驟一:初始化
添加頭文件。
#include "aiot_state_api.h" #include "aiot_sysdep_api.h" #include "aiot_mqtt_api.h"
配置底層依賴和日志輸出。
aiot_sysdep_set_portfile(&g_aiot_sysdep_portfile); aiot_state_set_logcb(demo_state_logcb);
調(diào)用aiot_mqtt_init,創(chuàng)建MQTT客戶端實例,并初始化默認(rèn)參數(shù)。
mqtt_handle = aiot_mqtt_init(); if (mqtt_handle == NULL) { printf("aiot_mqtt_init failed\n"); return -1; }
步驟二:配置功能
調(diào)用aiot_mqtt_setopt,配置以下功能。
更多功能的配置項,請參見aiot_mqtt_option_t。
配置連接參數(shù)。
示例代碼:
/* TODO: 替換為自己設(shè)備的認(rèn)證信息。 */ char *product_key = "a18wP******"; char *device_name = "LightSwitch"; char *device_secret = "uwMTmVAMnGGHaAkqmeDY6cHxxB******"; /* TODO: 替換為自己設(shè)備的接入域名。 */ char *mqtt_host = "iot-06z00ax1o******.mqtt.iothub.aliyuncs.com"; ... ... /* 以下參數(shù)為可選,您可使用SDK文件中默認(rèn)內(nèi)容。 */ /* 配置MQTT服務(wù)器地址。 */ aiot_mqtt_setopt(mqtt_handle, AIOT_MQTTOPT_HOST, (void *)url); /* 配置MQTT服務(wù)器端口。 */ aiot_mqtt_setopt(mqtt_handle, AIOT_MQTTOPT_PORT, (void *)&port); /* 配置設(shè)備ProductKey。 */ aiot_mqtt_setopt(mqtt_handle, AIOT_MQTTOPT_PRODUCT_KEY, (void *)product_key); /* 配置設(shè)備DeviceName。 */ aiot_mqtt_setopt(mqtt_handle, AIOT_MQTTOPT_DEVICE_NAME, (void *)device_name); /* 配置設(shè)備DeviceSecret。 */ aiot_mqtt_setopt(mqtt_handle, AIOT_MQTTOPT_DEVICE_SECRET, (void *)device_secret); /* 配置網(wǎng)絡(luò)連接的安全憑據(jù)。 */ aiot_mqtt_setopt(mqtt_handle, AIOT_MQTTOPT_NETWORK_CRED, (void *)&cred);
相關(guān)參數(shù):
參數(shù)
示例
說明
mqtt_host
iot-06z00ax1o******.mqtt.iothub.aliyuncs.com
product_key
a18wP******
設(shè)備認(rèn)證信息。更多信息,請參見獲取設(shè)備認(rèn)證信息。
本例程的身份認(rèn)證方式為一機一密。
device_name
LightSwitch
device_secret
uwMTmVAMnGGHaAkqmeDY6cHxxB******
MQTT?;钫f明:
重要設(shè)備端在?;顣r間間隔內(nèi),至少需要發(fā)送一次報文,包括ping請求。
從物聯(lián)網(wǎng)平臺發(fā)送CONNACK響應(yīng)CONNECT消息時,開始心跳計時。收到PUBLISH、SUBSCRIBE、PING或 PUBACK消息時,會重置計時器。物聯(lián)網(wǎng)平臺每隔30秒定時檢測一次設(shè)備的保活心跳,設(shè)備上線時間點距離最新定時檢測時間點的時間,是定時檢測的等待時間。定義最大超時時間為:
?;钚奶鴷r間*1.5+定時檢測的等待時間
。超過最大超時時間未收到設(shè)備消息,服務(wù)器會自動斷開連接。
C Link SDK具備?;钅芰?,您可以設(shè)置以下配置項,自定義設(shè)備連接的?;钚奶?。如果不配置,則取默認(rèn)值。
配置項
默認(rèn)值
說明
AIOT_MQTTOPT_HEARTBEAT_MAX_LOST
2
可容忍的心跳丟失閾值。即:心跳請求報文達到設(shè)置的次數(shù)后,發(fā)起重連。
AIOT_MQTTOPT_HEARTBEAT_INTERVAL_MS
25,000
每次發(fā)起重連之間的間隔時間。單位毫秒, 取值范圍:1000~1,200,000。
AIOT_MQTTOPT_KEEPALIVE_SEC
1200
可容忍的心跳丟失時間閾值。即:失去心跳后,設(shè)置的時間內(nèi),允許發(fā)起重連。單位秒,取值范圍:30~1,200。建議取值大于300。
配置狀態(tài)監(jiān)控和消息回調(diào)。
配置狀態(tài)監(jiān)控回調(diào)函數(shù)。
示例代碼:
int main(int argc, char *argv[]) { ... ... /* 配置MQTT默認(rèn)消息接收回調(diào)函數(shù)。 */ aiot_mqtt_setopt(mqtt_handle, AIOT_MQTTOPT_RECV_HANDLER, (void *)demo_mqtt_default_recv_handler); /* 配置MQTT事件回調(diào)函數(shù)。 */ aiot_mqtt_setopt(mqtt_handle, AIOT_MQTTOPT_EVENT_HANDLER, (void *)demo_mqtt_event_handler); ... ... }
相關(guān)參數(shù):
配置項
示例值
說明
AIOT_MQTTOPT_RECV_HANDLER
demo_mqtt_default_recv_handler
當(dāng)接收消息時,根據(jù)該回調(diào)函數(shù)定義的處理邏輯,執(zhí)行對應(yīng)的處理。
AIOT_MQTTOPT_EVENT_HANDLER
demo_mqtt_event_handler
當(dāng)設(shè)備連接狀態(tài)發(fā)生變化時,根據(jù)該回調(diào)函數(shù)定義的處理邏輯,執(zhí)行對應(yīng)的處理。
定義狀態(tài)監(jiān)控的回調(diào)函數(shù)。
重要請勿將事件的處理邏輯,定義得過于耗時,以免阻塞收包線程。
連接狀態(tài)的變化包括網(wǎng)絡(luò)異常、自動重連已成功、已斷開連接等。
如果要根據(jù)連接狀態(tài)的變化做應(yīng)對處理,可在
TODO
處,按照需要修改代碼。
/* MQTT事件回調(diào)函數(shù), 當(dāng)網(wǎng)絡(luò)連接、重連或斷開時,觸發(fā)該函數(shù), 事件定義見core/aiot_mqtt_api.h。 */ void demo_mqtt_event_handler(void *handle, const aiot_mqtt_event_t *event, void *userdata) { switch (event->type) { /* 調(diào)用了aiot_mqtt_connect()接口, 與MQTT服務(wù)器建立連接。 */ case AIOT_MQTTEVT_CONNECT: { printf("AIOT_MQTTEVT_CONNECT\n"); /* TODO: 處理SDK建立連接成功, 不可在此調(diào)用耗時較長的阻塞函數(shù)。 */ } break; /* SDK因網(wǎng)絡(luò)狀況被動斷開連接后, 成功自動發(fā)起重連。 */ case AIOT_MQTTEVT_RECONNECT: { printf("AIOT_MQTTEVT_RECONNECT\n"); /* TODO: 處理SDK重連成功, 不可在此調(diào)用耗時較長的阻塞函數(shù)。 */ } break; /* SDK因網(wǎng)絡(luò)狀況被動斷開了連接, network底層讀寫失敗, heartbeat沒有按預(yù)期得到服務(wù)端心跳應(yīng)答。 */ case AIOT_MQTTEVT_DISCONNECT: { char *cause = (event->data.disconnect == AIOT_MQTTDISCONNEVT_NETWORK_DISCONNECT) ? ("network disconnect") : ("heartbeat disconnect"); printf("AIOT_MQTTEVT_DISCONNECT: %s\n", cause); /* TODO: 處理SDK被動斷開連接, 不可在此調(diào)用耗時較長的阻塞函數(shù)。 */ } break; default: { } } }
定義消息接收的回調(diào)函數(shù)。
重要請勿將消息的處理邏輯,定義得過于耗時,以免阻塞收包線程。
如果您要根據(jù)接收的消息做應(yīng)對處理,可在
TODO
處,按照需要修改代碼。
/* MQTT默認(rèn)消息處理回調(diào), 當(dāng)SDK從服務(wù)器收到MQTT消息時, 且您未設(shè)置對應(yīng)回調(diào)的處理時,以下接口被調(diào)用。 */ void demo_mqtt_default_recv_handler(void *handle, const aiot_mqtt_recv_t *packet, void *userdata) { switch (packet->type) { case AIOT_MQTTRECV_HEARTBEAT_RESPONSE: { printf("heartbeat response\n"); /* TODO: 處理服務(wù)器對心跳的回應(yīng), 一般不處理。 */ } break; case AIOT_MQTTRECV_SUB_ACK: { printf("suback, res: -0x%04X, packet id: %d, max qos: %d\n", -packet->data.sub_ack.res, packet->data.sub_ack.packet_id, packet->data.sub_ack.max_qos); /* TODO: 處理服務(wù)器對訂閱請求的回應(yīng), 一般不處理。 */ } break; case AIOT_MQTTRECV_PUB: { printf("pub, qos: %d, topic: %.*s\n", packet->data.pub.qos, packet->data.pub.topic_len, packet->data.pub.topic); printf("pub, payload: %.*s\n", packet->data.pub.payload_len, packet->data.pub.payload); /* TODO: 處理服務(wù)器下發(fā)的業(yè)務(wù)報文。 */ } break; case AIOT_MQTTRECV_PUB_ACK: { printf("puback, packet id: %d\n", packet->data.pub_ack.packet_id); /* TODO: 處理服務(wù)器對QoS=1上報消息的回應(yīng), 一般不處理。 */ } break; default: { } } }
步驟三:請求連接
調(diào)用aiot_mqtt_connect,根據(jù)配置連接的參數(shù),向物聯(lián)網(wǎng)平臺,發(fā)起連接認(rèn)證請求。
/* 與服務(wù)器建立MQTT連接。 */
res = aiot_mqtt_connect(mqtt_handle);
if (res < STATE_SUCCESS) {
/* 嘗試建立連接失敗, 銷毀MQTT實例, 回收資源。 */
aiot_mqtt_deinit(&mqtt_handle);
printf("aiot_mqtt_connect failed: -0x%04X\n", -res);
return -1;
}
步驟四:開啟?;罹€程
調(diào)用aiot_mqtt_process,向服務(wù)器發(fā)送心跳報文,使設(shè)備保持長連接狀態(tài),并重發(fā)QoS=1
的未應(yīng)答報文。
開啟?;罹€程。
res = pthread_create(&g_mqtt_process_thread, NULL, demo_mqtt_process_thread, mqtt_handle); if (res < 0) { printf("pthread_create demo_mqtt_process_thread failed: %d\n", res); return -1; }
設(shè)置?;罹€程處理函數(shù)。
void *demo_mqtt_process_thread(void *args) { int32_t res = STATE_SUCCESS; while (g_mqtt_process_thread_running) { res = aiot_mqtt_process(args); if (res == STATE_USER_INPUT_EXEC_DISABLED) { break; } sleep(1); } return NULL; }
步驟五:開啟接收線程
調(diào)用aiot_mqtt_recv,收取服務(wù)器下發(fā)的MQTT消息,根據(jù)消息回調(diào)函數(shù),執(zhí)行對應(yīng)處理。在斷線時自動重連,根據(jù)事件回調(diào)函數(shù),執(zhí)行對應(yīng)處理。
開啟接收線程。
res = pthread_create(&g_mqtt_recv_thread, NULL, demo_mqtt_recv_thread, mqtt_handle); if (res < 0) { printf("pthread_create demo_mqtt_recv_thread failed: %d\n", res); return -1; }
設(shè)置接收線程處理函數(shù)。
void *demo_mqtt_recv_thread(void *args) { int32_t res = STATE_SUCCESS; while (g_mqtt_recv_thread_running) { res = aiot_mqtt_recv(args); if (res < STATE_SUCCESS) { if (res == STATE_USER_INPUT_EXEC_DISABLED) { break; } sleep(1); } } return NULL; }
步驟六:訂閱Topic
調(diào)用aiot_mqtt_sub,訂閱指定Topic。
示例代碼:
{ char *sub_topic = "/a18wP******/LightSwitch/user/get"; res = aiot_mqtt_sub(mqtt_handle, sub_topic, NULL, 1, NULL); if (res < 0) { printf("aiot_mqtt_sub failed, res: -0x%04X\n", -res); return -1; } }
說明完成配置后,請刪除相關(guān)代碼兩邊的注釋符號。
相關(guān)參數(shù):
參數(shù)
示例
說明
sub_topic
/a18wP******/LightSwitch/user/get
擁有訂閱權(quán)限的Topic。
a18wP******
為設(shè)備的ProductKey。LightSwitch
為設(shè)備的DeviceName。
本示例為默認(rèn)的自定義Topic。
設(shè)備通過該Topic,可接收物聯(lián)網(wǎng)平臺的消息。
關(guān)于Topic的更多信息,請參見什么是Topic。
步驟七:發(fā)送消息
調(diào)用aiot_mqtt_pub,向指定Topic發(fā)送消息。
示例代碼:
{ char *pub_topic = "/a18wP******/LightSwitch/user/update"; char *pub_payload = "{\"id\":\"1\",\"version\":\"1.0\",\"params\":{\"LightSwitch\":0}}"; res = aiot_mqtt_pub(mqtt_handle, pub_topic, (uint8_t *)pub_payload, (uint32_t)strlen(pub_payload), 0); if (res < 0) { printf("aiot_mqtt_sub failed, res: -0x%04X\n", -res); return -1; } }
說明完成配置后,請刪除相關(guān)代碼兩邊的注釋符號。
相關(guān)參數(shù):
參數(shù)
示例
說明
pub_topic
/a18wP******/LightSwitch/user/update
擁有發(fā)布權(quán)限的Topic。
a18wP******
為設(shè)備的ProductKey。LightSwitch
為設(shè)備的DeviceName。
設(shè)備通過該Topic向物聯(lián)網(wǎng)平臺發(fā)送消息。
關(guān)于Topic的更多信息,請參見什么是Topic。
pub_payload
{\"id\":\"1\",\"version\":\"1.0\",\"params\":{\"LightSwitch\":0}}
上報至物聯(lián)網(wǎng)平臺的消息內(nèi)容。
由于示例消息的Topic類型為自定義,因此數(shù)據(jù)格式可自定義。
關(guān)于數(shù)據(jù)格式的更多信息,請參見數(shù)據(jù)格式。
設(shè)備與物聯(lián)網(wǎng)平臺建立MQTT通信后,請確保通信量不超過閾值。
通信限制的更多信息,請參見連接通信。
如果通信超過閾值,請登錄物聯(lián)網(wǎng)平臺查看堆積消息。更多信息,請參見查看和監(jiān)控消費組。
步驟八:斷開連接
MQTT接入常應(yīng)用于長連接的設(shè)備,程序通常不會運行至此。
例程的主線程任務(wù)為配置參數(shù)并成功建立連接。連接建立后,主線程可進入休眠。
調(diào)用aiot_mqtt_disconnect,向物聯(lián)網(wǎng)平臺發(fā)送斷開連接的報文,然后斷開網(wǎng)絡(luò)連接。
res = aiot_mqtt_disconnect(mqtt_handle);
if (res < STATE_SUCCESS) {
aiot_mqtt_deinit(&mqtt_handle);
printf("aiot_mqtt_disconnect failed: -0x%04X\n", -res);
return -1;
}
步驟九:退出程序
調(diào)用aiot_mqtt_deinit,銷毀MQTT客戶端實例,釋放資源。
res = aiot_mqtt_deinit(&mqtt_handle);
if (res < STATE_SUCCESS) {
printf("aiot_mqtt_deinit failed: -0x%04X\n", -res);
return -1;
}
后續(xù)步驟