日本熟妇hd丰满老熟妇,中文字幕一区二区三区在线不卡 ,亚洲成片在线观看,免费女同在线一区二区

表流讀寫

重要

本文中含有需要您注意的重要提示信息,忽略該信息可能對您的業務造成影響,請務必仔細閱讀。

說明

詳細內容請參考Databricks官網文章:表流讀寫

有關演示這些功能的Databricks筆記本,請參閱入門筆記本二

Delta Lake通過readStream和writeStream與Spark結構化流式處理深度集成。Delta Lake克服了許多流式處理系統和文件相關的常見限制,例如:

  • 合并低延遲引入產生的小文件

  • 保持多個流(或并發批處理作業)執行“僅一次”處理

  • 使用文件作為流源時,可以有效地發現哪些文件是新文件

Delta表作為流源

當您將Delta表加載為流源并在流式查詢中使用它時,該查詢將處理表中存在的所有數據以及流啟動后到達的所有新數據。

您可以將路徑和表都作為流加載。

Scala

%spark
spark.readStream.format("delta").load("/mnt/delta/events")

Scala

%spark
spark.readStream.format("delta").table("events")

你也可以執行以下操作:

  • 通過設置maxFilesPerTrigger選項,控制Delta Lake提供給流的任何微批處理的最大大小。這指定了每個觸發器中要考慮的新文件的最大數量。默認值為1000。

  • 通過設置maxBytesPerTrigger選項來限制每個微批處理的數據量的速率。這將設置一個“軟最大值”,這意味著批處理大約此數量的數據,并可能處理超過該限制的數據量。如果你使用Trigger。如果Trigger.Once用于流式傳輸,則忽略此選項。如果將此選項與maxFilesPerTrigger結合使用,則微批處理將處理數據,直到達到maxFilesPerTrigger或maxBytesPerTrigger限制。

忽略更新和刪除

結構化流式處理不處理非追加的輸入,如果在用作源的表上進行了任何修改,則引發異常。有兩種主要策略可以處理無法自動向下游傳播的更改:

  • 您可以刪除輸出和檢查點,并從頭開始重新啟動流。

  • 您可以設置以下兩個選項之一:

    • ignoreDeletes:忽略在分區邊界刪除數據的事務。

    • ignoreChanges:如果由于更新、合并、刪除(在分區內)或覆蓋等數據更改操作而必須重寫源表中的文件,則重新處理更新。未更改的行可能仍會發出,因此您的下游使用者應該能夠處理重復項。刪除不會傳播到下游。ignoreChanges包含ignoreDeletes。因此,如果使用ignoreChanges,則源表的刪除或更新不會中斷流。

示例

例如,假設您有一個表user_events,其中包含date、user_email和action列,這些列是按日期分區的。由于GDPR的原因,您需要從user_events表中刪除數據。

當您在分區邊界處執行刪除(即,WHERE在分區列上)操作時,文件已經按值分段,因此刪除操作只是從元數據中刪除這些文件。因此,如果只想從某些分區刪除數據,可以使用:

Scala

%spark
events.readStream
  .format("delta")
  .option("ignoreDeletes", "true")
  .load("/mnt/delta/user_events")

但是,如果您必須基于user_email刪除數據,則需要使用:

Scala

%spark
events.readStream
  .format("delta")
  .option("ignoreChanges", "true")
  .load("/mnt/delta/user_events")

如果使用update語句更新user_email,則會重寫包含user_email相關的文件。使用ignoreChanges時,新記錄將與位于同一文件中的所有其他未更改記錄一起傳播到下游。邏輯應該能夠處理這些傳入的重復記錄。

指定初始位置

說明

該功能在Databricks Runtime 7.3 LTS及更高版本上可用。

您可以使用以下選項來指定Delta Lake流式處理源的起點,而無需處理整個表。

  • startingVersion:Delta Lake版本開始。從該版本(包括該版本)開始的所有表更改都將由流式處理源讀取。您可以從命令“ DESCRIBE HISTORY events”輸出的version列中獲取提交版本。

    • 要僅返回最新更改,請在Databricks Runtime 7.4及更高版本中指定latest。

  • startingTimestamp:開始的時間戳。流式處理源將讀取在時間戳(包括時間戳)或之后提交的所有表更改。可以是以下任何一項:

    • '2018-10-18T22:15:12.013Z',即可以轉換為時間戳的字符串

    • cast('2018-10-18 13:36:32 CEST' as timestamp)

    • '2018-10-18',即日期字符串

    • 本身就是時間戳或可強制轉換為時間的任何其他表達式,如:current_timestamp() - interval 12 hour sdate_sub(current_date(), 1)

您不能同時設置兩個選項。您只需要使用其中之一個選項即可。它們僅在啟動新的流查詢時才生效。如果流式處理查詢已啟動并且進度已記錄在其檢查點中,則將忽略這些選項。

警告

盡管可以從指定的版本或時間戳啟動流式處理源,但是流式處理源的架構始終是Delta表的最新架構。您必須確保在指定的版本或時間戳記之后,不對Delta表進行不兼容的架構更改。否則,當使用不正確的架構讀取數據時,流式傳輸源可能會返回不正確的結果。

示例

例如,假設您有一個表格User_events。如果要閱讀版本5之后的更改,可以使用:

Scala

%spark
events.readStream
  .format("delta")
  .option("startingVersion", "5")
  .load("/mnt/delta/user_events")

如果您想閱讀自2018年10月18日以來的更改,可以使用:

Scala

%spark
events.readStream
  .format("delta")
  .option("startingTimestamp", "2018-10-18")
  .load("/mnt/delta/user_events")

用作接收器的Delta表

您也可以使用結構化流將數據寫入Delta表。事務日志使Delta Lake能夠保證"僅一次"處理,即使針對該表同時運行其他流或批查詢。

追加模式

默認情況下,流以追加模式運行,這會將新記錄添加到表中。

您可以使用路徑方法:

Python

%pyspark
events.writeStream
  .format("delta")
  .outputMode("append")
  .option("checkpointLocation", "/delta/events/_checkpoints/etl-from-json")
  .start("/delta/events")

Scala

%spark
events.writeStream
  .format("delta")
  .outputMode("append")
  .option("checkpointLocation", "/mnt/delta/events/_checkpoints/etl-from-json")
  .start("/mnt/delta/events")

或表格方法:

Python

%pyspark
events.writeStream
  .format("delta")
  .outputMode("append")
  .option("checkpointLocation", "/delta/events/_checkpoints/etl-from-json")
  .table("events")

Scala

%spark
events.writeStream
  .outputMode("append")
  .option("checkpointLocation", "/mnt/delta/events/_checkpoints/etl-from-json")
  .table("events")

完整模式

您還可以使用結構化流式處理技術將整個批替換為每個批。一個示例用例是使用聚合來計算摘要:

Scala

%spark
spark.readStream
  .format("delta")
  .load("/mnt/delta/events")
  .groupBy("customerId")
  .count()
  .writeStream
  .format("delta")
  .outputMode("complete")
  .option("checkpointLocation", "/mnt/delta/eventsByCustomer/_checkpoints/streaming-agg")
  .start("/mnt/delta/eventsByCustomer")
從結構化的輸入流中讀取數據,經過處理后結構化流輸出到delta文件

前面的示例不斷更新包含客戶的事件總數的表。

對于延遲要求更寬松的應用程序,您可以使用一次性觸發器來節省計算資源。使用這些更新按給定的時間表更新匯總聚合表,僅處理自上次更新以來已到達的新數據。