路由到對象存儲OSS
本文介紹如何創(chuàng)建使用OSS Sink Connector,您可以通過OSS Sink Connector將數(shù)據(jù)從云消息隊列 Kafka 版的數(shù)據(jù)源Topic導(dǎo)出至對象存儲OSS的Object中。
前提條件
詳細(xì)步驟,請參見創(chuàng)建前提。
注意事項
Connector基于事件被處理的時間做時間分區(qū),非事件的產(chǎn)生時間,如按時間分區(qū),時間邊界的數(shù)據(jù)可能被投遞至下一個時間分區(qū)目錄。
臟數(shù)據(jù)處理問題:如果在任務(wù)的自定義分區(qū)或文件內(nèi)容中配置了JsonPath,但數(shù)據(jù)未命中JsonPath規(guī)則,Connector會將這類數(shù)據(jù)按攢批策略投遞到Bucket下的invalidRuleData/路徑。如發(fā)現(xiàn)Bucket下有此目錄,請檢查JsonPath規(guī)則的正確性,且避免消費端漏消費數(shù)據(jù)。
鏈路可能存在秒級到分鐘級別內(nèi)的延時。
如果自定義分區(qū)或文件內(nèi)容中配置的JsonPath規(guī)則需對Kafka Source消息內(nèi)容做提取,需在Kafka Source側(cè)將內(nèi)容編解碼為Json格式。
Connector實時將上游數(shù)據(jù)以Append追加方式寫入OSS中,因此單個分區(qū)路徑下,可見的最新文件通常處于寫入中的狀態(tài),而非目標(biāo)狀態(tài),請謹(jǐn)慎消費。
計費說明
Connector任務(wù)運行在阿里云函數(shù)計算平臺,任務(wù)加工傳輸產(chǎn)生的計算資源將按函數(shù)計算單價計費,詳情請參見計費概述。
步驟一:創(chuàng)建目標(biāo)服務(wù)資源
在對象存儲OSS控制臺創(chuàng)建一個存儲空間(Bucket)。詳細(xì)步驟,請參見控制臺創(chuàng)建存儲空間。
本文以oss-sink-connector-bucket Bucket為例。
步驟二:創(chuàng)建OSS Sink Connector并啟動
登錄云消息隊列 Kafka 版控制臺,在概覽頁面的資源分布區(qū)域,選擇地域。
在左側(cè)導(dǎo)航欄,選擇 。
在任務(wù)列表頁面,單擊創(chuàng)建任務(wù)。
任務(wù)創(chuàng)建
在Source(源)配置向?qū)Вx擇數(shù)據(jù)提供方為消息隊列 Kafka 版,設(shè)置以下參數(shù),然后單擊下一步。
參數(shù)
說明
示例
地域
選擇云消息隊列 Kafka 版源實例所在的地域。
華北2(北京)
kafka 實例
選擇生產(chǎn)云消息隊列 Kafka 版消息的源實例。
alikafka_post-cn-jte3****
Topic
選擇生產(chǎn)云消息隊列 Kafka 版消息的Topic。
topic
Group ID
數(shù)據(jù)源所在的云消息隊列 Kafka 版實例的Group ID。
快速創(chuàng)建:新建一個Group ID資源,
使用已有:填寫一個已經(jīng)的創(chuàng)建好的GroupID資源。
GID_http_1
消費位點
選擇開始消費消息的位點。
最新位點
網(wǎng)絡(luò)配置
選擇路由消息的網(wǎng)絡(luò)類型。
基礎(chǔ)網(wǎng)絡(luò)
專有網(wǎng)絡(luò)VPC
選擇VPC ID。當(dāng)網(wǎng)絡(luò)配置設(shè)置為自建公網(wǎng)時需要設(shè)置此參數(shù)。
vpc-bp17fapfdj0dwzjkd****
交換機
選擇vSwitch ID。當(dāng)網(wǎng)絡(luò)配置設(shè)置為自建公網(wǎng)時需要設(shè)置此參數(shù)。
vsw-bp1gbjhj53hdjdkg****
安全組
選擇安全組。當(dāng)網(wǎng)絡(luò)配置設(shè)置為自建公網(wǎng)時需要設(shè)置此參數(shù)。
alikafka_pre-cn-7mz2****
批量推送條數(shù)
調(diào)用函數(shù)發(fā)送的最大批量消息條數(shù),當(dāng)積壓的消息數(shù)量到達(dá)設(shè)定值時才會發(fā)送請求,取值范圍為 [1,10000]。
100
批量推送間隔(單位:秒)
調(diào)用函數(shù)的間隔時間,系統(tǒng)每到間隔時間點會將消息聚合后發(fā)給函數(shù)計算,取值范圍為[0,15],單位為秒。0秒表示無等待時間,直接投遞。
3
在Filtering(過濾)配置向?qū)ВO(shè)置數(shù)據(jù)模式內(nèi)容過濾發(fā)送的請求。更多信息,請參見消息過濾。
在Transform(轉(zhuǎn)換)配置向?qū)ВO(shè)置數(shù)據(jù)清洗,實現(xiàn)分割、映射、富化及動態(tài)路由等繁雜數(shù)據(jù)加工能力。更多信息,請參見數(shù)據(jù)清洗。
在Sink(目標(biāo))配置向?qū)Вx擇服務(wù)類型為對象存儲 OSS,配置以下參數(shù)。
參數(shù)
說明
示例
OSS Bucket
已創(chuàng)建的對象存儲OSS Bucket。
重要保證填寫的Bucket已手動創(chuàng)建完成,且在任務(wù)運行期間不被刪除。
OSS存儲類型請選擇為標(biāo)準(zhǔn)存儲、低頻存儲,暫不支持投遞至歸檔存儲Bucket。
創(chuàng)建OSS Sink Connector任務(wù)后,平臺將在此OSS Bucket一級目錄下生成.tmp/系統(tǒng)文件路徑,請勿刪除和使用該路徑下的OSS Object。
oss-sink-connector-bucket
保存路徑
OSS Object規(guī)則分Path和Name兩部分,例如ObjectKey為
a/b/c/a.txt
時,Path為a/b/c/
,Name為a.txt
,其中自定義分區(qū)(即Path)可自定義。Name由Connector內(nèi)部按固定規(guī)則生成:{毫秒級別unix 時間戳}_{8位隨機字符串}
,例如:1705576353794_elJmxu3v。如未配置,或配置為 "/",表示無分區(qū),數(shù)據(jù)將保存在Bucket一級目錄下。
支持時間變量參數(shù),{yyyy}、{MM}、{dd}、{HH} ,分別代表年、月、天、小時,大小寫敏感。
支持JsonPath規(guī)則自定義OSS路徑參數(shù),例如: {$.data.topic}、{$.data.partition}。JsonPath變量需滿足標(biāo)準(zhǔn)JsonPath表達(dá)式,受限于OSS路徑規(guī)則,通過JsonPath提取值的類型建議為int、string,且值中全部為標(biāo)準(zhǔn)UTF-8字符,不包含空格、".."、表情符、"/"、 "\"等字符,否則可能會產(chǎn)生數(shù)據(jù)寫入異常風(fēng)險。
支持常量。
說明分區(qū)配置可以對數(shù)據(jù)做合理分組,避免單路徑下小文件過多造成不可控問題。
Connector 的吞吐能力和分區(qū)數(shù)正相關(guān),無分區(qū)或分區(qū)少時 Connector 吞吐較弱,可能造成上游堆積問題。分區(qū)較多會導(dǎo)致數(shù)據(jù)分散、寫入次數(shù)增多、碎片文件多等問題,因此分區(qū)的配置策略非常關(guān)鍵,因為以下為參考建議:
Kafka Source:可同時按時間和 partition 分區(qū),當(dāng)性能無法滿足時,可通過提升 kafka partition 數(shù)量間接提升 Connector 吞吐,例如:prefix/{yyyy}/{MM}/{dd}/{HH}/{$.data.partition}/
業(yè)務(wù)分組:按照數(shù)據(jù)中的某個業(yè)務(wù)字段分區(qū),此時吞吐速率取決于業(yè)務(wù)字段取值的數(shù)量,例如:prefixV2/{$.data.body.field}/
不同的任務(wù)建議配置不同的常量前綴,避免多個任務(wù)共享相同的分區(qū),寫入同一目錄下造成數(shù)據(jù)混亂,無法區(qū)分。
alikafka_post-cn-9dhsaassdd****/guide-oss-sink-topic/YYYY/MM/dd/HH
時區(qū)配置
默認(rèn)為UTC+8:00時區(qū),此規(guī)則僅對時間分區(qū)配置有效。
UTC+8:00
批量聚合文件大小
配置需要聚合的文件大小,取值范圍為[1,1024],單位:MB。
說明Connector 將數(shù)據(jù)分批寫入到同一個 OSS Object 中,每一批數(shù)據(jù)大小為 (0 MB,16 MB],因此OSS Object最終大小可能會略大于配置值,最多超出16 MB。
在大流量場景下,建議聚合文件大小配置百MB級別,例如128MB、512MB,聚合時間窗口配置小時級別,如60 min、120 min。
5
批量聚合時間窗口
配置需要聚合的時間窗口。取值范圍為[1,1440],單位:分鐘。
1
文件壓縮
無需壓縮:生成的OSS Object無后綴名。
GZIP:生成后綴為.gz的 Object。
Snappy:生成后綴為.snappy的Object。
Zstd:生成后綴為.zstd的 Object。
當(dāng)選擇壓縮后,Connector按壓縮前數(shù)據(jù)大小進(jìn)行攢批,因此OSS側(cè)顯示的Object大小會小于攢批大小,解壓后大小接近攢批值。
無需壓縮
文件內(nèi)容
完整事件:Connector通過 CloudEvent協(xié)議包裝了原始消息,完整事件表示包含CloudEvent協(xié)議后的數(shù)據(jù)。如下示例中,data字段內(nèi)容為數(shù)據(jù)信息,其他字段為CloudEvent協(xié)議附加的Meta信息。
{ "specversion": "1.0", "id": "8e215af8-ca18-4249-8645-f96c1026****", "source": "acs:alikafka", "type": "alikafka:Topic:Message", "subject": "acs:alikafka:alikafka_pre-cn-i7m2msb9****:topic:****", "datacontenttype": "application/json; charset=utf-8", "time": "2022-06-23T02:49:51.589Z", "aliyunaccountid": "182572506381****", "data": { "topic": "****", "partition": 7, "offset": 25, "timestamp": 1655952591589, "headers": { "headers": [], "isReadOnly": false }, "key": "keytest", "value": "hello kafka msg" } }
部分事件:將JSONPath提取后的部分?jǐn)?shù)據(jù)投遞至OSS,比如配置$.data,則僅將data 字段的值投遞到OSS。
如果對CloudEvent協(xié)議的附加字段無強需求,建議配置部分事件和$.data表達(dá)式,保證將Source的原始消息投遞到OSS,降低存儲成本,提升傳輸效率。
部分事件
$.data
任務(wù)屬性
配置事件推送失敗時的重試策略及錯誤發(fā)生時的處理方式。更多信息,請參見重試和死信。
返回任務(wù)列表頁面,找到創(chuàng)建好的任務(wù),在其右側(cè)操作列,單擊啟用。
在提示對話框,閱讀提示信息,然后單擊確認(rèn)。
啟用任務(wù)后,會有30秒~60秒的延遲時間,您可以在任務(wù)列表頁面的狀態(tài)欄查看啟動進(jìn)度。
步驟三:測試OSS Sink Connector
在消息流出(Sink)頁面,在OSS Sink Connector任務(wù)的事件源列單擊源Topic。
在Topic詳情頁面,單擊體驗發(fā)送消息。
在快速體驗消息收發(fā)面板,按照下圖配置消息內(nèi)容,然后單擊確定。
在消息流出(Sink)頁面,在OSS Sink Connector任務(wù)的事件目標(biāo)列單擊目標(biāo)Bucket。
在Bucket頁面,選擇左側(cè)導(dǎo)航欄的 。
tmp目錄:Connector依賴的系統(tǒng)文件路徑,請勿刪除和使用該路徑OSS Object。
數(shù)據(jù)文件目錄:目錄下按任務(wù)的分區(qū)路徑規(guī)則生成子目錄,并在最深層目錄下上傳數(shù)據(jù)文件。
在對應(yīng)Object右側(cè)操作列,選擇 。
打開下載的文件,查看消息內(nèi)容。
如圖所示,多條消息之間通過換行分隔。