日本熟妇hd丰满老熟妇,中文字幕一区二区三区在线不卡 ,亚洲成片在线观看,免费女同在线一区二区

Spark-2.x示例

本文為您介紹Spark-2.x依賴的配置以及Spark-2.x示例說明。

配置Spark-2.x的依賴

通過MaxCompute提供的Spark客戶端提交應用時,需要在pom.xml文件中添加以下依賴。pom.xml文件請參見pom.xml。

<properties>
    <spark.version>2.3.0</spark.version>
    <cupid.sdk.version>3.3.8-public</cupid.sdk.version>
    <scala.version>2.11.8</scala.version>
    <scala.binary.version>2.11</scala.binary.version>
</properties>
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-core_${scala.binary.version}</artifactId>
    <version>${spark.version}</version>
    <scope>provided</scope>
</dependency>
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-sql_${scala.binary.version}</artifactId>
    <version>${spark.version}</version>
    <scope>provided</scope>
</dependency>
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-mllib_${scala.binary.version}</artifactId>
    <version>${spark.version}</version>
    <scope>provided</scope>
</dependency>
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming_${scala.binary.version}</artifactId>
    <version>${spark.version}</version>
    <scope>provided</scope>
</dependency>
<dependency>
    <groupId>com.aliyun.odps</groupId>
    <artifactId>cupid-sdk</artifactId>
    <version>${cupid.sdk.version}</version>
    <scope>provided</scope>
</dependency>
<dependency>
    <groupId>com.aliyun.odps</groupId>
    <artifactId>hadoop-fs-oss</artifactId>
    <version>${cupid.sdk.version}</version>
</dependency>
<dependency>
    <groupId>com.aliyun.odps</groupId>
    <artifactId>odps-spark-datasource_${scala.binary.version}</artifactId>
    <version>${cupid.sdk.version}</version>
</dependency>
<dependency>
    <groupId>org.scala-lang</groupId>
    <artifactId>scala-library</artifactId>
    <version>${scala.version}</version>
</dependency>
<dependency>
    <groupId>org.scala-lang</groupId>
    <artifactId>scala-actors</artifactId>
    <version>${scala.version}</version>
</dependency>

上述代碼中Scope的定義如下:

  • spark-core、spark-sql等所有Spark社區發布的包,設置Scopeprovided

  • odps-spark-datasource設置Scopecompile

WordCount示例(Scala)

  • 代碼示例

    WordCount.scala

  • 提交方式

    cd /path/to/MaxCompute-Spark/spark-2.x
    mvn clean package
    
    # 環境變量spark-defaults.conf的配置請參見搭建開發環境。
    cd $SPARK_HOME
    bin/spark-submit --master yarn-cluster --class \
        com.aliyun.odps.spark.examples.WordCount \
        /path/to/MaxCompute-Spark/spark-2.x/target/spark-examples_2.11-1.0.0-SNAPSHOT-shaded.jar

MaxCompute Table讀寫示例(Scala)

  • 代碼示例

    SparkSQL.scala

  • 提交方式

    cd /path/to/MaxCompute-Spark/spark-2.x
    mvn clean package
    # 環境變量spark-defaults.conf的配置請參見搭建開發環境。
    cd $SPARK_HOME
    bin/spark-submit --master yarn-cluster --class com.aliyun.odps.spark.examples.sparksql.SparkSQL \
        /path/to/MaxCompute-Spark/spark-2.x/target/spark-examples_2.11-1.0.0-SNAPSHOT-shaded.jar

GraphX PageRank示例(Scala)

  • 代碼示例

    PageRank.scala

  • 提交方式

    cd /path/to/MaxCompute-Spark/spark-2.x
    mvn clean package
    # 環境變量spark-defaults.conf的配置請參見搭建開發環境。
    cd $SPARK_HOME
    bin/spark-submit --master yarn-cluster --class com.aliyun.odps.spark.examples.graphx.PageRank \
        /path/to/MaxCompute-Spark/spark-2.x/target/spark-examples_2.11-1.0.0-SNAPSHOT-shaded.jar

Mllib Kmeans-ON-OSS示例(Scala)

spark.hadoop.fs.oss.ststoken.roleArnspark.hadoop.fs.oss.endpoint的填寫請參見Oss-Access文檔說明。

  • 代碼示例

    KmeansModelSaveToOss.scala

  • 提交方式

    # 編輯代碼。
    val modelOssDir = "oss://bucket/kmeans-model" // 填寫具體的OSS Bucket路徑。
    val spark = SparkSession
      .builder()
      .config("spark.hadoop.fs.oss.credentials.provider", "org.apache.hadoop.fs.aliyun.oss.AliyunStsTokenCredentialsProvider")
      .config("spark.hadoop.fs.oss.ststoken.roleArn", "acs:ram::****:role/aliyunodpsdefaultrole")
      .config("spark.hadoop.fs.oss.endpoint", "oss-cn-hangzhou-zmf.aliyuncs.com")
      .appName("KmeansModelSaveToOss")
      .getOrCreate()
    
    cd /path/to/MaxCompute-Spark/spark-2.x
    mvn clean package
    # 環境變量spark-defaults.conf的配置請參見搭建開發環境。
    cd $SPARK_HOME
    bin/spark-submit --master yarn-cluster --class com.aliyun.odps.spark.examples.mllib.KmeansModelSaveToOss \
        /path/to/MaxCompute-Spark/spark-2.x/target/spark-examples_2.11-1.0.0-SNAPSHOT-shaded.jar

OSS UnstructuredData示例(Scala)

spark.hadoop.fs.oss.ststoken.roleArnspark.hadoop.fs.oss.endpoint的填寫請參見Oss-Access文檔說明。

  • 代碼示例

    SparkUnstructuredDataCompute.scala

  • 提交方式

    # 編輯代碼。
    val pathIn = "oss://bucket/inputdata/" // 填寫具體的OSS Bucket路徑。
    val spark = SparkSession
      .builder()
      .config("spark.hadoop.fs.oss.credentials.provider", "org.apache.hadoop.fs.aliyun.oss.AliyunStsTokenCredentialsProvider")
      .config("spark.hadoop.fs.oss.ststoken.roleArn", "acs:ram::****:role/aliyunodpsdefaultrole")
      .config("spark.hadoop.fs.oss.endpoint", "oss-cn-hangzhou-zmf.aliyuncs.com")
      .appName("SparkUnstructuredDataCompute")
      .getOrCreate()
    
    cd /path/to/MaxCompute-Spark/spark-2.x
    mvn clean package
    # 環境變量spark-defaults.conf的配置請參見搭建開發環境。
    cd $SPARK_HOME
    bin/spark-submit --master yarn-cluster --class com.aliyun.odps.spark.examples.oss.SparkUnstructuredDataCompute \
        /path/to/MaxCompute-Spark/spark-2.x/target/spark-examples_2.11-1.0.0-SNAPSHOT-shaded.jar

SparkPi示例(Scala)

  • 代碼示例

    SparkPi.scala

  • 提交方式

    cd /path/to/MaxCompute-Spark/spark-2.x
    mvn clean package
    
    # 環境變量spark-defaults.conf的配置請參見搭建開發環境。
    cd $SPARK_HOME
    bin/spark-submit --master yarn-cluster --class com.aliyun.odps.spark.examples.SparkPi \
        /path/to/MaxCompute-Spark/spark-2.x/target/spark-examples_2.11-1.0.0-SNAPSHOT-shaded.jar

支持Spark Streaming LogHub示例(Scala)

  • 代碼示例

    LogHubStreamingDemo.scala

  • 提交方式

    # 環境變量spark-defaults.conf的配置請參見搭建開發環境。
    cd $SPARK_HOME
    bin/spark-submit --master yarn-cluster --class com.aliyun.odps.spark.examples.streaming.loghub.LogHubStreamingDemo \
        /path/to/MaxCompute-Spark/spark-2.x/target/spark-examples_2.11-1.0.0-SNAPSHOT-shaded.jar

支持Spark Streaming LogHub寫MaxCompute示例(Scala)

  • 代碼示例

    LogHub2OdpsDemo.scala

  • 提交方式

    # 環境變量spark-defaults.conf的配置請參見搭建開發環境。
    cd $SPARK_HOME
    bin/spark-submit --master yarn-cluster --class com.aliyun.odps.spark.examples.streaming.loghub.LogHub2OdpsDemo \
        /path/to/MaxCompute-Spark/spark-2.x/target/spark-examples_2.11-1.0.0-SNAPSHOT-shaded.jar

支持Spark Streaming DataHub示例(Scala)

  • 代碼示例

    DataHubStreamingDemo.scala

  • 提交方式

    # 環境變量spark-defaults.conf的配置請參見搭建開發環境。
    cd $SPARK_HOME
    bin/spark-submit --master yarn-cluster --class com.aliyun.odps.spark.examples.streaming.datahub.DataHubStreamingDemo \
        /path/to/MaxCompute-Spark/spark-2.x/target/spark-examples_2.11-1.0.0-SNAPSHOT-shaded.jar

支持Spark Streaming DataHub寫MaxCompute示例(Scala)

  • 代碼示例

    DataHub2OdpsDemo.scala

  • 提交方式

    # 環境變量spark-defaults.conf的配置請參見搭建開發環境。
    cd $SPARK_HOME
    bin/spark-submit --master yarn-cluster --class com.aliyun.odps.spark.examples.streaming.datahub.DataHub2OdpsDemo \
        /path/to/MaxCompute-Spark/spark-2.x/target/spark-examples_2.11-1.0.0-SNAPSHOT-shaded.jar

支持Spark Streaming Kafka示例(Scala)

  • 代碼示例

    KafkaStreamingDemo.scala

  • 提交方式

    # 環境變量spark-defaults.conf的配置請參見搭建開發環境。
    cd $SPARK_HOME
    bin/spark-submit --master yarn-cluster --class com.aliyun.odps.spark.examples.streaming.kafka.KafkaStreamingDemo \
        /path/to/MaxCompute-Spark/spark-2.x/target/spark-examples_2.11-1.0.0-SNAPSHOT-shaded.jar
說明

更多信息請參見MaxCompute-Spark。

支持Spark StructuredStreaming DataHub示例(Scala)

  • 代碼示例

    DatahubStructuredStreamingDemo.scala

  • 提交方式

    # 環境變量spark-defaults.conf的配置請參見搭建開發環境。
    cd $SPARK_HOME
    bin/spark-submit --master yarn-cluster --class com.aliyun.odps.spark.examples.structuredstreaming.datahub.DatahubStructuredStreamingDemo \
        /path/to/MaxCompute-Spark/spark-2.x/target/spark-examples_2.11-1.0.0-SNAPSHOT-shaded.jar

支持Spark StructuredStreaming Kafka示例(Scala)

  • 代碼示例

    KafkaStructuredStreamingDemo.scala

  • 提交方式

    # 環境變量spark-defaults.conf的配置請參見搭建開發環境。
    cd $SPARK_HOME
    bin/spark-submit --master yarn-cluster --class com.aliyun.odps.spark.examples.structuredstreaming.kafka.KafkaStructuredStreamingDemo \
        /path/to/MaxCompute-Spark/spark-2.x/target/spark-examples_2.11-1.0.0-SNAPSHOT-shaded.jar

支持Spark StructuredStreaming LogHub示例(Scala)

  • 代碼示例

    LoghubStructuredStreamingDemo.scala

  • 提交方式

    # 環境變量spark-defaults.conf的配置請參見搭建開發環境。
    cd $SPARK_HOME
    bin/spark-submit --master yarn-cluster --class com.aliyun.odps.spark.examples.structuredstreaming.loghub.LoghubStructuredStreamingDemo \
        /path/to/MaxCompute-Spark/spark-2.x/target/spark-examples_2.11-1.0.0-SNAPSHOT-shaded.jar

MaxCompute Table讀寫PySpark示例(Python)

  • 代碼示例

    spark_sql.py

  • 提交方式

    # 環境變量spark-defaults.conf的配置請參見搭建開發環境。
    cd $SPARK_HOME
    bin/spark-submit --master yarn-cluster --jars /path/to/odps-spark-datasource_2.11-3.3.8-public.jar \
        /path/to/MaxCompute-Spark/spark-2.x/src/main/python/spark_sql.py

PySpark寫OSS示例(Python)

  • 代碼示例

    spark_oss.py

  • 提交方式

    # 環境變量spark-defaults.conf的配置請參見搭建開發環境。
    # OSS相關配置請參見OSS Access文檔說明。
    
    cd $SPARK_HOME
    bin/spark-submit --master yarn-cluster --jars /path/to/spark-examples_2.11-1.0.0-SNAPSHOT-shaded.jar \
        /path/to/MaxCompute-Spark/spark-2.x/src/main/python/spark_oss.py
    # spark-examples_2.11-1.0.0-SNAPSHOT-shaded.jar可以通過Spark-2.x編譯得到。

Spark-SQL示例(Java)

Spark-SQL Java示例代碼請參見JavaSparkSQL.java。

從MaxCompute中讀取數據寫入HBase

通過IntelliJ IDEA工具編寫代碼,實現從MaxCompute中讀取數據寫入HBase。

  • 代碼示例

    object McToHbase {
      def main(args: Array[String]) {
        val spark = SparkSession
          .builder()
          .appName("spark_sql_ddl")
          .config("spark.sql.catalogImplementation", "odps")
          .config("spark.hadoop.odps.end.point","http://service.cn.maxcompute.aliyun.com/api")
          .config("spark.hadoop.odps.runtime.end.point","http://service.cn.maxcompute.aliyun-inc.com/api")
          .getOrCreate()
          val sc = spark.sparkContext
          val config = HBaseConfiguration.create()
          val zkAddress = ""
          config.set(HConstants.ZOOKEEPER_QUORUM, zkAddress);
          val jobConf = new JobConf(config)
          jobConf.setOutputFormat(classOf[TableOutputFormat])
          jobConf.set(TableOutputFormat.OUTPUT_TABLE,"test")
    
        try{
          import spark._
          spark.sql("select '7', 'long'").rdd.map(row => {
            val id = row(0).asInstanceOf[String]
            val name = row(1).asInstanceOf[String]
            val put = new Put(Bytes.toBytes(id))
            put.addColumn(Bytes.toBytes("cf"), Bytes.toBytes("a"), Bytes.toBytes(name))
            (new ImmutableBytesWritable, put)
        }).saveAsHadoopDataset(jobConf)
      } finally {
        sc.stop()
      }
    
      }
    }
  • 提交方式:通過IntelliJ IDEA提交并運行示例代碼。更多操作信息,請參見Spark在MaxCompute的運行方式。

讀寫OSS文件

通過IntelliJ IDEA工具或DataWorks,實現讀寫OSS文件。

  • 代碼示例

    • 示例1:Local模式下的代碼示例。

      package com.aliyun.odps.spark.examples
      import java.io.ByteArrayInputStream
      import org.apache.spark.sql.SparkSession
      
      object SparkOSS {
        def main(args: Array[String]) {
          val spark = SparkSession
            .builder()
            .config("spark.master", "local[4]") // 需要設置spark.master為local[N]才能直接運行,N為并發數。
            .config("spark.hadoop.fs.oss.accessKeyId", "")
            .config("spark.hadoop.fs.oss.accessKeySecret", "")
            .config("spark.hadoop.fs.oss.endpoint", "oss-cn-beijing.aliyuncs.com")
            .appName("SparkOSS")
            .getOrCreate()
      
          val sc = spark.sparkContext
          try {
            //OSS文件的讀取。
            val pathIn = "oss://spark-oss/workline.txt"
            val inputData = sc.textFile(pathIn, 5)
                  //RDD寫入。
            inputData.repartition(1).saveAsTextFile("oss://spark-oss/user/data3")
      
          } finally {
            sc.stop()
          }
        }
      }
      說明

      執行該代碼前,請您務必檢查是否已添加了hadoop-fs-oss依賴,否則會報錯。

    • 示例2:Local模式下的代碼示例。

      package com.aliyun.odps.spark.examples
      import java.io.ByteArrayInputStream
      import com.aliyun.oss.{OSSClientBuilder,OSSClient}
      import org.apache.spark.sql.SparkSession
      
      object SparkOSS {
        def main(args: Array[String]) {
          val spark = SparkSession
            .builder()
            .config("spark.master", "local[4]") // 需要設置spark.master為local[N]才能直接運行,N為并發數。
            .config("spark.hadoop.fs.oss.accessKeyId", "")
            .config("spark.hadoop.fs.oss.accessKeySecret", "")
            .config("spark.hadoop.fs.oss.endpoint", "oss-cn-beijing.aliyuncs.com")
            .appName("SparkOSS")
            .getOrCreate()
      
          val sc = spark.sparkContext
          try {
            //OSS文件的讀取。
            val pathIn = "oss://spark-oss/workline.txt"
            val inputData = sc.textFile(pathIn, 5)
            val cnt = inputData.count
            inputData.count()
            println(s"count: $cnt")
      
            //OSS文件的寫入。
            // 阿里云賬號AccessKey擁有所有API的訪問權限,風險很高。強烈建議您創建并使用RAM用戶進行API訪問或日常運維,請登錄RAM控制臺創建RAM用戶
      		  // 此處以把AccessKey 和 AccessKeySecret 保存在環境變量為例說明。您也可以根據業務需要,保存到配置文件里
      			// 強烈建議不要把 AccessKey 和 AccessKeySecret 保存到代碼里,會存在密鑰泄漏風險
            val ossClient = new OSSClientBuilder().build("oss-cn-beijing.aliyuncs.com", System.getenv("ALIBABA_CLOUD_ACCESS_KEY_ID"), System.getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET"))
            val filePath="user/data"
            ossClient.putObject("spark-oss",filePath , new ByteArrayInputStream(cnt.toString.getBytes()))
            ossClient.shutdown()
          } finally {
            sc.stop()
          }
        }
      }
    • 示例3:Cluster模式下的代碼示例。

      package com.aliyun.odps.spark.examples
      import java.io.ByteArrayInputStream
      import com.aliyun.oss.{OSSClientBuilder,OSSClient}
      import org.apache.spark.sql.SparkSession
      
      object SparkOSS {
        def main(args: Array[String]) {
          val spark = SparkSession
            .builder()
            .appName("SparkOSS")
            .getOrCreate()
      
          val sc = spark.sparkContext
          try {
            //OSS文件的讀取。
            val pathIn = "oss://spark-oss/workline.txt"
            val inputData = sc.textFile(pathIn, 5)
            val cnt = inputData.count
            inputData.count()
            println(s"count: $cnt")
      
            // inputData.repartition(1).saveAsTextFile("oss://spark-oss/user/data3")
            //OSS文件的寫入。
            // 阿里云賬號AccessKey擁有所有API的訪問權限,風險很高。強烈建議您創建并使用RAM用戶進行API訪問或日常運維,請登錄RAM控制臺創建RAM用戶
      			// 此處以把AccessKey 和 AccessKeySecret 保存在環境變量為例說明。您也可以根據業務需要,保存到配置文件里
      			// 強烈建議不要把 AccessKey 和 AccessKeySecret 保存到代碼里,會存在密鑰泄漏風險
            val ossClient = new OSSClientBuilder().build("oss-cn-beijing.aliyuncs.com", System.getenv("ALIBABA_CLOUD_ACCESS_KEY_ID"), System.getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET"))
            val filePath="user/data"
            ossClient.putObject("spark-oss",filePath , new ByteArrayInputStream(cnt.toString.getBytes()))
            ossClient.shutdown()
          } finally {
            sc.stop()
          }
        }
      }
  • 提交方式:

讀MaxCompute寫OSS

通過IntelliJ IDEA工具或DataWorks,實現讀取MaxCompute數據并寫入OSS。

  • 代碼示例

    • Local模式下的示例代碼。

      package com.aliyun.odps.spark.examples.userpakage
      
      import org.apache.spark.sql.{SaveMode, SparkSession}
      
      object SparkODPS2OSS {
        def main(args: Array[String]): Unit = {
          val spark = SparkSession
            .builder()
            .appName("Spark2OSS")
            .config("spark.master", "local[4]")// 需設置spark.master為local[N]才能直接運行,N為并發數
            .config("spark.hadoop.odps.project.name", "")
            .config("spark.hadoop.odps.access.id", "")
            .config("spark.hadoop.odps.access.key", "")
            .config("spark.hadoop.odps.end.point", "http://service.cn.maxcompute.aliyun.com/api")
            .config("spark.sql.catalogImplementation", "odps")
            .config("spark.hadoop.fs.oss.accessKeyId","")
            .config("spark.hadoop.fs.oss.accessKeySecret","")
            .config("spark.hadoop.fs.oss.endpoint","oss-cn-beijing.aliyuncs.com")
            .getOrCreate()
      
          try{
      
            //通過SparkSql查詢表
            val data = spark.sql("select * from  user_detail")
           //展示查詢數據
            data.show(10)
            //將查詢到的數據存儲到一個OSS的文件中
            data.toDF().coalesce(1).write.mode(SaveMode.Overwrite).csv("oss://spark-oss/user/data3")
          }finally {
            spark.stop()
          }
      
        }
      }
    • Cluster模式下的示例代碼。

      package com.aliyun.odps.spark.examples.userpakage
      import org.apache.spark.sql.{SaveMode, SparkSession}
      
      object SparkODPS2OSS {
        def main(args: Array[String]): Unit = {
          val spark = SparkSession
            .builder()
            .appName("SparkODPS2OSS")
            .getOrCreate()
      
          try{
      
            //通過SparkSql查詢表
            val data = spark.sql("select * from  user_detail")
           //展示查詢數據
            data.show(10)
            //將查詢到的數據存儲到一個OSS的文件中
            data.toDF().coalesce(1).write.mode(SaveMode.Overwrite).csv("oss://spark-oss/user/data3")
      
          }finally {
            spark.stop()
          }
      
        }
      }
  • 提交方式:

    • Local模式下的代碼通過IntelliJ IDEA開發、測試并提交。

    • 在DataWorks上通過ODPS Spark節點提交并運行。詳情請參見開發ODPS Spark任務

    說明

    Spark開發環境的配置請參見Spark在MaxCompute的運行方式

讀OSS外部表

Spark on MaxCompute支持讀PARQUET、TEXTFILE、 ORC、AVRO和SEQUENCEFILE格式的OSS外部表,創建OSS外部表詳情請參見創建OSS外部表。

說明
  • 當Spark版本為2.x版本時,需要添加如下參數:

    spark.sql.odps.enableExternalTable=true
    spark.sql.odps.enableExternalProject=true
  • 當Spark版本為3.x版本時,需要添加如下參數:

    spark.sql.catalog.odps.enableExternalTable=true
    spark.sql.catalog.odps.enableExternalProject=true

讀OSS外部表示例用到mc_table的MaxCompute表數據信息如下。

1,1,51,1,46.81006,-92.08174,9/14/2014 0:00,S
1,2,13,1,46.81006,-92.08174,9/14/2014 0:00,NE
1,3,48,1,46.81006,-92.08174,9/14/2014 0:00,NE
1,4,30,1,46.81006,-92.08174,9/14/2014 0:00,W
1,5,47,1,46.81006,-92.08174,9/14/2014 0:00,S
1,6,9,1,46.81006,-92.08174,9/15/2014 0:00,S
1,7,53,1,46.81006,-92.08174,9/15/2014 0:00,N
1,8,63,1,46.81006,-92.08174,9/15/2014 0:00,SW
1,9,4,1,46.81006,-92.08174,9/15/2014 0:00,NE
1,10,31,1,46.81006,-92.08174,9/15/2014 0:00,N
  • 讀PARQUET格式的OSS外部表示例

    1. 在MaxCompute中使用如下命令通過內置開源解析器創建PARQUET格式OSS外部表。

      create external table if not exists mc_oss_parquet_external(
          vehicleId STRING ,
          recordId STRING,
          patientId STRING,
          calls STRING,
          locationLatitute STRING,
          locationLongtitue STRING,
          recordTime string,
          direction string)
      stored as parquet 
      location '<oss_location>' ;
    2. 導入數據。

      insert into table mc_oss_parquet_external select * from mc_table;
    3. 使用Spark on MaxCompute查詢數據。

      import org.apache.spark.sql.SparkSession
      object externalTable_rds {  
          def main(args: Array[String]): Unit = {
              val spark = SparkSession
                .builder()
                .appName("external_TableL-on-MaxCompute")
                .getOrCreate()    
      // 訪問外部表parquet
      print("=====讀parquet表=====")    
      spark.sql("select * from <project_name>.mc_oss_parquet_external").show(1000)
        }
      }

      查詢結果如下。PARQUET數據

  • 讀TEXTFILE格式的OSS外部表示例。

    • 關聯TEXT數據。

      1. 在MaxCompute中使用如下命令通過內置開源解析器創建TEXTFILE(TEXT)格式OSS外部表。

        create external table if not exists mc_oss_textfile_external(
            vehicleId STRING ,
            recordId STRING,
            patientId STRING,
            calls STRING,
            locationLatitute STRING,
            locationLongtitue STRING,
            recordTime string,
            direction string)
        stored as textfile 
        location '<oss_location>' ;
      2. 導入數據。

        insert into table mc_oss_textfile_external select * from mc_table;
      3. 使用Spark on MaxCompute查詢數據。

        import org.apache.spark.sql.SparkSession
        object externalTable_rds {  
            def main(args: Array[String]): Unit = {
                val spark = SparkSession
                  .builder()
                  .appName("external_TableL-on-MaxCompute")
                  .getOrCreate()    
        // 訪問外部表  stored as textfile;
        print("=====讀外部表-textfile=====")
        spark.sql("select * from <project_name>.mc_oss_textfile_external").show(1000)
          }
        }

        查詢結果如下。text結果

    • 關聯CSV數據。

      1. 在MaxCompute中使用如下命令通過內置開源解析器創建TEXTFILE(CSV)格式OSS外部表。

        create external table if not exists mc_oss_csv_external(
            vehicleId STRING ,
            recordId STRING,
            patientId STRING,
            calls STRING,
            locationLatitute STRING,
            locationLongtitue STRING,
            recordTime string,
            direction string)
        row format serde 'org.apache.hadoop.hive.serde2.OpenCSVSerde'
        with serdeproperties (
          "separatorChar" = ",",
          "quoteChar"= '"',
          "escapeChar"= "\\"
        )
        stored as textfile 
        location '<oss_location>'
        tblproperties (
          "skip.header.line.count"="1",
          "skip.footer.line.count"="1"
        )
        ;
      2. 導入數據。

        insert into table mc_oss_csv_external select * from mc_table;
      3. 使用Spark on MaxCompute查詢數據。

        import org.apache.spark.sql.SparkSession
        object externalTable_rds {  
            def main(args: Array[String]): Unit = {
                val spark = SparkSession
                  .builder()
                  .appName("external_TableL-on-MaxCompute")
                  .getOrCreate()    
        // 訪問外部表 csv
        print("=====讀 csv 外部表=====")
        spark.sql("select * from <project_name>.mc_oss_csv_external").show(1000)
          }
        }

        查詢結果如下。csv

    • 關聯JSON數據。

      1. 在MaxCompute中使用如下命令通過內置開源解析器創建TEXTFILE(JSON)格式OSS外部表。

        create external table if not exists mc_oss_json_external(
            vehicleId STRING ,
            recordId STRING,
            patientId STRING,
            calls STRING,
            locationLatitute STRING,
            locationLongtitue STRING,
            recordTime string,
            direction string)
        row format serde 'org.apache.hive.hcatalog.data.JsonSerDe'
        stored as textfile 
        location '<oss_location>'
        ;
      2. 導入數據。

        insert into table mc_oss_json_external select * from mc_table;
      3. 使用Spark on MaxCompute查詢數據。

        import org.apache.spark.sql.SparkSession
        object externalTable_rds {  
            def main(args: Array[String]): Unit = {
                val spark = SparkSession
                  .builder()
                  .appName("external_TableL-on-MaxCompute")
                  .getOrCreate()    
        // 訪問外部表  stored as textfile;
        print("=====讀外部表-json=====")
        spark.sql("select * from <project_name>.mc_oss_json_external").show(1000)
          }
        }

        查詢結果如下。json

  • 讀ORC格式的OSS外部表示例。

    1. 在MaxCompute中使用如下命令通過內置開源解析器創建ORC格式OSS外部表。

      create external table if not exists mc_oss_orc_external(
          vehicleId STRING ,
          recordId STRING,
          patientId STRING,
          calls STRING,
          locationLatitute STRING,
          locationLongtitue STRING,
          recordTime string,
          direction string)
      stored as orc 
      location '<oss_location>' ;
    2. 導入數據。

      insert into table mc_oss_orc_external select * from mc_table;
    3. 使用Spark on MaxCompute查詢數據。

      import org.apache.spark.sql.SparkSession
      object externalTable_rds {  
          def main(args: Array[String]): Unit = {
              val spark = SparkSession
                .builder()
                .appName("external_TableL-on-MaxCompute")
                .getOrCreate()    
      // 訪問外部表 orc;
      print("=====讀orc外部表=====")
      spark.sql("select * from <project_name>.mc_oss_orc_external").show(1000)
        }
      }

      查詢結果如下。orc

  • 讀AVRO格式的OSS外部表示例。

    1. 在MaxCompute中使用如下命令通過內置開源解析器創建AVRO格式OSS外部表。

      create external table if not exists mc_oss_avro_external(
          vehicleId STRING ,
          recordId STRING,
          patientId STRING,
          calls STRING,
          locationLatitute STRING,
          locationLongtitue STRING,
          recordTime string,
          direction string)
      stored as avro 
      location '<oss_location>' ;
    2. 導入數據。

      insert into table mc_oss_avro_external select * from mc_table;
    3. 使用Spark on MaxCompute查詢數據。

      import org.apache.spark.sql.SparkSession
      object externalTable_rds {  
          def main(args: Array[String]): Unit = {
              val spark = SparkSession
                .builder()
                .appName("external_TableL-on-MaxCompute")
                .getOrCreate()    
      // 訪問外部表 avro;
      print("=====讀avro外部表=====")
      spark.sql("select * from <project_name>.mc_oss_avro_external").show(1000)
        }
      }

      查詢結果如下。avro

  • 讀SEQUENCEFILE格式的OSS外部表示例。

    1. 在MaxCompute中使用如下命令通過內置開源解析器創建SEQUENCEFILE格式OSS外部表。

      create external table if not exists mc_oss_sequencfile_external(
          vehicleId STRING ,
          recordId STRING,
          patientId STRING,
          calls STRING,
          locationLatitute STRING,
          locationLongtitue STRING,
          recordTime string,
          direction string)
      stored as sequencfile 
      location '<oss_location>' ;
    2. 導入數據。

      insert into table mc_oss_sequencfile_external select * from mc_table;
    3. 使用Spark on MaxCompute查詢數據。

      import org.apache.spark.sql.SparkSession
      object externalTable_rds {  
          def main(args: Array[String]): Unit = {
              val spark = SparkSession
                .builder()
                .appName("external_TableL-on-MaxCompute")
                .getOrCreate()    
      // 訪問外部表 sequencefile;
      print("=====sequencefile=====")
      spark.sql("select * from <project_name>.mc_oss_sequencfile_external").show(1000)
        }
      }

      查詢結果如下。sequencfile

  • 參數說明

    參數

    說明

    oss_location

    數據文件所在OSS路徑。格式為oss://<oss_endpoint>/<Bucket名稱>/<OSS目錄名稱>/。MaxCompute默認會讀取該路徑下的所有數據文件。

    • oss_endpoint:OSS訪問域名信息。建議您使用OSS提供的內網域名,否則將產生OSS流量費用。更多OSS內網域名信息,請參見訪問域名和數據中心。

      說明

      建議數據文件存放的OSS地域與MaxCompute項目所在地域保持一致。由于MaxCompute只在部分地域部署,跨地域的數據連通性可能存在問題。

    • Bucket名稱:OSS存儲空間名稱,即Bucket名稱。更多查看存儲空間名稱信息,請參見列舉存儲空間。

    • 目錄名稱:OSS目錄名稱。目錄后不需要指定文件名。

      --正確用法。
      oss://oss-cn-shanghai-internal.aliyuncs.com/oss-mc-test/Demo1/
      --錯誤用法。
      http://oss-cn-shanghai-internal.aliyuncs.com/oss-mc-test/Demo1/                -- 不支持HTTP連接。
      https://oss-cn-shanghai-internal.aliyuncs.com/oss-mc-test/Demo1/               -- 不支持HTTPS連接。
      oss://oss-cn-shanghai-internal.aliyuncs.com/Demo1                              -- 連接地址錯誤。
      oss://oss-cn-shanghai-internal.aliyuncs.com/oss-mc-test/Demo1/vehicle.csv     -- 不需要指定文件名。

    project_name

    創建OSS外部表所在的MaxCompute項目名稱。

讀Hologres外部表

  • 前提條件

    • 在Hologres實例內已創建內部表,詳情請參見新建內部表。

    • 在MaxCompute實例內已創建Hologres外部表,詳情請參見Hologres外部表。

    說明
    • 當Spark版本為2.x版本時,需要添加如下參數:

      spark.sql.odps.enableExternalTable=true
      spark.sql.odps.enableExternalProject=true
    • 當Spark版本為3.x版本時,需要添加如下參數:

      spark.sql.catalog.odps.enableExternalTable=true
      spark.sql.catalog.odps.enableExternalProject=true
  • 使用示例

    -- 配置項
    -- 當前默認關閉對于外表和外部project的支持
    spark.sql.odps.enableExternalTable=true
    -- 指定spark版本
    spark.hadoop.odps.spark.version=spark-2.4.5-odps0.34.0
    -- 如果出現中文亂碼,需要加如下配置
    spark.executor.extraJavaOptions=-Dfile.encoding=UTF-8 -Dsun.jnu.encoding=UTF-8
    spark.driver.extraJavaOptions=-Dfile.encoding=UTF-8 -Dsun.jnu.encoding=UTF-8
    
    -- 代碼
    import org.apache.spark.sql.SparkSession
    
    object externalTable_holo {
      def main(args: Array[String]): Unit = {
        val spark = SparkSession
          .builder()
          .appName("external_TableL-on-MaxCompute")
          .getOrCreate()
    
        // 訪問外部表 Hologres;
        print("=====hologres=====")
        spark.sql("select * from <tablename_holo_ext>").show(1000)
      }
    }

    tablename_holo_ext為創建的Hologres外部表名稱。

讀HBase外部表

  • 前提條件

    說明
    • 當Spark版本為2.x版本時,需要添加如下參數:

      spark.sql.odps.enableExternalTable=true
      spark.sql.odps.enableExternalProject=true
    • 當Spark版本為3.x版本時,需要添加如下參數:

      spark.sql.catalog.odps.enableExternalTable=true
      spark.sql.catalog.odps.enableExternalProject=true
  • 使用示例

    -- 配置項
    -- 當前默認關閉對于外表和外部project的支持
    spark.sql.odps.enableExternalTable=true
    -- 指定spark版本
    spark.hadoop.odps.spark.version=spark-2.4.5-odps0.34.0
    -- 如果出現中文亂碼,需要加如下配置
    spark.executor.extraJavaOptions=-Dfile.encoding=UTF-8 -Dsun.jnu.encoding=UTF-8
    spark.driver.extraJavaOptions=-Dfile.encoding=UTF-8 -Dsun.jnu.encoding=UTF-8
    
    -- 代碼
    import org.apache.spark.sql.SparkSession
    
    object externalTable_hbase {
      def main(args: Array[String]): Unit = {
        val spark = SparkSession
          .builder()
          .appName("external_TableL-on-MaxCompute")
          .getOrCreate()
    
        // 訪問外部表 HBase;
        print("=====HBase=====")
        spark.sql("select * from <tablename_hbase_ext>").show(1000)
    
      }
    }
                        

    tablename_hbase_ext為創建的HBase外部表名稱。