實(shí)例限流最佳實(shí)踐
云消息隊(duì)列 RabbitMQ 版會(huì)對(duì)單實(shí)例的TPS流量峰值進(jìn)行限流,本文介紹云消息隊(duì)列 RabbitMQ 版實(shí)例的限流規(guī)則、限流后的行為以及限流最佳實(shí)踐等。
限流后行為
當(dāng)云消息隊(duì)列 RabbitMQ 版實(shí)例的TPS流量峰值超過(guò)您所購(gòu)買(mǎi)實(shí)例的TPS規(guī)格上限時(shí),云消息隊(duì)列 RabbitMQ 版實(shí)例會(huì)被限流。
限流后的行為如下:
云消息隊(duì)列 RabbitMQ 版服務(wù)端會(huì)返回錯(cuò)誤碼信息。具體請(qǐng)參見(jiàn)錯(cuò)誤碼說(shuō)明。
云消息隊(duì)列 RabbitMQ 版服務(wù)端關(guān)閉當(dāng)前請(qǐng)求的Channel。代碼中可以捕獲異常重新開(kāi)啟Channel。
限流Java示例代碼如下:
private static final int MAX_RETRIES = 5; // 最大重試次數(shù)
private static final long WAIT_TIME_MS = 2000; // 每次重試的等待時(shí)間(以毫秒為單位)
private void doAnythingWithReopenChannels(Connection connection, Channel channel) {
try {
// ......
// 在當(dāng)前通道channel下執(zhí)行的任何操作
// 例如消息發(fā)送、消費(fèi)等
// ......
} catch (AlreadyClosedException e) {
String message = e.getMessage();
if (isChannelClosed(message)) {
// 如果通道已經(jīng)關(guān)閉,關(guān)閉并重新創(chuàng)建通道
channel = createChannelWithRetry(connection);
// 在重連后可以繼續(xù)執(zhí)行其它操作
// ......
} else {
throw e;
}
}
}
private Channel createChannelWithRetry(Connection connection) {
for (int attempt = 1; attempt <= MAX_RETRIES; attempt++) {
try {
return connection.createChannel();
} catch (Exception e) {
System.err.println("Failed to create channel. Attempt " + attempt + " of " + MAX_RETRIES);
// 檢查錯(cuò)誤, 若仍是被限流導(dǎo)致的關(guān)閉錯(cuò)誤,則可以等待后繼續(xù)重試
// 也可移除本部分重試邏輯
if (attempt < MAX_RETRIES) {
try {
Thread.sleep(WAIT_TIME_MS);
} catch (InterruptedException ie) {
Thread.currentThread().interrupt(); // 還原中斷狀態(tài)
}
} else {
throw new RuntimeException("Exceeded maximum retries to create channel", e);
}
}
}
throw new RuntimeException("This line should never be reached"); // 理論上不會(huì)到達(dá)這里
}
private boolean isChannelClosed(String errorMsg) {
// 判斷是否包含channel.close報(bào)錯(cuò),該報(bào)錯(cuò)代表通道已關(guān)閉。
// 可能涵蓋530,541等錯(cuò)誤信息。
if (errorMsg != null && errorMsg.contains("channel.close")) {
System.out.println("[ChannelClosed] Error details: " + errorMsg);
return true;
}
return false;
}
錯(cuò)誤碼信息:
錯(cuò)誤碼:reply-code=530
錯(cuò)誤信息:reply-text=denied for too many requests
Caused by: com.rabbitmq.client.ShutdownSignalException: channel error; protocol method: #method<channel.close>
(reply-code=530, reply-text=denied for too many requests, ReqId:5FB4C999314635F952FCBFF6, ErrorHelp[dstQueue=XXX_test_queue,
srcExchange=Producer.ExchangeName,bindingKey=XXX_test_bk, http://mrw.so/6rNqO8], class-id=50, method-id=20)
at com.rabbitmq.client.impl.ChannelN.asyncShutdown(ChannelN.java:516)
at com.rabbitmq.client.impl.ChannelN.processAsync(ChannelN.java:346)
at com.rabbitmq.client.impl.AMQChannel.handleCompleteInboundCommand(AMQChannel.java:182)
at com.rabbitmq.client.impl.AMQChannel.handleFrame(AMQChannel.java:114)
at com.rabbitmq.client.impl.AMQConnection.readFrame(AMQConnection.java:672)
at com.rabbitmq.client.impl.AMQConnection.access$300(AMQConnection.java:48)
at com.rabbitmq.client.impl.AMQConnection$MainLoop.run(AMQConnection.java:599)
at java.lang.Thread.run(Thread.java:748)
實(shí)例秒級(jí)TPS峰值查詢(xún)
通過(guò)查詢(xún)實(shí)例實(shí)際使用的秒級(jí)TPS峰值,您可以了解業(yè)務(wù)的流量波動(dòng)情況和流量峰值,判斷實(shí)例規(guī)格是否滿(mǎn)足業(yè)務(wù)需求。
云消息隊(duì)列 RabbitMQ 版提供以下三種方式查詢(xún)實(shí)例的秒級(jí)TPS峰值:
查詢(xún)方式 | 說(shuō)明 | 查詢(xún)時(shí)間級(jí)別 | 查詢(xún)資源級(jí)別 |
優(yōu)勢(shì):
| 分鐘級(jí)TPS峰值 取值為1分鐘周期內(nèi),每秒鐘實(shí)例TPS的最大值。 | 實(shí)例級(jí)別TPS峰值 | |
| 秒級(jí)TPS峰值 |
| |
| 秒級(jí)TPS峰值 | 實(shí)例級(jí)別TPS峰值 |
實(shí)例TPS計(jì)算規(guī)則
以下接口調(diào)用時(shí),會(huì)被計(jì)算進(jìn)TPS流量中,即調(diào)用一次接口,計(jì)算為一次TPS。
ConnectionOpen、ChannelOpen
QueueDeclare、QueueDelete、QueueBind、QueueUnbind
ExchangeDeclare、ExchangeDelete
ExchangeBind、ExchangeUnBind
SendMessage、BasicConsume、BasicGet、BasicAck、BasicReject、BasicNack、BasicRecover
延時(shí)消息是云消息隊(duì)列 RabbitMQ 版的高級(jí)特性消息,發(fā)送延時(shí)消息時(shí),調(diào)用API接口的次數(shù)需要在普通消息的基礎(chǔ)上乘以5倍,消費(fèi)延時(shí)消息時(shí)與普通消息次數(shù)相同。
示例:1秒內(nèi)發(fā)送2條延時(shí)消息,消費(fèi)3條延時(shí)消息。則此時(shí)API調(diào)用TPS為:2×5+3=13次/秒。
統(tǒng)計(jì)SendMessage接口的調(diào)用次數(shù)時(shí),實(shí)際計(jì)算值為消息經(jīng)過(guò)路由后要存儲(chǔ)到的Queue的數(shù)量。
例如,發(fā)送1條到Fanout類(lèi)型Exchange的消息,最后要保存到10個(gè)Queue中,則SendMessage調(diào)用次數(shù)計(jì)算為10次。