本文介紹使用.NET語言的AMQP SDK接入阿里云物聯網平臺,接收服務端訂閱消息的示例。
前提條件
已獲取消費組ID,并訂閱Topic消息。
管理消費組:您可使用物聯網平臺默認消費組(DEFAULT_GROUP)或創建消費組。
配置AMQP服務端訂閱:您可通過消費組訂閱需要的Topic消息。
開發環境
本示例使用的開發環境要求如下表。
Framework | 支持版本 |
.NET Framework | 3.5、4.0、4.5及以上版本 |
.NET Micro Framework | 4.2及以上版本 |
.NET nanoFramework | 1.0及以上版本 |
.NET Compact Framework | 3.9及以上版本 |
.Net Core on Windows 10 and Ubuntu 14.04 | 1.0及以上版本 |
Mono | 4.2.1及以上版本 |
下載SDK
.NET版本AMQP SDK,推薦使用AMQP.Net Lite庫。請訪問AMQP.Net Lite下載庫和查看使用說明。
添加依賴
在packages.config中添加以下依賴。
<packages>
<package id="AMQPNetLite" version="2.2.0" targetFramework="net47" />
</packages>
代碼示例
using System;
using System.Text;
using Amqp;
using Amqp.Sasl;
using Amqp.Framing;
using System.Threading;
using System.Security.Cryptography.X509Certificates;
using System.Net.Security;
using System.Security.Cryptography;
namespace amqp
{
class MainClass
{
//接入域名,請參見AMQP客戶端接入說明文檔。
static string Host = "${YourHost}";
static int Port = 5671;
// 工程代碼泄露可能會導致 AccessKey 泄露,并威脅賬號下所有資源的安全性。以下代碼示例使用環境變量獲取 AccessKey 的方式進行調用,僅供參考
static string AccessKey = Environment.GetEnvironmentVariable("ALIBABA_CLOUD_ACCESS_KEY_ID");
static string AccessSecret = Environment.GetEnvironmentVariable("ALIBABA_CLOUD_ACCESS_KEY_SECRET");
static string consumerGroupId = "${YourConsumerGroupId}";
static string clientId = "${YourClientId}";
//iotInstanceId:實例ID。
static string iotInstanceId = "${YourIotInstanceId}";
static int Count = 0;
static int IntervalTime = 10000;
static Address address;
public static void Main(string[] args)
{
long timestamp = GetCurrentMilliseconds();
string param = "authId=" + AccessKey + "×tamp=" + timestamp;
//userName組裝方法,請參見AMQP客戶端接入說明文檔。
string userName = clientId + "|authMode=aksign,signMethod=hmacmd5,consumerGroupId=" + consumerGroupId
+ ",iotInstanceId=" + iotInstanceId + ",authId=" + AccessKey + ",timestamp=" + timestamp + "|";
//計算簽名,password組裝方法,請參見AMQP客戶端接入說明文檔。
string password = doSign(param, AccessSecret, "HmacMD5");
DoConnectAmqp(userName, password);
ManualResetEvent resetEvent = new ManualResetEvent(false);
resetEvent.WaitOne();
}
static void DoConnectAmqp(string userName, string password)
{
address = new Address(Host, Port, userName, password);
//創建Connection。
ConnectionFactory cf = new ConnectionFactory();
//如果需要,使用本地TLS。
//cf.SSL.ClientCertificates.Add(GetCert());
//cf.SSL.RemoteCertificateValidationCallback = ValidateServerCertificate;
cf.SASL.Profile = SaslProfile.External;
cf.AMQP.IdleTimeout = 120000;
//cf.AMQP.ContainerId、cf.AMQP.HostName請自定義。
cf.AMQP.ContainerId = "client.1.2";
cf.AMQP.HostName = "contoso.com";
cf.AMQP.MaxFrameSize = 8 * 1024;
var connection = cf.CreateAsync(address).Result;
//Connection Exception已關閉。
connection.AddClosedCallback(ConnClosed);
//接收消息。
DoReceive(connection);
}
static void DoReceive(Connection connection)
{
//創建Session。
var session = new Session(connection);
//創建ReceiverLink并接收消息。
var receiver = new ReceiverLink(session, "queueName", null);
receiver.Start(20, (link, message) =>
{
object messageId = message.ApplicationProperties["messageId"];
object topic = message.ApplicationProperties["topic"];
string body = Encoding.UTF8.GetString((Byte[])message.Body);
//注意:此處不要有耗時的邏輯,如果這里要進行業務處理,請另開線程,否則會堵塞消費。如果消費一直延時,會增加消息重發的概率。
Console.WriteLine("receive message, topic=" + topic + ", messageId=" + messageId + ", body=" + body);
//ACK消息。
link.Accept(message);
});
}
//連接發生異常后,進入重連模式。
//這里只是一個簡單重試的示例,您可以采用指數退避方式,來完善異常場景,重連策略。
static void ConnClosed(IAmqpObject _, Error e)
{
Console.WriteLine("ocurr error: " + e);
if(Count < 3)
{
Count += 1;
Thread.Sleep(IntervalTime * Count);
}
else
{
Thread.Sleep(120000);
}
//重連。
DoConnectAmqp(address.User, address.Password);
}
static X509Certificate GetCert()
{
string certPath = Environment.CurrentDirectory + "/root.crt";
X509Certificate crt = new X509Certificate(certPath);
return crt;
}
static bool ValidateServerCertificate(object sender, X509Certificate certificate, X509Chain chain, SslPolicyErrors sslPolicyErrors)
{
return true;
}
static long GetCurrentMilliseconds()
{
DateTime dt1970 = new DateTime(1970, 1, 1);
DateTime current = DateTime.Now;
return (long)(current - dt1970).TotalMilliseconds;
}
//簽名方法:支持hmacmd5,hmacsha1和hmacsha256。
static string doSign(string param, string accessSecret, string signMethod)
{
//signMethod = HmacMD5
byte[] key = Encoding.UTF8.GetBytes(accessSecret);
byte[] signContent = Encoding.UTF8.GetBytes(param);
var hmac = new HMACMD5(key);
byte[] hashBytes = hmac.ComputeHash(signContent);
return Convert.ToBase64String(hashBytes);
}
}
}
您需按照如下表格中的參數說明,修改代碼中的參數值。更多參數說明,請參見AMQP客戶端接入說明。
請確保參數值輸入正確,否則AMQP客戶端接入會失敗。
參數 | 說明 |
Host | AMQP接入域名。
|
AccessKey | 登錄物聯網平臺控制臺,將鼠標移至賬號頭像上,然后單擊AccessKey管理,獲取AccessKey ID和AccessKey Secret。 說明 如果使用RAM用戶,您需授予該RAM用戶管理物聯網平臺的權限(AliyunIOTFullAccess),否則將連接失敗。授權方法請參見RAM用戶訪問。 |
AccessSecret | |
consumerGroupId | 當前物聯網平臺對應實例中的消費組ID。 登錄物聯網平臺控制臺,在對應實例的 查看您的消費組ID。 |
iotInstanceId | 實例ID。您可在物聯網平臺控制臺的實例概覽頁面,查看當前實例的ID。
|
clientId | 表示客戶端ID,需您自定義,長度不可超過64個字符。建議使用您的AMQP客戶端所在服務器UUID、MAC地址、IP等唯一標識。 AMQP客戶端接入并啟動成功后,登錄物聯網平臺控制臺,在對應實例的 頁簽,單擊消費組對應的查看,消費組詳情頁面將顯示該參數,方便您識別區分不同的客戶端。 |
運行結果示例
成功:返回類似如下日志信息,表示AMQP客戶端已接入物聯網平臺并成功接收消息。
參數
示例
說明
topic
/***********/******/thing/event/property/post
設備屬性上報的Topic。
messageId
2**************7
消息的ID。
body
{"deviceType":"CustomCategory","iotId":"4EwuVV***","requestId":"161268***","checkFailedData":{},"productKey":"g4***S","gmtCreate":1612682173249,"deviceName":"Esensor","items":{"temperature":{"value":-1,"time":1612682173247},"humidity":{"value":74,"time":1612682173247}}}
消息的內容。
失敗:返回類似如下日志信息,表示AMQP客戶端連接物聯網平臺失敗。
相關文檔
服務端訂閱消息相關錯誤碼,請參見消息相關錯誤碼。