在使用數據傳輸服務DTS(Data Transmission Service)創建數據同步或遷移任務時,DTS支持為目標表添加額外的列并進行賦值。數據成功寫入目標表后,您可以通過篩選附加列的賦值,對傳輸至目標端的數據進行元數據管理、排序、去重等操作,從而更好地管理和處理傳輸至目標端的數據。
注意事項
支持新增附加列的同步或遷移實例如下:
目標庫數據庫類型為DataHub、Lindorm、Kafka或ClickHouse。
源庫數據庫類型為DB2 LUW或DB2 iSeries(AS/400),且目標庫數據庫類型為MySQL或PolarDB for MySQL。
源庫數據庫類型為MySQL、Mariadb或PolarDB for MySQL,且目標庫數據庫類型為MySQL、Mariadb或PolarDB for MySQL。
源庫數據庫類型為MySQL,且目標庫數據庫類型為Tair/Redis、AnalyticDB PostgreSQL或AnalyticDB MySQL 3.0。
源庫數據庫類型為PolarDB for PostgreSQL,且目標庫數據庫類型為AnalyticDB PostgreSQL。
若為同步實例,則同步類型需勾選庫表結構同步;若為遷移實例,則遷移類型需勾選庫表結構遷移。
在修改數據同步的附加列規則前,您需要評估附加列和目標表中已有的列是否會出現名稱沖突。
若同步任務的源數據庫為MongoDB,則目標數據庫的集合不能有名稱為_id和_value的字段,否則會導致同步失敗。
若您在已選擇對象中右鍵單擊的對象是數據庫,則DTS將會為目標端對應數據庫中的所有表批量添加設置的附加列。
操作步驟
本操作以DTS同步實例為例,介紹新增附加列的步驟。
進入同步任務的列表頁面。
登錄DMS數據管理服務。
在頂部菜單欄中,單擊集成與開發。
在左側導航欄,選擇 。
說明實際操作可能會因DMS的模式和布局不同,而有所差異。更多信息,請參見極簡模式和自定義DMS界面布局與樣式。
您也可以登錄新版DTS同步任務的列表頁面。
在同步任務右側,選擇同步實例所屬地域。
說明新版DTS同步任務列表頁面,需要在頁面左上角選擇同步實例所屬地域。
單擊創建任務,根據業務需求配置源庫及目標庫信息。
說明若需要給運行中的同步實例新增附加列,請單擊修改同步對象。
根據提示,進入對象配置階段并完成配置。
在此配置階段,您可以新增附加列。
在同步類型中,勾選庫表結構同步。
在源庫對象中以庫或表粒度選擇待同步的對象,然后單擊將其移動至已選擇對象框。
在已選擇對象中,右鍵單擊待同步的庫或表。
在彈出的對話框的附加列區域,單擊+ 新增列按鈕。
填寫附加列的列名稱、類型和賦值等。
說明賦值可以單擊文本框右側的自定義附加列值的表達式,詳情請參見賦值配置。
單擊確定。
根據提示,完成后續的數據同步任務配置。
說明若同步任務配置了ETL功能,待同步的數據先使用附加列的規則計算出一個值之后,再應用鏈路內的ETL腳本計算得到最終值,然后同步到目標數據庫。
賦值配置
附加列賦值的構成元素:常量、變量、操作符、表達式函數。
兼容ETL的數據處理DSL語法。
表達式中列名的符號為quote符號(``),不是單引號('')。
常量
類型
示例
int
123
float
123.4
string
"hello1_world"
boolean
true或false
datetime
DATETIME('2021-01-01 10:10:01')
變量
變量
含義
數據類型
示例值
__TB__
數據庫中表的名稱。
string
table
__DB__
數據庫的庫名稱。
string
mydb
__OPERATION__
操作的類型。
string
__OP_INSERT__,__OP_UPDATE__,__OP_DELETE__
__COMMIT_TIMESTAMP__
事務提交的時間。
datetime
'2021-01-01 10:10:01'
`column`
某條數據對應column的值。
string
`id`、`name`
__SCN__
系統變化編號SCN(System Change Number),記錄數據庫提交事務的版本和時間,具有唯一性。
string
22509****
__ROW_ID__
某條數據的地址ID,定位該數據的位置,具有唯一性。
string
AAAgWHAAKAAJgX****
表達式函數
數值運算
功能
語法
取值范圍
返回值
示例
加法(+)
op_sum(value1, value2)
value1+value2
value1:整數或浮點數
value2:整數或浮點數
若參數均為整數,則返回整數,否則返回浮點數。
op_sum(`col1`, 1.0)
`col1`+1.0
減法(-)
op_sub(value1, value2)
value1-value2
value1:整數或浮點數
value2:整數或浮點數
若參數均為整數,則返回整數,否則返回浮點數。
op_sub(`col1`, 1.0)
`col1`-1.0
乘法(*)
op_mul(value1, value2)
value1*value2
value1:整數或浮點數
value2:整數或浮點數
若參數均為整數,則返回整數,否則返回浮點數。
op_mul(`col1`, 1.0)
`col1`*1.0
除法(/)
op_div_true(value1, value2)
value1/value2
value1:整數或浮點數
value2:整數或浮點數
若參數均為整數,則返回整數,否則返回浮點數。
op_div_true(`col1`, 2.0), 若col1=15,則返回7.5。
`col1`/1.0
取模
op_mod(value1, value2)
value1:整數或浮點數
value2:整數或浮點數
若參數均為整數,則返回整數,否則返回浮點數。
op_mod(`col1`, 10),若col1=23,則返回3
邏輯運算
功能
語法
取值范圍
返回值
示例
是否相等
op_eq(value1, value2)
value1:整數、浮點數、字符串
value2:整數、浮點數、字符串
boolean類型,true或false
op_eq(`col1`, 23)
是否大于
op_gt(value1, value2)
value1:整數、浮點數、字符串
value2:整數、浮點數、字符串
boolean類型,true或false
op_gt(`col1`, 1.0)
是否小于
op_lt(value1, value2)
value1:整數、浮點數、字符串
value2:整數、浮點數、字符串
boolean類型,true或false
op_lt(`col1`, 1.0)
是否大于等于
op_ge(value1, value2)
value1:整數、浮點數、字符串
value2:整數、浮點數、字符串
boolean類型,true或false
op_ge(`col1`, 1.0)
是否小于等于
op_le(value1, value2)
value1:整數、浮點數、字符串
value2:整數、浮點數、字符串
boolean類型,true或false
op_le(`col1`, 1.0)
AND運算
op_and(value1, value2)
value1:boolean類型
value2:boolean類型
boolean類型,true或false
op_and(`is_male`, `is_student`)
OR運算
op_or(value1, value2)
value1:boolean類型
value2:boolean類型
boolean類型,true或false
op_or(`is_male`, `is_student`)
IN運算
op_in(value, json_array)
value: 任意類型
json_array:JSON格式字符串
boolean類型,true或false
op_in(`id`,json_array('["0","1","2","3","4","5","6","7","8"]'))
值是否為空
op_is_null(value)
value: 任意類型
boolean類型,true或false
op_is_null(`name`)
值是否不為空
op_is_not_null(value)
value: 任意類型
boolean類型,true或false
op_is_not_null(`name`)
字符串函數
功能
語法
取值范圍
返回值
示例
字符串拼接
op_add(str_1,str_2,...,str_n)
str_1: 字符串
str_2: 字符串
...
str_n: 字符串
拼接后的字符串
op_add(`col`,'hangzhou','dts')
字符串格式化,字符串拼接
str_format(format, value1, value2, value3, ...)
format:字符串類型,以大括號作為占位符,如 "part1: {}, part2: {}"。
value1:任意
value2:任意
格式化好的字符串
str_format("part1: {}, part2: {}", `col1`, `col2`),若col1="ab", col2="12", 則返回"part1: ab, part2: 12"。
字符串替換
str_replace(original, oldStr, newStr, count)
original:原來的字符串
oldStr:待替換的字符串
newStr:替換后的字符串
count:整數,最多替換次數。若設置為-1,則全部替換。
替換后的字符串
str_replace(`name`, "a", 'b', 1),若name="aba", 則返回"bba" ;str_replace(`name`, "a", 'b', -1);若name="aba", 則返回"bbb"。
所有字符串類型(如varchar、text、char等)的字段值替換
tail_replace_string_field(search, replace, all)
search:待替換的字符串
replace:替換后的字符串
all: 是否替換所有匹配的字符串,目前只支持取值為true。
說明若您無需替換所有匹配的字符串,請使用
str_replace
函數。
替換后的字符串
tail_replace_string_field('\u000f','',true),將所有字符串字段類型值的 "\u000f"替換成空格。
移除字符串首尾的特定字符
str_strip(string_val, charSet)
string_val:原來的字符串
char_set:待移除的字符集合
移除首尾字符后的字符串
str_strip(`name`, 'ab'),若name=axbzb, 則返回xbz。
字符串轉小寫
str_lower(value)
value:字符串列或字符串常量
小寫字符串
str_lower(`str_col`)
字符串轉大寫
str_upper(value)
value:字符串列或字符串常量
大寫字符串
str_upper(`str_col`)
字符串轉數字
cast_string_to_long(value)
value:字符串
整數
cast_string_to_long(`col`)
數字轉字符串
cast_long_to_string(value)
value:整數
字符串
cast_long_to_string(`col`)
字符串統計
str_count(str,pattern)
str:字符串列或字符串常量
pattern:要查找的子串
子串出現的次數
str_count(`str_col`, 'abc'), 若str_col="zabcyabcz",則返回2。
字符串查找
str_find(str, pattern)
str:字符串列或字符串常量
pattern:要查找的子串
子串首次匹配的位置,沒有則返回`-1`
str_find(`str_col`, 'abc'), 若`str_col="xabcy"`,則返回`1`。
判斷是否全是字母組成的字符串
str_isalpha(str)
str:字符串列或字符串常量
true或false
str_isalpha(`str_col`)
判斷是否全是數字組成的字符串
str_isdigit(str)
str:字符串列或字符串常量
true或false
str_isdigit(`str_col`)
正則匹配
regex_match(str,regex)
str:字符串列或字符串常量
regex: 正則表達式字符串列或字符串常量
true或者false
regex_match(__TB__,'user_\\d+')
使用指定字符遮掩字符串的一部分,可用于數據脫敏,例如把手機號的后四位替換為星號
str_mask(str, start, end, maskStr)
str:字符串列或字符串常量
start:整數,遮掩的起始位置,最小值為0。
end:整數,遮掩的結束位置,最大值為字符串長度減一。
maskStr:字符串,長度為1的字符串,例如 '#'。
遮掩掉start至end后的字符串
str_mask(`phone`, 7, 10, '#')
截取字符串cond之后的部分
substring_after(str, cond)
str: 原來的字符串
cond: 字符串
字符串
說明返回值不含字符串cond。
substring_after(`col`, 'abc')
截取字符串cond之前的部分
substring_before(str, cond)
str: 原來的字符串
cond: 字符串
字符串
說明返回值不含字符串cond。
substring_before(`col`, 'efg')
截取字符串cond1和cond2之間的部分
substring_between(str, cond1, cond2)
str: 原來的字符串
cond1: 字符串
cond2: 字符串
字符串
說明返回值不含字符串cond1和cond2。
substring_between(`col`, 'abc','efg')
判斷是否為字符串類型
is_string_value(value)
value:字符串或者列名
boolean類型,true或false
is_string_value(`col1`)
字符串類型字段內容替換; 逆序從尾部開始
tail_replace_string_field(search, replace, all)
search:將被替換的字符串
replace:用于替換的字符串
all: 是否替換所有,true或者false
替換后的字符串
將所有字符串字段類型值的 "\u000f"替換成空格
tail_replace_string_field('\u000f','',true)
獲取MongoDB中字段(Field)的值
bson_value("field1","field2","field3",...)
field1:一級字段名稱。
field2:二級字段名稱。
文檔(Document)中相應字段的值
e_set(`user_id`, bson_value("id"))
e_set(`user_name`, bson_value("person","name"))
條件表達式
功能
語法
取值范圍
返回值
示例
類似于C語言中的三目運算符(
? :
),返回符合條件的值(cond ? val_1 : val_2)
cond:bool類型的字段或表達式
val_1:返回值1
val_2:返回值2
說明val_1和val_2的類型需相同。
當cond為true時返回val_1否則返回val_2
(id>1000? 1 : 0)
時間函數
功能
語法
取值范圍
返回值
示例
當前系統時間
dt_now()
無
DATETIME,精確到秒
dts_now()
dt_now_millis()
無
DATETIME,精確到毫秒
dt_now_millis()
UTC時間戳(秒)轉DATETIME
dt_fromtimestamp(value,[timezone])
value:整數
timezone:時區,可選參數
DATETIME,精確到秒
dt_fromtimestamp(1626837629)
dt_fromtimestamp(1626837629,'GMT+08')
UTC時間戳(毫秒)轉DATETIME
dt_fromtimestamp_millis(value,[timezone])
value:整數
timezone:時區,可選參數
DATETIME,精確到毫秒
dt_fromtimestamp_millis(1626837629123);
dt_fromtimestamp_millis(1626837629123,'GMT+08')
DATETIME轉UTC時間戳(秒)
dt_parsetimestamp(value,[timezone])
value: DATETIME
timezone:時區,可選參數
整數
dt_parsetimestamp(`datetime_col`)
dt_parsetimestamp(`datetime_col`,'GMT+08')
DATETIME轉UTC時間戳(毫秒)
dt_parsetimestamp_millis(value,[timezone])
value: DATETIME
timezone:時區,可選參數
整數
dt_parsetimestamp_millis(`datetime_col`)
dt_parsetimestamp_millis(`datetime_col`,'GMT+08')
DATETIME轉字符串
dt_str(value, format)
value:DATETIME
format:字符串, yyyy-MM-dd HH:mm:ss 格式表示
字符串
dt_str(`col1`, 'yyyy-MM-dd HH:mm:ss')
字符串轉DATETIME
dt_strptime(value,format)
value:字符串
format:字符串, yyyy-MM-dd HH:mm:ss 格式表示
DATETIME
dt_strptime('2021-07-21 03:20:29', 'yyyy-MM-dd hh:mm:ss')
修改時間,對年、月、日、時、分或秒中的一個或多個數值進行增加或減少
dt_add(value, [years=intVal],
[months=intVal],
[days=intVal],
[hours=intVal],
[minutes=intVal]
)
value: DATETIME
intVal: 整數
說明負號(-)表示減。
DATETIME
dt_add(datetime_col,years=-1)
dt_add(datetime_col,years=1,months=1)