Flink動態(tài)CEP快速入門
實時計算Flink版支持通過DataStream作業(yè)的方式運行支持規(guī)則動態(tài)更新的Flink CEP作業(yè)。本文結(jié)合實時營銷中的反作弊場景,為您介紹如何基于Flink全托管快速構(gòu)建一個動態(tài)加載最新規(guī)則來處理上游Kafka數(shù)據(jù)的Flink CEP作業(yè)。
背景信息
在電商平臺投放廣告時,廣告主通常有預(yù)算限制。例如對于按點擊次數(shù)計算費用的廣告而言,如果有黑灰產(chǎn)構(gòu)造虛假流量,攻擊廣告主,則會很快消耗掉正常廣告主的預(yù)算,使得廣告內(nèi)容被提前下架。在這種情況下,廣告主的利益受到了損害,容易導(dǎo)致后續(xù)的投訴與糾紛。
為了應(yīng)對上述作弊場景,我們需要快速辨識出惡意流量,采取針對性措施(例如限制惡意用戶、向廣告主發(fā)送告警等)來保護用戶權(quán)益。同時考慮到可能有意外因素(例如達人推薦、熱點事件引流)導(dǎo)致流量驟變,我們也需要動態(tài)調(diào)整用于識別惡意流量的規(guī)則,避免損害正常用戶的利益。
本文為您演示如何使用Flink動態(tài)CEP解決上述問題。我們假設(shè)客戶的行為日志會被存放入消息隊列Kafka中,F(xiàn)link CEP作業(yè)會消費Kafka數(shù)據(jù),同時會去輪詢RDS數(shù)據(jù)庫中的規(guī)則表,拉取策略人員添加到數(shù)據(jù)庫的最新規(guī)則,并用最新規(guī)則去匹配事件。針對匹配到的事件,F(xiàn)link CEP作業(yè)會發(fā)出告警或?qū)⑾嚓P(guān)信息寫入其他數(shù)據(jù)存儲中。示例中整體數(shù)據(jù)鏈路如下圖所示。
實際演示中,我們會先啟動Flink CEP作業(yè),然后插入規(guī)則1:連續(xù)3條action為0的事件發(fā)生后,下一條事件的action仍非1,其業(yè)務(wù)含義為連續(xù)3次訪問該產(chǎn)品后最后沒有購買。在匹配到相應(yīng)事件并進行處理后,我們會動態(tài)更新規(guī)則1內(nèi)容為連續(xù)5條action為0或2的事件發(fā)生后,下一條事件的action仍非1,來應(yīng)對流量整體增加的場景,同時插入一條規(guī)則2,它將和規(guī)則1的初始規(guī)則一樣,用于輔助展示多規(guī)則支持等功能。當然,您也可以添加一個全新規(guī)則。
前提條件
如果您使用RAM用戶或RAM角色等身份訪問,需要確認已具有Flink控制臺相關(guān)權(quán)限,詳情請參見權(quán)限管理。
已創(chuàng)建Flink工作空間,詳情請參見開通實時計算Flink版。
上下游存儲
已創(chuàng)建RDS MySQL實例,詳情請參見創(chuàng)建RDS MySQL實例。
已創(chuàng)建消息隊列Kafka實例,詳情請參見概述。
僅實時計算引擎VVR 6.0.2及以上版本支持動態(tài)CEP功能。
操作流程
本文為您介紹如何編寫Flink CEP作業(yè)檢測行為日志中的異常事件序列去發(fā)現(xiàn)惡意流量,并演示如何進行規(guī)則的動態(tài)更新。具體的操作流程如下:
步驟一:準備測試數(shù)據(jù)
準備上游Kafka Topic
創(chuàng)建一個名稱為demo_topic的Topic,存放模擬的用戶行為日志。
操作詳情請參見步驟一:創(chuàng)建Topic。
準備RDS數(shù)據(jù)庫
在DMS數(shù)據(jù)管理控制臺上,準備RDS MySQL的測試數(shù)據(jù)。
使用高權(quán)限賬號登錄RDS MySQL。
詳情請參見通過DMS登錄RDS MySQL。
創(chuàng)建rds_demo規(guī)則表,用來記錄Flink CEP作業(yè)中需要應(yīng)用的規(guī)則。
在已登錄的SQLConsole窗口,輸入如下命令后,單擊執(zhí)行。
CREATE DATABASE cep_demo_db; USE cep_demo_db; CREATE TABLE rds_demo ( `id` VARCHAR(64), `version` INT, `pattern` VARCHAR(4096), `function` VARCHAR(512) );
每行代表一條規(guī)則,包含id、version等用于區(qū)分不同規(guī)則與每個規(guī)則不同版本的字段、描述CEP API中的模式對象的pattern字段,以及描述如何處理匹配模式的事件序列的function字段。
步驟二:配置IP白名單
為了讓Flink能訪問RDS MySQL實例,您需要將Flink全托管工作空間的網(wǎng)段添加到在RDS MySQL的白名單中。
獲取Flink全托管工作空間的VPC網(wǎng)段。
登錄實時計算控制臺。
在目標工作空間右側(cè)操作列,選擇 。
在工作空間詳情對話框,查看Flink全托管虛擬交換機的網(wǎng)段信息。
在RDS MySQL的IP白名單中,添加Flink全托管網(wǎng)段信息。
操作步驟詳情請參見設(shè)置IP白名單。
步驟三:開發(fā)并啟動Flink CEP作業(yè)
本文中所有代碼都可以在Github倉庫下載。本文檔接下來會描述重點部分實現(xiàn),方便您參考。
配置Maven項目中的pom.xml文件所使用的倉庫。
pom.xml文件的配置詳情,請參見Kafka DataStream Connector。
在作業(yè)的Maven POM文件中添加flink-cep作為項目依賴。
<dependency> <groupId>com.alibaba.ververica</groupId> <artifactId>flink-cep</artifactId> <version>1.15-vvr-6.0.2-api</version> <scope>provided</scope> </dependency>
開發(fā)作業(yè)代碼。
構(gòu)建Kafka Source。
代碼編寫詳情,請參見Kafka DataStream Connector。
構(gòu)建CEP.dynamicPatterns()。
為支持CEP規(guī)則動態(tài)變更與多規(guī)則匹配,阿里云實時計算Flink版定義了CEP.dynamicPatterns() API。該API定義代碼如下。
public static <T, R> SingleOutputStreamOperator<R> dynamicPatterns( DataStream<T> input, PatternProcessorDiscovererFactory<T> discovererFactory, TimeBehaviour timeBehaviour, TypeInformation<R> outTypeInfo)
使用該API時,所需參數(shù)如下。您可以跟進實際使用情況,更新相應(yīng)的參數(shù)取值。
參數(shù)
說明
DataStream<T> input
輸入事件流。
PatternProcessorDiscovererFactory<T> discovererFactory
工廠對象。工廠對象負責構(gòu)造一個探查器(PatternProcessorDiscoverer),探查器負責獲取最新規(guī)則,即構(gòu)造一個PatternProcessor接口。
TimeBehaviour timeBehaviour
描述Flink CEP作業(yè)如何處理事件的時間屬性。參數(shù)取值如下:
TimeBehaviour.ProcessingTime:代表按照Processing Time處理事件。
TimeBehaviour.EventTime:代表按照Event Time處理事件。
TypeInformation<R> outTypeInfo
描述輸出流的類型信息。
關(guān)于DataStream、TimeBehaviour、TypeInformation等Flink作業(yè)常見概念詳情,請參見DataStream、TimeBehaviour和TypeInformation。
這里重點介紹PatternProcessor接口,一個PatternProcessor包含一個確定的模式(Pattern)用于描述如何去匹配事件,以及一個PatternProcessFunction用于描述如何處理一個匹配(例如發(fā)送警報)。除此之外,還包含id與version等用于標識PatternProcessor的信息。因此一個PatternProcessor既包含規(guī)則本身,又指明了規(guī)則觸發(fā)時,F(xiàn)link作業(yè)應(yīng)如何響應(yīng)。更多背景請參見提案。
而patternProcessorDiscovererFactory用于構(gòu)造一個探查器去獲取最新的PatternProcessor,我們在示例代碼中提供了一個默認的周期性掃描外部存儲的抽象類。它描述了如何啟動一個Timer去定時輪詢外部存儲拉取最新的PatternProcessor。
public abstract class PeriodicPatternProcessorDiscoverer<T> implements PatternProcessorDiscoverer<T> { ... @Override public void discoverPatternProcessorUpdates( PatternProcessorManager<T> patternProcessorManager) { // Periodically discovers the pattern processor updates. timer.schedule( new TimerTask() { @Override public void run() { if (arePatternProcessorsUpdated()) { List<PatternProcessor<T>> patternProcessors = null; try { patternProcessors = getLatestPatternProcessors(); } catch (Exception e) { e.printStackTrace(); } patternProcessorManager.onPatternProcessorsUpdated(patternProcessors); } } }, 0, intervalMillis); } ... }
實時計算Flink版提供了JDBCPeriodicPatternProcessorDiscoverer的實現(xiàn),用于從支持JDBC協(xié)議的數(shù)據(jù)庫(例如RDS或者Hologres等)中拉取最新的規(guī)則。在使用時,您需要指定如下參數(shù)。
參數(shù)
說明
jdbcUrl
數(shù)據(jù)庫JDBC連接地址。
jdbcDriver
數(shù)據(jù)庫驅(qū)動類類名。
tableName
數(shù)據(jù)庫表名。
initialPatternProcessors
當數(shù)據(jù)庫的規(guī)則表為空時,使用的默認PatternProcessor。
intervalMillis
輪詢數(shù)據(jù)庫的時間間隔。
在實際代碼中您可以按如下方式使用,作業(yè)將會匹配到的規(guī)則打印到Flink TaskManager的輸出中。
// import ...... public class CepDemo { public static void main(String[] args) throws Exception { ...... // DataStream Source DataStreamSource<Event> source = env.fromSource( kafkaSource, WatermarkStrategy.<Event>forMonotonousTimestamps() .withTimestampAssigner((event, ts) -> event.getEventTime()), "Kafka Source"); env.setParallelism(1); // keyBy userId and productionId // Notes, only events with the same key will be processd to see if there is a match KeyedStream<Event, Tuple2<Integer, Integer>> keyedStream = source.keyBy( new KeySelector<Event, Tuple2<Integer, Integer>>() { @Override public Tuple2<Integer, Integer> getKey(Event value) throws Exception { return Tuple2.of(value.getId(), value.getProductionId()); } }); SingleOutputStreamOperator<String> output = CEP.dynamicPatterns( keyedStream, new JDBCPeriodicPatternProcessorDiscovererFactory<>( JDBC_URL, JDBC_DRIVE, TABLE_NAME, null, JDBC_INTERVAL_MILLIS), TimeBehaviour.ProcessingTime, TypeInformation.of(new TypeHint<String>() {})); output.print(); // Compile and submit the job env.execute("CEPDemo"); } }
說明為了方便演示,我們在Demo代碼里將輸入數(shù)據(jù)流按照id和product id做了一步keyBy,再與
CEP.dynamicPatterns()
連接使用。這意味著只有具有相同id和product id的事件會被納入到規(guī)則匹配的考慮中,Key不同的事件之間不會產(chǎn)生匹配。
在實時計算控制臺上,上傳JAR包并部署JAR作業(yè),具體操作詳情請參見部署作業(yè)。
為了讓您可以快速測試使用,您需要下載實時計算Flink版測試JAR包。部署時需要配置的參數(shù)填寫說明如下表所示。
說明由于目前我們上游的Kafka Source暫無數(shù)據(jù),并且數(shù)據(jù)庫中的規(guī)則表為空。因此作業(yè)運行起來之后,暫時會沒有輸出。
配置項
說明
部署模式
選擇為流模式。
部署名稱
填寫對應(yīng)的JAR作業(yè)名稱。
引擎版本
從VVR 3.0.3版本(對應(yīng)Flink 1.12版本)開始,VVP支持同時運行多個不同引擎版本的JAR作業(yè)。如果您的作業(yè)已使用了Flink 1.12及更早版本的引擎,您需要按照以下情況進行處理:
Flink 1.12版本:停止后啟動作業(yè),系統(tǒng)將自動將引擎升級為vvr-3.0.3-flink-1.12版本。
Flink 1.11或Flink 1.10版本:手動將作業(yè)引擎版本升級到vvr-3.0.3-flink-1.12或vvr-4.0.8-flink-1.13版本后重啟作業(yè),否則會在啟動作業(yè)時超時報錯。
JAR URL
上傳打包好的JAR包,或者直接上傳我們提供的測試JAR包。
Entry Point Class
填寫為
com.alibaba.ververica.cep.demo.CepDemo
。Entry Point Main Arguments
如果您是自己開發(fā)的作業(yè),已經(jīng)配置了相關(guān)上下游存儲的信息,則此處可以不填寫。但是,如果您是使用的我們提供的測試JAR包,則需要配置該參數(shù)。代碼信息如下。
--kafkaBrokers YOUR_KAFKA_BROKERS --inputTopic YOUR_KAFKA_TOPIC --inputTopicGroup YOUR_KAFKA_TOPIC_GROUP --jdbcUrl jdbc:mysql://YOUR_DB_URL:port/DATABASE_NAME?user=YOUR_USERNAME&password=YOUR_PASSWORD --tableName YOUR_TABLE_NAME --jdbcIntervalMs 3000
其中涉及的參數(shù)及含義如下:
kafkaBrokers:Kafka Broker地址。
inputTopic:Kafka Topic名稱。
inputTopicGroup:Kafka消費組。
jdbcUrl:數(shù)據(jù)庫JDBC連接地址。
說明本示例所使用的JDBC URL中對應(yīng)的賬號和密碼需要為普通賬號和密碼,且密碼里僅支持英文字母和數(shù)字。在實際場景中,您可根據(jù)您的需求在作業(yè)中使用不同的鑒權(quán)方式。
tableName:目標表名稱。
jdbcIntervalMs:輪詢數(shù)據(jù)庫的時間間隔。
說明需要將以上參數(shù)的取值修改為您實際業(yè)務(wù)上下游存儲的信息。
參數(shù)信息長度不要大于1024,且不建議用來傳復(fù)雜參數(shù),復(fù)雜參數(shù)指包括了換行、空格或者其他特殊字符的參數(shù)(僅支持英文字母和數(shù)字)。如果您需要傳入復(fù)雜參數(shù),請使用附加依賴文件來傳輸。
在部署詳情頁簽中的其他配置,添加如下作業(yè)運行參數(shù)。
kubernetes.application-mode.classpath.include-user-jar: 'true' classloader.resolve-order: parent-first
運行參數(shù)配置步驟詳情請參見運行參數(shù)配置。
在 頁面,單擊目標作業(yè)操作列下的啟動。
作業(yè)啟動參數(shù)配置詳情請參見作業(yè)啟動。
步驟四:插入規(guī)則
啟動Flink CEP作業(yè),然后插入規(guī)則1:連續(xù)3條action為0的事件發(fā)生后,下一條事件的action仍非1,其業(yè)務(wù)含義為連續(xù)3次訪問該產(chǎn)品后最后沒有購買。
使用普通賬號登錄RDS MySQL。
詳情請參見通過DMS登錄RDS MySQL。
插入動態(tài)更新規(guī)則。
將JSON字符串與id、version、function類名等拼接后插入到RDS中。
INSERT INTO rds_demo ( `id`, `version`, `pattern`, `function` ) values( '1', 1, '{"name":"end","quantifier":{"consumingStrategy":"SKIP_TILL_NEXT","properties":["SINGLE"],"times":null,"untilCondition":null},"condition":null,"nodes":[{"name":"end","quantifier":{"consumingStrategy":"SKIP_TILL_NEXT","properties":["SINGLE"],"times":null,"untilCondition":null},"condition":{"className":"com.alibaba.ververica.cep.demo.condition.EndCondition","type":"CLASS"},"type":"ATOMIC"},{"name":"start","quantifier":{"consumingStrategy":"SKIP_TILL_NEXT","properties":["LOOPING"],"times":{"from":3,"to":3,"windowTime":null},"untilCondition":null},"condition":{"expression":"action == 0","type":"AVIATOR"},"type":"ATOMIC"}],"edges":[{"source":"start","target":"end","type":"SKIP_TILL_NEXT"}],"window":null,"afterMatchStrategy":{"type":"SKIP_PAST_LAST_EVENT","patternName":null},"type":"COMPOSITE","version":1}', 'com.alibaba.ververica.cep.demo.dynamic.DemoPatternProcessFunction') ;
為了方便您使用并提高數(shù)據(jù)庫中的Pattern字段的可讀性,實時計算Flink版定義了一套JSON格式的規(guī)則描述,詳情請參見動態(tài)CEP中規(guī)則的JSON格式定義。上述SQL語句中的pattern字段的值就是按照JSON格式的規(guī)則,給出的序列化后的pattern字符串。它的物理意義是去匹配這樣的模式:連續(xù)3條action為0的事件發(fā)生后,下一條事件的action仍非1。
說明在下文的EndCondition對應(yīng)的代碼中,定義了action仍非1。
對應(yīng)的CEP API描述如下。
Pattern<Event, Event> pattern = Pattern.<Event>begin("start", AfterMatchSkipStrategy.skipPastLastEvent()) .where(new StartCondition("action == 0")) .timesOrMore(3) .followedBy("end") .where(new EndCondition());
對應(yīng)的JSON字符串如下。
{ "name": "end", "quantifier": { "consumingStrategy": "SKIP_TILL_NEXT", "properties": [ "SINGLE" ], "times": null, "untilCondition": null }, "condition": null, "nodes": [ { "name": "end", "quantifier": { "consumingStrategy": "SKIP_TILL_NEXT", "properties": [ "SINGLE" ], "times": null, "untilCondition": null }, "condition": { "className": "com.alibaba.ververica.cep.demo.condition.EndCondition", "type": "CLASS" }, "type": "ATOMIC" }, { "name": "start", "quantifier": { "consumingStrategy": "SKIP_TILL_NEXT", "properties": [ "LOOPING" ], "times": { "from": 3, "to": 3, "windowTime": null }, "untilCondition": null }, "condition": { "expression": "action == 0", "type": "AVIATOR" }, "type": "ATOMIC" } ], "edges": [ { "source": "start", "target": "end", "type": "SKIP_TILL_NEXT" } ], "window": null, "afterMatchStrategy": { "type": "SKIP_PAST_LAST_EVENT", "patternName": null }, "type": "COMPOSITE", "version": 1 }
通過Kafka Client向demo_topic中發(fā)送消息。
在本Demo中,您也可以使用消息隊列Kafka提供的快速體驗消息收發(fā)頁面發(fā)送測試消息。
1,Ken,0,1,1662022777000 1,Ken,0,1,1662022778000 1,Ken,0,1,1662022779000 1,Ken,0,1,1662022780000
demo_topic字段說明如下表所示。
字段
說明
id
用戶ID。
username
用戶名。
action
用戶動作,取值如下:
0代表瀏覽操作。
1代表購買動作。
2代表分享操作。
product_id
商品ID。
event_time
該行為發(fā)生的事件時間。
查看JobManager日志中打印的最新規(guī)則和TaskManager日志中打印的匹配。
在JobManager日志中,通過JDBCPeriodicPatternProcessorDiscoverer關(guān)鍵詞搜索,查看最新規(guī)則。
在TaskManager中以.out結(jié)尾的日志文件中,通過
A match for Pattern of (id, version): (1, 1)
關(guān)鍵詞搜索,查看日志中打印的匹配。
步驟五:更新匹配規(guī)則,并查看更新的規(guī)則是否生效
在匹配到相應(yīng)事件并進行處理后,動態(tài)更新規(guī)則1內(nèi)容為連續(xù)5條action為0或為2的事件發(fā)生后,下一條事件的action仍非1,來應(yīng)對流量整體增加的場景,同時插入一條規(guī)則2,它將和規(guī)則1的初始規(guī)則一樣,用于輔助展示多規(guī)則支持等功能。
使用在RDS控制臺上,更新匹配規(guī)則。
使用普通賬號登錄RDS MySQL。
詳情請參見通過DMS登錄RDS MySQL。
將StartCondition中的action == 0修改為action == 0 || action == 2,并且我們將重復(fù)出現(xiàn)的次數(shù)從>=3改為>=5,對應(yīng)SQL語句如下。
INSERT INTO rds_demo(`id`, `version`, `pattern`, `function`) values('1', 2, '{"name":"end","quantifier":{"consumingStrategy":"SKIP_TILL_NEXT","properties":["SINGLE"],"times":null,"untilCondition":null},"condition":null,"nodes":[{"name":"end","quantifier":{"consumingStrategy":"SKIP_TILL_NEXT","properties":["SINGLE"],"times":null,"untilCondition":null},"condition":{"className":"com.alibaba.ververica.cep.demo.condition.EndCondition","type":"CLASS"},"type":"ATOMIC"},{"name":"start","quantifier":{"consumingStrategy":"SKIP_TILL_NEXT","properties":["LOOPING"],"times":{"from":5,"to":5,"windowTime":null},"untilCondition":null},"condition":{"expression":"action == 0 || action == 2","type":"AVIATOR"},"type":"ATOMIC"}],"edges":[{"source":"start","target":"end","type":"SKIP_TILL_NEXT"}],"window":null,"afterMatchStrategy":{"type":"SKIP_PAST_LAST_EVENT","patternName":null},"type":"COMPOSITE","version":1}','com.alibaba.ververica.cep.demo.dynamic.DemoPatternProcessFunction');
再插入一條記錄的id為2新規(guī)則。
它和規(guī)則1的版本1一樣,其StartCondition仍為action == 0且重復(fù)出現(xiàn)的次數(shù)為>=3。
INSERT INTO rds_demo(`id`, `version`, `pattern`, `function`) values('2', 1, '{"name":"end","quantifier":{"consumingStrategy":"SKIP_TILL_NEXT","properties":["SINGLE"],"times":null,"untilCondition":null},"condition":null,"nodes":[{"name":"end","quantifier":{"consumingStrategy":"SKIP_TILL_NEXT","properties":["SINGLE"],"times":null,"untilCondition":null},"condition":{"className":"com.alibaba.ververica.cep.demo.condition.EndCondition","type":"CLASS"},"type":"ATOMIC"},{"name":"start","quantifier":{"consumingStrategy":"SKIP_TILL_NEXT","properties":["LOOPING"],"times":{"from":3,"to":3,"windowTime":null},"untilCondition":null},"condition":{"expression":"action == 0","type":"AVIATOR"},"type":"ATOMIC"}],"edges":[{"source":"start","target":"end","type":"SKIP_TILL_NEXT"}],"window":null,"afterMatchStrategy":{"type":"SKIP_PAST_LAST_EVENT","patternName":null},"type":"COMPOSITE","version":1}','com.alibaba.ververica.cep.demo.dynamic.DemoPatternProcessFunction');
在Kafka控制臺上發(fā)送8條簡單的消息,來觸發(fā)匹配。
8條簡單的消息示例如下。
1,Ken,0,1,1662022777000 1,Ken,0,1,1662022777000 1,Ken,0,1,1662022777000 1,Ken,2,1,1662022777000 1,Ken,0,1,1662022777000 1,Ken,0,1,1662022777000 1,Ken,0,1,1662022777000 1,Ken,2,1,1662022777000
在TaskManager中以.out結(jié)尾的日志文件中,查看匹配結(jié)果。
如果要搜規(guī)則1在更新為版本2之后的匹配,可以通過
A match for Pattern of (id, version): (1, 2)
關(guān)鍵詞,查匹配結(jié)果。如果要搜規(guī)則2在版本為1的匹配,可以通過
A match for Pattern of (id, version): (2, 1)
關(guān)鍵詞,查匹配結(jié)果。
如上圖中藍框內(nèi)結(jié)果所示,F(xiàn)link CEP作業(yè)按照id為1,version為2的規(guī)則匹配到1次5個action為0或2的事件+1個action非1的1個事件的事件序列后輸出結(jié)果,代表動態(tài)修改的規(guī)則成功生效;而對于id為2,version為1的規(guī)則,如上圖中橙色框內(nèi)結(jié)果所示,F(xiàn)link CEP作業(yè)匹配到2次3個action為0的事件+1個action非1的1個事件的事件序列后輸出結(jié)果,代表動態(tài)新增的規(guī)則也在作業(yè)中被采用。