本文中含有需要您注意的重要提示信息,忽略該信息可能對您的業務造成影響,請務必仔細閱讀。
詳細內容請參考Databricks官網文章:表流讀寫
有關演示這些功能的Databricks筆記本,請參閱入門筆記本二。
Delta Lake通過readStream和writeStream與Spark結構化流式處理深度集成。Delta Lake克服了許多流式處理系統和文件相關的常見限制,例如:
合并低延遲引入產生的小文件
保持多個流(或并發批處理作業)執行“僅一次”處理
使用文件作為流源時,可以有效地發現哪些文件是新文件
Delta表作為流源
當您將Delta表加載為流源并在流式查詢中使用它時,該查詢將處理表中存在的所有數據以及流啟動后到達的所有新數據。
您可以將路徑和表都作為流加載。
Scala
%spark
spark.readStream.format("delta").load("/mnt/delta/events")
或
Scala
%spark
spark.readStream.format("delta").table("events")
你也可以執行以下操作:
通過設置maxFilesPerTrigger選項,控制Delta Lake提供給流的任何微批處理的最大大小。這指定了每個觸發器中要考慮的新文件的最大數量。默認值為1000。
通過設置maxBytesPerTrigger選項來限制每個微批處理的數據量的速率。這將設置一個“軟最大值”,這意味著批處理大約此數量的數據,并可能處理超過該限制的數據量。如果你使用Trigger。如果Trigger.Once用于流式傳輸,則忽略此選項。如果將此選項與maxFilesPerTrigger結合使用,則微批處理將處理數據,直到達到maxFilesPerTrigger或maxBytesPerTrigger限制。
忽略更新和刪除
結構化流式處理不處理非追加的輸入,如果在用作源的表上進行了任何修改,則引發異常。有兩種主要策略可以處理無法自動向下游傳播的更改:
您可以刪除輸出和檢查點,并從頭開始重新啟動流。
您可以設置以下兩個選項之一:
ignoreDeletes:忽略在分區邊界刪除數據的事務。
ignoreChanges:如果由于更新、合并、刪除(在分區內)或覆蓋等數據更改操作而必須重寫源表中的文件,則重新處理更新。未更改的行可能仍會發出,因此您的下游使用者應該能夠處理重復項。刪除不會傳播到下游。ignoreChanges包含ignoreDeletes。因此,如果使用ignoreChanges,則源表的刪除或更新不會中斷流。
示例
例如,假設您有一個表user_events,其中包含date、user_email和action列,這些列是按日期分區的。由于GDPR的原因,您需要從user_events表中刪除數據。
當您在分區邊界處執行刪除(即,WHERE在分區列上)操作時,文件已經按值分段,因此刪除操作只是從元數據中刪除這些文件。因此,如果只想從某些分區刪除數據,可以使用:
Scala
%spark
events.readStream
.format("delta")
.option("ignoreDeletes", "true")
.load("/mnt/delta/user_events")
但是,如果您必須基于user_email刪除數據,則需要使用:
Scala
%spark
events.readStream
.format("delta")
.option("ignoreChanges", "true")
.load("/mnt/delta/user_events")
如果使用update語句更新user_email,則會重寫包含user_email相關的文件。使用ignoreChanges時,新記錄將與位于同一文件中的所有其他未更改記錄一起傳播到下游。邏輯應該能夠處理這些傳入的重復記錄。
指定初始位置
該功能在Databricks Runtime 7.3 LTS及更高版本上可用。
您可以使用以下選項來指定Delta Lake流式處理源的起點,而無需處理整個表。
startingVersion:Delta Lake版本開始。從該版本(包括該版本)開始的所有表更改都將由流式處理源讀取。您可以從命令“ DESCRIBE HISTORY events”輸出的version列中獲取提交版本。
要僅返回最新更改,請在Databricks Runtime 7.4及更高版本中指定latest。
startingTimestamp:開始的時間戳。流式處理源將讀取在時間戳(包括時間戳)或之后提交的所有表更改。可以是以下任何一項:
'2018-10-18T22:15:12.013Z',即可以轉換為時間戳的字符串
cast('2018-10-18 13:36:32 CEST' as timestamp)
'2018-10-18',即日期字符串
本身就是時間戳或可強制轉換為時間的任何其他表達式,如:current_timestamp() - interval 12 hour sdate_sub(current_date(), 1)
您不能同時設置兩個選項。您只需要使用其中之一個選項即可。它們僅在啟動新的流查詢時才生效。如果流式處理查詢已啟動并且進度已記錄在其檢查點中,則將忽略這些選項。
盡管可以從指定的版本或時間戳啟動流式處理源,但是流式處理源的架構始終是Delta表的最新架構。您必須確保在指定的版本或時間戳記之后,不對Delta表進行不兼容的架構更改。否則,當使用不正確的架構讀取數據時,流式傳輸源可能會返回不正確的結果。
示例
例如,假設您有一個表格User_events。如果要閱讀版本5之后的更改,可以使用:
Scala
%spark
events.readStream
.format("delta")
.option("startingVersion", "5")
.load("/mnt/delta/user_events")
如果您想閱讀自2018年10月18日以來的更改,可以使用:
Scala
%spark
events.readStream
.format("delta")
.option("startingTimestamp", "2018-10-18")
.load("/mnt/delta/user_events")
用作接收器的Delta表
您也可以使用結構化流將數據寫入Delta表。事務日志使Delta Lake能夠保證"僅一次"處理,即使針對該表同時運行其他流或批查詢。
追加模式
默認情況下,流以追加模式運行,這會將新記錄添加到表中。
您可以使用路徑方法:
Python
%pyspark
events.writeStream
.format("delta")
.outputMode("append")
.option("checkpointLocation", "/delta/events/_checkpoints/etl-from-json")
.start("/delta/events")
Scala
%spark
events.writeStream
.format("delta")
.outputMode("append")
.option("checkpointLocation", "/mnt/delta/events/_checkpoints/etl-from-json")
.start("/mnt/delta/events")
或表格方法:
Python
%pyspark
events.writeStream
.format("delta")
.outputMode("append")
.option("checkpointLocation", "/delta/events/_checkpoints/etl-from-json")
.table("events")
Scala
%spark
events.writeStream
.outputMode("append")
.option("checkpointLocation", "/mnt/delta/events/_checkpoints/etl-from-json")
.table("events")
完整模式
您還可以使用結構化流式處理技術將整個批替換為每個批。一個示例用例是使用聚合來計算摘要:
Scala
%spark
spark.readStream
.format("delta")
.load("/mnt/delta/events")
.groupBy("customerId")
.count()
.writeStream
.format("delta")
.outputMode("complete")
.option("checkpointLocation", "/mnt/delta/eventsByCustomer/_checkpoints/streaming-agg")
.start("/mnt/delta/eventsByCustomer")
前面的示例不斷更新包含客戶的事件總數的表。
對于延遲要求更寬松的應用程序,您可以使用一次性觸發器來節省計算資源。使用這些更新按給定的時間表更新匯總聚合表,僅處理自上次更新以來已到達的新數據。