詳細內容請參考Databricks官網文章:表刪除,更新和合并
有關演示這些功能的Databricks筆記本,請參閱入門筆記本二。
Delta Lake支持多個語句,以方便從Delta表中刪除數據和更新數據。
從表中刪除
從最新版本的Delta表中刪除數據,但直到顯示清除舊版本后才從物理存儲中刪除數據。例如,要刪除2017年之前的所有事件,可以運行以下命令:
SQL
%sql
DELETE FROM events WHERE date < '2017-01-01'
DELETE FROM delta.`/data/events/` WHERE date < '2017-01-01'
Python
%pyspark
from delta.tables import *
from pyspark.sql.functions import *
deltaTable = DeltaTable.forPath(spark, "/data/events/")
deltaTable.delete("date < '2017-01-01'") # predicate using SQL formatted string
deltaTable.delete(col("date") < "2017-01-01") # predicate using Spark SQL functions
Scala
%spark
import io.delta.tables._
val deltaTable = DeltaTable.forPath(spark, "/data/events/")
deltaTable.delete("date < '2017-01-01'") // predicate using SQL formatted string
import org.apache.spark.sql.functions._
import spark.implicits._
deltaTable.delete(col("date") < "2017-01-01") // predicate using Spark SQL functions and implicits
如果可能,請在分區的Delta表的分區列上提供謂詞,因為這樣的謂詞可以顯著加快操作速度。
有關詳細信息,請參見API參考。
更新表格
可以更新與Delta表中謂詞匹配的數據。例如,要修復eventType中的拼寫錯誤,可以運行以下命令:
SQL
%sql
UPDATE events SET eventType = 'click' WHERE eventType = 'clck'
UPDATE delta.`/data/events/` SET eventType = 'click' WHERE eventType = 'clck'
Python
%pyspark
from delta.tables import *
from pyspark.sql.functions import *
deltaTable = DeltaTable.forPath(spark, "/data/events/")
deltaTable.update("eventType = 'clck'", { "eventType": "'click'" } ) # predicate using SQL formatted string
deltaTable.update(col("eventType") == "clck", { "eventType": lit("click") } ) # predicate using Spark SQL functions
Scala
%spark
import io.delta.tables._
val deltaTable = DeltaTable.forPath(spark, "/data/events/")
deltaTable.updateExpr( // predicate and update expressions using SQL formatted string
"eventType = 'clck'",
Map("eventType" -> "'click'")
import org.apache.spark.sql.functions._
import spark.implicits._
deltaTable.update( // predicate using Spark SQL functions and implicits
col("eventType") === "clck",
Map("eventType" -> lit("click")));
有關詳細信息,請參見API參考。
與刪除類似,在分區上使用謂詞可以顯著提高更新操作的速度。
使用合并操作在表中執行更新插入
您可以使用該合并操作將數據從源表,視圖或DataFrame插入目標Delta表。此操作類似于SQL MERGE INTO命令,但對更新、插入和刪除中刪除操作有額外的支持。
假設您有一個Spark DataFrame,其中包含evenId的事件的新數據,其中一些事件可能已經存在于events表中,需要更新匹配的行(即even Id已經存在)并插入新行(即even Id不存在)。您可以運行以下命令:
SQL
%sql
MERGE INTO events
USING updates
ON events.eventId = updates.eventId
WHEN MATCHED THEN
UPDATE SET events.data = updates.data
WHEN NOT MATCHED
THEN INSERT (date, eventId, data) VALUES (date, eventId, data)
Python
%pyspark
from delta.tables import *
deltaTable = DeltaTable.forPath(spark, "/data/events/")
df = spark.createDataFrame([("update-case2", '2020-10-12', 2, 'INFO'),("case25", '2020-10-13', 25, 'INFO')], ['data', 'date', 'eventId', 'eventType'])
updatesDF = df.select('data', to_date('date', 'yyyy-MM-dd').alias('date'), 'eventId', 'eventType')
# 合并表,eventId相等則更新數據,不相等就插入數據
deltaTable.alias("events").merge(
updatesDF.alias("updates"),
"events.eventId = updates.eventId") \
.whenMatchedUpdate(set = { "data" : "updates.data" } ) \
.whenNotMatchedInsert(values =
{
"date": "updates.date",
"eventId": "updates.eventId",
"data": "updates.data"
}
) \
.execute()
Scala
%spark
import io.delta.tables._
import org.apache.spark.sql.functions._
val updatesDF = ... // define the updates DataFrame[date, eventId, data]
DeltaTable.forPath(spark, "/data/events/")
.as("events")
.merge(
updatesDF.as("updates"),
"events.eventId = updates.eventId")
.whenMatched
.updateExpr(
Map("data" -> "updates.data"))
.whenNotMatched
.insertExpr(
Map(
"date" -> "updates.date",
"eventId" -> "updates.eventId",
"data" -> "updates.data"))
.execute()
操作語義
這是merge編程操作的詳細說明。
可以有任意數量的whenMatched和whenNotMatched子句。
在Databricks Runtime 7.2及更低版本中,merge最多可以有2個whenMatched子句,最多可以有1個whenNotMatched子句。
當源行根據匹配條件與目標表行匹配時,將執行whenMatched子句。這些子句具有以下語義。
whenMatched子句最多只能有一個更新和一個刪除操作。merge中的更新操作只更新匹配的目標行的指定列(類似于更新操作)。刪除操作刪除匹配的行。
每個whenMatched子句都可以有一個可選條件。如果存在此子句條件,則僅當子句條件為true時,才對任何匹配的源-目標行對行執行更新或刪除操作。
如果有多個whenMatched子句,則按照指定的順序(即,子句的順序很重要)對它們進行求值。除最后一個外,所有whenMatched子句都必須有條件。
如果兩個whenMatched子句都有條件,并且對于匹配的源-目標行對,兩個條件都不為true,則匹配的目標行保持不變。
要用源數據集的相應列更新目標Delta表的所有列,請使用whenMatched(…).updateAll()。這相當于:
Scala
%spark whenMatched(...).updateExpr(Map("col1" -> "source.col1", "col2" -> "source.col2", ...))
對于目標Delta表的所有列。因此,此操作假定源表與目標表中的列相同,否則查詢將引發分析錯誤。
說明當啟用自動架構遷移時,此行為將更改。有關詳細信息,請參見自動模式演化。
當源行與基于匹配條件的任何目標行都不匹配時,將執行whenNotMatched子句。這些子句具有以下語義。
whenNotMatched子句只能有insert操作。新行是根據指定的列和相應的表達式生成的。不需要指定目標表中的所有列。對于未指定的目標列,將插入NULL。
說明在Databricks Runtime 6.5及更低版本中,您必須在目標表中提供該INSERT操作的所有列。
每個whenNotMatched子句可以有一個可選條件。如果存在子句條件,則僅當源條件對該行為true時才插入該行。否則,將忽略源列。
要將目標Delta表的所有列與源數據集的相應列一起插入,請使用whenNotMatched(...).insertAll()。這相當于:
Scala
%spark whenNotMatched(...).insertExpr(Map("col1" -> "source.col1", "col2" -> "source.col2", ...))
目標Delta表的所有列。因此,此操作假定源表的列與目標表的列相同,否則查詢將引發分析錯誤。
重要啟用自動模式遷移后,此行為將更改。有關詳細信息,請參見自動模式演變。
架構驗證
merge自動驗證通過插入和更新表達式生成的數據的架構是否與表的架構兼容。它使用以下規則來確定merge操作是否兼容:
對于update和insert操作,目標Delta表中必須存在指定的目標列。
對于updateAll和insertAll動作,源數據集必須具有目標Delta表的所有列。源數據集可以包含額外的列,它們將被忽略。
對于所有操作,如果由生成目標列的表達式生成的數據類型與目標Delta表中的對應列不同,merge會嘗試將其轉換為表中的類型。
自動架構演變
merge 中的架構演變在Databricks Runtime 6.6及更高版本中可用。
默認情況下,updateAll和insertAll使用來自源數據集的同名列來分配目標Delta表中的所有列。將忽略源數據集中與目標表中的列不匹配的任何列。但是,在某些用例中,需要自動將源列添加到目標Delta表中。要在使用updateAll和insertAll(至少其中一個)執行merge操作期間自動更新表架構,可以在運行merge操作之前設置Spark會話配置spark.databricks.delta.schema.autoMerge.enabled為true。
架構演變僅在同時存在一個updateAll或一個insertAll動作或兩者同時存在時發生。
在merge中的模式演化過程中,只有頂層列(即不是嵌套字段)會被更改。
update和insert操作不能顯式引用目標表中不存在的目標列(即使其中有updateAll或insertAll作為子句之一)。請參閱下面的示例。
這里有幾個例子說明了在模式演化和沒有模式演化的情況下merge操作的效果。
列 | 查詢(在Scala中) | 沒有架構演變的行為(默認值) | 有架構演變行為 |
目標列: key, value源列: key, value, newValue |
| 表架構保持不變;僅已更新/插入列key,value | 表架構更改為(key,value,newValue)。updateAll更新列value和newValue,insertAll插入行(key、value、newValue)。 |
目標列: key, oldValue 源列: key, newValue |
| update和insertAll由于目標列oldValue不在源中,因此操作會引發錯誤。 | 表架構更改為(key,oldValue,newValue)。updateAll更新列key和newValue,而oldValue保持不變,insertAll插入行(key、NULL、newValue)(也就是說,oldValue作為NULL插入)。 |
目標列: key, oldValue 源列: key, newValue |
| update引發錯誤,因為目標表中不存在該列newValue。 | update仍然會引發錯誤,因為目標表中不存在該列newValue。 |
目標列: key, oldValue 源列: key, newValue |
| insert引發錯誤,因為目標表中不存在該列newValue。 | insert由于目標表中不存在該列newValue,因此仍然會引發錯誤。 |
性能調優
您可以使用以下方法減少merge所花費的時間:
減少匹配項的搜索空間:默認情況下,merge操作搜索整個Delta表以在源表中查找匹配項。加速merge的一種方法是通過在匹配條件中添加已知約束來減少搜索空間。例如,假設您有一個按countrt/date分區的表,并且希望使用merge更新最后一天和特定國家/地區的信息。添加條件
SQL
%sql events.date = current_date() AND events.country = 'USA'
將加快查詢速度,因為它僅在相關分區中查找匹配項。此外,這還將減少與其他并發操作發生沖突的機會。有關更多詳細信息,請參見并發控制。
緊湊文件:如果數據存儲在許多小文件中,則讀取數據以搜索匹配項可能會變慢。您可以將小文件壓縮為更大的文件,以提高讀取吞吐量。有關詳細信息,請參見壓縮文件。
控制寫入的無序分區:merge操作多次對數據進行隨機排列以計算和寫入更新的數據。用于隨機的任務數由Spark會話配置spark.sql.shuffle.partitions控制。設置此參數不僅可以控制并發度,還可以確定輸出文件的數量。增加該值會提高并發度,但也會生成大量較小的數據文件。
啟用優化寫入:對于分區表,meage可以生成比隨機分區數量多得多的小文件。這是因為每個隨機任務都可以在多個分區中寫入多個文件,并可能成為性能瓶頸。您可以通過啟用優化寫入來優化這一點。
合并范例
以下是一些有關如何merge在不同情況下使用的示例。
在這個部分:
寫入Delta表時的重復數據刪除
緩慢將數據(SCD)類型2操作更改為Delta表
將更改數據寫入Delta表
使用Upsert 從流式處理查詢foreachBatch
寫入Delta表時的重復數據刪除
一個常見的ETL用例是通過將日志附加到表中來將日志收集到Delta表中。但是,源通常可以生成重復的日志記錄,因此需要下游重復數據刪除步驟來處理它們。使用Merge,您可以避免插入重復的記錄。
SQL
%sql
MERGE INTO logs
USING newDedupedLogs
ON logs.uniqueId = newDedupedLogs.uniqueId
WHEN NOT MATCHED
THEN INSERT *
Python
%pyspark
deltaTable.alias("logs").merge(
newDedupedLogs.alias("newDedupedLogs"),
"logs.uniqueId = newDedupedLogs.uniqueId") \
.whenNotMatchedInsertAll() \
.execute()
Scala
%spark
deltaTable
.as("logs")
.merge(
newDedupedLogs.as("newDedupedLogs"),
"logs.uniqueId = newDedupedLogs.uniqueId")
.whenNotMatched()
.insertAll()
.execute()
包含新日志的數據集需要在其內部進行重復數據刪除。通過合并的SQL語義,它將新數據與表中的現有數據進行匹配并刪除重復數據,但是如果新數據集中存在重復數據,則將其插入。因此,在合并到表之前,對新數據進行重復數據刪除。
如果您知道幾天之內可能會得到重復的記錄,則可以通過按日期對表進行分區,然后指定要匹配的目標表的日期范圍來進一步優化查詢。
SQl
%sql
MERGE INTO logs
USING newDedupedLogs
ON logs.uniqueId = newDedupedLogs.uniqueId AND logs.date > current_date() - INTERVAL 7 DAYS
WHEN NOT MATCHED AND newDedupedLogs.date > current_date() - INTERVAL 7 DAYS
THEN INSERT *
Python
%pyspark
deltaTable.alias("logs").merge(
newDedupedLogs.alias("newDedupedLogs"),
"logs.uniqueId = newDedupedLogs.uniqueId") \
.whenNotMatchedInsertAll() \
.execute()
Scala
%pyspark
deltaTable
.as("logs")
.merge(
newDedupedLogs.as("newDedupedLogs"),
"logs.uniqueId = newDedupedLogs.uniqueId")
.whenNotMatched()
.insertAll()
.execute()、
這比以前的命令效率更高,因為它僅在日志的最后7天而不是整個表中查找重復項。此外,您可以將此僅插入合并與結構化流一起使用,以執行日志的連續重復數據刪除。
在流查詢中,可以使用foreachBatch中的merge操作將具有重復數據刪除功能的所有流數據連續寫入Delta表。
在另一個流式查詢中,可以連續從該Delta表中讀取已刪除重復的數據。這是可能的,因為insert-only merge只向Delta表追加新數據。
insert-only merge被優化為僅在Databricks Runtime 6.2及更高版本中追加數據。在Databricks Runtime 6.1及更低版本中,insert-only merge操作的寫入不能作為流讀取。
緩慢將數據(SCD)Type2操作更改為Delta表
另一個常見的操作是SCD Type2,它維護維度表中每個鍵所做的所有更改的歷史記錄。這樣的操作需要更新現有行以將key的以前值標記為舊值,并將新行作為最新值插入。給定一個包含更新的源表和包含維度數據的目標表,SCD Type2可以用merge表示。
下面是一個具體的例子,它維護客戶的地址歷史以及每個地址的活動日期范圍。當客戶的地址需要更新時,您必須將以前的地址標記為非當前地址,更新其活動日期范圍,并將新地址添加為當前地址。
將更改數據寫入Delta表
與SCD類似,另一個常見用例,通常稱為變更數據捕獲(CDC),是將從外部數據庫生成的所有數據更改應用到Delta表中。換句話說,應用于外部表的更新、刪除和插入集需要應用于Delta表。您可以使用merge執行以下操作。
使用Merge筆記本寫入更改數據
Note鏈接地址:CDC using Merge(Scala)
使用Upsert 從流式處理查詢foreachBath
您可以結合使用merge和foreachBatch(有關更多信息,請參閱foreachBatch)將流式處理查詢中的復雜更新操作寫入Delta表。例如:
在更新模式下寫入流式聚合:這比Complete Mode要高效得多。
將數據庫更改流寫入Delta表:用于寫入更改數據的合并查詢可用于foreachBatch中,以連續地將更改流應用于Delta表。
使用重復數據刪除將流數據寫入Delta表:用于重復數據刪除的 insert-only merge 查詢可以在 foreachBatch 中使用自動重復數據刪除功能將數據(帶有重復項)連續寫入到 Delta 表中。
確保foreachBatch中的merge語句是等冪的,因為流查詢的重新啟動可以多次對同一批數據應用該操作。
在foreachBatch中使用merge時,流式查詢的輸入數據速率(通過StreamingQueryProgress報告并在筆記本速率圖中可見)可以報告為在源處生成數據的實際速率的倍數。這是因為merge多次讀取輸入數據,導致輸入指標成倍增加。如果這是一個瓶頸,可以在merge之前緩存批處理數據幀,然后在merge后取消緩存。
使用merge和foreachBatch筆記本以更新模式編寫流式聚合
Note鏈接地址:Upsert streaming aggregates using foreachBatch and Merge(Scala)