本文為您介紹如何使用JDBC連接器。
背景信息
此連接器為開源Flink的JDBC連接器,JDBC連接器提供了對MySQL、PostgreSQL和Oracle等常見的數據庫讀寫支持。JDBC連接器支持的信息如下。
類別 | 詳情 |
支持類型 | 源表、維表和結果表 |
運行模式 | 流模式和批模式 |
數據格式 | 暫不適用 |
特有監控指標 | 暫無 |
API種類 | SQL |
是否支持更新或刪除結果表數據 | 是 |
前提條件
連接的數據庫和表都已被創建。
使用限制
僅實時計算引擎VVR 6.0.1及以上版本支持JDBC連接器。
JDBC源表為Bounded Source,表中數據讀取完,對應的Task就會結束。如果需要捕獲實時變更數據,則請使用CDC連接器,詳情請參見MySQL的CDC源表和Postgres的CDC源表(公測中)。
使用JDBC結果表連接PostgreSQL數據庫時,需要數據庫版本為PostgreSQL 9.5及以上。因為DDL中定義主鍵的情況下,PostgreSQL采用ON CONFLICT語法進行插入或更新,此語法需要PostgreSQL 9.5及以上版本才支持。
Flink中只提供了開源JDBC連接器的實現,不包含具體的數據庫的Driver。在使用JDBC連接器時,需要手動上傳目標數據庫Driver的JAR包作為附加依賴文件,具體操作請參見步驟三:進行更多配置。目前支持的Driver如下表所示。
Driver
Group Id
Artifact Id
MySQL
mysql
Oracle
com.oracle.database.jdbc
PostgreSQL
org.postgresql
如果您采用非列表中的JDBC Driver,則其正確性和可用性需要您自行充分測試并保證。
JDBC連接器在向MySQL結果表寫入數據時,會將接收到的每條數據拼接成一條SQL去執行。對于包含主鍵的MySQL結果表,會拼接執行
INSERT INTO table_name (column1, column2, ...) VALUES (value1, value2, ...) ON DUPLICATE KEY UPDATE column1 = VALUES(column1), column2 = VALUES(column2), ...;
語句。需要注意的是,如果物理表存在除主鍵外的唯一索引約束,當插入兩條主鍵不同但唯一索引相同的記錄時,下游數據會因為唯一索引沖突導致數據覆蓋引發數據丟失。
語法結構
CREATE TABLE jdbc_table (
`id` BIGINT,
`name` VARCHAR,
PRIMARY KEY (id) NOT ENFORCED
) WITH (
'connector' = 'jdbc',
'url' = 'jdbc:xxx',
'table-name' = '<yourTable>',
'username' = '<yourUsername>',
'password' = '<yourPassword>'
);
WITH參數
通用
參數
說明
數據類型
是否必填
默認值
備注
connector
表類型。
String
是
無
固定值為jdbc。
url
數據庫的URL。
String
是
無
無。
table-name
JDBC表的名稱。
String
是
無
無。
username
JDBC用戶名稱。
String
否
無
如果指定了username和password中的任一參數,則兩者必須都被指定。
password
JDBC用戶密碼。
String
否
無
源表獨有
參數
說明
數據類型
是否必填
默認值
備注
scan.partition.column
對輸入進行分區的列名。
String
否
無
該列必須是數值類型或時間戳類型,且該類型在數據庫中需要支持與數值類型進行比較。關于分區掃描的詳情請參見Partitioned Scan。
scan.partition.num
分區數。
Integer
否
無
無。
scan.partition.lower-bound
第一個分區的最小值。
Long
否
無
無。
scan.partition.upper-bound
最后一個分區的最大值。
Long
否
無
無。
scan.fetch-size
每次循環讀取時,從數據庫中獲取的行數。
Integer
否
0
如果指定的值為0,則該配置項會被忽略。
scan.auto-commit
是否開啟auto-commit。
Boolean
否
true
無。
結果表獨有
參數
說明
數據類型
是否必填
默認值
備注
sink.buffer-flush.max-rows
flush數據前,緩存記錄的最大值。
Integer
否
100
您可以設置為0來禁用它,即不再緩存記錄,直接flush數據。
sink.buffer-flush.interval
flush數據的時間間隔。數據在Flink中緩存的時間超過該參數指定的時間后,異步線程將flush數據到數據庫中。
Duration
否
1 s
您可以設置為0來禁用它,即不再緩存記錄,直接flush數據。
說明如果您需要完全異步地處理緩存的flush事件,則可以將sink.buffer-flush.max-rows設置為0,并配置適當的flush時間間隔。
sink.max-retries
寫入記錄到數據庫失敗后的最大重試次數。
Integer
否
3
無。
維表獨有
參數
說明
數據類型
是否必填
默認值
備注
lookup.cache.max-rows
指定緩存的最大行數。如果超過該值,則最老的行記錄將會過期,會被新的記錄替換掉。
Integer
否
無
默認情況下,維表Cache是未開啟的。您可以設置lookup.cache.max-rows和lookup.cache.ttl參數來啟用維表Cache。啟用緩存時,采用的是LRU策略緩存。
lookup.cache.ttl
指定緩存中每行記錄的最大存活時間。如果某行記錄超過該時間,則該行記錄將會過期。
Duration
否
無
lookup.cache.caching-missing-key
是否緩存空的查詢結果。
Boolean
否
true
參數取值如下:
true(默認值):緩存空的查詢結果。
false:不緩存空的查詢結果。
lookup.max-retries
查詢數據庫失敗的最大重試次數。
Integer
否
3
無。
PostgreSQL獨有
參數
說明
數據類型
是否必填
默認值
備注
source.extend-type.enabled
作為源表和維表時,是否允許讀取JSONB和UUID拓展類型,并映射到Flink支持的類型。
Boolean
否
false
參數取值如下:
true:支持讀取和映射拓展類型。
false(默認值):不支持讀取和映射拓展類型。
類型映射
MySQL類型 | Oracle類型 | PostgreSQL類型 | FlinkSQL類型 |
TINYINT | 無 | 無 | TINYINT |
| 無 |
| SMALLINT |
| 無 |
| INT |
| 無 |
| BIGINT |
BIGINT UNSIGNED | 無 | 無 | DECIMAL(20, 0) |
BIGINT | 無 | BIGINT | BIGINT |
FLOAT | BINARY_FLOAT |
| FLOAT |
| BINARY_DOUBLE |
| DOUBLE |
|
|
| DECIMAL(p, s) |
| 無 | BOOLEANcan | BOOLEAN |
DATE | DATE | DATE | DATE |
TIME [(p)] | DATE | TIME [(p)] [WITHOUT TIMEZONE] | TIME [(p)] [WITHOUT TIMEZONE] |
DATETIME [(p)] | TIMESTAMP [(p)] [WITHOUT TIMEZONE] | TIMESTAMP [(p)] [WITHOUT TIMEZONE] | TIMESTAMP [(p)] [WITHOUT TIMEZONE] |
|
|
| STRING |
|
| BYTEA | BYTES |
無 | 無 | ARRAY | ARRAY |
使用示例
源表
CREATE TEMPORARY TABLE jdbc_source ( `id` INT, `name` VARCHAR ) WITH ( 'connector' = 'jdbc', 'url' = 'jdbc:xxx', 'table-name' = '<yourTable>', 'username' = '<yourUsername>', 'password' = '<yourPassword>' ); CREATE TEMPORARY TABLE blackhole_sink( `id` INT, `name` VARCHAR ) WITH ( 'connector' = 'blackhole' ); INSERT INTO blackhole_sink SELECT * FROM jdbc_source ;
結果表
CREATE TEMPORARY TABLE datagen_source ( `name` VARCHAR, `age` INT ) WITH ( 'connector' = 'datagen' ); CREATE TEMPORARY TABLE jdbc_sink ( `name` VARCHAR, `age` INT ) WITH ( 'connector' = 'jdbc', 'url' = 'jdbc:xxxx', 'table-name' = '<yourTable>', 'username' = '<yourUsername>', 'password' = '<yourPassword>' ); INSERT INTO jdbc_sink SELECT * FROM datagen_source;
維表
CREATE TEMPORARY TABLE datagen_source( `id` INT, `data` BIGINT, `proctime` AS PROCTIME() ) WITH ( 'connector' = 'datagen' ); CREATE TEMPORARY TABLE jdbc_dim ( `id` INT, `name` VARCHAR ) WITH ( 'connector' = 'jdbc', 'url' = 'jdbc:xxx', 'table-name' = '<yourTable>', 'username' = '<yourUsername>', 'password' = '<yourPassword>' ); CREATE TEMPORARY TABLE blackhole_sink( `id` INT, `data` BIGINT, `name` VARCHAR ) WITH ( 'connector' = 'blackhole' ); INSERT INTO blackhole_sink SELECT T.`id`,T.`data`, H.`name` FROM datagen_source AS T JOIN jdbc_dim FOR SYSTEM_TIME AS OF T.proctime AS H ON T.id = H.id;