本文為您介紹CEP中規則的JSON格式相關信息。
目標人群
- 客戶風控平臺開發人員:對Flink CEP較熟悉的平臺研發人員應能快速學習本格式,并根據自身平臺需求判斷是否需要進一步封裝。
- 客戶風控策略人員:只熟悉具體策略但缺乏Java經驗的同學,在熟悉CEP概念的基礎上,也可快速上手本格式的使用來編寫新規則,使其在上線的風控作業中應用。
JSON格式定義
- 節點(Node)定義一個節點(Node)即一個完整的模式(Pattern),它包含如下屬性。
字段名 描述 類型 是否必填 備注 name Pattern名稱。 string 是 一個唯一的字符串。 說明 不同節點的名稱不能重復。type 該Node類型。 enum(string) 是 - 對于包含子Pattern的節點,該字段值為COMPOSITE。
- 對于無子Pattern的節點,該字段值為ATOMIC。
quantifier 量詞,用于描述如何匹配該Pattern,例如只匹配一次。 dict 是 請參見本文量詞(Quantifier)定義。 condition 條件。 dict 否 請參見本文條件(Condition)定義。 - 量詞(Quantifier)定義量詞的作用是描述對于滿足該Pattern的事件要如何匹配。例如模式
"A*"
對應的量詞properties為LOOPING,該Pattern內部的事件選擇策略為SKIP_TILL_ANY。字段名 描述 類型 是否必填 備注 consumingStrategy 事件選擇策略。 enum(string) 是 僅支持以下取值: - STRICT
- SKIP_TILL_NEXT
- SKIP_TILL_ANY
取值及含義請參見本文連續性定義。
times 用于描述該Pattern需要匹配多少次。 dict 否 取值示例如下。"times": { "from": 3, "to": 3, "windowTime": { "unit": "MINUTES", "size": 12 } },
其中from和to的數據類型均為integer,windowTime的單位可以為DAYS、HOURS、MINUTES、SECONDS和MILLISECONDS。說明 windowTime可以設為null,即"windowTime": null
。properties 描述該量詞所具有的屬性。 array of enumString 是 取值及含義請參見本文量詞屬性含義。 untilCondition 停止條件。 說明 僅可在LOOPING量詞修飾的Pattern后使用。dict 否 取值及含義請參見本文條件(Condition)定義。 - 條件(Condition)定義條件用于篩選符合某些要求的事件。例如要篩選瀏覽時長超過5分鐘的客戶,瀏覽時長超過5分鐘即為一個條件。
字段名 描述 類型 是否必填 備注 type 條件類型。 enum(string) 是 條件類型取值如下: - CLASS:對應用戶自定義的條件。
- AVIATOR:對應基于AVIATOR表達式的條件。
- GROOVY:對應基于GROOVY表達式的條件。
... 其他可序列化的自定義字段。 否 ... 目前我們支持以下幾種Condition:- Class類型Condition
字段名 描述 類型 是否必填 備注 type 條件類型。 enum(string) 是 固定值為Class。 className 類名。 string 是 該class完整類名,例如 com.alibaba.ververica.cep.demo.StartCondition
。 - 包含自定義參數的Condition
用戶在使用普通的Class類型Condition時,只能傳入類名(className),而無法動態地傳入參數。在動態CEP支持中,為了提供更豐富的Condition表達能力,我們設計并實現了包含自定義參數的Condition(即CustomArgsCondition),從而允許用戶在JSON中通過字符串數組來設置CustomArgsCondition所需參數, 進而動態構造CustomArgsCondition實例。這一特性允許用戶動態更新Condition的參數,而無需修改Java代碼。
字段名 描述 類型 是否必填 備注 type 條件類型。 enum(string) 是 固定值為Class。 className 類名。 string 是 該class完整類名,例如 com.alibaba.ververica.cep.demo.CustomMiddleCondition
。args 自定義參數。 array of string 是 一個字符串數組。 - 基于Aviator表達式的ConditionAviator是一個表達式求值引擎,可以動態地將表達式編譯成字節碼(詳情請參見aviatorscript)。因此我們可以在作業中使用基于Aviator表達式的Condition,使得條件的閾值也可以動態修改,而無需修改Java代碼重新編譯運行。
字段名 描述 類型 是否必填 備注 type 類名。 string 是 固定值為AVIATOR。 expression 表達式字符串。 string 是 形如price > 10這樣的表達式字符串(price變量名來自于Java代碼中定義的字段)。 您可以將該字符串在數據庫中的值進行修改。例如修改為price > 20,Flink CEP作業會動態加載price > 20構造新的AviatorCondition來處理之后的事件。
- 基于Groovy表達式的ConditionGroovy是一個基于JVM平臺的動態語言(Groovy語法可以參見syntax)。動態CEP支持使用Groovy表達式來定義條件(Condition),從而允許動態修改條件的閾值。
字段名 描述 類型 是否必填 備注 type 類名。 string 是 固定值為GROOVY。 expression 表達式字符串。 string 是 形如price > 5.0 && name.contains("mid")這樣的表達式字符串(price、name等變量名來自于Java代碼中定義的字段)。您可以將該字符串在數據庫中的值進行修改。例如修改為price > 20 && name.contains("end"),Flink CEP作業會動態加載新的Groovy字符串并構造新的GroovyCondition來處理之后的事件。
- 邊(Edge)定義
字段名 描述 類型 是否必填 備注 source 源模式名稱。 string 是 無。 target 目標模式名稱。 string 是 無。 type 事件選擇策略。 dict 是 支持以下取值: - STRICT
- SKIP_TILL_NEXT
- SKIP_TILL_ANY
- NOT_FOLLOW
- NOT_NEXT
取值及含義請參見本文連續性定義。
- 圖(GraphNode extends Node)定義
一個圖(GraphNode)代表一個完整的Pattern序列,它的節點(nodes)是各個獨立的Pattern,邊(edges)代表如何從一類Pattern的匹配轉移到另一類Pattern的匹配。
為了支持Pattern的嵌套(即GroupPattern),我們將一個GraphNode看作是Node的子類,即一個GraphNode可以作為一個更大的GraphNode中的Node。GraphNode相比于基礎Node,額外多了以下2類字段:- 描述圖的結構的nodes字段與edges字段。
- 描述圖內時間窗口策略的window字段與事件匹配后的跳出策略afterMatchSkipStrategy字段。
GraphNode的字段詳情請參見下表。字段名 描述 類型 是否必填 備注 name 該復合Pattern名稱。 String 是 一個唯一的字符串。 說明 不同Graph名稱不能重復。type 該Node類型。 enum(string) 是 固定值為COMPOSITE。 version 該Graph使用的JSON格式的版本號。 Int 是 默認值為1。 nodes 該Pattern內嵌套的子Pattern。 array of Node 是 不可以為空的array。 edges 嵌套的子Pattern的連接關系。 array of Edge 是 可以為空的array。 window - 當類型為FIRST_AND_LAST:代表該復合Pattern一次完整匹配之間的最大時間間隔。
- 當類型為PREVIOUS_AND_CURRENT:代表該相鄰2個子Pattern匹配之間的最大時間間隔。
dict 否 取值示例如下。 "window": { "type": "FIRST_AND_LAST", "time": { "unit": "DAYS", "size": 1 } }
單位可以為DAYS、HOURS、MINUTES、SECONDS和MILLISECONDS。數據類型為Long或Integer。
afterMatchSkipStrategy 該圖內所有事件匹配后的跳過策略。 dict 是 請參見本文事件匹配后的跳過策略(AfterMatchSkipStrategy)定義。 quantifier 量詞,用于描述如何匹配該Pattern,例如只匹配一次。 dict 是 請參見本文量詞(Quantifier)定義。 - 事件匹配后的跳過策略(AfterMatchSkipStrategy)定義
字段名 描述 類型 是否必填 備注 type 策略類型。 enum(string) 是 參數取值如下:- NO_SKIP(默認值):每個成功的匹配都會被輸出。
- SKIP_TO_NEXT:丟棄以相同事件開始的所有部分匹配。
- SKIP_PAST_LAST_EVENT:丟棄起始在這個匹配的開始和結束之間的所有部分匹配。
- SKIP_TO_FIRST:丟棄起始在這個匹配的開始和第一個出現的名稱為PatternName事件之間的所有部分匹配。
- SKIP_TO_LAST:丟棄起始在這個匹配的開始和最后一個出現的名稱為PatternName事件之間的所有部分匹配。
詳情請參見匹配后跳過策略
patternName 策略針對的模式的名稱。 string 否 一個唯一的字符串。 - 連續性定義
物理值 含義 STRICT 嚴格連續。 所有匹配的事件中間沒有任何不匹配的事件。 SKIP_TILL_NEXT 松散連續。允許匹配的事件之間出現不匹配的事件,不匹配的事件會被忽略。 SKIP_TILL_ANY 不確定松散連續。更進一步的松散連續,允許忽略掉一些匹配事件的附加匹配。 NOT_NEXT 緊接著的后續事件不能是某指定事件。 NOT_FOLLOW 某指定事件后續不出現。 相關示例請參見事件處理(CEP)文檔。
- 量詞屬性含義
取值 含義 SINGLE 代表該模式只出現一次。 LOOPING 代表該模式為循環模式,可能出現多次,類比正則表達式中的*與+。 TIMES 代表該模式會出現指定次數。 GREEDY 代表在匹配該模式時,會采用貪婪匹配策略,盡可能多地匹配。 OPTIONAL 代表該模式為可選模式。
示例一:普通Pattern示例
- 領取了某會場的優惠券。
- 在購物車中添加了超過3次的商品。
- 但最后沒有結賬付款。
Pattern<Event, Event> pattern =
Pattern.<Event>begin("start")
.where(new StartCondition())
.optional()
.followedBy("middle")
.where(new MiddleCondition())
.timesOrMore(3)
.notFollowedBy("end")
.where(new EndCondition())
.within(Time.minutes(10));
{
"name": "end",
"quantifier": {
"consumingStrategy": "SKIP_TILL_NEXT",
"properties": [
"SINGLE"
],
"times": null,
"untilCondition": null
},
"condition": null,
"nodes": [
{
"name": "end",
"quantifier": {
"consumingStrategy": "SKIP_TILL_NEXT",
"properties": [
"SINGLE"
],
"times": null,
"untilCondition": null
},
"condition": {
"className": "com.alibaba.ververica.cep.demo.condition.EndCondition",
"type": "CLASS"
},
"type": "ATOMIC"
},
{
"name": "middle",
"quantifier": {
"consumingStrategy": "SKIP_TILL_NEXT",
"properties": [
"LOOPING"
],
"times": {
"from": 3,
"to": 3,
"windowTime": null
},
"untilCondition": null
},
"condition": {
"className": "com.alibaba.ververica.cep.demo.condition.MiddleCondition",
"type": "CLASS"
},
"type": "ATOMIC"
},
{
"name": "start",
"quantifier": {
"consumingStrategy": "SKIP_TILL_NEXT",
"properties": [
"SINGLE",
"OPTIONAL"
],
"times": null,
"untilCondition": null
},
"condition": {
"className": "com.alibaba.ververica.cep.demo.condition.StartCondition",
"type": "CLASS"
},
"type": "ATOMIC"
}
],
"edges": [
{
"source": "middle",
"target": "end",
"type": "NOT_FOLLOW"
},
{
"source": "start",
"target": "middle",
"type": "SKIP_TILL_NEXT"
}
],
"window": {
"type": "FIRST_AND_LAST",
"time": {
"unit": "MINUTES",
"size": 10
}
},
"afterMatchStrategy": {
"type": "NO_SKIP",
"patternName": null
},
"type": "COMPOSITE",
"version": 1
}
示例二:在Pattern中使用包含自定義參數的Condition
例如在實時營銷場景中,假設我們給用戶打上了一個人群標簽,之后會根據用戶所屬的標簽采取不同的營銷策略,例如對于A類用戶我們發送營銷短信,對于B類用戶我們發送優惠券等,而對于其他用戶,我們不采取營銷措施。針對上述需求,我們可以定義一個普通的Class類型Condition來解決,但當我們想調整策略,針對C類用戶也發送優惠券時,如果使用的是普通的Class類型Condition,那么我們必須改寫代碼,重新編譯并運行作業。這種情況下,我們可以使用包含自定義參數的Condition,在代碼中定義好如何根據傳入的參數進行策略的調整之后,我們只需要在數據庫中修改傳入的參數(即包含自定義參數的Condition的args字段的值),例如由["A", "B"] 改為["A", "B", "C"],即可實現營銷策略的動態更新。
"condition": {
"args": [
"A", "B"
],
"className": "org.apache.flink.cep.pattern.conditions.CustomMiddleCondition",
"type": "CLASS"
}
"condition": {
"args": [
"A", "B", "C"
],
"className": "org.apache.flink.cep.pattern.conditions.CustomMiddleCondition",
"type": "CLASS"
}
關于該類Condition在具體業務場景的使用示例,詳情請參見Demo。