SDK FAQ
本文記錄通過SDK收發消息時常見的問題。
開源客戶端是否可以直接訪問云上服務?
云消息隊列 RabbitMQ 版完全兼容開源RabbitMQ。開源RabbitMQ可以直接訪問云上服務。您需要通過云消息隊列 RabbitMQ 版控制臺生成靜態用戶名密碼之后,通過靜態賬戶直接訪問云上服務。如何創建靜態用戶名密碼,請參見靜態用戶名密碼管理。
支持哪些語言的開源SDK?
開源RabbitMQ提供的多語言或框架SDK云消息隊列 RabbitMQ 版全部都支持。具體信息,請參見開源RabbitMQ AMQP協議支持的多語言或框架SDK。
是否可以直接使用開源RabbitMQ JMS Client?
云消息隊列 RabbitMQ 版不支持直接使用開源RabbitMQ JMS Client。您需要通過Maven配置依賴庫才能接入云消息隊列 RabbitMQ 版收發消息。如何通過Maven配置依賴庫來收發消息,請參見JMS概述。
JMS標準有哪些接口不支持?
如果是自動ACK,是否支持通過Reject來觸發消息重新入隊列?
不可以。您可以使用basicReject方法否定應答單條消息,或者使用basicNack方法否定應答一條或多條消息,并使消息重入隊列。
參數
說明
deliveryTag
Channel的消息投遞的唯一標識符。
requeue
被否定應答的消息是否重入隊列。如果設置為true,則消息重入隊列;如果設置為false,則消息被丟棄或發送到死信Exchange。更多信息,請參見死信Exchange。
參數
說明
deliveryTag
Channel的消息投遞的唯一標識符。
multiple
是否否定應答多條消息。如果設置為true,則否定應答帶指定deliveryTag的消息及該deliveryTag之前的多條消息;如果設置為false,則僅否定應答帶指定deliveryTag的單條消息。
requeue
被否定應答的消息是否重入隊列。如果設置為true,則消息重入隊列;如果設置為false,則消息被丟棄或發送到死信Exchange。更多信息,請參見死信Exchange。
示例代碼
channel.basicConsume("test", false, "consumertag", new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println("Rejected: " + new String(body, "UTF-8") + ", deliveryTag: " + envelope.getDeliveryTag() + ", messageId: " + properties.getMessageId()); channel.basicNack(envelope.getDeliveryTag(), true, true); } });
如何設置Message ID?
如果您需追蹤和識別消息,可以在云消息隊列 RabbitMQ 版的Producer客戶端設置Message ID屬性,為每條消息設置唯一標識符。關于Message ID的更多信息,請參見Message ID。
在云消息隊列 RabbitMQ 版的Producer客戶端設置Basic.Properties
的message-id
屬性。示例代碼如下:
Message ID最大長度不能超過255個字符。
Java
AMQP.BasicProperties props = new AMQP.BasicProperties.Builder().messageId("messageid").build();
channel.basicPublish("${ExchangeName}", "RoutingKey", true, props, ("消息發送Body").getBytes(StandardCharsets.UTF_8));
Python
properties = pika.BasicProperties(app_id='example-publisher', content_type='application/json', message_id='messageid')
PHP
$msg = new AMQPMessage($msgBody, ['application_headers'=>$amqpTable,'content_type' => 'text/plain', 'delivery_mode' => 2,'message_id' => 'messageid',]);
Go
err = ch.Publish( "helloExchange", "hello", false, false, amqp.Publishing { ContentType: "text/plain", Body: []byte(body), MessageId: "messageId", })
Node.js
channel.BasicPublish(exchange: "", routingKey: "hello", basicProperties: null, body: body);
如何刪除隊列消息?
您可以使用Java客戶端庫中queuePurge
方法刪除某個隊列的所有消息。示例代碼如下:
channel.queuePurge("queue-name");
如何在開源客戶端設置加密傳輸?
以下示例使用默認的非加密端口5672,如果使用加密傳輸,需要連接5671端口,并設置
com.rabbitmq.client.ConnectionFactory
的SslProtocol。
private void setSSL(com.rabbitmq.client.ConnectionFactory factory) throws NoSuchAlgorithmException, KeyStoreException, KeyManagementException {
SSLContext sslContext = SSLContext.getInstance("TLSv1.2");
TrustManagerFactory trustManagerFactory = TrustManagerFactory.getInstance(TrustManagerFactory.getDefaultAlgorithm());
trustManagerFactory.init((KeyStore) null);
sslContext.init(null, trustManagerFactory.getTrustManagers(), null);
factory.useSslProtocol(sslContext);
}
消費者消費到的數據量不相同,導致不同消費者的內存/CPU使用率產生較大差異怎么辦?
云消息隊列 RabbitMQ 版使用分布式部署,后臺部署有多個服務節點,同時,網關使用輪詢機制與后臺服務節點建立連接。如果客戶端連接在服務節點上分布不均衡,可能導致每個消費者拉取到的消息數量不同。例如,有5個客戶端,3個服務端節點A、B、C,每個客戶端建立一個連接,即A、B服務節點上有2個連接,C服務節點上有1個連接。那么C節點上的這一個連接將會消費到C節點上拉取到的所有數據(后端存算分離兩層架構,3個后端服務節點拉取到的數據一樣多),則該連接對應的客戶端消費到的數據量就會明顯增加。
解決方式:每個客戶端建立多個連接,例如每個客戶端節點建立50個連接,將連接數量均衡分布到后端各個節點上,均衡每個客戶端拉取到的數據。如果客戶端使用springboot,則可配置connection
模式進行消費。