數據傳輸服務DTS(Data Transmission Service)提供的流式數據ETL(Extract Transform Load)數據處理功能,結合DTS的高效流數據復制能力,可以實現流式數據的抽取、數據轉換、加工和數據裝載。本文介紹在DTS鏈路內配置ETL的操作步驟及相關語法信息,幫助您在數據過濾、數據脫敏、記錄數據修改時間和數據變更審計等場景下使用ETL功能。
背景信息
DTS是一個數據遷移和同步服務,通常用于數據搬遷或實時數據傳輸。但有時候用戶有數據處理的需求,希望先對實時數據做一定轉換或過濾,再寫入庫。為了滿足此類需求,DTS提供了流式數據ETL數據處理功能,支持使用DSL(Domain Specific Language)腳本語言靈活地定義數據處理邏輯。DSL的介紹及配置語法,請參見數據處理DSL語法簡介。
支持的數據庫
ETL支持的源庫和目標庫如下表所示。
源庫 | 目標庫 |
SQL Server |
|
MySQL |
|
自建Oracle |
|
PolarDB MySQL版 |
|
PolarDB PostgreSQL版(兼容Oracle) |
|
PolarDB-X 1.0 |
|
PolarDB-X 2.0 |
|
自建Db2 for LUW | MySQL |
自建Db2 for i | MySQL |
PolarDB PostgreSQL |
|
PostgreSQL |
|
TiDB |
|
MongoDB | Lindorm |
在創建同步任務時配置ETL
注意事項
如果您配置的ETL腳本中,包含新增列操作,那么需要您手動在目標端添加列。否則ETL腳本不生效。例如
script:e_set(`new_column`, dt_now())
,此處new_column
需要您手動在目標端添加。DSL腳本配置的字段不能為過濾條件過濾的字段,否則會導致任務異常。
DSL腳本大小寫敏感,庫名、表名、字段名稱需要和源庫保持完全一致。
DSL腳本不支持多個表達式,您可以使用
e_compose
函數將多個表達式組合成一個表達式。源庫所有表的DML變更,經過DSL腳本處理后需具有相同的列信息,否則可能會導致任務失敗。例如,在DSL腳本中使用
e_set
函數新增列時,需要設置源庫的INSERT、UPDATE或DELETE操作,均會在目標表中增加一列數據。更多信息,請參見記錄數據修改時間。
操作步驟
創建同步任務,具體請參見同步方案概覽。
在高級配置階段,將配置ETL功能選擇為是。
在輸入框中,按照數據處理DSL語法填寫數據處理(ETL)語句。
您也可以使用ETL需求框(NL2ETL功能),協助您填寫ETL語句。
說明ETL需求框功能正在灰度內測中,僅部分用戶可以使用。
在ETL需求框中,輸入您的ETL需求。
示例1
我只需要同步id>1000,并且location字段等于'hangzhou'的數據
示例2
對于表A或表B,如果created_at早于'2024-06-30',則刪除該記錄
示例3
對于`user_info`表,把`mobile_phone`字段的第8到第11位替換為星號,同時將`uid`列的前2位也替換為星號
按回車鍵。
系統會根據您輸入的內容,生成相應的ETL語句,并檢查該ETL語句的正確性。
查看生成的ETL語句和提示信息。
說明生成信息請以控制臺為準。
可選:若生成的ETL語句不符合您的需求,可以單擊ETL需求框右側的取消按鈕,調整并重新輸入ETL需求,重新生成ETL語句。
當生成的ETL語句符合您的需求時,單擊ETL需求框右側的采納按鈕。
根據實際情況,完成后續步驟。
在已有同步任務上修改ETL配置
修改已有同步任務的ETL配置包括:
如果已有同步任務未配置ETL,即創建同步任務時配置ETL功能設置為否,支持將否修改為是,并配置DSL腳本。
如果已有同步任務已配置ETL,支持修改已有的DSL腳本或將配置ETL功能修改為否。
重要在修改已有的DSL腳本時,您需要先將同步對象從已選擇對象移動至源庫對象,再重新添加至已選擇對象后,再修改DSL腳本。
注意事項
已有同步任務上修改ETL配置暫不支持對目標端表的表結構進行變更,如果需要變更,您需要在啟動同步任務前在目標端變更表結構。
修改ETL配置可能造成鏈路中斷,請謹慎操作。
ETL配置的修改僅對啟動同步任務后的增量數據生效,對修改ETL配置前的歷史數據不生效。
DSL腳本配置的字段不能為過濾條件過濾的字段,否則會導致任務異常。
DSL腳本大小寫敏感,庫名、表名、字段名稱需要和源庫保持完全一致。
DSL腳本不支持多個表達式,您可以使用
e_compose
函數將多個表達式組合成一個表達式。源庫所有表的DML變更,經過DSL腳本處理后需具有相同的列信息,否則可能會導致任務失敗。例如,在DSL腳本中使用
e_set
函數新增列時,需要設置源庫的INSERT、UPDATE或DELETE操作,均會在目標表中增加一列數據。更多信息,請參見記錄數據修改時間。
操作步驟
在目標同步任務中單擊,選擇修改ETL配置。
在高級配置階段,將配置ETL功能選擇為是。
在輸入框中,按照數據處理DSL語法填寫數據處理(ETL)語句。
您也可以使用ETL需求框(NL2ETL功能),協助您填寫ETL語句。
說明ETL需求框功能正在灰度內測中,僅部分用戶可以使用。
在ETL需求框中,輸入您的ETL需求。
示例1
我只需要同步id>1000,并且location字段等于'hangzhou'的數據
示例2
對于表A或表B,如果created_at早于'2024-06-30',則刪除該記錄
示例3
對于`user_info`表,把`mobile_phone`字段的第8到第11位替換為星號,同時將`uid`列的前2位也替換為星號
按回車鍵。
系統會根據您輸入的內容,生成相應的ETL語句,并檢查該ETL語句的正確性。
查看生成的ETL語句和提示信息。
說明生成信息請以控制臺為準。
可選:若生成的ETL語句不符合您的需求,可以單擊ETL需求框右側的取消按鈕,調整并重新輸入ETL需求,重新生成ETL語句。
當生成的ETL語句符合您的需求時,單擊ETL需求框右側的采納按鈕。
根據實際情況,完成后續步驟。
數據處理DSL語法簡介
典型場景示例
數據過濾
按數值列條件過濾:如果id>10000,則丟棄這條記錄,不同步到目標庫:e_if(op_gt(`id`, 10000), e_drop())。
按字符串匹配條件過濾:如果name包含“hangzhou”,則丟棄這條記錄:e_if(str_contains(`name`, "hangzhou"), e_drop())。
按日期過濾:如果訂單時間早于某個時間,則不同步:e_if(op_lt(`order_timestamp`, "2015-02-23 23:54:55"), e_drop())。
按多條件過濾:
如果id>1000且name包含“hangzhou”,則丟棄這條記錄:e_if(op_and(str_contains(`name`, "hangzhou"), op_gt(`id`, 1000)), e_drop())。
如果id>1000或name包含“hangzhou”,則丟棄這條記錄:e_if(op_or(str_contains(`name`, "hangzhou"), op_gt(`id`, 1000)), e_drop())。
數據脫敏
遮掩:將phone手機號列的后四位用星號替換,e_set(`phone`, str_mask(`phone`, 7, 10, '*'))。
記錄數據修改時間
對所有表新增列:__OPERATION__的值為INSERT或UPDATE或DELETE時新增1列“dts_sync_time”,值為日志提交時間(__COMMIT_TIMESTAMP__)。
e_if(op_or(op_or( op_eq(__OPERATION__, __OP_INSERT__), op_eq(__OPERATION__, __OP_UPDATE__)), op_eq(__OPERATION__, __OP_DELETE__)), e_set(dts_sync_time, __COMMIT_TIMESTAMP__))
對指定表“dts_test_table”新增列: __OPERATION__的值為INSERT或UPDATE或DELETE時新增1列“dts_sync_time”, 值為日志提交時間(__COMMIT_TIMESTAMP__)。
e_if(op_and( op_eq(__TB__,'dts_test_table'), op_or(op_or( op_eq(__OPERATION__,__OP_INSERT__), op_eq(__OPERATION__,__OP_UPDATE__)), op_eq(__OPERATION__,__OP_DELETE__))), e_set(dts_sync_time,__COMMIT_TIMESTAMP__))
說明上述新增列操作需要您在任務啟動前自行修改目標端表定義,添加“dts_sync_time”列。
數據變更審計
記錄表數據變化的類型和時間:在目標端的“operation_type”列記錄數據變化類型;在目標端的“updated”列記錄數據發生變化的時間。
e_compose( e_switch( op_eq(__OPERATION__,__OP_DELETE__), e_set(operation_type, 'DELETE'), op_eq(__OPERATION__,__OP_UPDATE__), e_set(operation_type, 'UPDATE'), op_eq(__OPERATION__,__OP_INSERT__), e_set(operation_type, 'INSERT')), e_set(updated, __COMMIT_TIMESTAMP__), e_set(__OPERATION__,__OP_INSERT__) )
說明您需要在任務啟動前在目標端表中添加“operation_type”列和“updated”列。
數據處理DSL語法
常量與變量
常量
類型
示例
int
123
float
123.4
string
"hello1_world"
boolean
true或false
datetime
DATETIME('2021-01-01 10:10:01')
變量
變量
含義
數據類型
示例值
__TB__
表名
string
table
__DB__
庫名
string
mydb
__OPERATION__
操作類型
string
__OP_INSERT__,__OP_UPDATE__,__OP_DELETE__
__BEFORE__
UPDATE操作的前鏡像值(修改前的值)
說明DELETE操作只有前鏡像值。
特殊標記,無類型
v(`column_name`,__BEFORE__)
__AFTER__
UPDATE操作的后鏡像值(修改后的值)
說明INSERT操作只有后鏡像值。
特殊標記,無類型
v(`column_name`,__AFTER__)
__COMMIT_TIMESTAMP__
事務提交時間
datetime
'2021-01-01 10:10:01'
`column`
某條數據對應column的值
string
`id`、`name`
表達式函數
數值運算
功能
語法
取值范圍
返回值
示例
加法
op_sum(value1, value2)
value1:整數或浮點數
value2:整數或浮點數
若參數均為整數,則返回整數,否則返回浮點數。
op_sum(`col1`, 1.0)
減法
op_sub(value1, value2)
value1:整數或浮點數
value2:整數或浮點數
若參數均為整數,則返回整數,否則返回浮點數。
op_sub(`col1`, 1.0)
乘法
op_mul(value1, value2)
value1:整數或浮點數
value2:整數或浮點數
若參數均為整數,則返回整數,否則返回浮點數。
op_mul(`col1`, 1.0)
除法
op_div_true(value1, value2)
value1:整數或浮點數
value2:整數或浮點數
若參數均為整數,則返回整數,否則返回浮點數。
op_div_true(`col1`, 2.0), 若col1=15,則返回7.5。
取模
op_mod(value1, value2)
value1:整數或浮點數
value2:整數或浮點數
若參數均為整數,則返回整數,否則返回浮點數。
op_mod(`col1`, 10),若col1=23,則返回3
邏輯運算
功能
語法
取值范圍
返回值
示例
是否相等
op_eq(value1, value2)
value1:整數、浮點數、字符串
value2:整數、浮點數、字符串
boolean類型,true或false
op_eq(`col1`, 23)
是否大于
op_gt(value1, value2)
value1:整數、浮點數、字符串
value2:整數、浮點數、字符串
boolean類型,true或false
op_gt(`col1`, 1.0)
是否小于
op_lt(value1, value2)
value1:整數、浮點數、字符串
value2:整數、浮點數、字符串
boolean類型,true或false
op_lt(`col1`, 1.0)
是否大于等于
op_ge(value1, value2)
value1:整數、浮點數、字符串
value2:整數、浮點數、字符串
boolean類型,true或false
op_ge(`col1`, 1.0)
是否小于等于
op_le(value1, value2)
value1:整數、浮點數、字符串
value2:整數、浮點數、字符串
boolean類型,true或false
op_le(`col1`, 1.0)
AND運算
op_and(value1, value2)
value1:boolean類型
value2:boolean類型
boolean類型,true或false
op_and(`is_male`, `is_student`)
OR運算
op_or(value1, value2)
value1:boolean類型
value2:boolean類型
boolean類型,true或false
op_or(`is_male`, `is_student`)
IN運算
op_in(value, json_array)
value: 任意類型
json_array:JSON格式字符串
boolean類型,true或false
op_in(`id`,json_array('["0","1","2","3","4","5","6","7","8"]'))
值是否為空
op_is_null(value)
value: 任意類型
boolean類型,true或false
op_is_null(`name`)
值是否不為空
op_is_not_null(value)
value: 任意類型
boolean類型,true或false
op_is_not_null(`name`)
字符串函數
功能
語法
取值范圍
返回值
示例
字符串拼接
op_add(str_1,str_2,...,str_n)
str_1: 字符串
str_2: 字符串
...
str_n: 字符串
拼接后的字符串
op_add(`col`,'hangzhou','dts')
字符串格式化,字符串拼接
str_format(format, value1, value2, value3, ...)
format:字符串類型,以大括號作為占位符,如 "part1: {}, part2: {}"。
value1:任意
value2:任意
格式化好的字符串
str_format("part1: {}, part2: {}", `col1`, `col2`),若col1="ab", col2="12", 則返回"part1: ab, part2: 12"。
字符串替換
str_replace(original, oldStr, newStr, count)
original:原來的字符串
oldStr:待替換的字符串
newStr:替換后的字符串
count:整數,最多替換次數。若設置為-1,則全部替換。
替換后的字符串
str_replace(`name`, "a", 'b', 1),若name="aba", 則返回"bba" ;str_replace(`name`, "a", 'b', -1);若name="aba", 則返回"bbb"。
所有字符串類型(如varchar、text、char等)的字段值替換
tail_replace_string_field(search, replace, all)
search:待替換的字符串
replace:替換后的字符串
all: 是否替換所有匹配的字符串,目前只支持取值為true。
說明若您無需替換所有匹配的字符串,請使用
str_replace
函數。
替換后的字符串
tail_replace_string_field('\u000f','',true),將所有字符串字段類型值的 "\u000f"替換成空格。
移除字符串首尾的特定字符
str_strip(string_val, charSet)
string_val:原來的字符串
char_set:待移除的字符集合
移除首尾字符后的字符串
str_strip(`name`, 'ab'),若name=axbzb, 則返回xbz。
字符串轉小寫
str_lower(value)
value:字符串列或字符串常量
小寫字符串
str_lower(`str_col`)
字符串轉大寫
str_upper(value)
value:字符串列或字符串常量
大寫字符串
str_upper(`str_col`)
字符串轉數字
cast_string_to_long(value)
value:字符串
整數
cast_string_to_long(`col`)
數字轉字符串
cast_long_to_string(value)
value:整數
字符串
cast_long_to_string(`col`)
字符串統計
str_count(str,pattern)
str:字符串列或字符串常量
pattern:要查找的子串
子串出現的次數
str_count(`str_col`, 'abc'), 若str_col="zabcyabcz",則返回2。
字符串查找
str_find(str, pattern)
str:字符串列或字符串常量
pattern:要查找的子串
子串首次匹配的位置,沒有則返回`-1`
str_find(`str_col`, 'abc'), 若`str_col="xabcy"`,則返回`1`。
判斷是否全是字母組成的字符串
str_isalpha(str)
str:字符串列或字符串常量
true或false
str_isalpha(`str_col`)
判斷是否全是數字組成的字符串
str_isdigit(str)
str:字符串列或字符串常量
true或false
str_isdigit(`str_col`)
正則匹配
regex_match(str,regex)
str:字符串列或字符串常量
regex: 正則表達式字符串列或字符串常量
true或者false
regex_match(__TB__,'user_\\d+')
使用指定字符遮掩字符串的一部分,可用于數據脫敏,例如把手機號的后四位替換為星號
str_mask(str, start, end, maskStr)
str:字符串列或字符串常量
start:整數,遮掩的起始位置,最小值為0。
end:整數,遮掩的結束位置,最大值為字符串長度減一。
maskStr:字符串,長度為1的字符串,例如 '#'。
遮掩掉start至end后的字符串
str_mask(`phone`, 7, 10, '#')
截取字符串cond之后的部分
substring_after(str, cond)
str: 原來的字符串
cond: 字符串
字符串
說明返回值不含字符串cond。
substring_after(`col`, 'abc')
截取字符串cond之前的部分
substring_before(str, cond)
str: 原來的字符串
cond: 字符串
字符串
說明返回值不含字符串cond。
substring_before(`col`, 'efg')
截取字符串cond1和cond2之間的部分
substring_between(str, cond1, cond2)
str: 原來的字符串
cond1: 字符串
cond2: 字符串
字符串
說明返回值不含字符串cond1和cond2。
substring_between(`col`, 'abc','efg')
判斷是否為字符串類型
is_string_value(value)
value:字符串或者列名
boolean類型,true或false
is_string_value(`col1`)
字符串類型字段內容替換; 逆序從尾部開始
tail_replace_string_field(search, replace, all)
search:將被替換的字符串
replace:用于替換的字符串
all: 是否替換所有,true或者false
替換后的字符串
將所有字符串字段類型值的 "\u000f"替換成空格
tail_replace_string_field('\u000f','',true)
獲取MongoDB中字段(Field)的值
bson_value("field1","field2","field3",...)
field1:一級字段名稱。
field2:二級字段名稱。
文檔(Document)中相應字段的值
e_set(`user_id`, bson_value("id"))
e_set(`user_name`, bson_value("person","name"))
時間函數
功能
語法
取值范圍
返回值
示例
當前系統時間
dt_now()
無
DATETIME,精確到秒
dts_now()
dt_now_millis()
無
DATETIME,精確到毫秒
dt_now_millis()
UTC時間戳(秒)轉DATETIME
dt_fromtimestamp(value,[timezone])
value:整數
timezone:時區,可選參數
DATETIME,精確到秒
dt_fromtimestamp(1626837629)
dt_fromtimestamp(1626837629,'GMT+08')
UTC時間戳(毫秒)轉DATETIME
dt_fromtimestamp_millis(value,[timezone])
value:整數
timezone:時區,可選參數
DATETIME,精確到毫秒
dt_fromtimestamp_millis(1626837629123);
dt_fromtimestamp_millis(1626837629123,'GMT+08')
DATETIME轉UTC時間戳(秒)
dt_parsetimestamp(value,[timezone])
value: DATETIME
timezone:時區,可選參數
整數
dt_parsetimestamp(`datetime_col`)
dt_parsetimestamp(`datetime_col`,'GMT+08')
DATETIME轉UTC時間戳(毫秒)
dt_parsetimestamp_millis(value,[timezone])
value: DATETIME
timezone:時區,可選參數
整數
dt_parsetimestamp_millis(`datetime_col`)
dt_parsetimestamp_millis(`datetime_col`,'GMT+08')
DATETIME轉字符串
dt_str(value, format)
value:DATETIME
format:字符串, yyyy-MM-dd HH:mm:ss 格式表示
字符串
dt_str(`col1`, 'yyyy-MM-dd HH:mm:ss')
字符串轉DATETIME
dt_strptime(value,format)
value:字符串
format:字符串, yyyy-MM-dd HH:mm:ss 格式表示
DATETIME
dt_strptime('2021-07-21 03:20:29', 'yyyy-MM-dd hh:mm:ss')
修改時間,對年、月、日、時、分或秒中的一個或多個數值進行增加或減少
dt_add(value, [years=intVal],
[months=intVal],
[days=intVal],
[hours=intVal],
[minutes=intVal]
)
value: DATETIME
intVal: 整數
說明負號(-)表示減。
DATETIME
dt_add(datetime_col,years=-1)
dt_add(datetime_col,years=1,months=1)
條件表達式
功能
語法
取值范圍
返回值
示例
類似于C語言中的三目運算符(
? :
),返回符合條件的值(cond ? val_1 : val_2)
cond:bool類型的字段或表達式
val_1:返回值1
val_2:返回值2
說明val_1和val_2的類型需相同。
當cond為true時返回val_1否則返回val_2
(id>1000? 1 : 0)
全局函數
流程控制函數
功能
語法
參數說明
示例
if語句
e_if(bool_expr, func_invoke)
bool_expr:bool常量或函數調用。常量:true或false。函數調用:op_gt(`id`, 10)。
func_invoke:函數調用。e_drop,e_keep,e_set,e_if,e_compose
e_if(op_gt(`id`, 10), e_drop()), 如果ID大于10,則丟棄這條記錄。
if else語句
e_if_else(bool_expr, func_invoke1, func_invoke2)
bool_expr:bool常量或函數調用。常量:true或false。函數調用:op_gt(`id`, 10)。
func_invoke1:函數調用。條件為true時執行。
func_invoke2:函數調用。條件為false時執行。
e_if_else(op_gt(`id`, 10), e_set(`tag`, 'large'), e_set(`tag`, 'small')),如果ID大于10,則設置tag列為"large", 否則設置為"small"。
類switch語句,進行多次條件判斷,第一次滿足條件時執行對應操作,如無匹配則執行默認操作。
s_switch(condition1, func1, condition2, func2, ..., default = default_func)
condition1:bool常量或函數調用。常量:true或false。函數調用:op_gt(`id`, 10)。
func_invoke:函數調用。檢查condition1,若為true則執行此函數,并退出整個switch,若為false則繼續檢查下一個條件。
default_func:函數調用。當前面的所有condition都為false時,執行此默認函數。
e_switch(op_gt(`id`, 100), e_set(`str_col`, '>100'), op_gt(`id`, 90), e_set(`str_col`, '>90'), default=e_set(`str_col`, '<=90'))。
組合多個操作
e_compose(func1, func2, func3, ...)
func1:函數調用。可以為e_set, e_drop, e_if。
func2:函數調用??梢詾閑_set, e_drop, e_if。
e_compose(e_set(`str_col`, 'test'), e_set(`dt_col`, dt_now())), 設置str_col列的值為test,并設置dt_col列的值為當前時間。
數據操作函數
功能
語法
參數說明
示例
丟棄此條數據,不同步
e_drop()
無
e_if(op_gt(`id`, 10), e_drop()),丟棄ID大于10的記錄。
保留此條數據,同步到目標端
e_keep(condition)
condition:boolean類型表達式
e_keep(op_gt(id, 1)) ,僅同步ID大于1的數據。
設置列值
e_set(`col`, val, NEW)
col:列名
val:常量或函數調用。類型需要和col的類型匹配
NEW:將col列的數據類型轉換為val的數據類型,為可選字段
重要若不傳入NEW,則也不能傳入NEW前面的逗號(,)。同時需要注意數據類型的兼容性,否則會導致任務報錯。
e_set(`dt_col`, dt_now()),設置dt_col為當前時間。
e_set(`col1`, `col2` + 1),設置col1為col2+1。
e_set(`col1`, 1, NEW),將col1列轉換為數字類型,并將值設置為1。
MongoDB保留字段、丟棄字段、字段名映射功能
e_expand_bson_value('*', 'fieldA',{"fieldB":"fieldC"})
*:需要保留的字段名稱,*表示所有字段。
fieldA:需要丟棄的字段名稱。
{"fieldB":"fieldC"}:字段名映射,fieldB表示源端字段名稱,fieldC表示目標端字段名稱。
說明字段名映射是一個可選表達式。
e_expand_bson_value("*", "_id,name"),將除_id和name兩個字段以外的其他字段寫入目標端。