本文介紹如何通過事件總線EventBridge將消息隊列RocketMQ版的數據推送到函數計算。
前提條件
您已完成以下操作:
事件總線EventBridge
函數計算
消息隊列RocketMQ版
步驟一:添加自定義事件源
- 登錄事件總線EventBridge控制臺。
- 在左側導航欄,單擊事件總線。
- 在頂部菜單欄,選擇地域。
在事件總線頁面,單擊已創建的自定義事件總線。
- 在左側導航欄,單擊事件源。
在事件源頁面,單擊添加事件源。
在添加自定義事件源面板,輸入名稱和描述,事件提供方選擇消息隊列 RocketMQ 版,并選擇已創建的云消息隊列 RocketMQ 版的資源信息等,然后單擊確定。
步驟二:創建事件規則
重要 目標服務和事件規則必須處于同一地域。
- 登錄事件總線EventBridge控制臺,在左側導航欄,單擊事件總線。
- 在頂部菜單欄,選擇地域,在事件總線頁面,單擊目標總線名稱。
- 在左側導航欄,單擊事件規則,然后單擊創建規則。
- 在創建規則頁面,完成以下操作。
- 在配置基本信息配置向導,在名稱文本框輸入規則名稱,在描述文本框輸入規則的描述,然后單擊下一步。
- 在配置事件模式配置向導,事件源類型選擇自定義事件源,事件源選擇步驟一添加的自定義事件源,在事件模式內容代碼框輸入事件模式,然后單擊下一步。
如需了解更多信息,請參見事件模式。
- 在配置事件目標配置向導,配置事件目標,然后單擊創建。
步驟三:發布事件
import com.aliyun.openservices.ons.api.Message;
import com.aliyun.openservices.ons.api.OnExceptionContext;
import com.aliyun.openservices.ons.api.Producer;
import com.aliyun.openservices.ons.api.SendCallback;
import com.aliyun.openservices.ons.api.SendResult;
import com.aliyun.openservices.ons.api.ONSFactory;
import com.aliyun.openservices.ons.api.PropertyKeyConst;
import java.util.Properties;
public class ProducerTest {
public static void main(String[] args) {
Properties properties = new Properties();
// AccessKey ID阿里云身份驗證,在阿里云服務器管理控制臺創建。
System.getenv("ALIBABA_CLOUD_ACCESS_KEY_ID");
// AccessKey Secret阿里云身份驗證,在阿里云服務器管理控制臺創建。
System.getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET");
//設置發送超時時間,單位毫秒。
properties.setProperty(PropertyKeyConst.SendMsgTimeoutMillis, "3000");
// 設置TCP協議接入點,進入控制臺的實例詳情頁面的TCP協議客戶端接入點區域查看。
properties.put(PropertyKeyConst.NAMESRV_ADDR,
"XXX");
Producer producer = ONSFactory.createProducer(properties);
// 在發送消息前,必須調用start方法來啟動Producer,只需調用一次即可。
producer.start();
Message msg = new Message(
// Message所屬的Topic。
"TopicTestMQ",
// Message Tag,可理解為Gmail中的標簽,對消息進行再歸類,方便Consumer指定過濾條件在消息隊列RocketMQ版的服務器過濾。
"TagA",
// Message Body,任何二進制形式的數據,消息隊列RocketMQ版不做任何干預,需要Producer與Consumer協商好一致的序列化和反序列化方式。
"Hello MQ".getBytes());
// 設置代表消息的業務關鍵屬性,請盡可能全局唯一。以方便您在無法正常收到消息情況下,可通過控制臺查詢消息并補發。
// 注意:不設置也不會影響消息正常收發。
msg.setKey("ORDERID_100");
// 異步發送消息,發送結果通過callback返回給客戶端。
producer.sendAsync(msg, new SendCallback() {
@Override
public void onSuccess(final SendResult sendResult) {
// 消息發送成功。
System.out.println("send message success. topic=" + sendResult.getTopic() + ", msgId=" + sendResult.getMessageId());
}
@Override
public void onException(OnExceptionContext context) {
// 消息發送失敗,需要進行重試處理,可重新發送這條消息或持久化這條數據進行補償處理。
System.out.println("send message failed. topic=" + context.getTopic() + ", msgId=" + context.getMessageId());
}
});
// 在callback返回之前即可取得msgId。
System.out.println("send message async. topic=" + msg.getTopic() + ", msgId=" + msg.getMsgID());
// 在應用退出前,銷毀Producer對象。 注意:如果不銷毀也沒有問題。
producer.shutdown();
}
}
結果驗證
您可以在函數計算控制臺使用表盤解讀數據指標。
登錄函數計算控制臺。
- 在左側導航欄,單擊服務及函數。
- 在頂部菜單欄,選擇地域。
- 在服務列表頁面,找到目標服務,在其右側操作列單擊函數管理。
- 在函數管理頁面,找到目標函數,單擊目標函數名稱。
- 在函數詳情頁面,單擊調用日志頁簽,查看日志。
FC Invoke Start RequestId: c2be67a7-fh1a-9619-ei4c-3c04gcf6**** 2020-11-19T11:11:34.161Z c2be67a7-fh1a-9619-ei4c-3c04gcf6c**** [verbose] Receive Event v2 ==> The event comes from aliyun.ui,event type is ui:Created:PostObject. 2020-11-19T11:11:34.167Z c2be67a7-fh1a-9619-ei4c-3c04gcf6c**** FC Invoke End RequestId: c2be67a7-fh1a-9619-ei4c-3c04gcf6c****
常見問題
事件發布失敗,我該如何定位問題?
如果事件發布失敗,您可以查看事件軌跡,在事件軌跡頁面的事件投遞區域查看投遞詳情,獲取投遞響應。針對不同投遞響應提示,采取相應的解決措施。
發布到函數計算的事件發布失敗,且投遞響應為[500]ConnectErrorconnectiontimedout,我該如何處理?
文檔內容是否對您有幫助?