日本熟妇hd丰满老熟妇,中文字幕一区二区三区在线不卡 ,亚洲成片在线观看,免费女同在线一区二区

示例代碼

云消息隊列 RocketMQ 版5.x版本實例可兼容Java ONS 1.x SDK客戶端接入,您可以使用ONS 1.x SDK的接入5.x實例進行消息收發。本文為您介紹Java ONS 1.x SDK消息收發示例代碼。

重要
  • 推薦您使用最新的RocketMQ 5.x系列SDK,5.x系列SDK作為主力研發版本,和云消息隊列 RocketMQ 版5.x服務端完全兼容,提供了更全面的功能并支持更多增強特性。更多信息,請參見5.x系列SDK
  • RocketMQ 4.x/3.x系列SDK和ONS系列SDK后續僅做功能維護,建議僅存量業務使用。

Serverless版實例公網訪問版本說明

Serverless版實例使用公網訪問接入云消息隊列 RocketMQ 版時,需要保證使用的Java ONS 1.x SDK版本為1.9.0.Final及以上版本,并在消息收發代碼中補充如下內容:

properties.setProperty(PropertyKeyConst.Namespace, "InstanceId");

說明

其中,InstanceId需要替換為您實際使用的實例ID。

普通消息收發示例

發送普通消息(同步發送)

import com.aliyun.openservices.ons.api.Message;
import com.aliyun.openservices.ons.api.Producer;
import com.aliyun.openservices.ons.api.SendResult;
import com.aliyun.openservices.ons.api.ONSFactory;
import com.aliyun.openservices.ons.api.PropertyKeyConst;

import java.util.Date;
import java.util.Properties;

public class ProducerTest {
    public static void main(String[] args) {
        Properties properties = new Properties();
        /**
        * 如果是使用公網接入點訪問,則必須設置AccessKey和SecretKey,里面填寫實例的用戶名和密碼。實例用戶名和密碼在控制臺訪問控制的智能身份識別頁簽中獲取。
        * 注意!!!這里填寫的不是阿里云賬號的AccessKey ID和AccessKey Secret,請務必區分開。
        * 如果是在阿里云ECS內網訪問,則無需配置,服務端會根據內網VPC信息智能獲取。
        * 如果實例類型為Serverlesss實例,公網訪問必須設置實例的用戶名密碼,當開啟內網免身份識別時,內網訪問可以不設置用戶名和密碼。
        */
        // 實例用戶名和密碼在消息隊列RocketMQ版控制臺訪問控制的智能身份識別頁簽中獲取。
        properties.put(PropertyKeyConst.AccessKey,"INSTANCE USER NAME");
        properties.put(PropertyKeyConst.SecretKey, "INSTANCE PASSWORD");
        //注意!!!使用ONS SDK訪問RocketMQ 5.x實例時,InstanceID屬性不需要設置,否則會導致失敗。
 
        // 設置發送超時時間,單位:毫秒。
        properties.setProperty(PropertyKeyConst.SendMsgTimeoutMillis, "3000");
        // 設置為您從消息隊列RocketMQ版控制臺獲取的接入點,類似“rmq-cn-XXXX.rmq.aliyuncs.com:8080”。
        // 注意!!!直接填寫控制臺提供的域名和端口即可,請勿添加http://或https://前綴標識,也不要用IP解析地址。
        properties.put(PropertyKeyConst.NAMESRV_ADDR, "ACCESS POINT");
        Producer producer = ONSFactory.createProducer(properties);
        // 在發送消息前,必須調用start方法來啟動Producer,只需調用一次即可。
        producer.start();

        // 循環發送消息。
        for (int i = 0; i < 100; i++){
            Message msg = new Message( 
                // 設置為您在消息隊列RocketMQ版控制臺上創建的Topic。
                // 普通消息所屬的Topic,切勿使用普通消息的Topic來收發其他類型的消息。
                "TopicTestMQ",
                // Message Tag可理解為Gmail中的標簽,對消息進行再歸類,方便Consumer指定過濾條件在消息隊列RocketMQ版的服務器過濾。
                // Tag的具體格式和設置方法,請參見消息過濾。
                "TagA",
                // Message Body可以是任何二進制形式的數據,消息隊列RocketMQ版不做任何干預。
                // 需要Producer與Consumer協商好一致的序列化和反序列化方式。
                "Hello MQ".getBytes());
            // 設置代表消息的業務關鍵屬性,請盡可能全局唯一。
            // 以方便您在無法正常收到消息情況下,可通過消息隊列RocketMQ版控制臺查詢消息并補發。
            // 注意:不設置也不會影響消息正常收發。
            msg.setKey("ORDERID_" + i);

            try {
                SendResult sendResult = producer.send(msg);
                // 同步發送消息,只要不拋異常就是成功。
                if (sendResult != null) {
                    System.out.println(new Date() + " Send mq message success. Topic is:" + msg.getTopic() + " msgId is: " + sendResult.getMessageId());
                }
            }
            catch (Exception e) {
                // 消息發送失敗,需要進行重試處理,可重新發送這條消息或持久化這條數據進行補償處理。
                System.out.println(new Date() + " Send mq message failed. Topic is:" + msg.getTopic());
                e.printStackTrace();
            }
        }

        // 在應用退出前,銷毀Producer對象。
        // 注意:銷毀Producer對象可以節約系統內存,若您需要頻繁發送消息,則無需銷毀Producer對象。
        producer.shutdown();
    }
}               

發送普通消息(異步發送)

import com.aliyun.openservices.ons.api.Message;
import com.aliyun.openservices.ons.api.OnExceptionContext;
import com.aliyun.openservices.ons.api.Producer;
import com.aliyun.openservices.ons.api.SendCallback;
import com.aliyun.openservices.ons.api.SendResult;
import com.aliyun.openservices.ons.api.ONSFactory;
import com.aliyun.openservices.ons.api.PropertyKeyConst;

import java.util.Properties;
import java.util.concurrent.TimeUnit;

public class ProducerTest {
    public static void main(String[] args) throws InterruptedException {
        Properties properties = new Properties();
        /**
        * 如果是使用公網接入點訪問,則必須設置AccessKey和SecretKey,里面填寫實例的用戶名和密碼。實例用戶名和密碼在控制臺訪問控制的智能身份識別頁簽中獲取。
        * 注意!!!這里填寫的不是阿里云賬號的AccessKey ID和AccessKey Secret,請務必區分開。
        * 如果是在阿里云ECS內網訪問,則無需配置,服務端會根據內網VPC信息智能獲取。
        * 如果實例類型為Serverlesss實例,公網訪問必須設置實例的用戶名密碼,當開啟內網免身份識別時,內網訪問可以不設置用戶名和密碼。
        */
        // 實例用戶名和密碼在消息隊列RocketMQ版控制臺訪問控制的智能身份識別頁簽中獲取。
        properties.put(PropertyKeyConst.AccessKey,"INSTANCE USER NAME");
        properties.put(PropertyKeyConst.SecretKey, "INSTANCE PASSWORD");
        //注意!!!使用ONS SDK訪問RocketMQ 5.x實例時,InstanceID屬性不需要設置,否則會導致失敗。
  
        //設置發送超時時間,單位毫秒。
        properties.setProperty(PropertyKeyConst.SendMsgTimeoutMillis, "3000");
        // 設置為您從消息隊列RocketMQ版控制臺獲取的接入點,類似“rmq-cn-XXXX.rmq.aliyuncs.com:8080”。
        // 注意!!!直接填寫控制臺提供的域名和端口即可,請勿添加http://或https://前綴標識,也不要用IP解析地址。
        properties.put(PropertyKeyConst.NAMESRV_ADDR, "ACCESS POINT");

        Producer producer = ONSFactory.createProducer(properties);
        // 在發送消息前,必須調用start方法來啟動Producer,只需調用一次即可。
        producer.start();

        Message msg = new Message(
                // 設置為您在消息隊列RocketMQ版控制臺上創建的Topic。
                // 普通消息所屬的Topic,切勿使用普通消息的Topic來收發其他類型的消息。
                "TopicTestMQ",
                // Message Tag,可理解為Gmail中的標簽,對消息進行再歸類,方便Consumer指定過濾條件在消息隊列RocketMQ版的服務器過濾。
                "TagA",
                // Message Body,任何二進制形式的數據,消息隊列RocketMQ版不做任何干預,需要Producer與Consumer協商好一致的序列化和反序列化方式。
                "Hello MQ".getBytes());

        // 設置代表消息的業務關鍵屬性,請盡可能全局唯一。 以方便您在無法正常收到消息情況下,可通過消息隊列RocketMQ版控制臺查詢消息并補發。
        // 注意:不設置也不會影響消息正常收發。
        msg.setKey("ORDERID_100");

        // 異步發送消息, 發送結果通過callback返回給客戶端。
        producer.sendAsync(msg, new SendCallback() {
            @Override
            public void onSuccess(final SendResult sendResult) {
                // 消息發送成功。
                System.out.println("send message success. topic=" + sendResult.getTopic() + ", msgId=" + sendResult.getMessageId());
            }

            @Override
            public void onException(OnExceptionContext context) {
                // 消息發送失敗,需要進行重試處理,可重新發送這條消息或持久化這條數據進行補償處理。
                System.out.println("send message failed. topic=" + context.getTopic() + ", msgId=" + context.getMessageId());
            }
        });

        // 阻塞當前線程3秒,等待異步發送結果。
        TimeUnit.SECONDS.sleep(3);

        // 在應用退出前,銷毀Producer對象。
        // 注意:銷毀Producer對象可以節約系統內存,若您需要頻繁發送消息,則無需銷毀Producer對象。
        producer.shutdown();
    }
}                

發送普通消息(單向發送)

import com.aliyun.openservices.ons.api.Message;
import com.aliyun.openservices.ons.api.Producer;
import com.aliyun.openservices.ons.api.ONSFactory;
import com.aliyun.openservices.ons.api.PropertyKeyConst;

import java.util.Properties;

public class ProducerTest {
    public static void main(String[] args) {
        Properties properties = new Properties();
        /**
        * 如果是使用公網接入點訪問,則必須設置AccessKey和SecretKey,里面填寫實例的用戶名和密碼。實例用戶名和密碼在控制臺訪問控制的智能身份識別頁簽中獲取。
        * 注意!!!這里填寫的不是阿里云賬號的AccessKey ID和AccessKey Secret,請務必區分開。
        * 如果是在阿里云ECS內網訪問,則無需配置,服務端會根據內網VPC信息智能獲取。
        * 如果實例類型為Serverlesss實例,公網訪問必須設置實例的用戶名密碼,當開啟內網免身份識別時,內網訪問可以不設置用戶名和密碼。
        */
        // 實例用戶名和密碼在消息隊列RocketMQ版控制臺訪問控制的智能身份識別頁簽中獲取。
        properties.put(PropertyKeyConst.AccessKey,"INSTANCE USER NAME");
        properties.put(PropertyKeyConst.SecretKey, "INSTANCE PASSWORD");
        //注意!!!使用ONS SDK訪問RocketMQ 5.x實例時,InstanceID屬性不需要設置,否則會導致失敗。

        // 設置發送超時時間,單位:毫秒。
        properties.setProperty(PropertyKeyConst.SendMsgTimeoutMillis, "3000");
        // 設置為您從消息隊列RocketMQ版控制臺獲取的接入點,類似“rmq-cn-XXXX.rmq.aliyuncs.com:8080”。
        // 注意!!!直接填寫控制臺提供的域名和端口即可,請勿添加http://或https://前綴標識,也不要用IP解析地址。
        properties.put(PropertyKeyConst.NAMESRV_ADDR, "ACCESS POINT");

        Producer producer = ONSFactory.createProducer(properties);
        // 在發送消息前,必須調用start方法來啟動Producer,只需調用一次即可。
        producer.start();
        // 循環發送消息。
        for (int i = 0; i < 100; i++){
            Message msg = new Message(
                    // 設置為您在消息隊列RocketMQ版控制臺上創建的Topic。
                    // 普通消息所屬的Topic,切勿使用普通消息的Topic來收發其他類型的消息。
                    "TopicTestMQ",
                    // Message Tag,
                    // 可理解為Gmail中的標簽,對消息進行再歸類,方便Consumer指定過濾條件在消息隊列RocketMQ版的服務器過濾。
                    "TagA",
                    // Message Body
                    // 任何二進制形式的數據,消息隊列RocketMQ版不做任何干預,需要Producer與Consumer協商好一致的序列化和反序列化方式。
                    "Hello MQ".getBytes());

            // 設置代表消息的業務關鍵屬性,請盡可能全局唯一。
            // 以方便您在無法正常收到消息情況下,可通過消息隊列RocketMQ版控制臺查詢消息并補發。
            // 注意:不設置也不會影響消息正常收發。
            msg.setKey("ORDERID_" + i);

            // 由于在oneway方式發送消息時沒有請求應答處理,如果出現消息發送失敗,則會因為沒有重試而導致數據丟失。若數據不可丟,建議選用可靠同步或可靠異步發送方式。
            producer.sendOneway(msg);
        }

        // 在應用退出前,銷毀Producer對象。
        // 注意:銷毀Producer對象可以節約系統內存,若您需要頻繁發送消息,則無需銷毀Producer對象。
        producer.shutdown();
    }
}

發送普通消息(多線程發送)

import com.aliyun.openservices.ons.api.Message;
import com.aliyun.openservices.ons.api.Producer;
import com.aliyun.openservices.ons.api.ONSFactory;
import com.aliyun.openservices.ons.api.PropertyKeyConst;

import com.aliyun.openservices.ons.api.SendResult;

import java.util.Date;
import java.util.Properties;

public class SharedProducer {
    public static void main(String[] args) {
        // producer實例配置初始化。
        Properties properties = new Properties();
        // 設置為您在消息隊列RocketMQ版控制臺創建的Group ID。
        properties.put(PropertyKeyConst.GROUP_ID, "XXX");
        /**
         * 如果是使用公網接入點訪問,則必須設置AccessKey和SecretKey,里面填寫實例的用戶名和密碼。實例用戶名和密碼在控制臺訪問控制的智能身份識別頁簽中獲取。
         * 注意!!!這里填寫的不是阿里云賬號的AccessKey ID和AccessKey Secret,請務必區分開。
         * 如果是在阿里云ECS內網訪問,則無需配置,服務端會根據內網VPC信息智能獲取。
         * 如果實例類型為Serverlesss實例,公網訪問必須設置實例的用戶名密碼,當開啟內網免身份識別時,內網訪問可以不設置用戶名和密碼。
         */
        // 實例用戶名和密碼在消息隊列RocketMQ版控制臺訪問控制的智能身份識別頁簽中獲取。
        properties.put(PropertyKeyConst.AccessKey, "INSTANCE USER NAME");
        properties.put(PropertyKeyConst.SecretKey, "INSTANCE PASSWORD");
        //注意!!!使用ONS SDK訪問RocketMQ 5.x實例時,InstanceID屬性不需要設置,否則會導致失敗。

        // 設置發送超時時間,單位:毫秒。 
        properties.setProperty(PropertyKeyConst.SendMsgTimeoutMillis, "3000");
        // 設置為您從消息隊列RocketMQ版控制臺獲取的接入點,類似“rmq-cn-XXXX.rmq.aliyuncs.com:8080”。
        // 注意!!!直接填寫控制臺提供的域名和端口即可,請勿添加http://或https://前綴標識,也不要用IP解析地址。
        properties.put(PropertyKeyConst.NAMESRV_ADDR, "ACCESS POINT");
        // 在發送消息前,必須調用start方法來啟動Producer,只需調用一次即可。
        Producer producer = ONSFactory.createProducer(properties);
        producer.start();

        // 創建的Producer和Consumer對象為線程安全的,可以在多線程間進行共享,避免每個線程創建一個實例。

        // thread和anotherThread共享Producer對象,并發地發送消息至消息隊列RocketMQ版。
        Thread thread = new Thread(new Runnable() {
            @Override
            public void run() {
                Message msg = new Message(
                        // 設置為您在消息隊列RocketMQ版控制臺上創建的Topic。
                        // 普通消息所屬的Topic,切勿使用普通消息的Topic來收發其他類型的消息。
                        "TopicTestMQ",
                        // Message Tag可理解為Gmail中的標簽,對消息進行再歸類,方便Consumer指定過濾條件在消息隊列RocketMQ版的服務器過濾。
                        "TagA",
                        // Message Body可以是任何二進制形式的數據,消息隊列RocketMQ版不做任何干預。
                        // 需要Producer與Consumer協商好一致的序列化和反序列化方式。
                        "Hello MQ".getBytes());
                try {
                    SendResult sendResult = producer.send(msg);
                    // 同步發送消息,只要不拋異常就是成功。
                    if (sendResult != null) {
                        System.out.println(new Date() + " Send mq message success. Topic is:" + msg.getTopic() + " msgId is: " + sendResult.getMessageId());
                    }
                } catch (Exception e) {
                    // 消息發送失敗,需要進行重試處理,可重新發送這條消息或持久化這條數據進行補償處理。
                    System.out.println(new Date() + " Send mq message failed. Topic is:" + msg.getTopic());
                    e.printStackTrace();
                }
            }
        });
        thread.start();


        Thread anotherThread = new Thread(new Runnable() {
            @Override
            public void run() {
                Message msg = new Message("TopicTestMQ", "TagA", "Hello MQ".getBytes());
                try {
                    SendResult sendResult = producer.send(msg);
                    // 同步發送消息,只要不拋異常就是成功。
                    if (sendResult != null) {
                        System.out.println(new Date() + " Send mq message success. Topic is:" + msg.getTopic() + " msgId is: " + sendResult.getMessageId());
                    }
                } catch (Exception e) {
                    // 消息發送失敗,需要進行重試處理,可重新發送這條消息或持久化這條數據進行補償處理。
                    System.out.println(new Date() + " Send mq message failed. Topic is:" + msg.getTopic());
                    e.printStackTrace();
                }
            }
        });
        anotherThread.start();


        // (可選)Producer實例若不再使用時,可將Producer關閉,進行資源釋放。
        // producer.shutdown();
    }
}

訂閱普通消息(PushConsumer)

import com.aliyun.openservices.ons.api.Action;
import com.aliyun.openservices.ons.api.ConsumeContext;
import com.aliyun.openservices.ons.api.Consumer;
import com.aliyun.openservices.ons.api.Message;
import com.aliyun.openservices.ons.api.MessageListener;
import com.aliyun.openservices.ons.api.ONSFactory;
import com.aliyun.openservices.ons.api.PropertyKeyConst;

import java.util.Properties;

public class ConsumerTest {
    public static void main(String[] args) {
        Properties properties = new Properties();
        // 設置為您在消息隊列RocketMQ版控制臺創建的Group ID。
        properties.put(PropertyKeyConst.GROUP_ID, "XXX");
        /**
         * 如果是使用公網接入點訪問,則必須設置AccessKey和SecretKey,里面填寫實例的用戶名和密碼。實例用戶名和密碼在控制臺訪問控制的智能身份識別頁簽中獲取。
         * 注意!!!這里填寫的不是阿里云賬號的AccessKey ID和AccessKey Secret,請務必區分開。
         * 如果是在阿里云ECS內網訪問,則無需配置,服務端會根據內網VPC信息智能獲取。
         * 如果實例類型為Serverlesss實例,公網訪問必須設置實例的用戶名密碼,當開啟內網免身份識別時,內網訪問可以不設置用戶名和密碼。
         */
        // 實例用戶名和密碼在消息隊列RocketMQ版控制臺訪問控制的智能身份識別頁簽中獲取。
        properties.put(PropertyKeyConst.AccessKey, "INSTANCE USER NAME");
        properties.put(PropertyKeyConst.SecretKey, "INSTANCE PASSWORD");
        //注意!!!使用ONS SDK訪問RocketMQ 5.x實例時,InstanceID屬性不需要設置,否則會導致失敗。

        // 設置為您從消息隊列RocketMQ版控制臺獲取的接入點,類似“rmq-cn-XXXX.rmq.aliyuncs.com:8080”。
        // 注意!!!直接填寫控制臺提供的域名和端口即可,請勿添加http://或https://前綴標識,也不要用IP解析地址。
        properties.put(PropertyKeyConst.NAMESRV_ADDR, "ACCESS POINT");
        // 集群訂閱方式(默認)。
        // properties.put(PropertyKeyConst.MessageModel, PropertyValueConst.CLUSTERING);
        // 廣播訂閱方式。
        // properties.put(PropertyKeyConst.MessageModel, PropertyValueConst.BROADCASTING);

        Consumer consumer = ONSFactory.createConsumer(properties);
        consumer.subscribe("TopicTestMQ", "TagA||TagB", new MessageListener() { //訂閱多個Tag。
            public Action consume(Message message, ConsumeContext context) {
                System.out.println("Receive: " + message);
                return Action.CommitMessage;
            }
        });

        // 訂閱另外一個Topic,如需取消訂閱該Topic,請刪除該部分的訂閱代碼,重新啟動消費端即可。
        consumer.subscribe("TopicTestMQ-Other", "*", new MessageListener() { // 訂閱全部Tag。
            public Action consume(Message message, ConsumeContext context) {
                System.out.println("Receive: " + message);
                return Action.CommitMessage;
            }
        });

        consumer.start();
        System.out.println("Consumer Started");
    }
} 

訂閱普通消息(PushConsumer批量消費)

import com.aliyun.openservices.ons.api.Action;
import com.aliyun.openservices.ons.api.ConsumeContext;
import com.aliyun.openservices.ons.api.Message;
import com.aliyun.openservices.ons.api.batch.BatchConsumer;
import com.aliyun.openservices.ons.api.batch.BatchMessageListener;
import java.util.List;
import java.util.Properties;

import com.aliyun.openservices.ons.api.ONSFactory;
import com.aliyun.openservices.ons.api.PropertyKeyConst;

public class SimpleBatchConsumer {

    public static void main(String[] args) {
        Properties consumerProperties = new Properties();
        // 設置為您在消息隊列RocketMQ版控制臺創建的Group ID。
        consumerProperties.put(PropertyKeyConst.GROUP_ID, "XXX");
        /**
        * 如果是使用公網接入點訪問,則必須設置AccessKey和SecretKey,里面填寫實例的用戶名和密碼。實例用戶名和密碼在控制臺訪問控制的智能身份識別頁簽中獲取。
        * 注意!!!這里填寫的不是阿里云賬號的AccessKey ID和AccessKey Secret,請務必區分開。
        * 如果是在阿里云ECS內網訪問,則無需配置,服務端會根據內網VPC信息智能獲取。
        * 如果實例類型為Serverlesss實例,公網訪問必須設置實例的用戶名密碼,當開啟內網免身份識別時,內網訪問可以不設置用戶名和密碼。
        */
        // 實例用戶名和密碼在消息隊列RocketMQ版控制臺訪問控制的智能身份識別頁簽中獲取。
        consumerProperties.put(PropertyKeyConst.AccessKey,"INSTANCE USER NAME");
        consumerProperties.put(PropertyKeyConst.SecretKey, "INSTANCE PASSWORD");
        //注意!!!使用ONS SDK訪問RocketMQ 5.x實例時,InstanceID屬性不需要設置,否則會導致失敗。
        
        // 設置為您從消息隊列RocketMQ版控制臺獲取的接入點,類似“rmq-cn-XXXX.rmq.aliyuncs.com:8080”。
        // 注意!!!直接填寫控制臺提供的域名和端口即可,請勿添加http://或https://前綴標識,也不要用IP解析地址。
        consumerProperties.put(PropertyKeyConst.NAMESRV_ADDR, "ACCESS POINT");

        // 設置批量消費最大消息數量,當指定Topic的消息數量已經攢夠128條,SDK立即執行回調進行消費。默認值:32,取值范圍:1~1024。
        consumerProperties.setProperty(PropertyKeyConst.ConsumeMessageBatchMaxSize, String.valueOf(128));
        // 設置批量消費最大等待時長,當等待時間達到10秒,SDK立即執行回調進行消費。默認值:0,取值范圍:0~450,單位:秒。
        consumerProperties.setProperty(PropertyKeyConst.BatchConsumeMaxAwaitDurationInSeconds, String.valueOf(10));

        BatchConsumer batchConsumer = ONSFactory.createBatchConsumer(consumerProperties);
        batchConsumer.subscribe("TopicTestMQ", "TagA", new BatchMessageListener() {

             @Override
            public Action consume(final List<Message> messages, ConsumeContext context) {
                System.out.printf("Batch-size: %d\n", messages.size());
           // 批量消息處理。
                return Action.CommitMessage;
            }
        });
        // 啟動batchConsumer。
        batchConsumer.start();
        System.out.println("Consumer start success.");

        // 等待固定時間防止進程退出。
        try {
            Thread.sleep(200000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}  

訂閱普通消息(PullConsumer)

import com.aliyun.openservices.ons.api.Message;
import com.aliyun.openservices.ons.api.ONSFactory;
import com.aliyun.openservices.ons.api.PropertyKeyConst;
import com.aliyun.openservices.ons.api.PullConsumer;
import com.aliyun.openservices.ons.api.TopicPartition;
import java.util.List;
import java.util.Properties;
import java.util.Set;

public class PullConsumerClient {
    public static void main(String[] args){
        Properties properties = new Properties();
        // 設置為您在消息隊列RocketMQ版控制臺創建的Group ID。
        properties.setProperty(PropertyKeyConst.GROUP_ID, "GID-xxxxx");
        /**
        * 如果是使用公網接入點訪問,則必須設置AccessKey和SecretKey,里面填寫實例的用戶名和密碼。實例用戶名和密碼在控制臺訪問控制的智能身份識別頁簽中獲取。
        * 注意!!!這里填寫的不是阿里云賬號的AccessKey ID和AccessKey Secret,請務必區分開。
        * 如果是在阿里云ECS內網訪問,則無需配置,服務端會根據內網VPC信息智能獲取。
        * 如果實例類型為Serverlesss實例,公網訪問必須設置實例的用戶名密碼,當開啟內網免身份識別時,內網訪問可以不設置用戶名和密碼。
        */
        // 實例用戶名和密碼在消息隊列RocketMQ版控制臺訪問控制的智能身份識別頁簽中獲取。
        properties.put(PropertyKeyConst.AccessKey,"INSTANCE USER NAME");
        properties.put(PropertyKeyConst.SecretKey, "INSTANCE PASSWORD");
        //注意!!!使用ONS SDK訪問RocketMQ 5.x實例時,InstanceID屬性不需要設置,否則會導致失敗。

        // 設置為您從消息隊列RocketMQ版控制臺獲取的接入點,類似“rmq-cn-XXXX.rmq.aliyuncs.com:8080”。
        // 注意!!!直接填寫控制臺提供的域名和端口即可,請勿添加http://或https://前綴標識,也不要用IP解析地址。
        properties.put(PropertyKeyConst.NAMESRV_ADDR, "ACCESS POINT");
        PullConsumer consumer = ONSFactory.createPullConsumer(properties);
        // 啟動Consumer。
        consumer.start();
        // 獲取topic-xxx下的所有分區。
        Set<TopicPartition> topicPartitions = consumer.topicPartitions("topic-xxx");
        // 指定需要拉取消息的分區。
        consumer.assign(topicPartitions);

        while (true) {
            // 拉取消息,超時時間為3000 ms。
            List<Message> messages = consumer.poll(3000);
            System.out.printf("Received message: %s %n", messages);
        }
    }
}

順序消息收發示例

發送順序消息

import com.aliyun.openservices.ons.api.Message;
import com.aliyun.openservices.ons.api.ONSFactory;
import com.aliyun.openservices.ons.api.PropertyKeyConst;
import com.aliyun.openservices.ons.api.SendResult;
import com.aliyun.openservices.ons.api.order.OrderProducer;

import java.util.Date;
import java.util.Properties;

public class ProducerClient {

    public static void main(String[] args) {
        Properties properties = new Properties();
        // 設置為您在消息隊列RocketMQ版控制臺創建的Group ID。
        properties.put(PropertyKeyConst.GROUP_ID,"XXX");
        /**
        * 如果是使用公網接入點訪問,則必須設置AccessKey和SecretKey,里面填寫實例的用戶名和密碼。實例用戶名和密碼在控制臺訪問控制的智能身份識別頁簽中獲取。
        * 注意!!!這里填寫的不是阿里云賬號的AccessKey ID和AccessKey Secret,請務必區分開。
        * 如果是在阿里云ECS內網訪問,則無需配置,服務端會根據內網VPC信息智能獲取。
        * 如果實例類型為Serverlesss實例,公網訪問必須設置實例的用戶名密碼,當開啟內網免身份識別時,內網訪問可以不設置用戶名和密碼。
        */
        // 實例用戶名和密碼在消息隊列RocketMQ版控制臺訪問控制的智能身份識別頁簽中獲取。
        properties.put(PropertyKeyConst.AccessKey,"INSTANCE USER NAME");
        properties.put(PropertyKeyConst.SecretKey, "INSTANCE PASSWORD");
        //注意!!!使用ONS SDK訪問RocketMQ 5.x實例時,InstanceID屬性不需要設置,否則會導致失敗。

        // 設置為您從消息隊列RocketMQ版控制臺獲取的接入點,類似“rmq-cn-XXXX.rmq.aliyuncs.com:8080”。
        // 注意!!!直接填寫控制臺提供的域名和端口即可,請勿添加http://或https://前綴標識,也不要用IP解析地址。
        properties.put(PropertyKeyConst.NAMESRV_ADDR, "ACCESS POINT");
        OrderProducer producer = ONSFactory.createOrderProducer(properties);
        // 在發送消息前,必須調用start方法來啟動Producer,只需調用一次即可。
        producer.start();
        for (int i = 0; i < 1000; i++) {
            String orderId = "biz_" + i % 10;
            Message msg = new Message(
                    // 設置為您在消息隊列RocketMQ版控制臺上創建的Topic。
                    "Order_global_topic",
                    // Message Tag,可理解為Gmail中的標簽,對消息進行再歸類,方便Consumer指定過濾條件在消息隊列RocketMQ版的服務器過濾。
                    "TagA",
                    // Message Body,可以是任何二進制形式的數據,消息隊列RocketMQ版不做任何干預,需要Producer與Consumer協商好一致的序列化和反序列化方式。
                    "send order global msg".getBytes()
            );
            // 設置代表消息的業務關鍵屬性,請盡可能全局唯一。
            // 以方便您在無法正常收到消息情況下,可通過消息隊列RocketMQ版控制臺查詢消息并補發。
            // 注意:不設置也不會影響消息正常收發。
            msg.setKey(orderId);
            // 分區順序消息中區分不同分區的關鍵字段,Sharding Key與普通消息的key是完全不同的概念。
            // 全局順序消息,該字段可以設置為任意非空字符串。
            String shardingKey = String.valueOf(orderId);
            try {
                SendResult sendResult = producer.send(msg, shardingKey);
                // 發送消息,只要不拋異常就是成功。
                if (sendResult != null) {
                    System.out.println(new Date() + " Send mq message success. Topic is:" + msg.getTopic() + " msgId is: " + sendResult.getMessageId());
                }
            }
            catch (Exception e) {
                // 消息發送失敗,需要進行重試處理,可重新發送這條消息或持久化這條數據進行補償處理。
                System.out.println(new Date() + " Send mq message failed. Topic is:" + msg.getTopic());
                e.printStackTrace();
            }
        }
        // 在應用退出前,銷毀Producer對象。
        // 注意:銷毀Producer對象可以節約系統內存,若您需要頻繁發送消息,則無需銷毀Producer對象。
        producer.shutdown();
    }

}

訂閱順序消息

package com.aliyun.openservices.ons.example.order;

import com.aliyun.openservices.ons.api.Message;
import com.aliyun.openservices.ons.api.ONSFactory;
import com.aliyun.openservices.ons.api.PropertyKeyConst;
import com.aliyun.openservices.ons.api.order.ConsumeOrderContext;
import com.aliyun.openservices.ons.api.order.MessageOrderListener;
import com.aliyun.openservices.ons.api.order.OrderAction;
import com.aliyun.openservices.ons.api.order.OrderConsumer;

import java.util.Properties;


public class ConsumerClient {

    public static void main(String[] args) {
        Properties properties = new Properties();
        // 設置為您在消息隊列RocketMQ版控制臺創建的Group ID。
        properties.put(PropertyKeyConst.GROUP_ID,"XXX");
        /**
        * 如果是使用公網接入點訪問,則必須設置AccessKey和SecretKey,里面填寫實例的用戶名和密碼。實例用戶名和密碼在控制臺訪問控制的智能身份識別頁簽中獲取。
        * 注意!!!這里填寫的不是阿里云賬號的AccessKey ID和AccessKey Secret,請務必區分開。
        * 如果是在阿里云ECS內網訪問,則無需配置,服務端會根據內網VPC信息智能獲取。
        * 如果實例類型為Serverlesss實例,公網訪問必須設置實例的用戶名密碼,當開啟內網免身份識別時,內網訪問可以不設置用戶名和密碼。
        */
        // 實例用戶名和密碼在消息隊列RocketMQ版控制臺訪問控制的智能身份識別頁簽中獲取。
        properties.put(PropertyKeyConst.AccessKey,"INSTANCE USER NAME");
        properties.put(PropertyKeyConst.SecretKey, "INSTANCE PASSWORD");
        //注意!!!使用ONS SDK訪問RocketMQ 5.x實例時,InstanceID屬性不需要設置,否則會導致失敗。

        // 設置為您從消息隊列RocketMQ版控制臺獲取的接入點,類似“rmq-cn-XXXX.rmq.aliyuncs.com:8080”。
        // 注意!!!直接填寫控制臺提供的域名和端口即可,請勿添加http://或https://前綴標識,也不要用IP解析地址。
        properties.put(PropertyKeyConst.NAMESRV_ADDR, "ACCESS POINT");
        // 順序消息消費失敗進行重試前的等待時間,單位(毫秒),取值范圍:10毫秒~30000毫秒。
        properties.put(PropertyKeyConst.SuspendTimeMillis,"100");
        // 消息消費失敗時的最大重試次數。
        properties.put(PropertyKeyConst.MaxReconsumeTimes,"20");

        // 在訂閱消息前,必須調用start方法來啟動Consumer,只需調用一次即可。
        OrderConsumer consumer = ONSFactory.createOrderedConsumer(properties);

        consumer.subscribe(
                // 設置為您在消息隊列RocketMQ版控制臺上創建的Topic。
                "Order_global_topic",
                // 訂閱指定Topic下的Tags:
                // 1. * 表示訂閱所有消息。
                // 2. TagA || TagB || TagC表示訂閱TagA或TagB或TagC的消息。
                "*",
                new MessageOrderListener() {
                    /**
                     * 1. 消息消費處理失敗或者處理出現異常,返回OrderAction.Suspend。
                     * 2. 消息處理成功,返回OrderAction.Success。
                     */
                    @Override
                    public OrderAction consume(Message message, ConsumeOrderContext context) {
                        System.out.println(message);
                        return OrderAction.Success;
                    }
                });

        consumer.start();
    }
}

定時消息收發示例

發送定時消息

import com.aliyun.openservices.ons.api.Message;
import com.aliyun.openservices.ons.api.ONSFactory;
import com.aliyun.openservices.ons.api.Producer;
import com.aliyun.openservices.ons.api.PropertyKeyConst;

import com.aliyun.openservices.ons.api.SendResult;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Properties;

public class ProducerDelayTest {
    public static void main(String[] args) {
        Properties properties = new Properties();
        /**
        * 如果是使用公網接入點訪問,則必須設置AccessKey和SecretKey,里面填寫實例的用戶名和密碼。實例用戶名和密碼在控制臺訪問控制的智能身份識別頁簽中獲取。
        * 注意!!!這里填寫的不是阿里云賬號的AccessKey ID和AccessKey Secret,請務必區分開。
        * 如果是在阿里云ECS內網訪問,則無需配置,服務端會根據內網VPC信息智能獲取。
        * 如果實例類型為Serverlesss實例,公網訪問必須設置實例的用戶名密碼,當開啟內網免身份識別時,內網訪問可以不設置用戶名和密碼。
        */
        // 實例用戶名和密碼在消息隊列RocketMQ版控制臺訪問控制的智能身份識別頁簽中獲取。
        properties.put(PropertyKeyConst.AccessKey,"INSTANCE USER NAME");
        properties.put(PropertyKeyConst.SecretKey, "INSTANCE PASSWORD");
        //注意!!!使用ONS SDK訪問RocketMQ 5.x實例時,InstanceID屬性不需要設置,否則會導致失敗。

        // 設置為您從消息隊列RocketMQ版控制臺獲取的接入點,類似“rmq-cn-XXXX.rmq.aliyuncs.com:8080”。
        properties.put(PropertyKeyConst.NAMESRV_ADDR, "ACCESS POINT");
        // 注意!!!直接填寫控制臺提供的域名和端口即可,請勿添加http://或https://前綴標識,也不要用IP解析地址。
        Producer producer = ONSFactory.createProducer(properties);
        // 在發送消息前,必須調用start方法來啟動Producer,只需調用一次即可。
        producer.start();
        Message msg = new Message(
                // 設置為您在消息隊列RocketMQ版控制臺上創建的Topic。
                "Topic",
                // Message Tag可理解為Gmail中的標簽,對消息進行再歸類,方便Consumer指定過濾條件在消息隊列RocketMQ版的服務器過濾。
                "tag",
                // Message Body可以是任何二進制形式的數據,消息隊列RocketMQ版不做任何干預,需要Producer與Consumer協商好一致的序列化和反序列化方式。
                "Hello MQ".getBytes());
        // 設置代表消息的業務關鍵屬性,請盡可能全局唯一。
        // 以方便您在無法正常收到消息情況下,可通過消息隊列RocketMQ版控制臺查詢消息并補發。
        // 注意:不設置也不會影響消息正常收發。
        msg.setKey("ORDERID_100");

        try {
            // 定時消息,單位毫秒(ms),在指定時間戳(當前時間之后)進行投遞,例如2016-03-07 16:21:00投遞。如果被設置成當前時間戳之前的某個時刻,消息將立即被投遞給消費者。
            long timeStamp = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").parse("2016-03-07 16:21:00").getTime();

            msg.setStartDeliverTime(timeStamp);
            // 發送消息,只要不拋異常就是成功。
            SendResult sendResult = producer.send(msg);
            System.out.println("Message Id:" + sendResult.getMessageId());
        }
        catch (Exception e) {
            // 消息發送失敗,需要進行重試處理,可重新發送這條消息或持久化這條數據進行補償處理。
            System.out.println(new Date() + " Send mq message failed. Topic is:" + msg.getTopic());
            e.printStackTrace();
        }

        // 在應用退出前,銷毀Producer對象。
        // 注意:銷毀Producer對象可以節約系統內存,若您需要頻繁發送消息,則無需銷毀Producer對象。
        producer.shutdown();
    }
}       

訂閱定時消息

訂閱定時消息的示例代碼和訂閱普通消息一樣,請參見訂閱普通消息

延時消息收發示例

發送延時消息

import com.aliyun.openservices.ons.api.Message;
import com.aliyun.openservices.ons.api.ONSFactory;
import com.aliyun.openservices.ons.api.Producer;
import com.aliyun.openservices.ons.api.PropertyKeyConst;
import com.aliyun.openservices.ons.api.SendResult;

import java.util.Date;
import java.util.Properties;

public class ProducerDelayTest {
    public static void main(String[] args) {
        Properties properties = new Properties();
        /**
        * 如果是使用公網接入點訪問,則必須設置AccessKey和SecretKey,里面填寫實例的用戶名和密碼。實例用戶名和密碼在控制臺訪問控制的智能身份識別頁簽中獲取。
        * 注意!!!這里填寫的不是阿里云賬號的AccessKey ID和AccessKey Secret,請務必區分開。
        * 如果是在阿里云ECS內網訪問,則無需配置,服務端會根據內網VPC信息智能獲取。
        * 如果實例類型為Serverlesss實例,公網訪問必須設置實例的用戶名密碼,當開啟內網免身份識別時,內網訪問可以不設置用戶名和密碼。
        */
        // 實例用戶名和密碼在消息隊列RocketMQ版控制臺訪問控制的智能身份識別頁簽中獲取。
        properties.put(PropertyKeyConst.AccessKey,"INSTANCE USER NAME");
        properties.put(PropertyKeyConst.SecretKey, "INSTANCE PASSWORD");
        //注意!!!使用ONS SDK訪問RocketMQ 5.x實例時,InstanceID屬性不需要設置,否則會導致失敗。

        // 設置為您從消息隊列RocketMQ版控制臺獲取的接入點,類似“rmq-cn-XXXX.rmq.aliyuncs.com:8080”。
        // 注意!!!直接填寫控制臺提供的域名和端口即可,請勿添加http://或https://前綴標識,也不要用IP解析地址。
        properties.put(PropertyKeyConst.NAMESRV_ADDR, "ACCESS POINT");

        Producer producer = ONSFactory.createProducer(properties);
        // 在發送消息前,必須調用start方法來啟動Producer,只需調用一次即可。
        producer.start();
        Message msg = new Message( 
                // 設置為您在消息隊列RocketMQ版控制臺上創建的Topic。
                "Topic",
                // Message Tag,可理解為Gmail中的標簽,對消息進行再歸類,方便Consumer指定過濾條件在消息隊列RocketMQ版服務器過濾。
                "tag",
                // Message Body可以是任何二進制形式的數據,消息隊列RocketMQ版不做任何干預,需要Producer與Consumer協商好一致的序列化和反序列化方式。
                "Hello MQ".getBytes());
        // 設置代表消息的業務關鍵屬性,請盡可能全局唯一。
        // 以方便您在無法正常收到消息情況下,可通過控制臺查詢消息并補發。
        // 注意:不設置也不會影響消息正常收發。
        msg.setKey("ORDERID_100");
        try {
            // 延時消息,在指定延遲時間(當前時間之后)進行投遞。最大可設置延遲40天投遞,單位毫秒(ms)。
            // 以下示例表示消息在3秒后投遞。
            long delayTime = System.currentTimeMillis() + 3000;

            // 設置消息需要被投遞的時間。
            msg.setStartDeliverTime(delayTime);

            SendResult sendResult = producer.send(msg);
            // 同步發送消息,只要不拋異常就是成功。
            if (sendResult != null) {
            System.out.println(new Date() + " Send mq message success. Topic is:" + msg.getTopic() + " msgId is: " + sendResult.getMessageId());
            }
            } catch (Exception e) {
            // 消息發送失敗,需要進行重試處理,可重新發送這條消息或持久化這條數據進行補償處理。
            System.out.println(new Date() + " Send mq message failed. Topic is:" + msg.getTopic());
            e.printStackTrace();
        }
        // 在應用退出前,銷毀Producer對象。
        // 注意:銷毀Producer對象可以節約系統內存,若您需要頻繁發送消息,則無需銷毀Producer對象。
        producer.shutdown();
    }
}           

訂閱延時消息

訂閱延時消息的示例代碼和訂閱普通消息一樣,請參見訂閱普通消息

事務消息收發示例

發送事務消息

package com.aliyun.openservices.tcp.example.producer;

import com.aliyun.openservices.ons.api.Message;
import com.aliyun.openservices.ons.api.ONSFactory;
import com.aliyun.openservices.ons.api.PropertyKeyConst;
import com.aliyun.openservices.ons.api.SendResult;
import com.aliyun.openservices.ons.api.exception.ONSClientException;
import com.aliyun.openservices.ons.api.transaction.LocalTransactionChecker;
import com.aliyun.openservices.ons.api.transaction.LocalTransactionExecuter;
import com.aliyun.openservices.ons.api.transaction.TransactionProducer;
import com.aliyun.openservices.ons.api.transaction.TransactionStatus;

import java.util.Date;
import java.util.Properties;

public class SimpleTransactionProducer {

    public static void main(String[] args) {

        Properties properties = new Properties();
        // 您在消息隊列RocketMQ版控制臺創建的Group ID。注意:事務消息的Group ID不能與其他類型消息的Group ID共用。
        properties.put(PropertyKeyConst.GROUP_ID,"XXX");
        
        /**
        * 如果是使用公網接入點訪問,則必須設置AccessKey和SecretKey,里面填寫實例的用戶名和密碼。實例用戶名和密碼在控制臺訪問控制的智能身份識別頁簽中獲取。
        * 注意!!!這里填寫的不是阿里云賬號的AccessKey ID和AccessKey Secret,請務必區分開。
        * 如果是在阿里云ECS內網訪問,則無需配置,服務端會根據內網VPC信息智能獲取。
        * 如果實例類型為Serverlesss實例,公網訪問必須設置實例的用戶名密碼,當開啟內網免身份識別時,內網訪問可以不設置用戶名和密碼。
        */
        // 實例用戶名和密碼在消息隊列RocketMQ版控制臺訪問控制的智能身份識別頁簽中獲取。
        properties.put(PropertyKeyConst.AccessKey,"INSTANCE USER NAME");
        properties.put(PropertyKeyConst.SecretKey, "INSTANCE PASSWORD");
        //注意!!!使用ONS SDK訪問RocketMQ 5.x實例時,InstanceID屬性不需要設置,否則會導致失敗。

        // 設置為您從消息隊列RocketMQ版控制臺獲取的接入點,類似“rmq-cn-XXXX.rmq.aliyuncs.com:8080”。
        // 注意!!!直接填寫控制臺提供的域名和端口即可,請勿添加http://或https://前綴標識,也不要用IP解析地址。
        properties.put(PropertyKeyConst.NAMESRV_ADDR, "ACCESS POINT");

        // 初始化事務消息Producer時,需要注冊一個本地事務狀態的Checker。
        LocalTransactionCheckerImpl localTransactionChecker = new LocalTransactionCheckerImpl();
        TransactionProducer transactionProducer = ONSFactory.createTransactionProducer(properties, localTransactionChecker);
        transactionProducer.start();

        Message msg = new Message("XXX","TagA","Hello MQ transaction===".getBytes());

        for (int i = 0; i < 3; i++) {
            try{
                SendResult sendResult = transactionProducer.send(msg, new LocalTransactionExecuter() {
                    @Override
                    public TransactionStatus execute(Message msg, Object arg) {
                        System.out.println("執行本地事務,并根據本地事務的狀態提交TransactionStatus。");
                        return TransactionStatus.CommitTransaction;
                    }
                }, null);
                assert sendResult != null;
            }catch (ONSClientException e){
                // 消息發送失敗,需要進行重試處理,可重新發送這條消息或持久化這條數據進行補償處理。
                System.out.println(new Date() + " Send mq message failed! Topic is:" + msg.getTopic());
                e.printStackTrace();
            }
        }

        System.out.println("Send transaction message success.");
    }
}
// 本地事務檢查器。
class LocalTransactionCheckerImpl implements LocalTransactionChecker {
   
    @Override
    public TransactionStatus check(Message msg) {
        System.out.println("收到事務消息的回查請求,MsgId: " + msg.getMsgID());
        return TransactionStatus.CommitTransaction;
    }
}

訂閱事務消息

訂閱事務消息的示例代碼和訂閱普通消息一樣,請參見訂閱普通消息