本文中含有需要您注意的重要提示信息,忽略該信息可能對您的業務造成影響,請務必仔細閱讀。
本文介紹如何使用Databricks 讀寫MaxCompute數據。
前提條件
已創建MaxCompute實例,具體參見快速體驗MaxCompute
通過主賬號登錄阿里云 Databricks控制臺。
已創建DDI集群,具體請參見DDI集群創建。
創建集群并通過knox賬號訪問NoteBook。
使用Databricks 讀寫MaxCompute數據
讀取maxCompute數據集的dwd_product_movie_basic_info表中ds=20170519分區的數據到DataFrame中,代碼實現。
警告
odpsUrl和tunnelUrl都需要設置為VPC內網訪問格式,否則提交job的時候會因為集群中worker節點(無公網ip)連接不上odps導致job超時。
%spark
val akId = "your akId"
val aks = "your aks"
val project = "maxcompute_public_data"
val table = "dwd_product_movie_basic_info"
val odpsUrl = "http://service.cn-hangzhou.maxcompute.aliyun-inc.com/api" //參考文檔
val tunnelUrl = "http://dt.cn-hangzhou.maxcompute.aliyun-inc.com" //參考文檔
//spark讀取dwd_product_movie_basic_info表中ds=20170519分區的數據到DataFrame中;
val readDF = spark.read.format("org.apache.spark.aliyun.odps.datasource").option("odpsUrl", odpsUrl).option("tunnelUrl", tunnelUrl).option("project",project).option("table",table).option("accessKeySecret",aks).option("accessKeyId", akId).option("partitionSpec", "ds=20170519").load()
readDF.show()
通過自定義Schema創建MaxCompute分區表數據,
說明
MC數據讀取依賴ddi-datasources_shaded_2.11-1.0-SNAPSHOT.jar,該包中封裝了spark調用odps數據的sdk,并封裝了MaxCompute建表的工具類OdpsUtils。
%spark
import org.apache.spark.sql.{SaveMode, SparkSession}
import com.aliyun.odps.{Column, OdpsType, TableSchema}
import org.apache.spark.aliyun.utils.OdpsUtils
//定義參數
val akId = "your akId"
val aks = "your aks"
val project = "your project"
val table = "table_movie"
val odpsUrl = "http://service.cn-hangzhou.maxcompute.aliyun-inc.com/api" //參考文檔
val tunnelUrl = "http://dt.cn-hangzhou.maxcompute.aliyun-inc.com" //參考文檔
val flag=true
//創建schema
val schema = new TableSchema
schema.addColumn(new Column("movie_name", OdpsType.STRING))
schema.addColumn(new Column("dirctor", OdpsType.STRING))
schema.addColumn(new Column("scriptwriter", OdpsType.STRING))
schema.addColumn(new Column("area", OdpsType.STRING))
schema.addColumn(new Column("actors", OdpsType.STRING))
schema.addColumn(new Column("type", OdpsType.STRING))
schema.addColumn(new Column("movie_length", OdpsType.STRING))
schema.addColumn(new Column("movie_date", OdpsType.STRING))
schema.addColumn(new Column("movie_language", OdpsType.STRING))
schema.addColumn(new Column("imdb_url", OdpsType.STRING))
schema.addPartitionColumn(new Column("ds", OdpsType.STRING))
//databricks整合了odps建表工具類OdpsUtils
val odpsUtils= OdpsUtils(akId, aks, odpsUrl)
//創建MC表
odpsUtils.createTable(project, table, schema, flag)
//參數說明:flag:是否覆蓋原有的表(true:覆蓋,false:不覆蓋)
寫入數據到分區表table_movie中
%spark
val akId = "your akId"
val aks = "your aks"
val project = "your project"
val table = "table_movie"
val odpsUrl = "http://service.cn-hangzhou.maxcompute.aliyun-inc.com/api" //參考文檔
val tunnelUrl = "http://dt.cn-hangzhou.maxcompute.aliyun-inc.com" //參考文檔
//寫入數據到maxcompute新創建的分區表中
readDF.write.format("org.apache.spark.aliyun.odps.datasource")
.option("odpsUrl", odpsUrl)
.option("tunnelUrl", tunnelUrl)
.option("table", table)
.option("project",project)
.option("accessKeySecret", aks)
.option("accessKeyId", akId)
.option("partitionSpec", "ds='20190522'").mode(SaveMode.Overwrite).save()
查看數據是否寫入成功
%spark
val project = "your project"
val table = "table_movie"
//讀取分區ds=20190522數據
val DF = spark.read.format("org.apache.spark.aliyun.odps.datasource")
.option("odpsUrl", odpsUrl)
.option("tunnelUrl", tunnelUrl)
.option("project",project)
.option("table",table)
.option("accessKeySecret",aks)
.option("accessKeyId", akId)
.option("inferSchema","true")
.option("partitionSpec", "ds=20190522")
.load()
DF.show()
文檔內容是否對您有幫助?