日本熟妇hd丰满老熟妇,中文字幕一区二区三区在线不卡 ,亚洲成片在线观看,免费女同在线一区二区

OceanBase(公測中)

本文為您介紹如何使用OceanBase連接器。

背景信息

OceanBase數(shù)據(jù)庫是一款原生分布式的HTAP數(shù)據(jù)庫管理系統(tǒng),詳情請參見OceanBase官網(wǎng)。為了降低您從MySQL數(shù)據(jù)庫或Oracle數(shù)據(jù)庫遷移到OceanBase數(shù)據(jù)庫時引發(fā)的業(yè)務(wù)系統(tǒng)改造成本,OceanBase數(shù)據(jù)庫支持Oracle和MySQL兩種兼容模式,兩種模式下的數(shù)據(jù)類型、SQL功能、內(nèi)部視圖等與MySQL數(shù)據(jù)庫或Oracle數(shù)據(jù)庫保持一致。兩種模式下建議使用的連接器如下:

  • Oracle模式:只能使用OceanBase連接器。

  • MySQL模式:與原生MySQL保持高度兼容,支持使用OceanBase和MySQL兩種連接器,MySQL連接器詳情請參見MySQL

    • 在無需高級特性的情況下,維表和結(jié)果表建議優(yōu)先考慮MySQL連接器,配置更簡單。

    • 使用OceanBase 3.2.4.4及以上版本時,源表建議優(yōu)先使用MySQL連接器。這是因為OceanBase 3.2.4.4及以上版本MySQL模式開始支持開啟Binlog服務(wù),輸出格式與原生MySQL Binlog一致(有關(guān)OceanBase Binlog的更多信息,請參見概述Binlog 相關(guān)操作)。

      重要
      • 使用MySQL連接器讀取OceanBase Binlog目前處于公測階段,請在使用前充分評估并謹(jǐn)慎使用。

      • 使用MySQL連接器讀取OceanBase時,請確保OceanBase Binlog已開啟且被正確設(shè)置,詳情請參見Binlog 相關(guān)操作

OceanBase連接器支持的信息如下。

類別

詳情

支持類型

源表、維表和結(jié)果表

運行模式

流模式和批模式

數(shù)據(jù)格式

暫不適用

特有監(jiān)控指標(biāo)

暫無

API種類

SQL

是否支持更新或刪除結(jié)果表數(shù)據(jù)

前提條件

連接的數(shù)據(jù)庫和表都已被創(chuàng)建。具體操作可參考以下文檔:

使用限制

  • 維表和結(jié)果表

    • Flink計算引擎VVR 8.0.1及以上版本支持OceanBase連接器。

    • 語義上可以保證At-Least-Once,在結(jié)果表有主鍵的情況下,冪等可以保證數(shù)據(jù)的正確性。

  • 結(jié)果表:OceanBase數(shù)據(jù)庫沒有部署數(shù)據(jù)庫代理服務(wù)時,連接器會使用OCJ(OceanBase Connector Java)連接OceanBase數(shù)據(jù)庫,該模式需要用到config url,要求OceanBase數(shù)據(jù)庫已部署OceanBase云平臺。該工作方式只能用于OceanBase數(shù)據(jù)庫的MySQL兼容模式,不支持Oracle兼容模式。

    說明

    數(shù)據(jù)庫代理服務(wù)與OCJ實現(xiàn)了相同的路由功能,區(qū)別在于OCJ驅(qū)動集成于Java應(yīng)用程序,而數(shù)據(jù)庫代理是一個獨立的代理服務(wù)。目前,OceanBase團(tuán)隊推薦通過數(shù)據(jù)庫代理來連接OceanBase集群,OCJ驅(qū)動主要用于兼容一些歷史集群和應(yīng)用程序。

語法結(jié)構(gòu)

CREATE TABLE oceanabse_source (
   order_id INT,
   order_date TIMESTAMP(0),
   customer_name STRING,
   price DECIMAL(10, 5),
   product_id INT,
   order_status BOOLEAN,
   PRIMARY KEY(order_id) NOT ENFORCED
) WITH (
  'connector' = 'oceanbase',
  'url' = '<yourJdbcUrl>',
  'tableName' = '<yourTableName>',
  'userName' = '<yourUserName>',
  'password' = '<yourPassword>'
);

說明

連接器寫入結(jié)果表原理:寫入結(jié)果表時,會將接收到的每條數(shù)據(jù)拼接成一條SQL去執(zhí)行。具體執(zhí)行的SQL情況如下:

  • 對于沒有主鍵的結(jié)果表,會拼接成INSERT INTO語句。

  • 對于包含主鍵的結(jié)果表,會根據(jù)數(shù)據(jù)庫的兼容模式拼接成UPSERT語句。

WITH參數(shù)

  • 通用

    參數(shù)

    說明

    是否必填

    數(shù)據(jù)類型

    默認(rèn)值

    備注

    connector

    表類型。

    STRING

    • 作為CDC源表或維表時,固定值為oceanbase

    • 作為結(jié)果表時,參數(shù)取值如下:

      • 如果使用了OceanBase數(shù)據(jù)庫代理,則表類型取值為oceanbase

      • 如果直連OceanBase集群,則表類型取值為oceanbase-ocj

    userName

    用戶名。

    STRING

    無。

    password

    密碼。

    STRING

    無。

  • 源表獨有

    說明

    連接器支持通過數(shù)據(jù)庫名稱(database-name)和表名(table-name)的正則匹配和表列表(table-list)的精確匹配兩種模式來指定需要監(jiān)聽的表。當(dāng)同時使用兩種方式時,將會監(jiān)聽兩種方式匹配的所有表。

    參數(shù)

    說明

    是否必填

    數(shù)據(jù)類型

    默認(rèn)值

    備注

    logproxy.host

    OceanBase日志代理服務(wù)器的IP地址或主機(jī)名。

    String

    無。

    logproxy.port

    OceanBase日志代理服務(wù)器的端口號。

    Integer

    無。

    scan.startup.mode

    OceanBase CDC的啟動模式。

    String

    參數(shù)取值如下:

    • initial:從初始位點開始拉取全部數(shù)據(jù)。

    • latest-offset:從當(dāng)前位點開始拉取變更數(shù)據(jù)。

    • timestamp:從指定的時間戳開始拉取變更數(shù)據(jù)。

    tenant-name

    OceanBase數(shù)據(jù)庫的租戶名。

    String

    無。

    database-name

    OceanBase數(shù)據(jù)庫名稱。

    String

    支持使用正則表達(dá)式指定數(shù)據(jù)庫名稱。

    說明

    僅支持在啟動模式為initial時,使用該參數(shù)。

    table-name

    OceanBase數(shù)據(jù)庫的表名稱。

    String

    支持使用正則表達(dá)式指定表名稱。

    說明

    僅支持在啟動模式為initial時,使用該參數(shù)。

    table-list

    OceanBase數(shù)據(jù)庫的全路徑的表名列表。

    String

    可以使用英文逗號(,)分隔,例如db1.table1, db2.table2

    hostname

    OceanBase數(shù)據(jù)庫或 OceanBase代理的IP地址或主機(jī)名。

    String

    無。

    port

    OceanBase數(shù)據(jù)庫服務(wù)器的端口號。

    Integer

    可以是OceanBase服務(wù)器的SQL端口號(默認(rèn)值為2881)

    或OceanBase代理服務(wù)器的端口號(默認(rèn)值為2883)。

    connect.timeout

    連接到OceanBase數(shù)據(jù)庫服務(wù)器之前的最長超時時間。

    Duration

    30s

    無。

    server-time-zone

    數(shù)據(jù)庫服務(wù)器中的會話時區(qū)。

    String

    +00:00

    會話時區(qū)值的合法格式為±hh:mm,表示與協(xié)調(diào)世界時(UTC)的時區(qū)偏移量。

    說明
    • 會話時區(qū)的設(shè)置會影響到時間類型的顯示和存儲方式。因此,如果您需要控制OceanBase的時間類型如何轉(zhuǎn)換為字符串,則需要設(shè)置合理的會話時區(qū)信息,以確保顯示正確的本地時間。

    • 如果您在MySQL數(shù)據(jù)庫中已存在一個用于存儲時區(qū)信息的表,則在設(shè)置時區(qū)時,可以使用這個表中已經(jīng)創(chuàng)建的時區(qū)作為合法的值。

    logproxy.client.id

    OceanBase日志代理服務(wù)器的客戶端連接ID。

    String

    規(guī)則生成

    如果您沒有指定,則Flink會默認(rèn)按照{flink_ip}_{process_id}_{timestamp}_{thread_id}_{tenant}規(guī)則生成。

    rootserver-list

    OceanBase根服務(wù)器列表。

    String

    服務(wù)器列表格式為ip:rpc_port:sql_port。您可以執(zhí)行SHOW PARAMETERS LIKE 'rootservice_list';SQL語句獲取服務(wù)器列表信息。

    說明
    • OceanBase社區(qū)版本必填。

    • 多個服務(wù)器地址使用英文分號(;)分隔。

    config-url

    從配置服務(wù)器獲取服務(wù)器信息的url。

    String

    OceanBase企業(yè)版本必填。

    working-mode

    日志代理中l(wèi)ibobcdc的工作模式。

    String

    storage

    參數(shù)取值如下:

    • storage:表示數(shù)據(jù)將被存儲在磁盤或其他持久性存儲介質(zhì)中。

    • memory:表示數(shù)據(jù)將被存儲在內(nèi)存中。

    compatible-mode

    OceanBase的兼容模式。

    String

    mysql

    參數(shù)取值如下:

    • mysql

    • oracle

    jdbc.driver

    全量讀取源表數(shù)據(jù)時使用的jdbc驅(qū)動類名。

    String

    com.mysql.jdbc.Driver

    無。

    jdbc.properties.*

    傳遞自定義的JDBC URL屬性。

    String

    例如 'jdbc.properties.useSSL' = 'false'表示不使用SSL加密。

    obcdc.properties.*

    將自定義的 OBCDC參數(shù)傳遞給libobcdc。

    String

    例如'obcdc.properties.sort_trans_participants' = '1'

    更多參數(shù)信息見OBCDC配置項說明

  • 維表獨有

    參數(shù)

    說明

    是否必填

    數(shù)據(jù)類型

    默認(rèn)值

    備注

    url

    JDBC url或config url。

    STRING

    • 當(dāng)連接器類型為oceanbase時使用JDBC url,連接器類型為oceanbase-ocj時,使用config url。

    • url中需要包含MySQL database名或Oracle service名。

    cache

    緩存策略。

    STRING

    ALL

    支持以下三種緩存策略:

    • ALL:緩存維表里的所有數(shù)據(jù)。在Job運行前,系統(tǒng)會將維表中所有數(shù)據(jù)加載到Cache中,之后所有的維表查找數(shù)據(jù)都會通過Cache進(jìn)行。如果在Cache中無法找到數(shù)據(jù),則KEY不存在,并在Cache過期后重新加載一遍全量Cache。

      適用于遠(yuǎn)程表數(shù)據(jù)量小且MISS KEY(源表數(shù)據(jù)和維表JOIN時,ON條件無法關(guān)聯(lián))特別多的場景。

    • LRU:緩存維表里的部分?jǐn)?shù)據(jù)。源表的每條數(shù)據(jù)都會觸發(fā)系統(tǒng)先在Cache中查找數(shù)據(jù)。如果沒有找到,則去物理維表中查找。使用該緩存策略時,必須配置cacheSize參數(shù)。

    • None:無緩存。

    重要
    • 使用ALL緩存策略時,請注意節(jié)點內(nèi)存大小,防止出現(xiàn)OOM。

    • 因為系統(tǒng)會異步加載維表數(shù)據(jù),所以在使用ALL緩存策略時,需要增加維表JOIN節(jié)點的內(nèi)存,增加的內(nèi)存大小為遠(yuǎn)程表數(shù)據(jù)量的兩倍。

    cacheSize

    最大緩存條數(shù)。

    INTEGER

    100000

    • 當(dāng)選擇LRU緩存策略后,必須設(shè)置緩存大小。

    • 當(dāng)選擇ALL緩存策略后,可以不設(shè)置緩存大小。

    cacheTTLMs

    緩存超時時間。

    LONG

    Long.MAX_VALUE

    cacheTTLMs的配置和cache有關(guān),詳情如下:

    • 如果cache配置為None,則cacheTTLMs可以不配置,表示緩存不超時。

    • 如果cache配置為LRU,則cacheTTLMs為緩存超時時間。默認(rèn)不過期。

    • 如果cache配置為ALL,則cacheTTLMs為緩存加載時間。默認(rèn)不重新加載。

    maxRetryTimeout

    最大重試時間。

    DURATION

    60s

    無。

  • 結(jié)果表獨有

    參數(shù)

    說明

    是否必填

    數(shù)據(jù)類型

    默認(rèn)值

    備注

    compatibleMode

    OceanBase的兼容模式。

    STRING

    mysql

    參數(shù)取值如下:

    • mysql

    • oracle

    說明

    oceanabse獨有參數(shù)。

    databaseName

    數(shù)據(jù)庫名。

    STRING

    應(yīng)當(dāng)與config url中保持一致。

    說明

    oceanbase-ocj獨有參數(shù)。

    passwordEncrypted

    是否使用加密過的密碼。

    Boolean

    false

    oceanbase-ocj獨有參數(shù)。

    slowQueryThresholdMs

    慢查詢等待閾值。

    INTEGER

    60000

    單位毫秒。

    說明

    oceanbase-ocj獨有參數(shù)。

    url

    JDBC url或config url。

    STRING

    • 當(dāng)連接器類型為oceanbase時使用JDBC url,連接器類型為oceanbase-ocj時,使用config url。

    • url中需要包含MySQL database名或Oracle service名。

    tableName

    表名。

    STRING

    無。

    maxRetryTimes

    最大重試次數(shù)。

    INTEGER

    3

    無。

    poolInitialSize

    數(shù)據(jù)庫連接池初始大小。

    INTEGER

    1

    無。

    poolMaxActive

    數(shù)據(jù)庫連接池最大連接數(shù)。

    INTEGER

    8

    無。

    poolMaxWait

    從數(shù)據(jù)庫連接池中獲取連接的最大等待時間。

    INTEGER

    2000

    單位毫秒。

    poolMinIdle

    數(shù)據(jù)庫連接池中最小空閑連接數(shù)。

    INTEGER

    1

    無。

    connectionProperties

    jdbc的連接屬性。

    STRING

    格式為"k1=v1;k2=v2;k3=v3"。

    ignoreDelete

    是否忽略數(shù)據(jù)Delete操作。

    Boolean

    false

    無。

    excludeUpdateColumns

    指定要排除的列名。在執(zhí)行更新操作時,這些列將不會被更新。

    STRING

    如果忽略指定的字段為多個時,則需要使用英文逗號(,)分隔。例如excludeUpdateColumns=column1,column2

    說明

    該值始終會包含主鍵列,也就是實際生效的列名為您指定的列名和主鍵列。

    partitionKey

    分區(qū)鍵。

    STRING

    當(dāng)設(shè)置分區(qū)鍵時,連接器會先將數(shù)據(jù)按照分區(qū)鍵進(jìn)行分組,各個分組將分別寫入數(shù)據(jù)庫。這里的分組處理早于modRule的處理。

    modRule

    分組規(guī)則。

    STRING

    分組規(guī)則格式需要為"列名mod數(shù)字",列類型必須為數(shù)字類型。當(dāng)設(shè)置分組規(guī)則時,數(shù)據(jù)會根據(jù)計算所得結(jié)果進(jìn)行分組,各個分組將分別寫入數(shù)據(jù)庫。這里的分組處理晚于partitionKey的處理。

    bufferSize

    數(shù)據(jù)緩沖區(qū)大小。

    INTEGER

    1000

    無。

    flushIntervalMs

    清空緩存的時間間隔。表示如果緩存中的數(shù)據(jù)在等待指定時間后,依然沒有達(dá)到輸出條件,系統(tǒng)會自動輸出緩存中的所有數(shù)據(jù)。

    LONG

    1000

    無。

    retryIntervalMs

    最大重試時間。

    INTEGER

    5000

    單位毫秒。

類型映射

  • MySQL兼容模式

    OceanBase字段類型

    Flink字段類型

    TINYINT

    TINYINT

    SMALLINT

    SMALLINT

    TINYINT UNSIGNED

    INT

    INT

    MEDIUMINT

    SMALLINT UNSIGNED

    BIGINT

    BIGINT

    INT UNSIGNED

    BIGINT UNSIGNED

    DECIMAL(20, 0)

    REAL

    FLOAT

    FLOAT

    DOUBLE

    DOUBLE

    NUMERIC(p, s)

    DECIMAL(p, s)

    說明

    其中p <= 38。

    DECIMAL(p, s)

    BOOLEAN

    BOOLEAN

    TINYINT(1)

    DATE

    DATE

    TIME [(p)]

    TIME [(p)] [WITHOUT TIME ZONE]

    DATETIME [(p)]

    TIMESTAMP [(p)] [WITHOUT TIME ZONE]

    TIMESTAMP [(p)]

    CHAR(n)

    CHAR(n)

    VARCHAR(n)

    VARCHAR(n)

    BIT(n)

    BINARY(?n/8?)

    BINARY(n)

    BINARY(n)

    VARBINARY(N)

    VARBINARY(N)

    TINYTEXT

    STRING

    TEXT

    MEDIUMTEXT

    LONGTEXT

    TINYBLOB

    BYTES

    重要

    Flink僅支持小于等于2,147,483,647(2^31 - 1)的BLOB類型的記錄。

    BLOB

    MEDIUMBLOB

    LONGBLOB

  • Oracle兼容模式

    OceanBase字段類型

    Flink字段類型

    NUMBER(p, s <= 0), p - s < 3

    TINYINT

    NUMBER(p, s <= 0), p - s < 5

    SMALLINT

    NUMBER(p, s <= 0), p - s < 10

    INT

    NUMBER(p, s <= 0), p - s < 19

    BIGINT

    NUMBER(p, s <= 0), 19 <= p - s <= 38

    DECIMAL(p - s, 0)

    NUMBER(p, s > 0)

    DECIMAL(p, s)

    NUMBER(p, s <= 0), p - s > 38

    STRING

    FLOAT

    FLOAT

    BINARY_FLOAT

    BINARY_DOUBLE

    DOUBLE

    NUMBER(1)

    BOOLEAN

    DATE

    TIMESTAMP [(p)] [WITHOUT TIMEZONE]

    TIMESTAMP [(p)]

    CHAR(n)

    STRING

    NCHAR(n)

    NVARCHAR2(n)

    VARCHAR(n)

    VARCHAR2(n)

    CLOB

    BLOB

    BYTES

    ROWID

使用示例

  • 源表&結(jié)果表

    CREATE TEMPORARY TABLE oceanbase_source (
      a INT,
      b VARCHAR,
      c VARCHAR
    ) WITH (
      'connector' = 'oceanbase',
      'scan.startup.mode' = 'initial',
      'username' = 'user',
      'password' = 'password',
      'tenant-name' = 'tenant',
      'database-name' = '^test_db$',
      'table-name' = '^orders$',
      'hostname' = '11.22.xx.xx',
      'port' = '2883',
      'config-url' = 'http://11.22.xx.xx:55/services?Action=ObRootServiceInfo&User_ID=xxx&UID=xxx&ObRegion=xxx',
      'logproxy.host' = '11.22.xx.xx',
      'logproxy.port' = '2983',
      'working-mode' = 'memory'
    );
    
    -- oceanbase結(jié)果表
    CREATE TEMPORARY TABLE oceanbase_sink (
      a INT,
      b VARCHAR,
      c VARCHAR
    ) WITH (
      'connector' = 'oceanbase',
      'url' = '<yourJdbcUrl>',
      'userName' = '<yourUserName>',
      'password' = '<yourPassword>',
      'tableName' = '<yourTableName>'
    );
    
    --oceanbase-ocj結(jié)果表
    CREATE TEMPORARY TABLE oceanbase_ocj_sink (
      a INT,
      b VARCHAR,
      c VARCHAR
    ) WITH (
      'connector' = 'oceanbase-ocj',
      'url' = '<yourConfigUrl>',
      'userName' = '<yourUserName>',
      'password' = '${secret_values.password}',
      'databaseName' = '<yourDatabaseName>',
      'tableName' = '<yourTableName>'
    );
    
    BEGIN STATEMENT SET;  
    INSERT INTO oceanbase_sink
    SELECT * FROM oceanbase_source;
    INSERT INTO oceanbase_ocj_sink
    SELECT * FROM oceanbase_source;
    END; 

  • 維表

    CREATE TEMPORARY TABLE datagen_source(
      a INT,
      b BIGINT,
      c STRING,
      `proctime` AS PROCTIME()
    ) WITH (
      'connector' = 'datagen'
    );
    
    CREATE TEMPORARY TABLE oceanbase_dim (
      a INT,
      b VARCHAR,
      c VARCHAR
    ) WITH (
      'connector' = 'oceanbase',
      'url' = '<yourJdbcUrl>',
      'userName' = '<yourUserName>',
      'password' = '${secret_values.password}',
      'tableName' = '<yourTableName>'
    );
    
    CREATE TEMPORARY TABLE blackhole_sink(
      a INT,
      b STRING
    ) WITH (
      'connector' = 'blackhole'
    );
    
    INSERT INTO blackhole_sink
    SELECT T.a, H.b
    FROM datagen_source AS T 
    JOIN oceanbase_dim FOR SYSTEM_TIME AS OF T.`proctime` AS H 
    ON T.a = H.a;

相關(guān)文檔

Flink支持的連接器,請參見支持的連接器