實時計算Flink版基于Flink CDC,通過開發YAML作業的方式有效地實現了將數據從源端同步到目標端的數據攝入工作。本文將為您介紹數據攝入YAML作業開發的操作步驟。
背景信息
數據攝入模塊整合了Flink CDC連接器,相對于CDAS和CTAS,它通過YAML配置的方式可以輕松定義復雜的ETL流程,并自動轉化為Flink運算邏輯。除支持整庫同步、單表同步、分庫分表同步、新增表同步、表結構變更和自定義計算列同步等能力,還支持ETL處理、Where條件過濾、列裁剪和計算列,極大地簡化了數據集成過程,有效提升了數據集成的效率和可靠性。
YAML優勢
在實時計算Flink版中,您可以選擇開發數據攝入YAML作業、SQL作業或自行開發DataStream作業完成數據同步工作。下面介紹一下數據攝入YAML作業相比于其他兩種開發方式的優勢。
YAML vs SQL
數據攝入YAML作業和SQL作業在數據傳遞過程中使用不同的數據類型:
SQL傳遞RowData,YAML傳遞DataChangeEvent和SchemaChangeEvent。SQL的每個RowData都有自己的變更類型,主要有4種類型:insert(+I),update before(-U),update after(+U)和delete(-D)。
YAML使用SchemaChangeEvent傳遞Schema變更信息,例如創建表,添加列、清空表等,DataChangeEvent用來傳遞數據變更,主要是insert,update和delete,update消息中同時包含了update before和update after的內容,這使得您能夠寫入原始變更數據到目標端。
數據攝入YAML作業相比SQL作業的優勢如下:
數據攝入YAML | SQL |
自動識別Schema,支持整庫同步 | 需要人工寫Create Table和Insert語句 |
支持多策略的Schema變更 | 不支持Schema變更 |
支持原始Changelog同步 | 破壞原始Changelog結構 |
支持讀寫多個表 | 讀寫單個表 |
相對于CTAS或CDAS語句,YAML作業功能也更為強大,可以支持:
上游表結構變更立即同步,不用等新數據寫入觸發。
支持原始Changelog同步,Update消息不拆分。
同步更多類型的Schema變更,例如Truncate Table和Drop Table等變更。
支持指定表的映射關系,靈活定義目標端表名。
支持靈活的Schema Evolution行為,用戶可配置。
支持WHERE條件過濾數據。
支持裁剪字段。
YAML vs DataStream
數據攝入YAML作業相比DataStream作業的優勢如下:
數據攝入YAML | DataStream |
為各級別用戶設計,不只是專家 | 需要熟悉Java和分布式系統 |
隱藏底層細節,便于開發 | 需要熟悉Flink框架 |
YAML格式容易理解和學習 | 需要了解Maven等工具管理相關依賴 |
已有作業方便復用 | 難以復用已有代碼 |
使用限制
僅實時計算引擎VVR 8.0.9及以上版本支持數據攝入YAML作業。
僅支持從一個源端流向一個目標端。從多個數據源讀取或寫入多個目標端時需編寫多個YAML作業。
暫不支持將YAML作業部署到Session集群。
數據攝入連接器
當前支持作為數據攝入源端和目標端的連接器如下表所示。
歡迎您通過工單、釘釘等渠道反饋感興趣的上下游存儲,未來計劃適配更多上下游以更好滿足您的需要。
連接器 | 支持類型 | |
Source | Sink | |
√ 說明 僅實時計算引擎VVR 8.0.10及以上版本支持。 | √ | |
× | √ | |
說明 支持連接RDS MySQL版、PolarDB MySQL版及自建MySQL。 | √ | × |
× | √ | |
× | √ | |
× | √ | |
× | √ |
操作步驟
登錄實時計算管理控制臺。
單擊目標工作空間操作列的控制臺。
在左側導航欄選擇
。單擊新建,選擇空白的數據攝入草稿,單擊下一步。
您也可以直接選擇目標數據同步模板(MySQL到Starrocks數據同步、MySQL到Paimon數據同步或MySQL到Hologres數據同步)快速配置YAML作業開發信息。
填寫作業名稱,存儲位置和選擇引擎版本后,單擊確定。
配置YAML作業開發信息。
# 必填 source: # 數據源類型 type: <替換為您源端連接器類型> # 數據源配置信息,配置項詳情請參見對應連接器文檔。 ... # 必填 sink: # 目標類型 type: <替換為您目標端連接器類型> # 數據目標配置信息,配置項詳情請參見對應連接器文檔。 ... # 可選 transform: # 轉換規則,針對flink_test.customers表 - source-table: flink_test.customers # 投影配置,指定要同步的列,并進行數據轉換 projection: id, username, UPPER(username) as username1, age, (age + 1) as age1, test_col1, __schema_name__ || '.' || __table_name__ identifier_name # 過濾條件,只同步id大于10的數據 filter: id > 10 # 描述信息,用于解釋轉換規則 description: append calculated columns based on source table # 可選 route: # 路由規則,指定源表和目標表之間的對應關系 - source-table: flink_test.customers sink-table: db.customers_o # 描述信息,用于解釋路由規則 description: sync customers table - source-table: flink_test.customers_suffix sink-table: db.customers_s # 描述信息,用于解釋路由規則 description: sync customers_suffix table #可選 pipeline: # 任務名稱 name: MySQL to Hologres Pipeline
涉及的代碼塊說明詳情如下。
是否必填
代碼模塊
說明
必填
source(數據源端)
數據管道的起點,Flink CDC將從數據源中捕獲變更數據。
sink(數據目標端)
數據管道的終點,Flink CDC將捕獲的數據變更傳輸到這些目標系統中。
可選
pipeline
(數據管道)
定義整個數據通道作業的一些基礎配置,例如pipeline名稱等。
transform(數據轉換)
填寫數據轉化規則。轉換是指對流經Flink管道的數據進行操作的過程。支持ETL處理、Where條件過濾,列裁剪和計算列。
當Flink CDC捕獲的原始變更數據需要經過轉換以適應特定的下游系統時,可以通過transform實現。
route(路由)
如果未配置該模塊,則代表整庫或目標表同步。
在某些情況下,捕獲的變更數據可能需要根據特定規則被發送到不同的目的地。路由機制允許您靈活指定上下游的映射關系,將數據發送到不同的數據目標端。
各模塊語法結構和配置項說明詳情,請參見數據攝入開發參考。
以將MySQL中app_db數據庫下的所有表同步到Hologres的某個數據庫為例,代碼示例如下。
source: type: mysql hostname: <hostname> port: 3306 username: ${secret_values.mysqlusername} password: ${secret_values.mysqlpassword} tables: app_db.\.* server-id: 5400-5404 sink: type: hologres name: Hologres Sink endpoint: <endpoint> dbname: <database-name> username: ${secret_values.holousername} password: ${secret_values.holopassword} pipeline: name: Sync MySQL Database to Hologres
(可選)單擊深度檢查。
您可以進行語法檢測、網絡連通性和訪問權限檢查。
相關文檔
YAML作業開發完成后,您需要部署上線,詳情請參見部署作業。
快速構建一個YAML作業將MySQL庫中的數據同步到StarRock中,詳情請參見數據攝入YAML作業快速入門。