使用函數(shù)計算實現(xiàn)消息數(shù)據(jù)清洗
消息數(shù)據(jù)清洗功能提供常見的消息處理模板,支持對不同來源、不同格式的數(shù)據(jù)進(jìn)行分割、動態(tài)路由和消息富化等。您可以直接利用函數(shù)計算提供的模板處理消息,也可以根據(jù)業(yè)務(wù)情況在模板基礎(chǔ)上修改代碼。本文以云消息隊列 RocketMQ 版數(shù)據(jù)清洗為例,介紹消息處理模板的類型和使用方式。
功能介紹
消息數(shù)據(jù)清洗任務(wù)提供基本的算子能力,底層邏輯使用函數(shù)計算。云消息隊列 RocketMQ 版消息數(shù)據(jù)清洗任務(wù)創(chuàng)建完成后,您可以登錄函數(shù)計算,進(jìn)行代碼自定義及相應(yīng)函數(shù)配置的修改。
算子 | 算子能力說明 |
消息過濾 | 按照正則表達(dá)式匹配消息內(nèi)容,將匹配成功的消息發(fā)送至目標(biāo)。更多信息,請參見事件模式。 |
消息轉(zhuǎn)換 | 根據(jù)字符串匹配,進(jìn)行消息內(nèi)容替換,例如字符大小寫轉(zhuǎn)換。將轉(zhuǎn)換后的消息發(fā)送至目標(biāo)。更多信息,請參見事件內(nèi)容轉(zhuǎn)換。 |
根據(jù)正則表達(dá)式對消息內(nèi)容進(jìn)行分割,將分割后的消息逐條發(fā)送至目標(biāo)。 | |
根據(jù)正則表達(dá)式匹配消息內(nèi)容,將匹配成功的消息路由至對應(yīng)目標(biāo),將匹配不成功的消息路由至默認(rèn)目標(biāo)。 | |
根據(jù)富化源對消息內(nèi)容進(jìn)行富化。例如,消息原始內(nèi)容包含AccountID,處理時根據(jù)AccountID查詢數(shù)據(jù)庫,獲得客戶地域后填至源消息體中,并發(fā)送至目標(biāo)服務(wù)。 | |
根據(jù)正則表達(dá)式對消息內(nèi)容進(jìn)行映射處理。例如,屏蔽消息中敏感字段或?qū)⑾⒋笮】s減至最小標(biāo)準(zhǔn)。 |
場景示例
內(nèi)容分割
例如,以下是一份學(xué)生名單。
message:
[張三,男,4班|李四,女,3班|王五,男,4班]
需要將消息拆分為單個學(xué)生的信息,然后分三條消息推送至各目標(biāo)服務(wù)。如下所示。
message:
[張三,男,4班]
message:
[李四,女,3班]
message:
[王五,男,4班]
實現(xiàn)效果如下圖。
動態(tài)路由
例如,以下是一份牙膏信息清單。
message:
[BrandA, toothpaste, $12.98, 100g
BrandB, toothpaste, $7.99, 80g
BrandC, toothpaste, $1.99, 100g]
需要按照自定義動態(tài)規(guī)則,將列表路由至目標(biāo)Topic。規(guī)則描述如下所示。
如果消息以BrandA開頭,發(fā)送至BrandA-item-topic和BrandA-discount-topic這兩個topic。
如果消息以BrandB開頭,發(fā)送至BrandB-item-topic和BrandB-discount-topic這兩個topic。
其余消息發(fā)送至Unknown-brand-topic。
規(guī)則的JSON描述如下。
{
"defaultTopic": "Unknown-brand-topic",
"rules": [
{
"regex": "^BrandA",
"targetTopics": [
"BrandA-item-topic",
"BrandA-discount-topic"
]
},
{
"regex": "^BrandB",
"targetTopics": [
"BrandB-item-topic",
"BrandB-discount-topic"
]
}
]
}
實現(xiàn)效果如下圖。
內(nèi)容富化
本文以一個IP地址段處理的場景富化為例。假設(shè)某服務(wù)的訪問日志如下所示。
{
"accountID": "164901546557****",
"hostIP": "192.168.XX.XX"
}
需要統(tǒng)計IP地址的來源,并且映射關(guān)系存儲于數(shù)據(jù)庫MySQL。
CREATE TABLE `tb_ip` (
-> `IP` VARCHAR(256) NOT NULL,
-> `Region` VARCHAR(256) NOT NULL,
-> `ISP` VARCHAR(256) NOT NULL,
-> PRIMARY KEY (`IP`)
-> );
處理后的消息結(jié)果如下所示。
{
"accountID": "164901546557****",
"hostIP": "192.168.XX.XX",
"region": "beijing"
}
實現(xiàn)效果如下圖。
內(nèi)容映射
例如,以下是某公司員工登記信息,涉及了員工工號、電話號碼等隱私內(nèi)容。
張三,工號1,131 1111 1111
李四,工號2,132 2222 2222
王五,工號3,133 3333 3333
需要將以上消息中員工隱私信息進(jìn)行屏蔽,然后推送至目標(biāo)服務(wù)。如下所示。
張*,工號*,*** **** ****
李*,工號*,*** **** ****
王*,工號*,*** **** ****
實現(xiàn)效果如下圖。
操作步驟
1.創(chuàng)建數(shù)據(jù)清洗任務(wù)
登錄事件總線 EventBridge控制臺,左側(cè)導(dǎo)航欄,選擇事件流,然后單擊創(chuàng)建事件流。
在創(chuàng)建事件流頁面的Source(源)、Filtering(過濾)、Transform(轉(zhuǎn)換)和Sink(目標(biāo))配置向?qū)Х謩e設(shè)置事件源、過濾規(guī)則、數(shù)據(jù)清洗模板和事件目標(biāo)。
①Source(源)和④Sink(目標(biāo))
選擇云消息隊列 RocketMQ 版的不同實例。
②Filtering(過濾)
可選配置,如果不設(shè)置表示匹配全部事件,更多匹配規(guī)則,請參見事件模式。
③Transform(轉(zhuǎn)換)
選擇一個函數(shù)計算提供的模板,包括內(nèi)容分割、內(nèi)容映射、內(nèi)容富化和動態(tài)路由。您可以根據(jù)需求選擇以上模板,模板中提供了基礎(chǔ)的數(shù)據(jù)處理邏輯,可以直接使用也可以自行調(diào)整。
本文以選擇內(nèi)容分割模板為例。
2.測試結(jié)果
登錄消息隊列 RocketMQ 版控制臺,找到上一步配置的Source(源)實例,在目標(biāo)topic右側(cè)操作列,單擊 ,然后在快速體驗的消息生產(chǎn)和消費面板,輸入原始消息,單擊確定即可發(fā)送消息。
在消息隊列 RocketMQ 版控制臺,找到上一步配置的Sink(目標(biāo))實例,依次單擊 查詢消息,您可以看到上一步發(fā)送的消息發(fā)送到目標(biāo)端時已被分割為3條消息,依次單擊消息行的詳情,可以看到3條消息分別為
"data": "張三,男,4班"
、"data": "李四,女,3班"
和"data": "王五,男,4班"
。