當您需要將數據從云消息隊列 Kafka 版實例的數據源Topic導出至阿里云Elasticsearch的索引,您可以通過Elasticsearch Sink Connector實現。本文介紹如何創建Elasticsearch Sink Connector。

前提條件

在導出數據前,請確保您已完成以下操作:
  • 云消息隊列 Kafka 版
    • 云消息隊列 Kafka 版實例開啟Connector。更多信息,請參見開啟Connector
    • 云消息隊列 Kafka 版實例創建數據源Topic。更多信息,請參見步驟一:創建Topic
  • 函數計算
  • Elasticsearch
    說明
    • 函數計算使用的Elasticsearch客戶端版本為7.7.0,為保持兼容,您需創建7.0或以上版本的Elasticsearch實例。
    • 配置白名單時,您可以設置網段為0.0.0.0/0,代表整個VPC可訪問,訪問成功后根據需要修改為對應的網段。

注意事項

  • 僅支持在同地域內,將數據從云消息隊列 Kafka 版實例的數據源Topic導出至函數計算,再由函數計算導出至Elasticsearch。關于Connector的限制說明,請參見使用限制
  • 該功能基于函數計算服務提供。函數計算為您提供了一定的免費額度,超額部分將產生費用,請以函數計算的計費規則為準。計費詳情,請參見計費概述
  • 函數計算的函數調用支持日志查詢,以便您迅速排查問題。具體操作步驟,請參見配置日志
  • 消息轉儲時,云消息隊列 Kafka 版中消息用UTF-8 String序列化,暫不支持二進制的數據格式。
  • 如果Elasticsearch Sink Connector接入點是私網接入點,函數計算運行環境默認無法訪問,為確保網絡暢通,需在函數計算控制臺為函數服務配置與Elasticsearch一致的VPC和vSwitch信息。更多信息,請參見更新服務

創建并部署Elasticsearch Sink Connector

  1. 登錄云消息隊列 Kafka 版控制臺
  2. 概覽頁面的資源分布區域,選擇地域。
  3. 在左側導航欄,單擊Connector 任務列表
  4. Connector 任務列表頁面,從選擇實例的下拉列表選擇Connector所屬的實例,然后單擊創建 Connector
  5. 創建 Connector配置向導頁面,完成以下操作。
    1. 配置基本信息頁簽,按需配置以下參數,然后單擊下一步
      重要 云消息隊列 Kafka 版會為您自動選中授權創建服務關聯角色
      • 如果未創建服務關聯角色,云消息隊列 Kafka 版會為您自動創建一個服務關聯角色,以便您使用云消息隊列 Kafka 版導出數據至Elasticsearch的功能。
      • 如果已創建服務關聯角色,云消息隊列 Kafka 版不會重復創建。
      關于該服務關聯角色的更多信息,請參見服務關聯角色
      參數描述示例值
      名稱Connector的名稱。命名規則:
      • 可以包含數字、小寫英文字母和短劃線(-),但不能以短劃線(-)開頭,長度限制為48個字符。
      • 同一個云消息隊列 Kafka 版實例內保持唯一。

      Connector的數據同步任務必須使用名稱為connect-任務名稱Group。如果您未手動創建該Group,系統將為您自動創建。

      kafka-elasticsearch-sink
      實例默認配置為實例的名稱與實例ID。demo alikafka_post-cn-st21p8vj****
    2. 配置源服務頁簽,選擇數據源消息隊列Kafka版,并配置以下參數,然后單擊下一步
      參數描述示例值
      數據源 Topic需要同步數據的Topic。elasticsearch-test-input
      消費線程并發數數據源Topic的消費線程并發數。默認值為6。取值說明如下:
      • 1
      • 2
      • 3
      • 6
      • 12
      6
      消費初始位置開始消費的位置。取值說明如下:
      • 最早位點:從最初位點開始消費。
      • 最近位點:從最新位點開始消費。
      最早位點
      VPC ID數據同步任務所在的VPC。單擊配置運行環境顯示該參數。默認為云消息隊列 Kafka 版實例所在的VPC,您無需填寫。vpc-bp1xpdnd3l***
      vSwitch ID數據同步任務所在的交換機。單擊配置運行環境顯示該參數。該交換機必須與云消息隊列 Kafka 版實例處于同一VPC。默認為部署云消息隊列 Kafka 版實例時填寫的交換機。vsw-bp1d2jgg81***
      失敗處理消息發送失敗后,是否繼續訂閱出現錯誤的Topic的分區。單擊配置運行環境顯示該參數。取值說明如下:
      • 繼續訂閱:繼續訂閱出現錯誤的Topic的分區,并打印錯誤日志。
      • 停止訂閱:停止訂閱出現錯誤的Topic的分區,并打印錯誤日志。
      說明
      繼續訂閱
      創建資源方式選擇創建Connector所依賴的Topic與Group的方式。單擊配置運行環境顯示該參數。
      • 自動創建
      • 手動創建
      自動創建
      Connector 消費組Connector的數據同步任務使用的Group。單擊配置運行環境顯示該參數。該Group的名稱必須為connect-任務名稱connect-kafka-elasticsearch-sink
      任務位點 Topic用于存儲消費位點的Topic。單擊配置運行環境顯示該參數。
      • Topic:建議以connect-offset開頭。
      • 分區數:Topic的分區數量必須大于1。
      • 存儲引擎:Topic的存儲引擎必須為Local存儲。
        說明 僅專業版實例支持在創建Topic時選擇存儲引擎類型為Local存儲,標準版暫不支持。
      • cleanup.policy:Topic的日志清理策略必須為compact。
      connect-offset-kafka-elasticsearch-sink
      任務配置 Topic用于存儲任務配置的Topic。單擊配置運行環境顯示該參數。
      • Topic:建議以connect-config開頭。
      • 分區數:Topic的分區數量必須為1。
      • 存儲引擎:Topic的存儲引擎必須為Local存儲。
        說明 僅專業版實例支持在創建Topic時選擇存儲引擎類型為Local存儲,標準版暫不支持。
      • cleanup.policy:Topic的日志清理策略必須為compact。
      connect-config-kafka-elasticsearch-sink
      任務狀態 Topic用于存儲任務狀態的Topic。單擊配置運行環境顯示該參數。
      • Topic:建議以connect-status開頭。
      • 分區數:Topic的分區數量建議為6。
      • 存儲引擎:Topic的存儲引擎必須為Local存儲。
        說明 僅專業版實例支持在創建Topic時選擇存儲引擎類型為Local存儲,標準版暫不支持。
      • cleanup.policy:Topic的日志清理策略必須為compact。
      connect-status-kafka-elasticsearch-sink
      死信隊列 Topic用于存儲Connect框架的異常數據的Topic。單擊配置運行環境顯示該參數。該Topic可以和異常數據Topic為同一個Topic,以節省Topic資源。
      • Topic:建議以connect-error開頭。
      • 分區數:Topic的分區數量建議為6。
      • 存儲引擎:Topic的存儲引擎可以為Local存儲或云存儲。
        說明 僅專業版實例支持在創建Topic時選擇存儲引擎類型為Local存儲,標準版暫不支持。
      connect-error-kafka-elasticsearch-sink
      異常數據 Topic用于存儲Sink的異常數據的Topic。單擊配置運行環境顯示該參數。該Topic可以和死信隊列Topic為同一個Topic,以節省Topic資源。
      • Topic:建議以connect-error開頭。
      • 分區數:Topic的分區數量建議為6。
      • 存儲引擎:Topic的存儲引擎可以為Local存儲或云存儲。
        說明 僅專業版實例支持在創建Topic時選擇存儲引擎類型為Local存儲,標準版暫不支持。
      connect-error-kafka-elasticsearch-sink
    3. 配置目標服務頁簽,選擇目標服務Elasticsearch,并配置以下參數,然后單擊創建
      參數描述示例值
      ES 實例 ID阿里云Elasticsearch實例ID。es-cn-oew1o67x0000****
      接入地址阿里云Elasticsearch實例的公網或私網地址。更多信息,請參見查看實例的基本信息es-cn-oew1o67x0000****.elasticsearch.aliyuncs.com
      接入端口訪問阿里云Elasticsearch的公網或私網端口,取值如下:
      • 9200:基于HTTP或HTTPS。
      • 9300:基于TCP。

      更多信息,請參見查看實例的基本信息

      9300
      用戶名登錄Kibana控制臺的用戶名,默認為elastic。您也可以創建自定義用戶。具體操作,請參見通過Elasticsearch X-Pack角色管理實現用戶權限管控elastic
      用戶密碼登錄Kibana控制臺的密碼。elastic用戶的密碼在創建實例時設定,如果忘記可重置。具體操作,請參見重置實例訪問密碼********
      索引阿里云Elasticsearch的索引名稱。elastic_test
      說明
      • 用戶名和用戶密碼會被用來初始化Elasticsearch對象,通過bulk投遞消息,請確認賬號對索引有寫權限。
      • 用戶名和用戶密碼是云消息隊列 Kafka 版創建任務時作為環境變量傳遞至函數計算的函數,任務創建成功后,云消息隊列 Kafka 版不保存相關信息。
      創建完成后,在Connector 任務列表頁面,查看創建的Connector 。
  6. 創建完成后,在Connector 任務列表頁面,找到創建的Connector ,單擊其操作列的部署

配置函數服務

您在云消息隊列 Kafka 版控制臺成功創建并部署Elasticsearch Sink Connector后,函數計算會自動為您創建給該Connector使用的函數服務,服務命名格式為kafka-service-<connector_name>-<隨機String>

  1. Connector 任務列表頁面,找到目標Connector,在其右側操作列,選擇更多 > 函數配置
    頁面跳轉至函數計算控制臺。
  2. 在函數計算控制臺,找到自動創建的函數服務,并配置其VPC和交換機信息。請確保該信息和您阿里云Elasticsearch相同。配置的具體步驟,請參見更新服務

發送測試消息

您可以向云消息隊列 Kafka 版的數據源Topic發送消息,測試數據能否被導出至阿里云Elasticsearch。

  1. Connector 任務列表頁面,找到目標Connector,在其右側操作列,單擊測試
  2. 發送消息面板,發送測試消息。
    • 發送方式選擇控制臺
      1. 消息 Key文本框中輸入消息的Key值,例如demo。
      2. 消息內容文本框輸入測試的消息內容,例如 {"key": "test"}。
      3. 設置發送到指定分區,選擇是否指定分區。
        • 單擊,在分區 ID文本框中輸入分區的ID,例如0。如果您需查詢分區的ID,請參見查看分區狀態
        • 單擊,不指定分區。
    • 發送方式選擇Docker,執行運行 Docker 容器生產示例消息區域的Docker命令,發送消息。
    • 發送方式選擇SDK,根據您的業務需求,選擇需要的語言或者框架的SDK以及接入方式,通過SDK發送消息。

驗證結果

云消息隊列 Kafka 版的數據源Topic發送消息后,登錄Kibana控制臺,執行GET /<index_name>/_search查看索引,驗證數據導出結果。

云消息隊列 Kafka 版數據導出至Elasticsearch的格式示例如下:
{
  "took" : 8,
  "timed_out" : false,
  "_shards" : {
    "total" : 5,
    "successful" : 5,
    "skipped" : 0,
    "failed" : 0
  },
  "hits" : {
    "total" : {
      "value" : 1,
      "relation" : "eq"
    },
    "max_score" : 1.0,
    "hits" : [
      {
        "_index" : "product_****",
        "_type" : "_doc",
        "_id" : "TX3TZHgBfHNEDGoZ****",
        "_score" : 1.0,
        "_source" : {
          "msg_body" : {
            "key" : "test",
            "offset" : 2,
            "overflowFlag" : false,
            "partition" : 2,
            "timestamp" : 1616599282417,
            "topic" : "dv****",
            "value" : "test1",
            "valueSize" : 8
          },
          "doc_as_upsert" : true
        }
      }
    ]
  }
}