本文為您介紹如何使用StarRocks連接器。
背景信息
StarRocks是新一代極速全場景MPP(Massively Parallel Processing)數據倉庫,致力于構建極速和統一分析體驗。StarRocks具有以下優勢:
StarRocks兼容MySQL協議,可以使用MySQL客戶端和常用BI工具對接StarRocks來分析數據。
StarRocks采用分布式架構:
對數據表進行水平劃分并以多副本存儲。
集群規模可以靈活伸縮,支持10 PB級別的數據分析。
支持MPP框架,并行加速計算。
支持多副本,具有彈性容錯能力。
Flink連接器內部的結果表是通過緩存并批量由Stream Load導入實現,源表是通過批量讀取數據實現。StarRocks連接器支持的信息如下。
類別 | 詳情 |
支持類型 | 源表和結果表、數據攝入目標端 |
運行模式 | 流模式和批模式 |
數據格式 | CSV |
特有監控指標 | 暫無 |
API種類 | Datastream、SQL和數據攝入YAML |
是否支持更新或刪除結果表數據 | 是 |
前提條件
已創建StarRocks集群,包括EMR的StarRocks或基于ECS的云上自建StarRocks。
使用限制
僅實時計算引擎VVR 6.0.5及以上版本支持StarRocks連接器。
StarRocks連接器僅支持at-least-once和exactly-once語義。
SQL
特色功能
EMR的StarRocks支持CTAS&CDAS功能,CTAS可以實現單表的結構和數據同步,CDAS可以實現整庫同步或者同一庫中的多表結構和數據同步,詳情請參見基于實時計算Flink使用CTAS&CDAS功能同步MySQL數據至StarRocks。
語法結構
CREATE TABLE USER_RESULT(
name VARCHAR,
score BIGINT
) WITH (
'connector' = 'starrocks',
'jdbc-url'='jdbc:mysql://fe1_ip:query_port,fe2_ip:query_port,fe3_ip:query_port?xxxxx',
'load-url'='fe1_ip:http_port;fe2_ip:http_port;fe3_ip:http_port',
'database-name' = 'xxx',
'table-name' = 'xxx',
'username' = 'xxx',
'password' = 'xxx'
);
WITH參數
類型 | 參數 | 說明 | 數據類型 | 是否必填 | 默認值 | 備注 |
通用 | connector | 表類型。 | String | 是 | 無 | 固定值為starrocks。 |
jdbc-url | JDBC連接的URL。 | String | 是 | 無 | 指定FE(Front End)的IP和JDBC端口,格式為 | |
database-name | StarRocks數據庫名稱。 | String | 是 | 無 | 無。 | |
table-name | StarRocks表名稱。 | String | 是 | 無 | 無。 | |
username | StarRocks連接用戶名。 | String | 是 | 無 | 無。 | |
password | StarRocks連接密碼。 | String | 是 | 無 | 無。 | |
starrocks.create.table.properties | StarRocks表屬性。 | String | 否 | 無 | 設置數據表初始屬性,如引擎、副本數等。例如,'starrocks.create.table.properties' = 'buckets 8','starrocks.create.table.properties' = 'replication_num=1'。 | |
源表獨有 | scan-url | 數據掃描的url。 | String | 否 | 無 | 指定FE(Front End)的IP和HTTP端口,格式為 說明 填寫多個IP和端口號時,請使用半角分號(;)進行分隔。 |
scan.connect.timeout-ms | flink-connector-starrocks連接StarRocks的時間上限。 超過該時間上限,將報錯。 | String | 否 | 1000 | 單位為毫秒。 | |
scan.params.keep-alive-min | 查詢任務的保活時間。 | String | 否 | 10 | 無。 | |
scan.params.query-timeout-s | 查詢任務的超時時間。 如果超過該時間,仍未返回查詢結果,則停止查詢任務。 | String | 否 | 600 | 單位為秒。 | |
scan.params.mem-limit-byte | BE節點中單個查詢的內存上限。 | String | 否 | 1073741824(1 GB) | 單位為字節。 | |
scan.max-retries | 查詢失敗時的最大重試次數。 超過該數量上限,則將報錯。 | String | 否 | 1 | 無。 | |
結果表獨有 | load-url | 數據導入的URL。 | String | 是 | 無 | 指定FE(Front End)的IP和HTTP端口,格式為 說明 填寫多個IP和端口號時,請使用半角分號(;)進行分隔。 |
sink.semantic | 數據寫入語義。 | String | 否 | at-least-once | 取值如下:
| |
sink.buffer-flush.max-bytes | Buffer可容納的最大數據量。 | String | 否 | 94371840(90 MB) | 取值范圍為64 MB~10 GB。 | |
sink.buffer-flush.max-rows | Buffer可容納的最大數據行數。 | String | 否 | 500000 | 取值范圍為64,000~5000,000。 | |
sink.buffer-flush.interval-ms | Buffer刷新時間間隔。 | String | 否 | 300000 | 取值范圍為1000毫秒~3600000毫秒。 | |
sink.max-retries | 最大重試次數。 | String | 否 | 3 | 取值范圍為0~10。 | |
sink.connect.timeout-ms | 連接到starrocks的超時時間。 | String | 否 | 1000 | 取值范圍為100~60000。單位為毫秒。 | |
sink.properties.* | 結果表屬性。 | String | 否 | 無 | Stream Load的參數控制Stream Load導入行為。例如,參數 sink.properties.format表示Stream Load所導入的數據格式,如CSV。更多參數和解釋,請參見Stream Load。 |
類型映射
StarRocks字段類型 | Flink字段類型 |
NULL | NULL |
BOOLEAN | BOOLEAN |
TINYINT | TINYINT |
SMALLINT | SMALLINT |
INT | INT |
BIGINT | BIGINT |
BIGINT UNSIGNED 說明 僅實時計算引擎VVR 8.0.10及以上版本支持。 | DECIMAL(20,0) |
LARGEINT | DECIMAL(20,0) |
FLOAT | FLOAT |
DOUBLE | DOUBLE |
DATE | DATE |
DATETIME | TIMESTAMP |
DECIMAL | DECIMAL |
DECIMALV2 | DECIMAL |
DECIMAL32 | DECIMAL |
DECIMAL64 | DECIMAL |
DECIMAL128 | DECIMAL |
CHAR(n × 3) 說明
| CHAR(n) (n <= 85 時) |
VARCHAR(n × 3) 說明
| CHAR(n) (n > 85 時) |
VARCHAR | STRING |
VARBINARY 說明 僅實時計算引擎VVR 8.0.10及以上版本支持。 | VARBINARY |
代碼示例
CREATE TEMPORARY TABLE IF NOT EXISTS `runoob_tbl_source` (
`runoob_id` BIGINT NOT NULL,
`runoob_title` STRING NOT NULL,
`runoob_author` STRING NOT NULL,
`submission_date` DATE NULL
) WITH (
'connector' = 'starrocks',
'jdbc-url' = 'jdbc:mysql://ip:9030',
'scan-url' = 'ip:18030',
'database-name' = 'db_name',
'table-name' = 'table_name',
'password' = 'xxxxxxx',
'username' = 'xxxxx'
);
CREATE TEMPORARY TABLE IF NOT EXISTS `runoob_tbl_sink` (
`runoob_id` BIGINT NOT NULL,
`runoob_title` STRING NOT NULL,
`runoob_author` STRING NOT NULL,
`submission_date` DATE NULL
PRIMARY KEY(`runoob_id`)
NOT ENFORCED
) WITH (
'jdbc-url' = 'jdbc:mysql://ip:9030',
'connector' = 'starrocks',
'load-url' = 'ip:18030',
'database-name' = 'db_name',
'table-name' = 'table_name',
'password' = 'xxxxxxx',
'username' = 'xxxx',
'sink.buffer-flush.interval-ms' = '5000'
);
INSERT INTO runoob_tbl_sink SELECT * FROM runoob_tbl_source;
數據攝入
使用StarRocks Pipeline連接器,您可以輕松地將來自上游數據源的數據記錄和表結構變更寫入外部StarRocks數據庫。StarRocks連接器同時支持社區版與阿里云E-MapReduce Serverless StarRocks全托管版本。
特色功能
自動建庫建表
如果來自上游的數據庫及數據表不存在于下游StarRocks實例中,則對應的數據庫及數據表會被自動創建。您可以通過
table.create.properties.*
參數設定自動創建表時的選項。表結構變更同步
目前,StarRocks連接器支持自動將建表事件(CreateTableEvent)、增加列事件(AddColumnEvent)和刪除列(DropColumnEvent)事件自動應用到下游數據庫中。
注意事項
目前StarRocks連接器只支持At-least Once語義,并通過主鍵表來保證冪等寫入。
目前,同步的表必須包含主鍵。不含主鍵的表必須通過
transform
語句塊指定主鍵方可正常寫入下游。例如:transform: - source-table: ... primary-keys: id, ...
自動創建的表分桶鍵與主鍵相同,且不可有分區鍵。
進行表結構變更同步時,新增列只能追加到已有列的尾部。在默認的表結構演化模式Lenient下,會自動將其他位置的插入轉換到尾部。
如果您使用的StarRocks版本低于2.5.7,則必須顯式地通過
table.create.num-buckets
參數指定分桶數量。更高版本的StarRocks可以自動設定合適的分桶數。如果您使用的是StarRocks 3.2或更高版本,建議開啟
table.create.properties.fast_schema_evolution
選項來加快表結構變更的速度。
語法結構
source:
...
sink:
type: starrocks
name: StarRocks Sink
jdbc-url: jdbc:mysql://127.0.0.1:9030
load-url: 127.0.0.1:8030
username: root
password: pass
配置項
參數名稱 | 描述 | 類型 | 是否必填 | 默認值 | 備注 |
| 連接器的名稱。 | String | 是 | 無 | 固定值為 |
| Sink的顯示名稱。 | String | 否 | 無 | 無。 |
| JDBC連接的URL。 | String | 是 | 無 | 支持傳入多個地址,使用英文逗號 ( |
| 連接到FE節點的HTTP服務URL。 | String | 是 | 無 | 支持傳入多個地址,使用英文分號 ( |
| 連接到 StarRocks時使用的用戶名。 | String | 是 | 無 | 該用戶至少需要具備對目標表的SELECT和INSERT權限。您可以使用StarRocks的GRANT命令賦予相應的權限。 |
| 連接到 StarRocks時使用的密碼。 | String | 是 | 無 | 無。 |
| 在進行Stream Load導入時使用的標簽前綴。 | String | 否 | 無 | 無。 |
| 建立HTTP連接時的超時時間。 | Integer | 否 | 30000 | 單位為毫秒,取值需要介于100 ~ 60000。 |
| 從服務器得到100 Continue請求前的超時時間)。 | Integer | 否 | 30000 | 單位為毫秒。取值需要介于3000 ~ 600000。 |
| 在將數據寫入StarRocks前,最多可以在內存中緩存多大量的數據。 | Long | 否 | 157286400 | 單位為字節,取值需要介于64 MB ~ 10 GB。 說明
|
| 每張表連續兩次Flush之間的間隔時間。 | Long | 否 | 300000 | 單位為毫秒。 |
| 連續兩次檢查是否應該進行Flush之間的間隔時間。 | Long | 否 | 50 | 單位為毫秒。 |
| 在進行 Stream Load導入時的線程數量。 | Integer | 否 | 2 | 無。 |
| 是否使用Stream Load事務接口進行導入。 | Boolean | 否 | true | 僅在數據庫支持的情況下生效。 |
| 提供給Sink的額外參數。 | String | 否 | 無 | 可以在STREAM LOAD查看支持的參數。 |
| 自動建表時的Bucket數量。 | Integer | 否 | 無 |
|
| 在自動建表時需要傳遞的額外參數。 | String | 否 | 無 | 例如,可以傳遞 |
| 執行表結構變更的超時時間。 | Duration | 否 | 30 min | 必須設定為整數秒。 說明 如果某個表結構變更操作耗時超過此限制,作業將運行失敗。 |
類型映射
StarRocks并不支持所有的CDC YAML類型,嘗試將不支持的類型寫入下游會導致作業失敗。您可以使用Transform CAST內置函數對不支持的數據進行轉換,或是使用Projection語句將其從結果表中移除。詳情請參考數據攝入開發參考。
CDC類型 | StarRocks類型 | 附注 |
TINYINT | TINYINT | 無 |
SMALLINT | SMALLINT | |
INT | INT | |
BIGINT | BIGINT | |
FLOAT | FLOAT | |
DOUBLE | DOUBLE | |
DECIMAL(p, s) | DECIMAL(p, s) | |
BOOLEAN | BOOLEAN | |
DATE | DATE | |
TIMESTAMP | DATETIME | |
TIMESTAMP_LTZ | DATETIME | |
CHAR(n) (n <= 85 時) | CHAR(n × 3) | CDC中的CHAR類型長度表示字符數,而StarRocks中的CHAR類型長度表示UTF-8編碼后的字節數。通常情況下,一個中文字符經過UTF-8編碼后不會超過3字節,因此映射到的StarRocks CHAR類型長度為原來的3倍。 說明 StarRocks的CHAR類型長度最長不可超過255,因此只有長度不超過85的CDC CHAR類型才會被映射到StarRocks CHAR類型。 |
CHAR(n) (n > 85 時) | VARCHAR(n × 3) | CDC中的CHAR類型長度表示字符數,而StarRocks中的CHAR類型長度表示UTF-8編碼后的字節數。通常情況下,一個中文字符經過UTF-8編碼后不會超過3字節,因此映射到的 StarRocks VARCHAR類型長度為原來的3倍。 說明 StarRocks的CHAR類型長度最長不可超過255,因此長度大于85的CDC CHAR類型會被映射到StarRocks VARCHAR類型。 |
VARCHAR(n) | VARCHAR(n × 3) | CDC中的VARCHAR類型長度表示字符數,而StarRocks中的VARCHAR類型長度表示UTF-8編碼后的字節數。通常情況下,一個中文字符經過UTF-8編碼后不會超過3字節,因此映射到的StarRocks VARCHAR類型長度為原來的3倍。 |