當您需要將數據從云消息隊列 Kafka 版實例的數據源Topic導出至阿里云Elasticsearch的索引,您可以通過Elasticsearch Sink Connector實現。本文介紹如何創建Elasticsearch Sink Connector。
前提條件
在導出數據前,請確保您已完成以下操作:- 云消息隊列 Kafka 版
- 為云消息隊列 Kafka 版實例開啟Connector。更多信息,請參見開啟Connector。
- 為云消息隊列 Kafka 版實例創建數據源Topic。更多信息,請參見步驟一:創建Topic。
- 函數計算
- Elasticsearch
- 在Elasticsearch管理控制臺創建實例和索引。更多信息,請參見快速入門。
- 在Elasticsearch白名單配置中添加了函數計算服務地址所在的網段。更多信息,請參見配置實例公網或私網訪問白名單。
說明- 函數計算使用的Elasticsearch客戶端版本為7.7.0,為保持兼容,您需創建7.0或以上版本的Elasticsearch實例。
- 配置白名單時,您可以設置網段為0.0.0.0/0,代表整個VPC可訪問,訪問成功后根據需要修改為對應的網段。
注意事項
- 僅支持在同地域內,將數據從云消息隊列 Kafka 版實例的數據源Topic導出至函數計算,再由函數計算導出至Elasticsearch。關于Connector的限制說明,請參見使用限制。
- 該功能基于函數計算服務提供。函數計算為您提供了一定的免費額度,超額部分將產生費用,請以函數計算的計費規則為準。計費詳情,請參見計費概述。
- 函數計算的函數調用支持日志查詢,以便您迅速排查問題。具體操作步驟,請參見配置日志。
- 消息轉儲時,云消息隊列 Kafka 版中消息用UTF-8 String序列化,暫不支持二進制的數據格式。
- 如果Elasticsearch Sink Connector接入點是私網接入點,函數計算運行環境默認無法訪問,為確保網絡暢通,需在函數計算控制臺為函數服務配置與Elasticsearch一致的VPC和vSwitch信息。更多信息,請參見更新服務。
創建并部署Elasticsearch Sink Connector
- 登錄云消息隊列 Kafka 版控制臺。
- 在概覽頁面的資源分布區域,選擇地域。
- 在左側導航欄,單擊Connector 任務列表。
- 在Connector 任務列表頁面,從選擇實例的下拉列表選擇Connector所屬的實例,然后單擊創建 Connector。
- 在創建 Connector配置向導頁面,完成以下操作。
- 創建完成后,在Connector 任務列表頁面,找到創建的Connector ,單擊其操作列的部署。
配置函數服務
您在云消息隊列 Kafka 版控制臺成功創建并部署Elasticsearch Sink Connector后,函數計算會自動為您創建給該Connector使用的函數服務,服務命名格式為kafka-service-<connector_name>-<隨機String>
。
- 在Connector 任務列表頁面,找到目標Connector,在其右側操作列,選擇 。頁面跳轉至函數計算控制臺。
- 在函數計算控制臺,找到自動創建的函數服務,并配置其VPC和交換機信息。請確保該信息和您阿里云Elasticsearch相同。配置的具體步驟,請參見更新服務。
發送測試消息
您可以向云消息隊列 Kafka 版的數據源Topic發送消息,測試數據能否被導出至阿里云Elasticsearch。
- 在Connector 任務列表頁面,找到目標Connector,在其右側操作列,單擊測試。
- 在發送消息面板,發送測試消息。
- 發送方式選擇控制臺。
- 在消息 Key文本框中輸入消息的Key值,例如demo。
- 在消息內容文本框輸入測試的消息內容,例如 {"key": "test"}。
- 設置發送到指定分區,選擇是否指定分區。
- 單擊是,在分區 ID文本框中輸入分區的ID,例如0。如果您需查詢分區的ID,請參見查看分區狀態。
- 單擊否,不指定分區。
- 發送方式選擇Docker,執行運行 Docker 容器生產示例消息區域的Docker命令,發送消息。
- 發送方式選擇SDK,根據您的業務需求,選擇需要的語言或者框架的SDK以及接入方式,通過SDK發送消息。
- 發送方式選擇控制臺。
驗證結果
向云消息隊列 Kafka 版的數據源Topic發送消息后,登錄Kibana控制臺,執行GET /<index_name>/_search
查看索引,驗證數據導出結果。
云消息隊列 Kafka 版數據導出至Elasticsearch的格式示例如下:
{
"took" : 8,
"timed_out" : false,
"_shards" : {
"total" : 5,
"successful" : 5,
"skipped" : 0,
"failed" : 0
},
"hits" : {
"total" : {
"value" : 1,
"relation" : "eq"
},
"max_score" : 1.0,
"hits" : [
{
"_index" : "product_****",
"_type" : "_doc",
"_id" : "TX3TZHgBfHNEDGoZ****",
"_score" : 1.0,
"_source" : {
"msg_body" : {
"key" : "test",
"offset" : 2,
"overflowFlag" : false,
"partition" : 2,
"timestamp" : 1616599282417,
"topic" : "dv****",
"value" : "test1",
"valueSize" : 8
},
"doc_as_upsert" : true
}
}
]
}
}