日本熟妇hd丰满老熟妇,中文字幕一区二区三区在线不卡 ,亚洲成片在线观看,免费女同在线一区二区

Headers Exchange綁定

更新時間:

如果不使用Routing Key去做綁定,而是根據消息Headers屬性和Binding Headers屬性的匹配規則路由消息,需要使用Headers Exchange。本文介紹Headers Exchange的使用示例。

背景信息

  • 向Headers Exchange發送消息時,可以在Headers中定義鍵值對。Headers Exchange將根據消息Headers屬性鍵值對和綁定屬性鍵值對的匹配情況路由消息。

    匹配算法由一個特殊的綁定屬性鍵值對控制。該屬性為x-match,只有以下兩種取值:

    • all:所有除x-match以外的綁定屬性鍵值對必須和消息Headers屬性鍵值對匹配才會路由消息。

    • any:只要有一組除x-match以外的綁定屬性鍵值對和消息Headers屬性鍵值對匹配就會路由消息。

    更多信息,請參見Headers Exchange。

  • 綁定成功后,您可以在云消息隊列 RabbitMQ 版控制臺消息查詢頁面,按照Queue查詢消息,驗證綁定結果。具體操作,請參見查詢消息。

示例代碼

Headers Exchange綁定Java示例代碼如下:

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.ReturnListener;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.security.NoSuchAlgorithmException;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.TimeoutException;

public class HeadersExchange {
    // 設置實例的接入點。
    private static String host = "xxx.xxx.aliyuncs.com";
    //設置實例的靜態用戶名密碼。
    private static String userName = "${UserName}";
    private static String password = "${PassWord}";
    //設置實例的Vhost。
    private static String vhost = "${VirtualHost}";

    public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {

        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost(host);
        factory.setUsername(userName);
        factory.setPassword(password);
        // 注意:開啟Connection才能自動恢復。
        factory.setAutomaticRecoveryEnabled(true);
        factory.setNetworkRecoveryInterval(5000);
        factory.setVirtualHost(vhost);
        // 默認端口,非加密端口5672,加密端口5671。
        factory.setPort(5672);
        // 基于網絡環境合理設置超時時間。
        factory.setConnectionTimeout(30 * 1000);
        factory.setHandshakeTimeout(30 * 1000);
        factory.setShutdownTimeout(0);
        // 請盡可能使用長期存活的Connection,以免每次發送消息都創建新的Connection,導致大量的網絡資源和服務端資源消耗,甚至引起服務端SYN Flood防護。
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        String exchangeName = "${ExchangeName}";
        String exchangeType = "headers";
        String queueName = "${QueueName}";
        String routingKey = "${RoutingKey}";


        Map<String, Object> argument = new HashMap<>();
        argument.put("format", "pdf");
        argument.put("type", "log");
        argument.put("x-match", "all");

        channel.queueDeclare(queueName, true, false, false, null);
        channel.exchangeDeclare(exchangeName, exchangeType, true, false, false, null);
        channel.queueBind(queueName, exchangeName, routingKey, argument);


        // 當mandatory=true,消息沒有路由時,將會返回給客戶端。
        channel.addReturnListener(new ReturnListener() {
            @Override
            public void handleReturn(int replyCode, String replyText, String exchange, String routingKey,
                AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("no route, msgId=" + properties.getMessageId());
            }
        });

        // 設置消息Headers屬性鍵值對。
        // 1.當注釋(type, log)鍵值對,僅(format, pdf)一組鍵值對與argument匹配,執行該代碼后,消息將無法接收到。
        // 2.當注釋(type, log)鍵值對被取消,即(format, pdf)和(type, log)兩組鍵值對與argument完全匹配,執行該代碼后,將接收到消息。
        Map<String, Object> headers = new HashMap<>();
        headers.put("format", "pdf");
        //headers.put("type", "log");

        AMQP.BasicProperties props = new AMQP.BasicProperties.Builder().messageId(UUID.randomUUID().toString()).headers(headers).build();
        channel.basicPublish(exchangeName, routingKey, true, props, ("消息發送Body").getBytes(StandardCharsets.UTF_8));

        Thread.sleep(10000);
        connection.close();
    }
}