日志服務支持通過aliyun-log-flume插件與Flume進行對接,實現日志數據的寫入和消費。
背景信息
aliyun-log-flume是一個實現日志服務與Flume對接的插件,與Flume對接后,日志服務可以通過Flume與其它數據系統如HDFS、Kafka等對接。aliyun-log-flume提供Sink和Source實現日志服務與Flume的對接。
Sink:Flume讀取其他數據源的數據然后寫入日志服務。
Source:Flume消費日志服務的日志數據然后寫入其他系統。
更多信息,請參見aliyun-log-flume。
操作步驟
下載并安裝Flume。
更多信息,請參見Flume。
下載aliyun-log-flume插件,并將插件存放于***/flume/lib目錄下。
更多信息,請參見aliyun-log-flume-1.3.jar。
在***/flume/conf目錄下,創建配置文件flumejob.conf。
啟動Flume。
Sink
通過Sink將其他數據源的數據通過Flume寫入日志服務。目前支持兩種解析格式:
SIMPLE:將整個Flume Event作為一個字段寫入日志服務。
DELIMITED:將整個Flume Event作為被分隔符分隔的數據,根據配置的列名解析成對應的字段寫入日志服務。
Sink的配置如下:
參數 | 是否必須 | 說明 |
type | 是 | 默認配置為com.aliyun.Loghub.flume.sink.LoghubSink。 |
endpoint | 是 | Project的服務入口,例如 |
project | 是 | Project名稱。 |
logstore | 是 | Logstore名稱。 |
accessKeyId | 是 | 阿里云AccessKey ID,用于標識用戶。為保證賬號安全,建議您使用RAM用戶的AccessKey。如何獲取AccessKey,請參見訪問密鑰。 |
accessKey | 是 | 阿里云AccessKey Secret,用于驗證用戶的密鑰。為保證賬號安全,建議您使用RAM用戶的AccessKey。如何獲取AccessKey,請參見訪問密鑰。 |
batchSize | 否 | 每批次寫入日志服務的數據條數。默認為1000條。 |
maxBufferSize | 否 | 緩存隊列的大小。默認為1000條。 |
serializer | 否 | Event序列化格式。支持的模式如下:
|
columns | 否 | 當serializer為DELIMITED時,必須指定該字段列表,用半角逗號(,)分隔,順序與實際數據中的字段順序一致。 |
separatorChar | 否 | 當serializer為DELIMITED時,用于指定數據的分隔符,必須為單個字符。默認為英文逗號(,)。 |
quoteChar | 否 | 當serializer為DELIMITED時,用于指定引用符。默認為半角雙引號(")。 |
escapeChar | 否 | 當serializer為DELIMITED時,用于指定轉義字符。默認為半角雙引號(")。 |
useRecordTime | 否 | 用于設置是否使用數據中的timestamp字段作為日志時間。默認為false表示使用當前時間。 |
Sink配置示例請參見GitHub。
Source
通過Source將日志服務的日志數據通過Flume投遞到其他的數據源。目前支持兩種輸出格式。
DELIMITED:數據以分隔符日志的形式寫入Flume。
JSON:數據以JSON日志的形式寫入Flume。
Source配置如下:
參數 | 是否必須 | 說明 |
type | 是 | 默認配置為com.aliyun.Loghub.flume.source.LoghubSource。 |
endpoint | 是 | Project的服務入口,例如 |
project | 是 | Project名稱。 |
logstore | 是 | Logstore名稱。 |
accessKeyId | 是 | 阿里云AccessKey ID,用于標識用戶。為保證賬號安全,建議您使用RAM用戶的AccessKey。如何獲取AccessKey,請參見訪問密鑰。 |
accessKey | 是 | 阿里云AccessKey Secret,用于驗證用戶的密鑰。為保證賬號安全,建議您使用RAM用戶的AccessKey。如何獲取AccessKey,請參見訪問密鑰。 |
heartbeatIntervalMs | 否 | 客戶端和日志服務的心跳間隔,默認為30000毫秒。 |
fetchIntervalMs | 否 | 數據拉取間隔,默認為100毫秒。 |
fetchInOrder | 否 | 是否按順序消費。默認為false。 |
batchSize | 否 | 每批次讀取的數據條數,默認為100條。 |
consumerGroup | 否 | 讀取的消費組名稱。 |
initialPosition | 否 | 讀取數據的起點位置,支持begin,end,timestamp。默認為begin。 說明 如果服務端已經存在Checkpoint,會優先使用服務端的Checkpoint。 |
timestamp | 否 | 當initialPosition為timestamp時,必須指定時間戳,為Unix時間戳格式。 |
deserializer | 是 | Event反序列化格式,支持的模式如下:
|
columns | 否 | 當deserializer為DELIMITED時,必須指定字段列表,用半角逗號(,)分隔,順序與實際數據中的字段順序一致。 |
separatorChar | 否 | 當deserializer為DELIMITED時,用于指定數據的分隔符,必須為單個字符。默認為英文逗號(,)。 |
quoteChar | 否 | 當deserializer為DELIMITED時,用于指定引用符。默認為半角雙引號(")。 |
escapeChar | 否 | 當deserializer為DELIMITED時,用于指定轉義字符。默認為半角雙引號(")。 |
appendTimestamp | 否 | 當deserializer為DELIMITED時,用于設置是否將時間戳作為一個字段自動添加到每行末尾。默認為false。 |
sourceAsField | 否 | 當deserializer為JSON時,用于設置是否將日志Source作為一個字段,字段名稱為__source__。默認為false。 |
tagAsField | 否 | 當deserializer為JSON時,用于設置是否將日志Tag作為字段,字段名稱為__tag__:{tag名稱}。默認為false。 |
timeAsField | 否 | 當deserializer為JSON時,用于設置是否將日志時間作為一個字段,字段名稱為__time__。默認為false。 |
useRecordTime | 否 | 用于設置是否使用日志的時間,如果為false則使用當前時間。默認為false。 |
Source配置示例請參見GitHub。