本文介紹如何創(chuàng)建OSS Sink Connector將數(shù)據(jù)從云消息隊(duì)列 Kafka 版實(shí)例的數(shù)據(jù)源Topic導(dǎo)出至對象存儲(chǔ)OSS。

前提條件

在導(dǎo)出數(shù)據(jù)前,請確保您已完成以下操作:

注意事項(xiàng)

  • 僅支持在同地域內(nèi),將數(shù)據(jù)從云消息隊(duì)列 Kafka 版實(shí)例的數(shù)據(jù)源Topic導(dǎo)出至函數(shù)計(jì)算,再由函數(shù)計(jì)算導(dǎo)出至對象存儲(chǔ)。Connector的限制說明,請參見使用限制
  • 該功能基于函數(shù)計(jì)算服務(wù)提供。函數(shù)計(jì)算為您提供了一定的免費(fèi)額度,超額部分將產(chǎn)生費(fèi)用,請以函數(shù)計(jì)算的計(jì)費(fèi)規(guī)則為準(zhǔn)。計(jì)費(fèi)詳情,請參見計(jì)費(fèi)概述
  • 函數(shù)計(jì)算的函數(shù)調(diào)用支持日志查詢。具體操作步驟,請參見配置日志
  • 消息轉(zhuǎn)儲(chǔ)時(shí),云消息隊(duì)列 Kafka 版中消息用UTF-8 String序列化,暫不支持二進(jìn)制的數(shù)據(jù)格式。

創(chuàng)建并部署OSS Sink Connector

  1. 登錄云消息隊(duì)列 Kafka 版控制臺(tái)
  2. 概覽頁面的資源分布區(qū)域,選擇地域。
  3. 在左側(cè)導(dǎo)航欄,單擊Connector 任務(wù)列表
  4. Connector 任務(wù)列表頁面,從選擇實(shí)例的下拉列表選擇Connector所屬的實(shí)例,然后單擊創(chuàng)建 Connector
  5. 創(chuàng)建 Connector配置向?qū)骓撁妫瓿梢韵虏僮鳌?/span>
    1. 配置基本信息頁簽,按需配置以下參數(shù),然后單擊下一步
      重要 云消息隊(duì)列 Kafka 版會(huì)為您自動(dòng)選中授權(quán)創(chuàng)建服務(wù)關(guān)聯(lián)角色
      • 如果未創(chuàng)建服務(wù)關(guān)聯(lián)角色,云消息隊(duì)列 Kafka 版會(huì)為您自動(dòng)創(chuàng)建一個(gè)服務(wù)關(guān)聯(lián)角色,以便您使用云消息隊(duì)列 Kafka 版導(dǎo)出數(shù)據(jù)至對象存儲(chǔ)的功能。
      • 如果已創(chuàng)建服務(wù)關(guān)聯(lián)角色,云消息隊(duì)列 Kafka 版不會(huì)重復(fù)創(chuàng)建。
      關(guān)于該服務(wù)關(guān)聯(lián)角色的更多信息,請參見服務(wù)關(guān)聯(lián)角色
      參數(shù)描述示例值
      名稱Connector的名稱。命名規(guī)則:
      • 可以包含數(shù)字、小寫英文字母和短劃線(-),但不能以短劃線(-)開頭,長度限制為48個(gè)字符。
      • 同一個(gè)云消息隊(duì)列 Kafka 版實(shí)例內(nèi)保持唯一。

      Connector的數(shù)據(jù)同步任務(wù)必須使用名稱為connect-任務(wù)名稱Group。如果您未手動(dòng)創(chuàng)建該Group,系統(tǒng)將為您自動(dòng)創(chuàng)建。

      kafka-oss-sink
      實(shí)例默認(rèn)配置為實(shí)例的名稱與實(shí)例ID。demo alikafka_post-cn-st21p8vj****
    2. 配置源服務(wù)頁簽,選擇數(shù)據(jù)源消息隊(duì)列Kafka版,并配置以下參數(shù),然后單擊下一步
      參數(shù)描述示例值
      數(shù)據(jù)源 Topic需要同步數(shù)據(jù)的Topic。oss-test-input
      消費(fèi)線程并發(fā)數(shù)數(shù)據(jù)源Topic的消費(fèi)線程并發(fā)數(shù)。默認(rèn)值為6。取值說明如下:
      • 1
      • 2
      • 3
      • 6
      • 12
      6
      消費(fèi)初始位置開始消費(fèi)的位置。取值說明如下:
      • 最早位點(diǎn):從最初位點(diǎn)開始消費(fèi)。
      • 最近位點(diǎn):從最新位點(diǎn)開始消費(fèi)。
      最早位點(diǎn)
      VPC ID數(shù)據(jù)同步任務(wù)所在的VPC。單擊配置運(yùn)行環(huán)境顯示該參數(shù)。默認(rèn)為云消息隊(duì)列 Kafka 版實(shí)例所在的VPC,您無需填寫。vpc-bp1xpdnd3l***
      vSwitch ID數(shù)據(jù)同步任務(wù)所在的交換機(jī)。單擊配置運(yùn)行環(huán)境顯示該參數(shù)。該交換機(jī)必須與云消息隊(duì)列 Kafka 版實(shí)例處于同一VPC。默認(rèn)為部署云消息隊(duì)列 Kafka 版實(shí)例時(shí)填寫的交換機(jī)。vsw-bp1d2jgg81***
      失敗處理消息發(fā)送失敗后,是否繼續(xù)訂閱出現(xiàn)錯(cuò)誤的Topic的分區(qū)。單擊配置運(yùn)行環(huán)境顯示該參數(shù)。取值說明如下。
      • 繼續(xù)訂閱:繼續(xù)訂閱出現(xiàn)錯(cuò)誤的Topic的分區(qū),并打印錯(cuò)誤日志。
      • 停止訂閱:停止訂閱出現(xiàn)錯(cuò)誤的Topic的分區(qū),并打印錯(cuò)誤日志
      說明
      繼續(xù)訂閱
      創(chuàng)建資源方式選擇創(chuàng)建Connector所依賴的Topic與Group的方式。單擊配置運(yùn)行環(huán)境顯示該參數(shù)。
      • 自動(dòng)創(chuàng)建
      • 手動(dòng)創(chuàng)建
      自動(dòng)創(chuàng)建
      Connector 消費(fèi)組Connector使用的Group。單擊配置運(yùn)行環(huán)境顯示該參數(shù)。該Group的名稱建議以connect-cluster開頭。connect-cluster-kafka-oss-sink
      任務(wù)位點(diǎn) Topic用于存儲(chǔ)消費(fèi)位點(diǎn)的Topic。單擊配置運(yùn)行環(huán)境顯示該參數(shù)。
      • Topic:建議以connect-offset開頭。
      • 分區(qū)數(shù):Topic的分區(qū)數(shù)量必須大于1。
      • 存儲(chǔ)引擎:Topic的存儲(chǔ)引擎必須為Local存儲(chǔ)。
        說明 僅專業(yè)版實(shí)例支持在創(chuàng)建Topic時(shí)選擇存儲(chǔ)引擎為Local存儲(chǔ),標(biāo)準(zhǔn)版暫不支持。
      • cleanup.policy:Topic的日志清理策略必須為compact。
      connect-offset-kafka-oss-sink
      任務(wù)配置 Topic用于存儲(chǔ)任務(wù)配置的Topic。單擊配置運(yùn)行環(huán)境顯示該參數(shù)。
      • Topic:建議以connect-config開頭。
      • 分區(qū)數(shù):Topic的分區(qū)數(shù)量必須為1。
      • 存儲(chǔ)引擎:Topic的存儲(chǔ)引擎必須為Local存儲(chǔ)。
        說明 僅專業(yè)版實(shí)例支持在創(chuàng)建Topic時(shí)選擇存儲(chǔ)引擎為Local存儲(chǔ),標(biāo)準(zhǔn)版暫不支持。
      • cleanup.policy:Topic的日志清理策略必須為compact。
      connect-config-kafka-oss-sink
      任務(wù)狀態(tài) Topic用于存儲(chǔ)任務(wù)狀態(tài)的Topic。單擊配置運(yùn)行環(huán)境顯示該參數(shù)。
      • Topic:建議以connect-status開頭。
      • 分區(qū)數(shù):Topic的分區(qū)數(shù)量建議為6。
      • 存儲(chǔ)引擎:Topic的存儲(chǔ)引擎必須為Local存儲(chǔ)。
        說明 僅專業(yè)版實(shí)例支持在創(chuàng)建Topic時(shí)選擇存儲(chǔ)引擎為Local存儲(chǔ),標(biāo)準(zhǔn)版暫不支持。
      • cleanup.policy:Topic的日志清理策略必須為compact。
      connect-status-kafka-oss-sink
      死信隊(duì)列 Topic用于存儲(chǔ)Connect框架的異常數(shù)據(jù)的Topic。單擊配置運(yùn)行環(huán)境顯示該參數(shù)。該Topic可以和異常數(shù)據(jù)Topic為同一個(gè)Topic,以節(jié)省Topic資源。
      • Topic:建議以connect-error開頭。
      • 分區(qū)數(shù):Topic的分區(qū)數(shù)量建議為6。
      • 存儲(chǔ)引擎:Topic的存儲(chǔ)引擎可以為Local存儲(chǔ)或云存儲(chǔ)。
        說明 僅專業(yè)版實(shí)例支持在創(chuàng)建Topic時(shí)選擇存儲(chǔ)引擎為Local存儲(chǔ),標(biāo)準(zhǔn)版暫不支持。
      connect-error-kafka-oss-sink
      異常數(shù)據(jù) Topic用于存儲(chǔ)Sink的異常數(shù)據(jù)的Topic。單擊配置運(yùn)行環(huán)境顯示該參數(shù)。該Topic可以和死信隊(duì)列Topic為同一個(gè)Topic,以節(jié)省Topic資源。
      • Topic:建議以connect-error開頭。
      • 分區(qū)數(shù):Topic的分區(qū)數(shù)量建議為6。
      • 存儲(chǔ)引擎:Topic的存儲(chǔ)引擎可以為Local存儲(chǔ)或云存儲(chǔ)。
        說明 僅專業(yè)版實(shí)例支持在創(chuàng)建Topic時(shí)選擇存儲(chǔ)引擎為Local存儲(chǔ),標(biāo)準(zhǔn)版暫不支持。
      connect-error-kafka-oss-sink
    3. 配置目標(biāo)服務(wù)頁簽,選擇目標(biāo)服務(wù)對象存儲(chǔ),并配置以下參數(shù),然后單擊創(chuàng)建
      參數(shù)描述示例值
      Bucket 名稱對象存儲(chǔ)Bucket的名稱。bucket_test
      Access Key阿里云賬號的AccessKey ID。LTAI4GG2RGAjppjK********
      Secret Key阿里云賬號的AccessKey Secret。WbGPVb5rrecVw3SQvEPw6R********

      請確保您使用的AccessKey ID所對應(yīng)的賬號已被授予以下最小權(quán)限:

      {
          "Version": "1",
          "Statement": [
              {
                  "Action": [
                      "oss:GetObject",
                      "oss:PutObject"
                  ],
                  "Resource": "*",
                  "Effect": "Allow"
              }
          ]
      }
      說明

      AccessKey ID和AccessKey Secret是云消息隊(duì)列 Kafka 版創(chuàng)建任務(wù)時(shí)作為環(huán)境變量傳遞至對象存儲(chǔ)的數(shù)據(jù),任務(wù)創(chuàng)建成功后,云消息隊(duì)列 Kafka 版不保存AccessKey ID和AccessKey Secret信息。

      創(chuàng)建完成后,在Connector 任務(wù)列表頁面,查看創(chuàng)建的Connector 。
  6. 創(chuàng)建完成后,在Connector 任務(wù)列表頁面,找到創(chuàng)建的Connector ,單擊其操作列的部署

發(fā)送測試消息

您可以向云消息隊(duì)列 Kafka 版的數(shù)據(jù)源Topic發(fā)送消息,測試數(shù)據(jù)能否被導(dǎo)出至對象存儲(chǔ)。

  1. Connector 任務(wù)列表頁面,找到目標(biāo)Connector,在其右側(cè)操作列,單擊測試
  2. 發(fā)送消息面板,發(fā)送測試消息。
    • 發(fā)送方式選擇控制臺(tái)
      1. 消息 Key文本框中輸入消息的Key值,例如demo。
      2. 消息內(nèi)容文本框輸入測試的消息內(nèi)容,例如 {"key": "test"}。
      3. 設(shè)置發(fā)送到指定分區(qū),選擇是否指定分區(qū)。
        • 單擊,在分區(qū) ID文本框中輸入分區(qū)的ID,例如0。如果您需查詢分區(qū)的ID,請參見查看分區(qū)狀態(tài)
        • 單擊,不指定分區(qū)。
    • 發(fā)送方式選擇Docker,執(zhí)行運(yùn)行 Docker 容器生產(chǎn)示例消息區(qū)域的Docker命令,發(fā)送消息。
    • 發(fā)送方式選擇SDK,根據(jù)您的業(yè)務(wù)需求,選擇需要的語言或者框架的SDK以及接入方式,通過SDK發(fā)送消息。

驗(yàn)證結(jié)果

云消息隊(duì)列 Kafka 版的數(shù)據(jù)源Topic發(fā)送消息后,查看OSS文件管理,驗(yàn)證數(shù)據(jù)導(dǎo)出結(jié)果。更多信息,請參見文件概覽

文件管理中顯示新導(dǎo)出的文件。

files
云消息隊(duì)列 Kafka 版數(shù)據(jù)導(dǎo)出至對象存儲(chǔ)的格式示例如下:
[
    {
        "key":"123",
        "offset":4,
        "overflowFlag":true,
        "partition":0,
        "timestamp":1603779578478,
        "topic":"Test",
        "value":"1",
        "valueSize":272687
    }
]

更多操作

您可以按需對該Connector所依賴的函數(shù)計(jì)算資源進(jìn)行配置。

Connector 任務(wù)列表頁面,找到創(chuàng)建的Connector,單擊其操作列的更多 > 配置函數(shù)
頁面跳轉(zhuǎn)至函數(shù)計(jì)算控制臺(tái),您可以按需配置函數(shù)資源。