日本熟妇hd丰满老熟妇,中文字幕一区二区三区在线不卡 ,亚洲成片在线观看,免费女同在线一区二区

路由到對象存儲OSS

本文介紹如何創(chuàng)建使用OSS Sink Connector,您可以通過OSS Sink Connector將數(shù)據(jù)從云消息隊列 Kafka 版的數(shù)據(jù)源Topic導(dǎo)出至對象存儲OSS的Object中。

前提條件

詳細(xì)步驟,請參見創(chuàng)建前提

注意事項

  1. Connector基于事件被處理的時間做時間分區(qū),非事件的產(chǎn)生時間,如按時間分區(qū),時間邊界的數(shù)據(jù)可能被投遞至下一個時間分區(qū)目錄。

  2. 臟數(shù)據(jù)處理問題:如果在任務(wù)的自定義分區(qū)或文件內(nèi)容中配置了JsonPath,但數(shù)據(jù)未命中JsonPath規(guī)則,Connector會將這類數(shù)據(jù)按攢批策略投遞到Bucket下的invalidRuleData/路徑。如發(fā)現(xiàn)Bucket下有此目錄,請檢查JsonPath規(guī)則的正確性,且避免消費端漏消費數(shù)據(jù)。

  3. 鏈路可能存在秒級到分鐘級別內(nèi)的延時。

  4. 如果自定義分區(qū)或文件內(nèi)容中配置的JsonPath規(guī)則需對Kafka Source消息內(nèi)容做提取,需在Kafka Source側(cè)將內(nèi)容編解碼為Json格式。

  5. Connector實時將上游數(shù)據(jù)以Append追加方式寫入OSS中,因此單個分區(qū)路徑下,可見的最新文件通常處于寫入中的狀態(tài),而非目標(biāo)狀態(tài),請謹(jǐn)慎消費。

計費說明

Connector任務(wù)運行在阿里云函數(shù)計算平臺,任務(wù)加工傳輸產(chǎn)生的計算資源將按函數(shù)計算單價計費,詳情請參見計費概述

步驟一:創(chuàng)建目標(biāo)服務(wù)資源

在對象存儲OSS控制臺創(chuàng)建一個存儲空間(Bucket)。詳細(xì)步驟,請參見控制臺創(chuàng)建存儲空間

本文以oss-sink-connector-bucket Bucket為例。

步驟二:創(chuàng)建OSS Sink Connector并啟動

  1. 登錄云消息隊列 Kafka 版控制臺,在概覽頁面的資源分布區(qū)域,選擇地域。

  2. 在左側(cè)導(dǎo)航欄,選擇Connector 生態(tài)集成 > 任務(wù)列表

  3. 任務(wù)列表頁面,單擊創(chuàng)建任務(wù)

    • 任務(wù)創(chuàng)建

      1. Source(源)配置向?qū)Вx擇數(shù)據(jù)提供方消息隊列 Kafka 版,設(shè)置以下參數(shù),然后單擊下一步

        參數(shù)

        說明

        示例

        地域

        選擇云消息隊列 Kafka 版源實例所在的地域。

        華北2(北京)

        kafka 實例

        選擇生產(chǎn)云消息隊列 Kafka 版消息的源實例。

        alikafka_post-cn-jte3****

        Topic

        選擇生產(chǎn)云消息隊列 Kafka 版消息的Topic。

        topic

        Group ID

        數(shù)據(jù)源所在的云消息隊列 Kafka 版實例的Group ID。

        • 快速創(chuàng)建:新建一個Group ID資源,

        • 使用已有:填寫一個已經(jīng)的創(chuàng)建好的GroupID資源。

        GID_http_1

        消費位點

        選擇開始消費消息的位點。

        最新位點

        網(wǎng)絡(luò)配置

        選擇路由消息的網(wǎng)絡(luò)類型。

        基礎(chǔ)網(wǎng)絡(luò)

        專有網(wǎng)絡(luò)VPC

        選擇VPC ID。當(dāng)網(wǎng)絡(luò)配置設(shè)置為自建公網(wǎng)時需要設(shè)置此參數(shù)。

        vpc-bp17fapfdj0dwzjkd****

        交換機

        選擇vSwitch ID。當(dāng)網(wǎng)絡(luò)配置設(shè)置為自建公網(wǎng)時需要設(shè)置此參數(shù)。

        vsw-bp1gbjhj53hdjdkg****

        安全組

        選擇安全組。當(dāng)網(wǎng)絡(luò)配置設(shè)置為自建公網(wǎng)時需要設(shè)置此參數(shù)。

        alikafka_pre-cn-7mz2****

        批量推送條數(shù)

        調(diào)用函數(shù)發(fā)送的最大批量消息條數(shù),當(dāng)積壓的消息數(shù)量到達(dá)設(shè)定值時才會發(fā)送請求,取值范圍為 [1,10000]。

        100

        批量推送間隔(單位:秒)

        調(diào)用函數(shù)的間隔時間,系統(tǒng)每到間隔時間點會將消息聚合后發(fā)給函數(shù)計算,取值范圍為[0,15],單位為秒。0秒表示無等待時間,直接投遞。

        3

      2. Filtering(過濾)配置向?qū)ВO(shè)置數(shù)據(jù)模式內(nèi)容過濾發(fā)送的請求。更多信息,請參見消息過濾

      3. Transform(轉(zhuǎn)換)配置向?qū)ВO(shè)置數(shù)據(jù)清洗,實現(xiàn)分割、映射、富化及動態(tài)路由等繁雜數(shù)據(jù)加工能力。更多信息,請參見數(shù)據(jù)清洗

      4. Sink(目標(biāo))配置向?qū)Вx擇服務(wù)類型對象存儲 OSS,配置以下參數(shù)。

        參數(shù)

        說明

        示例

        OSS Bucket

        已創(chuàng)建的對象存儲OSS Bucket。

        重要
        • 保證填寫的Bucket已手動創(chuàng)建完成,且在任務(wù)運行期間不被刪除。

        • OSS存儲類型請選擇為標(biāo)準(zhǔn)存儲、低頻存儲,暫不支持投遞至歸檔存儲Bucket。

        • 創(chuàng)建OSS Sink Connector任務(wù)后,平臺將在此OSS Bucket一級目錄下生成.tmp/系統(tǒng)文件路徑,請勿刪除和使用該路徑下的OSS Object。

        oss-sink-connector-bucket

        保存路徑

        OSS Object規(guī)則分Path和Name兩部分,例如ObjectKey為a/b/c/a.txt時,Path為a/b/c/,Name為a.txt,其中自定義分區(qū)(即Path)可自定義。Name由Connector內(nèi)部按固定規(guī)則生成:{毫秒級別unix 時間戳}_{8位隨機字符串},例如:1705576353794_elJmxu3v。

        • 如未配置,或配置為 "/",表示無分區(qū),數(shù)據(jù)將保存在Bucket一級目錄下。

        • 支持時間變量參數(shù),{yyyy}、{MM}、{dd}、{HH} ,分別代表年、月、天、小時,大小寫敏感。

        • 支持JsonPath規(guī)則自定義OSS路徑參數(shù),例如: {$.data.topic}、{$.data.partition}。JsonPath變量需滿足標(biāo)準(zhǔn)JsonPath表達(dá)式,受限于OSS路徑規(guī)則,通過JsonPath提取值的類型建議為int、string,且值中全部為標(biāo)準(zhǔn)UTF-8字符,不包含空格、".."、表情符、"/"、 "\"等字符,否則可能會產(chǎn)生數(shù)據(jù)寫入異常風(fēng)險。

        • 支持常量。

          說明

          分區(qū)配置可以對數(shù)據(jù)做合理分組,避免單路徑下小文件過多造成不可控問題。

          Connector 的吞吐能力和分區(qū)數(shù)正相關(guān),無分區(qū)或分區(qū)少時 Connector 吞吐較弱,可能造成上游堆積問題。分區(qū)較多會導(dǎo)致數(shù)據(jù)分散、寫入次數(shù)增多、碎片文件多等問題,因此分區(qū)的配置策略非常關(guān)鍵,因為以下為參考建議:

          • Kafka Source:可同時按時間和 partition 分區(qū),當(dāng)性能無法滿足時,可通過提升 kafka partition 數(shù)量間接提升 Connector 吞吐,例如:prefix/{yyyy}/{MM}/{dd}/{HH}/{$.data.partition}/

          • 業(yè)務(wù)分組:按照數(shù)據(jù)中的某個業(yè)務(wù)字段分區(qū),此時吞吐速率取決于業(yè)務(wù)字段取值的數(shù)量,例如:prefixV2/{$.data.body.field}/

          不同的任務(wù)建議配置不同的常量前綴,避免多個任務(wù)共享相同的分區(qū),寫入同一目錄下造成數(shù)據(jù)混亂,無法區(qū)分。

        alikafka_post-cn-9dhsaassdd****/guide-oss-sink-topic/YYYY/MM/dd/HH

        時區(qū)配置

        默認(rèn)為UTC+8:00時區(qū),此規(guī)則僅對時間分區(qū)配置有效。

        UTC+8:00

        批量聚合文件大小

        配置需要聚合的文件大小,取值范圍為[1,1024],單位:MB。

        說明
        • Connector 將數(shù)據(jù)分批寫入到同一個 OSS Object 中,每一批數(shù)據(jù)大小為 (0 MB,16 MB],因此OSS Object最終大小可能會略大于配置值,最多超出16 MB。

        • 在大流量場景下,建議聚合文件大小配置百MB級別,例如128MB、512MB,聚合時間窗口配置小時級別,如60 min、120 min。

        5

        批量聚合時間窗口

        配置需要聚合的時間窗口。取值范圍為[1,1440],單位:分鐘。

        1

        文件壓縮

        • 無需壓縮:生成的OSS Object無后綴名。

        • GZIP:生成后綴為.gz的 Object。

        • Snappy:生成后綴為.snappy的Object。

        • Zstd:生成后綴為.zstd的 Object。

        當(dāng)選擇壓縮后,Connector按壓縮前數(shù)據(jù)大小進(jìn)行攢批,因此OSS側(cè)顯示的Object大小會小于攢批大小,解壓后大小接近攢批值。

        無需壓縮

        文件內(nèi)容

        • 完整事件:Connector通過 CloudEvent協(xié)議包裝了原始消息,完整事件表示包含CloudEvent協(xié)議后的數(shù)據(jù)。如下示例中,data字段內(nèi)容為數(shù)據(jù)信息,其他字段為CloudEvent協(xié)議附加的Meta信息。

          {
            "specversion": "1.0",
            "id": "8e215af8-ca18-4249-8645-f96c1026****",
            "source": "acs:alikafka",
            "type": "alikafka:Topic:Message",
            "subject": "acs:alikafka:alikafka_pre-cn-i7m2msb9****:topic:****",
            "datacontenttype": "application/json; charset=utf-8",
            "time": "2022-06-23T02:49:51.589Z",
            "aliyunaccountid": "182572506381****",
            "data": {
              "topic": "****",
              "partition": 7,
              "offset": 25,
              "timestamp": 1655952591589,
              "headers": {
                "headers": [],
                "isReadOnly": false
              },
              "key": "keytest",
              "value": "hello kafka msg"
            }
          }
        • 部分事件:將JSONPath提取后的部分?jǐn)?shù)據(jù)投遞至OSS,比如配置$.data,則僅將data 字段的值投遞到OSS。

        如果對CloudEvent協(xié)議的附加字段無強需求,建議配置部分事件和$.data表達(dá)式,保證將Source的原始消息投遞到OSS,降低存儲成本,提升傳輸效率。

        部分事件

        $.data
    • 任務(wù)屬性

      配置事件推送失敗時的重試策略及錯誤發(fā)生時的處理方式。更多信息,請參見重試和死信

  4. 返回任務(wù)列表頁面,找到創(chuàng)建好的任務(wù),在其右側(cè)操作列,單擊啟用

  5. 提示對話框,閱讀提示信息,然后單擊確認(rèn)

    啟用任務(wù)后,會有30秒~60秒的延遲時間,您可以在任務(wù)列表頁面的狀態(tài)欄查看啟動進(jìn)度。

步驟三:測試OSS Sink Connector

  1. 消息流出(Sink)頁面,在OSS Sink Connector任務(wù)的事件源列單擊源Topic。

  2. 在Topic詳情頁面,單擊體驗發(fā)送消息

  3. 快速體驗消息收發(fā)面板,按照下圖配置消息內(nèi)容,然后單擊確定

    發(fā)送消息

  4. 消息流出(Sink)頁面,在OSS Sink Connector任務(wù)的事件目標(biāo)列單擊目標(biāo)Bucket。

  5. 在Bucket頁面,選擇左側(cè)導(dǎo)航欄的文件管理 > 文件列表

    • tmp目錄:Connector依賴的系統(tǒng)文件路徑,請勿刪除和使用該路徑OSS Object。

    • 數(shù)據(jù)文件目錄:目錄下按任務(wù)的分區(qū)路徑規(guī)則生成子目錄,并在最深層目錄下上傳數(shù)據(jù)文件。

    最深層路徑

  6. 在對應(yīng)Object右側(cè)操作列,選擇圖標(biāo) > 下載

  7. 打開下載的文件,查看消息內(nèi)容。

    消息

    如圖所示,多條消息之間通過換行分隔。