詳情請參考Databricks官網文章:并發控制。
Delta Lake在讀取和寫入之間提供ACID事務保證。這意味著:
跨多個集群的多個編寫器可以同時修改表分區,并查看表的一致性快照視圖,并且這些寫入操作將具有序列順序。
即使在作業過程中修改了某個表,讀取器仍會繼續查看Databricks 作業開始使用的表的一致快照視圖。
樂觀并發控制
Delta Lake使用樂觀并發控制來提供兩次寫入之間的事務保證。在這種機制下,寫操作分為三個階段:
讀取:讀取(如果需要)表的最新可用版本,以標識需要修改(即重寫)的文件。
寫入:通過寫入新數據文件來暫存所有更改。
驗證并提交:在提交更改之前,檢查建議的更改是否與自讀取快照后并發提交的任何其他更改沖突。如果沒有沖突,則所有已轉移的更改都將作為新版本的快照提交,并且成功寫入操作。但是,如果存在沖突,則寫入操作失敗,并出現并發修改異常,而不是像對Parquet表執行寫操作那樣損壞表。
表的隔離級別定義了必須將事務與并發操作所做的修改隔離的程度。有關在Databricks上的Delta Lake支持的隔離級別的信息,請參閱隔離級別。
寫沖突
下表描述了在每個隔離級別中哪些寫操作對可能發生沖突。
插入 | 更新,刪除,合并 | 平臺 | |
插入 | 不能沖突 | ||
更新,刪除,合并 | 在可序列化時可能會沖突,WriteSerializable 中不能發生沖突 | 在可序列化和 WriteSerializable 中可能發生沖突 | |
平臺 | 不能沖突 | 在可序列化和 WriteSerializable 中可能發生沖突 | 在可序列化和 WriteSerializable 中可能發生沖突 |
使用分區和非連續命令條件來避免沖突
在所有標記為“可能沖突”的情況下,這兩個操作是否會發生沖突取決于它們是否對同一組文件進行操作。 可以通過將表分區為與操作條件中使用的列相同的列來使兩組文件不相交。 例如, UPDATE table WHERE date > '2010-01-01' ...DELETE table WHERE date < '2010-01-01'
如果表未按日期分區,則這兩個命令和將發生沖突,因為這兩個命令都可以嘗試修改相同的一組文件。 對表進行分區 date
將避免沖突。 因此,根據通常用于命令的條件對表進行分區可以顯著減少沖突。 不過,按基數較高的列對表進行分區可能會導致由大量子目錄引起的其他性能問題。
沖突例外
發生事務沖突時,您將觀察到以下異常之一:
ConcurrentAppendException
ConcurrentDeleteReadException
ConcurrentDeleteDeleteException
MetadataChangedException
ConcurrentTransactionException
ProtocolChangedException
ConcurrentAppendException
當并發操作在操作讀取的同一分區(或未分區表中的任何位置)中添加文件時,會發生異常。該文件增加的部分可以由插入、刪除、更新或合并操作引起。
使用默認隔離級別為WriteSerializable,盲 INSERT操作(即,在未讀取任何數據的情況下盲目追加數據)添加的文件不會與任何操作沖突,即使它們接觸相同的分區(或未分區表中的任何位置)也是如此。如果隔離級別設置為Serializable,則盲追加加可能會沖突。
DELETE,UPDATE或MERGE并發操作經常引發異常。盡管并發操作可能會物理上更新不同的分區目錄,但其中一個操作可能會讀取與其他分區目錄同時更新的同一分區,從而導致沖突。可以通過在操作條件中進行分隔顯式來避免這種情況。 請考慮以下示例。
Scala
%spark
// Target 'deltaTable' is partitioned by date and country
deltaTable.as("t").merge(
source.as("s"),
"s.user_id = t.user_id AND s.date = t.date AND s.country = t.country")
.whenMatched().updateAll()
.whenNotMatched().insertAll()
.execute()
假設您在不同的日期或國家/地區同時運行上述代碼。由于每個作業都在目標Delta表上的獨立分區上運行,因此您不會遇到任何沖突。但是,該條件不夠明確,可以掃描整個表,并且可能與更新任何其他分區的并發操作沖突。相反,您可以重寫語句以將特定日期和國家/地區添加到合并條件中,如以下示例所示。
Scala
%spark
// Target 'deltaTable' is partitioned by date and country
deltaTable.as("t").merge(
source.as("s"),
"s.user_id = t.user_id AND s.date = t.date AND s.country = t.country AND t.date = '" + <date> + "' AND t.country = '" + <country> + "'")
.whenMatched().updateAll()
.whenNotMatched().insertAll()
.execute()
現在可以安全地在不同日期和國家/地區同時運行此操作。
ConcurrentDeleteReadException
當并發操作刪除您的操作讀取的文件時,會發生此異常。常見的原因是DELETE,UPDATE或MERGE操作時重寫文件。
ConcurrentDeleteDeleteException
當并發操作刪除了操作還刪除的文件時,會發生此異常。 這可能是由兩個并發操作重寫相同文件引起的。
MetadataChangedException
當并發事務更新增量表的元數據時,將發生此異常。 常見的原因是 ALTER TABLE
操作或寫入Delta表,用于更新表的架構
ConcurrentTransactionException
如果使用相同檢查點位置的流式處理查詢同時啟動多次,并嘗試同時寫入Delta表。 永遠不應讓兩個流式處理查詢使用相同的檢查點位置并同時運行。
ProtocolChangedException
當您的Delta表升級到新版本時,就會發生這種情況。為了使將來的操作成功,您可能需要升級Delta Lake版本。