開源SDK接入指南(數(shù)據(jù)加密傳輸場景)
云消息隊(duì)列 RabbitMQ 版支持TLS加密傳輸。本文以調(diào)用Java SDK為例,介紹如何在使用開源SDK實(shí)現(xiàn)消息收發(fā)的操作過程中,使用TLS實(shí)現(xiàn)數(shù)據(jù)傳輸加密,其他語言或框架的SDK數(shù)據(jù)傳輸加密相似。
前提條件
背景信息
開源RabbitMQ客戶端支持通過用戶名/密碼和AK/SK兩種方式接入云上服務(wù),當(dāng)您需要對傳輸?shù)臄?shù)據(jù)進(jìn)行加密時(shí),需要修改連接端口為5671加密端口,并啟用TLS加密傳輸(默認(rèn)使用TLSv1.2版本)。
加密收發(fā)消息流程(以Java語言為例)
獲取接入點(diǎn)
您需要在云消息隊(duì)列 RabbitMQ 版控制臺(tái)獲取實(shí)例的接入點(diǎn)。在收發(fā)消息時(shí),您需要為發(fā)布端和訂閱端配置該接入點(diǎn),通過接入點(diǎn)接入云消息隊(duì)列 RabbitMQ 版實(shí)例。
登錄云消息隊(duì)列 RabbitMQ 版控制臺(tái),然后在左側(cè)導(dǎo)航欄選擇實(shí)例列表。
在實(shí)例列表頁面的頂部菜單欄選擇地域,然后在實(shí)例列表中,單擊目標(biāo)實(shí)例名稱。
在實(shí)例詳情頁面的接入點(diǎn)信息頁簽,將鼠標(biāo)指針移動(dòng)到目標(biāo)類型的接入點(diǎn),單擊該接入點(diǎn)右側(cè)的圖標(biāo),復(fù)制該接入點(diǎn)。
類型
說明
示例值
公網(wǎng)接入點(diǎn)
公網(wǎng)環(huán)境可讀寫。按量付費(fèi)實(shí)例默認(rèn)支持,預(yù)付費(fèi)實(shí)例需在購買時(shí)選擇才支持。
XXX.mq-amqp.cn-hangzhou-a.aliyuncs.com
VPC接入點(diǎn)
VPC環(huán)境可讀寫。按量付費(fèi)實(shí)例和預(yù)付費(fèi)實(shí)例默認(rèn)都支持。
XXX.mq-amqp.cn-hangzhou-a-internal.aliyuncs.com
安裝Java依賴庫
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>5.5.0</version> <!-- 支持開源所有版本 -->
</dependency>
生成用戶名密碼
登錄云消息隊(duì)列 RabbitMQ 版控制臺(tái),然后在左側(cè)導(dǎo)航欄選擇實(shí)例列表。
在實(shí)例列表頁面的頂部菜單欄選擇地域,然后在實(shí)例列表中,單擊目標(biāo)實(shí)例名稱。
在左側(cè)導(dǎo)航欄,單擊靜態(tài)用戶名密碼。
在靜態(tài)用戶名密碼頁面,單擊創(chuàng)建用戶名密碼。
在創(chuàng)建用戶名密碼面板,輸入AccessKey ID和AccessKey Secret,然后單擊確定。
說明AccessKey ID和AccessKey Secret需要在阿里云RAM控制臺(tái)獲取,具體獲取方式,請參見創(chuàng)建AccessKey。
靜態(tài)用戶名密碼頁面,顯示創(chuàng)建的靜態(tài)用戶名與密碼,密碼處于隱藏狀態(tài)。
在創(chuàng)建的靜態(tài)用戶名密碼的密碼列,單擊顯示密碼,可查看用戶名的密碼。
生產(chǎn)消息
創(chuàng)建并編譯運(yùn)行ProducerTest.java
編譯運(yùn)行ProducerTest.java生產(chǎn)消息之前,您需要根據(jù)代碼提示信息修改表1 配置參數(shù)列表中所列舉的參數(shù)。
表1 配置參數(shù)列表
參數(shù) | 示例值 | 描述 |
hostName | XXX.net.mq.amqp.aliyuncs.com | 云消息隊(duì)列 RabbitMQ 版實(shí)例接入點(diǎn)。獲取方式,請參見獲取接入點(diǎn)。 |
Port | 5671 | 默認(rèn)端口。非加密端口為5672,加密端口為5671。 |
userName | MjoxODgwNzcwODY5MD**** | 客戶端接入云消息隊(duì)列 RabbitMQ 版服務(wù)端用于權(quán)限認(rèn)證的靜態(tài)用戶名。 需要提前在云消息隊(duì)列 RabbitMQ 版控制臺(tái)創(chuàng)建。 具體操作,請參見創(chuàng)建用戶名密碼。 |
passWord | NDAxREVDQzI2MjA0OT**** | 客戶端接入云消息隊(duì)列 RabbitMQ 版服務(wù)端用于權(quán)限認(rèn)證的靜態(tài)用戶密碼。 需要提前在云消息隊(duì)列 RabbitMQ 版控制臺(tái)創(chuàng)建。 具體操作,請參見創(chuàng)建用戶名密碼。 |
virtualHost | amqp_vhost | 云消息隊(duì)列 RabbitMQ 版實(shí)例的Vhost。需要提前在云消息隊(duì)列 RabbitMQ 版控制臺(tái)創(chuàng)建。 具體操作,請參見創(chuàng)建Vhost。 |
exchangeName | ExchangeTest | 云消息隊(duì)列 RabbitMQ 版的Exchange。 需要提前在云消息隊(duì)列 RabbitMQ 版控制臺(tái)創(chuàng)建。 具體操作,請參見創(chuàng)建Exchange。 |
queueName | QueueTest | 云消息隊(duì)列 RabbitMQ 版的Queue。 需要提前在云消息隊(duì)列 RabbitMQ 版控制臺(tái)創(chuàng)建。 具體操作,請參見創(chuàng)建Queue。 |
bindingKey | BindingKeyTest | 云消息隊(duì)列 RabbitMQ 版Exchange與Queue綁定的Binding Key。 需要提前在云消息隊(duì)列 RabbitMQ 版控制臺(tái)創(chuàng)建綁定關(guān)系。 具體操作,請參見創(chuàng)建綁定關(guān)系。 |
exchangeType | topic | Exchange的類型。云消息隊(duì)列 RabbitMQ 版支持的類型如下,更多信息,請參見Exchange。
重要 請確保填寫的Exchange類型和您創(chuàng)建Exchange時(shí)選擇的類型一致。 |
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.security.KeyManagementException;
import java.security.NoSuchAlgorithmException;
import java.util.HashMap;
import java.util.UUID;
import java.util.concurrent.TimeoutException;
public class ProducerTest {
public static void main(String[] args) throws IOException, TimeoutException, NoSuchAlgorithmException, KeyManagementException {
//設(shè)置實(shí)例的接入點(diǎn)。
String hostName = "xxx.xxx.aliyuncs.com";
//設(shè)置實(shí)例的靜態(tài)用戶名密碼。
String userName = "${UserName}";
String passWord = "${PassWord}";
//設(shè)置實(shí)例的Vhost。
String virtualHost = "${VirtualHost}";
//在生產(chǎn)環(huán)境中,建議提前創(chuàng)建好Connection,并在需要時(shí)重復(fù)使用,避免頻繁創(chuàng)建和關(guān)閉Connection,以提高性能、復(fù)用連接資源,以及保證系統(tǒng)的穩(wěn)定性。
Connection connection = createConnection(hostName, userName, passWord, virtualHost);
Channel channel = connection.createChannel();
//設(shè)置Exchange、Queue和綁定關(guān)系。
String exchangeName = "${ExchangeName}";
String queueName = "${QueueName}";
String bindingKey = "${BindingKey}";
//設(shè)置Exchange類型。
String exchangeType = "${ExchangeType}";
//此處為了體驗(yàn)流暢,確保了Exchange和Queue的創(chuàng)建過程。
//在生產(chǎn)環(huán)境中,建議在控制臺(tái)提前創(chuàng)建,盡量避免在代碼中直接聲明,否則可能觸發(fā)單API調(diào)用的限流。
channel.exchangeDeclare(exchangeName, exchangeType, true, false, false, null);
channel.queueDeclare(queueName, true, false, false, new HashMap<String, Object>());
channel.queueBind(queueName, exchangeName, bindingKey);
//開始發(fā)送消息。
for (int i = 0; i < 10; i++) {
AMQP.BasicProperties props = new AMQP.BasicProperties.Builder().messageId(UUID.randomUUID().toString()).build();
channel.basicPublish(exchangeName, bindingKey, true, props,
("消息發(fā)送示例Body-" + i).getBytes(StandardCharsets.UTF_8));
System.out.println("[SendResult] Message sent successfully, messageId: " + props.getMessageId() + ", exchange: " + exchangeName + ", routingKey: " + bindingKey);
}
connection.close();
}
public static Connection createConnection(String hostName, String userName, String passWord, String virtualHost) throws IOException, TimeoutException, NoSuchAlgorithmException, KeyManagementException {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost(hostName);
factory.setUsername(userName);
factory.setPassword(passWord);
//設(shè)置為true,開啟Connection自動(dòng)恢復(fù)功能;設(shè)置為false,關(guān)閉Connection自動(dòng)恢復(fù)功能。
factory.setAutomaticRecoveryEnabled(true);
factory.setNetworkRecoveryInterval(5000);
factory.setVirtualHost(virtualHost);
//默認(rèn)端口,非加密端口5672,加密端口5671。
factory.setPort(5671);
factory.useSslProtocol();
//基于網(wǎng)絡(luò)環(huán)境合理設(shè)置超時(shí)時(shí)間。
factory.setConnectionTimeout(30 * 1000);
factory.setHandshakeTimeout(30 * 1000);
factory.setShutdownTimeout(0);
Connection connection = factory.newConnection();
return connection;
}
}
訂閱消息
創(chuàng)建并編譯運(yùn)行ConsumerTest.java
。
編譯運(yùn)行ConsumerTest.java
訂閱消息之前,您需要根據(jù)代碼提示信息修改表1 配置參數(shù)列表中所列舉的參數(shù)。
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.security.KeyManagementException;
import java.security.NoSuchAlgorithmException;
import java.util.HashMap;
import java.util.concurrent.TimeoutException;
public class ConsumerTest {
public static void main(String[] args) throws IOException, TimeoutException, NoSuchAlgorithmException, KeyManagementException {
//設(shè)置實(shí)例的接入點(diǎn)。
String hostName = "xxx.xxx.aliyuncs.com";
//設(shè)置實(shí)例的靜態(tài)用戶名密碼。
String userName = "${UserName}";
String passWord = "${PassWord}";
//設(shè)置實(shí)例的Vhost。
String virtualHost = "${VirtualHost}";
Connection connection = createConnection(hostName, userName, passWord, virtualHost);
final Channel channel = connection.createChannel();
//聲明Queue。
String queueName = "${QueueName}";
//創(chuàng)建${QueueName} ,該Queue必須在云消息隊(duì)列RabbitMQ版控制臺(tái)上已存在。
AMQP.Queue.DeclareOk queueDeclareOk = channel.queueDeclare(queueName, true, false, false, new HashMap<String, Object>());
//開始消費(fèi)消息。
channel.basicConsume(queueName, false, "ConsumerTag", new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope,
AMQP.BasicProperties properties, byte[] body)
throws IOException {
//接收到的消息,進(jìn)行業(yè)務(wù)邏輯處理。
System.out.println("Received: " + new String(body, StandardCharsets.UTF_8) + ", deliveryTag: " + envelope.getDeliveryTag() + ", messageId: " + properties.getMessageId());
channel.basicAck(envelope.getDeliveryTag(), false);
}
});
}
public static Connection createConnection(String hostName, String userName, String passWord, String virtualHost) throws IOException, TimeoutException, NoSuchAlgorithmException, KeyManagementException {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost(hostName);
factory.setUsername(userName);
factory.setPassword(passWord);
//設(shè)置為true,開啟Connection自動(dòng)恢復(fù)功能;設(shè)置為false,關(guān)閉Connection自動(dòng)恢復(fù)功能。
factory.setAutomaticRecoveryEnabled(true);
factory.setNetworkRecoveryInterval(5000);
factory.setVirtualHost(virtualHost);
// 默認(rèn)端口,非加密端口5672,加密端口5671。
factory.setPort(5671);
factory.useSslProtocol();
factory.setConnectionTimeout(300 * 1000);
factory.setHandshakeTimeout(300 * 1000);
factory.setShutdownTimeout(0);
Connection connection = factory.newConnection();
return connection;
};
}