主賬號(hào)和子賬號(hào)授權(quán)場景
阿里云的云消息隊(duì)列 RabbitMQ 版支持AMQP 0-9-1協(xié)議,兼容開源的RabbitMQ客戶端,您可以使用開源的客戶端SDK接入云消息隊(duì)列 RabbitMQ 版服務(wù)端進(jìn)行消息收發(fā)。
前提條件
背景信息
借助訪問控制RAM的RAM用戶,您可以實(shí)現(xiàn)阿里云賬號(hào)(主賬號(hào))和RAM用戶(子賬號(hào))權(quán)限分割,按需為RAM用戶賦予不同的權(quán)限,并避免因暴露阿里云賬號(hào)密鑰而造成安全風(fēng)險(xiǎn)。
收發(fā)消息流程(以Java語言為例)
云消息隊(duì)列 RabbitMQ 版與開源RabbitMQ完全兼容。更多語言SDK,請(qǐng)參見開源RabbitMQ AMQP協(xié)議支持的多語言或框架SDK。
獲取接入點(diǎn)
您需要在云消息隊(duì)列 RabbitMQ 版控制臺(tái)獲取實(shí)例的接入點(diǎn)。在收發(fā)消息時(shí),您需要為發(fā)布端和訂閱端配置該接入點(diǎn),通過接入點(diǎn)接入云消息隊(duì)列 RabbitMQ 版實(shí)例。
登錄云消息隊(duì)列 RabbitMQ 版控制臺(tái),然后在左側(cè)導(dǎo)航欄選擇實(shí)例列表。
在實(shí)例列表頁面的頂部菜單欄選擇地域,然后在實(shí)例列表中,單擊目標(biāo)實(shí)例名稱。
在實(shí)例詳情頁面的接入點(diǎn)信息頁簽,將鼠標(biāo)指針移動(dòng)到目標(biāo)類型的接入點(diǎn),單擊該接入點(diǎn)右側(cè)的圖標(biāo),復(fù)制該接入點(diǎn)。
類型
說明
示例值
公網(wǎng)接入點(diǎn)
公網(wǎng)環(huán)境可讀寫。按量付費(fèi)實(shí)例默認(rèn)支持,預(yù)付費(fèi)實(shí)例需在購買時(shí)選擇才支持。
XXX.mq-amqp.cn-hangzhou-a.aliyuncs.com
VPC接入點(diǎn)
VPC環(huán)境可讀寫。按量付費(fèi)實(shí)例和預(yù)付費(fèi)實(shí)例默認(rèn)都支持。
XXX.mq-amqp.cn-hangzhou-a-internal.aliyuncs.com
安裝Java依賴庫
在pom.xml添加以下依賴。
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>5.5.0</version> <!-- 支持開源所有版本 -->
</dependency>
生成用戶名密碼
開源RabbitMQ客戶端接入云上服務(wù)時(shí),需要先通過AccessKey ID和AccessKey Secret生成用戶名和密碼,將用戶名和密碼設(shè)置到開源客戶端SDK的userName和passWord參數(shù)中。云消息隊(duì)列 RabbitMQ 版會(huì)通過用戶名和密碼進(jìn)行權(quán)限認(rèn)證。
登錄云消息隊(duì)列 RabbitMQ 版控制臺(tái),然后在左側(cè)導(dǎo)航欄選擇實(shí)例列表。
在實(shí)例列表頁面的頂部菜單欄選擇地域,然后在實(shí)例列表中,單擊目標(biāo)實(shí)例名稱。
在左側(cè)導(dǎo)航欄,單擊靜態(tài)用戶名密碼。
在靜態(tài)用戶名密碼頁面,單擊創(chuàng)建用戶名密碼。
在創(chuàng)建用戶名密碼面板,輸入AccessKey ID和AccessKey Secret,然后單擊確定。
說明AccessKey ID和AccessKey Secret需要在阿里云RAM控制臺(tái)獲取,具體獲取方式,請(qǐng)參見創(chuàng)建AccessKey。
靜態(tài)用戶名密碼頁面,顯示創(chuàng)建的靜態(tài)用戶名與密碼,密碼處于隱藏狀態(tài)。
在創(chuàng)建的靜態(tài)用戶名密碼的密碼列,單擊顯示密碼,可查看用戶名的密碼。
創(chuàng)建客戶端連接
創(chuàng)建連接管理工廠ConnectionFactory.java
,用于啟動(dòng)開源客戶端和云消息隊(duì)列 RabbitMQ 版服務(wù)端的連接。
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import java.io.IOException;
import java.security.KeyManagementException;
import java.security.KeyStore;
import java.security.KeyStoreException;
import java.security.NoSuchAlgorithmException;
import java.util.concurrent.TimeoutException;
import javax.net.ssl.SSLContext;
import javax.net.ssl.TrustManagerFactory;
public class ConnectionFactory {
private final String hostName;
private final int port;
private final String userName;
private final String password;
private final String virtualHost;
private final boolean enableSSL;
public ConnectionFactory(String hostName, int port, String userName,
String password, String virtualHost, boolean enableSSL) {
this.hostName = hostName;
this.port = port;
this.userName = userName;
this.password = password;
this.virtualHost = virtualHost;
this.enableSSL = enableSSL;
}
public Channel createChannel() throws IOException, TimeoutException, NoSuchAlgorithmException, KeyStoreException, KeyManagementException {
//create a new con
Connection con = createCon();
//create a new channel
return con.createChannel();
}
private Connection createCon() throws IOException, TimeoutException, NoSuchAlgorithmException, KeyStoreException, KeyManagementException {
com.rabbitmq.client.ConnectionFactory factory = new com.rabbitmq.client.ConnectionFactory();
factory.setHost(hostName);
factory.setUsername(userName);
factory.setPassword(password);
//設(shè)置為true,開啟Connection自動(dòng)恢復(fù)功能;設(shè)置為false,關(guān)閉Connection自動(dòng)恢復(fù)功能。
factory.setAutomaticRecoveryEnabled(true);
factory.setNetworkRecoveryInterval(5000);
factory.setVirtualHost(virtualHost);
// 默認(rèn)端口。
factory.setPort(port);
if (enableSSL) {
setSSL(factory);
}
// 基于網(wǎng)絡(luò)環(huán)境合理設(shè)置超時(shí)時(shí)間。
factory.setConnectionTimeout(30 * 1000);
factory.setHandshakeTimeout(30 * 1000);
factory.setShutdownTimeout(0);
return factory.newConnection();
}
private void setSSL(com.rabbitmq.client.ConnectionFactory factory) throws NoSuchAlgorithmException, KeyStoreException, KeyManagementException {
SSLContext sslContext = SSLContext.getInstance("TLSv1.2");
TrustManagerFactory trustManagerFactory = TrustManagerFactory.getInstance(TrustManagerFactory.getDefaultAlgorithm());
trustManagerFactory.init((KeyStore) null);
sslContext.init(null, trustManagerFactory.getTrustManagers(), null);
factory.useSslProtocol(sslContext);
}
public void closeCon(Channel channel) {
if (channel != null && channel.getConnection() != null) {
try {
channel.getConnection().close();
} catch (Throwable t) {
}
}
}
創(chuàng)建生產(chǎn)者發(fā)送消息
創(chuàng)建并編譯運(yùn)行Producer.java。
編譯運(yùn)行Producer.java生產(chǎn)消息之前,您需要根據(jù)代碼提示信息配置參數(shù)列表中所列舉的參數(shù)。
表 1. 參數(shù)列表
參數(shù) | 示例值 | 描述 |
hostName | 1880770****.mq-amqp.cn-hangzhou-a.aliyuncs.com | 云消息隊(duì)列 RabbitMQ 版實(shí)例的接入點(diǎn)。獲取方式,請(qǐng)參見獲取接入點(diǎn)。 |
port | 5672 |
|
userName | MjoxODgwNzcwODY5MD**** | 云消息隊(duì)列 RabbitMQ 版實(shí)例的靜態(tài)用戶名,用于權(quán)限驗(yàn)證。 獲取方式,請(qǐng)參見生成用戶名密碼。 |
passWord | NDAxREVDQzI2MjA0OT**** | 云消息隊(duì)列 RabbitMQ 版實(shí)例的靜態(tài)用戶名密碼,用于權(quán)限驗(yàn)證。 獲取方式,請(qǐng)參見生成用戶名密碼。 |
virtualHost | vhost_test | 云消息隊(duì)列 RabbitMQ 版實(shí)例的Vhost,需要提前創(chuàng)建。 具體操作,請(qǐng)參見前提條件。 |
exchangeName | ExchangeTest | 云消息隊(duì)列 RabbitMQ 版的Exchange。
|
bindingKey | BindingKeyTest | 云消息隊(duì)列 RabbitMQ 版Exchange與Queue綁定關(guān)系的Binding Key。
|
queueName | QueueTest | 云消息隊(duì)列 RabbitMQ 版的Queue。
|
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.ConfirmCallback;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.security.KeyManagementException;
import java.security.KeyStoreException;
import java.security.NoSuchAlgorithmException;
import java.util.UUID;
import java.util.concurrent.ConcurrentNavigableMap;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.TimeoutException;
public class Producer {
//設(shè)置為云消息隊(duì)列 RabbitMQ 版實(shí)例的接入點(diǎn)。
public static final String hostName = "1880770****.mq-amqp.cn-hangzhou-a.aliyuncs.com";
//設(shè)置為云消息隊(duì)列 RabbitMQ 版實(shí)例的靜態(tài)用戶名。
public static final String userName = "MjoxODgwNzcwODY5MD****";
//設(shè)置為云消息隊(duì)列 RabbitMQ 版實(shí)例的靜態(tài)用戶名密碼。
public static final String password = "NDAxREVDQzI2MjA0OT****";
//設(shè)置為云消息隊(duì)列 RabbitMQ 版實(shí)例的Vhost名稱。
public static final String virtualHost = "vhost_test";
//如果使用5671端口,需要enableSSL設(shè)置為true。
public static final int port = 5672;
public static final boolean enableSSL = false;
private Channel channel;
private final ConcurrentNavigableMap<Long/*deliveryTag*/, String/*msgId*/> outstandingConfirms;
private final ConnectionFactory factory;
private final String exchangeName;
private final String queueName;
private final String bindingKey;
public Producer(ConnectionFactory factory, String exchangeName, String queueName, String bindingKey) throws IOException, TimeoutException, NoSuchAlgorithmException, KeyStoreException, KeyManagementException {
this.factory = factory;
this.outstandingConfirms = new ConcurrentSkipListMap<>();
this.channel = factory.createChannel();
this.exchangeName = exchangeName;
this.queueName = queueName;
this.bindingKey = bindingKey;
}
public static void main(String[] args) throws IOException, TimeoutException, NoSuchAlgorithmException, KeyStoreException, KeyManagementException {
//構(gòu)建連接工廠。
ConnectionFactory factory = new ConnectionFactory(hostName, port, userName, password, virtualHost, enableSSL);
//初始化生產(chǎn)者。
Producer producer = new Producer(factory, "ExchangeTest", "QueueTest", "BindingKeyTest");
//declare。
producer.declare();
producer.initChannel();
//發(fā)送消息。
producer.doSend("hello,amqp");
}
private void initChannel() throws IOException {
channel.confirmSelect();
ConfirmCallback cleanOutstandingConfirms = (deliveryTag, multiple) -> {
if (multiple) {
ConcurrentNavigableMap<Long, String> confirmed = outstandingConfirms.headMap(deliveryTag, true);
for (Long tag : confirmed.keySet()) {
String msgId = confirmed.get(tag);
System.out.format("Message with msgId %s has been ack-ed. deliveryTag: %d, multiple: %b%n", msgId, tag, true);
}
confirmed.clear();
} else {
String msgId = outstandingConfirms.remove(deliveryTag);
System.out.format("Message with msgId %s has been ack-ed. deliveryTag: %d, multiple: %b%n", msgId, deliveryTag, false);
}
};
channel.addConfirmListener(cleanOutstandingConfirms, (deliveryTag, multiple) -> {
String msgId = outstandingConfirms.get(deliveryTag);
System.err.format("Message with msgId %s has been nack-ed. deliveryTag: %d, multiple: %b%n", msgId, deliveryTag, multiple);
// send msg failed, re-publish
});
channel.addReturnListener(returnMessage -> System.out.println("return msgId=" + returnMessage.getProperties().getMessageId()));
}
private void declare() throws IOException {
channel.exchangeDeclare(exchangeName, "direct", true);
channel.queueDeclare(queueName, true, false, false, null);
channel.queueBind(queueName, exchangeName, bindingKey);
}
private void doSend(String content) throws IOException, TimeoutException, NoSuchAlgorithmException, KeyStoreException, KeyManagementException {
try {
String msgId = UUID.randomUUID().toString();
AMQP.BasicProperties props = new AMQP.BasicProperties.Builder().messageId(msgId).build();
channel.basicPublish(exchangeName, bindingKey, true, props, content.getBytes(StandardCharsets.UTF_8));
outstandingConfirms.put(channel.getNextPublishSeqNo(), msgId);
} catch (AlreadyClosedException e) {
//need reconnect if channel is closed.
String message = e.getMessage();
System.out.println(message);
if (channelClosedByServer(message)) {
factory.closeCon(channel);
channel = factory.createChannel();
this.initChannel();
doSend(content);
} else {
throw e;
}
}
}
private boolean channelClosedByServer(String errorMsg) {
if (errorMsg != null
&& errorMsg.contains("channel.close")
&& errorMsg.contains("reply-code=541")
&& errorMsg.contains("reply-text=InternalError")) {
return true;
} else {
return false;
}
}
}
云消息隊(duì)列 RabbitMQ 版會(huì)對(duì)單實(shí)例的TPS流量峰值進(jìn)行限流,更多信息,請(qǐng)參見實(shí)例限流最佳實(shí)踐。
創(chuàng)建消費(fèi)者訂閱消息
創(chuàng)建并編譯運(yùn)行Consumer.java。
編譯運(yùn)行Consumer.java訂閱消息之前,您需要根據(jù)代碼提示信息配置參數(shù)列表中所列舉的參數(shù)。
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
import java.io.IOException;
import java.security.KeyManagementException;
import java.security.KeyStoreException;
import java.security.NoSuchAlgorithmException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeoutException;
public class Consumer {
//設(shè)置為云消息隊(duì)列 RabbitMQ 版實(shí)例的接入點(diǎn)。
public static final String hostName = "1880770****.mq-amqp.cn-hangzhou-a.aliyuncs.com";
//設(shè)置為云消息隊(duì)列 RabbitMQ 版實(shí)例的靜態(tài)用戶名。
public static final String userName = "MjoxODgwNzcwODY5MD****";
//設(shè)置為云消息隊(duì)列 RabbitMQ 版實(shí)例的靜態(tài)用戶名密碼。
public static final String password = "NDAxREVDQzI2MjA0OT****";
//設(shè)置為云消息隊(duì)列 RabbitMQ 版實(shí)例的Vhost名稱。
public static final String virtualHost = "vhost_test";
//如果使用5671端口,需要enableSSL設(shè)置為true。
public static final int port = 5672;
public static final boolean enableSSL = false;
private final Channel channel;
private final String queue;
public Consumer(Channel channel, String queue) {
this.channel = channel;
this.queue = queue;
}
public static void main(String[] args) throws IOException, TimeoutException, InterruptedException, NoSuchAlgorithmException, KeyStoreException, KeyManagementException {
ConnectionFactory factory = new ConnectionFactory(hostName, port, userName, password, virtualHost, enableSSL);
Channel channel = factory.createChannel();
channel.basicQos(50);
//設(shè)置為云消息隊(duì)列 RabbitMQ 版實(shí)例的Queue名稱。需要和生產(chǎn)者中設(shè)置的Queue名稱一致。
Consumer consumer = new Consumer(channel, "queue-1");
consumer.consume();
}
public void consume() throws IOException, InterruptedException {
channel.basicConsume(queue, false, "yourConsumerTag", new DefaultConsumer(channel) {
@Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,
byte[] body) throws IOException {
//業(yè)務(wù)處理。
System.out.println("receive: msgId=" + properties.getMessageId());
//消費(fèi)者需要在有效時(shí)間內(nèi)提交ack,否則消息會(huì)重新推送,最多推送16次。
//若推送16次還未成功,則消息被丟棄或者進(jìn)入死信Exchange。
//專業(yè)版實(shí)例的有效時(shí)間為1分鐘,企業(yè)版和Serverless實(shí)例為5分鐘,鉑金版實(shí)例為30分鐘。
channel.basicAck(envelope.getDeliveryTag(), false);
}
});
CountDownLatch latch = new CountDownLatch(1);
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
try {
channel.getConnection().close();
} catch (IOException e) {
System.out.println("close connection error." + e);
}
latch.countDown();
}));
latch.await();
}
}
云消息隊(duì)列 RabbitMQ 版與開源RabbitMQ完全兼容。更多參數(shù)說明,請(qǐng)參見開源RabbitMQ客戶端文檔。
結(jié)果驗(yàn)證
您可以在云消息隊(duì)列 RabbitMQ 版控制臺(tái)通過消息查詢或軌跡查詢驗(yàn)證消息的收發(fā)狀態(tài)和消息軌跡。具體操作,請(qǐng)參見