使用函數(shù)計算實(shí)現(xiàn)消息數(shù)據(jù)清洗
消息數(shù)據(jù)清洗功能提供常見的消息處理模板,如消息分割、動態(tài)路由和消息富化等。您可以直接利用模板處理消息,也可以根據(jù)業(yè)務(wù)情況在模板基礎(chǔ)上修改代碼。本文介紹云消息隊(duì)列 RocketMQ 版消息數(shù)據(jù)清洗模板的類型和使用方式。
背景信息
消息數(shù)據(jù)清洗任務(wù)提供基本的算子能力,底層邏輯使用函數(shù)計算。云消息隊(duì)列 RocketMQ 版消息數(shù)據(jù)清洗任務(wù)創(chuàng)建完成后,您可以登錄函數(shù)計算控制臺,進(jìn)行代碼自定義及相應(yīng)函數(shù)配置的修改。
算子 | 算子能力說明 |
消息過濾 | 按照正則表達(dá)式匹配消息內(nèi)容,將匹配成功的消息發(fā)送至目標(biāo)。更多信息,請參見事件模式。 |
消息轉(zhuǎn)換 | 根據(jù)字符串匹配,進(jìn)行消息內(nèi)容替換,例如字符大小寫轉(zhuǎn)換。將轉(zhuǎn)換后的消息發(fā)送至目標(biāo)。更多信息,請參見事件內(nèi)容轉(zhuǎn)換。 |
根據(jù)正則表達(dá)式對消息內(nèi)容進(jìn)行分割,將分割后的消息逐條發(fā)送至目標(biāo)。 | |
根據(jù)正則表達(dá)式匹配消息內(nèi)容,將匹配成功的消息路由至對應(yīng)目標(biāo),將匹配不成功的消息路由至默認(rèn)目標(biāo)。 | |
根據(jù)富化源對消息內(nèi)容進(jìn)行富化。如果消息原始內(nèi)容包含AccountID,處理時根據(jù)AccountID查詢數(shù)據(jù)庫,獲得客戶地域后填至源消息體中,并發(fā)送至目標(biāo)服務(wù)。 | |
根據(jù)正則表達(dá)式對消息內(nèi)容進(jìn)行映射處理。例如,屏蔽消息中敏感字段或?qū)⑾⒋笮】s減至最小標(biāo)準(zhǔn)。 |
內(nèi)容分割
使用示例
例如,以下是一份學(xué)生名單。
message:
[張三,男,17,4班;李四,女,17,3班;王五,男,17,4班]
需要將消息拆分為單個學(xué)生的信息,然后分三條消息推送至各目標(biāo)服務(wù)。如下所示。
message:
[張三,男,17,4班]
message:
[李四,女,17,3班]
message:
[王五,男,17,4班]
操作步驟
在左側(cè)導(dǎo)航欄,選擇 ,然后在頂部菜單欄,選擇地域。
在消息流出(sink)頁面,單擊創(chuàng)建任務(wù)。
在消息流出創(chuàng)建面板,設(shè)置以下配置項(xiàng),然后單擊確定。
關(guān)鍵配置項(xiàng)說明如下,其余保持默認(rèn)值即可。
基礎(chǔ)信息
配置項(xiàng)
說明
任務(wù)名稱
填寫任務(wù)名稱。
流出類型
本文選擇消息隊(duì)列 RocketMQ。支持的消息服務(wù)包括消息隊(duì)列 RocketMQ、消息隊(duì)列 RabbitMQ、輕量消息隊(duì)列(原 MNS)和消息隊(duì)列 Kafka。
資源配置
配置項(xiàng)
說明
源
地域
本文選擇華東1(杭州)。
版本
選擇RocketMQ的版本,本文選擇RocketMQ 5.x。
RocketMQ 實(shí)例
選擇生產(chǎn)消息的RocketMQ實(shí)例。
Topic
選擇源實(shí)例的Topic。
目標(biāo)
版本
選擇接收消息的RocketMQ的版本,本文選擇RocketMQ 5.x。
實(shí)例 ID
選擇接收消息的RocketMQ的實(shí)例 ID。
Topic
選擇目標(biāo)實(shí)例的Topic。
數(shù)據(jù)處理
消息過濾:選擇無需過濾。
消息轉(zhuǎn)換:選擇自定義配置。消息體(body)選擇數(shù)據(jù)清洗,并選中新建函數(shù)模板。函數(shù)模板選擇內(nèi)容分割 transform_split,然后根據(jù)業(yè)務(wù)情況修改函數(shù)代碼。
創(chuàng)建完成后,您可以登錄函數(shù)計算控制臺查看自動創(chuàng)建的服務(wù)和函數(shù)。
動態(tài)路由
使用示例
例如,以下是一份牙膏信息清單。
message:
[BrandA, toothpaste, $12.98, 100g
BrandB, toothpaste, $7.99, 80g
BrandC, toothpaste, $1.99, 100g]
需要按照自定義動態(tài)規(guī)則,將列表路由至目標(biāo)Topic。規(guī)則描述如下所示。
如果消息以BrandA開頭,發(fā)送至BrandA-item-topic和BrandA-discount-topic這兩個topic。
如果消息以BrandB開頭,發(fā)送至BrandB-item-topic和BrandB-discount-topic這兩個topic。
其余消息發(fā)送至Unknown-brand-topic。
規(guī)則的JSON描述如下。
{
"defaultTopic": "Unknown-brand-topic",
"rules": [
{
"regex": "^BrandA",
"targetTopics": [
"BrandA-item-topic",
"BrandA-discount-topic"
]
},
{
"regex": "^BrandB",
"targetTopics": [
"BrandB-item-topic",
"BrandB-discount-topic"
]
}
]
}
操作步驟
具體操作,請參見消息分割操作步驟。其中,函數(shù)模板需選擇為動態(tài)路由 dynamic_routing。
內(nèi)容富化
使用示例
本文以一個IP地址段處理的場景富化為例。假設(shè)某服務(wù)的訪問日志如下所示。
{
"accountID": "164901546557****",
"hostIP": "192.168.XX.XX"
}
需要統(tǒng)計IP地址的來源,并且映射關(guān)系存儲于MySQL數(shù)據(jù)庫。
CREATE TABLE `tb_ip` (
-> `IP` VARCHAR(256) NOT NULL,
-> `Region` VARCHAR(256) NOT NULL,
-> `ISP` VARCHAR(256) NOT NULL,
-> PRIMARY KEY (`IP`)
-> );
處理后的消息結(jié)果如下所示。
{
"accountID": "164901546557****",
"hostIP": "192.168.XX.XX",
"region": "beijing"
}
操作步驟
具體操作,請參見消息分割操作步驟。其中,函數(shù)模板需選擇為內(nèi)容富化 transform_enrichment。
內(nèi)容映射
使用示例
例如,以下是某公司員工登記信息,涉及了員工工號、電話號碼等隱私內(nèi)容。
張三,工號1,131 1111 1111
李四,工號2,132 2222 2222
王五,工號3,133 3333 3333
需要將以上消息中員工隱私信息進(jìn)行屏蔽,然后推送至目標(biāo)服務(wù)。如下所示。
張*,工號*,*** **** ****
李*,工號*,*** **** ****
王*,工號*,*** **** ****
操作步驟
具體操作,請參見消息分割操作步驟。其中,函數(shù)模板需選擇為內(nèi)容映射 transform_projection。