OceanBase(公測中)
本文為您介紹如何使用OceanBase連接器。
背景信息
OceanBase數(shù)據(jù)庫是一款原生分布式的HTAP數(shù)據(jù)庫管理系統(tǒng),詳情請參見OceanBase官網(wǎng)。為了降低您從MySQL數(shù)據(jù)庫或Oracle數(shù)據(jù)庫遷移到OceanBase數(shù)據(jù)庫時引發(fā)的業(yè)務(wù)系統(tǒng)改造成本,OceanBase數(shù)據(jù)庫支持Oracle和MySQL兩種兼容模式,兩種模式下的數(shù)據(jù)類型、SQL功能、內(nèi)部視圖等與MySQL數(shù)據(jù)庫或Oracle數(shù)據(jù)庫保持一致。兩種模式下建議使用的連接器如下:
Oracle模式:只能使用OceanBase連接器。
MySQL模式:與原生MySQL保持高度兼容,支持使用OceanBase和MySQL兩種連接器,MySQL連接器詳情請參見MySQL。
在無需高級特性的情況下,維表和結(jié)果表建議優(yōu)先考慮MySQL連接器,配置更簡單。
使用OceanBase 3.2.4.4及以上版本時,源表建議優(yōu)先使用MySQL連接器。這是因為OceanBase 3.2.4.4及以上版本MySQL模式開始支持開啟Binlog服務(wù),輸出格式與原生MySQL Binlog一致(有關(guān)OceanBase Binlog的更多信息,請參見概述或Binlog 相關(guān)操作)。
重要使用MySQL連接器讀取OceanBase Binlog目前處于公測階段,請在使用前充分評估并謹(jǐn)慎使用。
使用MySQL連接器讀取OceanBase時,請確保OceanBase Binlog已開啟且被正確設(shè)置,詳情請參見Binlog 相關(guān)操作。
OceanBase連接器支持的信息如下。
類別 | 詳情 |
支持類型 | 源表、維表和結(jié)果表 |
運行模式 | 流模式和批模式 |
數(shù)據(jù)格式 | 暫不適用 |
特有監(jiān)控指標(biāo) | 暫無 |
API種類 | SQL |
是否支持更新或刪除結(jié)果表數(shù)據(jù) | 是 |
前提條件
連接的數(shù)據(jù)庫和表都已被創(chuàng)建。具體操作可參考以下文檔:
MySQL模式
Oracle模式
使用限制
維表和結(jié)果表
Flink計算引擎VVR 8.0.1及以上版本支持OceanBase連接器。
語義上可以保證At-Least-Once,在結(jié)果表有主鍵的情況下,冪等可以保證數(shù)據(jù)的正確性。
結(jié)果表:OceanBase數(shù)據(jù)庫沒有部署數(shù)據(jù)庫代理服務(wù)時,連接器會使用OCJ(OceanBase Connector Java)連接OceanBase數(shù)據(jù)庫,該模式需要用到config url,要求OceanBase數(shù)據(jù)庫已部署OceanBase云平臺。該工作方式只能用于OceanBase數(shù)據(jù)庫的MySQL兼容模式,不支持Oracle兼容模式。
說明數(shù)據(jù)庫代理服務(wù)與OCJ實現(xiàn)了相同的路由功能,區(qū)別在于OCJ驅(qū)動集成于Java應(yīng)用程序,而數(shù)據(jù)庫代理是一個獨立的代理服務(wù)。目前,OceanBase團(tuán)隊推薦通過數(shù)據(jù)庫代理來連接OceanBase集群,OCJ驅(qū)動主要用于兼容一些歷史集群和應(yīng)用程序。
語法結(jié)構(gòu)
CREATE TABLE oceanabse_source (
order_id INT,
order_date TIMESTAMP(0),
customer_name STRING,
price DECIMAL(10, 5),
product_id INT,
order_status BOOLEAN,
PRIMARY KEY(order_id) NOT ENFORCED
) WITH (
'connector' = 'oceanbase',
'url' = '<yourJdbcUrl>',
'tableName' = '<yourTableName>',
'userName' = '<yourUserName>',
'password' = '<yourPassword>'
);
連接器寫入結(jié)果表原理:寫入結(jié)果表時,會將接收到的每條數(shù)據(jù)拼接成一條SQL去執(zhí)行。具體執(zhí)行的SQL情況如下:
對于沒有主鍵的結(jié)果表,會拼接成INSERT INTO語句。
對于包含主鍵的結(jié)果表,會根據(jù)數(shù)據(jù)庫的兼容模式拼接成UPSERT語句。
WITH參數(shù)
通用
參數(shù)
說明
是否必填
數(shù)據(jù)類型
默認(rèn)值
備注
connector
表類型。
是
STRING
無
作為CDC源表或維表時,固定值為
oceanbase
。作為結(jié)果表時,參數(shù)取值如下:
如果使用了OceanBase數(shù)據(jù)庫代理,則表類型取值為
oceanbase
,如果直連OceanBase集群,則表類型取值為
oceanbase-ocj
。
userName
用戶名。
是
STRING
無
無。
password
密碼。
是
STRING
無
無。
源表獨有
說明連接器支持通過數(shù)據(jù)庫名稱(database-name)和表名(table-name)的正則匹配和表列表(table-list)的精確匹配兩種模式來指定需要監(jiān)聽的表。當(dāng)同時使用兩種方式時,將會監(jiān)聽兩種方式匹配的所有表。
參數(shù)
說明
是否必填
數(shù)據(jù)類型
默認(rèn)值
備注
logproxy.host
OceanBase日志代理服務(wù)器的IP地址或主機(jī)名。
是
String
無
無。
logproxy.port
OceanBase日志代理服務(wù)器的端口號。
是
Integer
無
無。
scan.startup.mode
OceanBase CDC的啟動模式。
是
String
無
參數(shù)取值如下:
initial:從初始位點開始拉取全部數(shù)據(jù)。
latest-offset:從當(dāng)前位點開始拉取變更數(shù)據(jù)。
timestamp:從指定的時間戳開始拉取變更數(shù)據(jù)。
tenant-name
OceanBase數(shù)據(jù)庫的租戶名。
是
String
無
無。
database-name
OceanBase數(shù)據(jù)庫名稱。
否
String
無
支持使用正則表達(dá)式指定數(shù)據(jù)庫名稱。
說明僅支持在啟動模式為initial時,使用該參數(shù)。
table-name
OceanBase數(shù)據(jù)庫的表名稱。
否
String
無
支持使用正則表達(dá)式指定表名稱。
說明僅支持在啟動模式為initial時,使用該參數(shù)。
table-list
OceanBase數(shù)據(jù)庫的全路徑的表名列表。
否
String
無
可以使用英文逗號(,)分隔,例如
db1.table1, db2.table2
。hostname
OceanBase數(shù)據(jù)庫或 OceanBase代理的IP地址或主機(jī)名。
否
String
無
無。
port
OceanBase數(shù)據(jù)庫服務(wù)器的端口號。
否
Integer
無
可以是OceanBase服務(wù)器的SQL端口號(默認(rèn)值為2881)
或OceanBase代理服務(wù)器的端口號(默認(rèn)值為2883)。
connect.timeout
連接到OceanBase數(shù)據(jù)庫服務(wù)器之前的最長超時時間。
否
Duration
30s
無。
server-time-zone
數(shù)據(jù)庫服務(wù)器中的會話時區(qū)。
否
String
+00:00
會話時區(qū)值的合法格式為
±hh:mm
,表示與協(xié)調(diào)世界時(UTC)的時區(qū)偏移量。說明會話時區(qū)的設(shè)置會影響到時間類型的顯示和存儲方式。因此,如果您需要控制OceanBase的時間類型如何轉(zhuǎn)換為字符串,則需要設(shè)置合理的會話時區(qū)信息,以確保顯示正確的本地時間。
如果您在MySQL數(shù)據(jù)庫中已存在一個用于存儲時區(qū)信息的表,則在設(shè)置時區(qū)時,可以使用這個表中已經(jīng)創(chuàng)建的時區(qū)作為合法的值。
logproxy.client.id
OceanBase日志代理服務(wù)器的客戶端連接ID。
否
String
規(guī)則生成
如果您沒有指定,則Flink會默認(rèn)按照
{flink_ip}_{process_id}_{timestamp}_{thread_id}_{tenant}
規(guī)則生成。rootserver-list
OceanBase根服務(wù)器列表。
否
String
無
服務(wù)器列表格式為
ip:rpc_port:sql_port
。您可以執(zhí)行SHOW PARAMETERS LIKE 'rootservice_list';
SQL語句獲取服務(wù)器列表信息。說明OceanBase社區(qū)版本必填。
多個服務(wù)器地址使用英文分號(;)分隔。
config-url
從配置服務(wù)器獲取服務(wù)器信息的url。
否
String
無
OceanBase企業(yè)版本必填。
working-mode
日志代理中l(wèi)ibobcdc的工作模式。
否
String
storage
參數(shù)取值如下:
storage:表示數(shù)據(jù)將被存儲在磁盤或其他持久性存儲介質(zhì)中。
memory:表示數(shù)據(jù)將被存儲在內(nèi)存中。
compatible-mode
OceanBase的兼容模式。
否
String
mysql
參數(shù)取值如下:
mysql
oracle
jdbc.driver
全量讀取源表數(shù)據(jù)時使用的jdbc驅(qū)動類名。
否
String
com.mysql.jdbc.Driver
無。
jdbc.properties.*
傳遞自定義的JDBC URL屬性。
否
String
無
例如
'jdbc.properties.useSSL' = 'false'
表示不使用SSL加密。obcdc.properties.*
將自定義的 OBCDC參數(shù)傳遞給libobcdc。
否
String
無
例如
'obcdc.properties.sort_trans_participants' = '1'
。更多參數(shù)信息見OBCDC配置項說明。
維表獨有
參數(shù)
說明
是否必填
數(shù)據(jù)類型
默認(rèn)值
備注
url
JDBC url或config url。
是
STRING
無
當(dāng)連接器類型為
oceanbase
時使用JDBC url,連接器類型為oceanbase-ocj
時,使用config url。url中需要包含MySQL database名或Oracle service名。
cache
緩存策略。
否
STRING
ALL
支持以下三種緩存策略:
ALL:緩存維表里的所有數(shù)據(jù)。在Job運行前,系統(tǒng)會將維表中所有數(shù)據(jù)加載到Cache中,之后所有的維表查找數(shù)據(jù)都會通過Cache進(jìn)行。如果在Cache中無法找到數(shù)據(jù),則KEY不存在,并在Cache過期后重新加載一遍全量Cache。
適用于遠(yuǎn)程表數(shù)據(jù)量小且MISS KEY(源表數(shù)據(jù)和維表JOIN時,ON條件無法關(guān)聯(lián))特別多的場景。
LRU:緩存維表里的部分?jǐn)?shù)據(jù)。源表的每條數(shù)據(jù)都會觸發(fā)系統(tǒng)先在Cache中查找數(shù)據(jù)。如果沒有找到,則去物理維表中查找。使用該緩存策略時,必須配置cacheSize參數(shù)。
None:無緩存。
重要使用ALL緩存策略時,請注意節(jié)點內(nèi)存大小,防止出現(xiàn)OOM。
因為系統(tǒng)會異步加載維表數(shù)據(jù),所以在使用ALL緩存策略時,需要增加維表JOIN節(jié)點的內(nèi)存,增加的內(nèi)存大小為遠(yuǎn)程表數(shù)據(jù)量的兩倍。
cacheSize
最大緩存條數(shù)。
否
INTEGER
100000
當(dāng)選擇LRU緩存策略后,必須設(shè)置緩存大小。
當(dāng)選擇ALL緩存策略后,可以不設(shè)置緩存大小。
cacheTTLMs
緩存超時時間。
否
LONG
Long.MAX_VALUE
cacheTTLMs的配置和cache有關(guān),詳情如下:
如果cache配置為None,則cacheTTLMs可以不配置,表示緩存不超時。
如果cache配置為LRU,則cacheTTLMs為緩存超時時間。默認(rèn)不過期。
如果cache配置為ALL,則cacheTTLMs為緩存加載時間。默認(rèn)不重新加載。
maxRetryTimeout
最大重試時間。
否
DURATION
60s
無。
結(jié)果表獨有
參數(shù)
說明
是否必填
數(shù)據(jù)類型
默認(rèn)值
備注
compatibleMode
OceanBase的兼容模式。
否
STRING
mysql
參數(shù)取值如下:
mysql
oracle
說明oceanabse獨有參數(shù)。
databaseName
數(shù)據(jù)庫名。
是
STRING
無
應(yīng)當(dāng)與config url中保持一致。
說明oceanbase-ocj獨有參數(shù)。
passwordEncrypted
是否使用加密過的密碼。
否
Boolean
false
oceanbase-ocj獨有參數(shù)。
slowQueryThresholdMs
慢查詢等待閾值。
否
INTEGER
60000
單位毫秒。
說明oceanbase-ocj獨有參數(shù)。
url
JDBC url或config url。
是
STRING
無
當(dāng)連接器類型為
oceanbase
時使用JDBC url,連接器類型為oceanbase-ocj
時,使用config url。url中需要包含MySQL database名或Oracle service名。
tableName
表名。
是
STRING
無
無。
maxRetryTimes
最大重試次數(shù)。
否
INTEGER
3
無。
poolInitialSize
數(shù)據(jù)庫連接池初始大小。
否
INTEGER
1
無。
poolMaxActive
數(shù)據(jù)庫連接池最大連接數(shù)。
否
INTEGER
8
無。
poolMaxWait
從數(shù)據(jù)庫連接池中獲取連接的最大等待時間。
否
INTEGER
2000
單位毫秒。
poolMinIdle
數(shù)據(jù)庫連接池中最小空閑連接數(shù)。
否
INTEGER
1
無。
connectionProperties
jdbc的連接屬性。
否
STRING
無
格式為"k1=v1;k2=v2;k3=v3"。
ignoreDelete
是否忽略數(shù)據(jù)Delete操作。
否
Boolean
false
無。
excludeUpdateColumns
指定要排除的列名。在執(zhí)行更新操作時,這些列將不會被更新。
否
STRING
無
如果忽略指定的字段為多個時,則需要使用英文逗號(,)分隔。例如
excludeUpdateColumns=column1,column2
。說明該值始終會包含主鍵列,也就是實際生效的列名為您指定的列名和主鍵列。
partitionKey
分區(qū)鍵。
否
STRING
無
當(dāng)設(shè)置分區(qū)鍵時,連接器會先將數(shù)據(jù)按照分區(qū)鍵進(jìn)行分組,各個分組將分別寫入數(shù)據(jù)庫。這里的分組處理早于modRule的處理。
modRule
分組規(guī)則。
否
STRING
無
分組規(guī)則格式需要為"列名mod數(shù)字",列類型必須為數(shù)字類型。當(dāng)設(shè)置分組規(guī)則時,數(shù)據(jù)會根據(jù)計算所得結(jié)果進(jìn)行分組,各個分組將分別寫入數(shù)據(jù)庫。這里的分組處理晚于partitionKey的處理。
bufferSize
數(shù)據(jù)緩沖區(qū)大小。
否
INTEGER
1000
無。
flushIntervalMs
清空緩存的時間間隔。表示如果緩存中的數(shù)據(jù)在等待指定時間后,依然沒有達(dá)到輸出條件,系統(tǒng)會自動輸出緩存中的所有數(shù)據(jù)。
否
LONG
1000
無。
retryIntervalMs
最大重試時間。
否
INTEGER
5000
單位毫秒。
類型映射
MySQL兼容模式
OceanBase字段類型
Flink字段類型
TINYINT
TINYINT
SMALLINT
SMALLINT
TINYINT UNSIGNED
INT
INT
MEDIUMINT
SMALLINT UNSIGNED
BIGINT
BIGINT
INT UNSIGNED
BIGINT UNSIGNED
DECIMAL(20, 0)
REAL
FLOAT
FLOAT
DOUBLE
DOUBLE
NUMERIC(p, s)
DECIMAL(p, s)
說明其中p <= 38。
DECIMAL(p, s)
BOOLEAN
BOOLEAN
TINYINT(1)
DATE
DATE
TIME [(p)]
TIME [(p)] [WITHOUT TIME ZONE]
DATETIME [(p)]
TIMESTAMP [(p)] [WITHOUT TIME ZONE]
TIMESTAMP [(p)]
CHAR(n)
CHAR(n)
VARCHAR(n)
VARCHAR(n)
BIT(n)
BINARY(?n/8?)
BINARY(n)
BINARY(n)
VARBINARY(N)
VARBINARY(N)
TINYTEXT
STRING
TEXT
MEDIUMTEXT
LONGTEXT
TINYBLOB
BYTES
重要Flink僅支持小于等于2,147,483,647(2^31 - 1)的BLOB類型的記錄。
BLOB
MEDIUMBLOB
LONGBLOB
Oracle兼容模式
OceanBase字段類型
Flink字段類型
NUMBER(p, s <= 0), p - s < 3
TINYINT
NUMBER(p, s <= 0), p - s < 5
SMALLINT
NUMBER(p, s <= 0), p - s < 10
INT
NUMBER(p, s <= 0), p - s < 19
BIGINT
NUMBER(p, s <= 0), 19 <= p - s <= 38
DECIMAL(p - s, 0)
NUMBER(p, s > 0)
DECIMAL(p, s)
NUMBER(p, s <= 0), p - s > 38
STRING
FLOAT
FLOAT
BINARY_FLOAT
BINARY_DOUBLE
DOUBLE
NUMBER(1)
BOOLEAN
DATE
TIMESTAMP [(p)] [WITHOUT TIMEZONE]
TIMESTAMP [(p)]
CHAR(n)
STRING
NCHAR(n)
NVARCHAR2(n)
VARCHAR(n)
VARCHAR2(n)
CLOB
BLOB
BYTES
ROWID
使用示例
源表&結(jié)果表
CREATE TEMPORARY TABLE oceanbase_source ( a INT, b VARCHAR, c VARCHAR ) WITH ( 'connector' = 'oceanbase', 'scan.startup.mode' = 'initial', 'username' = 'user', 'password' = 'password', 'tenant-name' = 'tenant', 'database-name' = '^test_db$', 'table-name' = '^orders$', 'hostname' = '11.22.xx.xx', 'port' = '2883', 'config-url' = 'http://11.22.xx.xx:55/services?Action=ObRootServiceInfo&User_ID=xxx&UID=xxx&ObRegion=xxx', 'logproxy.host' = '11.22.xx.xx', 'logproxy.port' = '2983', 'working-mode' = 'memory' ); -- oceanbase結(jié)果表 CREATE TEMPORARY TABLE oceanbase_sink ( a INT, b VARCHAR, c VARCHAR ) WITH ( 'connector' = 'oceanbase', 'url' = '<yourJdbcUrl>', 'userName' = '<yourUserName>', 'password' = '<yourPassword>', 'tableName' = '<yourTableName>' ); --oceanbase-ocj結(jié)果表 CREATE TEMPORARY TABLE oceanbase_ocj_sink ( a INT, b VARCHAR, c VARCHAR ) WITH ( 'connector' = 'oceanbase-ocj', 'url' = '<yourConfigUrl>', 'userName' = '<yourUserName>', 'password' = '${secret_values.password}', 'databaseName' = '<yourDatabaseName>', 'tableName' = '<yourTableName>' ); BEGIN STATEMENT SET; INSERT INTO oceanbase_sink SELECT * FROM oceanbase_source; INSERT INTO oceanbase_ocj_sink SELECT * FROM oceanbase_source; END;
維表
CREATE TEMPORARY TABLE datagen_source( a INT, b BIGINT, c STRING, `proctime` AS PROCTIME() ) WITH ( 'connector' = 'datagen' ); CREATE TEMPORARY TABLE oceanbase_dim ( a INT, b VARCHAR, c VARCHAR ) WITH ( 'connector' = 'oceanbase', 'url' = '<yourJdbcUrl>', 'userName' = '<yourUserName>', 'password' = '${secret_values.password}', 'tableName' = '<yourTableName>' ); CREATE TEMPORARY TABLE blackhole_sink( a INT, b STRING ) WITH ( 'connector' = 'blackhole' ); INSERT INTO blackhole_sink SELECT T.a, H.b FROM datagen_source AS T JOIN oceanbase_dim FOR SYSTEM_TIME AS OF T.`proctime` AS H ON T.a = H.a;
相關(guān)文檔
Flink支持的連接器,請參見支持的連接器。