針對初次接觸云消息隊列 RocketMQ 版的工程師,本文以TCP協議下的Java SDK為例,提供操作示例幫助您從零開始搭建云消息隊列 RocketMQ 版測試工程。Demo工程包含普通消息、順序消息、事務消息、定時和延時消息的測試代碼,以及相關Spring的配置。

前提條件

  • 安裝IDE。

    您可以使用IntelliJ IDEA或者Eclipse,本文以IntelliJ IDEA為例。

    請下載IntelliJ IDEA Ultimate版本,并參見IntelliJ IDEA說明進行安裝。更多信息,請參見下載地址

  • 下載Demo工程。

    下載到本地并解壓后即可看到本地新增了mq-demo-master文件夾,該文件夾包括純Java、Spring以及Spring Boot的示例代碼。更多信息,請參見rocketmq-demo

  • 下載安裝JDK。更多信息,請參見JDK下載地址

配置Demo工程

  1. 將Demo工程文件導入IntelliJ IDEA。
  2. 創建資源。

    您需要先到消息隊列RocketMQ版控制臺創建所需資源,包括實例、Topic、Group ID(GID),以及鑒權需要的AccessKey ID(AK)和AccessKey Secret(SK)。

    更多詳細信息和操作指導,請參見創建資源

  3. 配置Demo。
    您需將在步驟2中創建好的資源信息配置到MqConfig類和common.xml文件中。
    1. 修改pom.xml文件。建議將ons-client版本號修改為最新版本。具體版本信息,請參見版本說明
      <dependency>
          <groupId>com.aliyun.openservices</groupId>
          <artifactId>ons-client</artifactId>
          <!--建議替換為Java SDK的最新版本號-->
          <version>1.8.8.5.Final</version>
      </dependency>                            
    2. 按以下說明配置MqConfig類。
      public static final String TOPIC = "您已創建的Topic";
      public static final String GROUP_ID = "您已創建的Group ID";
      public static final String ORDER_TOPIC = "您已創建的用于收發順序消息的Topic";
      public static final String ORDER_GROUP_ID = "您已創建的用于收發順序消息的Group ID";
      public static final String ACCESS_KEY = "您的阿里云賬號的AccessKey ID,獲取方式,請參見創建AccessKey";
      public static final String SECRET_KEY = "您的阿里云賬號的AccessKey Secret,獲取方式,請參見創建AccessKey";
      public static final String TAG = "您自定義的消息Tag屬性";
      public static final String NAMESRV_ADDR = "您已創建的云消息隊列 RocketMQ 版實例的TCP接入點,可在云消息隊列 RocketMQ 版控制臺實例詳情頁面的接入點區域查看";                              
      說明
      • 如果RAM用戶擁有該Topic的權限以及自己的AccessKey,那么也可以使用RAM用戶的AccessKey。
      • 參數與接口的更多信息,請參見接口和參數說明
    3. 配置common.xml
      <props>
          <prop key="AccessKey">XXX</prop> <!-- 使用前請修改這些資源信息 -->
          <prop key="SecretKey">XXX</prop>
          <prop key="GROUP_ID">XXX</prop>
          <prop key="Topic">XXX</prop>
          <prop key="NAMESRV_ADDR">XXX</prop>
      </props>

以Main方式運行Demo

  1. 發送消息。
    • 發送普通消息:
      • 以純Java方式發送普通消息:運行SimpleMQProducer類。
      • 以Spring方式發送普通消息:運行ProducerClient類。
      • 以Spring Boot方式發送普通消息:運行ProducerClient類。
    • 發送事務消息:
      • 以純Java方式發送事務消息:運行SimpleTransactionProducer類。

        LocalTransactionCheckerImpl類為本地事務check接口類,用于校驗事務。更多信息,請參見收發事務消息

      • 以Spring方式發送事務消息:運行TransactionProducerClient類。
      • 以Spring Boot方式發送事務消息:運行TransactionProducerClient類。
    • 發送順序消息:
      • 以純Java方式發送順序消息:運行SimpleOrderProducer類。
      • 以Spring方式發送順序消息:運行OrderProducerClient類。
      • 以Spring Boot方式發送順序消息:運行OrderProducerClient類。

      此方式下,消息發布和消費都按順序進行。更多信息,請參見收發順序消息

    • 發送定時和延時消息:運行MQTimerProducer類發送消息。延時3秒后投遞。

      您也可以指定一個精確的投遞時間,最長定時時間為40天。更多信息,請參見收發定時消息

    云消息隊列 RocketMQ 版控制臺,按Topic查詢消息,可以看到消息已經發送至Topic。
  2. 接收消息。
    • 接收普通消息:
      • 以純Java方式接收普通消息:運行SimpleMQConsumer類。
      • 以Spring方式接收普通消息:運行ConsumerClient類。
      • 以Spring Boot方式接收普通消息:運行ConsumerClient類。
    • 接收事務消息:
      • 以純Java方式接收事務消息:運行SimpleMQConsumer類。
      • 以Spring方式接收事務消息:運行ConsumerClient類。
      • 以Spring Boot方式接收事務消息:運行ConsumerClient類。
    • 接收順序消息:
      • 以純Java方式接收順序消息:運行SimpleOrderConsumer類。
      • 以Spring方式接收順序消息:運行OrderConsumerClient類。
      • 以Spring Boot方式接收順序消息:運行OrderConsumerClient類。
    • 接收定時和延時消息:運行SimpleMQConsumer類。
      說明 Spring和Spring Boot框架暫不支持收發定時和延時消息。
    可以看到消息被接收打印的日志。因為有初始化,所以需等待幾秒,在生產環境中不會經常初始化。

結果驗證:在云消息隊列 RocketMQ 版控制臺查看消費者狀態,可以看到啟動的消費端已經在線,并且訂閱關系一致。

更多信息