本文圍繞Flink SQL實時數據處理中的Changelog事件亂序問題,分析了Flink SQL中Changelog事件亂序問題的原因,并提供了解決方案以及處理Changelog事件亂序的建議。以幫助您更好地理解Changelog的概念和應用,更加高效地使用Flink SQL進行實時數據處理。
Flink SQL中的Changelog
Changelog介紹
在關系數據庫領域,MySQL使用binlog(二進制日志)記錄數據庫中所有修改操作,包括INSERT、UPDATE和DELETE操作。類似地,Flink SQL中的Changelog主要記錄數據變化,以實現增量數據處理。
在MySQL中,binlog可以用于數據備份、恢復、同步和復制。通過讀取和解析binlog中的操作記錄,可以實現增量數據同步和復制。變更數據捕獲(CDC)作為一種常用的數據同步技術,常被用于監控數據庫中的數據變化,并將其轉換為事件流進行實時處理。CDC工具可用于將關系數據庫中的數據變化實時傳輸到其他系統或數據倉庫,以支持實時分析和報告。當前常用的CDC工具包括Debezium和Maxwell。Flink通過FLINK-15331支持了CDC,可以實時地集成外部系統的CDC數據,并實現實時數據同步和分析。
Changelog事件生成和處理
Changelog介紹中提到的binlog和CDC是與Flink集成的外部Changelog數據源,Flink SQL內部也會生成Changelog數據。為了區分事件是否為更新事件,我們將僅包含INSERT類型事件的Changelog稱為追加流或非更新流,而同時包含其他類型(例如UPDATE)事件的Changelog稱為更新流。Flink中的一些操作(如分組聚合和去重)可以產生更新事件,生成更新事件的操作通常會使用狀態,這類操作被稱為狀態算子。需要注意的是,并非所有狀態算子都支持處理更新流。例如,Over窗口聚合和Interval Join暫不支持更新流作為輸入。
實時計算引擎VVR 6.0及以上版本的Query操作,對應的運行時算子、是否支持處理更新流消費以及是否產生更新,詳情請參見Query操作運行時信息說明。
Changelog的事件類型
FLINK-6047引入了回撤機制,使用INSERT和DELETE兩種事件類型(盡管數據源僅支持INSERT事件),實現了流SQL算子的增量更新算法。FLINK-16987以后,Changelog事件類型被重構為四種類型(如下),形成一個完整的Changelog事件類型體系,便于與CDC生態系統連接。
/**
* A kind of row in a Changelog.
*/
@PublicEvolving
public enum RowKind {
/**
* Insertion operation.
*/
INSERT,
/**
* Previous content of an updated row.
*/
UPDATE_BEFORE,
/**
* New content of an updated row.
*/
UPDATE_AFTER,
/**
* Deletion operation.
*/
DELETE
}
Flink不使用包含UPDATE_BEFORE和UPDATE_AFTER的復合UPDATE事件類型的原因主要有兩個方面:
拆分的事件無論是何種事件類型(僅RowKind不同)都具有相同的事件結構,這使得序列化更簡單。如果使用復合UPDATE事件,那么事件要么是異構的,要么是INSERT或DELETE事件對齊UPDATE事件(例如,INSERT事件僅含有UPDATE_AFTER,DELETE事件僅含有UPDATE_BEFORE)。
在分布式環境下,經常涉及數據shuffle(例如Join、聚合)。即使使用復合UPDATE事件,有時仍需將其拆分為單獨的DELETE和INSERT事件進行shuffle,例如下面的示例。
示例
下面是一個復合UPDATE事件必須拆分為DELETE和INSERT事件的場景示例。本文后續也將圍繞此SQL作業示例討論Changelog事件亂序問題并提供相應的解決方案。
-- CDC source tables: s1 & s2
CREATE TEMPORARY TABLE s1 (
id BIGINT,
level BIGINT,
PRIMARY KEY(id) NOT ENFORCED
)WITH (...);
CREATE TEMPORARY TABLE s2 (
id BIGINT,
attr VARCHAR,
PRIMARY KEY(id) NOT ENFORCED
)WITH (...);
-- sink table: t1
CREATE TEMPORARY TABLE t1 (
id BIGINT,
level BIGINT,
attr VARCHAR,
PRIMARY KEY(id) NOT ENFORCED
)WITH (...);
-- join s1 and s2 and insert the result into t1
INSERT INTO t1
SELECT
s1.*, s2.attr
FROM s1 JOIN s2
ON s1.level = s2.id;
假設源表s1中id為1的記錄的Changelog在時間t0插入(id=1, level=10)
,然后在時間t1將該行更新為(id=1, level=20)
。這對應三個拆分事件:
s1 | 事件類型 |
+I(id=1,level=10) | INSERT |
-U(id=1,level=10) | UPDATE_BEFORE |
+U(id=1,level=20) | UPDATE_AFTER |
源表s1的主鍵是id,但Join操作需要按level列進行shuffle(見子句ON)。
如果Join算子的并發數為2,那么以上三個事件可能會被發送到兩個任務中。即使使用復合UPDATE事件,它們也需要在shuffle階段拆分,來保證數據的并行處理。
Changelog事件亂序問題
亂序原因
假設示例中表s2已有兩行數據進入Join算子(+I(id=10,attr=a1),+I(id=20,attr=b1)),Join運算符從表s1新接收到三個Changelog事件。在分布式環境中,實際的Join在兩個任務上并行處理,下游算子(示例中為Sink任務)接收的事件序列可能情況如下所示。
情況1 | 情況2 | 情況3 |
+I (id=1,level=10,attr='a1') -U (id=1,level=10,attr='a1') +U (id=1,level=20,attr='b1') | +U (id=1,level=20,attr='b1') +I (id=1,level=10,attr='a1') -U (id=1,level=10,attr='a1') | +I (id=1,level=10,attr='a1') +U (id=1,level=20,attr='b1') -U (id=1,level=10,attr='a1') |
情況1的事件序列與順序處理中的事件序列相同。情況2和情況3顯示了Changelog事件在Flink SQL中到達下游算子時的亂序情況。亂序情況可能會導致不正確的結果。在示例中,結果表聲明的主鍵是id,外部存儲進行upsert更新時,在情況2和3中,如果沒有其他措施,將從外部存儲不正確地刪除id=1的行,而期望的結果是(id=1, level=20, attr='b1')
。
使用SinkUpsertMaterializer解決
在示例中,Join操作生成更新流,其中輸出包含INSERT事件(+I)和UPDATE事件(-U和+U),如果不正確處理,亂序可能會導致正確性問題。
唯一鍵與upsert鍵
唯一鍵是指SQL操作后滿足唯一約束的列或列組合。在本示例中(s1.id)、(s1.id, s1.level)和(s1.id, s2.id)這三組都是唯一鍵。
Flink SQL的Changelog參考了binlog機制,但實現方式更加簡潔。Flink不再像binlog一樣記錄每個更新的時間戳,而是通過planner中的全局分析來確定主鍵接收到的更新歷史記錄的排序。如果某個鍵維護了唯一鍵的排序,則對應的鍵稱為upsert鍵。對于存在upsert鍵的情況,下游算子可以正確地按照更新歷史記錄的順序接收upsert鍵的值。如果shuffle操作破壞了唯一鍵的排序,upsert鍵將為空,此時下游算子需要使用一些算法(例如計數算法)來實現最終的一致性。
在示例中,表s1中的行根據列level進行shuffle。Join生成多個具有相同s1.id的行,因此Join輸出的upsert鍵為空(即Join后唯一鍵上不存在排序)。此時,Flink需存儲所有輸入記錄,然后檢查比較所有列以區分更新和插入。
此外,結果表的主鍵為列id。Join輸出的upsert鍵與結果表的主鍵不匹配,需要進行一些處理將Join輸出的行進行正確轉換為結果表所需的行。
SinkUpsertMaterializer
根據唯一鍵與upsert鍵的內容,當Join輸出的是更新流且其upsert鍵與結果表主鍵不匹配時,需要一個中間步驟來消除亂序帶來的影響,以及基于結果表的主鍵產生新的主鍵對應的Changelog事件。Flink在Join算子和下游算子之間引入了SinkUpsertMaterializer算子(FLINK-20374)。
結合亂序原因中的Changelog事件,可以看到Changelog事件亂序遵循著一些規則。例如,對于一個特定的upsert鍵(或upsert鍵為空則表示所有列),事件ADD(+I、+U)總是在事件RETRACT(-D、-U)之前發生;即使涉及到數據shuffle,相同upsert鍵的一對匹配的Changelog事件也總是被相同的任務處理。這些規則也說明了為什么示例僅存在亂序原因中三個Changelog事件的組合。
SinkUpsertMaterializer就是基于上述規則實現的,其工作原理如下圖所示。SinkUpsertMaterializer在其狀態中維護了一個RowData列表。當SinkUpsertMaterializer被觸發,在處理輸入行時,它根據推斷的upsert鍵或整個行(如果upsert鍵為空)檢查狀態列表中是否存在相同的行。在ADD的情況下添加或更新狀態中的行,在RETRACT的情況下從狀態中刪除行。最后,它根據結果表的主鍵生成Changelog事件,更多詳細信息請參見SinkUpsertMaterializer源代碼。
通過SinkUpsertMaterializer,將示例中Join算子輸出的Changelog事件處理并轉換為結果表主鍵對應的Changelog事件,結果如下圖所示。根據SinkUpsertMaterializer的工作原理,在情況2中,處理-U(id=1,level=10,attr='a1')
時,會將最后一行從狀態中移除,并向下游發送倒數第二行;在情況3中,當處理+U (id=1,level=20,attr='b1')
時,SinkUpsertMaterializer會將其原樣發出,而當處理-U(id=1,level=10,attr='a1')
時,將從狀態中刪除行而不發出任何事件。最終,通過SinkUpsertMaterializer算子情況2和3也會得到期望結果 (id=1,level=20,attr='b1')
。
常見場景
觸發 SinkUpsertMaterializer
算子的常見場景如下:
結果表定義主鍵,而寫入該結果表的數據丟失了唯一性。通常包括但不限于以下操作:
源表缺少主鍵,而結果表卻設置了主鍵。
向結果表插入數據時,忽略了主鍵列的選擇,或錯誤地使用了源表的非主鍵數據填充結果表的主鍵。
源表的主鍵數據在轉換或經過分組聚合后出現精度損失。例如,將BIGINT類型降為INT類型。
對源表的主鍵列或經過分組聚合之后的唯一鍵進行了運算,如數據拼接或將多個主鍵合并為單一字段。
CREATE TABLE students ( student_id BIGINT NOT NULL, student_name STRING NOT NULL, course_id BIGINT NOT NULL, score DOUBLE NOT NULL, PRIMARY KEY(student_id) NOT ENFORCED ) WITH (...); CREATE TABLE performance_report ( student_info STRING NOT NULL PRIMARY KEY NOT ENFORCED, avg_score DOUBLE NOT NULL ) WITH (...); CREATE TEMPORARY VIEW v AS SELECT student_id, student_name, AVG(score) AS avg_score FROM students GROUP BY student_id, student_name; -- 將分組聚合后的key進行拼接當作主鍵寫入結果表,但實際上已經丟失了唯一性約束 INSERT INTO performance_report SELECT CONCAT('id:', student_id, ',name:', student_name) AS student_info, avg_score FROM v;
結果表的確立依賴于主鍵的設定,然而在數據輸入過程中,其原有的順序性卻遭到破壞。例如本文的示例,雙流Join時若一方數據未通過主鍵與另一方關聯,而結果表的主鍵列又是基于另一方的主鍵列生成的,這便可能導致數據順序的混亂。
明確配置了
table.exec.sink.upsert-materialize
參數為'FORCE'
,配置詳情請參見下方的參數設置。
使用建議
正如前面所提到的,SinkUpsertMaterializer在其狀態中維護了一個RowData列表。這可能會導致狀態過大并增加狀態訪問I/O的開銷,最終影響作業的吞吐量。因此,應盡量避免使用它。
參數設置
SinkUpsertMaterializer可以通過table.exec.sink.upsert-materialize進行配置:
auto(默認值):Flink會從正確性的角度推斷出亂序是否存在,如果必要的話,則會添加SinkUpsertMaterializer。
none:不使用。
force:強制使用。即便結果表的DDL未指定主鍵,優化器也會插入SinkUpsertMaterializer狀態節點,以確保數據的物理化處理。
需要注意的是,設置為auto并不一定意味著實際數據是亂序的。例如,使用grouping sets語法結合coalesce轉換null值時,SQL planner可能無法確定由grouping sets與coalesce組合生成的upsert鍵是否與結果表的主鍵匹配。出于正確性的考慮,Flink將添加SinkUpsertMaterializer。如果一個作業可以在不使用SinkUpsertMaterializer的情況下生成正確的輸出,建議設置為none。
避免使用SinkUpsertMaterializer
為了避免使用SinkUpsertMaterializer,您可以:
確保在進行去重、分組聚合等操作時,所使用的分區鍵要與結果表的主鍵相同。
如果下游算子與上游的去重、分組聚合或其他算子相連,且在VVR 6.0以下版本中沒有出現數據準確性問題,那么可以參考原資源配置,并將table.exec.sink.upsert-materialize更改為none,將作業遷移到實時計算引擎VVR 6.0及以上版本,引擎升級請參見作業引擎版本升級。
若必須使用SinkUpsertMaterializer,需注意以下事項:
避免在寫入結果表時添加由非確定性函數(如CURRENT_TIMESTAMP、NOW)生成的列,可能會導致Sink輸入在沒有upsert鍵時,SinkUpsertMaterializer的狀態異常膨脹。
如果已出現SinkUpsertMaterializer算子存在大狀態的情況并影響了性能,請考慮增加作業并發度,操作步驟請參見配置作業資源。
使用注意事項
SinkUpsertMaterializer雖然解決了Changelog事件亂序問題,但可能引起持續狀態增加的問題。主要原因有:
狀態有效期過長(未設置或設置過長的狀態TTL)。但如果TTL設置過短,可能會導致FLINK-29225中描述的問題,即本應刪除的臟數據仍保留在狀態中。當消息的DELETE事件與其ADD事件之間的時間間隔超過配置的TTL時會出現這種情況,此時,Flink會在日志中產生一條如下警告信息。
int index = findremoveFirst(values, row); if (index == -1) { LOG.info(STATE_CLEARED_WARN_MSG); return; }
您可以根據業務需要設置合理的TTL,具體操作請參見運行參數配置,實時計算VVR 8.0.7及以上版本,支持為不同算子設置不同TTL,進一步節約大狀態作業的使用資源,具體操作請參見配置算子并發、Chain策略和TTL。
當SinkUpsertMaterializer輸入的更新流無法推導出upsert鍵,并且更新流中存在非確定性列時,將無法正確刪除歷史數據,這會導致狀態持續增加。
相關文檔
實時計算引擎版本和Apache Flink版本對應關系,詳情請參見功能發布記錄。