Delta Lake 快速開始二
本文中含有需要您注意的重要提示信息,忽略該信息可能對您的業(yè)務造成影響,請務必仔細閱讀。
本文介紹如何創(chuàng)建數(shù)據(jù)庫、表批讀寫、表流讀寫、表刪除、更新、合并以及版本控制等功能。
前提條件
通過主賬號登錄阿里云 Databricks控制臺。
已創(chuàng)建集群,具體請參見創(chuàng)建集群。
已使用OSS管理控制臺創(chuàng)建非系統(tǒng)目錄存儲空間,詳情請參見控制臺創(chuàng)建存儲空間。
首次使用DDI產(chǎn)品創(chuàng)建的Bucket為系統(tǒng)目錄Bucket,不建議存放數(shù)據(jù),您需要再創(chuàng)建一個Bucket來讀寫數(shù)據(jù)。
DDI訪問OSS路徑結(jié)構(gòu):oss://BucketName/Object
BucketName為您的存儲空間名稱。
Object為上傳到OSS上的文件的訪問路徑。
例:讀取在存儲空間名稱為databricks-demo-hangzhou文件路徑為demo/The_Sorrows_of_Young_Werther.txt的文件
// 從oss地址讀取文本文檔
val dataRDD = sc.textFile("oss://databricks-demo-hangzhou/demo/The_Sorrows_of_Young_Werther.txt")
創(chuàng)建Notebook、導入數(shù)據(jù)、進行數(shù)據(jù)分析
創(chuàng)建數(shù)據(jù)庫
不指定路徑創(chuàng)建數(shù)據(jù)庫,創(chuàng)建的數(shù)據(jù)庫會存儲在當前集群hive路徑中
%pyspark # 創(chuàng)建數(shù)據(jù)庫目錄,你也可以選擇自定義目錄 database="db_test" spark.sql("DROP DATABASE IF EXISTS {} CASCADE".format(database)) spark.sql("CREATE DATABASE {}".format(database)) spark.sql("USE {}".format(database))
指定路徑創(chuàng)建數(shù)據(jù)庫,可以使用OSS路徑創(chuàng)建
%pyspark database="db_dome" #指定路徑創(chuàng)建庫和表;本掩飾路徑為dome路徑,您可以使用真實的路徑 location='oss://dome-test/case6/'; spark.sql("DROP DATABASE IF EXISTS {} CASCADE".format(database)) spark.sql("CREATE DATABASE {} location 'oss://dome-test/case6/' ".format(database)) spark.sql("USE {}".format(database))
說明您可以在Databricks數(shù)據(jù)洞察控制臺中的元數(shù)據(jù)管理中查看創(chuàng)建的數(shù)據(jù)庫和數(shù)據(jù)表
創(chuàng)建delta表/delta分區(qū)表/parquet格式表/元存儲表
%pyspark from pyspark.sql.functions import expr from pyspark.sql.functions import from_unixtime, to_date # 定義路徑 inputPath = "/dome-test/events_data.json" deltaPath = "/dome-test/delta/events" partitionDeltaPath = "/dome-test/delta/events_partition" parquetPath = "/dome-test/parquet/events" jsonPath = "/dome-test/json/events" events = spark.read \ .option("inferSchema", "true") \ .json(inputPath) \ .withColumn("date", to_date('date', 'yyyy-MM-dd')) events.show() events.printSchema() # 創(chuàng)建delta表/delta分區(qū)表/parquet格式表/元存儲表 events.write.format("delta").mode("overwrite").save(deltaPath) events.write.format("delta").mode("overwrite").partitionBy("date").save(partitionDeltaPath) events.write.format("parquet").mode("overwrite").save(parquetPath) events.write.format("delta").saveAsTable("events")
將parquet文件轉(zhuǎn)換為delta
%sql --可以將parquet文件轉(zhuǎn)換為delta SELECT * FROM parquet.`/dome-test/parquet/events`; CONVERT TO DELTA parquet.`/dome-test/parquet/events`; SELECT * FROM delta.`/dome-test/parquet/events`;
將delta文件還原為parquet文件
清空delta數(shù)據(jù)
%sql -- 可以將delta表還原為parquet表,需要先vacuum操作清空delta數(shù)據(jù)文件,然后刪除_delta_log目錄 SELECT * FROM delta.`/dome-test/parquet/events`; set spark.databricks.delta.retentionDurationCheck.enabled = false; vacuum delta.`/dome-test/parquet/events` retain 0 hours
前往OSS路徑中找到_delta_log目錄并刪除
查詢轉(zhuǎn)換之后的delta表數(shù)據(jù)
%sql -- 此操作前要刪除表對應的_delta_log目錄 SELECT * FROM delta.`/dome-test/parquet/events`;
查詢表
%sql select * from events;
通過元存儲創(chuàng)建表并顯示表中內(nèi)容
%pyspark # 通過路徑在元存儲中創(chuàng)建表 spark.sql("CREATE TABLE IF NOT EXISTS events_partition USING DELTA LOCATION '/dome-test/delta/events_partition'") # 從元存儲和路徑中讀取表 df1 = spark.table("events") df2 = spark.read.format("delta").load("/dome-test/delta/events_partition") df1.show() df2.show()
查詢表的舊快照
%sql DESCRIBE HISTORY events; -- 您需要將以下日期時間修改為具體日期 SELECT * FROM events TIMESTAMP AS OF '2020-12-09 19:15:30'; SELECT * FROM events VERSION AS OF 0
更新表中數(shù)據(jù),然后查詢表更改歷史時間戳和版本信息
%sql -- 更新表中數(shù)據(jù),然后查詢表更改歷史時間戳和版本信息 UPDATE events SET data = 'update-case1' where eventId = 1; UPDATE delta.`/dome-test/delta/events` SET data = 'update-case1' where eventId = 1; DESCRIBE HISTORY delta.`/dome-test/delta/events`;
通過不同時間戳表達式讀取歷史版本
%pyspark # 可以讀取歷史版本數(shù)據(jù),time travel,以下時間為對應表的歷史時間 d1 = spark.read.format("delta").option("timestampAsOf", "2020-12-09 20:23:40").load("/dome-test/delta/events") d11 = spark.read.format("delta").load("/dome-test/delta/events@20201209202340000") d2 = spark.read.format("delta").option("versionAsOf", 0).load("/dome-test/delta/events") d22 = spark.read.format("delta").load("/dome-test/delta/events@v0") # 讀取最新版本數(shù)據(jù)或者其它版本數(shù)據(jù) latest_version = spark.sql("SELECT * FROM (DESCRIBE HISTORY delta.`/dome-test/delta/events`)").collect() df = spark.read.format("delta").option("versionAsOf", latest_version[0][0]).load("/dome-test/delta/events") d1.show() d11.show() d2.show() d22.show() df.show()
向表中追加數(shù)據(jù)
%pyspark # 往表中追加數(shù)據(jù) df = spark.createDataFrame([("case21", '2020-10-12', 21, 'INFO'),("case22", '2020-10-13', 22, 'INFO')], ['data', 'date', 'eventId', 'eventType']) df1 = df.select('data', to_date('date', 'yyyy-MM-dd').alias('date'), 'eventId', 'eventType') df1.write.format("delta").mode("append").saveAsTable("events") spark.table("events").show(30)
替換部分數(shù)據(jù)
%pyspark df = spark.createDataFrame([("case21", '2020-10-12', 21, 'INFO'),("case22", '2020-10-13', 22, 'INFO')], ['data', 'date', 'eventId', 'eventType']) df1 = df.select('data', to_date('date', 'yyyy-MM-dd').alias('date'), 'eventId', 'eventType') # 可以替換分區(qū)部分數(shù)據(jù)(replaceWhere中字段必須是分區(qū)字段);此處替換大于等于10-4的數(shù)據(jù),并將10-12、10-13號的數(shù)據(jù)填入 df1.write.format("delta").mode("overwrite").option("replaceWhere", "date >= '2020-10-4'").saveAsTable("events_partition") spark.table("events_partition").show(50)
添加列和修改列
%sql ALTER TABLE events ADD COLUMNS (name string COMMENT '名稱' FIRST); ALTER TABLE events CHANGE COLUMN name COMMENT '姓名';
重寫表
修改表中字段的類型或名稱需要重寫表,需要加上overwriteSchema選項
%pyspark # 修改表中字段的類型或者名稱需要重寫表,需要加上overwriteSchema選項 spark.read.table("events").withColumnRenamed("name", "fullname") \ .write.format("delta").mode("overwrite") \ .option("overwriteSchema", "true") \ .saveAsTable("events")
%pyspark from pyspark.sql.functions import col spark.read.table("events").withColumn("fullname2", col("fullname").cast("int")) \ .write.format("delta").mode("overwrite") \ .option("overwriteSchema", "true") \ .saveAsTable("events")
展示歷史版本表
%sql describe detail events; describe history events;
查詢版本為2的表
%sql select * from events@v2;
查詢?nèi)掌诖笥诘扔?0月10號的數(shù)據(jù)
%sql delete from events where date >= '2020-10-10';
從結(jié)構(gòu)化的輸入流中讀取數(shù)據(jù),經(jīng)過處理后結(jié)構(gòu)化流輸出到delta文件
%pyspark # 從結(jié)構(gòu)化的輸入流中讀取數(shù)據(jù),經(jīng)過處理后結(jié)構(gòu)化流輸出到delta文件 spark.readStream.format("delta").table("events").groupBy("date").count() \ .writeStream.format("delta").outputMode("complete").option("checkpointLocation", "/dome-test/delta/eventsByDate/_checkpoints/streaming-agg").start("/dome-test/delta/eventsByDate")
查詢上述文件輸出結(jié)果
%pyspark # 上述流文件輸出完成之后,可以查詢生成結(jié)果 events = spark.read.option("inferSchema", "true").format("delta").load("/dome-test/delta/eventsByDate") events.show()
更新表
%pyspark from delta.tables import * from pyspark.sql.functions import * deltaTable = DeltaTable.forPath(spark, "/dome-test/delta/events/") deltaTable.delete("date > '2020-10-03'") deltaTable.toDF().show() deltaTable.update("eventType = 'Error'", { "eventType": "'error'" }) deltaTable.toDF().show()
合并表
%pyspark from delta.tables import * deltaTable = DeltaTable.forPath(spark, "/dome-test/delta/events/") df = spark.createDataFrame([("update-case2", '2020-10-12', 2, 'INFO'),("case25", '2020-10-13', 25, 'INFO')], ['data', 'date', 'eventId', 'eventType']) updatesDF = df.select('data', to_date('date', 'yyyy-MM-dd').alias('date'), 'eventId', 'eventType') # 合并表,eventId相等則更新數(shù)據(jù),不相等就插入數(shù)據(jù) deltaTable.alias("events").merge( updatesDF.alias("updates"), "events.eventId = updates.eventId") \ .whenMatchedUpdate(set = { "data" : "updates.data" } ) \ .whenNotMatchedInsert(values = { "date": "updates.date", "eventId": "updates.eventId", "data": "updates.data", "eventType": "updates.eventType" } ).execute() deltaTable.toDF().show()