日本熟妇hd丰满老熟妇,中文字幕一区二区三区在线不卡 ,亚洲成片在线观看,免费女同在线一区二区

通過Flink讀寫云原生數據倉庫AnalyticDB PostgreSQL版(ADB PG)數據

本文介紹如何通過阿里云實時計算Flink版實時讀寫云原生數據倉庫AnalyticDB PostgreSQL版數據。

背景信息

云原生數據倉庫AnalyticDB PostgreSQL版是一種大規模并行處理(MPP)數據倉庫服務,可提供海量數據在線分析服務。實時計算Flink版是基于Apache Flink構建的?站式實時大數據分析平臺,內置豐富上下游連接器,滿足不同業務場景的需求,提供高效、靈活的實時計算服務。通過實時計算Flink版讀取AnalyticDB PostgreSQL版數據,可以充分發揮云原生數據倉庫的優勢,提高數據分析的效率和精度。

使用限制

  • 該功能暫不支持AnalyticDB PostgreSQL版Serverless模式

  • 僅Flink實時計算引擎VVR 6.0.0及以上版本支持云原生數據倉庫AnalyticDB PostgreSQL版連接器。

  • 僅Flink實時計算引擎VVR 8.0.1及以上版本支持云原生數據倉庫AnalyticDB PostgreSQL版7.0版本。

    說明

    如果您使用了自定義連接器,具體操作請參見管理自定義連接器

前提條件

步驟一:配置白名單并準備數據

  1. 登錄云原生數據倉庫AnalyticDB PostgreSQL版控制臺
  2. 將目標Flink工作空間所屬的網段加入AnalyticDB PostgreSQL版的白名單。

    1. 查看目標Flink工作空間的虛擬交換機所屬網段,詳情請參見控制臺操作

    2. 添加至目標AnalyticDB PostgreSQL版實例的白名單中,請參見操作步驟

      說明

      如果您通過公網訪問,請添加公網IP至白名單。

  3. 單擊頁面右上方的登錄數據庫,并填寫賬號和密碼。連接數據庫的更多方式,請參見客戶端連接

  4. 在對應實例的目標數據庫中創建一張名為adbpg_dim_table的表并插入50條測試數據。

    建表SQL和插入數據SQL的示例如下:

    --創建名稱為adbpg_dim_table的表。
    CREATE TABLE adbpg_dim_table(
    id int,
    username text,
    PRIMARY KEY(id)
    );
    
    --向adbpg_dim_table的表中插入50行數據,其中id字段的值為從1到50的整數,而username字段的值為username字符串后面跟隨當前行數的文本表示。
    INSERT INTO adbpg_dim_table(id, username)
    SELECT i, 'username'||i::text
    FROM generate_series(1, 50) AS t(i);

    您可以使用select * from adbpg_dim_table order by id;語句查看插入后的數據。

  5. 創建一張名為adbpg_sink_table的表,用于Flink寫入結果數據。

    CREATE TABLE adbpg_sink_table(
      id int,
      username text,
      score int
    );

步驟二:創建Flink作業

  1. 登錄實時計算控制臺,單擊目標工作空間操作列下的控制臺

  2. 在左側導航欄,單擊數據開發 > ETL,單擊新建,選擇空白的流作業草稿,單擊下一步

  3. 新建作業草稿對話框,填寫作業配置信息。

    作業參數

    說明

    示例

    文件名稱

    作業的名稱。

    說明

    作業名稱在當前項目中必須保持唯一。

    adbpg-test

    存儲位置

    指定該作業的代碼文件所屬的文件夾。

    您還可以在現有文件夾右側,單擊新建文件夾圖標,新建子文件夾。

    作業草稿

    引擎版本

    當前作業使用的Flink的引擎版本。引擎版本號含義、版本對應關系和生命周期重要時間點詳情請參見引擎版本介紹

    vvr-8.0.1-flink-1.17

  4. 單擊創建

步驟三:編寫作業代碼并部署作業

  1. 將以下作業代碼拷貝到作業文本編輯區。

    ---創建一個datagen源表。本示例中無需修改WITH參數。
    CREATE TEMPORARY TABLE datagen_source (
     id INT,
     score INT
    ) WITH (
     'connector' = 'datagen', 
     'fields.id.kind'='sequence',
     'fields.id.start'='1',
     'fields.id.end'='50',
     'fields.score.kind'='random',
     'fields.score.min'='70',
     'fields.score.max'='100'
    );
    
    --創建adbpg維表。需根據您的實際情況修改WITH參數。
    CREATE TEMPORARY TABLE dim_adbpg(
     id int,
     username varchar,
     PRIMARY KEY(id) not ENFORCED
    ) WITH(
     'connector' = 'adbpg', 
     'url' = 'jdbc:postgresql://gp-2ze****3tysk255b5-master.gpdb.rds.aliyuncs.com:5432/flinktest',
     'tablename' = 'adbpg_dim_table', 
     'username' = 'flinktest',
     'password' = '${secret_values.adb_password}',
     'maxRetryTimes'='2', --寫入數據失敗后,重試寫入的最大次數。
     'cache'='lru',  --緩存策略,
     'cacheSize'='100'  --緩存大小
    );
    
    --創建adbpg結果表。需根據您的實際情況修改WITH參數。
    CREATE TEMPORARY TABLE sink_adbpg (
      id int,
      username varchar,
      score int
    ) WITH (
      'connector' = 'adbpg', 
      'url' = 'jdbc:postgresql://gp-2ze****3tysk255b5-master.gpdb.rds.aliyuncs.com:5432/flinktest',
      'tablename' = 'adbpg_sink_table',  
      'username' = 'flinktest',
      'password' = '${secret_values.adb_password}',
      'maxRetryTimes' = '2',
      'conflictMode' = 'ignore',--當Insert寫入出現主鍵沖突或者唯一索引沖突時的處理策略。
      'retryWaitTime' = '200'  --重試的時間間隔。
    );
    
    --維表和源表join后的結果插入adbpg結果表。
    INSERT INTO sink_adbpg
    SELECT ts.id,ts.username,ds.score
    FROM datagen_source AS ds
    JOIN dim_adbpg FOR SYSTEM_TIME AS OF PROCTIME() AS ts
    on ds.id = ts.id;
  2. 根據實際情況修改參數。

    本示例中無需修改datagen源表。您需要根據實際情況修改adbpg維表和結果表參數,具體說明如下。涉及的連接器更多相關參數和類型映射請參見相關文檔

    參數

    是否必填

    說明

    url

    AnalyticDB PostgreSQL版的JDBC連接地址。格式為jdbc:postgresql://<地址>:<端口>/<連接的數據庫名稱>。您可在云原生數據倉庫 AnalyticDB PostgreSQL版控制臺對應實例的數據庫連接頁面查看。

    tablename

    AnalyticDB PostgreSQL版的表名。

    username

    AnalyticDB PostgreSQL版的數據庫賬號。

    password

    AnalyticDB PostgreSQL版的數據庫賬號密碼。

    targetSchema

    Schema名稱。默認為public。如果您使用了對應數據庫下其他Schema,請填寫此參數。

  3. 在作業開發頁面頂部,單擊深度檢查,進行語法檢查。

  4. 單擊部署

  5. 運維中心 > 作業運維頁面,單擊目標作業操作列下的啟動

步驟四:查看寫入數據結果

  1. 登錄云原生數據倉庫AnalyticDB PostgreSQL版控制臺
  2. 單擊登錄數據庫,連接數據庫的更多方式,請參見客戶端連接

  3. 執行如下查詢語句,查看Flink寫入數據。

    SELECT * FROM adbpg_sink_table ORDER BY id;

    結果如下圖所示。

    image.png

相關文檔