操作鏈路概述
本文將引導(dǎo)您快速體驗(yàn) SOFAStack 消息隊(duì)列,從創(chuàng)建資源、配置接入點(diǎn)到使用 SDK 收發(fā)消息。
具體操作步驟如下:
創(chuàng)建資源
注意事項(xiàng)
在使用 SOFAStack 消息隊(duì)列時(shí),請(qǐng)注意以下網(wǎng)絡(luò)訪問限制:
Topic 和 Group ID 需創(chuàng)建在同一個(gè)地域(Region)下的同一個(gè)工作空間中才能互通。例如,當(dāng)某 Topic 創(chuàng)建在華東 1(杭州)下的工作空間 A 中,那么該 Topic 只能被在華東 1(杭州)下的工作空間 A 中創(chuàng)建的 Group ID 對(duì)應(yīng)的生產(chǎn)端和消費(fèi)端訪問。
目前不支持公網(wǎng)訪問,生產(chǎn)端和消費(fèi)端需要部署在相同地域的 ECS 上,或者保證網(wǎng)絡(luò)聯(lián)通。
創(chuàng)建工作空間
要使用消息隊(duì)列,您需要確保 SOFAStack 控制臺(tái)已創(chuàng)建至少一個(gè)工作空間。如 SOFAStack 未創(chuàng)建工作空間或您需要?jiǎng)?chuàng)建一個(gè)新的工作空間,可參見 添加工作空間。創(chuàng)建好工作空間后,將為您自動(dòng)創(chuàng)建一個(gè)消息隊(duì)列實(shí)例。
創(chuàng)建 Topic
Topic 是消息隊(duì)列里對(duì)消息的一級(jí)歸類。消息生產(chǎn)者將消息發(fā)送到一個(gè) Topic,而消息消費(fèi)者則通過訂閱該 Topic 來獲取和消費(fèi)消息。
在左側(cè)導(dǎo)航欄,選擇 中間件 > 消息隊(duì)列 > Topic 管理。
單擊 創(chuàng)建 Topic,然后在 創(chuàng)建 Topic 對(duì)話框配置 Topic 信息:
參數(shù)
是否必填
說明
Topic
必填
Topic 格式要求如下:
Topic 只能包含英文、數(shù)字、 短橫線(-)和下劃線(_),其中英文和數(shù)字必須要有一種, 短橫線(-)和下劃線(_)可選。
長(zhǎng)度需控制在 3~64 個(gè)字符之間。
命名不能以“CID”和“GID”開頭。
消息類型
必填
支持的消息類型有普通消息、分區(qū)順序消息、事務(wù)消息和定時(shí)消息。詳細(xì)消息類型的說明,可參見 消息類型。
描述
選填
對(duì)該 Topic 的備注信息,長(zhǎng)度限制在 256 個(gè)字符以內(nèi)。
單擊 確定。
創(chuàng)建 Group ID
創(chuàng)建完 Topic 后,您需要為消息的消費(fèi)者(或生產(chǎn)者)創(chuàng)建客戶端 ID ,即 Group ID 作為標(biāo)識(shí)。
Group ID 和 Topic 的關(guān)系是 N:N,即一個(gè)消費(fèi)者可以訂閱多個(gè) Topic,同一個(gè) Topic 也可以被多個(gè)消費(fèi)者訂閱;一個(gè)生產(chǎn)者可以向多個(gè) Topic 發(fā)送消息,同一個(gè) Topic 也可以接收來自多個(gè)生產(chǎn)者的消息。
消費(fèi)者必須有對(duì)應(yīng)的 Group ID,生產(chǎn)者不做強(qiáng)制要求。
在左側(cè)導(dǎo)航欄,選擇 中間件 > 消息隊(duì)列 > Group 管理。
單擊 創(chuàng)建 Group ID,然后在 創(chuàng)建 Group ID 對(duì)話框配置 Group ID 信息:
參數(shù)
是否必填
說明
Group ID
必填
Group ID 格式要求如下:
命名以 “GID_” 或者 “GID-” 開頭。
只能包含字母、數(shù)字、短橫線和下劃線。
長(zhǎng)度限制在 7~64 字符之間。
說明Group ID 一旦創(chuàng)建,則無法修改。
描述
選填
對(duì)該 Group ID 的備注信息,長(zhǎng)度限制在 256 個(gè)字符以內(nèi)。
單擊 確定。
獲取 AK(AccessKey ID)和 SK(AccessKey Secret)
阿里云 AccessKey 用于收發(fā)消息時(shí)進(jìn)行賬戶鑒權(quán)。
在調(diào)用 SDK 發(fā)送和訂閱消息的時(shí)候,除了需要指定創(chuàng)建的 Topic 和 Group ID 以外,還需輸入您在 RAM 控制臺(tái)創(chuàng)建的身份驗(yàn)證信息,即 AccessKey。AccessKey 的信息包含 AccessKeyId 和 AccessKeySecret。
由于 RAM 是阿里云產(chǎn)品,非阿里云飛天底座輸出的環(huán)境中可使用螞蟻 IAM 創(chuàng)建訪問密鑰(AccessKey)做身份驗(yàn)證。
創(chuàng)建 AccessKey 的具體步驟,參見 創(chuàng)建 AccessKey。
獲取接入配置
在控制臺(tái)創(chuàng)建好資源后,您需通過控制臺(tái)獲取工作空間的接入點(diǎn)。在收發(fā)消息時(shí),您需要為生產(chǎn)端和消費(fèi)端配置該接入點(diǎn),以此接入某個(gè)具體工作空間或地域的服務(wù)。
在左側(cè)導(dǎo)航欄,選擇 中間件 > 消息隊(duì)列 > 概覽。
在頁面底部的 接入配置 找到 實(shí)例 ID 及 內(nèi)網(wǎng)接入點(diǎn)。
將 內(nèi)網(wǎng)接入點(diǎn) 配置到客戶端的 SDK 代碼的 ENDPOINT 參數(shù)。
將 實(shí)例 ID 配置到客戶端的 SDK 代碼的 INSTANCE_ID 參數(shù)。
發(fā)送消息
您可以通過控制臺(tái)發(fā)送測(cè)試消息或通過調(diào)用 TCP Java SDK 發(fā)送消息。
發(fā)送測(cè)試消息
用于快速驗(yàn)證 Topic 資源的可用性,主要用作測(cè)試。
在左側(cè)導(dǎo)航欄,選擇 中間件 > 消息隊(duì)列 > Topic 管理。
在 Topic 管理頁面,找到您剛剛創(chuàng)建的 Topic,單擊右側(cè)操作列的 發(fā)送測(cè)試消息。
在 發(fā)送測(cè)試消息 對(duì)話框中的 消息體 一欄,輸入消息的具體內(nèi)容,單擊 確定。控制臺(tái)即會(huì)返回消息發(fā)送成功通知以及相應(yīng)的 Message ID。
調(diào)用 SDK 發(fā)送消息
通過 Maven 方式引入依賴。Java SDK 的最新版本號(hào),可參見 SDK 版本說明。
<dependency> <groupId>com.alipay.sofa</groupId> <artifactId>sofamq-client-all</artifactId> <version>"XXX"</version> //設(shè)置為 Java SDK 的最新版本號(hào) </dependency> <repositories> <repository> <id>antcloudrelease</id> <name>Ant Cloud</name> <url>http://mvn.cloud.alipay.com/nexus/content/groups/open</url> </repository> </repositories>
根據(jù)以下說明設(shè)置相關(guān)參數(shù),運(yùn)行示例代碼:
import java.util.Properties; import com.alipay.sofa.sofamq.client.PropertyKeyConst; import io.openmessaging.api.Message; import io.openmessaging.api.MessagingAccessPoint; import io.openmessaging.api.OMS; import io.openmessaging.api.OMSBuiltinKeys; import io.openmessaging.api.Producer; import io.openmessaging.api.SendResult; public class Main{ public static void main(String... args){ Properties credentials =new Properties(); // 阿里云賬號(hào) AccessKey 擁有所有 API 的訪問權(quán)限,風(fēng)險(xiǎn)很高。強(qiáng)烈建議您創(chuàng)建并使用 RAM 用戶進(jìn)行 API 訪問或日常運(yùn)維,請(qǐng)登錄 RAM 控制臺(tái)創(chuàng)建 RAM 用戶。 // 此處以把 AccessKey 和 AccessKeySecret 保存在環(huán)境變量為例說明。 // 強(qiáng)烈建議不要把 AccessKey 和 AccessKeySecret 保存到代碼里,會(huì)存在密鑰泄漏風(fēng)險(xiǎn) credentials.setProperty(OMSBuiltinKeys.ACCESS_KEY, "SOFA_AK_ENV"); credentials.setProperty(OMSBuiltinKeys.SECRET_KEY, "SOFA_SK_ENV"); // 設(shè)置 TCP 接入域名,進(jìn)入控制臺(tái)的概覽頁面查看接入點(diǎn)配置 MessagingAccessPoint accessPoint = OMS.builder().driver("sofamq").endpoint("$endpoint") .withCredentials(credentials).build(); Properties properties =new Properties(); // 設(shè)置用戶實(shí)例,進(jìn)入控制臺(tái)的概覽頁面查看接入點(diǎn)配置 properties.setProperty(PropertyKeyConst.INSTANCE_ID,"$instanceId"); // 您在控制臺(tái)創(chuàng)建的 Group ID properties.setProperty(PropertyKeyConst.GROUP_ID,"YOUR_GROUP"); Producer producer = accessPoint.createProducer(properties); producer.start(); Message message =new Message("$topic","YOUR_TAG","hello world".getBytes()); SendResult sendResult = producer.send(message); System.out.println(sendResult); } }
消息發(fā)送后,您可以在控制臺(tái)查看消息發(fā)送狀態(tài),步驟如下:
在左側(cè)導(dǎo)航欄,選擇 中間件 > 消息隊(duì)列 > 消息查詢。
單擊 按 Message ID 查詢,在搜索框中輸入發(fā)送消息后返回的 Message ID,單擊 搜索 查詢消息發(fā)送狀態(tài)。
存儲(chǔ)時(shí)間 表示消息隊(duì)列服務(wù)端存儲(chǔ)這條消息的時(shí)間。如果查詢到此消息,表示消息已經(jīng)成功發(fā)送到服務(wù)端。
訂閱消息
消息發(fā)送成功后,需要啟動(dòng)消費(fèi)者來訂閱消息。
調(diào)用 TCP Java SDK 訂閱消息。您可以運(yùn)行以下示例代碼來啟動(dòng)消費(fèi)者,并測(cè)試訂閱消息的功能。請(qǐng)按照說明正確設(shè)置相關(guān)參數(shù)。
import java.util.Properties; import com.alipay.sofa.sofamq.client.PropertyKeyConst; import io.openmessaging.api.Action; import io.openmessaging.api.ConsumeContext; import io.openmessaging.api.Consumer; import io.openmessaging.api.Message; import io.openmessaging.api.MessageListener; import io.openmessaging.api.MessagingAccessPoint; import io.openmessaging.api.OMS; import io.openmessaging.api.OMSBuiltinKeys; public class Main{ public static void main(String... args){ Properties credentials =new Properties(); // 阿里云賬號(hào)AccessKey擁有所有API的訪問權(quán)限,風(fēng)險(xiǎn)很高。強(qiáng)烈建議您創(chuàng)建并使用RAM用戶進(jìn)行API訪問或日常運(yùn)維,請(qǐng)登錄RAM控制臺(tái)創(chuàng)建RAM用戶。 // 此處以把AccessKey和AccessKeySecret保存在環(huán)境變量為例說明。 // 強(qiáng)烈建議不要把AccessKey和AccessKeySecret保存到代碼里,會(huì)存在密鑰泄漏風(fēng)險(xiǎn) credentials.setProperty(OMSBuiltinKeys.ACCESS_KEY, "SOFA_AK_ENV"); credentials.setProperty(OMSBuiltinKeys.SECRET_KEY, "SOFA_SK_ENV"); // 設(shè)置 TCP 接入域名,進(jìn)入控制臺(tái)的概覽頁面查看接入點(diǎn)配置 MessagingAccessPoint accessPoint = OMS.builder().driver("sofamq").endpoint("$endpoint") .withCredentials(credentials).build(); Properties properties =new Properties(); // 設(shè)置用戶實(shí)例,進(jìn)入控制臺(tái)的概覽頁面查看接入點(diǎn)配置 properties.setProperty(PropertyKeyConst.INSTANCE_ID,"$instanceId"); // 您在控制臺(tái)創(chuàng)建的 Group ID properties.setProperty(PropertyKeyConst.GROUP_ID,"YOUR_GROUP"); Consumer consumer = accessPoint.createConsumer(properties); consumer.subscribe("YOUR_TOPIC","YOUR_TAG",new MessageListener(){ @Override public Action consume(Message message,ConsumeContext context){ System.out.println(new String(message.getBody())); return Action.CommitMessage; } }); consumer.start(); } }
完成上述步驟后,您可以在控制臺(tái)查看消費(fèi)者是否啟動(dòng)成功,即消息訂閱是否成功。
在左側(cè)導(dǎo)航欄,選擇 中間件 > 消息隊(duì)列 > Group 管理。
單擊目標(biāo) Group ID 名稱進(jìn)入詳情頁。
單擊 訂閱關(guān)系。
如果 是否在線 顯示為 在線,且訂閱關(guān)系一致,則說明訂閱成功。否則說明訂閱失敗。