跨云賬號授權場景
本文以調用Java SDK為例,介紹在RAM角色跨賬號授權場景,通過開源SDK實現消息收發的操作過程,幫助您更好地理解消息收發的完整過程,其他語言或框架的SDK消息收發過程相似。
前提條件
背景信息
當您需要通過RAM STS角色授權的方式訪問云消息隊列 RabbitMQ 版服務時,需要通過阿里云提供的權限認證類(AliyunCredentialsProvider)設置 AccessKeyID、AccessKeySecret與SecurityToken進行權限認證才能訪問。
借助訪問控制RAM的RAM角色,您可以跨云賬號授權,使某個企業訪問另一個企業的云消息隊列 RabbitMQ 版。
企業A希望能專注于業務系統,僅作為云消息隊列 RabbitMQ 版所有者。企業A希望可以授權企業B來操作部分業務,例如:云消息隊列 RabbitMQ 版的運維、監控以及管理等。
企業A希望當企業B的員工加入或離職時,無需做任何權限變更。企業B可以進一步將企業A的資源訪問權限分配給企業B的RAM用戶(員工或應用),并可以精細控制其員工或應用對資源的訪問和操作權限。
企業A希望如果雙方合同終止,企業A隨時可以撤銷企業B的授權。
更多信息,請參見RAM跨云賬號授權。
收發消息流程(以Java語言為例)
云消息隊列 RabbitMQ 版與開源RabbitMQ完全兼容。更多語言SDK,請參見開源RabbitMQ AMQP協議支持的多語言或框架SDK。
安裝Java依賴庫
在pom.xml中添加以下依賴。
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>5.5.0</version> <!-- 支持開源所有版本 -->
</dependency>
<dependency>
<groupId>com.alibaba.mq-amqp</groupId>
<artifactId>mq-amqp-client</artifactId>
<version>1.0.5</version>
</dependency>
<dependency>
<groupId>com.aliyun</groupId>
<artifactId>alibabacloud-sts20150401</artifactId>
<version>1.0.4</version>
</dependency>
配置權限認證類(AliyunCredentialsProvider)
創建權限認證類AliyunCredentialsProvider.java,根據代碼提示信息,設置相關參數。具體信息,請參見參數列表。
import com.alibaba.mq.amqp.utils.UserUtils; import com.aliyun.auth.credentials.Credential; import com.aliyun.auth.credentials.provider.StaticCredentialProvider; import com.aliyun.sdk.service.sts20150401.AsyncClient; import com.aliyun.sdk.service.sts20150401.models.AssumeRoleRequest; import com.aliyun.sdk.service.sts20150401.models.AssumeRoleResponse; import com.rabbitmq.client.impl.CredentialsProvider; import darabonba.core.client.ClientOverrideConfiguration; import org.apache.commons.lang3.StringUtils; import java.security.InvalidKeyException; import java.security.NoSuchAlgorithmException; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; public class AliyunCredentialsProvider implements CredentialsProvider { /** * 默認過期時間,單位毫秒??梢愿鶕I務實際情況設置。 */ private final long STS_TIMEOUT_DEFAULT = 1800 * 1000; /** * 實例ID,從云消息隊列 RabbitMQ 版控制臺獲取。 */ private final String instanceId; /** * Access Key ID。 */ private String accessKeyId; /** * Access Key Secret。 */ private String accessKeySecret; /** * (可選)security temp token。 */ private String securityToken; /** * STS過期時間, 記錄后可提前更新STS token。 */ private Long timeStampLimit; // 阿里云賬號AccessKey擁有所有API的訪問權限,建議您使用RAM用戶進行API訪問或日常運維。 // 強烈建議不要把AccessKey ID和AccessKey Secret保存到工程代碼里,否則可能導致AccessKey泄露,威脅您賬號下所有資源的安全。 public AliyunCredentialsProvider(final String instanceId) { this.instanceId = instanceId; } public void updateProperties(String alibabaAccessKeyId, String alibabaAccessKeySecret, String region, String roleARN) throws ExecutionException, InterruptedException { this.timeStampLimit = System.currentTimeMillis() + STS_TIMEOUT_DEFAULT; // 自行調用AssumeRole接口實現,進行身份信息獲取。 StaticCredentialProvider provider = StaticCredentialProvider.create(Credential.builder() .accessKeyId(alibabaAccessKeyId) .accessKeySecret(alibabaAccessKeySecret) .build()); AsyncClient client = AsyncClient.builder() .region(region) // 請設置Region ID, 例如cn-hangzhou。 .credentialsProvider(provider) .overrideConfiguration( ClientOverrideConfiguration.create() // Endpoint請參考https://api.aliyun.com/product/Sts。 .setEndpointOverride("sts." + region + ".aliyuncs.com") //.setConnectTimeout(Duration.ofSeconds(30)) ) .build(); AssumeRoleRequest assumeRoleRequest = AssumeRoleRequest.builder() .roleArn(roleARN) // 從控制臺獲取得到的角色ARN。 .roleSessionName("testRoleName") // 當前角色Session的名稱,可自定義。 .durationSeconds(STS_TIMEOUT_DEFAULT / 1000) .build(); CompletableFuture<AssumeRoleResponse> response = client.assumeRole(assumeRoleRequest); // Synchronously get the return value of the API request AssumeRoleResponse resp = response.get(); if (resp.getBody().getCredentials() != null) { System.out.println("[INFO] Update AK, SK, Token successfully."); this.accessKeyId = resp.getBody().getCredentials().getAccessKeyId(); this.securityToken = resp.getBody().getCredentials().getSecurityToken(); this.accessKeySecret = resp.getBody().getCredentials().getAccessKeySecret(); } client.close(); } // 檢測當前該token是否快要過期。 public boolean isNearlyExpired() { // 提前30秒判斷。 return System.currentTimeMillis() > timeStampLimit - 30 * 1000L; } @Override public String getUsername() { if(StringUtils.isNotEmpty(securityToken)) { return UserUtils.getUserName(accessKeyId, instanceId, securityToken); } else { return UserUtils.getUserName(accessKeyId, instanceId); } } @Override public String getPassword() { try { return UserUtils.getPassord(accessKeySecret); } catch (InvalidKeyException e) { //todo } catch (NoSuchAlgorithmException e) { //todo } return null; } }
表 1. 參數列表
參數 | 示例值 | 描述 |
hostName | 1880770****.mq-amqp.cn-hangzhou-a.aliyuncs.com | 云消息隊列 RabbitMQ 版實例接入點。 |
Port | 5672 | 默認端口。非加密端口5672,加密端口5671。 |
AccessKeyID | LTAI5tJQKnB9zVvQ**** | 阿里云賬號或RAM用戶的AccessKey ID。您可以登錄RAM訪問控制臺,創建RAM角色,并賦予角色AliyunAMQPFullAccess權限,獲取角色的ARN,調用AssumeRole接口獲取一個扮演該角色的臨時身份。AssumeRole執行成功會返回RAM角色的 AccessKeyID、AccessKeySecret以及SecurityToken。角色ARN的概念,請參見RAM角色概覽。 |
AccessKeySecret | jw6MawfKOVBveRr84u**** | 阿里云賬號或RAM用戶的AccessKey Secret。 |
region | cn-hangzhou | 調用對應地域的AssumeRole接口,詳情請參見AssumeRole。 |
roleARN | acs:ram::125xxxxxxx223:role/xxx | RAM角色的ARN。格式為 |
instanceId | amqp-cn-v0h1kb9nu*** | 云消息隊列 RabbitMQ 版的實例ID。您可以在云消息隊列 RabbitMQ 版控制臺的實例詳情頁面查看。如何查看實例ID,請參見查看實例詳情。 |
virtualHost | Test | 云消息隊列 RabbitMQ 版實例的Vhost。您可以在云消息隊列 RabbitMQ 版控制臺的Vhost 列表頁面查看。如何查看Vhost,請參見查看Vhost連接詳情。 |
ExchangeName | ExchangeTest | 云消息隊列 RabbitMQ 版的Exchange。您可以在云消息隊列 RabbitMQ 版控制臺的Exchange 列表頁面獲取。 |
RoutingKey | RoutingKeyTest | 云消息隊列 RabbitMQ 版Exchange與Queue的Routing Key。您可以在云消息隊列 RabbitMQ 版控制臺的Exchange 列表頁面查看Exchange的綁定關系,獲取Routing Key。 |
QueueName | QueueTest | 云消息隊列 RabbitMQ 版的Queue。僅在訂閱消息時候需要配置,您可以在云消息隊列 RabbitMQ 版控制臺的Exchange 列表頁面,查看Exchange的綁定關系,獲取Exchange綁定的Queue。 |
生產消息
創建并編譯運行ProducerTest.java。
編譯運行ProducerTest.java生產消息之前,您需要根據代碼提示信息配置參數列表中所列舉的參數。
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.UUID;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeoutException;
public class ProducerTest {
// 推薦將AK/SK/ARN等信息在環境變量中配置,若將其明文保存在工程代碼中,將帶來不必要的數據泄露風險。
// 阿里云賬號的AccessKey ID。
private static final String alibabaAccessKeyId = System.getenv("ALIBABA_CLOUD_ACCESS_KEY_ID");
// 阿里云賬號的AccessKey Secret。
private static final String alibabaAccessKeySecret = System.getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET");
// 阿里云服務所在的Region。
private static final String region = System.getenv("ALIBABA_CLOUD_REGION");
// 阿里云角色ARN,從控制臺獲取。
private static final String roleARN = System.getenv("ALIBABA_CLOUD_ROLE_ARN");
//設置實例的接入點。
private static final String hostName = "xxx.xxx.aliyuncs.com";
private static final String instanceId = "${InstanceId}";
//設置實例的Vhost。
private static final String virtualHost = "${VirtualHost}";
//設置Exchange、Queue和綁定關系。
private static final String exchangeName = "${ExchangeName}";
private static final String queueName = "${QueueName}";
private static final String routingKey = "${RoutingKey}";
//設置Exchange類型。
private static final String exchangeType = "${ExchangeType}";
public static void main(String[] args) throws InterruptedException, IOException, TimeoutException, ExecutionException {
ConnectionFactory factory = new ConnectionFactory();
// 設置接入點,在云消息隊列 RabbitMQ 版控制臺實例詳情頁面查看。
factory.setHost(hostName);
// ${instanceId}為實例ID,在云消息隊列 RabbitMQ 版控制臺概覽頁面查看。
AliyunCredentialsProvider aliyunCredentialsProvider =
new AliyunCredentialsProvider(instanceId);
updateSTSProperties(aliyunCredentialsProvider);
// ${instanceId}為實例ID,在云消息隊列 RabbitMQ 版控制臺實例詳情頁面查看。
factory.setCredentialsProvider(aliyunCredentialsProvider);
//設置為true,開啟Connection自動恢復功能;設置為false,關閉Connection自動恢復功能。
factory.setAutomaticRecoveryEnabled(true);
factory.setNetworkRecoveryInterval(5000);
// 設置Vhost名稱,請確保已在云消息隊列 RabbitMQ 版控制臺上創建完成。
factory.setVirtualHost(virtualHost);
// 默認端口,非加密端口5672,加密端口5671。
factory.setPort(5672);
// 基于網絡環境合理設置超時時間。
factory.setConnectionTimeout(30 * 1000);
factory.setHandshakeTimeout(30 * 1000);
factory.setShutdownTimeout(0);
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.exchangeDeclare(exchangeName, exchangeType, true, false, false, null);
channel.queueDeclare(queueName, true, false, false, new HashMap<String, Object>());
channel.queueBind(queueName, exchangeName, routingKey);
// 開始發送消息,3600條消息,每條發送后暫停1秒,將持續1小時。
for (int i = 0; i < 3600; i++) {
try {
if (aliyunCredentialsProvider.isNearlyExpired()) {
// 認證可能過期,重新認證
System.out.println("[WARN] Token maybe expired, so try to update it.");
updateSTSProperties(aliyunCredentialsProvider);
factory.setCredentialsProvider(aliyunCredentialsProvider);
// 當配置更新后,需要重新建立連接。
connection = factory.newConnection();
channel = connection.createChannel();
}
// ${ExchangeName}必須在云消息隊列 RabbitMQ 版控制臺上已存在,并且Exchange的類型與控制臺上的類型一致。
// ${RoutingKey}根據業務需求填入相應的RoutingKey。
AMQP.BasicProperties props = new AMQP.BasicProperties.Builder().messageId(UUID.randomUUID().toString()).build();
channel.basicPublish(exchangeName, routingKey, true, props,
("消息發送Body-" + i).getBytes(StandardCharsets.UTF_8));
System.out.println("[SendResult] Message sent successfully, messageId: " + props.getMessageId());
Thread.sleep(1000L);
} catch (Exception e) {
System.out.println("[ERROR] Send fail, error: " + e.getMessage());
Thread.sleep(5000L);
}
}
connection.close();
}
public static void updateSTSProperties(AliyunCredentialsProvider aliyunCredentialsProvider) throws ExecutionException, InterruptedException {
System.out.println("Try to update STS properties");
// 推薦將AK/SK在環境變量中配置,若將其明文保存在工程代碼中,將帶來不必要的數據泄露風險。
aliyunCredentialsProvider.updateProperties(alibabaAccessKeyId, alibabaAccessKeySecret, region, roleARN);
}
}
云消息隊列 RabbitMQ 版會對單實例的TPS流量峰值進行限流,更多信息,請參見實例限流最佳實踐。
訂閱消息
創建并編譯運行ConsumerTest.java訂閱消息。
編譯運行ConsumerTest.java訂閱消息之前,您需要根據代碼提示信息配置參數列表中所列舉的參數。
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeoutException;
public class ConsumerTest {
// 推薦將AK/SK/ARN等信息在環境變量中配置,若將其明文保存在工程代碼中,將帶來不必要的數據泄露風險。
// 阿里云賬號的AccessKey ID。
private static final String alibabaAccessKeyId = System.getenv("ALIBABA_CLOUD_ACCESS_KEY_ID");
// 阿里云賬號的AccessKey Secret。
private static final String alibabaAccessKeySecret = System.getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET");
// 阿里云服務所在的Region。
private static final String region = System.getenv("ALIBABA_CLOUD_REGION");
// 阿里云角色ARN,從控制臺獲取。
private static final String roleARN = System.getenv("ALIBABA_CLOUD_ROLE_ARN");
//設置實例的接入點。
private static final String hostName = "xxx.xxx.aliyuncs.com";
private static final String instanceId = "${InstanceId}";
//設置實例的Vhost。
private static final String virtualHost = "${VirtualHost}";
//設置Queue。
private static final String queueName = "${QueueName}";
public static void main(String[] args) throws IOException, TimeoutException, ExecutionException, InterruptedException {
ConnectionFactory factory = new ConnectionFactory();
// 設置接入點,在云消息隊列 RabbitMQ 版控制臺實例詳情頁面查看。
factory.setHost(hostName);
// ${instanceId}為實例ID,在云消息隊列 RabbitMQ 版控制臺概覽頁面查看。
AliyunCredentialsProvider aliyunCredentialsProvider =
new AliyunCredentialsProvider(instanceId);
updateSTSProperties(aliyunCredentialsProvider);
factory.setCredentialsProvider(aliyunCredentialsProvider);
//設置為true,開啟Connection自動恢復功能;設置為false,關閉Connection自動恢復功能。
factory.setAutomaticRecoveryEnabled(true);
factory.setNetworkRecoveryInterval(5000);
// 設置Vhost名稱,請確保已在云消息隊列 RabbitMQ 版控制臺上創建完成。
factory.setVirtualHost(virtualHost);
// 默認端口,非加密端口5672,加密端口5671。
factory.setPort(5672);
factory.setConnectionTimeout(300 * 1000);
factory.setHandshakeTimeout(300 * 1000);
factory.setShutdownTimeout(0);
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
// 創建${QueueName} ,該Queue必須在云消息隊列 RabbitMQ 版控制臺上已存在。
AMQP.Queue.DeclareOk queueDeclareOk = channel.queueDeclare(queueName, true, false, false, new HashMap<String, Object>());
consume(channel, queueName);
System.out.println("Consumer started.");
// 循環檢測sts是否即將過期,若過期則更新connection,重新消費。
// 這里為了方便理解,使用while循環檢測認證是否接近過期。
// 可以使用定時任務,更優雅地實現定時檢查、更新操作。
while (true) {
// 每次處理完消息后,可以判斷是否接近過期。
// 如果接近過期,則更新一次認證類,
// 該過程需要重新創建連接,以確保業務持續運行。
if (aliyunCredentialsProvider.isNearlyExpired()) {
System.out.println("token maybe expired, so try to update it.");
updateSTSProperties(aliyunCredentialsProvider);
factory.setCredentialsProvider(aliyunCredentialsProvider);
connection.close();
connection = factory.newConnection();
channel = connection.createChannel();
// 重新開始消費消息。
consume(channel, queueName);
System.out.println("Consumer started.");
} else {
// 每秒檢測一次。
Thread.sleep(1000);
}
}
}
public static void consume(Channel channel, String queueName) throws IOException {
channel.basicConsume(queueName, false, "ConsumerTag", new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope,
AMQP.BasicProperties properties, byte[] body) {
try {
//接收到的消息,進行業務邏輯處理。
System.out.println("Received: " + new String(body, StandardCharsets.UTF_8) + ", deliveryTag: " + envelope.getDeliveryTag() + ", messageId: " + properties.getMessageId());
channel.basicAck(envelope.getDeliveryTag(), false);
} catch (Exception e) {
System.out.println("Exception, cause:" + e.getMessage());
}
}
});
}
public static void updateSTSProperties(AliyunCredentialsProvider aliyunCredentialsProvider) throws ExecutionException, InterruptedException {
System.out.println("Try to update STS properties");
aliyunCredentialsProvider.updateProperties(alibabaAccessKeyId, alibabaAccessKeySecret, region, roleARN);
}
}
查詢消息
如果您想確認消息是否成功發送至云消息隊列 RabbitMQ 版,可以在云消息隊列 RabbitMQ 版控制臺查詢消息。具體操作,請參見查詢消息。