本文介紹如何通過云消息隊列 RocketMQ 版的Java SDK訂閱消息。
訂閱方式
云消息隊列 RocketMQ 版支持以下兩種訂閱方式:
集群訂閱
同一個Group ID所標識的所有Consumer平均分攤消費消息。例如某個Topic有9條消息,一個Group ID有3個Consumer實例,那么在集群消費模式下每個實例平均分攤,只消費其中的3條消息。設置方式如下所示。
// 集群訂閱方式設置(不設置的情況下,默認為集群訂閱方式)。 properties.put(PropertyKeyConst.MessageModel, PropertyValueConst.CLUSTERING);
廣播訂閱
同一個Group ID所標識的所有Consumer都會各自消費某條消息一次。例如某個Topic有9條消息,一個Group ID有3個Consumer實例,那么在廣播消費模式下每個實例都會各自消費9條消息。設置方式如下所示。
// 廣播訂閱方式設置。 properties.put(PropertyKeyConst.MessageModel, PropertyValueConst.BROADCASTING);
消息獲取方式
云消息隊列 RocketMQ 版支持以下兩種消息獲取方式:
Push:消息由云消息隊列 RocketMQ 版推送至Consumer。Push方式下,云消息隊列 RocketMQ 版還支持批量消費功能,可以將批量消息統一推送至Consumer進行消費。更多信息,請參見批量消費。
Pull:消息由Consumer主動從云消息隊列 RocketMQ 版拉取。
Pull Consumer提供了更多接收消息的選擇。相比于Push Consumer,您可以使用Pull Consumer更加自由地控制消息拉取。具體的接口信息請參見接口和參數說明。
如需使用Pull Consumer,請確保您的實例為企業鉑金版。
Pull Consumer不支持公網訪問,僅支持VPC網絡訪問。
示例代碼
具體的示例代碼,請以云消息隊列 RocketMQ 版代碼庫為準。以下分別列舉Push和Pull兩種消息獲取方式的示例代碼。
Push方式
import com.aliyun.openservices.ons.api.Action; import com.aliyun.openservices.ons.api.ConsumeContext; import com.aliyun.openservices.ons.api.Consumer; import com.aliyun.openservices.ons.api.Message; import com.aliyun.openservices.ons.api.MessageListener; import com.aliyun.openservices.ons.api.ONSFactory; import com.aliyun.openservices.ons.api.PropertyKeyConst; import java.util.Properties; public class ConsumerTest { public static void main(String[] args) { Properties properties = new Properties(); // 您在消息隊列RocketMQ版控制臺創建的Group ID。 properties.put(PropertyKeyConst.GROUP_ID, "XXX"); // 請確保環境變量ALIBABA_CLOUD_ACCESS_KEY_ID、ALIBABA_CLOUD_ACCESS_KEY_SECRET已設置。 // AccessKey ID,阿里云身份驗證標識。 properties.put(PropertyKeyConst.AccessKey, System.getenv("ALIBABA_CLOUD_ACCESS_KEY_ID")); // AccessKey Secret,阿里云身份驗證密鑰。 properties.put(PropertyKeyConst.SecretKey, System.getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET")); // 設置TCP接入域名,進入消息隊列RocketMQ版控制臺實例詳情頁面的接入點區域查看。 properties.put(PropertyKeyConst.NAMESRV_ADDR, "XXX"); // 集群訂閱方式(默認)。 // properties.put(PropertyKeyConst.MessageModel, PropertyValueConst.CLUSTERING); // 廣播訂閱方式。 // properties.put(PropertyKeyConst.MessageModel, PropertyValueConst.BROADCASTING); Consumer consumer = ONSFactory.createConsumer(properties); //訂閱多個Tag。 consumer.subscribe("TopicTestMQ", "TagA||TagB", new MessageListener() { public Action consume(Message message, ConsumeContext context) { System.out.println("Receive: " + message); return Action.CommitMessage; } }); // 訂閱另外一個Topic,如需取消訂閱該Topic,請刪除該部分的訂閱代碼,重新啟動消費端即可。 //訂閱全部Tag。 consumer.subscribe("TopicTestMQ-Other", "*", new MessageListener() { public Action consume(Message message, ConsumeContext context) { System.out.println("Receive: " + message); return Action.CommitMessage; } }); consumer.start(); System.out.println("Consumer Started"); } }
Push方式(批量消費)
重要配置云消息隊列 RocketMQ 版的批量消費功能,請升級TCP Java SDK到1.8.7.3或以上版本,詳細版本說明和獲取方式,請參見Java SDK版本說明。
import com.aliyun.openservices.ons.api.Action; import com.aliyun.openservices.ons.api.ConsumeContext; import com.aliyun.openservices.ons.api.Message; import com.aliyun.openservices.ons.api.batch.BatchConsumer; import com.aliyun.openservices.ons.api.batch.BatchMessageListener; import java.util.List; import java.util.Properties; import com.aliyun.openservices.ons.api.ONSFactory; import com.aliyun.openservices.ons.api.PropertyKeyConst; import com.aliyun.openservices.tcp.example.MqConfig; public class SimpleBatchConsumer { public static void main(String[] args) { Properties consumerProperties = new Properties(); // 您在消息隊列RocketMQ版控制臺創建的Group ID。 consumerProperties.setProperty(PropertyKeyConst.GROUP_ID, MqConfig.GROUP_ID); // 請確保環境變量ALIBABA_CLOUD_ACCESS_KEY_ID、ALIBABA_CLOUD_ACCESS_KEY_SECRET已設置。 // AccessKey ID,阿里云身份驗證標識。 properties.put(PropertyKeyConst.AccessKey, System.getenv("ALIBABA_CLOUD_ACCESS_KEY_ID")); // AccessKey Secret,阿里云身份驗證密鑰。 properties.put(PropertyKeyConst.SecretKey, System.getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET")); // 設置TCP接入域名,進入消息隊列RocketMQ版控制臺實例詳情頁面的接入點區域查看。 consumerProperties.setProperty(PropertyKeyConst.NAMESRV_ADDR, MqConfig.NAMESRV_ADDR); // 設置批量消費最大消息數量,當指定Topic的消息數量已經攢夠128條,SDK立即執行回調進行消費。默認值:32,取值范圍:1~1024。 consumerProperties.setProperty(PropertyKeyConst.ConsumeMessageBatchMaxSize, String.valueOf(128)); // 設置批量消費最大等待時長,當等待時間達到10秒,SDK立即執行回調進行消費。默認值:0,取值范圍:0~450,單位:秒。 consumerProperties.setProperty(PropertyKeyConst.BatchConsumeMaxAwaitDurationInSeconds, String.valueOf(10)); BatchConsumer batchConsumer = ONSFactory.createBatchConsumer(consumerProperties); batchConsumer.subscribe(MqConfig.TOPIC, MqConfig.TAG, new BatchMessageListener() { @Override public Action consume(final List<Message> messages, ConsumeContext context) { System.out.printf("Batch-size: %d\n", messages.size()); // 批量消息處理。 return Action.CommitMessage; } }); //啟動batchConsumer。 batchConsumer.start(); System.out.println("Consumer start success."); //等待固定時間防止進程退出。 try { Thread.sleep(200000); } catch (InterruptedException e) { e.printStackTrace(); } } }
Pull方式
import com.aliyun.openservices.ons.api.Message; import com.aliyun.openservices.ons.api.ONSFactory; import com.aliyun.openservices.ons.api.PropertyKeyConst; import com.aliyun.openservices.ons.api.PullConsumer; import com.aliyun.openservices.ons.api.TopicPartition; import java.util.List; import java.util.Properties; import java.util.Set; public class PullConsumerClient { public static void main(String[] args){ Properties properties = new Properties(); // 您在消息隊列RocketMQ版控制臺創建的Group ID。 properties.setProperty(PropertyKeyConst.GROUP_ID, "GID-xxxxx"); // 請確保環境變量ALIBABA_CLOUD_ACCESS_KEY_ID、ALIBABA_CLOUD_ACCESS_KEY_SECRET已設置。 // AccessKey ID,阿里云身份驗證標識。 properties.put(PropertyKeyConst.AccessKey, System.getenv("ALIBABA_CLOUD_ACCESS_KEY_ID")); // AccessKey Secret,阿里云身份驗證密鑰。 properties.put(PropertyKeyConst.SecretKey, System.getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET")); // 設置TCP接入域名,進入消息隊列RocketMQ版控制臺實例詳情頁面的接入點區域查看。 properties.put(PropertyKeyConst.NAMESRV_ADDR, "xxxxx"); PullConsumer consumer = ONSFactory.createPullConsumer(properties); // 啟動Consumer。 consumer.start(); // 獲取topic-xxx下的所有分區。 Set<TopicPartition> topicPartitions = consumer.topicPartitions("topic-xxx"); // 指定需要拉取消息的分區。 consumer.assign(topicPartitions); while (true) { // 拉取消息,超時時間為3000 ms。 List<Message> messages = consumer.poll(3000); System.out.printf("Received message: %s %n", messages); } } }
分區和位點的詳細說明,請參見基本概念。
更多信息
云消息隊列 RocketMQ 版消費端流控的最佳實踐,請參見消息隊列RocketMQ客戶端流控設計。