云數(shù)據(jù)庫RDS MySQL版
本文為您介紹如何使用云數(shù)據(jù)庫RDS MySQL版連接器。
RDS MySQL基于阿里巴巴的MySQL源碼分支,經(jīng)過雙十一高并發(fā)、大數(shù)據(jù)量的考驗,擁有優(yōu)良的性能。RDS MySQL支持實例管理、賬號管理、數(shù)據(jù)庫管理、備份恢復、白名單、透明數(shù)據(jù)加密以及數(shù)據(jù)遷移等基本功能。RDS MySQL詳情請參見RDS MySQL云數(shù)據(jù)庫。
后續(xù)將計劃不再支持云數(shù)據(jù)庫RDS MySQL版連接器,建議您直接使用MySQL連接器。使用MySQL連接器請參見MySQL。
RDS MySQL連接器支持的信息如下。
類別 | 詳情 |
支持類型 | 結(jié)果表和維表 |
運行模式 | 流模式與批模式 |
數(shù)據(jù)格式 | 暫不適用 |
特有監(jiān)控指標 |
說明 指標含義詳情,請參見監(jiān)控指標說明。 |
API種類 | SQL |
是否支持更新或刪除結(jié)果表數(shù)據(jù) | 是 |
前提條件
已創(chuàng)建RDS MySQL數(shù)據(jù)庫和表,詳情請參見創(chuàng)建數(shù)據(jù)庫和賬號。
已設置IP白名單,詳情請參見通過客戶端、命令行連接RDS MySQL實例。
使用限制
僅Flink計算引擎VVR 2.0.0及以上版本支持RDS MySQL連接器。
語義上可以保證At-Least-Once,在結(jié)果表有主鍵的情況下,冪等可以保證數(shù)據(jù)的正確性。
推薦您使用最新版本的Flink(例如6.x以上),以獲取最新的性能與穩(wěn)定性優(yōu)化。
注意事項
RDS MySQL連接器后續(xù)會逐步下線,建議您在功能滿足的前提下使用MySQL連接器。詳情請參見MySQL。
語法結(jié)構(gòu)
結(jié)果表
CREATE TABLE rds_sink( id INT, num BIGINT, PRIMARY KEY(id) NOT ENFORCED ) WITH ( 'connector'='rds', 'tableName'='your-table-name', 'userName'='your-user-name', 'password'='your-password', 'url'='your-url' );
說明Flink RDS連接器寫入數(shù)據(jù)庫結(jié)果表原理:針對Flink Sink輸出數(shù)據(jù),拼接成一行SQL語句,然后執(zhí)行。對于沒有主鍵的結(jié)果表,會執(zhí)行
INSERT INTO table_name (column1, column2, ...) VALUES (value1, value2, ...);
語句。對于包含主鍵的結(jié)果表,會執(zhí)行INSERT INTO table_name (column1, column2, ...) VALUES (value1, value2, ...) ON DUPLICATE KEY UPDATE column1 = VALUES(column1), column2 = VALUES(column2), ...;
語句。請注意:如果物理表存在除主鍵外的唯一索引約束,當插入兩條主鍵不同但唯一索引相同的記錄時,下游數(shù)據(jù)會因為唯一索引沖突導致數(shù)據(jù)覆蓋引發(fā)數(shù)據(jù)丟失。如果在RDS MySQL云數(shù)據(jù)庫定義了自增主鍵,在Flink DDL中不應該聲明該自增字段。數(shù)據(jù)寫入過程中,數(shù)據(jù)庫會自動填補該自增字段。Flink RDS連接器僅支持寫入和刪除帶自增字段的數(shù)據(jù),不支持更新。
維表
CREATE TABLE rds_dim( id1 INT, id2 VARCHAR ) WITH ( 'connector'='rds', 'tableName'='your-table-name', 'userName'='your-user-name', 'password'='your-password', 'url'='your-url' 'cache'='NONE' );
WITH參數(shù)
通用
參數(shù)
說明
數(shù)據(jù)類型
是否必填
默認值
備注
connector
表類型。
String
是
無。
固定值為rds。
tableName
表名。
String
是
無。
無。
userName
用戶名。
String
是
無。
無。
password
密碼。
String
是
無。
無。
url
表地址。
String
是
無。
RDS MySQL云數(shù)據(jù)庫專有網(wǎng)絡VPC地址,即內(nèi)網(wǎng)地址,詳情請?參見查看或修改內(nèi)外網(wǎng)地址和端口。
URL的格式為:
jdbc:mysql://<內(nèi)網(wǎng)地址>:<端口號>/<數(shù)據(jù)庫名稱>
。說明對于結(jié)果表,建議在URL后面加上參數(shù)?rewriteBatchedStatements=true,以提高系統(tǒng)性能。
maxRetryTimes
查詢維表或者寫數(shù)據(jù)到結(jié)果表失敗后,最多重試次數(shù)。
Integer
否
在Flink計算引擎VVR 4.0.7及以上版本,該參數(shù)默認值為10。
在Flink計算引擎VVR 4.0.6及以下版本,該參數(shù)默認值為3。
無。
結(jié)果表獨有
參數(shù)
說明
數(shù)據(jù)類型
是否必填
默認值
備注
batchSize
一次批量寫入的條數(shù)。
Integer
否
在Flink計算引擎VVR 4.0.7及以上版本,該參數(shù)默認值為4096。
在Flink計算引擎VVR 4.0.0~4.0.6版本,該參數(shù)默認值為5000。
在Flink計算引擎VVR 3.x版本及以下版本,該參數(shù)默認值為100。
無。
bufferSize
內(nèi)存中緩存的最大數(shù)據(jù)條數(shù)。batchSize或 bufferSize任一到達閾值都會觸發(fā)數(shù)據(jù)寫操作。
Integer
否
10000
僅Flink計算引擎VVR 4.0.7及以上版本支持該參數(shù)。
需指定主鍵后,該參數(shù)才生效。
flushIntervalMs
flush內(nèi)存緩沖區(qū)的時間間隔。表示如果緩存中的數(shù)據(jù)在等待指定時間后,依然沒有達到輸出條件(batchSize或bufferSize),系統(tǒng)會自動寫出緩存中的所有數(shù)據(jù)到結(jié)果表。
Integer
否
在Flink計算引擎VVR 4.0.7及以上版本,該參數(shù)默認值為2000。
在Flink計算引擎VVR 4.0.0~4.0.6版本,該參數(shù)默認值為0。
在Flink計算引擎VVR 3.x版本及以下版本,該參數(shù)默認值為1000。
在默認值為0的版本中,如果不配置該參數(shù),可能導致少量數(shù)據(jù)永遠無法寫出到結(jié)果表。建議您采用更高版本的Flink。
ignoreDelete
是否忽略數(shù)據(jù)Delete操作。
Boolean
否
false
Flink SQL可能會生成數(shù)據(jù)Delete操作,在多個輸出節(jié)點根據(jù)主鍵同時更新同一張結(jié)果表的不同字段的場景下,可能導致數(shù)據(jù)結(jié)果不正確。
例如一個任務在刪除了一條數(shù)據(jù)后,另一個任務又只更新了這條數(shù)據(jù)的部分字段,其余未被更新的字段由于被刪除,其值會變成null或默認值。通過將ignoreDelete設置為true,可以避免數(shù)據(jù)刪除操作。
connectionMaxActive
數(shù)據(jù)庫連接池大小
Integer
否
40
僅Flink計算引擎VVR 4.0.7及以上版本支持該參數(shù)。
如果出現(xiàn)獲取連接超時的問題,可能是連接池不夠用,可適當增大連接池的大小。
如果數(shù)據(jù)庫能支持的最大并發(fā)連接比較小,可適當減小連接池大小或者減小作業(yè)節(jié)點并行度。
維表獨有
參數(shù)
說明
數(shù)據(jù)類型
是否必填
默認值
備注
cache
維表緩存策略。
String
否
在Flink計算引擎VVR 4.0.6之前版本,緩存策略默認值為NONE。
在Flink計算引擎VVR 4.0.6及以上版本,緩存策略的默認值為ALL。
Flink RDS MySQL連接器支持None、LRU和ALL三種緩存策略,取值含義詳情請參見背景信息。
cacheSize
緩存大小。
Integer
否
100000
當選擇LRU緩存策略后,需要設置緩存大小。
當選擇NONE或ALL緩存策略時,不必設置緩存大小。
cacheTTLMs
緩存超時時間。
Long
否
如果cache配置為NONE,則cacheTTLMs可以不配置,表示緩存不超時。
如果cache配置為LRU,則cacheTTLMs為緩存超時時間。默認不過期。
如果cache配置為ALL,則cacheTTLMs為緩存加載時間。默認不重新加載。
單位為毫秒。
maxJoinRows
主表中每一條數(shù)據(jù)查詢維表時,匹配后最多返回的結(jié)果數(shù)。
Integer
否
1024
進行Join時,主表輸入一條數(shù)據(jù),對應維表匹配后,會返回的結(jié)果總數(shù)受該參數(shù)限制。
如果您可以預估一條數(shù)據(jù)對應的維表數(shù)據(jù)最多為n條,則可以設置
maxJoinRows='n'
,以確保實時計算匹配處理效率。
類型映射
Flink字段類型 | RDS MySQL字段類型 |
BOOLEAN | BOOLEAN |
TINYINT | TINYINT |
TINYINT(1) 說明 僅維表支持該映射。 | BOOLEAN |
SMALLINT | SMALLINT |
SMALLINT | TINYINT UNSIGNED |
INT | INT |
INT | SMALLINT UNSIGNED |
BIGINT | BIGINT |
BIGINT | INT UNSIGNED |
DECIMAL(20,0) | BIGINT UNSIGNED |
FLOAT | FLOAT |
DECIMAL | DECIMAL |
DOUBLE | DOUBLE |
DATE | DATE |
TIME | TIME |
TIMESTAMP | TIMESTAMP |
VARCHAR | VARCHAR |
VARBINARY | VARBINARY |
使用示例
結(jié)果表
CREATE TEMPORARY TABLE datagen_source( `name` VARCHAR, `age` INT ) WITH ( 'connector'='datagen' ); CREATE TEMPORARY TABLE rds_sink( `name` VARCHAR, `age` INT ) WITH ( 'connector'='rds', 'password'='your-password', 'tableName'='your-tablename', 'url'='your-url', 'userName'='your-username' ); INSERT INTO rds_sink SELECT * FROM datagen_source;
維表
CREATE TEMPORARY TABLE datagen_source( a INT, b BIGINT, c STRING, `proctime` AS PROCTIME() ) WITH ( 'connector'='datagen' ); CREATE TEMPORARY TABLE rds_dim( a INT, b VARCHAR, c VARCHAR ) WITH ( 'connector'='rds', 'password'='<yourPassword>', 'tableName'='<yourTablename>', 'url'='jdbc:mysql://xxx', 'userName'='<yourUsername>' ); CREATE TEMPORARY TABLE blackhole_sink( a INT, b STRING ) WITH ( 'connector'='blackhole' ); INSERT INTO blackhole_sink SELECT T.a, H.b FROM datagen_source AS T JOIN rds_dim FOR SYSTEM_TIME AS OF T.`proctime` AS H ON T.a=H.a;