日本熟妇hd丰满老熟妇,中文字幕一区二区三区在线不卡 ,亚洲成片在线观看,免费女同在线一区二区

使用Logstash(流式數據傳輸)

MaxCompute支持將開源Logstash收集的日志數據寫入MaxCompute。您可以通過Logstash的輸出插件logstash-output-maxcompute,將Logstash收集的日志數據使用MaxCompute流式數據通道(Streaming Tunnel)功能上傳到MaxCompute。

前提條件

在執行操作前請確認您已完成如下操作:

背景信息

Logstash是一個開源的服務器端數據處理管道,可以同時從多個數據源獲取數據,并對數據進行轉換,然后將轉換后的數據發送到用戶的目標“存儲端”。

您需要通過Logstash的logstash-output-maxcompute插件,將Logstash收集的日志數據使用MaxCompute流式數據通道(Streaming Tunnel)功能上傳到MaxCompute。

logstash-output-maxcompute插件基于Logstash v7.8.0版本開發,可以作為輸出端口。該插件的特點如下:

  • 使用流式數據通道,避免通過批量數據通道導入產生的并發和小文件問題。

  • 支持動態分區,可以根據Logstash解析的日志字段產生分區字段,能夠自動創建不存在的分區。

logstash-output-maxcompute插件應用于如下場景:

  • 需要收集的應用的日志格式在Logstash上有輸入插件支持或易于解析,例如NGINX日志。

  • 希望根據日志內容自動創建并導入對應分區。

logstash-output-maxcompute插件支持的數據類型為:STRING、BIGINT、DOUBLE、DATETIME和BOOLEAN。

說明
  • 日志中DATETIME類型的字段的格式將自動使用ruby Time.parse函數推斷。

  • 如果日志BOOLEAN字段滿足.to_string().lowercase() == "true",則結果為True。其他任何值為False。

本文將以收集NGINX日志為例,介紹如何配置和使用插件。

步驟一:下載并安裝插件

您可以下載已安裝logstash-output-maxcompute插件的Logstash實例Logstash實例下載鏈接,跳過安裝步驟執行下一步。如果需要自行安裝,請按照如下步驟操作:

  1. 下載logstash-output-maxcompute插件(logstash-output-maxcompute插件下載鏈接)并放置在Logstash的根目錄%logstash%下。

  2. 修改Logstash根目錄%logstash%下的Gemfile文件,將source "https://rubygems.org"替換為source 'https://gems.ruby-china.com'

  3. 以Windows系統為例,在系統的命令行窗口,切換至Logstash的根目錄%logstash%下,執行如下命令安裝logstash-output-maxcompute插件。

    bin\logstash-plugin install logstash-output-maxcompute-1.1.0.gem

    當返回Installation successful提示信息時,表示插件安裝成功。安裝成功

  4. 可選:運行如下命令驗證安裝結果。

    bin\logstash-plugin list maxcompute
    說明

    Linux系統需要執行命令bin/logstash-plugin list maxcompute

    如果安裝成功,會返回logstash-output-maxcompute信息。如果安裝失敗,解決方案請參見RubyGems驗證結果

步驟二:創建目標表

通過MaxCompute客戶端或其他可以運行MaxCompute SQL的工具執行如下命令,在目標MaxCompute項目中創建目標表,例如logstash_test_groknginx。后續會將日志信息以日期為分區導入此表中。

create table logstash_test_groknginx(
 clientip string,
 remote_user string,
 time datetime,
 verb string,
 uri string,
 version string,
 response string,
 body_bytes bigint,
 referrer string,
 agent string
) partitioned by (pt string);

步驟三:編寫Logstash Pipeline配置文件

在Logstash的根目錄%logstash%下創建配置文件pipeline.conf,并輸入如下內容:

input { stdin {} }

filter {
        grok {
                match => {
                        "message" => "%{IP:clientip} - (%{USER:remote_user}|-) \[%{HTTPDATE:httptimestamp}\] \"%{WORD:verb} %{NOTSPACE:request} HTTP/%{NUMBER:httpversion}\" %{NUMBER:response} %{NUMBER:body_bytes} %{QS:referrer} %{QS:agent}"
                }
        }
        date {
                match => [ "httptimestamp" , "dd/MMM/yyyy:HH:mm:ss Z" ]
                target => "timestamp"
        }
}

output {
       maxctunnel {
                aliyun_access_id => "<your_accesskey_id>"
                aliyun_access_key => "<your_accesskey_secret>"
                aliyun_mc_endpoint => "<your_project_endpoint>"
                project => "<your_project_name>"
                table => "<table_name>"
                partition => "pt=$<timestamp.strftime('%F')>"
                value_fields => ["clientip", "remote_user", "timestamp", "verb", "request", "httpversion", "response", "bytes", "referrer", "agent"]
        }
}

參數

說明

your_accesskey_id

可以訪問目標MaxCompute項目的AccessKey ID。

your_accesskey_secret

AccessKey ID對應的AccessKey Secret。

your_project_endpoint

目標MaxCompute項目所在區域的Endpoint信息。更多Endpoint信息,請參見Endpoint

your_project_name

目標MaxCompute項目的名稱。

table_name

目標表的名稱,即步驟二中創建的表。

partition

配置插件如何根據日志字段生成對應的分區信息。如果目標表有多個分區,需要指定到最后一級。配置格式如下:

  • 如果某個分區的值為常量,格式為{分區列名}={常量值}

  • 如果某個分區的值為解析后的日志中一個字段的值,格式為{分區列名}=$<{日志字段名}>

  • 如果某個分區的值為解析后的日志中一個日期時間字段的值,并且需要進行重新格式化,格式為{分區列名}=$<{日志字段名}.strftime('{時間格式}')>。其中:{時間格式}是重新格式化的格式化字符串

    在本示例中,將格式化到僅保留日期(%F)。如果要按照日期date作為第一級分區,小時hour作為第二級分區,配置格式為"date=$<timestamp.strftime('%F')>,hour=$<timestamp.strftime('%H')>"

  • 多級分區之間用英文逗號(,)連接,分區指定的順序和建表時的順序必須一致。

partition_time_format

可選。指定當一個字符串型的日期時間字段被分區信息引用時,該字段的源格式字符串

在本例中,時間字段timestamp已經被date插件轉換為時間類型,因此無需指定。

即使未使用date過濾插件進行轉換,亦未指定此配置項的值,在大多數情況下插件仍然可以自動識別內容為日期時間的字符串,并自動完成需要的轉換。即只在少數自動識別失敗的情況下需要手動指定此項的值。

如果不使用date過濾插件,而是手動進行轉換,則需要配置如下信息:

  • 手動指定partition_time_formatpartition_time_format => "%d/%b/%Y:%H:%M:%S %z"

  • 將分區中引用的字段改為日志中的字符串字段:partition => "pt=$<httptimestamp.strftime('%F')>"

value_fields

指定目標表中的每個字段對應的日志字段,指定順序與表中字段的順序一致。

目標表字段的順序為clientip string、remote_user string、time datetime、verb string、uri string、version string、response string、body_bytes bigint、referrer string、agent string,依次對應"clientip"、"remote_user"、"timestamp"、"verb"、"request"、"httpversion"、"response"、"bytes"、"referrer"、"agent"

aliyun_mc_tunnel_endpoint

可選。您可以通過此配置項強制指定Tunnel Endpoint,覆蓋自動路由機制。

retry_time

失敗重試次數。當寫入MaxCompute失敗時,嘗試重新寫入的次數。默認值為3。

retry_interval

失敗重試間隔。在兩次嘗試之間最少間隔的時間,單位為秒。默認值為1。

batch_size

一次最多處理的日志條數。默認值為100。

batch_timeout

寫入MaxCompute的超時時間,單位為秒。默認值為5。

說明

在本配置文件中,指定的日志輸入為標準輸入(input { stdin {} })。在實際應用場景中,您可以使用Logstash File輸入插件從本地硬盤中自動讀取NGINX日志。更多信息,請參見Logstash文檔

步驟四:運行和測試

  1. 以Windows系統為例,在系統的命令行窗口,切換至Logstash的根目錄%logstash%下,執行如下命令啟動Logstash。

    bin\logstash -f pipeline.conf

    返回Successfully started Logstash API endpoint信息時,Logstash啟動完畢。啟動完畢

  2. 在系統的命令行窗口,粘貼如下日志樣例,并按下鍵盤上的Enter鍵。

    1.1.1.1 - - [09/Jul/2020:01:02:03 +0800] "GET /masked/request/uri/1 HTTP/1.1" 200 143363 "-" "Masked UserAgent" - 0.095 0.071
    2.2.2.2 - - [09/Jul/2020:04:05:06 +0800] "GET /masked/request/uri/2 HTTP/1.1" 200 143388 "-" "Masked UserAgent 2" - 0.095 0.072

    返回write .. records on partition .. completed時,表示成功寫入MaxCompute。寫入成功

  3. 通過MaxCompute客戶端或其他可以運行MaxCompute SQL的工具,執行如下命令,查詢數據寫入結果。

    set odps.sql.allow.fullscan=true;
    select * from logstash_test_groknginx;

    返回結果如下:

    +--------+------------+-------------+------------+------------+------------+------------+------------+------------+------------+------------+------------+
    | clientip   | remote_user | time       | verb       | uri        | version    | response   | body_bytes | referrer   | agent      | pt         |
    +------------+-------------+------------+------------+------------+------------+------------+------------+------------+------------+------------+
    | 1.1.1.1    | -           | 2020-07-09 01:02:03 | GET        | /masked/request/uri/1 | 1.1        | 200        | 0          | "-"        | "Masked UserAgent" | 2020-02-10       |
    | 2.2.2.2    | -           | 2020-07-09 04:05:06 | GET        | /masked/request/uri/2 | 1.1        | 200        | 0          | "-"        | "Masked UserAgent 2" | 2020-02-10       |
    +------------+-------------+------------+------------+------------+------------+------------+------------+------------+------------+------------+
    2 records (at most 10000 supported) fetched by instance tunnel.