日本熟妇hd丰满老熟妇,中文字幕一区二区三区在线不卡 ,亚洲成片在线观看,免费女同在线一区二区

跨云賬號授權場景

更新時間:

本文以調用Java SDK為例,介紹在RAM角色跨賬號授權場景,通過開源SDK實現消息收發的操作過程,幫助您更好地理解消息收發的完整過程,其他語言或框架的SDK消息收發過程相似。

前提條件

背景信息

當您需要通過RAM STS角色授權的方式訪問云消息隊列 RabbitMQ 版服務時,需要通過阿里云提供的權限認證類(AliyunCredentialsProvider)設置 AccessKeyID、AccessKeySecretSecurityToken進行權限認證才能訪問。

借助訪問控制RAM的RAM角色,您可以跨云賬號授權,使某個企業訪問另一個企業的云消息隊列 RabbitMQ 版

  • 企業A希望能專注于業務系統,僅作為云消息隊列 RabbitMQ 版所有者。企業A希望可以授權企業B來操作部分業務,例如:云消息隊列 RabbitMQ 版的運維、監控以及管理等。

  • 企業A希望當企業B的員工加入或離職時,無需做任何權限變更。企業B可以進一步將企業A的資源訪問權限分配給企業B的RAM用戶(員工或應用),并可以精細控制其員工或應用對資源的訪問和操作權限。

  • 企業A希望如果雙方合同終止,企業A隨時可以撤銷企業B的授權。

更多信息,請參見RAM跨云賬號授權。

收發消息流程(以Java語言為例)

開源SDK消息收發流程

說明

云消息隊列 RabbitMQ 版與開源RabbitMQ完全兼容。更多語言SDK,請參見開源RabbitMQ AMQP協議支持的多語言或框架SDK。

安裝Java依賴庫

pom.xml中添加以下依賴。

<dependency>            
    <groupId>com.rabbitmq</groupId>            
    <artifactId>amqp-client</artifactId>            
    <version>5.5.0</version> <!-- 支持開源所有版本 -->       
</dependency>        
<dependency>          
    <groupId>com.alibaba.mq-amqp</groupId>          
    <artifactId>mq-amqp-client</artifactId>         
    <version>1.0.5</version>       
</dependency>       
<dependency>          
    <groupId>com.aliyun</groupId>          
    <artifactId>alibabacloud-sts20150401</artifactId>          
    <version>1.0.4</version>       
</dependency>

配置權限認證類(AliyunCredentialsProvider)

  1. 創建權限認證類AliyunCredentialsProvider.java,根據代碼提示信息,設置相關參數。具體信息,請參見參數列表。

    import com.alibaba.mq.amqp.utils.UserUtils;
    import com.aliyun.auth.credentials.Credential;
    import com.aliyun.auth.credentials.provider.StaticCredentialProvider;
    import com.aliyun.sdk.service.sts20150401.AsyncClient;
    import com.aliyun.sdk.service.sts20150401.models.AssumeRoleRequest;
    import com.aliyun.sdk.service.sts20150401.models.AssumeRoleResponse;
    import com.rabbitmq.client.impl.CredentialsProvider;
    import darabonba.core.client.ClientOverrideConfiguration;
    import org.apache.commons.lang3.StringUtils;
    
    import java.security.InvalidKeyException;
    import java.security.NoSuchAlgorithmException;
    import java.util.concurrent.CompletableFuture;
    import java.util.concurrent.ExecutionException;
    
    public class AliyunCredentialsProvider implements CredentialsProvider {
        /**
         * 默認過期時間,單位毫秒??梢愿鶕I務實際情況設置。
         */
        private final long STS_TIMEOUT_DEFAULT = 1800 * 1000;
        /**
         * 實例ID,從云消息隊列 RabbitMQ 版控制臺獲取。
         */
        private final String instanceId;
        /**
         * Access Key ID。
         */
        private String accessKeyId;
        /**
         * Access Key Secret。
         */
        private String accessKeySecret;
        /**
         *  (可選)security temp token。
         */
        private String securityToken;
        /**
         * STS過期時間, 記錄后可提前更新STS token。
         */
        private Long timeStampLimit;
    
        // 阿里云賬號AccessKey擁有所有API的訪問權限,建議您使用RAM用戶進行API訪問或日常運維。
        // 強烈建議不要把AccessKey ID和AccessKey Secret保存到工程代碼里,否則可能導致AccessKey泄露,威脅您賬號下所有資源的安全。
        public AliyunCredentialsProvider(final String instanceId) {
            this.instanceId = instanceId;
        }
        public void updateProperties(String alibabaAccessKeyId, String alibabaAccessKeySecret, String region, String roleARN) throws ExecutionException, InterruptedException {
            this.timeStampLimit = System.currentTimeMillis() + STS_TIMEOUT_DEFAULT;
            // 自行調用AssumeRole接口實現,進行身份信息獲取。
            StaticCredentialProvider provider = StaticCredentialProvider.create(Credential.builder()
                    .accessKeyId(alibabaAccessKeyId)
                    .accessKeySecret(alibabaAccessKeySecret)
                    .build());
            AsyncClient client = AsyncClient.builder()
                    .region(region) // 請設置Region ID, 例如cn-hangzhou。
                    .credentialsProvider(provider)
                    .overrideConfiguration(
                            ClientOverrideConfiguration.create()
                                    // Endpoint請參考https://api.aliyun.com/product/Sts。
                                    .setEndpointOverride("sts." + region + ".aliyuncs.com")
                            //.setConnectTimeout(Duration.ofSeconds(30))
                    )
                    .build();
            AssumeRoleRequest assumeRoleRequest = AssumeRoleRequest.builder()
                    .roleArn(roleARN) // 從控制臺獲取得到的角色ARN。
                    .roleSessionName("testRoleName") // 當前角色Session的名稱,可自定義。
                    .durationSeconds(STS_TIMEOUT_DEFAULT / 1000)
                    .build();
            CompletableFuture<AssumeRoleResponse> response = client.assumeRole(assumeRoleRequest);
            // Synchronously get the return value of the API request
            AssumeRoleResponse resp = response.get();
            if (resp.getBody().getCredentials() != null) {
                System.out.println("[INFO] Update AK, SK, Token successfully.");
                this.accessKeyId = resp.getBody().getCredentials().getAccessKeyId();
                this.securityToken = resp.getBody().getCredentials().getSecurityToken();
                this.accessKeySecret = resp.getBody().getCredentials().getAccessKeySecret();
            }
            client.close();
        }
    
        // 檢測當前該token是否快要過期。
        public boolean isNearlyExpired() {
            // 提前30秒判斷。
            return System.currentTimeMillis() > timeStampLimit - 30 * 1000L;
        }
    
        @Override
        public String getUsername() {
            if(StringUtils.isNotEmpty(securityToken)) {
                return UserUtils.getUserName(accessKeyId, instanceId, securityToken);
            } else {
                return UserUtils.getUserName(accessKeyId, instanceId);
            }
        }
    
        @Override
        public String getPassword() {
            try {
                return UserUtils.getPassord(accessKeySecret);
            } catch (InvalidKeyException e) {
                //todo
            } catch (NoSuchAlgorithmException e) {
                //todo
            }
            return null;
        }
    }

表 1. 參數列表

參數

示例值

描述

hostName

1880770****.mq-amqp.cn-hangzhou-a.aliyuncs.com

云消息隊列 RabbitMQ 版實例接入點。

Port

5672

默認端口。非加密端口5672,加密端口5671。

AccessKeyID

LTAI5tJQKnB9zVvQ****

阿里云賬號或RAM用戶的AccessKey ID。您可以登錄RAM訪問控制臺,創建RAM角色,并賦予角色AliyunAMQPFullAccess權限,獲取角色的ARN,調用AssumeRole接口獲取一個扮演該角色的臨時身份。AssumeRole執行成功會返回RAM角色的 AccessKeyID、AccessKeySecret以及SecurityToken。角色ARN的概念,請參見RAM角色概覽。

AccessKeySecret

jw6MawfKOVBveRr84u****

阿里云賬號或RAM用戶的AccessKey Secret。

region

cn-hangzhou

調用對應地域的AssumeRole接口,詳情請參見AssumeRole。

roleARN

acs:ram::125xxxxxxx223:role/xxx

RAM角色的ARN。格式為acs:ram::<account_id>:role/<role_name>。詳情請參見AssumeRole。

instanceId

amqp-cn-v0h1kb9nu***

云消息隊列 RabbitMQ 版的實例ID。您可以在云消息隊列 RabbitMQ 版控制臺實例詳情頁面查看。如何查看實例ID,請參見查看實例詳情。

virtualHost

Test

云消息隊列 RabbitMQ 版實例的Vhost。您可以在云消息隊列 RabbitMQ 版控制臺Vhost 列表頁面查看。如何查看Vhost,請參見查看Vhost連接詳情。

ExchangeName

ExchangeTest

云消息隊列 RabbitMQ 版的Exchange。您可以在云消息隊列 RabbitMQ 版控制臺Exchange 列表頁面獲取。

RoutingKey

RoutingKeyTest

云消息隊列 RabbitMQ 版Exchange與Queue的Routing Key。您可以在云消息隊列 RabbitMQ 版控制臺Exchange 列表頁面查看Exchange的綁定關系,獲取Routing Key。

QueueName

QueueTest

云消息隊列 RabbitMQ 版的Queue。僅在訂閱消息時候需要配置,您可以在云消息隊列 RabbitMQ 版控制臺Exchange 列表頁面,查看Exchange的綁定關系,獲取Exchange綁定的Queue。

生產消息

創建并編譯運行ProducerTest.java。

重要

編譯運行ProducerTest.java生產消息之前,您需要根據代碼提示信息配置參數列表中所列舉的參數。

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.UUID;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeoutException;

public class ProducerTest {
    // 推薦將AK/SK/ARN等信息在環境變量中配置,若將其明文保存在工程代碼中,將帶來不必要的數據泄露風險。
    // 阿里云賬號的AccessKey ID。
    private static final String alibabaAccessKeyId = System.getenv("ALIBABA_CLOUD_ACCESS_KEY_ID");
    // 阿里云賬號的AccessKey Secret。
    private static final String alibabaAccessKeySecret = System.getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET");
    // 阿里云服務所在的Region。
    private static final String region = System.getenv("ALIBABA_CLOUD_REGION");
    // 阿里云角色ARN,從控制臺獲取。
    private static final String roleARN = System.getenv("ALIBABA_CLOUD_ROLE_ARN");
    //設置實例的接入點。
    private static final String hostName = "xxx.xxx.aliyuncs.com";
    private static final String instanceId = "${InstanceId}";
    //設置實例的Vhost。
    private static final String virtualHost = "${VirtualHost}";
    //設置Exchange、Queue和綁定關系。
    private static final String exchangeName = "${ExchangeName}";
    private static final String queueName = "${QueueName}";
    private static final String routingKey = "${RoutingKey}";
    //設置Exchange類型。
    private static final String exchangeType = "${ExchangeType}";

    public static void main(String[] args) throws InterruptedException, IOException, TimeoutException, ExecutionException {
        ConnectionFactory factory = new ConnectionFactory();
        // 設置接入點,在云消息隊列 RabbitMQ 版控制臺實例詳情頁面查看。
        factory.setHost(hostName);
        // ${instanceId}為實例ID,在云消息隊列 RabbitMQ 版控制臺概覽頁面查看。
        AliyunCredentialsProvider aliyunCredentialsProvider =
                new AliyunCredentialsProvider(instanceId);
        updateSTSProperties(aliyunCredentialsProvider);
        // ${instanceId}為實例ID,在云消息隊列 RabbitMQ 版控制臺實例詳情頁面查看。
        factory.setCredentialsProvider(aliyunCredentialsProvider);
        //設置為true,開啟Connection自動恢復功能;設置為false,關閉Connection自動恢復功能。
        factory.setAutomaticRecoveryEnabled(true);
        factory.setNetworkRecoveryInterval(5000);
        // 設置Vhost名稱,請確保已在云消息隊列 RabbitMQ 版控制臺上創建完成。
        factory.setVirtualHost(virtualHost);
        // 默認端口,非加密端口5672,加密端口5671。
        factory.setPort(5672);
        // 基于網絡環境合理設置超時時間。
        factory.setConnectionTimeout(30 * 1000);
        factory.setHandshakeTimeout(30 * 1000);
        factory.setShutdownTimeout(0);
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();
        channel.exchangeDeclare(exchangeName, exchangeType, true, false, false, null);
        channel.queueDeclare(queueName, true, false, false, new HashMap<String, Object>());
        channel.queueBind(queueName, exchangeName, routingKey);
        // 開始發送消息,3600條消息,每條發送后暫停1秒,將持續1小時。
        for (int i = 0; i < 3600; i++) {
            try {
                if (aliyunCredentialsProvider.isNearlyExpired()) {
                    // 認證可能過期,重新認證
                    System.out.println("[WARN] Token maybe expired, so try to update it.");
                    updateSTSProperties(aliyunCredentialsProvider);
                    factory.setCredentialsProvider(aliyunCredentialsProvider);
                    // 當配置更新后,需要重新建立連接。
                    connection = factory.newConnection();
                    channel = connection.createChannel();
                }
                // ${ExchangeName}必須在云消息隊列 RabbitMQ 版控制臺上已存在,并且Exchange的類型與控制臺上的類型一致。
                // ${RoutingKey}根據業務需求填入相應的RoutingKey。
                AMQP.BasicProperties props = new AMQP.BasicProperties.Builder().messageId(UUID.randomUUID().toString()).build();
                channel.basicPublish(exchangeName, routingKey, true, props,
                        ("消息發送Body-"  + i).getBytes(StandardCharsets.UTF_8));
                System.out.println("[SendResult] Message sent successfully, messageId: " + props.getMessageId());
                Thread.sleep(1000L);
            } catch (Exception e) {
                System.out.println("[ERROR] Send fail, error: " + e.getMessage());
                Thread.sleep(5000L);
            }
        }
        connection.close();
    }

    public static void updateSTSProperties(AliyunCredentialsProvider aliyunCredentialsProvider) throws ExecutionException, InterruptedException {
        System.out.println("Try to update STS properties");
        // 推薦將AK/SK在環境變量中配置,若將其明文保存在工程代碼中,將帶來不必要的數據泄露風險。
        aliyunCredentialsProvider.updateProperties(alibabaAccessKeyId, alibabaAccessKeySecret, region, roleARN);
    }
}
說明

云消息隊列 RabbitMQ 版會對單實例的TPS流量峰值進行限流,更多信息,請參見實例限流最佳實踐。

訂閱消息

創建并編譯運行ConsumerTest.java訂閱消息。

重要

編譯運行ConsumerTest.java訂閱消息之前,您需要根據代碼提示信息配置參數列表中所列舉的參數。

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeoutException;

public class ConsumerTest {
    // 推薦將AK/SK/ARN等信息在環境變量中配置,若將其明文保存在工程代碼中,將帶來不必要的數據泄露風險。
    // 阿里云賬號的AccessKey ID。
    private static final String alibabaAccessKeyId = System.getenv("ALIBABA_CLOUD_ACCESS_KEY_ID");
    // 阿里云賬號的AccessKey Secret。
    private static final String alibabaAccessKeySecret = System.getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET");
    // 阿里云服務所在的Region。
    private static final String region = System.getenv("ALIBABA_CLOUD_REGION");
    // 阿里云角色ARN,從控制臺獲取。
    private static final String roleARN = System.getenv("ALIBABA_CLOUD_ROLE_ARN");
    //設置實例的接入點。
    private static final String hostName = "xxx.xxx.aliyuncs.com";
    private static final String instanceId = "${InstanceId}";
    //設置實例的Vhost。
    private static final String virtualHost = "${VirtualHost}";
    //設置Queue。
    private static final String queueName = "${QueueName}";

    public static void main(String[] args) throws IOException, TimeoutException, ExecutionException, InterruptedException {
        ConnectionFactory factory = new ConnectionFactory();
        // 設置接入點,在云消息隊列 RabbitMQ 版控制臺實例詳情頁面查看。
        factory.setHost(hostName);
        // ${instanceId}為實例ID,在云消息隊列 RabbitMQ 版控制臺概覽頁面查看。
        AliyunCredentialsProvider aliyunCredentialsProvider =
                new AliyunCredentialsProvider(instanceId);
        updateSTSProperties(aliyunCredentialsProvider);
        factory.setCredentialsProvider(aliyunCredentialsProvider);
        //設置為true,開啟Connection自動恢復功能;設置為false,關閉Connection自動恢復功能。
        factory.setAutomaticRecoveryEnabled(true);
        factory.setNetworkRecoveryInterval(5000);
        // 設置Vhost名稱,請確保已在云消息隊列 RabbitMQ 版控制臺上創建完成。
        factory.setVirtualHost(virtualHost);
        // 默認端口,非加密端口5672,加密端口5671。
        factory.setPort(5672);
        factory.setConnectionTimeout(300 * 1000);
        factory.setHandshakeTimeout(300 * 1000);
        factory.setShutdownTimeout(0);
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();
        // 創建${QueueName} ,該Queue必須在云消息隊列 RabbitMQ 版控制臺上已存在。
        AMQP.Queue.DeclareOk queueDeclareOk = channel.queueDeclare(queueName, true, false, false, new HashMap<String, Object>());
        consume(channel, queueName);
        System.out.println("Consumer started.");

        // 循環檢測sts是否即將過期,若過期則更新connection,重新消費。
        // 這里為了方便理解,使用while循環檢測認證是否接近過期。
        // 可以使用定時任務,更優雅地實現定時檢查、更新操作。
        while (true) {
            // 每次處理完消息后,可以判斷是否接近過期。
            // 如果接近過期,則更新一次認證類,
            // 該過程需要重新創建連接,以確保業務持續運行。
            if (aliyunCredentialsProvider.isNearlyExpired()) {
                System.out.println("token maybe expired, so try to update it.");
                updateSTSProperties(aliyunCredentialsProvider);
                factory.setCredentialsProvider(aliyunCredentialsProvider);
                connection.close();
                connection = factory.newConnection();
                channel = connection.createChannel();
                // 重新開始消費消息。
                consume(channel, queueName);
                System.out.println("Consumer started.");
            } else {
                // 每秒檢測一次。
                Thread.sleep(1000);
            }
        }
    }

    public static void consume(Channel channel, String queueName) throws IOException {

        channel.basicConsume(queueName, false, "ConsumerTag", new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope,
                                       AMQP.BasicProperties properties, byte[] body) {
                try {
                    //接收到的消息,進行業務邏輯處理。
                    System.out.println("Received: " + new String(body, StandardCharsets.UTF_8) + ", deliveryTag: " + envelope.getDeliveryTag() + ", messageId: " + properties.getMessageId());
                    channel.basicAck(envelope.getDeliveryTag(), false);
                } catch (Exception e) {
                    System.out.println("Exception, cause:" + e.getMessage());
                }
            }
        });
    }

    public static void updateSTSProperties(AliyunCredentialsProvider aliyunCredentialsProvider) throws ExecutionException, InterruptedException {
        System.out.println("Try to update STS properties");
        aliyunCredentialsProvider.updateProperties(alibabaAccessKeyId, alibabaAccessKeySecret, region, roleARN);
    }
}

查詢消息

如果您想確認消息是否成功發送至云消息隊列 RabbitMQ 版,可以在云消息隊列 RabbitMQ 版控制臺查詢消息。具體操作,請參見查詢消息