通過logstash-input-datahub插件,您可以讀取DataHub中的數據到其他數據源中。本文介紹如何使用logstash-input-datahub插件。

前提條件

您已完成以下操作:

  • 安裝logstash-input-datahub插件。

    具體操作,請參見安裝或卸載插件

  • 開通DataHub產品,并完成創建項目、創建Topic和導入數據。

    具體操作,請參見快速入門

使用logstash-input-datahub插件

參見通過配置文件管理管道,在創建管道任務時,按照以下說明配置Pipeline參數,保存并部署后,即可觸發阿里云Logstash讀取DataHub的數據到目標數據庫中。

Logstash的Pipeline配置如下,相關參數說明請參見參數說明

input {
    datahub {
        access_id => "Your accessId"
        access_key => "Your accessKey"
        endpoint => "Endpoint"
        project_name => "test_project"
        topic_name => "test_topic"
        interval => 5
        #cursor => {
        #    "0"=>"20000000000000000000000003110091"
        #    "2"=>"20000000000000000000000003110091"
        #    "1"=>"20000000000000000000000003110091"
        #    "4"=>"20000000000000000000000003110091"
        #    "3"=>"20000000000000000000000003110000"
        #}
        shard_ids => []
        pos_file => "/ssd/1/<Logstash實例ID>/logstash/data/文件名"
    }
}
output {
    elasticsearch {
      hosts => ["http://es-cn-mp91cbxsm000c****.elasticsearch.aliyuncs.com:9200"]
      user => "elastic"
      password => "your_password"
      index => "datahubtest"
      document_type => "_doc"
  }
}
注意 目前阿里云Logstash只支持同一專有網絡下的數據傳輸,如果源端數據在公網環境下,請參見配置NAT公網數據傳輸,通過公網訪問Logstash。

參數說明

logstash-input-datahub插件支持的參數如下。
參數 類型 是否必選 說明
endpoint string DataHub對外服務的訪問域名,詳細信息請參見域名列表
access_id string 阿里云賬號的AccessKey ID。
access_key string 阿里云賬號的Access Key Secret。
project_name string DataHub的項目名稱。
topic_name string DataHub的Topic名稱。
retry_times number 重試次數。-1表示無限重試(默認)、0表示不重試、大于0表示按照設置的次數重試。
retry_interval number 重試的間隔,單位為秒。
shard_ids array 需要消費的shard列表。默認為空,表示全部消費。
cursor string 消費起點。默認為空,表示從頭開始消費。
pos_file string Checkpoint記錄文件,必須配置,優先使用checkpoint恢復消費。
enable_pb boolean 是否使用pb傳輸,默認為true。如果不支持pb傳輸,請將該參數設置為false。
compress_method string 網絡傳輸的壓縮算法,默認不壓縮。可選項:lz4deflate
print_debug_info boolean 是否打印DataHub的Debug信息,默認為false。設置為true時,會打印大量信息,這些信息僅用來進行腳本調試。