本文介紹如何通過阿里云實時計算Flink版實時讀寫云原生數據倉庫AnalyticDB PostgreSQL版數據。
背景信息
云原生數據倉庫AnalyticDB PostgreSQL版是一種大規模并行處理(MPP)數據倉庫服務,可提供海量數據在線分析服務。實時計算Flink版是基于Apache Flink構建的?站式實時大數據分析平臺,內置豐富上下游連接器,滿足不同業務場景的需求,提供高效、靈活的實時計算服務。通過實時計算Flink版讀取AnalyticDB PostgreSQL版數據,可以充分發揮云原生數據倉庫的優勢,提高數據分析的效率和精度。
使用限制
該功能暫不支持AnalyticDB PostgreSQL版Serverless模式。
僅Flink實時計算引擎VVR 6.0.0及以上版本支持云原生數據倉庫AnalyticDB PostgreSQL版連接器。
僅Flink實時計算引擎VVR 8.0.1及以上版本支持云原生數據倉庫AnalyticDB PostgreSQL版7.0版本。
說明如果您使用了自定義連接器,具體操作請參見管理自定義連接器。
前提條件
AnalyticDB PostgreSQL版實例和Flink全托管工作空間需要位于同一VPC下。
說明不在同一VPC下時請參見網絡連通性。
已創建Flink全托管工作空間。具體操作請參見開通實時計算Flink版。
步驟一:配置白名單并準備數據
- 登錄云原生數據倉庫AnalyticDB PostgreSQL版控制臺。
將目標Flink工作空間所屬的網段加入AnalyticDB PostgreSQL版的白名單。
單擊頁面右上方的登錄數據庫,并填寫賬號和密碼。連接數據庫的更多方式,請參見客戶端連接。
在對應實例的目標數據庫中創建一張名為adbpg_dim_table的表并插入50條測試數據。
建表SQL和插入數據SQL的示例如下:
--創建名稱為adbpg_dim_table的表。 CREATE TABLE adbpg_dim_table( id int, username text, PRIMARY KEY(id) ); --向adbpg_dim_table的表中插入50行數據,其中id字段的值為從1到50的整數,而username字段的值為username字符串后面跟隨當前行數的文本表示。 INSERT INTO adbpg_dim_table(id, username) SELECT i, 'username'||i::text FROM generate_series(1, 50) AS t(i);
您可以使用
select * from adbpg_dim_table order by id;
語句查看插入后的數據。創建一張名為adbpg_sink_table的表,用于Flink寫入結果數據。
CREATE TABLE adbpg_sink_table( id int, username text, score int );
步驟二:創建Flink作業
步驟三:編寫作業代碼并部署作業
將以下作業代碼拷貝到作業文本編輯區。
---創建一個datagen源表。本示例中無需修改WITH參數。 CREATE TEMPORARY TABLE datagen_source ( id INT, score INT ) WITH ( 'connector' = 'datagen', 'fields.id.kind'='sequence', 'fields.id.start'='1', 'fields.id.end'='50', 'fields.score.kind'='random', 'fields.score.min'='70', 'fields.score.max'='100' ); --創建adbpg維表。需根據您的實際情況修改WITH參數。 CREATE TEMPORARY TABLE dim_adbpg( id int, username varchar, PRIMARY KEY(id) not ENFORCED ) WITH( 'connector' = 'adbpg', 'url' = 'jdbc:postgresql://gp-2ze****3tysk255b5-master.gpdb.rds.aliyuncs.com:5432/flinktest', 'tablename' = 'adbpg_dim_table', 'username' = 'flinktest', 'password' = '${secret_values.adb_password}', 'maxRetryTimes'='2', --寫入數據失敗后,重試寫入的最大次數。 'cache'='lru', --緩存策略, 'cacheSize'='100' --緩存大小 ); --創建adbpg結果表。需根據您的實際情況修改WITH參數。 CREATE TEMPORARY TABLE sink_adbpg ( id int, username varchar, score int ) WITH ( 'connector' = 'adbpg', 'url' = 'jdbc:postgresql://gp-2ze****3tysk255b5-master.gpdb.rds.aliyuncs.com:5432/flinktest', 'tablename' = 'adbpg_sink_table', 'username' = 'flinktest', 'password' = '${secret_values.adb_password}', 'maxRetryTimes' = '2', 'conflictMode' = 'ignore',--當Insert寫入出現主鍵沖突或者唯一索引沖突時的處理策略。 'retryWaitTime' = '200' --重試的時間間隔。 ); --維表和源表join后的結果插入adbpg結果表。 INSERT INTO sink_adbpg SELECT ts.id,ts.username,ds.score FROM datagen_source AS ds JOIN dim_adbpg FOR SYSTEM_TIME AS OF PROCTIME() AS ts on ds.id = ts.id;
根據實際情況修改參數。
本示例中無需修改datagen源表。您需要根據實際情況修改adbpg維表和結果表參數,具體說明如下。涉及的連接器更多相關參數和類型映射請參見相關文檔。
參數
是否必填
說明
url
是
AnalyticDB PostgreSQL版的JDBC連接地址。格式為
jdbc:postgresql://<地址>:<端口>/<連接的數據庫名稱>
。您可在云原生數據倉庫 AnalyticDB PostgreSQL版控制臺對應實例的數據庫連接頁面查看。tablename
是
AnalyticDB PostgreSQL版的表名。
username
是
AnalyticDB PostgreSQL版的數據庫賬號。
password
是
AnalyticDB PostgreSQL版的數據庫賬號密碼。
targetSchema
否
Schema名稱。默認為public。如果您使用了對應數據庫下其他Schema,請填寫此參數。
在作業開發頁面頂部,單擊深度檢查,進行語法檢查。
單擊部署。
在
頁面,單擊目標作業操作列下的啟動。
步驟四:查看寫入數據結果
- 登錄云原生數據倉庫AnalyticDB PostgreSQL版控制臺。
單擊登錄數據庫,連接數據庫的更多方式,請參見客戶端連接。
執行如下查詢語句,查看Flink寫入數據。
SELECT * FROM adbpg_sink_table ORDER BY id;
結果如下圖所示。