Delta Lake快速入門概述了使用Delta Lake的基礎知識。此快速入門演示如何生成管道,以便將JSON數據讀入Delta表、修改表、讀取表、顯示表歷史記錄,以及優化表。
有關演示這些功能的Databricks筆記本,請參閱入門筆記本。
創建表
若要創建一個delta表,可以使用現有的Apache Spark SQL代碼,也可以將parquet、csv、json等數據格式轉換為delta。
對于所有文件類型,您將文件讀入DataFrame并將格式轉為delta:
Python
%pyspark
events = spark.read.json("/xz/events_data.json")
events.write.format("delta").save("/xz/delta/events")
spark.sql("CREATE TABLE events USING DELTA LOCATION '/xz/delta/events/'")
SQL
%sql
CREATE TABLE events
USING delta
AS SELECT *
FROM json.`/data/events/`
這些操作使用從JSON數據推斷出的架構創建一個新的非托管表。有關創建新Delta表時可用的全套選項的信息,請參見創建表和寫入表。
如果源文件是Parquet格式,則可以使用SQL Convert to Delta語句就地轉換文件,以創建非托管表
SQL
%sql
CONVERT TO DELTA parquet.`/mnt/delta/events`
分區數據
要加快包含涉及分區列的謂詞查詢,可以對數據進行分區。
Python
%pyspark
events = spark.read.json("/databricks-datasets/structured-streaming/events/")
events.write.partitionBy("date").format("delta").save("/mnt/delta/events")
spark.sql("CREATE TABLE events USING DELTA LOCATION '/mnt/delta/events/'")
SQL
%sql
CREATE TABLE events (
date DATE,
eventId STRING,
eventType STRING,
data STRING)
USING delta
PARTITIONED BY (date)
修改表格
Delta Lake支持一組豐富的操作來修改表。
對流寫入表
您可以使用結構化流式處理將數據寫入Delta表。即使有其他流或批查詢同時運行表,Delta Lake事務日志也可以保證一次性處理。默認情況下,流在附加模式下運行,這會將新記錄添加到表中。
Python
%pyspark from pyspark.sql.types import * inputPath = "/databricks-datasets/structured-streaming/events/" jsonSchema = StructType([ StructField("time", TimestampType(), True), StructField("action", StringType(), True) ]) eventsDF = ( spark .readStream .schema(jsonSchema) # Set the schema of the JSON data .option("maxFilesPerTrigger", 1) # Treat a sequence of files as a stream by picking one file at a time .json(inputPath) ) (eventsDF.writeStream .outputMode("append") .option("checkpointLocation", "/mnt/delta/events/_checkpoints/etl-from-json") .table("events") )
有關Delta Lake與結構化流式處理集成的更多信息,請參閱表流式處理讀寫。
批處理更新
要將一組更新和插入合并到現有表中,請使用merge-into語句。例如,以下語句獲取更新流并將其合并到表中。當已經存在具有相同事件ID的事件時,Delta Lake將使用給定的表達式更新數據列。如果沒有匹配事件時,Delta Lake將添加一個新行。
SQL
%sql MERGE INTO events USING updates ON events.eventId = updates.eventId WHEN MATCHED THEN UPDATE SET events.data = updates.data WHEN NOT MATCHED THEN INSERT (date, eventId, data) VALUES (date, eventId, data)
執行INSERT時(例如,當現有數據集中沒有匹配的行時),必須為表中的每一列指定一個值。但是,您不需要更新所有值。
讀一個表
在這個部分:
顯示表格歷史記錄
查詢表的早期版本(時間行程)
您可以通過在DBFS("/mnt/delta/events")或表名("event")上指定路徑來訪問Delta表中的數據:
Scala
%spark SELECT * FROM delta.`/mnt/delta/events`
或
%spark val events = spark.table("events")
SQL
%sql SELECT * FROM delta.`/mnt/delta/events`
或
%sql SELECT * FROM events
顯示表的歷史記錄
使用DESCRIBE HISTORY語句,查看表的歷史記錄。該語句提供每次為寫入表出處信息,包括表版本、操作、用戶等。檢索增量表歷史記錄
查詢表的早期版本(按時間順序查詢)
使用Delta Lake時間行程,您可以查詢Delta表的舊快照。
對于timestamp_string,只接受日期或時間戳字符串。例如,“2019-01-01”和“2019-01-01'T'00:00:00.000Z”。
若要查詢較早版本的表,請在語句中指定版本或時間戳SELECT。例如,若要從上述歷史記錄中查詢版本0,請使用
SQL
%sql SELECT * FROM events VERSION AS OF 0
or
%sql SELECT * FROM events VERSION AS OF 0
注意因為版本1的時間戳為'2019-01-29 00:38:10',要查詢版本0,您可以使用范圍為'2019-01-29 00:37:58'到'2019-01-29 00:38:09'的任何時間戳。
使用DataFrameReader選項,您可以通過Delta table中一個固定的特定版本表創建DataFrame。
Python
%pyspark df1 = spark.read.format("delta").option("timestampAsOf", timestamp_string).load("/mnt/delta/events") df2 = spark.read.format("delta").option("versionAsOf", version).load("/mnt/delta/events")
有關詳細信息,請參閱查詢表的舊快照(按時間順序查詢)。
優化表
對表執行多次更改后,可能會有很多小文件。為了提高讀取查詢的速度,可以使用OPTIMIZE將小文件折疊為較大的文件:
SQL
%sql OPTIMIZE delta.`/mnt/delta/events`
或
%sql OPTIMIZE events
Z-order排序
為了進一步提高讀取性能,可以通過Z-Ordering在同一組文件中共同定位相關信息。Delta Lake數據跳過算法會自動使用這種共區域性來顯著減少需要讀取的數據量。對于Z-Order數據,您可以在子句中指定要排序的列。例如:要通過共同定位,請運行:ZORDER BY Clause
SQL
%sql OPTIMIZE events ZORDER BY (eventType)
清理快照
Delta Lake為讀取提供快照隔離,這意味著即使其他用戶或作業正在查詢表時,也可以安全地運行OPTIMIZE。但是最終,您應該清理舊快照。您可以通過運行以下VACUUM命令來執行此操作
SQL
%sql VACUUM events
您可以使用 RETAIN<N>HOURS 選項來控制最新快照的保留時間:
SQL
%sql VACUUM events RETAIN 24 HOURS
有關VACUUM有效使用的詳細信息,請參閱刪除不再由Delta表引用的文件。
有關本文章詳細信息,請參考官方文檔:Delta快速入門