通過logstash-input-datahub插件,您可以讀取DataHub中的數據到其他數據源中。本文介紹如何使用logstash-input-datahub插件。
前提條件
您已完成以下操作:
使用logstash-input-datahub插件
參見通過配置文件管理管道,在創建管道任務時,按照以下說明配置Pipeline參數,保存并部署后,即可觸發阿里云Logstash讀取DataHub的數據到目標數據庫中。
Logstash的Pipeline配置如下,相關參數說明請參見參數說明。
input {
datahub {
access_id => "Your accessId"
access_key => "Your accessKey"
endpoint => "Endpoint"
project_name => "test_project"
topic_name => "test_topic"
interval => 5
#cursor => {
# "0"=>"20000000000000000000000003110091"
# "2"=>"20000000000000000000000003110091"
# "1"=>"20000000000000000000000003110091"
# "4"=>"20000000000000000000000003110091"
# "3"=>"20000000000000000000000003110000"
#}
shard_ids => []
pos_file => "/ssd/1/<Logstash實例ID>/logstash/data/文件名"
}
}
output {
elasticsearch {
hosts => ["http://es-cn-mp91cbxsm000c****.elasticsearch.aliyuncs.com:9200"]
user => "elastic"
password => "your_password"
index => "datahubtest"
document_type => "_doc"
}
}
注意 目前阿里云Logstash只支持同一專有網絡下的數據傳輸,如果源端數據在公網環境下,請參見配置NAT公網數據傳輸,通過公網訪問Logstash。
參數說明
logstash-input-datahub插件支持的參數如下。
參數 | 類型 | 是否必選 | 說明 |
---|---|---|---|
endpoint | string | 是 | DataHub對外服務的訪問域名,詳細信息請參見域名列表。 |
access_id | string | 是 | 阿里云賬號的AccessKey ID。 |
access_key | string | 是 | 阿里云賬號的Access Key Secret。 |
project_name | string | 是 | DataHub的項目名稱。 |
topic_name | string | 是 | DataHub的Topic名稱。 |
retry_times | number | 否 | 重試次數。-1表示無限重試(默認)、0表示不重試、大于0表示按照設置的次數重試。 |
retry_interval | number | 否 | 重試的間隔,單位為秒。 |
shard_ids | array | 否 | 需要消費的shard列表。默認為空,表示全部消費。 |
cursor | string | 否 | 消費起點。默認為空,表示從頭開始消費。 |
pos_file | string | 是 | Checkpoint記錄文件,必須配置,優先使用checkpoint恢復消費。 |
enable_pb | boolean | 否 | 是否使用pb傳輸,默認為true。如果不支持pb傳輸,請將該參數設置為false。 |
compress_method | string | 否 | 網絡傳輸的壓縮算法,默認不壓縮。可選項:lz4、deflate。 |
print_debug_info | boolean | 否 | 是否打印DataHub的Debug信息,默認為false。設置為true時,會打印大量信息,這些信息僅用來進行腳本調試。 |