logstash-input-sls插件是阿里云Logstash自帶的默認插件。作為Logstash的input插件,logstash-input-sls插件提供了從日志服務獲取日志的功能。
說明 logstash-input-sls是阿里云維護的開源插件,詳情請參見logstash-input-logservice。
功能特性
- 支持分布式協同消費:可配置多臺服務器同時消費一個Logstore服務。
說明 多臺Logstash服務器進行分布式協同消費時,由于logstash-input-sls插件限制,需保證各個服務器僅部署一個input sls管道。如果單個服務器中存在多個input sls管道,輸出端可能會出現數據重復的異常情況。
- 高性能:基于Java ConsumerGroup實現,單核消費速度可達20 MB/s。
- 高可靠:消費進度會被保存到服務端,宕機恢復時,會從上一次checkpoint處自動恢復。
- 自動負載均衡:根據消費者數量自動分配shard,消費者增加或退出后會自動進行負載均衡。
前提條件
您已完成以下操作:
- 安裝logstash-input-sls插件。
具體操作步驟請參見安裝或卸載插件。
- 創建日志服務項目和Logstore,并采集數據。
具體操作步驟請參見日志服務快速入門教程。
使用logstash-input-sls插件
滿足以上前提條件后,您可以通過配置文件管理管道的方式創建管道任務。在創建管道任務時,按照以下說明配置管道參數。配置完成后進行保存與部署,即可觸發Logstash從日志服務獲取日志。
注意 RAM用戶使用logstash-input-sls插件前,還需要在日志服務側設置消費組相關的權限策略,詳細信息請參見RAM用戶授權。
以使用阿里云Logstash消費某一個Logstore,并將日志輸出到阿里云Elasticsearch為例,配置示例如下。
input {
logservice{
endpoint => "your project endpoint"
access_id => "your access id"
access_key => "your access key"
project => "your project name"
logstore => "your logstore name"
consumer_group => "consumer group name"
consumer_name => "consumer name"
position => "end"
checkpoint_second => 30
include_meta => true
consumer_name_with_ip => true
}
}
output {
elasticsearch {
hosts => ["http://es-cn-***.elasticsearch.aliyuncs.com:9200"]
index => "<your_index>"
user => "elastic"
password => "changeme"
}
}
假設某Logstore有10個shard,每個shard的數據流量1 MB/s;每臺阿里云Logstash機器處理的能力為3 MB/s,可分配5臺阿里云Logstash服務器,每個服務器創建一個input sls管道;并且每個服務器管道設置相同的consumer_group和consumer_name,將consumer_name_with_ip字段設置為true。
這種情況每臺服務器會分配到2個shard,分別處理2 MB/s的數據。
參數說明
logstash-input-sls支持的參數如下。
參數名 | 參數類型 | 是否必填 | 說明 |
---|---|---|---|
endpoint | String | 是 | VPC網絡下的日志服務項目的Endpoint,詳情請參見經典網絡及VPC網絡服務入口。 |
access_id | String | 是 | 阿里云Access Key ID,需要具備ConsumerGroup相關權限,詳情請參見使用消費組消費。 |
access_key | String | 是 | 阿里云Access Key Secret,需要具備ConsumerGroup相關權限,詳情請參見使用消費組消費。 |
project | String | 是 | 日志服務項目名。 |
logstore | String | 是 | 日志服務日志庫名。 |
consumer_group | String | 是 | 自定義消費組名。 |
consumer_name | String | 是 | 自定義消費者名。同一個消費組內消費者名不能重復,否則會出現未定義行為。 |
position | String | 是 | 消費位置,可選:
|
checkpoint_second | Number | 否 | 每隔幾秒checkpoint一次,建議10~60秒,不能低于10秒,默認30秒。 |
include_meta | Boolean | 否 | 傳入日志是否包含Meta,Meta包括日志source、time、tag以及topic,默認為true。 |
consumer_name_with_ip | Boolean | 否 | 消費者名是否包含IP地址,默認為true。分布式協同消費下必須設置為true。 |
性能基準測試信息
- 測試環境
- 處理器:Intel(R) Xeon(R) Platinum 8163 CPU @ 2.50GHz,4 Core
- 內存:8 GB
- 環境:Linux
- 阿里云Logstash配置
input { logservice{ endpoint => "cn-hangzhou-intranet.log.aliyuncs.com" access_id => "***" access_key => "***" project => "test-project" logstore => "logstore1" consumer_group => "consumer_group1" consumer_name => "consumer1" position => "end" checkpoint_second => 30 include_meta => true consumer_name_with_ip => true } } output { elasticsearch { hosts => ["http://es-cn-***.elasticsearch.aliyuncs.com:9200"] index => "myindex" user => "elastic" password => "changeme" } }
- 測試過程
- 使用Java Producer向Logstore發送數據,每秒分別發送2 MB、4 MB、8 MB、16 MB、32 MB數據。
每條日志約500字節,包括10個Key和Value對。
- 啟動阿里云Logstash消費Logstore中的數據,并確保消費延遲沒有上漲(消費速度能夠跟上生產的速度)。
- 使用Java Producer向Logstore發送數據,每秒分別發送2 MB、4 MB、8 MB、16 MB、32 MB數據。
- 測試結果
流量(MB/S) CPU使用率(%) 內存占用量(GB) 32 170.3 1.3 16 83.3 1.3 8 41.5 1.3 4 21.0 1.3 2 11.3 1.3