本文為您介紹Spark-2.x依賴的配置以及Spark-2.x示例說明。
配置Spark-2.x的依賴
通過MaxCompute提供的Spark客戶端提交應用時,需要在pom.xml文件中添加以下依賴。pom.xml文件請參見pom.xml。
<properties>
<spark.version>2.3.0</spark.version>
<cupid.sdk.version>3.3.8-public</cupid.sdk.version>
<scala.version>2.11.8</scala.version>
<scala.binary.version>2.11</scala.binary.version>
</properties>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_${scala.binary.version}</artifactId>
<version>${spark.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_${scala.binary.version}</artifactId>
<version>${spark.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-mllib_${scala.binary.version}</artifactId>
<version>${spark.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming_${scala.binary.version}</artifactId>
<version>${spark.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>com.aliyun.odps</groupId>
<artifactId>cupid-sdk</artifactId>
<version>${cupid.sdk.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>com.aliyun.odps</groupId>
<artifactId>hadoop-fs-oss</artifactId>
<version>${cupid.sdk.version}</version>
</dependency>
<dependency>
<groupId>com.aliyun.odps</groupId>
<artifactId>odps-spark-datasource_${scala.binary.version}</artifactId>
<version>${cupid.sdk.version}</version>
</dependency>
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-library</artifactId>
<version>${scala.version}</version>
</dependency>
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-actors</artifactId>
<version>${scala.version}</version>
</dependency>
上述代碼中Scope的定義如下:
spark-core、spark-sql等所有Spark社區發布的包,設置Scope為provided。
odps-spark-datasource設置Scope為compile。
WordCount示例(Scala)
代碼示例
提交方式
cd /path/to/MaxCompute-Spark/spark-2.x mvn clean package # 環境變量spark-defaults.conf的配置請參見搭建開發環境。 cd $SPARK_HOME bin/spark-submit --master yarn-cluster --class \ com.aliyun.odps.spark.examples.WordCount \ /path/to/MaxCompute-Spark/spark-2.x/target/spark-examples_2.11-1.0.0-SNAPSHOT-shaded.jar
MaxCompute Table讀寫示例(Scala)
代碼示例
提交方式
cd /path/to/MaxCompute-Spark/spark-2.x mvn clean package # 環境變量spark-defaults.conf的配置請參見搭建開發環境。 cd $SPARK_HOME bin/spark-submit --master yarn-cluster --class com.aliyun.odps.spark.examples.sparksql.SparkSQL \ /path/to/MaxCompute-Spark/spark-2.x/target/spark-examples_2.11-1.0.0-SNAPSHOT-shaded.jar
GraphX PageRank示例(Scala)
代碼示例
提交方式
cd /path/to/MaxCompute-Spark/spark-2.x mvn clean package # 環境變量spark-defaults.conf的配置請參見搭建開發環境。 cd $SPARK_HOME bin/spark-submit --master yarn-cluster --class com.aliyun.odps.spark.examples.graphx.PageRank \ /path/to/MaxCompute-Spark/spark-2.x/target/spark-examples_2.11-1.0.0-SNAPSHOT-shaded.jar
Mllib Kmeans-ON-OSS示例(Scala)
spark.hadoop.fs.oss.ststoken.roleArn
和spark.hadoop.fs.oss.endpoint
的填寫請參見Oss-Access文檔說明。
代碼示例
提交方式
# 編輯代碼。 val modelOssDir = "oss://bucket/kmeans-model" // 填寫具體的OSS Bucket路徑。 val spark = SparkSession .builder() .config("spark.hadoop.fs.oss.credentials.provider", "org.apache.hadoop.fs.aliyun.oss.AliyunStsTokenCredentialsProvider") .config("spark.hadoop.fs.oss.ststoken.roleArn", "acs:ram::****:role/aliyunodpsdefaultrole") .config("spark.hadoop.fs.oss.endpoint", "oss-cn-hangzhou-zmf.aliyuncs.com") .appName("KmeansModelSaveToOss") .getOrCreate() cd /path/to/MaxCompute-Spark/spark-2.x mvn clean package # 環境變量spark-defaults.conf的配置請參見搭建開發環境。 cd $SPARK_HOME bin/spark-submit --master yarn-cluster --class com.aliyun.odps.spark.examples.mllib.KmeansModelSaveToOss \ /path/to/MaxCompute-Spark/spark-2.x/target/spark-examples_2.11-1.0.0-SNAPSHOT-shaded.jar
OSS UnstructuredData示例(Scala)
spark.hadoop.fs.oss.ststoken.roleArn
和spark.hadoop.fs.oss.endpoint
的填寫請參見Oss-Access文檔說明。
代碼示例
提交方式
# 編輯代碼。 val pathIn = "oss://bucket/inputdata/" // 填寫具體的OSS Bucket路徑。 val spark = SparkSession .builder() .config("spark.hadoop.fs.oss.credentials.provider", "org.apache.hadoop.fs.aliyun.oss.AliyunStsTokenCredentialsProvider") .config("spark.hadoop.fs.oss.ststoken.roleArn", "acs:ram::****:role/aliyunodpsdefaultrole") .config("spark.hadoop.fs.oss.endpoint", "oss-cn-hangzhou-zmf.aliyuncs.com") .appName("SparkUnstructuredDataCompute") .getOrCreate() cd /path/to/MaxCompute-Spark/spark-2.x mvn clean package # 環境變量spark-defaults.conf的配置請參見搭建開發環境。 cd $SPARK_HOME bin/spark-submit --master yarn-cluster --class com.aliyun.odps.spark.examples.oss.SparkUnstructuredDataCompute \ /path/to/MaxCompute-Spark/spark-2.x/target/spark-examples_2.11-1.0.0-SNAPSHOT-shaded.jar
SparkPi示例(Scala)
代碼示例
提交方式
cd /path/to/MaxCompute-Spark/spark-2.x mvn clean package # 環境變量spark-defaults.conf的配置請參見搭建開發環境。 cd $SPARK_HOME bin/spark-submit --master yarn-cluster --class com.aliyun.odps.spark.examples.SparkPi \ /path/to/MaxCompute-Spark/spark-2.x/target/spark-examples_2.11-1.0.0-SNAPSHOT-shaded.jar
支持Spark Streaming LogHub示例(Scala)
代碼示例
提交方式
# 環境變量spark-defaults.conf的配置請參見搭建開發環境。 cd $SPARK_HOME bin/spark-submit --master yarn-cluster --class com.aliyun.odps.spark.examples.streaming.loghub.LogHubStreamingDemo \ /path/to/MaxCompute-Spark/spark-2.x/target/spark-examples_2.11-1.0.0-SNAPSHOT-shaded.jar
支持Spark Streaming LogHub寫MaxCompute示例(Scala)
代碼示例
提交方式
# 環境變量spark-defaults.conf的配置請參見搭建開發環境。 cd $SPARK_HOME bin/spark-submit --master yarn-cluster --class com.aliyun.odps.spark.examples.streaming.loghub.LogHub2OdpsDemo \ /path/to/MaxCompute-Spark/spark-2.x/target/spark-examples_2.11-1.0.0-SNAPSHOT-shaded.jar
支持Spark Streaming DataHub示例(Scala)
代碼示例
提交方式
# 環境變量spark-defaults.conf的配置請參見搭建開發環境。 cd $SPARK_HOME bin/spark-submit --master yarn-cluster --class com.aliyun.odps.spark.examples.streaming.datahub.DataHubStreamingDemo \ /path/to/MaxCompute-Spark/spark-2.x/target/spark-examples_2.11-1.0.0-SNAPSHOT-shaded.jar
支持Spark Streaming DataHub寫MaxCompute示例(Scala)
代碼示例
提交方式
# 環境變量spark-defaults.conf的配置請參見搭建開發環境。 cd $SPARK_HOME bin/spark-submit --master yarn-cluster --class com.aliyun.odps.spark.examples.streaming.datahub.DataHub2OdpsDemo \ /path/to/MaxCompute-Spark/spark-2.x/target/spark-examples_2.11-1.0.0-SNAPSHOT-shaded.jar
支持Spark Streaming Kafka示例(Scala)
代碼示例
提交方式
# 環境變量spark-defaults.conf的配置請參見搭建開發環境。 cd $SPARK_HOME bin/spark-submit --master yarn-cluster --class com.aliyun.odps.spark.examples.streaming.kafka.KafkaStreamingDemo \ /path/to/MaxCompute-Spark/spark-2.x/target/spark-examples_2.11-1.0.0-SNAPSHOT-shaded.jar
更多信息請參見MaxCompute-Spark。
支持Spark StructuredStreaming DataHub示例(Scala)
代碼示例
提交方式
# 環境變量spark-defaults.conf的配置請參見搭建開發環境。 cd $SPARK_HOME bin/spark-submit --master yarn-cluster --class com.aliyun.odps.spark.examples.structuredstreaming.datahub.DatahubStructuredStreamingDemo \ /path/to/MaxCompute-Spark/spark-2.x/target/spark-examples_2.11-1.0.0-SNAPSHOT-shaded.jar
支持Spark StructuredStreaming Kafka示例(Scala)
代碼示例
提交方式
# 環境變量spark-defaults.conf的配置請參見搭建開發環境。 cd $SPARK_HOME bin/spark-submit --master yarn-cluster --class com.aliyun.odps.spark.examples.structuredstreaming.kafka.KafkaStructuredStreamingDemo \ /path/to/MaxCompute-Spark/spark-2.x/target/spark-examples_2.11-1.0.0-SNAPSHOT-shaded.jar
支持Spark StructuredStreaming LogHub示例(Scala)
代碼示例
提交方式
# 環境變量spark-defaults.conf的配置請參見搭建開發環境。 cd $SPARK_HOME bin/spark-submit --master yarn-cluster --class com.aliyun.odps.spark.examples.structuredstreaming.loghub.LoghubStructuredStreamingDemo \ /path/to/MaxCompute-Spark/spark-2.x/target/spark-examples_2.11-1.0.0-SNAPSHOT-shaded.jar
MaxCompute Table讀寫PySpark示例(Python)
代碼示例
提交方式
# 環境變量spark-defaults.conf的配置請參見搭建開發環境。 cd $SPARK_HOME bin/spark-submit --master yarn-cluster --jars /path/to/odps-spark-datasource_2.11-3.3.8-public.jar \ /path/to/MaxCompute-Spark/spark-2.x/src/main/python/spark_sql.py
PySpark寫OSS示例(Python)
代碼示例
提交方式
# 環境變量spark-defaults.conf的配置請參見搭建開發環境。 # OSS相關配置請參見OSS Access文檔說明。 cd $SPARK_HOME bin/spark-submit --master yarn-cluster --jars /path/to/spark-examples_2.11-1.0.0-SNAPSHOT-shaded.jar \ /path/to/MaxCompute-Spark/spark-2.x/src/main/python/spark_oss.py # spark-examples_2.11-1.0.0-SNAPSHOT-shaded.jar可以通過Spark-2.x編譯得到。
Spark-SQL示例(Java)
Spark-SQL Java示例代碼請參見JavaSparkSQL.java。
從MaxCompute中讀取數據寫入HBase
通過IntelliJ IDEA工具編寫代碼,實現從MaxCompute中讀取數據寫入HBase。
代碼示例
object McToHbase { def main(args: Array[String]) { val spark = SparkSession .builder() .appName("spark_sql_ddl") .config("spark.sql.catalogImplementation", "odps") .config("spark.hadoop.odps.end.point","http://service.cn.maxcompute.aliyun.com/api") .config("spark.hadoop.odps.runtime.end.point","http://service.cn.maxcompute.aliyun-inc.com/api") .getOrCreate() val sc = spark.sparkContext val config = HBaseConfiguration.create() val zkAddress = "" config.set(HConstants.ZOOKEEPER_QUORUM, zkAddress); val jobConf = new JobConf(config) jobConf.setOutputFormat(classOf[TableOutputFormat]) jobConf.set(TableOutputFormat.OUTPUT_TABLE,"test") try{ import spark._ spark.sql("select '7', 'long'").rdd.map(row => { val id = row(0).asInstanceOf[String] val name = row(1).asInstanceOf[String] val put = new Put(Bytes.toBytes(id)) put.addColumn(Bytes.toBytes("cf"), Bytes.toBytes("a"), Bytes.toBytes(name)) (new ImmutableBytesWritable, put) }).saveAsHadoopDataset(jobConf) } finally { sc.stop() } } }
提交方式:通過IntelliJ IDEA提交并運行示例代碼。更多操作信息,請參見Spark在MaxCompute的運行方式。
讀寫OSS文件
通過IntelliJ IDEA工具或DataWorks,實現讀寫OSS文件。
代碼示例
示例1:Local模式下的代碼示例。
package com.aliyun.odps.spark.examples import java.io.ByteArrayInputStream import org.apache.spark.sql.SparkSession object SparkOSS { def main(args: Array[String]) { val spark = SparkSession .builder() .config("spark.master", "local[4]") // 需要設置spark.master為local[N]才能直接運行,N為并發數。 .config("spark.hadoop.fs.oss.accessKeyId", "") .config("spark.hadoop.fs.oss.accessKeySecret", "") .config("spark.hadoop.fs.oss.endpoint", "oss-cn-beijing.aliyuncs.com") .appName("SparkOSS") .getOrCreate() val sc = spark.sparkContext try { //OSS文件的讀取。 val pathIn = "oss://spark-oss/workline.txt" val inputData = sc.textFile(pathIn, 5) //RDD寫入。 inputData.repartition(1).saveAsTextFile("oss://spark-oss/user/data3") } finally { sc.stop() } } }
說明執行該代碼前,請您務必檢查是否已添加了
hadoop-fs-oss
依賴,否則會報錯。示例2:Local模式下的代碼示例。
package com.aliyun.odps.spark.examples import java.io.ByteArrayInputStream import com.aliyun.oss.{OSSClientBuilder,OSSClient} import org.apache.spark.sql.SparkSession object SparkOSS { def main(args: Array[String]) { val spark = SparkSession .builder() .config("spark.master", "local[4]") // 需要設置spark.master為local[N]才能直接運行,N為并發數。 .config("spark.hadoop.fs.oss.accessKeyId", "") .config("spark.hadoop.fs.oss.accessKeySecret", "") .config("spark.hadoop.fs.oss.endpoint", "oss-cn-beijing.aliyuncs.com") .appName("SparkOSS") .getOrCreate() val sc = spark.sparkContext try { //OSS文件的讀取。 val pathIn = "oss://spark-oss/workline.txt" val inputData = sc.textFile(pathIn, 5) val cnt = inputData.count inputData.count() println(s"count: $cnt") //OSS文件的寫入。 // 阿里云賬號AccessKey擁有所有API的訪問權限,風險很高。強烈建議您創建并使用RAM用戶進行API訪問或日常運維,請登錄RAM控制臺創建RAM用戶 // 此處以把AccessKey 和 AccessKeySecret 保存在環境變量為例說明。您也可以根據業務需要,保存到配置文件里 // 強烈建議不要把 AccessKey 和 AccessKeySecret 保存到代碼里,會存在密鑰泄漏風險 val ossClient = new OSSClientBuilder().build("oss-cn-beijing.aliyuncs.com", System.getenv("ALIBABA_CLOUD_ACCESS_KEY_ID"), System.getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET")) val filePath="user/data" ossClient.putObject("spark-oss",filePath , new ByteArrayInputStream(cnt.toString.getBytes())) ossClient.shutdown() } finally { sc.stop() } } }
示例3:Cluster模式下的代碼示例。
package com.aliyun.odps.spark.examples import java.io.ByteArrayInputStream import com.aliyun.oss.{OSSClientBuilder,OSSClient} import org.apache.spark.sql.SparkSession object SparkOSS { def main(args: Array[String]) { val spark = SparkSession .builder() .appName("SparkOSS") .getOrCreate() val sc = spark.sparkContext try { //OSS文件的讀取。 val pathIn = "oss://spark-oss/workline.txt" val inputData = sc.textFile(pathIn, 5) val cnt = inputData.count inputData.count() println(s"count: $cnt") // inputData.repartition(1).saveAsTextFile("oss://spark-oss/user/data3") //OSS文件的寫入。 // 阿里云賬號AccessKey擁有所有API的訪問權限,風險很高。強烈建議您創建并使用RAM用戶進行API訪問或日常運維,請登錄RAM控制臺創建RAM用戶 // 此處以把AccessKey 和 AccessKeySecret 保存在環境變量為例說明。您也可以根據業務需要,保存到配置文件里 // 強烈建議不要把 AccessKey 和 AccessKeySecret 保存到代碼里,會存在密鑰泄漏風險 val ossClient = new OSSClientBuilder().build("oss-cn-beijing.aliyuncs.com", System.getenv("ALIBABA_CLOUD_ACCESS_KEY_ID"), System.getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET")) val filePath="user/data" ossClient.putObject("spark-oss",filePath , new ByteArrayInputStream(cnt.toString.getBytes())) ossClient.shutdown() } finally { sc.stop() } } }
提交方式:
Local模式下的代碼通過IntelliJ IDEA開發、測試并提交。更多操作信息,請參見Spark在MaxCompute的運行方式。
在DataWorks上通過ODPS Spark節點提交并運行。詳情請參見開發ODPS Spark任務。
讀MaxCompute寫OSS
通過IntelliJ IDEA工具或DataWorks,實現讀取MaxCompute數據并寫入OSS。
代碼示例
Local模式下的示例代碼。
package com.aliyun.odps.spark.examples.userpakage import org.apache.spark.sql.{SaveMode, SparkSession} object SparkODPS2OSS { def main(args: Array[String]): Unit = { val spark = SparkSession .builder() .appName("Spark2OSS") .config("spark.master", "local[4]")// 需設置spark.master為local[N]才能直接運行,N為并發數 .config("spark.hadoop.odps.project.name", "") .config("spark.hadoop.odps.access.id", "") .config("spark.hadoop.odps.access.key", "") .config("spark.hadoop.odps.end.point", "http://service.cn.maxcompute.aliyun.com/api") .config("spark.sql.catalogImplementation", "odps") .config("spark.hadoop.fs.oss.accessKeyId","") .config("spark.hadoop.fs.oss.accessKeySecret","") .config("spark.hadoop.fs.oss.endpoint","oss-cn-beijing.aliyuncs.com") .getOrCreate() try{ //通過SparkSql查詢表 val data = spark.sql("select * from user_detail") //展示查詢數據 data.show(10) //將查詢到的數據存儲到一個OSS的文件中 data.toDF().coalesce(1).write.mode(SaveMode.Overwrite).csv("oss://spark-oss/user/data3") }finally { spark.stop() } } }
Cluster模式下的示例代碼。
package com.aliyun.odps.spark.examples.userpakage import org.apache.spark.sql.{SaveMode, SparkSession} object SparkODPS2OSS { def main(args: Array[String]): Unit = { val spark = SparkSession .builder() .appName("SparkODPS2OSS") .getOrCreate() try{ //通過SparkSql查詢表 val data = spark.sql("select * from user_detail") //展示查詢數據 data.show(10) //將查詢到的數據存儲到一個OSS的文件中 data.toDF().coalesce(1).write.mode(SaveMode.Overwrite).csv("oss://spark-oss/user/data3") }finally { spark.stop() } } }
提交方式:
Local模式下的代碼通過IntelliJ IDEA開發、測試并提交。
在DataWorks上通過ODPS Spark節點提交并運行。詳情請參見開發ODPS Spark任務。
說明Spark開發環境的配置請參見Spark在MaxCompute的運行方式。
讀OSS外部表
Spark on MaxCompute支持讀PARQUET、TEXTFILE、 ORC、AVRO和SEQUENCEFILE格式的OSS外部表,創建OSS外部表詳情請參見創建OSS外部表。
當Spark版本為2.x版本時,需要添加如下參數:
spark.sql.odps.enableExternalTable=true spark.sql.odps.enableExternalProject=true
當Spark版本為3.x版本時,需要添加如下參數:
spark.sql.catalog.odps.enableExternalTable=true spark.sql.catalog.odps.enableExternalProject=true
讀OSS外部表示例用到mc_table的MaxCompute表數據信息如下。
1,1,51,1,46.81006,-92.08174,9/14/2014 0:00,S
1,2,13,1,46.81006,-92.08174,9/14/2014 0:00,NE
1,3,48,1,46.81006,-92.08174,9/14/2014 0:00,NE
1,4,30,1,46.81006,-92.08174,9/14/2014 0:00,W
1,5,47,1,46.81006,-92.08174,9/14/2014 0:00,S
1,6,9,1,46.81006,-92.08174,9/15/2014 0:00,S
1,7,53,1,46.81006,-92.08174,9/15/2014 0:00,N
1,8,63,1,46.81006,-92.08174,9/15/2014 0:00,SW
1,9,4,1,46.81006,-92.08174,9/15/2014 0:00,NE
1,10,31,1,46.81006,-92.08174,9/15/2014 0:00,N
讀PARQUET格式的OSS外部表示例
在MaxCompute中使用如下命令通過內置開源解析器創建PARQUET格式OSS外部表。
create external table if not exists mc_oss_parquet_external( vehicleId STRING , recordId STRING, patientId STRING, calls STRING, locationLatitute STRING, locationLongtitue STRING, recordTime string, direction string) stored as parquet location '<oss_location>' ;
導入數據。
insert into table mc_oss_parquet_external select * from mc_table;
使用Spark on MaxCompute查詢數據。
import org.apache.spark.sql.SparkSession object externalTable_rds { def main(args: Array[String]): Unit = { val spark = SparkSession .builder() .appName("external_TableL-on-MaxCompute") .getOrCreate() // 訪問外部表parquet print("=====讀parquet表=====") spark.sql("select * from <project_name>.mc_oss_parquet_external").show(1000) } }
查詢結果如下。
讀TEXTFILE格式的OSS外部表示例。
關聯TEXT數據。
在MaxCompute中使用如下命令通過內置開源解析器創建TEXTFILE(TEXT)格式OSS外部表。
create external table if not exists mc_oss_textfile_external( vehicleId STRING , recordId STRING, patientId STRING, calls STRING, locationLatitute STRING, locationLongtitue STRING, recordTime string, direction string) stored as textfile location '<oss_location>' ;
導入數據。
insert into table mc_oss_textfile_external select * from mc_table;
使用Spark on MaxCompute查詢數據。
import org.apache.spark.sql.SparkSession object externalTable_rds { def main(args: Array[String]): Unit = { val spark = SparkSession .builder() .appName("external_TableL-on-MaxCompute") .getOrCreate() // 訪問外部表 stored as textfile; print("=====讀外部表-textfile=====") spark.sql("select * from <project_name>.mc_oss_textfile_external").show(1000) } }
查詢結果如下。
關聯CSV數據。
在MaxCompute中使用如下命令通過內置開源解析器創建TEXTFILE(CSV)格式OSS外部表。
create external table if not exists mc_oss_csv_external( vehicleId STRING , recordId STRING, patientId STRING, calls STRING, locationLatitute STRING, locationLongtitue STRING, recordTime string, direction string) row format serde 'org.apache.hadoop.hive.serde2.OpenCSVSerde' with serdeproperties ( "separatorChar" = ",", "quoteChar"= '"', "escapeChar"= "\\" ) stored as textfile location '<oss_location>' tblproperties ( "skip.header.line.count"="1", "skip.footer.line.count"="1" ) ;
導入數據。
insert into table mc_oss_csv_external select * from mc_table;
使用Spark on MaxCompute查詢數據。
import org.apache.spark.sql.SparkSession object externalTable_rds { def main(args: Array[String]): Unit = { val spark = SparkSession .builder() .appName("external_TableL-on-MaxCompute") .getOrCreate() // 訪問外部表 csv print("=====讀 csv 外部表=====") spark.sql("select * from <project_name>.mc_oss_csv_external").show(1000) } }
查詢結果如下。
關聯JSON數據。
在MaxCompute中使用如下命令通過內置開源解析器創建TEXTFILE(JSON)格式OSS外部表。
create external table if not exists mc_oss_json_external( vehicleId STRING , recordId STRING, patientId STRING, calls STRING, locationLatitute STRING, locationLongtitue STRING, recordTime string, direction string) row format serde 'org.apache.hive.hcatalog.data.JsonSerDe' stored as textfile location '<oss_location>' ;
導入數據。
insert into table mc_oss_json_external select * from mc_table;
使用Spark on MaxCompute查詢數據。
import org.apache.spark.sql.SparkSession object externalTable_rds { def main(args: Array[String]): Unit = { val spark = SparkSession .builder() .appName("external_TableL-on-MaxCompute") .getOrCreate() // 訪問外部表 stored as textfile; print("=====讀外部表-json=====") spark.sql("select * from <project_name>.mc_oss_json_external").show(1000) } }
查詢結果如下。
讀ORC格式的OSS外部表示例。
在MaxCompute中使用如下命令通過內置開源解析器創建ORC格式OSS外部表。
create external table if not exists mc_oss_orc_external( vehicleId STRING , recordId STRING, patientId STRING, calls STRING, locationLatitute STRING, locationLongtitue STRING, recordTime string, direction string) stored as orc location '<oss_location>' ;
導入數據。
insert into table mc_oss_orc_external select * from mc_table;
使用Spark on MaxCompute查詢數據。
import org.apache.spark.sql.SparkSession object externalTable_rds { def main(args: Array[String]): Unit = { val spark = SparkSession .builder() .appName("external_TableL-on-MaxCompute") .getOrCreate() // 訪問外部表 orc; print("=====讀orc外部表=====") spark.sql("select * from <project_name>.mc_oss_orc_external").show(1000) } }
查詢結果如下。
讀AVRO格式的OSS外部表示例。
在MaxCompute中使用如下命令通過內置開源解析器創建AVRO格式OSS外部表。
create external table if not exists mc_oss_avro_external( vehicleId STRING , recordId STRING, patientId STRING, calls STRING, locationLatitute STRING, locationLongtitue STRING, recordTime string, direction string) stored as avro location '<oss_location>' ;
導入數據。
insert into table mc_oss_avro_external select * from mc_table;
使用Spark on MaxCompute查詢數據。
import org.apache.spark.sql.SparkSession object externalTable_rds { def main(args: Array[String]): Unit = { val spark = SparkSession .builder() .appName("external_TableL-on-MaxCompute") .getOrCreate() // 訪問外部表 avro; print("=====讀avro外部表=====") spark.sql("select * from <project_name>.mc_oss_avro_external").show(1000) } }
查詢結果如下。
讀SEQUENCEFILE格式的OSS外部表示例。
在MaxCompute中使用如下命令通過內置開源解析器創建SEQUENCEFILE格式OSS外部表。
create external table if not exists mc_oss_sequencfile_external( vehicleId STRING , recordId STRING, patientId STRING, calls STRING, locationLatitute STRING, locationLongtitue STRING, recordTime string, direction string) stored as sequencfile location '<oss_location>' ;
導入數據。
insert into table mc_oss_sequencfile_external select * from mc_table;
使用Spark on MaxCompute查詢數據。
import org.apache.spark.sql.SparkSession object externalTable_rds { def main(args: Array[String]): Unit = { val spark = SparkSession .builder() .appName("external_TableL-on-MaxCompute") .getOrCreate() // 訪問外部表 sequencefile; print("=====sequencefile=====") spark.sql("select * from <project_name>.mc_oss_sequencfile_external").show(1000) } }
查詢結果如下。
參數說明
參數
說明
oss_location
數據文件所在OSS路徑。格式為
oss://<oss_endpoint>/<Bucket名稱>/<OSS目錄名稱>/
。MaxCompute默認會讀取該路徑下的所有數據文件。oss_endpoint:OSS訪問域名信息。建議您使用OSS提供的內網域名,否則將產生OSS流量費用。更多OSS內網域名信息,請參見訪問域名和數據中心。
說明建議數據文件存放的OSS地域與MaxCompute項目所在地域保持一致。由于MaxCompute只在部分地域部署,跨地域的數據連通性可能存在問題。
Bucket名稱:OSS存儲空間名稱,即Bucket名稱。更多查看存儲空間名稱信息,請參見列舉存儲空間。
目錄名稱:OSS目錄名稱。目錄后不需要指定文件名。
--正確用法。 oss://oss-cn-shanghai-internal.aliyuncs.com/oss-mc-test/Demo1/ --錯誤用法。 http://oss-cn-shanghai-internal.aliyuncs.com/oss-mc-test/Demo1/ -- 不支持HTTP連接。 https://oss-cn-shanghai-internal.aliyuncs.com/oss-mc-test/Demo1/ -- 不支持HTTPS連接。 oss://oss-cn-shanghai-internal.aliyuncs.com/Demo1 -- 連接地址錯誤。 oss://oss-cn-shanghai-internal.aliyuncs.com/oss-mc-test/Demo1/vehicle.csv -- 不需要指定文件名。
project_name
創建OSS外部表所在的MaxCompute項目名稱。
讀Hologres外部表
前提條件
在Hologres實例內已創建內部表,詳情請參見新建內部表。
在MaxCompute實例內已創建Hologres外部表,詳情請參見Hologres外部表。
說明當Spark版本為2.x版本時,需要添加如下參數:
spark.sql.odps.enableExternalTable=true spark.sql.odps.enableExternalProject=true
當Spark版本為3.x版本時,需要添加如下參數:
spark.sql.catalog.odps.enableExternalTable=true spark.sql.catalog.odps.enableExternalProject=true
使用示例
-- 配置項 -- 當前默認關閉對于外表和外部project的支持 spark.sql.odps.enableExternalTable=true -- 指定spark版本 spark.hadoop.odps.spark.version=spark-2.4.5-odps0.34.0 -- 如果出現中文亂碼,需要加如下配置 spark.executor.extraJavaOptions=-Dfile.encoding=UTF-8 -Dsun.jnu.encoding=UTF-8 spark.driver.extraJavaOptions=-Dfile.encoding=UTF-8 -Dsun.jnu.encoding=UTF-8 -- 代碼 import org.apache.spark.sql.SparkSession object externalTable_holo { def main(args: Array[String]): Unit = { val spark = SparkSession .builder() .appName("external_TableL-on-MaxCompute") .getOrCreate() // 訪問外部表 Hologres; print("=====hologres=====") spark.sql("select * from <tablename_holo_ext>").show(1000) } }
tablename_holo_ext為創建的Hologres外部表名稱。
讀HBase外部表
前提條件
在HBase內已創建表,詳情請參見HBase Shell使用介紹。
在MaxCompute實例內已創建HBase外部表,詳情請參見HBase外部表(HBase標準版或增強版)。
說明當Spark版本為2.x版本時,需要添加如下參數:
spark.sql.odps.enableExternalTable=true spark.sql.odps.enableExternalProject=true
當Spark版本為3.x版本時,需要添加如下參數:
spark.sql.catalog.odps.enableExternalTable=true spark.sql.catalog.odps.enableExternalProject=true
使用示例
-- 配置項 -- 當前默認關閉對于外表和外部project的支持 spark.sql.odps.enableExternalTable=true -- 指定spark版本 spark.hadoop.odps.spark.version=spark-2.4.5-odps0.34.0 -- 如果出現中文亂碼,需要加如下配置 spark.executor.extraJavaOptions=-Dfile.encoding=UTF-8 -Dsun.jnu.encoding=UTF-8 spark.driver.extraJavaOptions=-Dfile.encoding=UTF-8 -Dsun.jnu.encoding=UTF-8 -- 代碼 import org.apache.spark.sql.SparkSession object externalTable_hbase { def main(args: Array[String]): Unit = { val spark = SparkSession .builder() .appName("external_TableL-on-MaxCompute") .getOrCreate() // 訪問外部表 HBase; print("=====HBase=====") spark.sql("select * from <tablename_hbase_ext>").show(1000) } }
tablename_hbase_ext為創建的HBase外部表名稱。