將云消息隊列Kafka版接入阿里云Elasticsearch
隨著時間的積累,云消息隊列 Kafka 版中的日志數(shù)據(jù)會越來越多。當您需要查看并分析龐雜的日志數(shù)據(jù)時,可通過阿里云Logstash將云消息隊列 Kafka 版中的日志數(shù)據(jù)導入阿里云Elasticsearch,然后通過Kibana進行可視化展示與分析。本文介紹將云消息隊列 Kafka 版接入阿里云Elasticsearch的操作方法。
前提條件
在開始本教程前,請確保您已完成以下操作:
購買并部署云消息隊列 Kafka 版實例。具體信息,請參見VPC接入。
創(chuàng)建阿里云Elasticsearch實例。具體信息,請參見創(chuàng)建阿里云Elasticsearch實例。
重要請注意保存創(chuàng)建阿里云Elasticsearch實例時設置的用戶名及密碼。該用戶名及密碼將用于步驟五:創(chuàng)建索引、步驟六:創(chuàng)建管道和步驟七:搜索數(shù)據(jù)。
創(chuàng)建阿里云Logstash實例。具體信息,請參見創(chuàng)建阿里云Logstash實例。
背景信息
通過阿里云Logstash將數(shù)據(jù)從云消息隊列 Kafka 版導入阿里云Elasticsearch的過程如下圖所示。
云消息隊列 Kafka 版
云消息隊列 Kafka 版是阿里云提供的分布式、高吞吐、可擴展的消息隊列服務。云消息隊列 Kafka 版廣泛用于日志收集、監(jiān)控數(shù)據(jù)聚合、流式數(shù)據(jù)處理、在線和離線分析等大數(shù)據(jù)領域,已成為大數(shù)據(jù)生態(tài)中不可或缺的部分。更多信息,請參見什么是云消息隊列 Kafka 版?。
阿里云Elasticsearch
Elasticsearch簡稱ES,是一個基于Lucene的實時分布式的搜索與分析引擎,是遵從Apache開源條款的一款開源產(chǎn)品,是當前主流的企業(yè)級搜索引擎。它提供了一個分布式服務,可以使您快速的近乎于準實時的存儲、查詢和分析超大數(shù)據(jù)集,通常被用來作為構建復雜查詢特性和需求強大應用的基礎引擎或技術。阿里云Elasticsearch支持5.5.3、6.3.2、6.7.0、6.8.0和7.4.0版本,并提供了商業(yè)插件X-Pack服務,致力于數(shù)據(jù)分析、數(shù)據(jù)搜索等場景服務。在開源Elasticsearch的基礎上提供企業(yè)級權限管控、安全監(jiān)控告警、自動報表生成等功能。更多信息,請參見什么是阿里云Elasticsearch。
阿里云Logstash
阿里云Logstash作為服務器端的數(shù)據(jù)處理管道,提供了100%兼容開源的Logstash功能。Logstash能夠動態(tài)地從多個來源采集數(shù)據(jù)、轉換數(shù)據(jù),并且將數(shù)據(jù)存儲到所選擇的位置。通過輸入、過濾和輸出插件,Logstash可以對任何類型的事件加工和轉換。更多信息,請參見什么是阿里云Logstash。
步驟一:獲取VPC環(huán)境接入點
阿里云Logstash通過云消息隊列 Kafka 版的接入點與云消息隊列 Kafka 版在VPC環(huán)境下建立連接。
在概覽頁面的資源分布區(qū)域,選擇地域。
在實例列表頁面,單擊目標實例名稱。
在實例詳情頁面的接入點信息頁簽,獲取實例的VPC環(huán)境接入點。
步驟二:創(chuàng)建Topic
創(chuàng)建用于存儲消息的Topic。
在概覽頁面的資源分布區(qū)域,選擇地域。
在實例列表頁面,單擊目標實例名稱。
在左側導航欄,單擊Topic 管理。
在Topic 管理頁面,單擊創(chuàng)建 Topic。
在創(chuàng)建 Topic面板,設置Topic屬性,然后單擊確定。
參數(shù)
說明
示例
名稱
Topic名稱。
demo
描述
Topic的簡單描述。
demo test
分區(qū)數(shù)
Topic的分區(qū)數(shù)量。
12
存儲引擎
說明當前僅專業(yè)版實例支持選擇存儲引擎類型,標準版暫不支持,默認選擇為云存儲類型。
Topic消息的存儲引擎。
云消息隊列 Kafka 版支持以下兩種存儲引擎。
云存儲:底層接入阿里云云盤,具有低時延、高性能、持久性、高可靠等特點,采用分布式3副本機制。實例的規(guī)格類型為標準版(高寫版)時,存儲引擎只能為云存儲。
Local 存儲:使用原生Kafka的ISR復制算法,采用分布式3副本機制。
云存儲
消息類型
Topic消息的類型。
普通消息:默認情況下,保證相同Key的消息分布在同一個分區(qū)中,且分區(qū)內(nèi)消息按照發(fā)送順序存儲。集群中出現(xiàn)機器宕機時,可能會造成消息亂序。當存儲引擎選擇云存儲時,默認選擇普通消息。
分區(qū)順序消息:默認情況下,保證相同Key的消息分布在同一個分區(qū)中,且分區(qū)內(nèi)消息按照發(fā)送順序存儲。集群中出現(xiàn)機器宕機時,仍然保證分區(qū)內(nèi)按照發(fā)送順序存儲。但是會出現(xiàn)部分分區(qū)發(fā)送消息失敗,等到分區(qū)恢復后即可恢復正常。當存儲引擎選擇Local 存儲時,默認選擇分區(qū)順序消息。
普通消息
日志清理策略
Topic日志的清理策略。
當存儲引擎選擇Local 存儲(當前僅專業(yè)版實例支持選擇存儲引擎類型為Local存儲,標準版暫不支持)時,需要配置日志清理策略。
云消息隊列 Kafka 版支持以下兩種日志清理策略。
Delete:默認的消息清理策略。在磁盤容量充足的情況下,保留在最長保留時間范圍內(nèi)的消息;在磁盤容量不足時(一般磁盤使用率超過85%視為不足),將提前刪除舊消息,以保證服務可用性。
Compact:使用Kafka Log Compaction日志清理策略。Log Compaction清理策略保證相同Key的消息,最新的value值一定會被保留。主要適用于系統(tǒng)宕機后恢復狀態(tài),系統(tǒng)重啟后重新加載緩存等場景。例如,在使用Kafka Connect或Confluent Schema Registry時,需要使用Kafka Compact Topic存儲系統(tǒng)狀態(tài)信息或配置信息。
重要Compact Topic一般只用在某些生態(tài)組件中,例如Kafka Connect或Confluent Schema Registry,其他情況的消息收發(fā)請勿為Topic設置該屬性。具體信息,請參見云消息隊列 Kafka 版Demo庫。
Compact
標簽
Topic的標簽。
demo
創(chuàng)建完成后,在Topic 管理頁面的列表中顯示已創(chuàng)建的Topic。
步驟三:發(fā)送消息
向創(chuàng)建的Topic發(fā)送消息。
在概覽頁面的資源分布區(qū)域,選擇地域。
在實例列表頁面,單擊目標實例名稱。
在左側導航欄,單擊Topic 管理。
在Topic 管理頁面,單擊目標Topic名稱進入Topic 詳情頁面,然后單擊體驗發(fā)送消息。
在快速體驗消息收發(fā)面板,發(fā)送測試消息。
發(fā)送方式選擇控制臺。
在消息 Key文本框中輸入消息的Key值,例如demo。
在消息內(nèi)容文本框輸入測試的消息內(nèi)容,例如 {"key": "test"}。
設置發(fā)送到指定分區(qū),選擇是否指定分區(qū)。
單擊是,在分區(qū) ID文本框中輸入分區(qū)的ID,例如0。如果您需查詢分區(qū)的ID,請參見查看分區(qū)狀態(tài)。
單擊否,不指定分區(qū)。
根據(jù)界面提示信息,通過SDK訂閱消息,或者執(zhí)行Docker命令訂閱消息。
發(fā)送方式選擇Docker,運行Docker容器。
執(zhí)行運行 Docker 容器生產(chǎn)示例消息區(qū)域的Docker命令,發(fā)送消息。
執(zhí)行發(fā)送后如何消費消息?區(qū)域的Docker命令,訂閱消息。
發(fā)送方式選擇SDK,根據(jù)您的業(yè)務需求,選擇需要的語言或者框架的SDK以及接入方式,通過SDK體驗消息收發(fā)。
步驟四:創(chuàng)建Group
創(chuàng)建阿里云Elasticsearch所屬的Group。
在概覽頁面的資源分布區(qū)域,選擇地域。
在實例列表頁面,單擊目標實例名稱。
在左側導航欄,單擊Group 管理。
在Group 管理頁面,單擊創(chuàng)建 Group。
在創(chuàng)建 Group面板的Group ID文本框輸入Group的名稱,在描述文本框簡要描述Group,并給Group添加標簽,單擊確定。
創(chuàng)建完成后,在Group 管理頁面的列表中顯示已創(chuàng)建的Group。
步驟五:創(chuàng)建索引
通過阿里云Elasticsearch創(chuàng)建索引,接收云消息隊列 Kafka 版的數(shù)據(jù)。
- 說明
Username為您創(chuàng)建阿里云Elasticsearch實例時設置的用戶名。
Password為您創(chuàng)建阿里云Elasticsearch實例時設置的密碼。
在Kibana控制臺的左側導航欄,單擊Dev Tools。
執(zhí)行以下命令創(chuàng)建索引。
PUT /elastic_test {}
步驟六:創(chuàng)建管道
通過阿里云Logstash創(chuàng)建管道。管道部署后,將源源不斷地從云消息隊列 Kafka 版導入數(shù)據(jù)進阿里云Elasticsearch。
登錄阿里云Elasticsearch控制臺的Logstash實例頁面。
在頂部菜單欄,選擇地域。
在Logstash實例頁面,單擊創(chuàng)建的實例。
在左側導航欄,單擊管道管理。
在管道列表區(qū)域,單擊創(chuàng)建管道。
在Config配置中,輸入配置。
配置示例如下。
input { kafka { bootstrap_servers => ["alikafka-pre-cn-zv**********-1-vpc.alikafka.aliyuncs.com:9092,alikafka-pre-cn-zv**********-2-vpc.alikafka.aliyuncs.com:9092,alikafka-pre-cn-zv**********-3-vpc.alikafka.aliyuncs.com:9092"] group_id => "elastic_group" topics => ["elastic_test"] consumer_threads => 12 decorate_events => true } } output { elasticsearch { hosts => ["http://es-cn-o40xxxxxxxxxxxxwm.elasticsearch.aliyuncs.com:9200"] index => "elastic_test" password => "XXX" user => "elastic" } }
表 1. input參數(shù)說明
參數(shù)
描述
示例值
bootstrap_servers
云消息隊列 Kafka 版的VPC環(huán)境接入點。
alikafka-pre-cn-zv**********-1-vpc.alikafka.aliyuncs.com:9092,alikafka-pre-cn-zv**********-2-vpc.alikafka.aliyuncs.com:9092,alikafka-pre-cn-zv**********-3-vpc.alikafka.aliyuncs.com:9092
group_id
Group的名稱。
elastic_group
topics
Topic的名稱。
elastic_test
consumer_threads
消費線程數(shù)。建議與Topic的分區(qū)數(shù)保持一致。
12
decorate_events
是否包含消息元數(shù)據(jù)。默認值為false。
true
表 2. output參數(shù)說明
參數(shù)
描述
示例值
hosts
阿里云Elasticsearch服務的訪問地址。您可在阿里云Elasticsearch實例的基本信息頁面獲取。
http://es-cn-o40xxxxxxxxxxxxwm.elasticsearch.aliyuncs.com:9200
index
索引的名稱。
elastic_test
password
訪問阿里云Elasticsearch服務的密碼。您在創(chuàng)建阿里云Elasticsearch實例時設置的密碼。
XXX
user
訪問阿里云Elasticsearch服務的用戶名。您在創(chuàng)建阿里云Elasticsearch實例時設置的用戶名。
elastic
在管道參數(shù)配置中,輸入配置信息,然后單擊保存并部署。
在提示對話框,單擊確認。
步驟七:搜索數(shù)據(jù)
您可以在Kibana控制臺搜索通過管道導入阿里云Elasticsearch的數(shù)據(jù),確認數(shù)據(jù)是否導入成功。
- 說明
Username為您創(chuàng)建阿里云Elasticsearch實例時設置的用戶名。
Password為您創(chuàng)建阿里云Elasticsearch實例時設置的密碼。
在Kibana控制臺的左側導航欄,單擊Dev Tools圖標。
執(zhí)行以下命令搜索數(shù)據(jù)。
GET /elastic_test/_search {}
返回結果如下。