本文介紹如何通過事件總線EventBridge將云消息隊列 RabbitMQ 版的數據推送到函數計算。
前提條件
您已完成以下操作:
步驟一:添加自定義事件源
- 登錄事件總線EventBridge控制臺。
- 在左側導航欄,單擊事件總線。
- 在頂部菜單欄,選擇地域。
在事件總線頁面,單擊已創建的自定義事件總線。
- 在左側導航欄,單擊事件源。
在事件源頁面,單擊添加事件源。
在添加自定義事件源面板,輸入名稱和描述,事件提供方選擇消息隊列 RabbitMQ 版,并選擇已創建的云消息隊列 RabbitMQ 版的資源信息等,然后單擊確定。
步驟二:創建事件規則
重要 目標服務和事件規則必須處于同一地域。
- 登錄事件總線EventBridge控制臺,在左側導航欄,單擊事件總線。
- 在頂部菜單欄,選擇地域,在事件總線頁面,單擊目標總線名稱。
- 在左側導航欄,單擊事件規則,然后單擊創建規則。
- 在創建規則頁面,完成以下操作。
- 在配置基本信息配置向導,在名稱文本框輸入規則名稱,在描述文本框輸入規則的描述,然后單擊下一步。
- 在配置事件模式配置向導,事件源類型選擇自定義事件源,事件源選擇步驟一添加的自定義事件源,在事件模式內容代碼框輸入事件模式,然后單擊下一步。
如需了解更多信息,請參見事件模式。
- 在配置事件目標配置向導,配置事件目標,然后單擊創建。
步驟三:發布事件
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.UUID;
import java.util.concurrent.TimeoutException;
public class ProducerTest {
public static void main(String[] args) throws IOException, TimeoutException {
//設置實例的接入點。
String hostName = "xxx.xxx.aliyuncs.com";
//設置實例的靜態用戶名密碼。
String userName = "${UserName}";
String passWord = "${PassWord}";
//設置實例的Vhost。
String virtualHost = "${VirtualHost}";
//在生產環境中,建議提前創建好Connection,并在需要時重復使用,避免頻繁創建和關閉Connection,以提高性能、復用連接資源,以及保證系統的穩定性。
Connection connection = createConnection(hostName, userName, passWord, virtualHost);
Channel channel = connection.createChannel();
//設置Exchange、Queue和綁定關系。
String exchangeName = "${ExchangeName}";
String queueName = "${QueueName}";
String routingKey = "${RoutingKey}";
//設置Exchange類型。
String exchangeType = "${ExchangeType}";
//此處為了體驗流暢,確保了Exchange和Queue的創建過程。
//在生產環境中,建議在控制臺提前創建,盡量避免在代碼中直接聲明,否則可能觸發單API調用的限流。
channel.exchangeDeclare(exchangeName, exchangeType, true, false, false, null);
channel.queueDeclare(queueName, true, false, false, new HashMap<String, Object>());
channel.queueBind(queueName, exchangeName, routingKey);
//開始發送消息。
for (int i = 0; i < 10; i++ ) {
AMQP.BasicProperties props = new AMQP.BasicProperties.Builder().messageId(UUID.randomUUID().toString()).build();
channel.basicPublish(exchangeName, routingKey, true, props,
("消息發送示例Body-" + i).getBytes(StandardCharsets.UTF_8));
System.out.println("[SendResult] Message sent successfully, messageId: " + props.getMessageId() + ", exchange: " + exchangeName + ", routingKey: " + routingKey);
}
connection.close();
}
public static Connection createConnection(String hostName, String userName, String passWord, String virtualHost) throws IOException, TimeoutException {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost(hostName);
factory.setUsername(userName);
factory.setPassword(passWord);
//設置為true,開啟Connection自動恢復功能;設置為false,關閉Connection自動恢復功能。
factory.setAutomaticRecoveryEnabled(true);
factory.setNetworkRecoveryInterval(5000);
factory.setVirtualHost(virtualHost);
//默認端口,非加密端口5672,加密端口5671。
factory.setPort(5672);
//基于網絡環境合理設置超時時間。
factory.setConnectionTimeout(30 * 1000);
factory.setHandshakeTimeout(30 * 1000);
factory.setShutdownTimeout(0);
Connection connection = factory.newConnection();
return connection;
}
}
結果驗證
您可以在函數計算控制臺使用表盤解讀數據指標。
登錄函數計算控制臺。
- 在左側導航欄,單擊服務及函數。
- 在頂部菜單欄,選擇地域。
- 在服務列表頁面,找到目標服務,在其右側操作列單擊函數管理。
- 在函數管理頁面,找到目標函數,單擊目標函數名稱。
- 在函數詳情頁面,單擊調用日志頁簽,查看日志。
FC Invoke Start RequestId: c2be67a7-fh1a-9619-ei4c-3c04gcf6**** 2020-11-19T11:11:34.161Z c2be67a7-fh1a-9619-ei4c-3c04gcf6c**** [verbose] Receive Event v2 ==> The event comes from aliyun.ui,event type is ui:Created:PostObject. 2020-11-19T11:11:34.167Z c2be67a7-fh1a-9619-ei4c-3c04gcf6c**** FC Invoke End RequestId: c2be67a7-fh1a-9619-ei4c-3c04gcf6c****
常見問題
事件發布失敗,我該如何定位問題?
如果事件發布失敗,您可以查看事件軌跡,在事件軌跡頁面的事件投遞區域查看投遞詳情,獲取投遞響應。針對不同投遞響應提示,采取相應的解決措施。
發布到函數計算的事件發布失敗,且投遞響應為[500]ConnectErrorconnectiontimedout,我該如何處理?
文檔內容是否對您有幫助?