日本熟妇hd丰满老熟妇,中文字幕一区二区三区在线不卡 ,亚洲成片在线观看,免费女同在线一区二区

使用SDK示例代碼消費PolarDB-X 1.0訂閱數據

完成數據訂閱任務后,您可以使用DTS提供的SDK示例代碼來訂閱數據變更信息。本文介紹通過SDK示例代碼消費分布式訂閱數據,支持的數據源包括PolarDB-X 1.0和DMS LogicDB。

前提條件

  • 已安裝JDK 1.8版本。

  • 已安裝IntelliJ IDEA軟件。

注意事項

如果使用子賬號(RAM用戶)來訂閱數據,該賬號需具備AliyunDTSFullAccess權限,以及訂閱對象的訪問權限,授權方法請參見通過系統策略授權子賬號管理DTS為RAM用戶授權

操作步驟

本文以IntelliJ IDEA軟件(Community Edition 2020.1 Windows版本)為例,介紹如何運行SDK示例代碼來消費PolarDB-X 1.0的訂閱數據。

  1. 創建新版數據訂閱通道,具體操作請參見創建RDS MySQL數據訂閱任務

  2. 創建一個或多個消費組,具體操作請參見新增消費組

  3. 下載并解壓SDK示例代碼文件,下載地址為SDK示例代碼

    重要

    在消費訂閱數據時,您需要調用DefaultUserRecord的commit方法以提交位點信息,否則會導致數據重復消費。

  4. 在IntelliJ IDEA軟件中打開目標項目。

    1. 打開IntelliJ IDEA軟件,然后單擊Open or Import

      打開工程

    2. 在彈出的對話框中,選擇SDK示例代碼解壓的目錄,依次展開文件夾,雙擊項目對象模型文件:pom.xml

      雙擊模型文件

    3. 在彈出對話框中,選擇Open as Projec

  5. 在IntelliJ IDEA軟件界面,依次展開文件夾,并根據 SDK客戶端的使用模式,選擇并雙擊打開對應的Java文件:DistributedDTSConsumerDemo

    找到目標文件

  6. 設置Java文件代碼中的必填參數。

    public static void main(String[] args) throws ClientException {
            //分布式類型數據源的訂閱配置方式,例如PolarDBX10(原DRDS)。配置AccessKey、實例Id、主任務id,訂閱消費組等相關信息。
            String accessKeyId = "LTA***********99reZ";
            String accessKeySecret = "****************";
            String regionId = "cn-hangzhou";
            String dtsInstanceId = "dtse5212sed162****";
            String jobId = "l791216x16d****";
            String sid = "dtsip412t13160****";
            String userName = "xftest";
            String password = "******";
            String proxyUrl = "dts-cn-****.com:18001";
            // initial checkpoint for first seek(a timestamp to set, eg 1566180200 if you want (Mon Aug 19 10:03:21 CST 2019))
            String checkpoint = "1639620090";
    
            // Convert physical database/table name to logical database/table name
            boolean mapping = true;
            // if force use config checkpoint when start. for checkpoint reset, only assign mode works
            boolean isForceUseInitCheckpoint = false;
    
            ConsumerContext.ConsumerSubscribeMode subscribeMode = ConsumerContext.ConsumerSubscribeMode.ASSIGN;
            DistributedDTSConsumerDemo demo = new DistributedDTSConsumerDemo(userName, password, regionId,
                    jobId, sid, dtsInstanceId, accessKeyId, accessKeySecret, subscribeMode, proxyUrl,
                    checkpoint, isForceUseInitCheckpoint, mapping);
            demo.start();
        }

    參數

    說明

    獲取方法

    accessKeyId

    訪問密鑰ID。

    獲取方法請參見獲取AccessKey

    accessKeySecret

    訪問密鑰ID的密碼。

    regionId

    數據訂閱任務的地域ID。

    在DTS控制臺單擊目標訂閱實例ID,在基本信息頁面,您可以獲取地域信息,例如:地域為華東1(杭州),代碼中需要填寫為cn-hangzhou,詳情請參見地域列表

    dtsInstanceId

    數據訂閱任務實例ID。

    在DTS控制臺單擊目標訂閱實例ID,在基本信息頁面,您可以獲取數據訂閱實例的任務實例ID

    jobId

    數據訂閱任務ID。

    您可以調用DescribeDtsJobs接口獲取數據訂閱任務ID(DtsJobId)。

    sid

    消費組ID。

    在DTS控制臺單擊目標訂閱實例ID,然后單擊左側導航欄的數據消費,您可以獲取到消費組ID/名稱和消費組的賬號信息。

    說明

    消費組賬號的密碼已在您新建消費組時指定。

    userName

    消費組的賬號。

    password

    消費組賬號的密碼。

    proxyUrl

    數據訂閱通道的網絡地址及端口號信息。

    說明
    • 如果您部署SDK客戶端所屬的ECS實例與數據訂閱通道屬于經典網絡或同一專有網絡,建議通過內網地址進行數據訂閱,網絡延遲最小。

    • 不建議使用公網地址。

    在DTS控制臺單擊目標訂閱實例ID,在基本信息頁面,您可以獲取到網絡信息。

    checkpoint

    消費位點,即SDK客戶端消費第一條數據的時間戳,格式為Unix時間戳,使用秒級時間戳。

    說明

    消費位點信息可用于:

    • 當業務程序中斷后,傳入已消費位點繼續消費數據,防止數據丟失。

    • 在訂閱客戶端啟動時,傳入所需的消費位點,調整訂閱位點,實現按需消費數據。

    消費位點必須在訂閱實例的數據范圍之內,并需轉化為Unix時間戳。

    說明
    • 您可以在訂閱任務列表的數據范圍列,查看訂閱實例的數據范圍。

    • Unix時間戳轉換工具可用搜索引擎獲取。

  7. 在IntelliJ IDEA軟件界面的頂部,選擇Run > Run運行該客戶端。

    說明

    首次運行時,軟件需要一定時間自動加載相關依賴包并完成安裝。

    • 運行結果中顯示該客戶端可正常訂閱到源庫的數據變更信息。

    • SDK客戶端每隔一定時間會統計并顯示消費數據的信息,包括數據發送和接受時數據總數、數據總量、每秒請求數接收RPS等。

      表 1. 消費數據的統計信息

      參數

      說明

      outCounts

      SDK客戶端所消費的數據總數。

      outBytes

      SDK客戶端所消費的數據總量,單位為Byte。

      outRps

      SDK客戶端消費數據時的每秒請求數。

      outBps

      SDK客戶端消費數據時每秒傳送的比特數。

      count

      暫無。

      inBytes

      DTS服務器發送的數據總量,單位為Byte。

      DStoreRecordQueue

      DTS服務器發送數據時,當前數據緩存隊列的大小。

      inCounts

      DTS服務器發送數據總數。

      inRps

      DTS服務器發送數據時的每秒請求數。

      inBps

      DTS服務器發送數據時每秒傳送的比特數。

      __dt

      SDK客戶端接收到數據的當前時間戳,單位為毫秒。

      DefaultUserRecordQueue

      序列化后,當前數據緩存隊列的大小。

  8. 可選:如果您需要修改訂閱數據的數據類型,可以在buildRecordListener()方法中進行修改或者自定義類。

    public static Map<String, RecordListener> buildRecordListener() {
            // user can impl their own listener
            RecordListener mysqlRecordPrintListener = new RecordListener() {
                @Override
                public void consume(DefaultUserRecord record) {
    
                    OperationType operationType = record.getOperationType();
    
                    if (operationType.equals(OperationType.INSERT)
                            || operationType.equals(OperationType.UPDATE)
                            || operationType.equals(OperationType.DELETE)
                            || operationType.equals(OperationType.HEARTBEAT)) {
    
                        // consume record
                        RecordListener recordPrintListener = new DefaultRecordPrintListener(DbType.MySQL);
    
                        recordPrintListener.consume(record);
    
                        //commit method push the checkpoint update
                        record.commit("");
                    }
                }
            };
            return Collections.singletonMap("mysqlRecordPrinter", mysqlRecordPrintListener);
        }