日本熟妇hd丰满老熟妇,中文字幕一区二区三区在线不卡 ,亚洲成片在线观看,免费女同在线一区二区

使用flink-dts-connector消費訂閱數據

重要

本文中含有需要您注意的重要提示信息,忽略該信息可能對您的業務造成影響,請務必仔細閱讀。

完成數據訂閱通道的配置后,您可以使用flink-dts-connector文件消費通道中的數據,用于Flink客戶端消費。本文介紹如何flink-dts-connector文件的使用說明。

注意事項

  • 僅支持Flink客戶端使用DataStream API、Table API和SQL。

  • 如您的Flink客戶端使用Table API和SQL,則單次配置時僅支持消費單張表的數據,如需消費多張表的數據,您需進行多次配置獨立的任務。

操作步驟

本文以IntelliJ IDEA軟件(Community Edition 2020.1 Windows版本)為例,介紹如何使用flink-dts-connector文件來消費訂閱通道中的數據。

  1. 創建新版數據訂閱通道,詳情請參見創建RDS MySQL數據訂閱通道、創建PolarDB MySQL版數據訂閱通道創建Oracle數據訂閱通道。

  2. 創建一個或多個消費組,詳情請參見新增消費組。

  3. 下載flink-dts-connector文件并解壓。

  4. 運行IntelliJ IDEA工具,然后單擊Open or Import。

    打開工程

  5. 在彈出的對話框中,定位至flink-dts-connector文件所在目錄,依次展開文件夾,找到項目對象模型文件:pom.xml

    pom模型

  6. 在彈出對話框中,選擇Open as Project。

  7. pom.xml文件中添加如下依賴:

    <dependency>
          <groupId>com.alibaba.flink</groupId>
          <artifactId>flink-dts-connector</artifactId>
          <version>1.1.1-SNAPSHOT</version>
          <classifier>jar-with-dependencies</classifier>
    </dependency>
  8. 在IntelliJ IDEA軟件界面,依次展開文件夾,并根據您所使用的Flink Connector的程序類型,選擇對應的Java文件。

    • 如Flink客戶端類型為DataStream API,您需雙擊打開flink-dts-connector-master\src\test\java\com\alibaba\flink\connectors\dts\datastream\DtsExample.java文件,并執行如下操作:

      1. 在IntelliJ IDEA軟件界面的頂部,單擊如下圖標。run圖標

      2. 在彈跳框中單擊DtsExample > Edit。edit

      3. 在彈跳框的Program arguments中,按如下示例輸入參數及對應的值,并單擊下方的Run,啟動flink-dts-connector。

        說明

        具體參數說明及查詢方式,請參見參數說明。

        --broker-url dts-cn-******.******.***:****** --topic cn_hangzhou_rm_**********_dtstest_version2 --sid dts****** --user dtstest --password Test123456 --checkpoint 1624440043
      4. 運行結果如下圖所示,該客戶端可正常訂閱到源庫的數據變更信息。數據變更信息(DataStream)

        說明

        如需查詢數據變更的具體記錄,您可登錄Flink客戶端的Task Manager界面進行查看。

    • 如Flink客戶端類型為Table API和SQL,您需雙擊打開flink-dts-connector-master\src\test\java\com\alibaba\flink\connectors\dts\sql\DtsTableISelectTCaseTest.java文件,并執行如下操作:

      說明

      單個DtsTableISelectTCaseTest.java文件,僅支持配置并消費單張表的訂閱數據。如需配置并消費多張表中的數據,您需要重復配置,并運行多個獨立任務。

      1. 在如下位置添加前導字符//,注釋該行代碼信息。注釋掉一行

      2. 設置所需消費的單張表的信息,支持使用SQL語句。

      3. 設置訂閱通道參數,具體參數說明及查詢方式,請參見參數說明。table api和sql的參數配置

      4. 在IntelliJ IDEA軟件界面的頂部,單擊Run'DtsTableISelectTCaseTest',啟動flink-dts-connector。

      5. 運行結果如下圖所示,該客戶端可正常訂閱到源庫的數據變更信息。tableapi和sql-數據變更信息

        說明

        如需查詢數據變更的具體記錄,您可登錄Flink客戶端的Task Manager界面進行查看。

參數說明

DstExample文件中的參數

DtsTableISelectTCaseTest文件中的參數

說明

查詢方式

broker-url

dts.server

數據訂閱通道的網絡地址及端口號信息。

說明
  • 如果您部署的Flink所屬的ECS實例與數據訂閱通道屬于經典網絡或同一專有網絡,建議通過內網地址進行數據訂閱,網絡延遲最小。

  • 不建議使用公網地址。

在DTS控制臺單擊目標訂閱實例ID,在訂閱配置頁面,您可以獲取到訂閱Topic、網絡地址及端口號信息。

topic

topic

數據訂閱通道的訂閱Topic。

sid

dts.sid

消費組ID。

在DTS控制臺單擊目標訂閱實例ID,然后單擊數據消費,您可以獲取到消費組ID和消費組的賬號信息。

說明

消費組賬號的密碼已在您新建消費組時指定。

user

dts.user

消費組的賬號。

警告

如您未使用本文提供的flink-dts-connector文件,請按照<消費組的賬號>-<消費組ID>的格式設置用戶名(例如:dtstest-dtsae******bpv),否則無法正常連接。

password

dts.password

該賬號的密碼。

checkpoint

dts.checkpoint

消費位點,即flink-dts-connector消費第一條數據的時間戳,格式為Unix時間戳,例如1624440043。

說明

消費位點信息可用于:

  • 當業務程序中斷后,傳入已消費位點繼續消費數據,防止數據丟失。

  • 在訂閱客戶端啟動時,傳入所需的消費位點,調整訂閱位點,實現按需消費數據。

消費位點必須在訂閱實例的數據范圍(如圖所示)之內,并需轉化為Unix時間戳。數據范圍

說明

Unix時間戳轉換工具可用搜索引擎獲取。

dts-cdc.table.name

訂閱對象。僅支持傳入單張表,且格式要求如下:

  • 當數據庫類型為MySQL、PolarDB for MySQL、PolarDB-X 1.0PolarDB-X 2.0時:格式為<數據庫名稱>.<表名稱>

  • 當數據庫為其他類型時:格式為<Schema名稱>.<表名稱>

在DTS控制臺單擊目標訂閱實例ID,在訂閱配置頁面,單擊右上方的查看訂閱對象,查詢訂閱對象所屬數據庫和表。

常見問題

報錯提示

可能的原因

解決方式

Cluster changed from *** to ***, consumer require restart.

DTS用于讀取增量數據的模塊DStore發生切換,導致Flink客戶端的消費位點丟失。

您無需重啟客戶端,僅需查詢客戶端的消費位點,并在DtsExample.javaDtsTableISelectTCaseTest.java文件中重新傳入消費位點checkpointdts.checkpoint,即可重新消費訂閱數據。