本文為您介紹Flink批處理的一些基本原理和配置調優。
背景信息
作為支持流處理和批處理的統一計算框架,Flink能夠同時處理兩種不同的數據模式。盡管Flink在流處理和批處理模式下共享許多核心執行機制,但兩種模式在作業執行機制、配置參數和性能調優方面存在一些關鍵差異。本文將專門針對Flink批處理作業,為您介紹其獨特的執行機制、配置參數。通過深入理解這些差異,您將能夠更加高效地對作業進行調優,以及排查和解決在使用Flink批處理作業中遇到的問題。
實時計算Flink版也對Flink批處理進行了專門的支持,提供了作業開發、作業運維、作業編排、資源隊列管理、數據結果探查等能力,您可以通過Flink批處理快速入門快速地了解上手。
批作業和流作業的比較
在介紹Flink批處理作業的配置參數和調優方法之前,首先需要了解Flink批處理與流處理作業在執行機制上的差異。
執行模式
流處理作業:流處理模式專注于處理持續不斷的無界數據流,其核心在于實現低延遲的數據處理。在這種模式下,數據會以流水線模式在節點間即時傳遞并被處理。因此,流處理作業所有節點的子任務會同時部署和執行。
批處理作業:批處理模式專注于處理有界數據集,重點在于提供高吞吐量的數據處理。在這種執行模式下,作業通常由多個階段組成,互不依賴的階段可以并行執行,以提高資源利用率;對于存在數據依賴的階段,下游任務需等待上游任務完成后才能啟動。
數據傳輸
流處理作業:為了實現低延遲,流處理作業的中間數據保留在內存中并直接通過網絡進行傳輸,不會持久化。如果下游節點處理能力不足,則可能會導致上游節點遭遇反壓。
批處理作業:批處理作業的中間結果會寫入到外部存儲系統中以供下游使用。默認情況下,這些結果文件存儲在TaskManager的本地磁盤;如果使用遠端Shuffle服務,則數據文件會存儲在遠端Shuffle服務中。
資源需求
流處理作業:流處理作業在啟動時需要預先分配所有資源,以確保所有子任務能夠同時部署并運行。
批處理作業:批處理作業在運行時不需要一次性獲取所有資源。Flink可以分批調度輸入數據已經就緒的任務,從而能夠更高效地利用現有資源,即使在資源受限(甚至單Slot)的情況下也能順利執行。
任務失敗重啟
流處理作業:流處理作業在遇到故障時可以從最近的檢查點或保存點恢復,這樣作業進度回退的程度較小。但因為中間結果不持久化,恢復時需要重新啟動所有任務。
批處理作業:批處理作業的中間結果數據會落盤,因此當任務出錯重啟時,這些中間結果能夠被再次利用,這意味著只需重啟失敗的任務以及它的下游任務即可,無需全局回溯。這樣可以減少因故障而需要重新執行的任務數,提高恢復效率。不過由于批作業沒有檢查點機制,這些重啟的任務需要從頭開始運行。
關鍵配置參數及調優方式
本章節將為您介紹Flink批作業的關鍵配置。
資源配置
CPU和Memory
在作業的資源配置窗口中,您可以為作業設置單個JobManager和TaskManager的CPU和內存資源,以下是一些配置建議:
JobManager資源配置:建議為JobManager分配1個CPU核心和至少4 GiB的內存資源,以確保其順利執行作業調度與管理。
TaskManager資源配置:建議根據Slot數量分配相應的資源。具體來說,建議為每個Slot配備1個CPU核心和4 GiB內存。如果一個TaskManager擁有n個Slot,那么總共應為其分配n個CPU核心和4n*GiB內存。
實時計算引擎中的批處理作業默認為每個TaskManager分配一個Slot。為了降低調度和管理TaskManager的開銷,您可以考慮將每個TaskManager的Slot數量增加到2或4。
然而需要注意的是,每個TaskManager的可用磁盤空間是有限的,與其分配的CPU核心數是成比例的。具體來說,會給每個CPU核心配額20 GiB的磁盤空間。TaskManager最低磁盤空間為20 GiB,最大磁盤空間為200 GiB。
因此增加每個TaskManager上的Slot數量意味著更多的任務將在同一TaskManager節點上運行,這可能會加劇本地磁盤空間的緊張狀況,甚至可能導致磁盤空間不足。如果磁盤空間不足,則會導致作業失敗并重啟。
對于規模較大或拓撲結構復雜的作業,JobManager和TaskManager可能需要更高規格的資源配置。在這些情況下,應根據作業的具體需求適當提高資源配置,以確保作業能夠高效且穩定地運行。
此外,如果您在作業執行過程中遇到資源相關的問題,可以參考此文檔進行故障診斷和解決:
Apache Flink Memory Troubleshooting。
為了保證作業穩定運行,每個JobManager和TaskManager至少需要配置0.5個CPU核心和2 GiB內存。
最大Slot個數
配置Flink作業允許分配的最大slot數量。由于Flink批作業在資源受限的情況下也可以運行,在資源受限的環境中,通過設置最大Slot數量,可以限制Flink批作業所使用的最大資源量。這有助于避免批作業占用過多資源,從而影響其他作業的運行。
并行度配置
在作業的資源配置中,支持為作業設置全局并行度或自動推斷并行度。
全局并行度:全局并行度決定作業中任務的最大并行執行數量。您可以直接在頁面上填寫作業的并行度,作業將使用該值作為全局默認并行度。
自動推斷:配置自動推斷后,Flink批作業將通過分析每個節點的消費總數據量和每個子任務期望處理的平均數據量來自動推導并行度,幫助您優化并行度配置。
此外,實時計算引擎VVR 8.0及以上版本提供了以下配置項(作業運行參數配置區域配置),使您能夠對自動并行度推導進行更精細的調優:
在實時計算引擎VVR 8.0及以上版本中,Flink批作業默認開啟自動推導并行度功能,并使用您配置的全局并行度作為自動推導并行度的上限。建議您使用實時計算引擎VVR 8.0及以上版本,以獲得Flink批處理作業更優的性能表現。
配置項 | 說明 | 默認值 |
execution.batch.adaptive.auto-parallelism.enabled | 是否啟用自動并行度推導。 | true |
execution.batch.adaptive.auto-parallelism.min-parallelism | 允許自動設置的并行度最小值。 | 1 |
execution.batch.adaptive.auto-parallelism.max-parallelism | 允許自動設置的并行度的最大值。如果未配置此參數,將采用全局并行度作為默認值。 | 128 |
execution.batch.adaptive.auto-parallelism.avg-data-volume-per-task | 期望每個任務平均處理的數據量大小。Flink將根據此配置和節點實際需要處理的數據量來動態決定節點并行度。 | 16MiB |
execution.batch.adaptive.auto-parallelism.default-source-parallelism | Source算子的默認并行度。目前Flink無法很好的感知到Source節點要讀取的數據量,因此需要您自行配置其并行度。如果未配置,會取全局并行度。 | 1 |
常見問題
并行度與Slot數的區別
并行度是指作業中能夠同時執行的任務實例的最大數量。它反映了作業處理能力的理論上限。Slot是Flink作業中的資源分配單元,Slot的數量決定了Flink作業能夠同時處理多少個任務實例。
由于流處理作業一次性獲取全部的資源來同時運行所有任務,在默認開啟Slot-sharing的情況下,申請的Slot數量通常與作業全局并行度數一致,以確保所有任務都能獲得必要的資源。
批處理作業處理的是有限數據集,不需要所有任務一次性獲取所有資源。全局并行度表示作業各節點的最大并行任務數,而實際的并行執行數量取決于當前可用的Slot。
批作業運行卡住如何定位
您可以參見查看作業性能文檔了解如何監控TaskManager的內存、CPU和線程使用情況。
內存問題排查:首先檢查內存使用情況,判斷是否存在內存不足導致的頻繁垃圾回收(GC)。如果確認存在內存不足,應增加TaskManager的內存配置,以減少因頻繁垃圾回收導致的性能問題。
CPU使用分析:檢查是否存在個別線程占用了大量CPU資源,這可能是導致作業卡頓的原因。
線程棧跟蹤:利用線程棧信息,分析當前節點運行的瓶頸所在。
報錯No space left on device
當您在實時計算引擎中運行批處理作業時,如果遇到No space left on device的報錯,這通常意味著TaskManager用于存儲中間結果文件的本地磁盤空間已被耗盡。每個TaskManager的可用磁盤空間是有限的,與其分配的CPU核心數是成比例的。具體來說,會給每個CPU核心配額20GiB的磁盤空間。TaskManager最低磁盤空間為20GiB,最大磁盤空間為200GiB。
解決建議:
減少每個TaskManager上的Slot數量,可以降低單個節點上的并行任務數,從而減少對本地磁盤空間的需求。
提高TaskManager的CPU核心數,從而提高TaskManager的磁盤空間大小。
相關文檔
利用實時計算Flink版關鍵功能進行數據批處理快速入門,請參見Flink批處理快速入門。
運行參數配置方法,請參見如何配置作業運行參數?