Spark Streaming消費(fèi)
日志服務(wù)采集到日志數(shù)據(jù)后,您可以通過運(yùn)行Spark Streaming任務(wù)消費(fèi)日志數(shù)據(jù)。
日志服務(wù)提供的Spark SDK實(shí)現(xiàn)了Receiver模式和Direct模式兩種消費(fèi)模式。Maven依賴如下:
<dependency>
<groupId>com.aliyun.emr</groupId>
<artifactId>emr-logservice_2.11</artifactId>
<version>1.7.2</version>
</dependency>
Receiver模式
Receiver模式通過消費(fèi)組消費(fèi)日志數(shù)據(jù)并暫存在Spark Executor,Spark Streaming任務(wù)啟動(dòng)之后從Executor讀取并處理數(shù)據(jù)。每條數(shù)據(jù)以JSON字符串的形式返回。消費(fèi)組自動(dòng)定時(shí)保存Checkpoint到服務(wù)端,無需手動(dòng)更新Checkpoint。更多信息,請(qǐng)參見通過消費(fèi)組消費(fèi)日志數(shù)據(jù)。
參數(shù)說明
參數(shù)
類型
說明
project
String
日志服務(wù)Project名稱。
logstore
String
日志服務(wù)Logstore名稱。
consumerGroup
String
消費(fèi)組名稱。
endpoint
String
日志服務(wù)Project所在地域的服務(wù)入口。更多信息,請(qǐng)參見服務(wù)接入點(diǎn)。
accessKeyId
String
訪問日志服務(wù)的AccessKey ID。
accessKeySecret
String
訪問日志服務(wù)的AccessKey Secret。
示例
說明默認(rèn)配置下,Receiver模式在異常情況下可能導(dǎo)致數(shù)據(jù)丟失。為了避免此類情況發(fā)生,建議開啟Write-Ahead Logs開關(guān)(Spark 1.2以上版本支持)。更多關(guān)于Write-Ahead Logs的細(xì)節(jié)請(qǐng)參見Spark。
import org.apache.spark.storage.StorageLevel import org.apache.spark.streaming.aliyun.logservice.LoghubUtils import org.apache.spark.streaming.{Milliseconds, StreamingContext} import org.apache.spark.SparkConf object TestLoghub { def main(args: Array[String]): Unit = { if (args.length < 7) { System.err.println( """Usage: TestLoghub <project> <logstore> <loghub group name> <endpoint> | <access key id> <access key secret> <batch interval seconds> """.stripMargin) System.exit(1) } val project = args(0) val logstore = args(1) val consumerGroup = args(2) val endpoint = args(3) val accessKeyId = args(4) val accessKeySecret = args(5) val batchInterval = Milliseconds(args(6).toInt * 1000) def functionToCreateContext(): StreamingContext = { val conf = new SparkConf().setAppName("Test Loghub") val ssc = new StreamingContext(conf, batchInterval) val loghubStream = LoghubUtils.createStream( ssc, project, logstore, consumerGroup, endpoint, accessKeyId, accessKeySecret, StorageLevel.MEMORY_AND_DISK) loghubStream.checkpoint(batchInterval * 2).foreachRDD(rdd => rdd.map(bytes => new String(bytes)).top(10).foreach(println) ) ssc.checkpoint("hdfs:///tmp/spark/streaming") // set checkpoint directory ssc } val ssc = StreamingContext.getOrCreate("hdfs:///tmp/spark/streaming", functionToCreateContext _) ssc.start() ssc.awaitTermination() } }
Direct模式
Direct模式不需要消費(fèi)組,使用API在任務(wù)運(yùn)行時(shí)直接從服務(wù)端請(qǐng)求數(shù)據(jù)。Direct模式具有如下優(yōu)勢(shì):
簡(jiǎn)化并行:Spark partition個(gè)數(shù)與Logstore Shard總數(shù)一致。只需分裂Shard即可提高任務(wù)的并行度。
高效:不需要開啟Write-Ahead Logs來保證數(shù)據(jù)不丟失。
Exactly-once語義:直接從服務(wù)端按需獲取數(shù)據(jù),任務(wù)成功之后再提交Checkpoint。
由于Spark異常退出或者其他原因?qū)е氯蝿?wù)未正常結(jié)束,可能會(huì)導(dǎo)致部分?jǐn)?shù)據(jù)被重復(fù)消費(fèi)。
Direct模式需要依賴ZooKeeper環(huán)境,用于臨時(shí)保存中間狀態(tài)。同時(shí),必須設(shè)置Checkpoint目錄。中間狀態(tài)數(shù)據(jù)保存在ZooKeeper內(nèi)對(duì)應(yīng)的Checkpoint目錄內(nèi)。如果任務(wù)重啟之后希望重新消費(fèi),需要在ZooKeeper內(nèi)刪除該目錄,并更改消費(fèi)組名稱。
參數(shù)說明
參數(shù)
類型
說明
project
String
日志服務(wù)Project名稱。
logstore
String
日志服務(wù)Logstore名稱。
consumerGroup
String
消費(fèi)組名稱,僅用于保存消費(fèi)位置。
endpoint
String
日志服務(wù)Project所在地域的服務(wù)入口。更多信息,請(qǐng)參見服務(wù)接入點(diǎn)。
accessKeyId
String
訪問日志服務(wù)的Access Key ID。
accessKeySecret
String
訪問日志服務(wù)的Access Key Secret。
zkAddress
String
ZooKeeper的連接地址。
限流配置
Spark Streaming流式消費(fèi)是將數(shù)據(jù)分成微小的批次進(jìn)行處理,因此日志服務(wù)開始消費(fèi)時(shí),需預(yù)知每個(gè)批次的邊界,即每個(gè)批次需要獲取的數(shù)據(jù)條數(shù)。
日志服務(wù)底層的存儲(chǔ)模型以LogGroup為單位,正常情況下每個(gè)LogGroup對(duì)應(yīng)一次寫入請(qǐng)求,例如一次寫入請(qǐng)求可能包含數(shù)千條日志,這些日志作為一個(gè)LogGroup進(jìn)行存儲(chǔ)和消費(fèi)。而通過WebTracking方式寫入日志時(shí),每次寫入請(qǐng)求僅包含一條日志,即一個(gè)LogGroup只有一條日志。為了能夠應(yīng)對(duì)不同寫入場(chǎng)景的消費(fèi)需求,SDK提供如下兩個(gè)參數(shù)進(jìn)行限流配置。
參數(shù)
說明
默認(rèn)值
spark.loghub.batchGet.step
限制單次消費(fèi)請(qǐng)求獲取的最大LogGroup個(gè)數(shù)。
100
spark.streaming.loghub.maxRatePerShard
限制單批次內(nèi)每個(gè)Shard消費(fèi)的日志條數(shù)。
10000
通過spark.streaming.loghub.maxRatePerShard可指定每個(gè)批次每個(gè)Shard期望消費(fèi)的最大日志條數(shù)。Spark SDK的實(shí)現(xiàn)原理是每次從服務(wù)端獲取spark.loghub.batchGet.step中的LogGroup個(gè)數(shù)并累計(jì)其中的日志條數(shù),直到達(dá)到或超過spark.streaming.loghub.maxRatePerShard。因此spark.streaming.loghub.maxRatePerShard并非一個(gè)精確控制單批次消費(fèi)日志條數(shù)的參數(shù),實(shí)際上每個(gè)批次消費(fèi)的日志條數(shù)與spark.loghub.batchGet.step以及每個(gè)LogGroup中包含的日志條數(shù)相關(guān)。
示例
import com.aliyun.openservices.loghub.client.config.LogHubCursorPosition import org.apache.spark.SparkConf import org.apache.spark.streaming.{Milliseconds, StreamingContext} import org.apache.spark.streaming.aliyun.logservice.{CanCommitOffsets, LoghubUtils} object TestDirectLoghub { def main(args: Array[String]): Unit = { if (args.length < 7) { System.err.println( """Usage: TestDirectLoghub <project> <logstore> <loghub group name> <endpoint> | <access key id> <access key secret> <batch interval seconds> <zookeeper host:port=localhost:2181> """.stripMargin) System.exit(1) } val project = args(0) val logstore = args(1) val consumerGroup = args(2) val endpoint = args(3) val accessKeyId = args(4) val accessKeySecret = args(5) val batchInterval = Milliseconds(args(6).toInt * 1000) val zkAddress = if (args.length >= 8) args(7) else "localhost:2181" def functionToCreateContext(): StreamingContext = { val conf = new SparkConf().setAppName("Test Direct Loghub") val ssc = new StreamingContext(conf, batchInterval) val zkParas = Map("zookeeper.connect" -> zkAddress, "enable.auto.commit" -> "false") val loghubStream = LoghubUtils.createDirectStream( ssc, project, logstore, consumerGroup, accessKeyId, accessKeySecret, endpoint, zkParas, LogHubCursorPosition.END_CURSOR) loghubStream.checkpoint(batchInterval).foreachRDD(rdd => { println(s"count by key: ${rdd.map(s => { s.sorted (s.length, s) }).countByKey().size}") loghubStream.asInstanceOf[CanCommitOffsets].commitAsync() }) ssc.checkpoint("hdfs:///tmp/spark/streaming") // set checkpoint directory ssc } val ssc = StreamingContext.getOrCreate("hdfs:///tmp/spark/streaming", functionToCreateContext _) ssc.start() ssc.awaitTermination() } }
更多信息,請(qǐng)參見GitHub。