Headers Exchange綁定
更新時間:
如果不使用Routing Key去做綁定,而是根據消息Headers屬性和Binding Headers屬性的匹配規則路由消息,需要使用Headers Exchange。本文介紹Headers Exchange的使用示例。
背景信息
向Headers Exchange發送消息時,可以在Headers中定義鍵值對。Headers Exchange將根據消息Headers屬性鍵值對和綁定屬性鍵值對的匹配情況路由消息。
匹配算法由一個特殊的綁定屬性鍵值對控制。該屬性為x-match,只有以下兩種取值:
all:所有除x-match以外的綁定屬性鍵值對必須和消息Headers屬性鍵值對匹配才會路由消息。
any:只要有一組除x-match以外的綁定屬性鍵值對和消息Headers屬性鍵值對匹配就會路由消息。
更多信息,請參見Headers Exchange。
綁定成功后,您可以在云消息隊列 RabbitMQ 版控制臺的消息查詢頁面,按照Queue查詢消息,驗證綁定結果。具體操作,請參見查詢消息。
示例代碼
Headers Exchange綁定Java示例代碼如下:
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.ReturnListener;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.security.NoSuchAlgorithmException;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.TimeoutException;
public class HeadersExchange {
// 設置實例的接入點。
private static String host = "xxx.xxx.aliyuncs.com";
//設置實例的靜態用戶名密碼。
private static String userName = "${UserName}";
private static String password = "${PassWord}";
//設置實例的Vhost。
private static String vhost = "${VirtualHost}";
public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost(host);
factory.setUsername(userName);
factory.setPassword(password);
// 注意:開啟Connection才能自動恢復。
factory.setAutomaticRecoveryEnabled(true);
factory.setNetworkRecoveryInterval(5000);
factory.setVirtualHost(vhost);
// 默認端口,非加密端口5672,加密端口5671。
factory.setPort(5672);
// 基于網絡環境合理設置超時時間。
factory.setConnectionTimeout(30 * 1000);
factory.setHandshakeTimeout(30 * 1000);
factory.setShutdownTimeout(0);
// 請盡可能使用長期存活的Connection,以免每次發送消息都創建新的Connection,導致大量的網絡資源和服務端資源消耗,甚至引起服務端SYN Flood防護。
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
String exchangeName = "${ExchangeName}";
String exchangeType = "headers";
String queueName = "${QueueName}";
String routingKey = "${RoutingKey}";
Map<String, Object> argument = new HashMap<>();
argument.put("format", "pdf");
argument.put("type", "log");
argument.put("x-match", "all");
channel.queueDeclare(queueName, true, false, false, null);
channel.exchangeDeclare(exchangeName, exchangeType, true, false, false, null);
channel.queueBind(queueName, exchangeName, routingKey, argument);
// 當mandatory=true,消息沒有路由時,將會返回給客戶端。
channel.addReturnListener(new ReturnListener() {
@Override
public void handleReturn(int replyCode, String replyText, String exchange, String routingKey,
AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("no route, msgId=" + properties.getMessageId());
}
});
// 設置消息Headers屬性鍵值對。
// 1.當注釋(type, log)鍵值對,僅(format, pdf)一組鍵值對與argument匹配,執行該代碼后,消息將無法接收到。
// 2.當注釋(type, log)鍵值對被取消,即(format, pdf)和(type, log)兩組鍵值對與argument完全匹配,執行該代碼后,將接收到消息。
Map<String, Object> headers = new HashMap<>();
headers.put("format", "pdf");
//headers.put("type", "log");
AMQP.BasicProperties props = new AMQP.BasicProperties.Builder().messageId(UUID.randomUUID().toString()).headers(headers).build();
channel.basicPublish(exchangeName, routingKey, true, props, ("消息發送Body").getBytes(StandardCharsets.UTF_8));
Thread.sleep(10000);
connection.close();
}
}
文檔內容是否對您有幫助?