logstash-input-sls插件是阿里云Logstash自帶的默認插件。作為Logstash的input插件,logstash-input-sls插件提供了從日志服務獲取日志的功能。

說明 logstash-input-sls是阿里云維護的開源插件,詳情請參見logstash-input-logservice

功能特性

  • 支持分布式協同消費:可配置多臺服務器同時消費一個Logstore服務。
    說明 多臺Logstash服務器進行分布式協同消費時,由于logstash-input-sls插件限制,需保證各個服務器僅部署一個input sls管道。如果單個服務器中存在多個input sls管道,輸出端可能會出現數據重復的異常情況。
  • 高性能:基于Java ConsumerGroup實現,單核消費速度可達20 MB/s。
  • 高可靠:消費進度會被保存到服務端,宕機恢復時,會從上一次checkpoint處自動恢復。
  • 自動負載均衡:根據消費者數量自動分配shard,消費者增加或退出后會自動進行負載均衡。

前提條件

您已完成以下操作:

使用logstash-input-sls插件

滿足以上前提條件后,您可以通過配置文件管理管道的方式創建管道任務。在創建管道任務時,按照以下說明配置管道參數。配置完成后進行保存與部署,即可觸發Logstash從日志服務獲取日志。
注意 RAM用戶使用logstash-input-sls插件前,還需要在日志服務側設置消費組相關的權限策略,詳細信息請參見RAM用戶授權

以使用阿里云Logstash消費某一個Logstore,并將日志輸出到阿里云Elasticsearch為例,配置示例如下。

input {
 logservice{
  endpoint => "your project endpoint"
  access_id => "your access id"
  access_key => "your access key"
  project => "your project name"
  logstore => "your logstore name"
  consumer_group => "consumer group name"
  consumer_name => "consumer name"
  position => "end"
  checkpoint_second => 30
  include_meta => true
  consumer_name_with_ip => true
  }
}

output {
  elasticsearch {
    hosts => ["http://es-cn-***.elasticsearch.aliyuncs.com:9200"]
    index => "<your_index>"
    user => "elastic"
    password => "changeme"
  }
}

假設某Logstore有10個shard,每個shard的數據流量1 MB/s;每臺阿里云Logstash機器處理的能力為3 MB/s,可分配5臺阿里云Logstash服務器,每個服務器創建一個input sls管道;并且每個服務器管道設置相同的consumer_groupconsumer_name,將consumer_name_with_ip字段設置為true。

這種情況每臺服務器會分配到2個shard,分別處理2 MB/s的數據。

參數說明

logstash-input-sls支持的參數如下。

參數名 參數類型 是否必填 說明
endpoint String VPC網絡下的日志服務項目的Endpoint,詳情請參見經典網絡及VPC網絡服務入口
access_id String 阿里云Access Key ID,需要具備ConsumerGroup相關權限,詳情請參見使用消費組消費
access_key String 阿里云Access Key Secret,需要具備ConsumerGroup相關權限,詳情請參見使用消費組消費
project String 日志服務項目名。
logstore String 日志服務日志庫名。
consumer_group String 自定義消費組名。
consumer_name String 自定義消費者名。同一個消費組內消費者名不能重復,否則會出現未定義行為。
position String 消費位置,可選:
  • begin:從日志庫寫入的第一條數據開始消費。
  • end:從當前時間點開始消費。
  • yyyy-MM-dd HH:mm:ss:從指定時間點開始消費。
checkpoint_second Number 每隔幾秒checkpoint一次,建議10~60秒,不能低于10秒,默認30秒。
include_meta Boolean 傳入日志是否包含Meta,Meta包括日志source、time、tag以及topic,默認為true。
consumer_name_with_ip Boolean 消費者名是否包含IP地址,默認為true。分布式協同消費下必須設置為true。

性能基準測試信息

  • 測試環境
    • 處理器:Intel(R) Xeon(R) Platinum 8163 CPU @ 2.50GHz,4 Core
    • 內存:8 GB
    • 環境:Linux
  • 阿里云Logstash配置
    input {
      logservice{
      endpoint => "cn-hangzhou-intranet.log.aliyuncs.com"
      access_id => "***"
      access_key => "***"
      project => "test-project"
      logstore => "logstore1"
      consumer_group => "consumer_group1"
      consumer_name => "consumer1"
      position => "end"
      checkpoint_second => 30
      include_meta => true
      consumer_name_with_ip => true
      }
    }
    output {
      elasticsearch {
        hosts => ["http://es-cn-***.elasticsearch.aliyuncs.com:9200"]
        index => "myindex"
        user => "elastic"
        password => "changeme"
      }
    }
  • 測試過程
    1. 使用Java Producer向Logstore發送數據,每秒分別發送2 MB、4 MB、8 MB、16 MB、32 MB數據。

      每條日志約500字節,包括10個Key和Value對。

    2. 啟動阿里云Logstash消費Logstore中的數據,并確保消費延遲沒有上漲(消費速度能夠跟上生產的速度)。
  • 測試結果
    流量(MB/S) CPU使用率(%) 內存占用量(GB)
    32 170.3 1.3
    16 83.3 1.3
    8 41.5 1.3
    4 21.0 1.3
    2 11.3 1.3