本文為您介紹如何使用PolarDB PostgreSQL版(Oracle語法兼容1.0)連接器。
背景信息
PolarDB PostgreSQL版(兼容Oracle)是阿里巴巴自研的新一代云原生數據庫,在存儲計算分離架構下,利用了軟硬件結合的優勢,為您提供具備極致彈性、高性能、海量存儲、安全可靠的數據庫服務,高度兼容Oracle。
PolarDB PostgreSQL版(Oracle語法兼容1.0)連接器支持的信息如下。
類別 | 詳情 |
支持類型 | 結果表 |
運行模式 | 流模式和批模式 |
數據格式 | 暫不適用 |
特有監控指標 |
說明 指標含義詳情,請參見監控指標說明。 |
API種類 | SQL |
是否支持更新或刪除結果表數據 | 是 |
前提條件
已創建PolarDB PostgreSQL版(Oracle語法兼容1.0)實例并創建表,詳情請參見創建PolarDB PostgreSQL版(兼容Oracle)集群和創建表。
已設置白名單,詳情請參見設置集群白名單。
使用限制
僅Flink實時計算引擎VVR 8.0.5及以上版本支持PolarDB PostgreSQL版(Oracle語法兼容1.0)連接器。
語法結構
CREATE TABLE polardbo_table (
id INT,
len INT,
content VARCHAR,
PRIMARY KEY(id)
) WITH (
'connector'='polardbo',
'url'='jdbc:postgresql://<Address>:<PortId>/<DatabaseName>',
'tableName'='<yourDatabaseTableName>',
'userName'='<yourDatabaseUserName>',
'password'='<yourDatabasePassword>'
);
WITH參數
參數 | 說明 | 數據類型 | 是否必填 | 默認值 | 備注 |
connector | 表類型。 | String | 是 | 無 | 固定值為polardbo。 |
url | JDBC連接地址。 | String | 是 | 無 | 格式為 |
tableName | 表名。 | String | 是 | 無 | 無。 |
userName | 用戶名。 | String | 是 | 無 | 無。 |
password | 密碼。 | String | 是 | 無 | 為了避免您的密碼信息泄露,建議您通過密鑰管理的方式填寫密碼取值,詳情請參見密鑰管理。 |
maxRetryTimes | 寫入數據失敗后,重試寫入的最大次數。 | Integer | 否 | 3 | 無。 |
targetSchema | Schema名稱。 | String | 否 | public | 無。 |
caseSensitive | 大小寫是否敏感。 | String | 否 | false | 參數取值如下:
|
connectionMaxActive | 連接池的最大連接數。 | Integer | 否 | 5 | 系統會自動釋放與數據庫服務的空閑連接。 重要 此參數設置過大可能會導致服務端連接數出現異常。 |
retryWaitTime | 重試的時間間隔。 | Integer | 否 | 100 | 單位毫秒。 |
batchSize | 一次批量寫入的數據條數。 | Integer | 否 | 500 | 無。 |
flushIntervalMs | 清空緩存的時間間隔。 | Integer | 否 | 無 | 單位毫秒。如果緩存中的數據在等待指定時間后,依然沒有達到輸出條件,系統會自動輸出緩存中的所有數據。 |
writeMode | 第一次嘗試寫入時的寫入方式。 | String | 否 | insert | 參數取值如下:
|
conflictMode | 當Insert寫入出現主鍵沖突或者唯一索引沖突時的處理策略。 | String | 否 | strict | 參數取值如下:
|
類型映射
下面是將connector用于結果表的場景下,Flink字段到PolarDB PostgreSQL(Oracle兼容1.0)字段的映射關系。
PolarDB PostgreSQL版(Oracle語法兼容1.0)字段類型 | Flink字段類型 |
boolean | boolean |
int | int |
number | bigint |
number | double |
varchar | varchar |
timestamp | timestamp |
varchar | date |
使用示例
結果表
CREATE TEMPORARY TABLE datagen_source ( `name` VARCHAR, `age` INT ) COMMENT 'datagen source table' WITH ( 'connector' = 'datagen' ); CREATE TABLE polardbo_sink ( name VARCHAR, age INT ) WITH ( 'connector'='polardbo', 'url'='jdbc:postgresql://<Address>:<PortId>/<DatabaseName>', 'tableName'='<yourDatabaseTableName>', 'userName'='<yourDatabaseUserName>', 'password'='<yourDatabasePassword>' ); INSERT INTO polardbo_sink SELECT * FROM datagen_source;