隨著時間的積累,云消息隊列 Kafka 版中的日志數據會越來越多。當您需要查看并分析龐雜的日志數據時,可通過阿里云Logstash將云消息隊列 Kafka 版中的日志數據導入阿里云Elasticsearch,然后通過Kibana進行可視化展示與分析。本文介紹將云消息隊列 Kafka 版接入阿里云Elasticsearch的操作方法。
前提條件
在開始本教程前,請確保您已完成以下操作:
購買并部署云消息隊列 Kafka 版實例。具體信息,請參見VPC接入。
創建阿里云Elasticsearch實例。具體信息,請參見創建阿里云Elasticsearch實例。
創建阿里云Logstash實例。具體信息,請參見創建阿里云Logstash實例。
背景信息
通過阿里云Logstash將數據從云消息隊列 Kafka 版導入阿里云Elasticsearch的過程如下圖所示。
云消息隊列 Kafka 版
云消息隊列 Kafka 版是阿里云提供的分布式、高吞吐、可擴展的消息隊列服務。云消息隊列 Kafka 版廣泛用于日志收集、監控數據聚合、流式數據處理、在線和離線分析等大數據領域,已成為大數據生態中不可或缺的部分。更多信息,請參見什么是云消息隊列 Kafka 版?。
阿里云Elasticsearch
Elasticsearch簡稱ES,是一個基于Lucene的實時分布式的搜索與分析引擎,是遵從Apache開源條款的一款開源產品,是當前主流的企業級搜索引擎。它提供了一個分布式服務,可以使您快速的近乎于準實時的存儲、查詢和分析超大數據集,通常被用來作為構建復雜查詢特性和需求強大應用的基礎引擎或技術。阿里云Elasticsearch支持5.5.3、6.3.2、6.7.0、6.8.0和7.4.0版本,并提供了商業插件X-Pack服務,致力于數據分析、數據搜索等場景服務。在開源Elasticsearch的基礎上提供企業級權限管控、安全監控告警、自動報表生成等功能。更多信息,請參見什么是阿里云Elasticsearch。
阿里云Logstash
阿里云Logstash作為服務器端的數據處理管道,提供了100%兼容開源的Logstash功能。Logstash能夠動態地從多個來源采集數據、轉換數據,并且將數據存儲到所選擇的位置。通過輸入、過濾和輸出插件,Logstash可以對任何類型的事件加工和轉換。更多信息,請參見什么是阿里云Logstash。
步驟一:獲取VPC環境接入點
阿里云Logstash通過云消息隊列 Kafka 版的接入點與云消息隊列 Kafka 版在VPC環境下建立連接。
在概覽頁面的資源分布區域,選擇地域。
在實例列表頁面,單擊目標實例名稱。
在實例詳情頁面的接入點信息頁簽,獲取實例的VPC環境接入點。
步驟二:創建Topic
創建用于存儲消息的Topic。
在概覽頁面的資源分布區域,選擇地域。
在實例列表頁面,單擊目標實例名稱。
在左側導航欄,單擊Topic 管理。
在Topic 管理頁面,單擊創建 Topic。
在創建 Topic面板,設置Topic屬性,然后單擊確定。
參數
說明
示例
名稱
Topic名稱。
demo
描述
Topic的簡單描述。
demo test
分區數
Topic的分區數量。
12
存儲引擎
說明當前僅專業版實例支持選擇存儲引擎類型,標準版暫不支持,默認選擇為云存儲類型。
Topic消息的存儲引擎。
云消息隊列 Kafka 版支持以下兩種存儲引擎。
云存儲:底層接入阿里云云盤,具有低時延、高性能、持久性、高可靠等特點,采用分布式3副本機制。實例的規格類型為標準版(高寫版)時,存儲引擎只能為云存儲。
Local 存儲:使用原生Kafka的ISR復制算法,采用分布式3副本機制。
云存儲
消息類型
Topic消息的類型。
普通消息:默認情況下,保證相同Key的消息分布在同一個分區中,且分區內消息按照發送順序存儲。集群中出現機器宕機時,可能會造成消息亂序。當存儲引擎選擇云存儲時,默認選擇普通消息。
分區順序消息:默認情況下,保證相同Key的消息分布在同一個分區中,且分區內消息按照發送順序存儲。集群中出現機器宕機時,仍然保證分區內按照發送順序存儲。但是會出現部分分區發送消息失敗,等到分區恢復后即可恢復正常。當存儲引擎選擇Local 存儲時,默認選擇分區順序消息。
普通消息
日志清理策略
Topic日志的清理策略。
當存儲引擎選擇Local 存儲(當前僅專業版實例支持選擇存儲引擎類型為Local存儲,標準版暫不支持)時,需要配置日志清理策略。
云消息隊列 Kafka 版支持以下兩種日志清理策略。
Delete:默認的消息清理策略。在磁盤容量充足的情況下,保留在最長保留時間范圍內的消息;在磁盤容量不足時(一般磁盤使用率超過85%視為不足),將提前刪除舊消息,以保證服務可用性。
Compact:使用Kafka Log Compaction日志清理策略。Log Compaction清理策略保證相同Key的消息,最新的value值一定會被保留。主要適用于系統宕機后恢復狀態,系統重啟后重新加載緩存等場景。例如,在使用Kafka Connect或Confluent Schema Registry時,需要使用Kafka Compact Topic存儲系統狀態信息或配置信息。
重要Compact Topic一般只用在某些生態組件中,例如Kafka Connect或Confluent Schema Registry,其他情況的消息收發請勿為Topic設置該屬性。具體信息,請參見云消息隊列 Kafka 版Demo庫。
Compact
標簽
Topic的標簽。
demo
創建完成后,在Topic 管理頁面的列表中顯示已創建的Topic。
步驟三:發送消息
向創建的Topic發送消息。
在概覽頁面的資源分布區域,選擇地域。
在實例列表頁面,單擊目標實例名稱。
在左側導航欄,單擊Topic 管理。
在Topic 管理頁面,單擊目標Topic名稱進入Topic 詳情頁面,然后單擊體驗發送消息。
在快速體驗消息收發面板,發送測試消息。
發送方式選擇控制臺。
在消息 Key文本框中輸入消息的Key值,例如demo。
在消息內容文本框輸入測試的消息內容,例如 {"key": "test"}。
設置發送到指定分區,選擇是否指定分區。
單擊是,在分區 ID文本框中輸入分區的ID,例如0。如果您需查詢分區的ID,請參見查看分區狀態。
單擊否,不指定分區。
根據界面提示信息,通過SDK訂閱消息,或者執行Docker命令訂閱消息。
發送方式選擇Docker,運行Docker容器。
執行運行 Docker 容器生產示例消息區域的Docker命令,發送消息。
執行發送后如何消費消息?區域的Docker命令,訂閱消息。
發送方式選擇SDK,根據您的業務需求,選擇需要的語言或者框架的SDK以及接入方式,通過SDK體驗消息收發。
步驟四:創建Group
創建阿里云Elasticsearch所屬的Group。
在概覽頁面的資源分布區域,選擇地域。
在實例列表頁面,單擊目標實例名稱。
在左側導航欄,單擊Group 管理。
在Group 管理頁面,單擊創建 Group。
在創建 Group面板的Group ID文本框輸入Group的名稱,在描述文本框簡要描述Group,并給Group添加標簽,單擊確定。
創建完成后,在Group 管理頁面的列表中顯示已創建的Group。
步驟五:創建索引
通過阿里云Elasticsearch創建索引,接收云消息隊列 Kafka 版的數據。
- 說明
Username為您創建阿里云Elasticsearch實例時設置的用戶名。
Password為您創建阿里云Elasticsearch實例時設置的密碼。
在Kibana控制臺的左側導航欄,單擊Dev Tools。
執行以下命令創建索引。
PUT /elastic_test {}
步驟六:創建管道
通過阿里云Logstash創建管道。管道部署后,將源源不斷地從云消息隊列 Kafka 版導入數據進阿里云Elasticsearch。
登錄阿里云Elasticsearch控制臺的Logstash實例頁面。
在頂部菜單欄,選擇地域。
在Logstash實例頁面,單擊創建的實例。
在左側導航欄,單擊管道管理。
在管道列表區域,單擊創建管道。
在Config配置中,輸入配置。
配置示例如下。
input { kafka { bootstrap_servers => ["alikafka-pre-cn-zv**********-1-vpc.alikafka.aliyuncs.com:9092,alikafka-pre-cn-zv**********-2-vpc.alikafka.aliyuncs.com:9092,alikafka-pre-cn-zv**********-3-vpc.alikafka.aliyuncs.com:9092"] group_id => "elastic_group" topics => ["elastic_test"] consumer_threads => 12 decorate_events => true } } output { elasticsearch { hosts => ["http://es-cn-o40xxxxxxxxxxxxwm.elasticsearch.aliyuncs.com:9200"] index => "elastic_test" password => "XXX" user => "elastic" } }
表 1. input參數說明
參數
描述
示例值
bootstrap_servers
云消息隊列 Kafka 版的VPC環境接入點。
alikafka-pre-cn-zv**********-1-vpc.alikafka.aliyuncs.com:9092,alikafka-pre-cn-zv**********-2-vpc.alikafka.aliyuncs.com:9092,alikafka-pre-cn-zv**********-3-vpc.alikafka.aliyuncs.com:9092
group_id
Group的名稱。
elastic_group
topics
Topic的名稱。
elastic_test
consumer_threads
消費線程數。建議與Topic的分區數保持一致。
12
decorate_events
是否包含消息元數據。默認值為false。
true
表 2. output參數說明
參數
描述
示例值
hosts
阿里云Elasticsearch服務的訪問地址。您可在阿里云Elasticsearch實例的基本信息頁面獲取。
http://es-cn-o40xxxxxxxxxxxxwm.elasticsearch.aliyuncs.com:9200
index
索引的名稱。
elastic_test
password
訪問阿里云Elasticsearch服務的密碼。您在創建阿里云Elasticsearch實例時設置的密碼。
XXX
user
訪問阿里云Elasticsearch服務的用戶名。您在創建阿里云Elasticsearch實例時設置的用戶名。
elastic
在管道參數配置中,輸入配置信息,然后單擊保存并部署。
在提示對話框,單擊確認。
步驟七:搜索數據
您可以在Kibana控制臺搜索通過管道導入阿里云Elasticsearch的數據,確認數據是否導入成功。
- 說明
Username為您創建阿里云Elasticsearch實例時設置的用戶名。
Password為您創建阿里云Elasticsearch實例時設置的密碼。
在Kibana控制臺的左側導航欄,單擊Dev Tools圖標。
執行以下命令搜索數據。
GET /elastic_test/_search {}
返回結果如下。