任務編排中的跨庫Spark SQL節點,主要針對各類跨庫數據同步和數據加工場景,您可以通過編寫Spark SQL,完成各種復雜的數據同步或數據加工的任務開發。
前提條件
支持的數據庫類型:
MySQL:RDS MySQL、PolarDB MySQL版、MyBase MySQL、PolarDB分布式版、AnalyticDB for MySQL、其他來源MySQL
SQL Server:RDS SQL Server、MyBase SQL Server、其他來源SQL Server
PostgreSQL:RDS PostgreSQL、PolarDB PostgreSQL版、MyBase PostgreSQL、AnalyticDB for PostgreSQL、其他來源PostgreSQL
Oracle
DB2
MaxCompute
Hologres
OSS
若需要添加OSS引用,建議您提前獲取OSS文件路徑或錄入OSS。詳情請參見錄入對象存儲OSS。
使用限制
Spark SQL任務節點基于Spark計算引擎運行,單個任務單次處理數據時不要超過200萬條,否則可能影響任務運行效率。
因計算資源有限,任務運行高峰期無法保證計算時效。
系統在計算數據量過大且缺失主鍵的表時,會導致內存溢出或內存耗盡(OOM)。
周期調度節點最近一次運行成功后,若連續運行失敗10次及以上,離線集成任務直接執行失敗,且不會再提交Spark任務。此時,您需要手動運行成功該任務節點。
應用場景
跨庫Spark SQL任務主要應用于跨庫數據同步和跨庫數據加工:
跨庫數據同步:
在線庫同步到數據倉庫,用于數據加工。在線業務會產生大量的數據,當需要對這些數據進行加工分析時,一般需要將在線業務的數據同步到專門用于數據加工和分析的數據倉庫,再進行數據分析。
數據倉庫的數據回流到在線庫,用于數據查詢。在數據倉庫中對數據進行加工分析后,往往需要將數據同步回在線庫中,以便在線應用提供相關的數據分析和統計服務。
示例:某消費服務平臺使用的是MySQL數據庫,需要在數據倉庫AnalyticDB for PostgreSQL中對消費數據進行消費金額、消費筆數等的統計分析,將統計分析后的數據回傳到消費服務平臺上,供用戶進行在線查詢。
將MySQL中的增量消費數據同步到AnalyticDB for PostgreSQL中,Spark SQL語句如下:
INSERT INTO adb_dw.orders SELECT * FROM mysql_db.orders WHERE dt>${bizdate} AND dt<=${tomorrow}
將AnalyticDB for PostgreSQL中加工后的數據同步到MySQL中,Spark SQL語句如下:
INSERT INTO mysql_db.orders_month SELECT * FROM adb_dw.orders_month WHERE dt=${bizdate}
跨庫數據加工:
跨多個庫的數據寫入到在線庫中,實現在線應用中直接查詢數據的功能。
示例:某電商企業的交易數據保存在MySQL在線庫中,而員工數據保存在HR系統中,系統使用的數據庫為Oracle,當企業需要按部門統計銷售額時,需要對部門、員工和交易數據進行關聯查詢。以下Spark SQL語句可以實現將mysql_db在線庫的交易流水表sales與oracle_db庫的用戶表users進行關聯,并將關聯后的數據按部門名稱分組,統計交易筆數和金額,加工后的數據寫入到mysql_db在線庫中。
INSERT INTO mysql_db.dept_sales SELECT dept_name,trade_date, COUNT(*) cnt, SUM(amt) amt FROM mysql_db.sales t1 JOIN oracle_db.users t2 ON t1.emp_id=t2.id WHERE t1.trade_date=${bizdate} GROUP BY t2.dept_name
功能特性
跨庫數據處理:支持使用SQL語句操作不同數據庫中的數據,數據生態較全面,可通過擴展支持各種數據源。
大數據量處理:支持快速處理較大規模的數據(十萬條以上數據)。
Spark SQL語法:基于Spark 3.1.2版本部署,提供該版本所有語法特性和原生函數。原生函數包括聚合函數、窗口函數、數組函數、Map函數、日期和時間處理函數、JSON處理函數等。
兼容標準SQL:通過標準的SQL語句,也可實現跨庫數據同步和數據加工。
Serverless:Spark SQL任務是基于Spark引擎進行數據處理的無服務器化計算服務,用戶無需預購計算資源和維護資源,沒有運維和升級成本。
支持的SQL語句包括:CREATE TABLE, CREATE SELECT, DROP TABLE, INSERT, INSERT SELECT, ALTER TABLE, TRUNCATE, SET, ANALYZE, MSCK REPAIR。
說明TRUNCATE語句僅支持OSS類型的表,其他類型執行無效果。
不支持的SQL語句包括:SELECT、DELETE、UPDATE、CREATE DATABASE、DROP DATABASE。
支持OSS文件存儲數據源,包括CSV、JSON、PARQUET、ORC多種文件格式。
操作步驟
- 登錄數據管理DMS 5.0。
在頂部菜單欄中,選擇 。
說明若您使用的是極簡模式的控制臺,請單擊控制臺左上角的圖標,選擇
。單擊目標任務流名稱,進入任務流詳情頁面。
說明如果您需要新增任務流,請參見新增任務流。
在畫布左側任務類型列表中,拖拽跨庫Spark SQL節點到畫布空白區域。
雙擊跨庫Spark SQL。
可選:在跨庫Spark SQL配置頁面,單擊變量設置,配置SQL語句中需要引用的變量。您可以單擊變量設置區域右上角的,查看配置變量的提示信息。
可選:在OSS引用區域,單擊添加OSS引用,添加SQL語句中需要引用的OSS,配置完成后單擊保存。
配置項
是否必填
說明
數據庫
是
在下拉框中,搜索并選擇目標數據庫。
重要非安全協同模式的數據庫,如果數據庫的賬號發生變更(如密碼、權限變更),可能導致任務運行失敗,需要重新登錄數據庫并保存節點配置。
OSS路徑
是
指定數據保存在OSS Bucket上的路徑。
說明如果路徑不存在,將自動創建。
路徑中支持使用變量,例如/path/${foldername}。
Spark SQL引用別名
是
輸入該OSS在Spark SQL語句中的別名,默認值為oss。
說明引用別名由數字、大小寫字母及下劃線(_)組成,且長度不超過32個字符。
可選:在數據庫引用區域,單擊添加數據庫引用,添加SQL語句中需要引用的數據庫。配置完成后單擊保存。
配置項
是否必填
說明
數據庫類型
是
選擇目標數據庫的類型。
說明您可以進入SQL窗口,在頁面左側的實例列表中,將鼠標移動到目標數據庫所屬實例上,查看數據庫類型。
數據庫
是
在數據庫下拉框中,搜索并選擇目標數據庫。
重要非安全協同模式的數據庫,如果數據庫的賬號發生變更(如密碼、權限變更),可能導致任務運行失敗,需要重新登錄數據庫并保存節點配置。
Spark SQL引用別名
是
輸入該數據庫在Spark SQL語句中的別名,默認值為數據庫名。
說明數據庫別名由數字、大小寫字母及下劃線(_)組成,且長度不超過32個字符。
如需添加多個目標數據庫,單擊數據庫右側的。
在SQL區域,編寫Spark SQL語句,并進行試運行。
編寫Spark SQL語句,并單擊保存。
例如,從qntext數據庫的ex_customer表讀取數據,并寫入到qn_rds數據庫的test_table表中。qntext數據庫的別名設置為qn,qn_rds數據庫的別名設置為rds。輸入的SQL語句如下:
INSERT INTO rds.test_table SELECT * FROM qn.ex_customer;
說明SQL語句中可以引用變量,引用變量格式為${var_name}。
您可以在編寫SQL代碼的過程中,隨時單擊SQL預覽,預覽SQL代碼。
單擊有效性檢查,可以檢查SQL代碼的有效性。
單擊試運行。
如果執行日志的最后一行出現
status SUCCEEDED
,表明任務試運行成功。如果執行日志的最后一行出現
status FAILED
,表明任務試運行失敗,在執行日志中查看執行失敗的節點和原因,修改配置后重新嘗試。