Flume插件
產(chǎn)品介紹
Flume-DataHub插件是基于Flume開發(fā)的DataHub數(shù)據(jù)訂閱/發(fā)布插件,可以將采集到的數(shù)據(jù)寫入DataHub,也可以從DataHub讀取數(shù)據(jù)寫入其他系統(tǒng)。該插件遵守Flume插件開發(fā)規(guī)范,安裝方便,可以很方便的向DataHub發(fā)布/訂閱數(shù)據(jù)。
產(chǎn)品安裝
安裝限制
JDK版本 >= 1.8
Apache Maven 3.x
Flume-NG 1.x
下載flume(如已下載請?zhí)^)
首先下載flume。
$ tar zxvf apache-flume-1.11.0-bin.tar.gz
方便起見,以下介紹以${FLUME_HOME}表示Flume主目錄位置。
安裝flume-datahub
直接安裝(推薦)
首先下載flume-datahub插件,下載鏈接 。(歷史版本可在文末進(jìn)行下載)
解壓flume插件并放在${FLUME_HOME}/plugins.d目錄下
$ tar aliyun-flume-datahub-sink-x.x.x.tar.gz $ cd aliyun-flume-datahub-sink-x.x.x $ mkdir ${FLUME_HOME}/plugins.d $ mv aliyun-flume-datahub-sink ${FLUME_HOME}/plugins.d
源碼安裝
首先下載源碼,github地址。
編譯并安裝
$ cd aliyun-maxcompute-data-collectors $ mvn clean package -DskipTests=true -Dmaven.javadoc.skip=true $ cd flume-plugin/target $ tar zxvf aliyun-flume-datahub-sink-x.x.x.tar.gz $ mv aliyun-flume-datahub-sink ${FLUME_HOME}/plugins.d
參數(shù)介紹
sink 參數(shù)介紹
名稱 | 默認(rèn)值 | 是否必須 | 描述 |
datahub.endPoint | - | 必須 | 阿里云datahub的服務(wù)地址 |
datahub.accessId | - | 必須 | 阿里云access id |
datahub.accessKey | - | 必須 | 阿里云access key |
datahub.project | - | 必須 | datahub項目名稱 |
datahub.topic | - | 必須 | datahub topic名稱 |
datahub.shard.ids | 所有shard | 可選 | 寫入datahub的指定shard列表,以”,”分隔,例如 0,1,2。每次從shard列表隨機(jī)選擇一個shard寫入DataHub。在發(fā)生shard分裂或者合并時,如果沒有指定該參數(shù),那么shard分裂或者合并后,flume會自動調(diào)整shard列表,否則需要用戶手動修改配置文件。 |
datahub.enablePb | true | 可選 | 是否使用pb傳輸,部分專有云不支持需要手動設(shè)置為false |
datahub.compressType | none | 可選 | 是否壓縮傳輸,目前支持 LZ4, DEFLATE |
datahub.batchSize | 1000 | 可選 | datahub每次發(fā)送的最大數(shù)據(jù)量 |
datahub.maxBufferSize | 2*1024*1024 | 可選 | datahub單次請求寫入數(shù)據(jù)量的最大值(單位:Byte)。不建議修改該參數(shù),單次寫入數(shù)據(jù)量過大可能寫入失敗 |
datahub.batchTimeout | 5 | 可選 | 如果數(shù)據(jù)量沒有達(dá)到batchSize,向datahub同步數(shù)據(jù)之前等待的時間(單位:秒) |
datahub.retryTimes | 3 | 可選 | 數(shù)據(jù)同步失敗重試次數(shù) |
datahub.retryInterval | 5 | 可選 | 數(shù)據(jù)同步失敗重試間隔(單位:秒) |
datahub.dirtyDataContinue | true | 可選 | 遇到臟數(shù)據(jù)是否繼續(xù)處理,為true時,會自動將臟數(shù)據(jù)以,分隔符寫入臟數(shù)據(jù)文件,不影響后續(xù)數(shù)據(jù)的處理 |
datahub.dirtyDataFile | DataHub-Flume-dirty-file | 可選 | 臟數(shù)據(jù)文件 |
serializer | - | 必須 | 數(shù)據(jù)解析方式,目前支持DELIMITED(分,JSON(每行為單層Json)和REGEX(正則表達(dá)式) |
serializer.delimiter | , | 可選 | 數(shù)據(jù)字段分割符,如果要使用特殊字符需要添加雙引號,例如”\t” |
serializer.regex | (.*) | 可選 | 數(shù)據(jù)解析的正則表達(dá)式,每個字段的數(shù)據(jù)被解析成一個group |
serializer.fieldnames | - | 必須 | 輸入數(shù)據(jù)字段到datahub字段的映射,以輸入的順序標(biāo)示字段,如果要跳過某個字段, 不指定列名即可,例如 c1,c2,,c3,表示將輸入數(shù)據(jù)的第一、二、四字段和datahub的c1,c2,c3字段進(jìn)行匹配。 |
serializer.charset | UTF-8 | 可選 | 數(shù)據(jù)解析編碼格式 |
Source 參數(shù)
名稱 | 默認(rèn)值 | 是否必須 | 描述 |
datahub.endPoint | - | 必須 | 阿里云datahub的服務(wù)地址 |
datahub.accessId | - | 必須 | 阿里云access id |
datahub.accessKey | - | 必須 | 阿里云access key |
datahub.project | - | 必須 | datahub項目名稱 |
datahub.topic | - | 必須 | datahub topic名稱 |
datahub.subId | - | 必須 | datahub 訂閱 id |
datahub.startTime | - | 可選 | datahub 指定時間點進(jìn)行讀數(shù)據(jù),格式為yyyy-MM-dd HH:mm:ss,使用該參數(shù)會首先重置訂閱,然后根據(jù)訂閱讀取數(shù)據(jù)。 |
datahub.shard.ids | - | 可選 | 讀取datahub的指定shard列表,以”,”分隔,例如 0,1,2。每次讀數(shù)據(jù)時會從shard列表隨機(jī)選擇一個shard進(jìn)行消費。如不指定,則采用協(xié)同消費進(jìn)行數(shù)據(jù)讀取。不建議使用該參數(shù),如果配置了多個source的情況下,不指定該參數(shù)時,協(xié)同消費會自動分配shard,盡可能保證每個source負(fù)載均衡。 |
datahub.enablePb | true | 可選 | 是否使用pb傳輸,部分專有云不支持需要手動設(shè)置為false |
datahub.compressType | none | 可選 | 是否壓縮傳輸,目前支持 LZ4, DEFLATE |
datahub.batchSize | 1000 | 可選 | DataHub每次讀取的最大數(shù)據(jù)量 |
datahub.batchTimeout | 5 | 可選 | 如果數(shù)據(jù)量沒有達(dá)到batchSize,向datahub同步數(shù)據(jù)之前等待的時間(單位:秒) |
datahub.retryTimes | 3 | 可選 | 數(shù)據(jù)讀取失敗重試次數(shù),重試間隔默認(rèn)為1S,不可調(diào)整 |
datahub.autoCommit | true | 可選 | 設(shè)為true表示由consumer自動提交點位,可能發(fā)生數(shù)據(jù)未消費但是點位被提交的可能,修改為false表示數(shù)據(jù)被提交到flume channel之后才會提交該點位 |
datahub.offsetCommitTimeout | 30 | 可選 | 自動提交點位時間間隔(單位:秒) |
datahub.sessionTimeout | 60 | 可選 | source功能采取協(xié)同消費實現(xiàn),協(xié)同消費超時沒有發(fā)送心跳,則session會自動關(guān)閉 |
serializer | - | 必須 | 數(shù)據(jù)解析方式,目前支持DELIMITED(分隔符),數(shù)據(jù)的每個字段將會以datahub schema順序?qū)懗梢恍校⒁詃elimiter進(jìn)行分隔 |
serializer.delimiter | , | 可選 | 數(shù)據(jù)字段分割符,如果要使用特殊字符需要添加雙引號,例如”\t” |
serializer.charset | UTF-8 | 可選 | 數(shù)據(jù)解析編碼格式 |
Sink 使用案例
案例一:DELIMITED serializer
DELIMITED解析數(shù)據(jù)時將每一行作為一條Record,并以給定的分隔符對數(shù)據(jù)進(jìn)行解析。下面以csv文件為例,說明如何使用flume將批量csv文件準(zhǔn)時時上傳到DataHub。
數(shù)據(jù)文件
將以下內(nèi)容保存在本地文件/temp/test.csv中。
0,YxCOHXcst1NlL5ebJM9YmvQ1f8oy8neb3obdeoS0,true,1254275.1144629316,1573206062763,1254275.1144637289
0,YxCOHXcst1NlL5ebJM9YmvQ1f8oy8neb3obdeoS0,true,1254275.1144629316,1573206062763,1254275.1144637289
1,hHVNjKW5DsRmVXjguwyVDjzjn60wUcOKos9Qym0V,false,1254275.1144637289,1573206062763,1254275.1144637289
2,vnXOEuKF4Xdn5WnDCPbzPwTwDj3k1m3rlqc1vN2l,true,1254275.1144637289,1573206062763,1254275.1144637289
3,t0AGT8HShzroBVM3vkP37fIahg2yDqZ5xWfwDFJs,false,1254275.1144637289,1573206062763,1254275.1144637289
4,MKwZ1nczmCBp6whg1lQeFLZ6E628lXvFncUVcYWI,true,1254275.1144637289,1573206062763,1254275.1144637289
5,bDPQJ656xvPGw1PPjhhTUZyLJGILkNnpqNLaELWV,false,1254275.1144637289,1573206062763,1254275.1144637289
6,wWF7i4X8SXNhm4EfClQjQF4CUcYQgy3XnOSz0StX,true,1254275.1144637289,1573206062763,1254275.1144637289
7,whUxTNREujMP6ZrAJlSVhCEKH1KH9XYJmOFXKbh8,false,1254275.1144637289,1573206062763,1254275.1144637289
8,OYcS1WkGcbZFbPLKaqU5odlBf7rHDObkQJdBDrYZ,true,1254275.1144637289,1573206062763,1254275.1144637289
DataHub Topic Schema
以上數(shù)據(jù)對應(yīng)的DataHub schme為:
字段名稱 | 字段類型 |
id | BIGINT |
name | STRING |
gender | BOOLEAN |
salary | DOUBLE |
my_time | TIMESTAMP |
decimal | DECIMAL |
flume 配置文件
在目錄 ${FLUME_HOME}/conf 下創(chuàng)建文件名為datahub_basic.conf的文件,然后將以下內(nèi)容寫入文件。本實例采用Exec Source作為數(shù)據(jù)源,更多Source可以參考Flume官方文檔。注意:ExecSource源可能發(fā)生數(shù)據(jù)丟失,因為EeecSource無法保證將事件放入Channel,在這種情況下,數(shù)據(jù)將丟失。例如,tail命令獲取數(shù)據(jù)時,此時flume channel已滿,而這部分?jǐn)?shù)據(jù)將會丟失。建議使用Spooling Directory Source或者Taildir Source,下方有Taildir Source案例介紹。這里將靜態(tài)文件/temp/test.csv作為數(shù)據(jù)源,如果文件為動態(tài)寫入的日志文件,可使用命令tail -F logFile
進(jìn)行實時采集。
# A single-node Flume configuration for DataHub
# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# Describe/configure the source
a1.sources.r1.type = exec
a1.sources.r1.command = cat /temp/test.csv
# Describe the sink
a1.sinks.k1.type = com.aliyun.datahub.flume.sink.DatahubSink
a1.sinks.k1.datahub.accessId = {YOUR_ALIYUN_DATAHUB_ACCESS_ID}
a1.sinks.k1.datahub.accessKey = {YOUR_ALIYUN_DATAHUB_ACCESS_KEY}
a1.sinks.k1.datahub.endPoint = {YOUR_ALIYUN_DATAHUB_ENDPOINT}
a1.sinks.k1.datahub.project = datahub_project_test
a1.sinks.k1.datahub.topic = test_topic
a1.sinks.k1.serializer = DELIMITED
a1.sinks.k1.serializer.delimiter = ,
a1.sinks.k1.serializer.fieldnames = id,name,gender,salary,my_time,decimal
a1.sinks.k1.serializer.charset = UTF-8
a1.sinks.k1.datahub.retryTimes = 5
a1.sinks.k1.datahub.retryInterval = 5
a1.sinks.k1.datahub.batchSize = 100
a1.sinks.k1.datahub.batchTimeout = 5
a1.sinks.k1.datahub.enablePb = true
a1.sinks.k1.datahub.compressType = DEFLATE
# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 10000
a1.channels.c1.transactionCapacity = 10000
# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
啟動Flume
Dflume.root.logger=INFO,console選項可以將日志實時輸出到控制臺,如需更多信息可采用DEBUG模式。使用如下命令啟動Flume,即可完成CSV文件數(shù)據(jù)采集進(jìn)入DataHub:
$ cd ${FLUME_HOME}
$ bin/flume-ng agent -n a1 -c conf -f conf/datahub_basic.conf -Dflume.root.logger=INFO,console
案例二:REGEX serializer
REGEX解析數(shù)據(jù)時將每一行作為一條Record,并以給定的正則表達(dá)式對數(shù)據(jù)進(jìn)行解析,一條Record的信息的多個內(nèi)容以分組表示。下面以日志文件為例,說明flume如何利用正則表達(dá)式準(zhǔn)時時上傳到DataHub。
數(shù)據(jù)文件
將以下內(nèi)容保存在本地文件/temp/test.csv中。其中需要同步的數(shù)據(jù)內(nèi)容為日期后面的詳細(xì)數(shù)據(jù)。
1. [2019-11-12 15:20:08] 0,j4M6PhzL1DXVTQawdfk306N2KnCDxtR0KK1pke5O,true,1254409.5059812006,1573543208698,1254409.5059819978
2. [2019-11-12 15:22:35] 0,mYLF8UzIYCCFUm1jYs9wzd2Hl6IMr2N7GPYXZSZy,true,1254409.5645912462,1573543355740,1254409.5645920434
3. [2019-11-12 15:23:14] 0,MOemUZur37n4SGtdUQyMohgmM6cxZRBXjJ34HzqX,true,1254409.5799291395,1573543394219,1254409.579929538
4. [2019-11-12 15:23:30] 0,EAFc1VTOvC9rYzPl9zJYa6cc8uJ089EaFd79B25i,true,1254409.5862723626,1573543410134,1254409.5862731598
5. [2019-11-12 15:23:53] 0,zndVraA4GP7FP8p4CkQFsKJkxwtYK3zXjDdkhmRk,true,1254409.5956010541,1573543433538,1254409.5956018514
6. [2019-11-12 15:24:00] 0,9YrjjoALEfyZm07J7OuNvDVNyspIzrbOOAGnZtHx,true,1254409.598201082,1573543440061,1254409.5982018793
7. [2019-11-12 15:24:23] 0,mWsFgFlUnXKQQR6RpbAYDF9OhGYgU8mljvGCtZ26,true,1254409.6073950487,1573543463126,1254409.607395447
8. [2019-11-12 15:26:51] 0,5pZRRzkW3WDLdYLOklNgTLFX0Q0uywZ8jhw7RYfI,true,1254409.666525653,1573543611475,1254409.6665264503
9. [2019-11-12 15:29:11] 0,hVgGQrXpBtTJm6sovVK4YGjfNMdQ3z9pQHxD5Iqd,true,1254409.7222845491,1573543751364,1254409.7222853464
10. [2019-11-12 15:29:52] 0,7wQOQmxoaEl6Cxl1OSo6cr8MAc1AdJWJQaTPT5xs,true,1254409.7387664048,1573543792714,1254409.738767202
11. [2019-11-12 15:30:30] 0,a3Th5Q6a8Vy2h1zfWLEP7MdPhbKyTY3a4AfcOJs2,true,1254409.7538966285,1573543830673,1254409.7538974257
12. [2019-11-12 15:34:54] 0,d0yQAugqJ8M8OtmVQYMTYR8hi3uuX5WsH9VQRBpP,true,1254409.8589555968,1573544094247,1254409.8589563938
DataHub Topic Schema
以上數(shù)據(jù)對應(yīng)的DataHub schme為:
字段名稱 | 字段類型 |
id | BIGINT |
name | STRING |
gender | BOOLEAN |
salary | DOUBLE |
my_time | TIMESTAMP |
decimal | DECIMAL |
flume 配置文件
在目錄 ${FLUME_HOME}/conf 下創(chuàng)建文件名為datahub_basic.conf的文件,然后將以下內(nèi)容寫入文件。本實例采用Exec Source作為數(shù)據(jù)源,更多Source可以參考Flume官方文檔。注意:ExecSource源可能發(fā)生數(shù)據(jù)丟失,因為EeecSource無法保證將事件放入Channel,在這種情況下,數(shù)據(jù)將丟失。例如,tail命令獲取數(shù)據(jù)時,此時flume channel已滿,而這部分?jǐn)?shù)據(jù)將會丟失。建議使用Spooling Directory Source或者Taildir Source,下方有Taildir Source案例介紹。這里將靜態(tài)文件/temp/test.csv作為數(shù)據(jù)源,如果文件為動態(tài)寫入的日志文件,可使用命令tail -F logFile
進(jìn)行實時采集。
# A single-node Flume configuration for DataHub
# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# Describe/configure the source
a1.sources.r1.type = exec
a1.sources.r1.command = cat /temp/test.csv
# Describe the sink
a1.sinks.k1.type = com.aliyun.datahub.flume.sink.DatahubSink
a1.sinks.k1.datahub.accessId = {YOUR_ALIYUN_DATAHUB_ACCESS_ID}
a1.sinks.k1.datahub.accessKey = {YOUR_ALIYUN_DATAHUB_ACCESS_KEY}
a1.sinks.k1.datahub.endPoint = {YOUR_ALIYUN_DATAHUB_ENDPOINT}
a1.sinks.k1.datahub.project = datahub_project_test
a1.sinks.k1.datahub.topic = test_topic
a1.sinks.k1.serializer = REGEX
a1.sinks.k1.serializer.regex = \\[\\d{4}-\\d{2}-\\d{2} \\d{2}:\\d{2}:\\d{2}\\] (\\d+),(\\S+),([a-z]+),([-+]?[0-9]*\\.?[0-9]*),(\\d+),([-+]?[0-9]*\\.?[0-9]*)
a1.sinks.k1.serializer.fieldnames = id,name,gender,salary,my_time,decimal
a1.sinks.k1.serializer.charset = UTF-8
a1.sinks.k1.datahub.retryTimes = 5
a1.sinks.k1.datahub.retryInterval = 5
a1.sinks.k1.datahub.batchSize = 100
a1.sinks.k1.datahub.batchTimeout = 5
# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 10000
a1.channels.c1.transactionCapacity = 10000
# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
啟動Flume
Dflume.root.logger=INFO,console選項可以將日志實時輸出到控制臺,如需更多信息可采用DEBUG模式。使用如下命令啟動Flume,即可完成CSV文件數(shù)據(jù)采集進(jìn)入DataHub:
$ cd ${FLUME_HOME}
$ bin/flume-ng agent -n a1 -c conf -f conf/datahub_basic.conf -Dflume.root.logger=INFO,console
案例三:Flume Taildir Source
在上面的介紹中提到,F(xiàn)lume使用exec source時,可能會導(dǎo)致數(shù)據(jù)丟失,所以在實際生產(chǎn)環(huán)境中并不建議使用。如果想要采集本地日志,可以使用Taildir Source或者Spooling Directory Source。下面以Taildir為例,介紹日志文件的采集。Taildir將會可以指定文件組,然后觀察指定的文件,并在檢測到新行添加到每個文件后,幾乎實時的進(jìn)行讀取。如果正在寫入新行,則此源將重試讀取它們,以等待寫入完成。 Taildir Source會把每個文件的已經(jīng)讀到的位置信息以JSON格式儲存在positionFile文件中,source event 放入channel失敗,已讀位置不會更新,所以Taildir Source是可靠的。
數(shù)據(jù)文件
所有的日志將以如下格式追加到文件末尾。日志文件命名格式為 *.log
0,YxCOHXcst1NlL5ebJM9YmvQ1f8oy8neb3obdeoS0,true,1254275.1144629316,1573206062763,1254275.1144637289
DataHub Topic Schema
以上數(shù)據(jù)對應(yīng)的DataHub schme為:
字段名稱 | 字段類型 |
id | BIGINT |
name | STRING |
gender | BOOLEAN |
salary | DOUBLE |
my_time | TIMESTAMP |
decimal | DECIMAL |
flume 配置文件
在目錄 ${FLUME_HOME}/conf 下創(chuàng)建文件名為datahub_basic.conf的文件,然后將以下內(nèi)容寫入文件。
# A single-node Flume configuration for DataHub
# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# Describe/configure the source
a1.sources.r1.type = TAILDIR
a1.sources.r1.positionFile = /temp/taildir_position.json
a1.sources.r1.filegroups = f1
a1.sources.r1.filegroups.f1 = /temp/.*log
# Describe the sink
a1.sinks.k1.type = com.aliyun.datahub.flume.sink.DatahubSink
a1.sinks.k1.datahub.accessId = {YOUR_ALIYUN_DATAHUB_ACCESS_ID}
a1.sinks.k1.datahub.accessKey = {YOUR_ALIYUN_DATAHUB_ACCESS_KEY}
a1.sinks.k1.datahub.endPoint = {YOUR_ALIYUN_DATAHUB_ENDPOINT}
a1.sinks.k1.datahub.project = datahub_project_test
a1.sinks.k1.datahub.topic = test_topic
a1.sinks.k1.serializer = DELIMITED
a1.sinks.k1.serializer.delimiter = ,
a1.sinks.k1.serializer.fieldnames = id,name,gender,salary,my_time,decimal
a1.sinks.k1.serializer.charset = UTF-8
a1.sinks.k1.datahub.retryTimes = 5
a1.sinks.k1.datahub.retryInterval = 5
a1.sinks.k1.datahub.batchSize = 100
a1.sinks.k1.datahub.batchTimeout = 5
a1.sinks.k1.datahub.enablePb = true
a1.sinks.k1.datahub.compressType = DEFLATE
# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 10000
a1.channels.c1.transactionCapacity = 10000
# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
啟動Flume
Dflume.root.logger=INFO,console選項可以將日志實時輸出到控制臺,如需更多信息可采用DEBUG模式。使用如下命令啟動Flume,即可完成CSV文件數(shù)據(jù)采集進(jìn)入DataHub:
1. $ cd ${FLUME_HOME}
2. $ bin/flume-ng agent -n a1 -c conf -f conf/datahub_basic.conf -Dflume.root.logger=INFO,console
案例四:JSON serializer
JSON解析數(shù)據(jù)時將每一行作為一條Record,只做一層JSON解析,嵌套的內(nèi)容直接當(dāng)作string,第一層的name若在配置的serializer.fieldnames
中,就會加入到對應(yīng)的列中。下面以日志文件為例,說明flume如何利用JSON解析方式準(zhǔn)時時上傳到DataHub。
數(shù)據(jù)文件
將以下內(nèi)容保存在本地文件/temp/test.json中。其中需要同步的數(shù)據(jù)內(nèi)容為日期后面的詳細(xì)數(shù)據(jù)。
{"my_time":1573206062763,"gender":true,"name":"YxCOHXcst1NlL5ebJM9YmvQ1f8oy8neb3obdeoS0","id":0,"salary":1254275.1144629316,"decimal":1254275.1144637289}
{"my_time":1573206062763,"gender":true,"name":"YxCOHXcst1NlL5ebJM9YmvQ1f8oy8neb3obdeoS0","id":0,"salary":1254275.1144629316,"decimal":1254275.1144637289}
{"my_time":1573206062763,"gender":false,"name":"hHVNjKW5DsRmVXjguwyVDjzjn60wUcOKos9Qym0V","id":1,"salary":1254275.1144637289,"decimal":1254275.1144637289}
{"my_time":1573206062763,"gender":true,"name":"vnXOEuKF4Xdn5WnDCPbzPwTwDj3k1m3rlqc1vN2l","id":2,"salary":1254275.1144637289,"decimal":1254275.1144637289}
{"my_time":1573206062763,"gender":false,"name":"t0AGT8HShzroBVM3vkP37fIahg2yDqZ5xWfwDFJs","id":3,"salary":1254275.1144637289,"decimal":1254275.1144637289}
{"my_time":1573206062763,"gender":true,"name":"MKwZ1nczmCBp6whg1lQeFLZ6E628lXvFncUVcYWI","id":4,"salary":1254275.1144637289,"decimal":1254275.1144637289}
{"my_time":1573206062763,"gender":false,"name":"bDPQJ656xvPGw1PPjhhTUZyLJGILkNnpqNLaELWV","id":5,"salary":1254275.1144637289,"decimal":1254275.1144637289}
{"my_time":1573206062763,"gender":true,"name":"wWF7i4X8SXNhm4EfClQjQF4CUcYQgy3XnOSz0StX","id":6,"salary":1254275.1144637289,"decimal":1254275.1144637289}
{"my_time":1573206062763,"gender":false,"name":"whUxTNREujMP6ZrAJlSVhCEKH1KH9XYJmOFXKbh8","id":7,"salary":1254275.1144637289,"decimal":1254275.1144637289}
{"gender":true,"name":{"a":"OYcS1WkGcbZFbPLKaqU5odlBf7rHDObkQJdBDrYZ"},"id":8,"salary":1254275.1144637289,"decimal":1254275.1144637289}
DataHub Topic Schema
以上數(shù)據(jù)對應(yīng)的DataHub schme為:
字段名稱 | 字段類型 |
id | BIGINT |
name | STRING |
gender | BOOLEAN |
salary | DOUBLE |
my_time | TIMESTAMP |
decimal | DECIMAL |
flume 配置文件
在目錄 ${FLUME_HOME}/conf 下創(chuàng)建文件名為datahub_basic.conf的文件,然后將以下內(nèi)容寫入文件。本實例采用Exec Source作為數(shù)據(jù)源,更多Source可以參考Flume官方文檔。
# A single-node Flume configuration for DataHub
# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# Describe/configure the source
a1.sources.r1.type = exec
a1.sources.r1.command = cat /temp/test.json
# Describe the sink
a1.sinks.k1.type = com.aliyun.datahub.flume.sink.DatahubSink
a1.sinks.k1.datahub.accessId = {YOUR_ALIYUN_DATAHUB_ACCESS_ID}
a1.sinks.k1.datahub.accessKey = {YOUR_ALIYUN_DATAHUB_ACCESS_KEY}
a1.sinks.k1.datahub.endPoint = {YOUR_ALIYUN_DATAHUB_ENDPOINT}
a1.sinks.k1.datahub.project = datahub_project_test
a1.sinks.k1.datahub.topic = test_topic
a1.sinks.k1.serializer = JSON
a1.sinks.k1.serializer.fieldnames = id,name,gender,salary,my_time,decimal
a1.sinks.k1.serializer.charset = UTF-8
a1.sinks.k1.datahub.retryTimes = 5
a1.sinks.k1.datahub.retryInterval = 5
a1.sinks.k1.datahub.batchSize = 100
a1.sinks.k1.datahub.batchTimeout = 5
# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 10000
a1.channels.c1.transactionCapacity = 10000
# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
啟動Flume
Dflume.root.logger=INFO,console選項可以將日志實時輸出到控制臺,如需更多信息可采用DEBUG模式。使用如下命令啟動Flume,即可完成CSV文件數(shù)據(jù)采集進(jìn)入DataHub:
$ cd ${FLUME_HOME}
$ bin/flume-ng agent -n a1 -c conf -f conf/datahub_basic.conf -Dflume.root.logger=INFO,console
Source 使用案例
案例一
DataHub-Flume Source可以將DataHub中的數(shù)據(jù)讀取出來,并且可靠的移動到另外的系統(tǒng)中,本文以logger(直接輸出到控制臺)為例,介紹DataHub-Flume Source的使用方法。
DataHub Topic Schema
以上數(shù)據(jù)對應(yīng)的DataHub schme為:
字段名稱 | 字段類型 |
id | BIGINT |
name | STRING |
gender | BOOLEAN |
salary | DOUBLE |
my_time | TIMESTAMP |
decimal | DECIMAL |
flume 配置文件
在目錄 ${FLUME_HOME}/conf 下創(chuàng)建文件名為datahub_source.conf的文件,然后將以下內(nèi)容寫入文件。
# A single-node Flume configuration for DataHub
# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# Describe/configure the source
a1.sources.r1.type = com.aliyun.datahub.flume.sink.DatahubSource
a1.sources.r1.datahub.endPoint = {YOUR_ALIYUN_DATAHUB_ENDPOINT}
a1.sources.r1.datahub.accessId = {YOUR_ALIYUN_DATAHUB_ACCESS_ID}
a1.sources.r1.datahub.accessKey = {YOUR_ALIYUN_DATAHUB_ACCESS_KEY}
a1.sources.r1.datahub.project = datahub_test
a1.sources.r1.datahub.topic = test_flume
a1.sources.r1.datahub.subId = {YOUR_ALIYUN_DATAHUB_SUB_ID}
a1.sources.r1.serializer = DELIMITED
a1.sources.r1.serializer.delimiter = ,
a1.sources.r1.serializer.charset = UTF-8
a1.sources.r1.datahub.retryTimes = 3
a1.sources.r1.datahub.batchSize = 1000
a1.sources.r1.datahub.batchTimeout = 5
a1.sources.r1.datahub.enablePb = false
# Describe the sink
a1.sinks.k1.type = logger
# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 10000
a1.channels.c1.transactionCapacity = 10000
# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
啟動Flume
$ cd ${FLUME_HOME}
$ bin/flume-ng agent -n a1 -c conf -f conf/datahub_source.conf -Dflume.root.logger=INFO,console
Flume metric
DataHub-Flume 支持Flume的內(nèi)置計數(shù)監(jiān)控器,用戶可以利用監(jiān)控器來監(jiān)控自己的Flume插件的運行情況。DataHub-Flume插件的Sink和Source都支持metric信息顯示,具體參數(shù)含義可查看下表(只含DataHub相關(guān)的參數(shù),更多參數(shù)含義參考Flume官方文檔)。
1.)DatahubSink
名稱 | 描述 |
BatchEmptyCount | batch timeout時沒有數(shù)據(jù)需要寫入DataHub發(fā)生的次數(shù) |
BatchCompleteCount | Batch處理成功次數(shù),僅包含全部寫入成功的情況 |
EventDrainAttemptCount | 嘗試寫入DataHub的數(shù)據(jù)數(shù)量(解析成功數(shù)量) |
BatchUnderflowCount | 成功寫入DataHub的數(shù)據(jù)數(shù)量小于需要寫入的數(shù)據(jù)量發(fā)生的次數(shù)。數(shù)據(jù)解析完成,但寫入DataHub時部分失敗或全部失敗。 |
EventDrainSuccessCount | 成功寫入DataHub的數(shù)據(jù)量 |
2.)DatahubSource
名稱 | 描述 |
EventReceivedCount | Source接收到的DataHub的數(shù)據(jù)數(shù)量 |
EventAcceptedCount | Source將DataHub數(shù)據(jù)成功寫入channel的數(shù)量 |
使用方法
Flume提供了多種監(jiān)控方法,本文以HTTP監(jiān)控為例,介紹Flume監(jiān)控工具的使用,更多的監(jiān)控方法可以參考Flume官方文檔 。使用HTTP方式監(jiān)控,只需要在Flume插件啟動時增加兩個參數(shù)即可,-Dflume.monitoring.type=http -Dflume.monitoring.port=1234
,其中type將監(jiān)控方式指定為http,port為指定的端口號。使用示例如下:
bin/flume-ng agent -n a1 -c conf -f conf/datahub_basic.conf -Dflume.root.logger=INFO,console -Dflume.monitoring.type=http -Dflume.monitoring.port=1234
插件成功啟動之后,便可以登錄Web界面進(jìn)行查看。地址為 https://ip:1234/metrics
FAQ
Q: flume啟動報錯org.apache.flume.ChannelFullException: Space for commit to queue couldn’t be acquired. Sinks are likely not keeping up with sources, or the buffer size is too tight
A: flume默認(rèn)堆內(nèi)存20MB,配置的batchSize過大時,flume使用的堆內(nèi)存會超出20M。
解決方案1:調(diào)小batchSize;
解決方案2:調(diào)大flume最大堆內(nèi)存。
$ vim bin/flume-ng
JAV**A_OPTS**="-Xmx20m" ==> JAV**A_OPTS**="-Xmx1024m"
Q: DataHub-Flume插件是否支持JSON格式
A: 目前不支持,不過用戶可以通過自定義正則表達(dá)式進(jìn)行數(shù)據(jù)解析,或者修改DataHub-Flume插件代碼,添加JSONEvent進(jìn)行支持。
Q: DataHub-Flume插件支持Blob Topic嗎?
A: 目前DataHub-Flume插件僅支持Tuple Topic,暫不支持blob。
Q:flume 報錯 org.apache.flume.ChannelException: Put queue for MemoryTransaction of capacity 1 full, consider committing more frequently, increasing capacity or increasing thread count
A: channel已滿,source數(shù)據(jù)寫入channel失敗。可以在配置文件中修改channel capacity解決,并且可以適當(dāng)降低datahub source的batchSize。
Q: 使用舊版本flume時報錯,可能會因為jar包沖突導(dǎo)致無法正常啟動。
例如:使用flume1.6時,啟動時報錯:java.lang.NoSuchMethodError:com.fasterxml.jackson.databind.ObjectMapper.readerFor(Lcom/fasterxml/jackson/databind/JavaType;)Lcom/fasterxml/jackson/databind/ObjectReader;
。因為新版本的插件依賴的jar包和flume本身依賴的jar包版本不一致,使用了flume依賴的舊版本jar包導(dǎo)致新版本的method找不到。
A: 刪除${FLUME_HOME}/lib目錄下的三個jar包即可。
jackson-annotations-2.3.0.jar
jackson-databind-2.3.1.jar
jackson-annotations-2.3.0.jar
Q: 使用flume采集數(shù)據(jù)時,空字符串自動轉(zhuǎn)為null
A: 在flume插件2.0.2中對于非空字符串會做trim,空字符串直接轉(zhuǎn)為null。flume插件2.0.3中已經(jīng)優(yōu)化掉,非空字符串寫入DataHub依舊為空字符串。
Q:啟動報錯
A:刪除Flume lib文件夾中的guava 、zstd的 jar包文件,重新啟動