Go SDK接入示例
本文介紹使用AMQP協(xié)議的Go客戶(hù)端接入阿里云物聯(lián)網(wǎng)平臺(tái),接收服務(wù)端訂閱消息的示例。
前提條件
已獲取消費(fèi)組ID,并訂閱Topic消息。
管理消費(fèi)組:您可使用物聯(lián)網(wǎng)平臺(tái)默認(rèn)消費(fèi)組(DEFAULT_GROUP)或創(chuàng)建消費(fèi)組。
配置AMQP服務(wù)端訂閱:您可通過(guò)消費(fèi)組訂閱需要的Topic消息。
準(zhǔn)備開(kāi)發(fā)環(huán)境
本示例的測(cè)試環(huán)境為Go 1.12.7。
下載SDK
可使用以下命令導(dǎo)入Go語(yǔ)言AMQP SDK。
import "pack.ag/amqp"
SDK使用說(shuō)明,請(qǐng)參見(jiàn)package amqp。
代碼示例
package main
import (
"os"
"context"
"crypto/hmac"
"crypto/sha1"
"encoding/base64"
"fmt"
"pack.ag/amqp"
"time"
)
//參數(shù)說(shuō)明,請(qǐng)參見(jiàn)AMQP客戶(hù)端接入說(shuō)明文檔。
const consumerGroupId = "${YourConsumerGroupId}"
const clientId = "${YourClientId}"
//iotInstanceId:實(shí)例ID。
const iotInstanceId = "${YourIotInstanceId}"
//接入域名,請(qǐng)參見(jiàn)AMQP客戶(hù)端接入說(shuō)明文檔。
const host = "${YourHost}"
func main() {
//工程代碼泄露可能會(huì)導(dǎo)致 AccessKey 泄露,并威脅賬號(hào)下所有資源的安全性。以下代碼示例使用環(huán)境變量獲取 AccessKey 的方式進(jìn)行調(diào)用,僅供參考
accessKey := os.Getenv("ALIBABA_CLOUD_ACCESS_KEY_ID")
accessSecret := os.Getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET")
address := "amqps://" + host + ":5671"
timestamp := time.Now().Nanosecond() / 1000000
//userName組裝方法,請(qǐng)參見(jiàn)AMQP客戶(hù)端接入說(shuō)明文檔。
userName := fmt.Sprintf("%s|authMode=aksign,signMethod=Hmacsha1,consumerGroupId=%s,authId=%s,iotInstanceId=%s,timestamp=%d|",
clientId, consumerGroupId, accessKey, iotInstanceId, timestamp)
stringToSign := fmt.Sprintf("authId=%s×tamp=%d", accessKey, timestamp)
hmacKey := hmac.New(sha1.New, []byte(accessSecret))
hmacKey.Write([]byte(stringToSign))
//計(jì)算簽名,password組裝方法,請(qǐng)參見(jiàn)AMQP客戶(hù)端接入說(shuō)明文檔。
password := base64.StdEncoding.EncodeToString(hmacKey.Sum(nil))
amqpManager := &AmqpManager{
address:address,
userName:userName,
password:password,
}
//如果需要做接受消息通信或者取消操作,從Background衍生context。
ctx := context.Background()
amqpManager.startReceiveMessage(ctx)
}
//業(yè)務(wù)函數(shù)。用戶(hù)自定義實(shí)現(xiàn),該函數(shù)被異步執(zhí)行,請(qǐng)考慮系統(tǒng)資源消耗情況。
func (am *AmqpManager) processMessage(message *amqp.Message) {
fmt.Println("data received:", string(message.GetData()), " properties:", message.ApplicationProperties)
}
type AmqpManager struct {
address string
userName string
password string
client *amqp.Client
session *amqp.Session
receiver *amqp.Receiver
}
func (am *AmqpManager) startReceiveMessage(ctx context.Context) {
childCtx, _ := context.WithCancel(ctx)
err := am.generateReceiverWithRetry(childCtx)
if nil != err {
return
}
defer func() {
am.receiver.Close(childCtx)
am.session.Close(childCtx)
am.client.Close()
}()
for {
//阻塞接受消息,如果ctx是background則不會(huì)被打斷。
message, err := am.receiver.Receive(ctx)
if nil == err {
go am.processMessage(message)
message.Accept()
} else {
fmt.Println("amqp receive data error:", err)
//如果是主動(dòng)取消,則退出程序。
select {
case <- childCtx.Done(): return
default:
}
//非主動(dòng)取消,則重新建立連接。
err := am.generateReceiverWithRetry(childCtx)
if nil != err {
return
}
}
}
}
func (am *AmqpManager) generateReceiverWithRetry(ctx context.Context) error {
//退避重連,從10ms依次x2,直到20s。
duration := 10 * time.Millisecond
maxDuration := 20000 * time.Millisecond
times := 1
//異常情況,退避重連。
for {
select {
case <- ctx.Done(): return amqp.ErrConnClosed
default:
}
err := am.generateReceiver()
if nil != err {
time.Sleep(duration)
if duration < maxDuration {
duration *= 2
}
fmt.Println("amqp connect retry,times:", times, ",duration:", duration)
times ++
} else {
fmt.Println("amqp connect init success")
return nil
}
}
}
//由于包不可見(jiàn),無(wú)法判斷Connection和Session狀態(tài),重啟連接獲取。
func (am *AmqpManager) generateReceiver() error {
if am.session != nil {
receiver, err := am.session.NewReceiver(
amqp.LinkSourceAddress("/queue-name"),
amqp.LinkCredit(20),
)
//如果斷網(wǎng)等行為發(fā)生,Connection會(huì)關(guān)閉導(dǎo)致Session建立失敗,未關(guān)閉連接則建立成功。
if err == nil {
am.receiver = receiver
return nil
}
}
//清理上一個(gè)連接。
if am.client != nil {
am.client.Close()
}
client, err := amqp.Dial(am.address, amqp.ConnSASLPlain(am.userName, am.password), )
if err != nil {
return err
}
am.client = client
session, err := client.NewSession()
if err != nil {
return err
}
am.session = session
receiver, err := am.session.NewReceiver(
amqp.LinkSourceAddress("/queue-name"),
amqp.LinkCredit(20),
)
if err != nil {
return err
}
am.receiver = receiver
return nil
}
您需按照如下表格中的參數(shù)說(shuō)明,修改代碼中的參數(shù)值。更多參數(shù)說(shuō)明,請(qǐng)參見(jiàn)AMQP客戶(hù)端接入說(shuō)明。
請(qǐng)確保參數(shù)值輸入正確,否則AMQP客戶(hù)端接入會(huì)失敗。
參數(shù) | 說(shuō)明 |
accessKey | 登錄物聯(lián)網(wǎng)平臺(tái)控制臺(tái),將鼠標(biāo)移至賬號(hào)頭像上,然后單擊AccessKey管理,獲取AccessKey ID和AccessKey Secret。 說(shuō)明 如果使用RAM用戶(hù),您需授予該RAM用戶(hù)管理物聯(lián)網(wǎng)平臺(tái)的權(quán)限(AliyunIOTFullAccess),否則將連接失敗。授權(quán)方法請(qǐng)參見(jiàn)RAM用戶(hù)訪問(wèn)。 |
accessSecret | |
consumerGroupId | 當(dāng)前物聯(lián)網(wǎng)平臺(tái)對(duì)應(yīng)實(shí)例中的消費(fèi)組ID。 登錄物聯(lián)網(wǎng)平臺(tái)控制臺(tái),在對(duì)應(yīng)實(shí)例的 查看您的消費(fèi)組ID。 |
iotInstanceId | 實(shí)例ID。您可在物聯(lián)網(wǎng)平臺(tái)控制臺(tái)的實(shí)例概覽頁(yè)面,查看當(dāng)前實(shí)例的ID。
|
clientId | 表示客戶(hù)端ID,需您自定義,長(zhǎng)度不可超過(guò)64個(gè)字符。建議使用您的AMQP客戶(hù)端所在服務(wù)器UUID、MAC地址、IP等唯一標(biāo)識(shí)。 AMQP客戶(hù)端接入并啟動(dòng)成功后,登錄物聯(lián)網(wǎng)平臺(tái)控制臺(tái),在對(duì)應(yīng)實(shí)例的 頁(yè)簽,單擊消費(fèi)組對(duì)應(yīng)的查看,消費(fèi)組詳情頁(yè)面將顯示該參數(shù),方便您識(shí)別區(qū)分不同的客戶(hù)端。 |
host | AMQP接入域名。
|
運(yùn)行結(jié)果示例
成功:返回類(lèi)似如下日志信息,表示AMQP客戶(hù)端已接入物聯(lián)網(wǎng)平臺(tái)并成功接收消息。
失敗:返回類(lèi)似如下日志信息,表示AMQP客戶(hù)端連接物聯(lián)網(wǎng)平臺(tái)失敗。
您可根據(jù)日志提示,檢查代碼或網(wǎng)絡(luò)環(huán)境,然后修正問(wèn)題,重新運(yùn)行代碼。
相關(guān)文檔
服務(wù)端訂閱消息相關(guān)錯(cuò)誤碼,請(qǐng)參見(jiàn)消息相關(guān)錯(cuò)誤碼。