云消息隊列 RocketMQ 版5.x版本實例可兼容Java ONS 1.x SDK客戶端接入,您可以使用ONS 1.x SDK的接入5.x實例進行消息收發。本文為您介紹Java ONS 1.x SDK消息收發示例代碼。
重要
- 推薦您使用最新的RocketMQ 5.x系列SDK,5.x系列SDK作為主力研發版本,和云消息隊列 RocketMQ 版5.x服務端完全兼容,提供了更全面的功能并支持更多增強特性。更多信息,請參見5.x系列SDK。
- RocketMQ 4.x/3.x系列SDK和ONS系列SDK后續僅做功能維護,建議僅存量業務使用。
Serverless版實例公網訪問版本說明
Serverless版實例使用公網訪問接入云消息隊列 RocketMQ 版時,需要保證使用的Java ONS 1.x SDK版本為1.9.0.Final及以上版本,并在消息收發代碼中補充如下內容:
properties.setProperty(PropertyKeyConst.Namespace, "InstanceId");
說明
其中,InstanceId
需要替換為您實際使用的實例ID。
普通消息收發示例
發送普通消息(同步發送)
import com.aliyun.openservices.ons.api.Message;
import com.aliyun.openservices.ons.api.Producer;
import com.aliyun.openservices.ons.api.SendResult;
import com.aliyun.openservices.ons.api.ONSFactory;
import com.aliyun.openservices.ons.api.PropertyKeyConst;
import java.util.Date;
import java.util.Properties;
public class ProducerTest {
public static void main(String[] args) {
Properties properties = new Properties();
/**
* 如果是使用公網接入點訪問,則必須設置AccessKey和SecretKey,里面填寫實例的用戶名和密碼。實例用戶名和密碼在控制臺訪問控制的智能身份識別頁簽中獲取。
* 注意!!!這里填寫的不是阿里云賬號的AccessKey ID和AccessKey Secret,請務必區分開。
* 如果是在阿里云ECS內網訪問,則無需配置,服務端會根據內網VPC信息智能獲取。
* 如果實例類型為Serverlesss實例,公網訪問必須設置實例的用戶名密碼,當開啟內網免身份識別時,內網訪問可以不設置用戶名和密碼。
*/
// 實例用戶名和密碼在消息隊列RocketMQ版控制臺訪問控制的智能身份識別頁簽中獲取。
properties.put(PropertyKeyConst.AccessKey,"INSTANCE USER NAME");
properties.put(PropertyKeyConst.SecretKey, "INSTANCE PASSWORD");
//注意!!!使用ONS SDK訪問RocketMQ 5.x實例時,InstanceID屬性不需要設置,否則會導致失敗。
// 設置發送超時時間,單位:毫秒。
properties.setProperty(PropertyKeyConst.SendMsgTimeoutMillis, "3000");
// 設置為您從消息隊列RocketMQ版控制臺獲取的接入點,類似“rmq-cn-XXXX.rmq.aliyuncs.com:8080”。
// 注意!!!直接填寫控制臺提供的域名和端口即可,請勿添加http://或https://前綴標識,也不要用IP解析地址。
properties.put(PropertyKeyConst.NAMESRV_ADDR, "ACCESS POINT");
Producer producer = ONSFactory.createProducer(properties);
// 在發送消息前,必須調用start方法來啟動Producer,只需調用一次即可。
producer.start();
// 循環發送消息。
for (int i = 0; i < 100; i++){
Message msg = new Message(
// 設置為您在消息隊列RocketMQ版控制臺上創建的Topic。
// 普通消息所屬的Topic,切勿使用普通消息的Topic來收發其他類型的消息。
"TopicTestMQ",
// Message Tag可理解為Gmail中的標簽,對消息進行再歸類,方便Consumer指定過濾條件在消息隊列RocketMQ版的服務器過濾。
// Tag的具體格式和設置方法,請參見消息過濾。
"TagA",
// Message Body可以是任何二進制形式的數據,消息隊列RocketMQ版不做任何干預。
// 需要Producer與Consumer協商好一致的序列化和反序列化方式。
"Hello MQ".getBytes());
// 設置代表消息的業務關鍵屬性,請盡可能全局唯一。
// 以方便您在無法正常收到消息情況下,可通過消息隊列RocketMQ版控制臺查詢消息并補發。
// 注意:不設置也不會影響消息正常收發。
msg.setKey("ORDERID_" + i);
try {
SendResult sendResult = producer.send(msg);
// 同步發送消息,只要不拋異常就是成功。
if (sendResult != null) {
System.out.println(new Date() + " Send mq message success. Topic is:" + msg.getTopic() + " msgId is: " + sendResult.getMessageId());
}
}
catch (Exception e) {
// 消息發送失敗,需要進行重試處理,可重新發送這條消息或持久化這條數據進行補償處理。
System.out.println(new Date() + " Send mq message failed. Topic is:" + msg.getTopic());
e.printStackTrace();
}
}
// 在應用退出前,銷毀Producer對象。
// 注意:銷毀Producer對象可以節約系統內存,若您需要頻繁發送消息,則無需銷毀Producer對象。
producer.shutdown();
}
}
發送普通消息(異步發送)
import com.aliyun.openservices.ons.api.Message;
import com.aliyun.openservices.ons.api.OnExceptionContext;
import com.aliyun.openservices.ons.api.Producer;
import com.aliyun.openservices.ons.api.SendCallback;
import com.aliyun.openservices.ons.api.SendResult;
import com.aliyun.openservices.ons.api.ONSFactory;
import com.aliyun.openservices.ons.api.PropertyKeyConst;
import java.util.Properties;
import java.util.concurrent.TimeUnit;
public class ProducerTest {
public static void main(String[] args) throws InterruptedException {
Properties properties = new Properties();
/**
* 如果是使用公網接入點訪問,則必須設置AccessKey和SecretKey,里面填寫實例的用戶名和密碼。實例用戶名和密碼在控制臺訪問控制的智能身份識別頁簽中獲取。
* 注意!!!這里填寫的不是阿里云賬號的AccessKey ID和AccessKey Secret,請務必區分開。
* 如果是在阿里云ECS內網訪問,則無需配置,服務端會根據內網VPC信息智能獲取。
* 如果實例類型為Serverlesss實例,公網訪問必須設置實例的用戶名密碼,當開啟內網免身份識別時,內網訪問可以不設置用戶名和密碼。
*/
// 實例用戶名和密碼在消息隊列RocketMQ版控制臺訪問控制的智能身份識別頁簽中獲取。
properties.put(PropertyKeyConst.AccessKey,"INSTANCE USER NAME");
properties.put(PropertyKeyConst.SecretKey, "INSTANCE PASSWORD");
//注意!!!使用ONS SDK訪問RocketMQ 5.x實例時,InstanceID屬性不需要設置,否則會導致失敗。
//設置發送超時時間,單位毫秒。
properties.setProperty(PropertyKeyConst.SendMsgTimeoutMillis, "3000");
// 設置為您從消息隊列RocketMQ版控制臺獲取的接入點,類似“rmq-cn-XXXX.rmq.aliyuncs.com:8080”。
// 注意!!!直接填寫控制臺提供的域名和端口即可,請勿添加http://或https://前綴標識,也不要用IP解析地址。
properties.put(PropertyKeyConst.NAMESRV_ADDR, "ACCESS POINT");
Producer producer = ONSFactory.createProducer(properties);
// 在發送消息前,必須調用start方法來啟動Producer,只需調用一次即可。
producer.start();
Message msg = new Message(
// 設置為您在消息隊列RocketMQ版控制臺上創建的Topic。
// 普通消息所屬的Topic,切勿使用普通消息的Topic來收發其他類型的消息。
"TopicTestMQ",
// Message Tag,可理解為Gmail中的標簽,對消息進行再歸類,方便Consumer指定過濾條件在消息隊列RocketMQ版的服務器過濾。
"TagA",
// Message Body,任何二進制形式的數據,消息隊列RocketMQ版不做任何干預,需要Producer與Consumer協商好一致的序列化和反序列化方式。
"Hello MQ".getBytes());
// 設置代表消息的業務關鍵屬性,請盡可能全局唯一。 以方便您在無法正常收到消息情況下,可通過消息隊列RocketMQ版控制臺查詢消息并補發。
// 注意:不設置也不會影響消息正常收發。
msg.setKey("ORDERID_100");
// 異步發送消息, 發送結果通過callback返回給客戶端。
producer.sendAsync(msg, new SendCallback() {
@Override
public void onSuccess(final SendResult sendResult) {
// 消息發送成功。
System.out.println("send message success. topic=" + sendResult.getTopic() + ", msgId=" + sendResult.getMessageId());
}
@Override
public void onException(OnExceptionContext context) {
// 消息發送失敗,需要進行重試處理,可重新發送這條消息或持久化這條數據進行補償處理。
System.out.println("send message failed. topic=" + context.getTopic() + ", msgId=" + context.getMessageId());
}
});
// 阻塞當前線程3秒,等待異步發送結果。
TimeUnit.SECONDS.sleep(3);
// 在應用退出前,銷毀Producer對象。
// 注意:銷毀Producer對象可以節約系統內存,若您需要頻繁發送消息,則無需銷毀Producer對象。
producer.shutdown();
}
}
發送普通消息(單向發送)
import com.aliyun.openservices.ons.api.Message;
import com.aliyun.openservices.ons.api.Producer;
import com.aliyun.openservices.ons.api.ONSFactory;
import com.aliyun.openservices.ons.api.PropertyKeyConst;
import java.util.Properties;
public class ProducerTest {
public static void main(String[] args) {
Properties properties = new Properties();
/**
* 如果是使用公網接入點訪問,則必須設置AccessKey和SecretKey,里面填寫實例的用戶名和密碼。實例用戶名和密碼在控制臺訪問控制的智能身份識別頁簽中獲取。
* 注意!!!這里填寫的不是阿里云賬號的AccessKey ID和AccessKey Secret,請務必區分開。
* 如果是在阿里云ECS內網訪問,則無需配置,服務端會根據內網VPC信息智能獲取。
* 如果實例類型為Serverlesss實例,公網訪問必須設置實例的用戶名密碼,當開啟內網免身份識別時,內網訪問可以不設置用戶名和密碼。
*/
// 實例用戶名和密碼在消息隊列RocketMQ版控制臺訪問控制的智能身份識別頁簽中獲取。
properties.put(PropertyKeyConst.AccessKey,"INSTANCE USER NAME");
properties.put(PropertyKeyConst.SecretKey, "INSTANCE PASSWORD");
//注意!!!使用ONS SDK訪問RocketMQ 5.x實例時,InstanceID屬性不需要設置,否則會導致失敗。
// 設置發送超時時間,單位:毫秒。
properties.setProperty(PropertyKeyConst.SendMsgTimeoutMillis, "3000");
// 設置為您從消息隊列RocketMQ版控制臺獲取的接入點,類似“rmq-cn-XXXX.rmq.aliyuncs.com:8080”。
// 注意!!!直接填寫控制臺提供的域名和端口即可,請勿添加http://或https://前綴標識,也不要用IP解析地址。
properties.put(PropertyKeyConst.NAMESRV_ADDR, "ACCESS POINT");
Producer producer = ONSFactory.createProducer(properties);
// 在發送消息前,必須調用start方法來啟動Producer,只需調用一次即可。
producer.start();
// 循環發送消息。
for (int i = 0; i < 100; i++){
Message msg = new Message(
// 設置為您在消息隊列RocketMQ版控制臺上創建的Topic。
// 普通消息所屬的Topic,切勿使用普通消息的Topic來收發其他類型的消息。
"TopicTestMQ",
// Message Tag,
// 可理解為Gmail中的標簽,對消息進行再歸類,方便Consumer指定過濾條件在消息隊列RocketMQ版的服務器過濾。
"TagA",
// Message Body
// 任何二進制形式的數據,消息隊列RocketMQ版不做任何干預,需要Producer與Consumer協商好一致的序列化和反序列化方式。
"Hello MQ".getBytes());
// 設置代表消息的業務關鍵屬性,請盡可能全局唯一。
// 以方便您在無法正常收到消息情況下,可通過消息隊列RocketMQ版控制臺查詢消息并補發。
// 注意:不設置也不會影響消息正常收發。
msg.setKey("ORDERID_" + i);
// 由于在oneway方式發送消息時沒有請求應答處理,如果出現消息發送失敗,則會因為沒有重試而導致數據丟失。若數據不可丟,建議選用可靠同步或可靠異步發送方式。
producer.sendOneway(msg);
}
// 在應用退出前,銷毀Producer對象。
// 注意:銷毀Producer對象可以節約系統內存,若您需要頻繁發送消息,則無需銷毀Producer對象。
producer.shutdown();
}
}
發送普通消息(多線程發送)
import com.aliyun.openservices.ons.api.Message;
import com.aliyun.openservices.ons.api.Producer;
import com.aliyun.openservices.ons.api.ONSFactory;
import com.aliyun.openservices.ons.api.PropertyKeyConst;
import com.aliyun.openservices.ons.api.SendResult;
import java.util.Date;
import java.util.Properties;
public class SharedProducer {
public static void main(String[] args) {
// producer實例配置初始化。
Properties properties = new Properties();
// 設置為您在消息隊列RocketMQ版控制臺創建的Group ID。
properties.put(PropertyKeyConst.GROUP_ID, "XXX");
/**
* 如果是使用公網接入點訪問,則必須設置AccessKey和SecretKey,里面填寫實例的用戶名和密碼。實例用戶名和密碼在控制臺訪問控制的智能身份識別頁簽中獲取。
* 注意!!!這里填寫的不是阿里云賬號的AccessKey ID和AccessKey Secret,請務必區分開。
* 如果是在阿里云ECS內網訪問,則無需配置,服務端會根據內網VPC信息智能獲取。
* 如果實例類型為Serverlesss實例,公網訪問必須設置實例的用戶名密碼,當開啟內網免身份識別時,內網訪問可以不設置用戶名和密碼。
*/
// 實例用戶名和密碼在消息隊列RocketMQ版控制臺訪問控制的智能身份識別頁簽中獲取。
properties.put(PropertyKeyConst.AccessKey, "INSTANCE USER NAME");
properties.put(PropertyKeyConst.SecretKey, "INSTANCE PASSWORD");
//注意!!!使用ONS SDK訪問RocketMQ 5.x實例時,InstanceID屬性不需要設置,否則會導致失敗。
// 設置發送超時時間,單位:毫秒。
properties.setProperty(PropertyKeyConst.SendMsgTimeoutMillis, "3000");
// 設置為您從消息隊列RocketMQ版控制臺獲取的接入點,類似“rmq-cn-XXXX.rmq.aliyuncs.com:8080”。
// 注意!!!直接填寫控制臺提供的域名和端口即可,請勿添加http://或https://前綴標識,也不要用IP解析地址。
properties.put(PropertyKeyConst.NAMESRV_ADDR, "ACCESS POINT");
// 在發送消息前,必須調用start方法來啟動Producer,只需調用一次即可。
Producer producer = ONSFactory.createProducer(properties);
producer.start();
// 創建的Producer和Consumer對象為線程安全的,可以在多線程間進行共享,避免每個線程創建一個實例。
// thread和anotherThread共享Producer對象,并發地發送消息至消息隊列RocketMQ版。
Thread thread = new Thread(new Runnable() {
@Override
public void run() {
Message msg = new Message(
// 設置為您在消息隊列RocketMQ版控制臺上創建的Topic。
// 普通消息所屬的Topic,切勿使用普通消息的Topic來收發其他類型的消息。
"TopicTestMQ",
// Message Tag可理解為Gmail中的標簽,對消息進行再歸類,方便Consumer指定過濾條件在消息隊列RocketMQ版的服務器過濾。
"TagA",
// Message Body可以是任何二進制形式的數據,消息隊列RocketMQ版不做任何干預。
// 需要Producer與Consumer協商好一致的序列化和反序列化方式。
"Hello MQ".getBytes());
try {
SendResult sendResult = producer.send(msg);
// 同步發送消息,只要不拋異常就是成功。
if (sendResult != null) {
System.out.println(new Date() + " Send mq message success. Topic is:" + msg.getTopic() + " msgId is: " + sendResult.getMessageId());
}
} catch (Exception e) {
// 消息發送失敗,需要進行重試處理,可重新發送這條消息或持久化這條數據進行補償處理。
System.out.println(new Date() + " Send mq message failed. Topic is:" + msg.getTopic());
e.printStackTrace();
}
}
});
thread.start();
Thread anotherThread = new Thread(new Runnable() {
@Override
public void run() {
Message msg = new Message("TopicTestMQ", "TagA", "Hello MQ".getBytes());
try {
SendResult sendResult = producer.send(msg);
// 同步發送消息,只要不拋異常就是成功。
if (sendResult != null) {
System.out.println(new Date() + " Send mq message success. Topic is:" + msg.getTopic() + " msgId is: " + sendResult.getMessageId());
}
} catch (Exception e) {
// 消息發送失敗,需要進行重試處理,可重新發送這條消息或持久化這條數據進行補償處理。
System.out.println(new Date() + " Send mq message failed. Topic is:" + msg.getTopic());
e.printStackTrace();
}
}
});
anotherThread.start();
// (可選)Producer實例若不再使用時,可將Producer關閉,進行資源釋放。
// producer.shutdown();
}
}
訂閱普通消息(PushConsumer)
import com.aliyun.openservices.ons.api.Action;
import com.aliyun.openservices.ons.api.ConsumeContext;
import com.aliyun.openservices.ons.api.Consumer;
import com.aliyun.openservices.ons.api.Message;
import com.aliyun.openservices.ons.api.MessageListener;
import com.aliyun.openservices.ons.api.ONSFactory;
import com.aliyun.openservices.ons.api.PropertyKeyConst;
import java.util.Properties;
public class ConsumerTest {
public static void main(String[] args) {
Properties properties = new Properties();
// 設置為您在消息隊列RocketMQ版控制臺創建的Group ID。
properties.put(PropertyKeyConst.GROUP_ID, "XXX");
/**
* 如果是使用公網接入點訪問,則必須設置AccessKey和SecretKey,里面填寫實例的用戶名和密碼。實例用戶名和密碼在控制臺訪問控制的智能身份識別頁簽中獲取。
* 注意!!!這里填寫的不是阿里云賬號的AccessKey ID和AccessKey Secret,請務必區分開。
* 如果是在阿里云ECS內網訪問,則無需配置,服務端會根據內網VPC信息智能獲取。
* 如果實例類型為Serverlesss實例,公網訪問必須設置實例的用戶名密碼,當開啟內網免身份識別時,內網訪問可以不設置用戶名和密碼。
*/
// 實例用戶名和密碼在消息隊列RocketMQ版控制臺訪問控制的智能身份識別頁簽中獲取。
properties.put(PropertyKeyConst.AccessKey, "INSTANCE USER NAME");
properties.put(PropertyKeyConst.SecretKey, "INSTANCE PASSWORD");
//注意!!!使用ONS SDK訪問RocketMQ 5.x實例時,InstanceID屬性不需要設置,否則會導致失敗。
// 設置為您從消息隊列RocketMQ版控制臺獲取的接入點,類似“rmq-cn-XXXX.rmq.aliyuncs.com:8080”。
// 注意!!!直接填寫控制臺提供的域名和端口即可,請勿添加http://或https://前綴標識,也不要用IP解析地址。
properties.put(PropertyKeyConst.NAMESRV_ADDR, "ACCESS POINT");
// 集群訂閱方式(默認)。
// properties.put(PropertyKeyConst.MessageModel, PropertyValueConst.CLUSTERING);
// 廣播訂閱方式。
// properties.put(PropertyKeyConst.MessageModel, PropertyValueConst.BROADCASTING);
Consumer consumer = ONSFactory.createConsumer(properties);
consumer.subscribe("TopicTestMQ", "TagA||TagB", new MessageListener() { //訂閱多個Tag。
public Action consume(Message message, ConsumeContext context) {
System.out.println("Receive: " + message);
return Action.CommitMessage;
}
});
// 訂閱另外一個Topic,如需取消訂閱該Topic,請刪除該部分的訂閱代碼,重新啟動消費端即可。
consumer.subscribe("TopicTestMQ-Other", "*", new MessageListener() { // 訂閱全部Tag。
public Action consume(Message message, ConsumeContext context) {
System.out.println("Receive: " + message);
return Action.CommitMessage;
}
});
consumer.start();
System.out.println("Consumer Started");
}
}
訂閱普通消息(PushConsumer批量消費)
import com.aliyun.openservices.ons.api.Action;
import com.aliyun.openservices.ons.api.ConsumeContext;
import com.aliyun.openservices.ons.api.Message;
import com.aliyun.openservices.ons.api.batch.BatchConsumer;
import com.aliyun.openservices.ons.api.batch.BatchMessageListener;
import java.util.List;
import java.util.Properties;
import com.aliyun.openservices.ons.api.ONSFactory;
import com.aliyun.openservices.ons.api.PropertyKeyConst;
public class SimpleBatchConsumer {
public static void main(String[] args) {
Properties consumerProperties = new Properties();
// 設置為您在消息隊列RocketMQ版控制臺創建的Group ID。
consumerProperties.put(PropertyKeyConst.GROUP_ID, "XXX");
/**
* 如果是使用公網接入點訪問,則必須設置AccessKey和SecretKey,里面填寫實例的用戶名和密碼。實例用戶名和密碼在控制臺訪問控制的智能身份識別頁簽中獲取。
* 注意!!!這里填寫的不是阿里云賬號的AccessKey ID和AccessKey Secret,請務必區分開。
* 如果是在阿里云ECS內網訪問,則無需配置,服務端會根據內網VPC信息智能獲取。
* 如果實例類型為Serverlesss實例,公網訪問必須設置實例的用戶名密碼,當開啟內網免身份識別時,內網訪問可以不設置用戶名和密碼。
*/
// 實例用戶名和密碼在消息隊列RocketMQ版控制臺訪問控制的智能身份識別頁簽中獲取。
consumerProperties.put(PropertyKeyConst.AccessKey,"INSTANCE USER NAME");
consumerProperties.put(PropertyKeyConst.SecretKey, "INSTANCE PASSWORD");
//注意!!!使用ONS SDK訪問RocketMQ 5.x實例時,InstanceID屬性不需要設置,否則會導致失敗。
// 設置為您從消息隊列RocketMQ版控制臺獲取的接入點,類似“rmq-cn-XXXX.rmq.aliyuncs.com:8080”。
// 注意!!!直接填寫控制臺提供的域名和端口即可,請勿添加http://或https://前綴標識,也不要用IP解析地址。
consumerProperties.put(PropertyKeyConst.NAMESRV_ADDR, "ACCESS POINT");
// 設置批量消費最大消息數量,當指定Topic的消息數量已經攢夠128條,SDK立即執行回調進行消費。默認值:32,取值范圍:1~1024。
consumerProperties.setProperty(PropertyKeyConst.ConsumeMessageBatchMaxSize, String.valueOf(128));
// 設置批量消費最大等待時長,當等待時間達到10秒,SDK立即執行回調進行消費。默認值:0,取值范圍:0~450,單位:秒。
consumerProperties.setProperty(PropertyKeyConst.BatchConsumeMaxAwaitDurationInSeconds, String.valueOf(10));
BatchConsumer batchConsumer = ONSFactory.createBatchConsumer(consumerProperties);
batchConsumer.subscribe("TopicTestMQ", "TagA", new BatchMessageListener() {
@Override
public Action consume(final List<Message> messages, ConsumeContext context) {
System.out.printf("Batch-size: %d\n", messages.size());
// 批量消息處理。
return Action.CommitMessage;
}
});
// 啟動batchConsumer。
batchConsumer.start();
System.out.println("Consumer start success.");
// 等待固定時間防止進程退出。
try {
Thread.sleep(200000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
訂閱普通消息(PullConsumer)
import com.aliyun.openservices.ons.api.Message;
import com.aliyun.openservices.ons.api.ONSFactory;
import com.aliyun.openservices.ons.api.PropertyKeyConst;
import com.aliyun.openservices.ons.api.PullConsumer;
import com.aliyun.openservices.ons.api.TopicPartition;
import java.util.List;
import java.util.Properties;
import java.util.Set;
public class PullConsumerClient {
public static void main(String[] args){
Properties properties = new Properties();
// 設置為您在消息隊列RocketMQ版控制臺創建的Group ID。
properties.setProperty(PropertyKeyConst.GROUP_ID, "GID-xxxxx");
/**
* 如果是使用公網接入點訪問,則必須設置AccessKey和SecretKey,里面填寫實例的用戶名和密碼。實例用戶名和密碼在控制臺訪問控制的智能身份識別頁簽中獲取。
* 注意!!!這里填寫的不是阿里云賬號的AccessKey ID和AccessKey Secret,請務必區分開。
* 如果是在阿里云ECS內網訪問,則無需配置,服務端會根據內網VPC信息智能獲取。
* 如果實例類型為Serverlesss實例,公網訪問必須設置實例的用戶名密碼,當開啟內網免身份識別時,內網訪問可以不設置用戶名和密碼。
*/
// 實例用戶名和密碼在消息隊列RocketMQ版控制臺訪問控制的智能身份識別頁簽中獲取。
properties.put(PropertyKeyConst.AccessKey,"INSTANCE USER NAME");
properties.put(PropertyKeyConst.SecretKey, "INSTANCE PASSWORD");
//注意!!!使用ONS SDK訪問RocketMQ 5.x實例時,InstanceID屬性不需要設置,否則會導致失敗。
// 設置為您從消息隊列RocketMQ版控制臺獲取的接入點,類似“rmq-cn-XXXX.rmq.aliyuncs.com:8080”。
// 注意!!!直接填寫控制臺提供的域名和端口即可,請勿添加http://或https://前綴標識,也不要用IP解析地址。
properties.put(PropertyKeyConst.NAMESRV_ADDR, "ACCESS POINT");
PullConsumer consumer = ONSFactory.createPullConsumer(properties);
// 啟動Consumer。
consumer.start();
// 獲取topic-xxx下的所有分區。
Set<TopicPartition> topicPartitions = consumer.topicPartitions("topic-xxx");
// 指定需要拉取消息的分區。
consumer.assign(topicPartitions);
while (true) {
// 拉取消息,超時時間為3000 ms。
List<Message> messages = consumer.poll(3000);
System.out.printf("Received message: %s %n", messages);
}
}
}
順序消息收發示例
發送順序消息
import com.aliyun.openservices.ons.api.Message;
import com.aliyun.openservices.ons.api.ONSFactory;
import com.aliyun.openservices.ons.api.PropertyKeyConst;
import com.aliyun.openservices.ons.api.SendResult;
import com.aliyun.openservices.ons.api.order.OrderProducer;
import java.util.Date;
import java.util.Properties;
public class ProducerClient {
public static void main(String[] args) {
Properties properties = new Properties();
// 設置為您在消息隊列RocketMQ版控制臺創建的Group ID。
properties.put(PropertyKeyConst.GROUP_ID,"XXX");
/**
* 如果是使用公網接入點訪問,則必須設置AccessKey和SecretKey,里面填寫實例的用戶名和密碼。實例用戶名和密碼在控制臺訪問控制的智能身份識別頁簽中獲取。
* 注意!!!這里填寫的不是阿里云賬號的AccessKey ID和AccessKey Secret,請務必區分開。
* 如果是在阿里云ECS內網訪問,則無需配置,服務端會根據內網VPC信息智能獲取。
* 如果實例類型為Serverlesss實例,公網訪問必須設置實例的用戶名密碼,當開啟內網免身份識別時,內網訪問可以不設置用戶名和密碼。
*/
// 實例用戶名和密碼在消息隊列RocketMQ版控制臺訪問控制的智能身份識別頁簽中獲取。
properties.put(PropertyKeyConst.AccessKey,"INSTANCE USER NAME");
properties.put(PropertyKeyConst.SecretKey, "INSTANCE PASSWORD");
//注意!!!使用ONS SDK訪問RocketMQ 5.x實例時,InstanceID屬性不需要設置,否則會導致失敗。
// 設置為您從消息隊列RocketMQ版控制臺獲取的接入點,類似“rmq-cn-XXXX.rmq.aliyuncs.com:8080”。
// 注意!!!直接填寫控制臺提供的域名和端口即可,請勿添加http://或https://前綴標識,也不要用IP解析地址。
properties.put(PropertyKeyConst.NAMESRV_ADDR, "ACCESS POINT");
OrderProducer producer = ONSFactory.createOrderProducer(properties);
// 在發送消息前,必須調用start方法來啟動Producer,只需調用一次即可。
producer.start();
for (int i = 0; i < 1000; i++) {
String orderId = "biz_" + i % 10;
Message msg = new Message(
// 設置為您在消息隊列RocketMQ版控制臺上創建的Topic。
"Order_global_topic",
// Message Tag,可理解為Gmail中的標簽,對消息進行再歸類,方便Consumer指定過濾條件在消息隊列RocketMQ版的服務器過濾。
"TagA",
// Message Body,可以是任何二進制形式的數據,消息隊列RocketMQ版不做任何干預,需要Producer與Consumer協商好一致的序列化和反序列化方式。
"send order global msg".getBytes()
);
// 設置代表消息的業務關鍵屬性,請盡可能全局唯一。
// 以方便您在無法正常收到消息情況下,可通過消息隊列RocketMQ版控制臺查詢消息并補發。
// 注意:不設置也不會影響消息正常收發。
msg.setKey(orderId);
// 分區順序消息中區分不同分區的關鍵字段,Sharding Key與普通消息的key是完全不同的概念。
// 全局順序消息,該字段可以設置為任意非空字符串。
String shardingKey = String.valueOf(orderId);
try {
SendResult sendResult = producer.send(msg, shardingKey);
// 發送消息,只要不拋異常就是成功。
if (sendResult != null) {
System.out.println(new Date() + " Send mq message success. Topic is:" + msg.getTopic() + " msgId is: " + sendResult.getMessageId());
}
}
catch (Exception e) {
// 消息發送失敗,需要進行重試處理,可重新發送這條消息或持久化這條數據進行補償處理。
System.out.println(new Date() + " Send mq message failed. Topic is:" + msg.getTopic());
e.printStackTrace();
}
}
// 在應用退出前,銷毀Producer對象。
// 注意:銷毀Producer對象可以節約系統內存,若您需要頻繁發送消息,則無需銷毀Producer對象。
producer.shutdown();
}
}
訂閱順序消息
package com.aliyun.openservices.ons.example.order;
import com.aliyun.openservices.ons.api.Message;
import com.aliyun.openservices.ons.api.ONSFactory;
import com.aliyun.openservices.ons.api.PropertyKeyConst;
import com.aliyun.openservices.ons.api.order.ConsumeOrderContext;
import com.aliyun.openservices.ons.api.order.MessageOrderListener;
import com.aliyun.openservices.ons.api.order.OrderAction;
import com.aliyun.openservices.ons.api.order.OrderConsumer;
import java.util.Properties;
public class ConsumerClient {
public static void main(String[] args) {
Properties properties = new Properties();
// 設置為您在消息隊列RocketMQ版控制臺創建的Group ID。
properties.put(PropertyKeyConst.GROUP_ID,"XXX");
/**
* 如果是使用公網接入點訪問,則必須設置AccessKey和SecretKey,里面填寫實例的用戶名和密碼。實例用戶名和密碼在控制臺訪問控制的智能身份識別頁簽中獲取。
* 注意!!!這里填寫的不是阿里云賬號的AccessKey ID和AccessKey Secret,請務必區分開。
* 如果是在阿里云ECS內網訪問,則無需配置,服務端會根據內網VPC信息智能獲取。
* 如果實例類型為Serverlesss實例,公網訪問必須設置實例的用戶名密碼,當開啟內網免身份識別時,內網訪問可以不設置用戶名和密碼。
*/
// 實例用戶名和密碼在消息隊列RocketMQ版控制臺訪問控制的智能身份識別頁簽中獲取。
properties.put(PropertyKeyConst.AccessKey,"INSTANCE USER NAME");
properties.put(PropertyKeyConst.SecretKey, "INSTANCE PASSWORD");
//注意!!!使用ONS SDK訪問RocketMQ 5.x實例時,InstanceID屬性不需要設置,否則會導致失敗。
// 設置為您從消息隊列RocketMQ版控制臺獲取的接入點,類似“rmq-cn-XXXX.rmq.aliyuncs.com:8080”。
// 注意!!!直接填寫控制臺提供的域名和端口即可,請勿添加http://或https://前綴標識,也不要用IP解析地址。
properties.put(PropertyKeyConst.NAMESRV_ADDR, "ACCESS POINT");
// 順序消息消費失敗進行重試前的等待時間,單位(毫秒),取值范圍:10毫秒~30000毫秒。
properties.put(PropertyKeyConst.SuspendTimeMillis,"100");
// 消息消費失敗時的最大重試次數。
properties.put(PropertyKeyConst.MaxReconsumeTimes,"20");
// 在訂閱消息前,必須調用start方法來啟動Consumer,只需調用一次即可。
OrderConsumer consumer = ONSFactory.createOrderedConsumer(properties);
consumer.subscribe(
// 設置為您在消息隊列RocketMQ版控制臺上創建的Topic。
"Order_global_topic",
// 訂閱指定Topic下的Tags:
// 1. * 表示訂閱所有消息。
// 2. TagA || TagB || TagC表示訂閱TagA或TagB或TagC的消息。
"*",
new MessageOrderListener() {
/**
* 1. 消息消費處理失敗或者處理出現異常,返回OrderAction.Suspend。
* 2. 消息處理成功,返回OrderAction.Success。
*/
@Override
public OrderAction consume(Message message, ConsumeOrderContext context) {
System.out.println(message);
return OrderAction.Success;
}
});
consumer.start();
}
}
定時消息收發示例
發送定時消息
import com.aliyun.openservices.ons.api.Message;
import com.aliyun.openservices.ons.api.ONSFactory;
import com.aliyun.openservices.ons.api.Producer;
import com.aliyun.openservices.ons.api.PropertyKeyConst;
import com.aliyun.openservices.ons.api.SendResult;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Properties;
public class ProducerDelayTest {
public static void main(String[] args) {
Properties properties = new Properties();
/**
* 如果是使用公網接入點訪問,則必須設置AccessKey和SecretKey,里面填寫實例的用戶名和密碼。實例用戶名和密碼在控制臺訪問控制的智能身份識別頁簽中獲取。
* 注意!!!這里填寫的不是阿里云賬號的AccessKey ID和AccessKey Secret,請務必區分開。
* 如果是在阿里云ECS內網訪問,則無需配置,服務端會根據內網VPC信息智能獲取。
* 如果實例類型為Serverlesss實例,公網訪問必須設置實例的用戶名密碼,當開啟內網免身份識別時,內網訪問可以不設置用戶名和密碼。
*/
// 實例用戶名和密碼在消息隊列RocketMQ版控制臺訪問控制的智能身份識別頁簽中獲取。
properties.put(PropertyKeyConst.AccessKey,"INSTANCE USER NAME");
properties.put(PropertyKeyConst.SecretKey, "INSTANCE PASSWORD");
//注意!!!使用ONS SDK訪問RocketMQ 5.x實例時,InstanceID屬性不需要設置,否則會導致失敗。
// 設置為您從消息隊列RocketMQ版控制臺獲取的接入點,類似“rmq-cn-XXXX.rmq.aliyuncs.com:8080”。
properties.put(PropertyKeyConst.NAMESRV_ADDR, "ACCESS POINT");
// 注意!!!直接填寫控制臺提供的域名和端口即可,請勿添加http://或https://前綴標識,也不要用IP解析地址。
Producer producer = ONSFactory.createProducer(properties);
// 在發送消息前,必須調用start方法來啟動Producer,只需調用一次即可。
producer.start();
Message msg = new Message(
// 設置為您在消息隊列RocketMQ版控制臺上創建的Topic。
"Topic",
// Message Tag可理解為Gmail中的標簽,對消息進行再歸類,方便Consumer指定過濾條件在消息隊列RocketMQ版的服務器過濾。
"tag",
// Message Body可以是任何二進制形式的數據,消息隊列RocketMQ版不做任何干預,需要Producer與Consumer協商好一致的序列化和反序列化方式。
"Hello MQ".getBytes());
// 設置代表消息的業務關鍵屬性,請盡可能全局唯一。
// 以方便您在無法正常收到消息情況下,可通過消息隊列RocketMQ版控制臺查詢消息并補發。
// 注意:不設置也不會影響消息正常收發。
msg.setKey("ORDERID_100");
try {
// 定時消息,單位毫秒(ms),在指定時間戳(當前時間之后)進行投遞,例如2016-03-07 16:21:00投遞。如果被設置成當前時間戳之前的某個時刻,消息將立即被投遞給消費者。
long timeStamp = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").parse("2016-03-07 16:21:00").getTime();
msg.setStartDeliverTime(timeStamp);
// 發送消息,只要不拋異常就是成功。
SendResult sendResult = producer.send(msg);
System.out.println("Message Id:" + sendResult.getMessageId());
}
catch (Exception e) {
// 消息發送失敗,需要進行重試處理,可重新發送這條消息或持久化這條數據進行補償處理。
System.out.println(new Date() + " Send mq message failed. Topic is:" + msg.getTopic());
e.printStackTrace();
}
// 在應用退出前,銷毀Producer對象。
// 注意:銷毀Producer對象可以節約系統內存,若您需要頻繁發送消息,則無需銷毀Producer對象。
producer.shutdown();
}
}
訂閱定時消息
訂閱定時消息的示例代碼和訂閱普通消息一樣,請參見訂閱普通消息。
延時消息收發示例
發送延時消息
import com.aliyun.openservices.ons.api.Message;
import com.aliyun.openservices.ons.api.ONSFactory;
import com.aliyun.openservices.ons.api.Producer;
import com.aliyun.openservices.ons.api.PropertyKeyConst;
import com.aliyun.openservices.ons.api.SendResult;
import java.util.Date;
import java.util.Properties;
public class ProducerDelayTest {
public static void main(String[] args) {
Properties properties = new Properties();
/**
* 如果是使用公網接入點訪問,則必須設置AccessKey和SecretKey,里面填寫實例的用戶名和密碼。實例用戶名和密碼在控制臺訪問控制的智能身份識別頁簽中獲取。
* 注意!!!這里填寫的不是阿里云賬號的AccessKey ID和AccessKey Secret,請務必區分開。
* 如果是在阿里云ECS內網訪問,則無需配置,服務端會根據內網VPC信息智能獲取。
* 如果實例類型為Serverlesss實例,公網訪問必須設置實例的用戶名密碼,當開啟內網免身份識別時,內網訪問可以不設置用戶名和密碼。
*/
// 實例用戶名和密碼在消息隊列RocketMQ版控制臺訪問控制的智能身份識別頁簽中獲取。
properties.put(PropertyKeyConst.AccessKey,"INSTANCE USER NAME");
properties.put(PropertyKeyConst.SecretKey, "INSTANCE PASSWORD");
//注意!!!使用ONS SDK訪問RocketMQ 5.x實例時,InstanceID屬性不需要設置,否則會導致失敗。
// 設置為您從消息隊列RocketMQ版控制臺獲取的接入點,類似“rmq-cn-XXXX.rmq.aliyuncs.com:8080”。
// 注意!!!直接填寫控制臺提供的域名和端口即可,請勿添加http://或https://前綴標識,也不要用IP解析地址。
properties.put(PropertyKeyConst.NAMESRV_ADDR, "ACCESS POINT");
Producer producer = ONSFactory.createProducer(properties);
// 在發送消息前,必須調用start方法來啟動Producer,只需調用一次即可。
producer.start();
Message msg = new Message(
// 設置為您在消息隊列RocketMQ版控制臺上創建的Topic。
"Topic",
// Message Tag,可理解為Gmail中的標簽,對消息進行再歸類,方便Consumer指定過濾條件在消息隊列RocketMQ版服務器過濾。
"tag",
// Message Body可以是任何二進制形式的數據,消息隊列RocketMQ版不做任何干預,需要Producer與Consumer協商好一致的序列化和反序列化方式。
"Hello MQ".getBytes());
// 設置代表消息的業務關鍵屬性,請盡可能全局唯一。
// 以方便您在無法正常收到消息情況下,可通過控制臺查詢消息并補發。
// 注意:不設置也不會影響消息正常收發。
msg.setKey("ORDERID_100");
try {
// 延時消息,在指定延遲時間(當前時間之后)進行投遞。最大可設置延遲40天投遞,單位毫秒(ms)。
// 以下示例表示消息在3秒后投遞。
long delayTime = System.currentTimeMillis() + 3000;
// 設置消息需要被投遞的時間。
msg.setStartDeliverTime(delayTime);
SendResult sendResult = producer.send(msg);
// 同步發送消息,只要不拋異常就是成功。
if (sendResult != null) {
System.out.println(new Date() + " Send mq message success. Topic is:" + msg.getTopic() + " msgId is: " + sendResult.getMessageId());
}
} catch (Exception e) {
// 消息發送失敗,需要進行重試處理,可重新發送這條消息或持久化這條數據進行補償處理。
System.out.println(new Date() + " Send mq message failed. Topic is:" + msg.getTopic());
e.printStackTrace();
}
// 在應用退出前,銷毀Producer對象。
// 注意:銷毀Producer對象可以節約系統內存,若您需要頻繁發送消息,則無需銷毀Producer對象。
producer.shutdown();
}
}
訂閱延時消息
訂閱延時消息的示例代碼和訂閱普通消息一樣,請參見訂閱普通消息。
事務消息收發示例
發送事務消息
package com.aliyun.openservices.tcp.example.producer;
import com.aliyun.openservices.ons.api.Message;
import com.aliyun.openservices.ons.api.ONSFactory;
import com.aliyun.openservices.ons.api.PropertyKeyConst;
import com.aliyun.openservices.ons.api.SendResult;
import com.aliyun.openservices.ons.api.exception.ONSClientException;
import com.aliyun.openservices.ons.api.transaction.LocalTransactionChecker;
import com.aliyun.openservices.ons.api.transaction.LocalTransactionExecuter;
import com.aliyun.openservices.ons.api.transaction.TransactionProducer;
import com.aliyun.openservices.ons.api.transaction.TransactionStatus;
import java.util.Date;
import java.util.Properties;
public class SimpleTransactionProducer {
public static void main(String[] args) {
Properties properties = new Properties();
// 您在消息隊列RocketMQ版控制臺創建的Group ID。注意:事務消息的Group ID不能與其他類型消息的Group ID共用。
properties.put(PropertyKeyConst.GROUP_ID,"XXX");
/**
* 如果是使用公網接入點訪問,則必須設置AccessKey和SecretKey,里面填寫實例的用戶名和密碼。實例用戶名和密碼在控制臺訪問控制的智能身份識別頁簽中獲取。
* 注意!!!這里填寫的不是阿里云賬號的AccessKey ID和AccessKey Secret,請務必區分開。
* 如果是在阿里云ECS內網訪問,則無需配置,服務端會根據內網VPC信息智能獲取。
* 如果實例類型為Serverlesss實例,公網訪問必須設置實例的用戶名密碼,當開啟內網免身份識別時,內網訪問可以不設置用戶名和密碼。
*/
// 實例用戶名和密碼在消息隊列RocketMQ版控制臺訪問控制的智能身份識別頁簽中獲取。
properties.put(PropertyKeyConst.AccessKey,"INSTANCE USER NAME");
properties.put(PropertyKeyConst.SecretKey, "INSTANCE PASSWORD");
//注意!!!使用ONS SDK訪問RocketMQ 5.x實例時,InstanceID屬性不需要設置,否則會導致失敗。
// 設置為您從消息隊列RocketMQ版控制臺獲取的接入點,類似“rmq-cn-XXXX.rmq.aliyuncs.com:8080”。
// 注意!!!直接填寫控制臺提供的域名和端口即可,請勿添加http://或https://前綴標識,也不要用IP解析地址。
properties.put(PropertyKeyConst.NAMESRV_ADDR, "ACCESS POINT");
// 初始化事務消息Producer時,需要注冊一個本地事務狀態的Checker。
LocalTransactionCheckerImpl localTransactionChecker = new LocalTransactionCheckerImpl();
TransactionProducer transactionProducer = ONSFactory.createTransactionProducer(properties, localTransactionChecker);
transactionProducer.start();
Message msg = new Message("XXX","TagA","Hello MQ transaction===".getBytes());
for (int i = 0; i < 3; i++) {
try{
SendResult sendResult = transactionProducer.send(msg, new LocalTransactionExecuter() {
@Override
public TransactionStatus execute(Message msg, Object arg) {
System.out.println("執行本地事務,并根據本地事務的狀態提交TransactionStatus。");
return TransactionStatus.CommitTransaction;
}
}, null);
assert sendResult != null;
}catch (ONSClientException e){
// 消息發送失敗,需要進行重試處理,可重新發送這條消息或持久化這條數據進行補償處理。
System.out.println(new Date() + " Send mq message failed! Topic is:" + msg.getTopic());
e.printStackTrace();
}
}
System.out.println("Send transaction message success.");
}
}
// 本地事務檢查器。
class LocalTransactionCheckerImpl implements LocalTransactionChecker {
@Override
public TransactionStatus check(Message msg) {
System.out.println("收到事務消息的回查請求,MsgId: " + msg.getMsgID());
return TransactionStatus.CommitTransaction;
}
}
訂閱事務消息
訂閱事務消息的示例代碼和訂閱普通消息一樣,請參見訂閱普通消息。
文檔內容是否對您有幫助?