步驟三:調(diào)用SDK收發(fā)消息
本文以Java SDK為例,說明如何將開源SDK客戶端接入云消息隊(duì)列 RabbitMQ 版服務(wù)端,并完成消息收發(fā)。
前提條件
您可以使用IntelliJ IDEA或者Eclipse,本文以IntelliJ IDEA Ultimate為例。
安裝Java依賴庫
在IDEA中創(chuàng)建一個Java工程。
在pom.xml文件中添加以下依賴引入Java依賴庫。
<dependency> <groupId>com.rabbitmq</groupId> <artifactId>amqp-client</artifactId> <version>5.5.0</version> <!-- 支持開源所有版本 --> </dependency>
生產(chǎn)消息
在已創(chuàng)建的Java工程中,創(chuàng)建消息發(fā)送程序,按照SDK參數(shù)填寫說明配置相關(guān)參數(shù)并運(yùn)行。
示例代碼如下:
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.UUID;
import java.util.concurrent.TimeoutException;
public class ProducerTest {
public static void main(String[] args) throws IOException, TimeoutException {
//設(shè)置實(shí)例的接入點(diǎn)。
String hostName = "xxx.xxx.aliyuncs.com";
//設(shè)置實(shí)例的靜態(tài)用戶名密碼。
String userName = "${UserName}";
String passWord = "${PassWord}";
//設(shè)置實(shí)例的Vhost。
String virtualHost = "${VirtualHost}";
//在生產(chǎn)環(huán)境中,建議提前創(chuàng)建好Connection,并在需要時重復(fù)使用,避免頻繁創(chuàng)建和關(guān)閉Connection,以提高性能、復(fù)用連接資源,以及保證系統(tǒng)的穩(wěn)定性。
Connection connection = createConnection(hostName, userName, passWord, virtualHost);
Channel channel = connection.createChannel();
//設(shè)置Exchange、Queue和綁定關(guān)系。
String exchangeName = "${ExchangeName}";
String queueName = "${QueueName}";
String routingKey = "${RoutingKey}";
//設(shè)置Exchange類型。
String exchangeType = "${ExchangeType}";
//此處為了體驗(yàn)流暢,確保了Exchange和Queue的創(chuàng)建過程。
//在生產(chǎn)環(huán)境中,建議在控制臺提前創(chuàng)建,盡量避免在代碼中直接聲明,否則可能觸發(fā)單API調(diào)用的限流。
channel.exchangeDeclare(exchangeName, exchangeType, true, false, false, null);
channel.queueDeclare(queueName, true, false, false, new HashMap<String, Object>());
channel.queueBind(queueName, exchangeName, routingKey);
//開始發(fā)送消息。
for (int i = 0; i < 10; i++ ) {
AMQP.BasicProperties props = new AMQP.BasicProperties.Builder().messageId(UUID.randomUUID().toString()).build();
channel.basicPublish(exchangeName, routingKey, true, props,
("消息發(fā)送示例Body-" + i).getBytes(StandardCharsets.UTF_8));
System.out.println("[SendResult] Message sent successfully, messageId: " + props.getMessageId() + ", exchange: " + exchangeName + ", routingKey: " + routingKey);
}
connection.close();
}
public static Connection createConnection(String hostName, String userName, String passWord, String virtualHost) throws IOException, TimeoutException {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost(hostName);
factory.setUsername(userName);
factory.setPassword(passWord);
//設(shè)置為true,開啟Connection自動恢復(fù)功能;設(shè)置為false,關(guān)閉Connection自動恢復(fù)功能。
factory.setAutomaticRecoveryEnabled(true);
factory.setNetworkRecoveryInterval(5000);
factory.setVirtualHost(virtualHost);
//默認(rèn)端口,非加密端口5672,加密端口5671。
factory.setPort(5672);
//基于網(wǎng)絡(luò)環(huán)境合理設(shè)置超時時間。
factory.setConnectionTimeout(30 * 1000);
factory.setHandshakeTimeout(30 * 1000);
factory.setShutdownTimeout(0);
Connection connection = factory.newConnection();
return connection;
}
}
云消息隊(duì)列 RabbitMQ 版會對單實(shí)例的TPS流量峰值進(jìn)行限流,更多限流信息,請參見實(shí)例限流最佳實(shí)踐。
訂閱消息
在已創(chuàng)建的Java工程中,創(chuàng)建消息訂閱程序,按照SDK參數(shù)填寫說明配置相關(guān)參數(shù)并運(yùn)行。
示例代碼如下:
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.concurrent.TimeoutException;
public class ConsumerTest {
public static void main(String[] args) throws IOException, TimeoutException {
//設(shè)置實(shí)例的接入點(diǎn)。
String hostName = "xxx.xxx.aliyuncs.com";
//設(shè)置實(shí)例的靜態(tài)用戶名密碼。
String userName = "${UserName}";
String passWord = "${PassWord}";
//設(shè)置實(shí)例的Vhost。
String virtualHost = "${VirtualHost}";
Connection connection = createConnection(hostName, userName, passWord, virtualHost);
final Channel channel = connection.createChannel();
//聲明Queue。
String queueName = "${QueueName}";
//創(chuàng)建${QueueName} ,該Queue必須在云消息隊(duì)列RabbitMQ版控制臺上已存在。
AMQP.Queue.DeclareOk queueDeclareOk = channel.queueDeclare(queueName, true, false, false, new HashMap<String, Object>());
//開始消費(fèi)消息。
channel.basicConsume(queueName, false, "ConsumerTag", new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope,
AMQP.BasicProperties properties, byte[] body)
throws IOException {
//接收到的消息,進(jìn)行業(yè)務(wù)邏輯處理。
System.out.println("Received: " + new String(body, StandardCharsets.UTF_8) + ", deliveryTag: " + envelope.getDeliveryTag() + ", messageId: " + properties.getMessageId());
channel.basicAck(envelope.getDeliveryTag(), false);
}
});
}
public static Connection createConnection(String hostName, String userName, String passWord, String virtualHost) throws IOException, TimeoutException {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost(hostName);
factory.setUsername(userName);
factory.setPassword(passWord);
//設(shè)置為true,開啟Connection自動恢復(fù)功能;設(shè)置為false,關(guān)閉Connection自動恢復(fù)功能。
factory.setAutomaticRecoveryEnabled(true);
factory.setNetworkRecoveryInterval(5000);
factory.setVirtualHost(virtualHost);
// 默認(rèn)端口,非加密端口5672,加密端口5671。
factory.setPort(5672);
factory.setConnectionTimeout(300 * 1000);
factory.setHandshakeTimeout(300 * 1000);
factory.setShutdownTimeout(0);
Connection connection = factory.newConnection();
return connection;
};
}
SDK參數(shù)填寫說明
參數(shù) | 示例值 | 描述 |
hostName | XXX.net.mq.amqp.aliyuncs.com | 云消息隊(duì)列 RabbitMQ 版實(shí)例接入點(diǎn)。獲取方式,請參見步驟二:創(chuàng)建資源。 |
Port | 5672 | 默認(rèn)端口。非加密端口為5672,加密端口為5671。 |
userName | MjoxODgwNzcwODY5MD**** | 客戶端接入云消息隊(duì)列 RabbitMQ 版服務(wù)端用于權(quán)限認(rèn)證的靜態(tài)用戶名。 需要提前在云消息隊(duì)列 RabbitMQ 版控制臺創(chuàng)建。 具體操作,請參見步驟二:創(chuàng)建資源。 |
passWord | NDAxREVDQzI2MjA0OT**** | 客戶端接入云消息隊(duì)列 RabbitMQ 版服務(wù)端用于權(quán)限認(rèn)證的靜態(tài)用戶密碼。 需要提前在云消息隊(duì)列 RabbitMQ 版控制臺創(chuàng)建。 具體操作,請參見步驟二:創(chuàng)建資源。 |
virtualHost | amqp_vhost | 云消息隊(duì)列 RabbitMQ 版實(shí)例的Vhost。需要提前在云消息隊(duì)列 RabbitMQ 版控制臺創(chuàng)建。 具體操作,請參見步驟二:創(chuàng)建資源。 |
exchangeName | ExchangeTest | 云消息隊(duì)列 RabbitMQ 版的Exchange。 需要提前在云消息隊(duì)列 RabbitMQ 版控制臺創(chuàng)建。 具體操作,請參見步驟二:創(chuàng)建資源。 |
queueName | QueueTest | 云消息隊(duì)列 RabbitMQ 版的Queue。 需要提前在云消息隊(duì)列 RabbitMQ 版控制臺創(chuàng)建。 具體操作,請參見步驟二:創(chuàng)建資源。 |
routingKey | RoutingKeyTest | 云消息隊(duì)列 RabbitMQ 版Exchange與Queue綁定的Routing Key。 需要提前在云消息隊(duì)列 RabbitMQ 版控制臺創(chuàng)建綁定關(guān)系。 具體操作,請參見步驟二:創(chuàng)建資源。 |
exchangeType | topic | Exchange的類型。云消息隊(duì)列 RabbitMQ 版支持的類型如下,更多信息,請參見Exchange。
重要 請確保填寫的Exchange類型和您創(chuàng)建Exchange時選擇的類型一致。 |
相關(guān)文檔
云消息隊(duì)列 RabbitMQ 版與開源RabbitMQ完全兼容,支持多語言SDK。更多語言SDK,請參見開源RabbitMQ AMQP協(xié)議支持的多語言或框架SDK,更多參數(shù)說明,請參見開源RabbitMQ客戶端文檔。
客戶端運(yùn)行時若返回異常報(bào)錯,您可以參考錯誤碼說明查看異常原因和解決方案。