日本熟妇hd丰满老熟妇,中文字幕一区二区三区在线不卡 ,亚洲成片在线观看,免费女同在线一区二区

JDBC

本文為您介紹如何使用JDBC連接器。

背景信息

此連接器為開源Flink的JDBC連接器,JDBC連接器提供了對MySQL、PostgreSQL和Oracle等常見的數據庫讀寫支持。JDBC連接器支持的信息如下。

類別

詳情

支持類型

源表、維表和結果表

運行模式

流模式和批模式

數據格式

暫不適用

特有監控指標

暫無

API種類

SQL

是否支持更新或刪除結果表數據

前提條件

連接的數據庫和表都已被創建。

使用限制

  • 僅實時計算引擎VVR 6.0.1及以上版本支持JDBC連接器。

  • JDBC源表為Bounded Source,表中數據讀取完,對應的Task就會結束。如果需要捕獲實時變更數據,則請使用CDC連接器,詳情請參見MySQL的CDC源表Postgres的CDC源表(公測中)

  • 使用JDBC結果表連接PostgreSQL數據庫時,需要數據庫版本為PostgreSQL 9.5及以上。因為DDL中定義主鍵的情況下,PostgreSQL采用ON CONFLICT語法進行插入或更新,此語法需要PostgreSQL 9.5及以上版本才支持。

  • Flink中只提供了開源JDBC連接器的實現,不包含具體的數據庫的Driver。在使用JDBC連接器時,需要手動上傳目標數據庫Driver的JAR包作為附加依賴文件,具體操作請參見步驟三:進行更多配置。目前支持的Driver如下表所示。

    Driver

    Group Id

    Artifact Id

    MySQL

    mysql

    mysql-connector-java

    Oracle

    com.oracle.database.jdbc

    ojdbc8

    PostgreSQL

    org.postgresql

    postgresql

    • 如果您采用非列表中的JDBC Driver,則其正確性和可用性需要您自行充分測試并保證。

    • JDBC連接器在向MySQL結果表寫入數據時,會將接收到的每條數據拼接成一條SQL去執行。對于包含主鍵的MySQL結果表,會拼接執行INSERT INTO table_name (column1, column2, ...) VALUES (value1, value2, ...) ON DUPLICATE KEY UPDATE column1 = VALUES(column1), column2 = VALUES(column2), ...;語句。需要注意的是,如果物理表存在除主鍵外的唯一索引約束,當插入兩條主鍵不同但唯一索引相同的記錄時,下游數據會因為唯一索引沖突導致數據覆蓋引發數據丟失。

語法結構

CREATE TABLE jdbc_table (
  `id` BIGINT,
  `name` VARCHAR,
   PRIMARY KEY (id) NOT ENFORCED
) WITH (
  'connector' = 'jdbc',
  'url' = 'jdbc:xxx',
  'table-name' = '<yourTable>',
  'username' = '<yourUsername>',
  'password' = '<yourPassword>'
);

WITH參數

  • 通用

    參數

    說明

    數據類型

    是否必填

    默認值

    備注

    connector

    表類型。

    String

    固定值為jdbc。

    url

    數據庫的URL。

    String

    無。

    table-name

    JDBC表的名稱。

    String

    無。

    username

    JDBC用戶名稱。

    String

    如果指定了username和password中的任一參數,則兩者必須都被指定。

    password

    JDBC用戶密碼。

    String

  • 源表獨有

    參數

    說明

    數據類型

    是否必填

    默認值

    備注

    scan.partition.column

    對輸入進行分區的列名。

    String

    該列必須是數值類型或時間戳類型,且該類型在數據庫中需要支持與數值類型進行比較。關于分區掃描的詳情請參見Partitioned Scan

    scan.partition.num

    分區數。

    Integer

    無。

    scan.partition.lower-bound

    第一個分區的最小值。

    Long

    無。

    scan.partition.upper-bound

    最后一個分區的最大值。

    Long

    無。

    scan.fetch-size

    每次循環讀取時,從數據庫中獲取的行數。

    Integer

    0

    如果指定的值為0,則該配置項會被忽略。

    scan.auto-commit

    是否開啟auto-commit

    Boolean

    true

    無。

  • 結果表獨有

    參數

    說明

    數據類型

    是否必填

    默認值

    備注

    sink.buffer-flush.max-rows

    flush數據前,緩存記錄的最大值。

    Integer

    100

    您可以設置為0來禁用它,即不再緩存記錄,直接flush數據。

    sink.buffer-flush.interval

    flush數據的時間間隔。數據在Flink中緩存的時間超過該參數指定的時間后,異步線程將flush數據到數據庫中。

    Duration

    1 s

    您可以設置為0來禁用它,即不再緩存記錄,直接flush數據。

    說明

    如果您需要完全異步地處理緩存的flush事件,則可以將sink.buffer-flush.max-rows設置為0,并配置適當的flush時間間隔。

    sink.max-retries

    寫入記錄到數據庫失敗后的最大重試次數。

    Integer

    3

    無。

  • 維表獨有

    參數

    說明

    數據類型

    是否必填

    默認值

    備注

    lookup.cache.max-rows

    指定緩存的最大行數。如果超過該值,則最老的行記錄將會過期,會被新的記錄替換掉。

    Integer

    默認情況下,維表Cache是未開啟的。您可以設置lookup.cache.max-rowslookup.cache.ttl參數來啟用維表Cache。啟用緩存時,采用的是LRU策略緩存。

    lookup.cache.ttl

    指定緩存中每行記錄的最大存活時間。如果某行記錄超過該時間,則該行記錄將會過期。

    Duration

    lookup.cache.caching-missing-key

    是否緩存空的查詢結果。

    Boolean

    true

    參數取值如下:

    • true(默認值):緩存空的查詢結果。

    • false:不緩存空的查詢結果。

    lookup.max-retries

    查詢數據庫失敗的最大重試次數。

    Integer

    3

    無。

  • PostgreSQL獨有

    參數

    說明

    數據類型

    是否必填

    默認值

    備注

    source.extend-type.enabled

    作為源表和維表時,是否允許讀取JSONB和UUID拓展類型,并映射到Flink支持的類型。

    Boolean

    false

    參數取值如下:

    • true:支持讀取和映射拓展類型。

    • false(默認值):不支持讀取和映射拓展類型。

類型映射

MySQL類型

Oracle類型

PostgreSQL類型

FlinkSQL類型

TINYINT

TINYINT

  • SMALLINT

  • TINYINT UNSIGNED

  • SMALLINT

  • INT2

  • SMALLSERIAL

  • SERIAL2

SMALLINT

  • INT

  • MEDIUMINT

  • SMALLINT UNSIGNED

  • INTEGER

  • SERIAL

INT

  • BIGINT

  • INT UNSIGNED

  • BIGINT

  • BIGSERIAL

BIGINT

BIGINT UNSIGNED

DECIMAL(20, 0)

BIGINT

BIGINT

BIGINT

FLOAT

BINARY_FLOAT

  • REAL

  • FLOAT4

FLOAT

  • DOUBLE

  • DOUBLE PRECISION

BINARY_DOUBLE

  • FLOAT8

  • DOUBLE PRECISION

DOUBLE

  • NUMERIC(p, s)

  • DECIMAL(p, s)

  • SMALLINT

  • FLOAT(s)

  • DOUBLE PRECISION

  • REAL

  • NUMBER(p, s)

  • NUMERIC(p, s)

  • DECIMAL(p, s)

DECIMAL(p, s)

  • BOOLEAN

  • TINYINT(1)

BOOLEANcan

BOOLEAN

DATE

DATE

DATE

DATE

TIME [(p)]

DATE

TIME [(p)] [WITHOUT TIMEZONE]

TIME [(p)] [WITHOUT TIMEZONE]

DATETIME [(p)]

TIMESTAMP [(p)] [WITHOUT TIMEZONE]

TIMESTAMP [(p)] [WITHOUT TIMEZONE]

TIMESTAMP [(p)] [WITHOUT TIMEZONE]

  • CHAR(n)

  • VARCHAR(n)

  • TEXT

  • CHAR(n)

  • VARCHAR(n)

  • CLOB

  • CHAR(n)

  • CHARACTER(n)

  • VARCHAR(n)

  • CHARACTER VARYING(n)

  • TEXT

  • JSONB

  • UUID

STRING

  • BINARY

  • VARBINARY

  • BLOB

  • RAW(s)

  • BLOB

BYTEA

BYTES

ARRAY

ARRAY

使用示例

  • 源表

    CREATE TEMPORARY TABLE jdbc_source (
      `id` INT,
      `name` VARCHAR
    ) WITH (
      'connector' = 'jdbc',
      'url' = 'jdbc:xxx',
      'table-name' = '<yourTable>',
      'username' = '<yourUsername>',
      'password' = '<yourPassword>'
    );
    
    CREATE TEMPORARY TABLE blackhole_sink(
      `id` INT,
      `name` VARCHAR
    ) WITH (
      'connector' = 'blackhole'
    );
    
    INSERT INTO blackhole_sink SELECT * FROM jdbc_source ;
  • 結果表

    CREATE TEMPORARY TABLE datagen_source (
      `name` VARCHAR,
      `age` INT
    ) WITH (
      'connector' = 'datagen'
    );
    
    CREATE TEMPORARY TABLE jdbc_sink (
      `name` VARCHAR,
      `age` INT
    ) WITH (
      'connector' = 'jdbc',
      'url' = 'jdbc:xxxx',
      'table-name' = '<yourTable>',
      'username' = '<yourUsername>',
      'password' = '<yourPassword>'
    );
    
    INSERT INTO jdbc_sink
    SELECT * FROM datagen_source;
  • 維表

    CREATE TEMPORARY TABLE datagen_source(
     `id` INT,
     `data` BIGINT,
     `proctime` AS PROCTIME()
    ) WITH (
      'connector' = 'datagen'
    );
    
    CREATE TEMPORARY TABLE jdbc_dim (
      `id` INT,
      `name` VARCHAR
    ) WITH (
      'connector' = 'jdbc',
      'url' = 'jdbc:xxx',
      'table-name' = '<yourTable>',
      'username' = '<yourUsername>',
      'password' = '<yourPassword>'
    );
    
    CREATE TEMPORARY TABLE blackhole_sink(
      `id` INT,
      `data` BIGINT,
      `name` VARCHAR
    ) WITH (
      'connector' = 'blackhole'
    );
    
    INSERT INTO blackhole_sink
    SELECT T.`id`,T.`data`, H.`name`
    FROM datagen_source AS T
    JOIN jdbc_dim FOR SYSTEM_TIME AS OF T.proctime AS H ON T.id = H.id;