云原生數(shù)據(jù)倉(cāng)庫(kù)AnalyticDB PostgreSQL版(ADB PG)
本文為您介紹如何使用AnalyticDB PostgreSQL連接器。
背景信息
云原生數(shù)據(jù)倉(cāng)庫(kù)AnalyticDB PostgreSQL版是一種大規(guī)模并行處理(MPP)數(shù)據(jù)倉(cāng)庫(kù)服務(wù),可提供海量數(shù)據(jù)在線分析服務(wù)。
AnalyticDB PostgreSQL版支持的信息如下。
類別 | 詳情 |
支持類型 | 維表和結(jié)果表 |
運(yùn)行模式 | 流模式和批模式 |
數(shù)據(jù)格式 | 暫不適用 |
特有監(jiān)控指標(biāo) |
說(shuō)明 指標(biāo)含義詳情,請(qǐng)參見(jiàn)監(jiān)控指標(biāo)說(shuō)明。 |
API種類 | SQL |
是否支持更新或刪除結(jié)果表數(shù)據(jù) | 是 |
前提條件
已創(chuàng)建AnalyticDB PostgreSQL實(shí)例并創(chuàng)建表,詳情請(qǐng)參見(jiàn)創(chuàng)建實(shí)例和CREATE TABLE。
已設(shè)置白名單,詳情請(qǐng)參見(jiàn)設(shè)置白名單。
使用限制
僅Flink實(shí)時(shí)計(jì)算引擎VVR 6.0.0及以上版本支持云原生數(shù)據(jù)倉(cāng)庫(kù)AnalyticDB PostgreSQL版連接器。
僅Flink實(shí)時(shí)計(jì)算引擎VVR 8.0.1及以上版本支持云原生數(shù)據(jù)倉(cāng)庫(kù)AnalyticDB PostgreSQL版7.0版本。
暫不支持自建的PostgreSQL。
語(yǔ)法結(jié)構(gòu)
CREATE TABLE adbpg_table (
id INT,
len INT,
content VARCHAR,
PRIMARY KEY(id)
) WITH (
'connector'='adbpg',
'url'='jdbc:postgresql://<Address>:<PortId>/<DatabaseName>',
'tableName'='<yourDatabaseTableName>',
'userName'='<yourDatabaseUserName>',
'password'='<yourDatabasePassword>'
);
WITH參數(shù)
類型 | 參數(shù) | 說(shuō)明 | 數(shù)據(jù)類型 | 是否必填 | 默認(rèn)值 | 備注 |
通用 | connector | 表類型。 | String | 是 | 無(wú) | 固定值為adbpg。 |
url | JDBC連接地址。 | String | 是 | 無(wú) | 格式為 | |
tableName | 表名。 | String | 是 | 無(wú) | 無(wú)。 | |
userName | 用戶名。 | String | 是 | 無(wú) | 無(wú)。 | |
password | 密碼。 | String | 是 | 無(wú) | 無(wú)。 | |
maxRetryTimes | 寫(xiě)入數(shù)據(jù)失敗后,重試寫(xiě)入的最大次數(shù)。 | Integer | 否 | 3 | 無(wú)。 | |
targetSchema | Schema名稱。 | String | 否 | public | 無(wú)。 | |
caseSensitive | 大小寫(xiě)是否敏感。 | String | 否 | false | 參數(shù)取值如下:
| |
connectionMaxActive | 連接池的最大連接數(shù)。 | Integer | 否 | 5 | 系統(tǒng)會(huì)自動(dòng)釋放與數(shù)據(jù)庫(kù)服務(wù)的空閑連接。 重要 此參數(shù)設(shè)置過(guò)大可能會(huì)導(dǎo)致服務(wù)端連接數(shù)出現(xiàn)異常。 | |
結(jié)果表獨(dú)有 | retryWaitTime | 重試的時(shí)間間隔。 | Integer | 否 | 100 | 單位毫秒。 |
batchSize | 一次批量寫(xiě)入的數(shù)據(jù)條數(shù)。 | Integer | 否 | 500 | 無(wú)。 | |
flushIntervalMs | 清空緩存的時(shí)間間隔。 | Integer | 否 | 無(wú) | 如果緩存中的數(shù)據(jù)在等待指定時(shí)間后,依然沒(méi)有達(dá)到輸出條件,系統(tǒng)會(huì)自動(dòng)輸出緩存中的所有數(shù)據(jù)。單位毫秒。 | |
writeMode | 第一次嘗試寫(xiě)入時(shí)的寫(xiě)入方式。 | String | 否 | insert | 參數(shù)取值如下:
| |
conflictMode | 當(dāng)Insert寫(xiě)入出現(xiàn)主鍵沖突或者唯一索引沖突時(shí)的處理策略。 | String | 否 | strict | 參數(shù)取值如下:
| |
維表獨(dú)有 | maxJoinRows | 單行數(shù)據(jù)Join的最多行數(shù)。 | Integer | 否 | 1024 | 無(wú)。 |
cache | 緩存策略。 | String | 否 | ALL | 支持以下三種緩存策略:
| |
cacheSize | 緩存大小,即緩存多少行數(shù)據(jù)。 | Long | 否 | 100000 | 僅當(dāng)選擇LRU緩存策略時(shí),cacheSize參數(shù)生效。 | |
cacheTTLMs | 緩存失效的超時(shí)時(shí)間。 | Long | 否 | Long.MAX_VALUE | cacheTTLMs配置和cache配置有關(guān):
單位為毫秒。 |
類型映射
云原生數(shù)據(jù)倉(cāng)庫(kù)AnalyticDB PostgreSQL版字段類型 | Flink字段類型 |
boolean | boolean |
smallint | int |
int | int |
bigint | bigint |
float | double |
varchar | varchar |
text | varchar |
timestamp | timestamp |
date | date |
使用示例
結(jié)果表
CREATE TEMPORARY TABLE datagen_source ( `name` VARCHAR, `age` INT ) COMMENT 'datagen source table' WITH ( 'connector' = 'datagen' ); CREATE TABLE adbpg_sink ( name VARCHAR, age INT ) WITH ( 'connector'='adbpg', 'url'='jdbc:postgresql://<Address>:<PortId>/<DatabaseName>', 'tableName'='<yourDatabaseTableName>', 'userName'='<yourDatabaseUserName>', 'password'='<yourDatabasePassword>' ); INSERT INTO adbpg_sink SELECT * FROM datagen_source;
維表
CREATE TEMPORARY TABLE datagen_source( a INT, b BIGINT, c STRING, `proctime` AS PROCTIME() ) COMMENT 'datagen source table' WITH ( 'connector' = 'datagen' }; CREATE TABLE adbpg_dim ( a INT, b VARCHAR, c VARCHAR ) WITH ( 'connector'='adbpg', 'url'='jdbc:postgresql://<Address>:<PortId>/<DatabaseName>', 'tableName'='<yourDatabaseTableName>', 'userName'='<yourDatabaseUserName>', 'password'='<yourDatabasePassword>' ); CREATE TEMPORARY TABLE blackhole_sink( a INT, b STRING ) COMMENT 'blackhole sink table' WITH ( 'connector' = 'blackhole' ); INSERT INTO blackhole_sink SELECT T.a,H.b FROM datagen_source AS T JOIN adb_dim FOR SYSTEM_TIME AS OF T.proctime AS H ON T.a = H.a;