本文為您介紹如何使用云原生數據倉庫AnalyticDB MySQL版3.0連接器。
背景信息
云原生數據倉庫AnalyticDB MySQL版3.0是融合數據庫、大數據技術于一體的云原生企業級數據倉庫服務。AnalyticDB MySQL版支持高吞吐的數據實時增刪改、低延時的實時分析和復雜ETL,兼容上下游生態工具,可用于構建企業級報表系統、數據倉庫和數據服務引擎。
ADB MySQL 3.0連接器支持的信息如下。
類別 | 詳情 |
支持類型 | 維表和結果表 |
運行模式 | 流模式和批模式 |
數據格式 | 暫不適用 |
特有監控指標 | 暫無 |
API種類 | SQL |
是否支持更新或刪除結果表數據 | 是 |
前提條件
已創建AnalyticDB MySQL集群并創建表,詳情請參見創建集群和CREATE TABLE。
已設置白名單,詳情請參見設置白名單。
使用限制
僅支持作為維表和結果表,不支持作為源表。
僅Flink計算引擎VVR 3.x及以上版本支持云原生數據倉庫AnalyticDB MySQL版3.0連接器。
語法結構
CREATE TEMPORARY TABLE adb_table (
`id` INT,
`num` BIGINT,
PRIMARY KEY (`id`) NOT ENFORCED
) WITH (
'connector' = 'adb3.0',
'url' = '<yourUrl>',
'userName' = '<yourUsername>',
'password' = '<yourPassword>',
'tableName' = '<yourTablename>'
);
Flink DDL中定義的主鍵必須和AnalyticDB MySQL數據庫物理表中的主鍵保持一致,主鍵一致包括是否存在主鍵和主鍵名稱一致。如果不一致,會影響數據正確性。
WITH參數
通用
參數
說明
數據類型
是否必填
默認值
備注
connector
結果表類型。
String
是
無
固定值為adb3.0。
url
JDBC連接地址。
String
是
無
云原生數據倉庫AnalyticDB MySQL版數據庫的JDBC鏈接地址。固定格式為jdbc:mysql://<endpoint>:<port>/<databaseName>,其中:
endpoint和port:您可以登錄AnalyticDB 控制臺,單擊對應的集群名稱,進入集群信息頁面,在網絡信息中獲取。
databaseName:云原生數據倉庫AnalyticDB MySQL版數據庫名稱。
userName
用戶名。
String
是
無
無。
password
密碼。
String
是
無
無。
tableName
表名。
String
是
無
無。
maxRetryTimes
寫入或讀取數據失敗后,重試的最大次數。
Integer
否
參數默認值取值情況如下。
在Flink計算引擎VVR 3.x版本及以下版本,該參數默認值為3。
在Flink計算引擎VVR 4.0.10及以上版本,該參數默認值為10。
無。
結果表獨有
參數
說明
數據類型
是否必填
默認值
備注
batchSize
一次批量寫入的條數。
Integer
否
參數默認值取值情況如下:
在Flink計算引擎VVR 3.x版本及以下版本,該參數默認值為100。
在Flink計算引擎VVR 4.0.10及以上版本,該參數默認值為1000。
需指定主鍵后,該參數才生效。
bufferSize
內存中緩存的數據條數。batchSize或bufferSize任一到達閾值都會觸發寫入。
Integer
否
1000
需指定主鍵后,該參數才生效。
說明僅Flink計算引擎VVR 4.0.10及以上版本支持該參數。
flushIntervalMs
清空緩存的時間間隔。表示如果緩存中的數據在等待指定時間后,依然沒有達到輸出條件,系統會自動輸出緩存中的所有數據。
Integer
否
參數默認值取值情況如下:
在Flink計算引擎VVR 3.x版本及以下版本,該參數默認值為1000。
在Flink計算引擎VVR 4.0.10及以上版本,該參數默認值為3000。
單位為毫秒。
ignoreDelete
是否忽略Delete操作。
Boolean
否
false
參數取值如下:
true:忽略Delete操作。
false:接受Delete操作。
說明僅Flink計算引擎VVR 4.0.10及以上版本支持該參數。
replaceMode
DDL中定義了主鍵的情況下,是否采用replace into語法插入數據。
Boolean
否
true
該參數取值如下:
true:采用
replace into
語法插入數據。false:采用
insert into on duplicate key update
語法插入數據。
說明僅Flink計算引擎VVR 4.0.10及以上版本支持該參數。
僅AnalyticDB MySQL 3.1.3.5及以上版本支持該參數。
此參數僅在DDL中定義了主鍵時才生效,插入數據時采用的語法詳情如下:
DDL中定義了主鍵且replaceMode=true,采用
replace into
語法插入數據。DDL中定義了主鍵且replaceMode=false,采用
insert into on duplicate key update
語法插入數據。DDL中沒有定義主鍵,采用
insert into
語法插入數據。
excludeUpdateColumns
表示更新主鍵值相同的數據時,忽略指定字段的更新。
String
否
空字符串
如果忽略指定的字段為多個時,則需要使用英文逗號(,)分割。例如
excludeUpdateColumns=column1,column2
。說明僅在replaceMode=false時,該參數才生效。在replaceMode=true時,對應字段會被更新為null。
要忽略的多個字段需要寫在一行中,不能換行。
connectionMaxActive
線程池大小。
Integer
否
40
僅Flink計算引擎VVR 4.0.10及以上版本支持該參數。
維表獨有
參數
說明
數據類型
是否必填
默認值
備注
cache
緩存策略。
String
否
ALL
云原生數據倉庫AnalyticDB MySQL版3.0維表支持以下三種緩存策略:
None:無緩存。
LRU:緩存維表里的部分數據。源表的每條數據都會觸發系統先在Cache中查找數據,如果沒有找到,則去物理維表中查找。
ALL(默認值):緩存維表里的所有數據。在Job運行前,系統會將維表中所有數據加載到Cache中,之后所有的維表查找數據都會通過Cache進行。如果在Cache中無法找到數據,則KEY不存在,并在Cache過期后重新加載一遍全量Cache。
適用于遠程表數據量小且MISS KEY在源表數據和維表JOIN時,ON條件無法關聯特別多的場景。
說明如果使用CACHE ALL時,請注意節點內存大小,防止出現OOM。
因為系統會異步加載維表數據,所以在使用CACHE ALL時,需要增加維表JOIN節點的內存,增加的內存大小為遠程表數據量的兩倍。
cacheSize
緩存大小,即緩存多少行數據。
Integer
否
100000
cacheSize配置和cache為LRU有關。當cache配置為LRU時,必須配置cacheSize參數。
cacheTTLMs
緩存超時時間,單位為毫秒。
Integer
否
Long.MAX_VALUE
cacheTTLMs配置和cache配置為LRU或ALL有關:
如果cache配置為LRU,則cacheTTLMs為緩存失效的超時時間。默認值為
Long.MAX_VALUE
,即代表緩存不過期。如果cache配置為ALL,則cacheTTLMs為物理表數據被重新加載的間隔時間。默認值為
Long.MAX_VALUE
,即代表不重新加載物理表數據。
說明如果cache配置為None,則cacheTTLMs不用配置。因為cache配置為None,表示沒有緩存,因此不用配置緩存超時時間。
maxJoinRows
主表中每一條數據查詢維表時,匹配后最多返回的結果數。
Integer
否
1024
如果您可以預估一條數據對應的維表數據最多為n條,則可以設置maxJoinRows='n',以確保實時計算匹配處理效率。
說明進行Join時,主表輸入一條數據,對應維表匹配后返回的結果總數受該參數限制。
類型映射
云原生數據倉庫AnalyticDB MySQL版3.0字段類型 | Flink字段類型 |
BOOLEAN | BOOLEAN |
TINYINT | TINYINT |
SMALLINT | SMALLINT |
INT | INT |
BIGINT | BIGINT |
FLOAT | FLOAT |
DOUBLE | DOUBLE |
DECIMAL(p, s) 或NUMERIC(p, s) | DECIMAL(p, s) |
VARCHAR | STRING |
BINARY | BYTES |
DATE | DATE |
TIME | TIME |
DATETIME | TIMESTAMP |
TIMESTAMP | TIMESTAMP |
POINT | STRING |
使用示例
結果表
CREATE TEMPORARY TABLE datagen_source ( `name` VARCHAR, `age` INT ) WITH ( 'connector' = 'datagen' ); CREATE TEMPORARY TABLE adb_sink ( `name` VARCHAR, `age` INT ) WITH ( 'connector' = 'adb3.0', 'url' = '<yourUrl>', 'userName' = '<yourUsername>', 'password' = '<yourPassword>', 'tableName' = '<yourTablename>' ); INSERT INTO adb_sink SELECT * FROM datagen_source;
維表
CREATE TEMPORARY TABLE datagen_source( `a` INT, `b` VARCHAR, `c` STRING, `proctime` AS PROCTIME() ) WITH ( 'connector' = 'datagen' ); CREATE TEMPORARY TABLE adb_dim ( `a` INT, `b` VARCHAR, `c` VARCHAR ) WITH ( 'connector' = 'adb3.0', 'url' = '<yourUrl>', 'userName' = '<yourUsername>', 'password' = '<yourPassword>', 'tableName' = '<yourTablename>' ); CREATE TEMPORARY TABLE blackhole_sink( `a` INT, `b` VARCHAR ) WITH ( 'connector' = 'blackhole' ); INSERT INTO blackhole_sink SELECT T.a,H.b FROM datagen_source AS T JOIN adb_dim FOR SYSTEM_TIME AS OF T.proctime AS H ON T.a = H.a;