日本熟妇hd丰满老熟妇,中文字幕一区二区三区在线不卡 ,亚洲成片在线观看,免费女同在线一区二区

離線Spark消費(fèi)示例

更新時(shí)間:

本文介紹Spark如何訪問SLS。

Spark RDD訪問SLS

代碼示例

## TestBatchLoghub.Scala

object TestBatchLoghub {
  def main(args: Array[String]): Unit = {
    if (args.length < 6) {
      System.err.println(
        """Usage: TestBatchLoghub <sls project> <sls logstore> <sls endpoint>
          |  <access key id> <access key secret> <output path> <start time> <end time=now>
        """.stripMargin)
      System.exit(1)
    }

    val loghubProject = args(0)
    val logStore = args(1)
    val endpoint = args(2)
    val accessKeyId = args(3)
    val accessKeySecret = args(4)
    val outputPath = args(5)
    val startTime = args(6).toLong

    val sc = new SparkContext(new SparkConf().setAppName("test batch loghub"))
    var rdd:JavaRDD[String] = null
    if (args.length > 7) {
      rdd = LoghubUtils.createRDD(sc, loghubProject, logStore, accessKeyId, accessKeySecret, endpoint, startTime, args(7).toLong)
    } else {
      rdd = LoghubUtils.createRDD(sc, loghubProject, logStore, accessKeyId, accessKeySecret, endpoint, startTime)
    }

    rdd.saveAsTextFile(outputPath)
  }
}
說明

Maven pom文件可以參見aliyun-emapreduce-demo

編譯運(yùn)行

說明

運(yùn)行代碼示例前必須先配置環(huán)境變量。關(guān)于如何配置環(huán)境變量,請(qǐng)參見配置環(huán)境變量

## 編譯命令
mvn clean package -DskipTests

## 編譯完后,作業(yè)JAR包位于target/shaded/下。

## 提交執(zhí)行
spark-submit --master yarn-cluster --executor-cores 2 --executor-memory 1g --driver-memory 1g --num-executors 2 --class x.x.x.TestBatchLoghub xxx.jar <sls project> <sls logstore> <sls endpoint> $ALIBABA_CLOUD_ACCESS_KEY_ID $ALIBABA_CLOUD_ACCESS_KEY_SECRET <output path> <start time> [<end time=now>]
重要
  • x.x.x.TestBatchLoghubxxx.jar需要替換成真實(shí)的類路徑和包路徑。

  • 作業(yè)資源需要根據(jù)實(shí)際數(shù)據(jù)規(guī)模和實(shí)際集群規(guī)模調(diào)整,如果集群太小,直接運(yùn)行以上命令可能無法執(zhí)行。

spark-sql訪問SLS

訪問命令

spark-sql --jars /opt/apps/SPARK-EXTENSION/spark-extension-current/spark3-emrsdk/* \
  --hiveconf accessKeyId=$ALIBABA_CLOUD_ACCESS_KEY_ID \
  --hiveconf accessKeySecret=$ALIBABA_CLOUD_ACCESS_KEY_SECRET
說明

/opt/apps/SPARK-EXTENSION/spark-extension-current/spark3-emrsdk/*中包含LogHub DataSource類型。如果您EMR集群使用的是Spark2,則應(yīng)修改上面命令中的spark3應(yīng)該換成spark2

如果您希望在本地電腦的開發(fā)環(huán)境中使用Spark3依賴SLS,類似于Spark2的操作方式,可以按照以下步驟操作:

  1. 下載集群/opt/apps/SPARK-EXTENSION/spark-extension-current/spark3-emrsdk/emr-datasources_shaded_2.12目錄下的內(nèi)容到本地。

  2. 使用Maven將JAR包安裝到本地。

    mvn install:install-file -DgroupId=com.aliyun.emr -DartifactId=emr-datasources_shaded_2.12 -Dversion=3.0.2 -Dpackaging=jar -Dfile=/Users/zhongqiang.czq/Downloads/tempory/emr-datasources_shaded_2.12-3.0.2.jar
  3. 在pom文件中添加以下依賴項(xiàng)。

    <dependency>
      <groupId>com.aliyun.emr</groupId>
      <artifactId>emr-datasources_shaded_2.12</artifactId>
      <version>3.0.2</version>
    </dependency>

建表和讀取數(shù)據(jù)示例

create table test_sls
using loghub
  options(endpoint='cn-hangzhou-intranet.log.aliyuncs.com',
          access.key.id='${hiveconf:accessKeyId}',
          access.key.secret='${hiveconf:accessKeySecret}',
          sls.project='test_project',
          sls.store='test_store',
          startingoffsets='earliest'
);

select * from test_sls;

配置環(huán)境變量

配置環(huán)境變量ALIBABA_CLOUD_ACCESS_KEY_IDALIBABA_CLOUD_ACCESS_KEY_SECRET

重要
  • 阿里云賬號(hào)AccessKey擁有所有API的訪問權(quán)限,建議您使用RAM用戶進(jìn)行API訪問或日常運(yùn)維,具體操作,請(qǐng)參見創(chuàng)建RAM用戶

  • 請(qǐng)不要將AccessKey ID和AccessKey Secret保存到工程代碼里,否則可能導(dǎo)致AccessKey泄露,威脅您賬號(hào)下所有資源的安全。

  • LinuxmacOS系統(tǒng)配置方法:

    執(zhí)行以下命令配置環(huán)境變量。

    其中, <access_key_id>需替換為您RAM用戶的AccessKey ID,<access_key_secret>替換為您RAM用戶的AccessKey Secret。

    export ALIBABA_CLOUD_ACCESS_KEY_ID=<access_key_id>
    export ALIBABA_CLOUD_ACCESS_KEY_SECRET=<access_key_secret>
  • Windows系統(tǒng)配置方法

    1. 新建環(huán)境變量文件,添加環(huán)境變量ALIBABA_CLOUD_ACCESS_KEY_IDALIBABA_CLOUD_ACCESS_KEY_SECRET,并寫入已準(zhǔn)備好的AccessKey ID和AccessKey Secret。

    2. 重啟Windows系統(tǒng)生效。

相關(guān)文檔

Spark訪問Kafka:Structured Streaming + Kafka Integration Guide