本文為您介紹實時計算Flink全托管的復雜事件處理(CEP)語句的詳情。
背景信息
相較于Apache Flink的CEP SQL,實時計算Flink版在其基礎上進行了增強。例如,支持輸出超時匹配、支持松散連接(followedBy)、支持指定事件之間的連續性等。關于Apache Flink CEP SQL的基本能力,詳情請參見Pattern Recognition。
使用限制
僅實時計算引擎vvr-6.0.2-flink-1.15及以上版本支持CEP SQL擴展語法。
僅實時計算引擎vvr-6.0.5-flink-1.15及以上版本支持組合模式和NO SKIP語法。
輸出超時匹配
假如有以下輸入事件序列:
+----+------+------------------+
| id | type | rowtime |
+----+------+------------------+
| 1 | A | 2022-09-19 12:00 |
| 2 | B | 2022-09-19 12:01 |
| 3 | A | 2022-09-19 12:02 |
| 4 | B | 2022-09-19 12:05 |
+----+------+------------------+
對于模式A B
,如果我們需要限制整個模式的匹配時間跨度在2分鐘之內,則可以在PATTERN語句之后聲明WITHIN INTERVAL '2' MINUTES
,代碼示例如下。
SELECT *
FROM MyTable MATCH_RECOGNIZE (
ORDER BY rowtime
MEASURES
A.id AS aid,
B.id AS bid,
A.rowtime AS atime,
B.rowtime AS btime
PATTERN (A B) WITHIN INTERVAL '2' MINUTES
DEFINE
A AS type = 'A',
B AS type = 'B'
) AS T
如果不考慮WITHIN限制,我們可以得到兩個匹配id=1, id=2
、id=3, id=4
。加入WITHIN描述后,第二條匹配結果中A和B事件之間的時間間隔為3分鐘,超過了WITHIN限制中定義的2分鐘,該SQL最終得到的結果中只包含了前一個匹配結果,如下所示。
+-----+-----+------------------+------------------+
| aid | bid | atime | btime |
+-----+-----+------------------+------------------+
| 1 | 2 | 2022-09-19 12:00 | 2022-09-19 12:01 |
+-----+-----+------------------+------------------+
當定義WITHIN語句后,超過時間限制的部分匹配序列會被作為匹配失敗的事件序列丟棄。如果需要輸出超時的事件匹配序列(在時間限制內未完全匹配的事件序列),則可以使用ONE ROW PER MATCH SHOW TIMEOUT MATCHES
語句,例如可以使用如下SQL語句來輸出超時序列。
SELECT *
FROM MyTable MATCH_RECOGNIZE (
ORDER BY rowtime
MEASURES
A.id AS aid,
B.id AS bid,
A.rowtime AS atime,
B.rowtime AS btime
ONE ROW PER MATCH SHOW TIMEOUT MATCHES
PATTERN (A B) WITHIN INTERVAL '2' MINUTES
DEFINE
A AS type = 'A',
B AS type = 'B'
) AS T
該語句將輸出未匹配的序列,輸出結果如下。
+-----+--------+------------------+------------------+
| aid | bid | atime | btime |
+-----+--------+------------------+------------------+
| 1 | 2 | 2022-09-19 12:00 | 2022-09-19 12:01 |
| 3 | <NULL> | 2022-09-19 12:00 | <NULL> |
+-----+--------+------------------+------------------+
由于id=4的B事件已經超過了WITHIN時間限制,并不會包括在匹配序列中,因此得到的bid和btime均為空值。
事件之間的連續性
開源Flink CEP Java API中支持通過嚴格連續(next()
)、松散連續(followedBy()
)、非確定性松散連續(followedByAny()
)、嚴格非連續(notNext()
)、松散非連續(notFollowedBy()
)等方式定義事件之間的連續性策略。
在Flink CEP SQL中默認使用的嚴格連續策略,即滿足模式匹配的相鄰事件之間需要嚴格連續,連續的輸入事件之間不能存在非匹配的事件。例如上述例子中的模式 (A B) 指定A和B事件之間需要緊密相連。阿里云實時計算Flink版對此進行了擴展,支持與Java API完全對等的表達能力。
例如對于a1, b1, a2, a3, b2, b3
輸入事件序列,不同模式對應的匹配序列如下表所示。
匹配過程中,After Match子句使用的是SKIP TO NEXT ROW策略,策略詳情請參見匹配后的策略。
Java API | SQL | 策略 | 匹配序列 |
|
| 嚴格連續:期望所有匹配的事件嚴格的一個接一個出現,中間沒有任何不匹配的事件。 |
|
|
其中C為DEFINE中未定義的符號,表示任意匹配。 | 松散連續:忽略匹配事件之間的不匹配事件。 |
|
|
其中C為DEFINE中未定義的符號,表示任意匹配。 | 非確定性松散連續:更進一步的松散連續,允許忽略掉一些匹配事件的附加匹配。 |
說明 本示例的結果基于CEP SQL默認的SKIP TO NEXT ROW策略,而Flink CEP Java API的默認策略為NO SKIP。如果您要使用NO SKIP,詳情請參見AFTER MATCH NO SKIP策略。 |
|
| 嚴格非連續:期望事件之后不緊接出現另一事件。 |
|
|
其中C為DEFINE中未定義的符號,表示任意匹配。 說明 在模式末尾使用notFollowedBy語法需要指定WITHIN條件。 | 松散非連續:期望一個事件不出現在兩個事件之間的任何地方,在模式末尾配合WITHIN條件使用表示一段時間之內沒有任何某類事件到來。 | 無匹配 |
循環模式中的連續性以及貪婪匹配
目前CEP SQL暫不支持循環模式中使用非確定性松散連續。
開源Flink CEP Java API支持定義循環模式匹配的連續性和貪婪策略,Apache Flink CEP SQL中默認使用嚴格連續且貪婪策略。例如A+匹配的多個A事件之間不允許有其他事件存在,并且匹配盡可能多的A事件。通過在循環量詞(例如*、+、{3, })后增加?符號來指定連續性和貪婪策略。
例如有事件序列a1, b1, a2, a3, c1,其中條件為A AS type = 'a', C AS type = 'a' or type = 'c'。針對不同的模式,對應的匹配序列如下表所示。
匹配過程中,After Match子句使用的是SKIP TO NEXT ROW策略,策略詳情請參見匹配后的策略。
標識符 | 連續性 | 貪婪性 | 示例模式 | 等效語義 | 匹配序列 |
無 | 嚴格連續 | 貪婪 |
|
|
|
? | 嚴格連續 | 非貪婪 |
|
|
|
?? | 松散連續 | 貪婪 |
|
|
|
??? | 松散連續 | 非貪婪 |
|
|
|
循環模式指定停止條件(Until)
開源Flink CEP Java API支持使用函數until(condition)
來為循環模式指定其停止條件,其效果是在匹配循環模式的過程中,若當前事件符合until條件,則立刻終止當前循環模式的匹配,并從當前事件開始匹配后續模式。在阿里云實時計算Flink版的SQL作業中可以在循環量詞如+
、*
、{3, }
后使用{ CONDITION }
語法來表達until語義。
例如針對事件序列a1, d1, a2, b1, a3, c1
,有匹配條件DEFINE A AS A.type = 'a' OR A.type = 'b', B AS B.type = 'b', C AS C.type = 'c'
,不同的模式對應的匹配序列如下表所示:
匹配過程中,使用AFTER MATCH SKIP TO NEXT ROW
策略,策略詳情請參見匹配后的策略。
模式 | 等效語義 | 匹配序列 | 說明 |
|
|
| 以a或b開頭的事件都能匹配A模式,A模式內部和AC之間為嚴格連續。由于a1、a2之間存在d1,無法從a1開始匹配 |
|
|
| A循環模式增加了until B條件,AC之間仍為嚴格連續。由于a2開始的循環模式需要在b1處結束,無法滿足與c1之間的嚴格連續要求。 |
|
|
| AC之間為松散連續,以a2開始的循環模式在b1處結束,并跳過b1、a3匹配c1。 |
|
|
| 循環模式A內部為松散連續,可跳過d1并結束于b1,匹配a1、a2。 |
組合模式(Group Pattern)
開源 Flink CEP Java API支持組合模式(group pattern),將多個模式組合為一個整體用在next()
、followedBy()
和followedByAny()
函數中,并支持整體的循環。在阿里云實時計算Flink版的SQL作業中使用SQL標準中的語法(...)
來定義組合模式,支持使用循環量詞如+
、*
、{3, }
等。
例如對于模式PATTERN (A (B C*)+? D)
,其中(B C*)
為一個組合模式,并指定該組合循環出現一次以上,?
表明為非貪婪匹配,其對應Java代碼如下:
Pattern.<String>begin("A").where(...)
.next(
Pattern.<String>begin("B").where(...)
.next("C").where(...).oneOrMore().optional().greedy().consecutive())
.oneOrMore().consecutive()
.next("D").where(...)
匹配成功時MEASURES
可以提取匹配結果中的某些部分信息來作為結果的一部分。例如某組合模式匹配到的結果為b1
,b2 c1
,b3 c2 c3
,則MEASURES
可以只獲取其中的一部分信息進行輸出(下文中的SQL所示),如果只關注事件b,則通過FIRST(B.id)
可以拿到第一組匹配結果的b,FIRST(B.id,1)
可以拿到第二組結果的b,以此類推,輸出的結果為b1,b2,b3
。
SELECT *
FROM MyTable MATCH_RECOGNIZE (
ORDER BY rowtime
MEASURES
FIRST(B.id) AS b1_id,
FIRST(B.id,1) AS b2_id,
FIRST(B.id,2) AS b3_id
PATTERN (A (B C*)+? D)
DEFINE
A AS type = 'A',
B AS type = 'B',
C AS type = 'C',
D AS type = 'D'
) AS T
需要注意的是,組合模式與其前一個模式之間的連續性聲明作用于組合模式中的第一個模式而非整個組合模式。例如在PATTERN (A {- X*? -} (B C))
中模式A
和組合模式(B C)
之間使用了followedBy關系,則實際為聲明了A和B之間的followedBy關系,其執行效果為A
和(B C)
之間可以存在若干不匹配B
的事件,而非若干不匹配(B C)
的事件。如序列a1 b1 d1 b2 c1
對于該模式無輸出,因為b1
出現后匹配過程立刻進入組合模式(B C)
,而d1
無法匹配C
模式導致序列匹配失敗。
循環組合模式不支持貪婪匹配,例如
PATTERN ((A B)+)
。不支持在until和notNext的語法中使用組合模式,例如
PATTERN (A+{(B C)})
、PATTERN (A [^(B C)])
。組合模式的首模式不支持聲明為可選(optional),例如
PATTERN (A (B? C))
。
AFTER MATCH NO SKIP策略
Flink CEP Java API中After Match策略默認為NO_SKIP,CEP SQL中的默認策略則為SKIP_TO_NEXT_ROW。阿里云實時計算Flink版擴展了SQL標準中的AFTER MATCH語句,可通過AFTER MATCH NO SKIP
語句來聲明NO_SKIP策略,NO_SKIP策略在完成一條序列的匹配時,不會終止或丟棄其他已經開始的匹配過程。
NO_SKIP策略的一種常用場景為結合followedByAny跳過一些匹配事件進行更松散的匹配。例如對于序列a1 b1 b2 b3 c1
,PATTERN (A {- X* -} B {- Y*? -} C)
(等價于Pattern.begin("A").followedByAny("B").followedBy("C")
)在使用默認的AFTER MATCH SKIP TO NEXT ROW
時得到結果a1 b1 c1
,因為當a1 b1 c1
完成匹配時所有以a1
開頭的序列都會被丟棄,而使用AFTER MATCH NO SKIP
則能得到所有匹配序列a1 b1 c1
、a1 b2 c1
和a1 b3 c1
。