日本熟妇hd丰满老熟妇,中文字幕一区二区三区在线不卡 ,亚洲成片在线观看,免费女同在线一区二区

Spark Streaming消費(fèi)

更新時(shí)間:

日志服務(wù)采集到日志數(shù)據(jù)后,您可以通過運(yùn)行Spark Streaming任務(wù)消費(fèi)日志數(shù)據(jù)。

日志服務(wù)提供的Spark SDK實(shí)現(xiàn)了Receiver模式和Direct模式兩種消費(fèi)模式。Maven依賴如下:

<dependency>
  <groupId>com.aliyun.emr</groupId>
  <artifactId>emr-logservice_2.11</artifactId>
  <version>1.7.2</version>
</dependency>

Receiver模式

Receiver模式通過消費(fèi)組消費(fèi)日志數(shù)據(jù)并暫存在Spark Executor,Spark Streaming任務(wù)啟動(dòng)之后從Executor讀取并處理數(shù)據(jù)。每條數(shù)據(jù)以JSON字符串的形式返回。消費(fèi)組自動(dòng)定時(shí)保存Checkpoint到服務(wù)端,無需手動(dòng)更新Checkpoint。更多信息,請(qǐng)參見通過消費(fèi)組消費(fèi)日志數(shù)據(jù)

  • 參數(shù)說明

    參數(shù)

    類型

    說明

    project

    String

    日志服務(wù)Project名稱。

    logstore

    String

    日志服務(wù)Logstore名稱。

    consumerGroup

    String

    消費(fèi)組名稱。

    endpoint

    String

    日志服務(wù)Project所在地域的服務(wù)入口。更多信息,請(qǐng)參見服務(wù)接入點(diǎn)

    accessKeyId

    String

    訪問日志服務(wù)的AccessKey ID。

    accessKeySecret

    String

    訪問日志服務(wù)的AccessKey Secret。

  • 示例

    說明

    默認(rèn)配置下,Receiver模式在異常情況下可能導(dǎo)致數(shù)據(jù)丟失。為了避免此類情況發(fā)生,建議開啟Write-Ahead Logs開關(guān)(Spark 1.2以上版本支持)。更多關(guān)于Write-Ahead Logs的細(xì)節(jié)請(qǐng)參見Spark

    import org.apache.spark.storage.StorageLevel
    import org.apache.spark.streaming.aliyun.logservice.LoghubUtils
    import org.apache.spark.streaming.{Milliseconds, StreamingContext}
    import org.apache.spark.SparkConf
    
    object TestLoghub {
      def main(args: Array[String]): Unit = {
        if (args.length < 7) {
          System.err.println(
            """Usage: TestLoghub <project> <logstore> <loghub group name> <endpoint>
              |         <access key id> <access key secret> <batch interval seconds>
            """.stripMargin)
          System.exit(1)
        }
    
        val project = args(0)
        val logstore = args(1)
        val consumerGroup = args(2)
        val endpoint = args(3)
        val accessKeyId = args(4)
        val accessKeySecret = args(5)
        val batchInterval = Milliseconds(args(6).toInt * 1000)
    
        def functionToCreateContext(): StreamingContext = {
          val conf = new SparkConf().setAppName("Test Loghub")
          val ssc = new StreamingContext(conf, batchInterval)
          val loghubStream = LoghubUtils.createStream(
            ssc,
            project,
            logstore,
            consumerGroup,
            endpoint,
            accessKeyId,
            accessKeySecret,
            StorageLevel.MEMORY_AND_DISK)
    
          loghubStream.checkpoint(batchInterval * 2).foreachRDD(rdd =>
            rdd.map(bytes => new String(bytes)).top(10).foreach(println)
          )
          ssc.checkpoint("hdfs:///tmp/spark/streaming") // set checkpoint directory
          ssc
        }
    
        val ssc = StreamingContext.getOrCreate("hdfs:///tmp/spark/streaming", functionToCreateContext _)
    
        ssc.start()
        ssc.awaitTermination()
      }
    }

Direct模式

Direct模式不需要消費(fèi)組,使用API在任務(wù)運(yùn)行時(shí)直接從服務(wù)端請(qǐng)求數(shù)據(jù)。Direct模式具有如下優(yōu)勢(shì):

  • 簡(jiǎn)化并行:Spark partition個(gè)數(shù)與Logstore Shard總數(shù)一致。只需分裂Shard即可提高任務(wù)的并行度。

  • 高效:不需要開啟Write-Ahead Logs來保證數(shù)據(jù)不丟失。

  • Exactly-once語義:直接從服務(wù)端按需獲取數(shù)據(jù),任務(wù)成功之后再提交Checkpoint。

    由于Spark異常退出或者其他原因?qū)е氯蝿?wù)未正常結(jié)束,可能會(huì)導(dǎo)致部分?jǐn)?shù)據(jù)被重復(fù)消費(fèi)。

Direct模式需要依賴ZooKeeper環(huán)境,用于臨時(shí)保存中間狀態(tài)。同時(shí),必須設(shè)置Checkpoint目錄。中間狀態(tài)數(shù)據(jù)保存在ZooKeeper內(nèi)對(duì)應(yīng)的Checkpoint目錄內(nèi)。如果任務(wù)重啟之后希望重新消費(fèi),需要在ZooKeeper內(nèi)刪除該目錄,并更改消費(fèi)組名稱。

  • 參數(shù)說明

    參數(shù)

    類型

    說明

    project

    String

    日志服務(wù)Project名稱。

    logstore

    String

    日志服務(wù)Logstore名稱。

    consumerGroup

    String

    消費(fèi)組名稱,僅用于保存消費(fèi)位置。

    endpoint

    String

    日志服務(wù)Project所在地域的服務(wù)入口。更多信息,請(qǐng)參見服務(wù)接入點(diǎn)

    accessKeyId

    String

    訪問日志服務(wù)的Access Key ID。

    accessKeySecret

    String

    訪問日志服務(wù)的Access Key Secret。

    zkAddress

    String

    ZooKeeper的連接地址。

  • 限流配置

    Spark Streaming流式消費(fèi)是將數(shù)據(jù)分成微小的批次進(jìn)行處理,因此日志服務(wù)開始消費(fèi)時(shí),需預(yù)知每個(gè)批次的邊界,即每個(gè)批次需要獲取的數(shù)據(jù)條數(shù)。

    日志服務(wù)底層的存儲(chǔ)模型以LogGroup為單位,正常情況下每個(gè)LogGroup對(duì)應(yīng)一次寫入請(qǐng)求,例如一次寫入請(qǐng)求可能包含數(shù)千條日志,這些日志作為一個(gè)LogGroup進(jìn)行存儲(chǔ)和消費(fèi)。而通過WebTracking方式寫入日志時(shí),每次寫入請(qǐng)求僅包含一條日志,即一個(gè)LogGroup只有一條日志。為了能夠應(yīng)對(duì)不同寫入場(chǎng)景的消費(fèi)需求,SDK提供如下兩個(gè)參數(shù)進(jìn)行限流配置。

    參數(shù)

    說明

    默認(rèn)值

    spark.loghub.batchGet.step

    限制單次消費(fèi)請(qǐng)求獲取的最大LogGroup個(gè)數(shù)。

    100

    spark.streaming.loghub.maxRatePerShard

    限制單批次內(nèi)每個(gè)Shard消費(fèi)的日志條數(shù)。

    10000

    通過spark.streaming.loghub.maxRatePerShard可指定每個(gè)批次每個(gè)Shard期望消費(fèi)的最大日志條數(shù)。Spark SDK的實(shí)現(xiàn)原理是每次從服務(wù)端獲取spark.loghub.batchGet.step中的LogGroup個(gè)數(shù)并累計(jì)其中的日志條數(shù),直到達(dá)到或超過spark.streaming.loghub.maxRatePerShard。因此spark.streaming.loghub.maxRatePerShard并非一個(gè)精確控制單批次消費(fèi)日志條數(shù)的參數(shù),實(shí)際上每個(gè)批次消費(fèi)的日志條數(shù)與spark.loghub.batchGet.step以及每個(gè)LogGroup中包含的日志條數(shù)相關(guān)。

  • 示例

    import com.aliyun.openservices.loghub.client.config.LogHubCursorPosition
    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.{Milliseconds, StreamingContext}
    import org.apache.spark.streaming.aliyun.logservice.{CanCommitOffsets, LoghubUtils}
    
    object TestDirectLoghub {
      def main(args: Array[String]): Unit = {
        if (args.length < 7) {
          System.err.println(
            """Usage: TestDirectLoghub <project> <logstore> <loghub group name> <endpoint>
              |         <access key id> <access key secret> <batch interval seconds> <zookeeper host:port=localhost:2181>
            """.stripMargin)
          System.exit(1)
        }
    
        val project = args(0)
        val logstore = args(1)
        val consumerGroup = args(2)
        val endpoint = args(3)
        val accessKeyId = args(4)
        val accessKeySecret = args(5)
        val batchInterval = Milliseconds(args(6).toInt * 1000)
        val zkAddress = if (args.length >= 8) args(7) else "localhost:2181"
    
        def functionToCreateContext(): StreamingContext = {
          val conf = new SparkConf().setAppName("Test Direct Loghub")
          val ssc = new StreamingContext(conf, batchInterval)
          val zkParas = Map("zookeeper.connect" -> zkAddress,
            "enable.auto.commit" -> "false")
          val loghubStream = LoghubUtils.createDirectStream(
            ssc,
            project,
            logstore,
            consumerGroup,
            accessKeyId,
            accessKeySecret,
            endpoint,
            zkParas,
            LogHubCursorPosition.END_CURSOR)
    
          loghubStream.checkpoint(batchInterval).foreachRDD(rdd => {
            println(s"count by key: ${rdd.map(s => {
              s.sorted
              (s.length, s)
            }).countByKey().size}")
            loghubStream.asInstanceOf[CanCommitOffsets].commitAsync()
          })
          ssc.checkpoint("hdfs:///tmp/spark/streaming") // set checkpoint directory
          ssc
        }
    
        val ssc = StreamingContext.getOrCreate("hdfs:///tmp/spark/streaming", functionToCreateContext _)
        ssc.start()
        ssc.awaitTermination()
      }
    }

更多信息,請(qǐng)參見GitHub