MaxCompute支持將開源Logstash收集的日志數據寫入MaxCompute。您可以通過Logstash的輸出插件logstash-output-maxcompute
,將Logstash收集的日志數據使用MaxCompute流式數據通道(Streaming Tunnel)功能上傳到MaxCompute。
前提條件
在執行操作前請確認您已完成如下操作:
已安裝Logstash并創建Logstash日志收集實例。
更多信息,請參見Getting Started with Logstash。
已創建目標MaxCompute項目。
更多創建MaxCompute項目信息,請參見創建MaxCompute項目。
背景信息
Logstash是一個開源的服務器端數據處理管道,可以同時從多個數據源獲取數據,并對數據進行轉換,然后將轉換后的數據發送到用戶的目標“存儲端”。
您需要通過Logstash的logstash-output-maxcompute
插件,將Logstash收集的日志數據使用MaxCompute流式數據通道(Streaming Tunnel)功能上傳到MaxCompute。
logstash-output-maxcompute
插件基于Logstash v7.8.0版本開發,可以作為輸出端口。該插件的特點如下:
使用流式數據通道,避免通過批量數據通道導入產生的并發和小文件問題。
支持動態分區,可以根據Logstash解析的日志字段產生分區字段,能夠自動創建不存在的分區。
logstash-output-maxcompute
插件應用于如下場景:
需要收集的應用的日志格式在Logstash上有輸入插件支持或易于解析,例如NGINX日志。
希望根據日志內容自動創建并導入對應分區。
logstash-output-maxcompute
插件支持的數據類型為:STRING、BIGINT、DOUBLE、DATETIME和BOOLEAN。
日志中DATETIME類型的字段的格式將自動使用
ruby Time.parse
函數推斷。如果日志BOOLEAN字段滿足
.to_string().lowercase() == "true"
,則結果為True。其他任何值為False。
本文將以收集NGINX日志為例,介紹如何配置和使用插件。
步驟一:下載并安裝插件
您可以下載已安裝logstash-output-maxcompute
插件的Logstash實例Logstash實例下載鏈接,跳過安裝步驟執行下一步。如果需要自行安裝,請按照如下步驟操作:
下載
logstash-output-maxcompute
插件(logstash-output-maxcompute插件下載鏈接)并放置在Logstash的根目錄%logstash%
下。修改Logstash根目錄
%logstash%
下的Gemfile
文件,將source "https://rubygems.org"
替換為source 'https://gems.ruby-china.com'
。以Windows系統為例,在系統的命令行窗口,切換至Logstash的根目錄
%logstash%
下,執行如下命令安裝logstash-output-maxcompute
插件。bin\logstash-plugin install logstash-output-maxcompute-1.1.0.gem
當返回
Installation successful
提示信息時,表示插件安裝成功。可選:運行如下命令驗證安裝結果。
bin\logstash-plugin list maxcompute
說明Linux系統需要執行命令
bin/logstash-plugin list maxcompute
。如果安裝成功,會返回
logstash-output-maxcompute
信息。如果安裝失敗,解決方案請參見RubyGems。
步驟二:創建目標表
通過MaxCompute客戶端或其他可以運行MaxCompute SQL的工具執行如下命令,在目標MaxCompute項目中創建目標表,例如logstash_test_groknginx
。后續會將日志信息以日期為分區導入此表中。
create table logstash_test_groknginx(
clientip string,
remote_user string,
time datetime,
verb string,
uri string,
version string,
response string,
body_bytes bigint,
referrer string,
agent string
) partitioned by (pt string);
步驟三:編寫Logstash Pipeline配置文件
在Logstash的根目錄%logstash%
下創建配置文件pipeline.conf,并輸入如下內容:
input { stdin {} }
filter {
grok {
match => {
"message" => "%{IP:clientip} - (%{USER:remote_user}|-) \[%{HTTPDATE:httptimestamp}\] \"%{WORD:verb} %{NOTSPACE:request} HTTP/%{NUMBER:httpversion}\" %{NUMBER:response} %{NUMBER:body_bytes} %{QS:referrer} %{QS:agent}"
}
}
date {
match => [ "httptimestamp" , "dd/MMM/yyyy:HH:mm:ss Z" ]
target => "timestamp"
}
}
output {
maxctunnel {
aliyun_access_id => "<your_accesskey_id>"
aliyun_access_key => "<your_accesskey_secret>"
aliyun_mc_endpoint => "<your_project_endpoint>"
project => "<your_project_name>"
table => "<table_name>"
partition => "pt=$<timestamp.strftime('%F')>"
value_fields => ["clientip", "remote_user", "timestamp", "verb", "request", "httpversion", "response", "bytes", "referrer", "agent"]
}
}
參數 | 說明 |
your_accesskey_id | 可以訪問目標MaxCompute項目的AccessKey ID。 |
your_accesskey_secret | AccessKey ID對應的AccessKey Secret。 |
your_project_endpoint | 目標MaxCompute項目所在區域的Endpoint信息。更多Endpoint信息,請參見Endpoint。 |
your_project_name | 目標MaxCompute項目的名稱。 |
table_name | 目標表的名稱,即步驟二中創建的表。 |
partition | 配置插件如何根據日志字段生成對應的分區信息。如果目標表有多個分區,需要指定到最后一級。配置格式如下:
|
partition_time_format | 可選。指定當一個字符串型的日期時間字段被分區信息引用時,該字段的源格式字符串。 在本例中,時間字段 即使未使用 如果不使用
|
value_fields | 指定目標表中的每個字段對應的日志字段,指定順序與表中字段的順序一致。 目標表字段的順序為 |
aliyun_mc_tunnel_endpoint | 可選。您可以通過此配置項強制指定Tunnel Endpoint,覆蓋自動路由機制。 |
retry_time | 失敗重試次數。當寫入MaxCompute失敗時,嘗試重新寫入的次數。默認值為3。 |
retry_interval | 失敗重試間隔。在兩次嘗試之間最少間隔的時間,單位為秒。默認值為1。 |
batch_size | 一次最多處理的日志條數。默認值為100。 |
batch_timeout | 寫入MaxCompute的超時時間,單位為秒。默認值為5。 |
在本配置文件中,指定的日志輸入為標準輸入(input { stdin {} })
。在實際應用場景中,您可以使用Logstash File輸入插件從本地硬盤中自動讀取NGINX日志。更多信息,請參見Logstash文檔。
步驟四:運行和測試
以Windows系統為例,在系統的命令行窗口,切換至Logstash的根目錄
%logstash%
下,執行如下命令啟動Logstash。bin\logstash -f pipeline.conf
返回
Successfully started Logstash API endpoint
信息時,Logstash啟動完畢。在系統的命令行窗口,粘貼如下日志樣例,并按下鍵盤上的Enter鍵。
1.1.1.1 - - [09/Jul/2020:01:02:03 +0800] "GET /masked/request/uri/1 HTTP/1.1" 200 143363 "-" "Masked UserAgent" - 0.095 0.071 2.2.2.2 - - [09/Jul/2020:04:05:06 +0800] "GET /masked/request/uri/2 HTTP/1.1" 200 143388 "-" "Masked UserAgent 2" - 0.095 0.072
返回
write .. records on partition .. completed
時,表示成功寫入MaxCompute。通過MaxCompute客戶端或其他可以運行MaxCompute SQL的工具,執行如下命令,查詢數據寫入結果。
set odps.sql.allow.fullscan=true; select * from logstash_test_groknginx;
返回結果如下:
+--------+------------+-------------+------------+------------+------------+------------+------------+------------+------------+------------+------------+ | clientip | remote_user | time | verb | uri | version | response | body_bytes | referrer | agent | pt | +------------+-------------+------------+------------+------------+------------+------------+------------+------------+------------+------------+ | 1.1.1.1 | - | 2020-07-09 01:02:03 | GET | /masked/request/uri/1 | 1.1 | 200 | 0 | "-" | "Masked UserAgent" | 2020-02-10 | | 2.2.2.2 | - | 2020-07-09 04:05:06 | GET | /masked/request/uri/2 | 1.1 | 200 | 0 | "-" | "Masked UserAgent 2" | 2020-02-10 | +------------+-------------+------------+------------+------------+------------+------------+------------+------------+------------+------------+ 2 records (at most 10000 supported) fetched by instance tunnel.