
MaxCompute

Important

This topic contains important information that requires your attention. Ignoring it may affect your business, so read it carefully.

This topic describes how to use Databricks to read and write MaxCompute data.

Prerequisites

Use Databricks to read and write MaxCompute data

  • Read the data of partition ds=20170519 in the dwd_product_movie_basic_info table of the MaxCompute public dataset into a DataFrame, as implemented in the following code.

Warning

Both odpsUrl and tunnelUrl must be set to the VPC internal endpoint format. Otherwise, the worker nodes in the cluster (which have no public IP addresses) cannot connect to ODPS when the job is submitted, and the job times out.
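For reference, a minimal sketch of the two endpoint forms for the cn-hangzhou region used in this topic; confirm the exact host names for your region in the MaxCompute endpoint documentation, and treat the public endpoints here as illustrative:

%spark
// VPC internal endpoints (required on Databricks clusters whose workers have no public IP)
val odpsUrl   = "http://service.cn-hangzhou.maxcompute.aliyun-inc.com/api"
val tunnelUrl = "http://dt.cn-hangzhou.maxcompute.aliyun-inc.com"
// Public endpoints, shown for contrast only -- do NOT use these here (illustrative values)
// val odpsUrlPublic   = "http://service.cn-hangzhou.maxcompute.aliyun.com/api"
// val tunnelUrlPublic = "http://dt.cn-hangzhou.maxcompute.aliyun.com"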

%spark

val akId = "your akId"
val aks = "your aks"
val project = "maxcompute_public_data"
val table = "dwd_product_movie_basic_info"
val odpsUrl = "http://service.cn-hangzhou.maxcompute.aliyun-inc.com/api"  // see the endpoint documentation
val tunnelUrl = "http://dt.cn-hangzhou.maxcompute.aliyun-inc.com"  // see the endpoint documentation
// Read the data of partition ds=20170519 in the dwd_product_movie_basic_info table into a DataFrame
val readDF = spark.read.format("org.apache.spark.aliyun.odps.datasource")
  .option("odpsUrl", odpsUrl)
  .option("tunnelUrl", tunnelUrl)
  .option("project", project)
  .option("table", table)
  .option("accessKeySecret", aks)
  .option("accessKeyId", akId)
  .option("partitionSpec", "ds=20170519")
  .load()
readDF.show()
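Before moving on, you can sanity-check what was read; a minimal sketch that runs in the same notebook session as the cell above:

%spark
// Inspect the columns and types mapped from the ODPS table
readDF.printSchema()
// Count the rows read from partition ds=20170519
println(readDF.count())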
  • Create a MaxCompute partitioned table by using a custom schema.

Note

Reading MaxCompute data depends on ddi-datasources_shaded_2.11-1.0-SNAPSHOT.jar, which wraps the SDK that Spark uses to access ODPS data and also provides OdpsUtils, a utility class for creating MaxCompute tables.

%spark
import org.apache.spark.sql.{SaveMode, SparkSession}
import com.aliyun.odps.{Column, OdpsType, TableSchema}
import org.apache.spark.aliyun.utils.OdpsUtils
// Define parameters
val akId = "your akId"
val aks = "your aks"
val project = "your project"
val table = "table_movie"
val odpsUrl = "http://service.cn-hangzhou.maxcompute.aliyun-inc.com/api"  // see the endpoint documentation
val tunnelUrl = "http://dt.cn-hangzhou.maxcompute.aliyun-inc.com"  // see the endpoint documentation
val flag = true
// Create the schema
val schema = new TableSchema
schema.addColumn(new Column("movie_name", OdpsType.STRING))
schema.addColumn(new Column("dirctor", OdpsType.STRING))  // spelling follows the source table's column name
schema.addColumn(new Column("scriptwriter", OdpsType.STRING))
schema.addColumn(new Column("area", OdpsType.STRING))
schema.addColumn(new Column("actors", OdpsType.STRING))
schema.addColumn(new Column("type", OdpsType.STRING))
schema.addColumn(new Column("movie_length", OdpsType.STRING))
schema.addColumn(new Column("movie_date", OdpsType.STRING))
schema.addColumn(new Column("movie_language", OdpsType.STRING))
schema.addColumn(new Column("imdb_url", OdpsType.STRING))
schema.addPartitionColumn(new Column("ds", OdpsType.STRING))
// Databricks integrates OdpsUtils, the ODPS table creation utility class
val odpsUtils = OdpsUtils(akId, aks, odpsUrl)
// Create the MaxCompute table
odpsUtils.createTable(project, table, schema, flag)
// Parameter description: flag specifies whether to overwrite an existing table (true: overwrite; false: do not overwrite)
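If the target table might already exist and must not be clobbered, pass false as the last argument instead; a minimal sketch reusing the values from the cell above:

%spark
// Keep an existing table_movie instead of overwriting it
odpsUtils.createTable(project, table, schema, false)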
  • Write data to the partitioned table table_movie.

%spark
import org.apache.spark.sql.SaveMode

val akId = "your akId"
val aks = "your aks"
val project = "your project"
val table = "table_movie"
val odpsUrl = "http://service.cn-hangzhou.maxcompute.aliyun-inc.com/api"  // see the endpoint documentation
val tunnelUrl = "http://dt.cn-hangzhou.maxcompute.aliyun-inc.com"  // see the endpoint documentation
// Write the data into the newly created MaxCompute partitioned table
readDF.write.format("org.apache.spark.aliyun.odps.datasource")
  .option("odpsUrl", odpsUrl)
  .option("tunnelUrl", tunnelUrl)
  .option("table", table)
  .option("project", project)
  .option("accessKeySecret", aks)
  .option("accessKeyId", akId)
  .option("partitionSpec", "ds='20190522'")
  .mode(SaveMode.Overwrite)
  .save()
  • Check whether the data was written successfully.

%spark
// odpsUrl, tunnelUrl, aks, and akId are reused from the cells above
val project = "your project"
val table = "table_movie"
// Read the data of partition ds=20190522
val DF = spark.read.format("org.apache.spark.aliyun.odps.datasource")
  .option("odpsUrl", odpsUrl)
  .option("tunnelUrl", tunnelUrl)
  .option("project", project)
  .option("table", table)
  .option("accessKeySecret", aks)
  .option("accessKeyId", akId)
  .option("inferSchema", "true")
  .option("partitionSpec", "ds=20190522")
  .load()
DF.show()
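A simple way to confirm the write succeeded is to compare row counts between the source DataFrame and the partition just read back; a minimal sketch:

%spark
// The two counts should match if the overwrite above succeeded
println(s"source rows:  ${readDF.count()}")
println(s"written rows: ${DF.count()}")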