日本熟妇hd丰满老熟妇,中文字幕一区二区三区在线不卡 ,亚洲成片在线观看,免费女同在线一区二区

Flink動態(tài)CEP快速入門

實時計算Flink版支持通過DataStream作業(yè)的方式運行支持規(guī)則動態(tài)更新的Flink CEP作業(yè)。本文結(jié)合實時營銷中的反作弊場景,為您介紹如何基于Flink全托管快速構(gòu)建一個動態(tài)加載最新規(guī)則來處理上游Kafka數(shù)據(jù)的Flink CEP作業(yè)。

背景信息

在電商平臺投放廣告時,廣告主通常有預(yù)算限制。例如對于按點擊次數(shù)計算費用的廣告而言,如果有黑灰產(chǎn)構(gòu)造虛假流量,攻擊廣告主,則會很快消耗掉正常廣告主的預(yù)算,使得廣告內(nèi)容被提前下架。在這種情況下,廣告主的利益受到了損害,容易導(dǎo)致后續(xù)的投訴與糾紛。

為了應(yīng)對上述作弊場景,我們需要快速辨識出惡意流量,采取針對性措施(例如限制惡意用戶、向廣告主發(fā)送告警等)來保護用戶權(quán)益。同時考慮到可能有意外因素(例如達人推薦、熱點事件引流)導(dǎo)致流量驟變,我們也需要動態(tài)調(diào)整用于識別惡意流量的規(guī)則,避免損害正常用戶的利益。

本文為您演示如何使用Flink動態(tài)CEP解決上述問題。我們假設(shè)客戶的行為日志會被存放入消息隊列Kafka中,F(xiàn)link CEP作業(yè)會消費Kafka數(shù)據(jù),同時會去輪詢RDS數(shù)據(jù)庫中的規(guī)則表,拉取策略人員添加到數(shù)據(jù)庫的最新規(guī)則,并用最新規(guī)則去匹配事件。針對匹配到的事件,F(xiàn)link CEP作業(yè)會發(fā)出告警或?qū)⑾嚓P(guān)信息寫入其他數(shù)據(jù)存儲中。示例中整體數(shù)據(jù)鏈路如下圖所示。Flink整體數(shù)據(jù)鏈路示意圖

實際演示中,我們會先啟動Flink CEP作業(yè),然后插入規(guī)則1:連續(xù)3條action為0的事件發(fā)生后,下一條事件的action仍非1,其業(yè)務(wù)含義為連續(xù)3次訪問該產(chǎn)品后最后沒有購買。在匹配到相應(yīng)事件并進行處理后,我們會動態(tài)更新規(guī)則1內(nèi)容為連續(xù)5條action為0或2的事件發(fā)生后,下一條事件的action仍非1,來應(yīng)對流量整體增加的場景,同時插入一條規(guī)則2,它將和規(guī)則1的初始規(guī)則一樣,用于輔助展示多規(guī)則支持等功能。當然,您也可以添加一個全新規(guī)則。

前提條件

  • 如果您使用RAM用戶或RAM角色等身份訪問,需要確認已具有Flink控制臺相關(guān)權(quán)限,詳情請參見權(quán)限管理。

  • 已創(chuàng)建Flink工作空間,詳情請參見開通實時計算Flink版

  • 上下游存儲

  • 僅實時計算引擎VVR 6.0.2及以上版本支持動態(tài)CEP功能。

操作流程

本文為您介紹如何編寫Flink CEP作業(yè)檢測行為日志中的異常事件序列去發(fā)現(xiàn)惡意流量,并演示如何進行規(guī)則的動態(tài)更新。具體的操作流程如下:

步驟一:準備測試數(shù)據(jù)

準備上游Kafka Topic

  1. 登錄云消息隊列 Kafka 版控制臺。

  2. 創(chuàng)建一個名稱為demo_topic的Topic,存放模擬的用戶行為日志。

    操作詳情請參見步驟一:創(chuàng)建Topic。

準備RDS數(shù)據(jù)庫

在DMS數(shù)據(jù)管理控制臺上,準備RDS MySQL的測試數(shù)據(jù)。

  1. 使用高權(quán)限賬號登錄RDS MySQL。

    詳情請參見通過DMS登錄RDS MySQL。

  2. 創(chuàng)建rds_demo規(guī)則表,用來記錄Flink CEP作業(yè)中需要應(yīng)用的規(guī)則。

    在已登錄的SQLConsole窗口,輸入如下命令后,單擊執(zhí)行。

    CREATE DATABASE cep_demo_db;
    USE cep_demo_db;
    
    CREATE TABLE rds_demo (
      `id` VARCHAR(64),
      `version` INT,
      `pattern` VARCHAR(4096),
      `function` VARCHAR(512)
    );

    每行代表一條規(guī)則,包含id、version等用于區(qū)分不同規(guī)則與每個規(guī)則不同版本的字段、描述CEP API中的模式對象的pattern字段,以及描述如何處理匹配模式的事件序列的function字段。

步驟二:配置IP白名單

為了讓Flink能訪問RDS MySQL實例,您需要將Flink全托管工作空間的網(wǎng)段添加到在RDS MySQL的白名單中。

  1. 獲取Flink全托管工作空間的VPC網(wǎng)段。

    1. 登錄實時計算控制臺

    2. 在目標工作空間右側(cè)操作列,選擇更多 > 工作空間詳情。

    3. 工作空間詳情對話框,查看Flink全托管虛擬交換機的網(wǎng)段信息。

      網(wǎng)段信息

  2. 在RDS MySQL的IP白名單中,添加Flink全托管網(wǎng)段信息。

    操作步驟詳情請參見設(shè)置IP白名單。RDS白名單

步驟三:開發(fā)并啟動Flink CEP作業(yè)

說明

本文中所有代碼都可以在Github倉庫下載。本文檔接下來會描述重點部分實現(xiàn),方便您參考。

  1. 配置Maven項目中的pom.xml文件所使用的倉庫。

    pom.xml文件的配置詳情,請參見Kafka DataStream Connector。

  2. 在作業(yè)的Maven POM文件中添加flink-cep作為項目依賴。

    <dependency>
        <groupId>com.alibaba.ververica</groupId>
        <artifactId>flink-cep</artifactId>
        <version>1.15-vvr-6.0.2-api</version>
        <scope>provided</scope>
    </dependency>
  3. 開發(fā)作業(yè)代碼。

    1. 構(gòu)建Kafka Source。

      代碼編寫詳情,請參見Kafka DataStream Connector。

    2. 構(gòu)建CEP.dynamicPatterns()。

      為支持CEP規(guī)則動態(tài)變更與多規(guī)則匹配,阿里云實時計算Flink版定義了CEP.dynamicPatterns() API。該API定義代碼如下。

      public static <T, R> SingleOutputStreamOperator<R> dynamicPatterns(
               DataStream<T> input,
               PatternProcessorDiscovererFactory<T> discovererFactory,
               TimeBehaviour timeBehaviour,
               TypeInformation<R> outTypeInfo)

      使用該API時,所需參數(shù)如下。您可以跟進實際使用情況,更新相應(yīng)的參數(shù)取值。

      參數(shù)

      說明

      DataStream<T> input

      輸入事件流。

      PatternProcessorDiscovererFactory<T> discovererFactory

      工廠對象。工廠對象負責構(gòu)造一個探查器(PatternProcessorDiscoverer),探查器負責獲取最新規(guī)則,即構(gòu)造一個PatternProcessor接口。

      TimeBehaviour timeBehaviour

      描述Flink CEP作業(yè)如何處理事件的時間屬性。參數(shù)取值如下:

      • TimeBehaviour.ProcessingTime:代表按照Processing Time處理事件。

      • TimeBehaviour.EventTime:代表按照Event Time處理事件。

      TypeInformation<R> outTypeInfo

      描述輸出流的類型信息。

      關(guān)于DataStream、TimeBehaviour、TypeInformation等Flink作業(yè)常見概念詳情,請參見DataStreamTimeBehaviourTypeInformation。

      這里重點介紹PatternProcessor接口,一個PatternProcessor包含一個確定的模式(Pattern)用于描述如何去匹配事件,以及一個PatternProcessFunction用于描述如何處理一個匹配(例如發(fā)送警報)。除此之外,還包含id與version等用于標識PatternProcessor的信息。因此一個PatternProcessor既包含規(guī)則本身,又指明了規(guī)則觸發(fā)時,F(xiàn)link作業(yè)應(yīng)如何響應(yīng)。更多背景請參見提案

      而patternProcessorDiscovererFactory用于構(gòu)造一個探查器去獲取最新的PatternProcessor,我們在示例代碼中提供了一個默認的周期性掃描外部存儲的抽象類。它描述了如何啟動一個Timer去定時輪詢外部存儲拉取最新的PatternProcessor。

      public abstract class PeriodicPatternProcessorDiscoverer<T>
              implements PatternProcessorDiscoverer<T> {
      
          ...
          @Override
          public void discoverPatternProcessorUpdates(
                  PatternProcessorManager<T> patternProcessorManager) {
              // Periodically discovers the pattern processor updates.
              timer.schedule(
                      new TimerTask() {
                          @Override
                          public void run() {
                              if (arePatternProcessorsUpdated()) {
                                  List<PatternProcessor<T>> patternProcessors = null;
                                  try {
                                      patternProcessors = getLatestPatternProcessors();
                                  } catch (Exception e) {
                                      e.printStackTrace();
                                  }
                                  patternProcessorManager.onPatternProcessorsUpdated(patternProcessors);
                              }
                          }
                      },
                      0,
                      intervalMillis);
          }
      
          ...
      }

      實時計算Flink版提供了JDBCPeriodicPatternProcessorDiscoverer的實現(xiàn),用于從支持JDBC協(xié)議的數(shù)據(jù)庫(例如RDS或者Hologres等)中拉取最新的規(guī)則。在使用時,您需要指定如下參數(shù)。

      參數(shù)

      說明

      jdbcUrl

      數(shù)據(jù)庫JDBC連接地址。

      jdbcDriver

      數(shù)據(jù)庫驅(qū)動類類名。

      tableName

      數(shù)據(jù)庫表名。

      initialPatternProcessors

      當數(shù)據(jù)庫的規(guī)則表為空時,使用的默認PatternProcessor。

      intervalMillis

      輪詢數(shù)據(jù)庫的時間間隔。

      在實際代碼中您可以按如下方式使用,作業(yè)將會匹配到的規(guī)則打印到Flink TaskManager的輸出中。

      // import ......
      public class CepDemo {
      
          public static void main(String[] args) throws Exception {
      
              ......
              // DataStream Source
              DataStreamSource<Event> source =
                      env.fromSource(
                              kafkaSource,
                              WatermarkStrategy.<Event>forMonotonousTimestamps()
                                      .withTimestampAssigner((event, ts) -> event.getEventTime()),
                              "Kafka Source");
      
              env.setParallelism(1);
              // keyBy userId and productionId
              // Notes, only events with the same key will be processd to see if there is a match
              KeyedStream<Event, Tuple2<Integer, Integer>> keyedStream =
                      source.keyBy(
                              new KeySelector<Event, Tuple2<Integer, Integer>>() {
      
                                  @Override
                                  public Tuple2<Integer, Integer> getKey(Event value) throws Exception {
                                      return Tuple2.of(value.getId(), value.getProductionId());
                                  }
                              });
      
              SingleOutputStreamOperator<String> output =
                  CEP.dynamicPatterns(
                  keyedStream,
                  new JDBCPeriodicPatternProcessorDiscovererFactory<>(
                      JDBC_URL, JDBC_DRIVE, TABLE_NAME, null, JDBC_INTERVAL_MILLIS),
                  TimeBehaviour.ProcessingTime,
                  TypeInformation.of(new TypeHint<String>() {}));
      
              output.print();
              // Compile and submit the job
              env.execute("CEPDemo");
          }
      }
      說明

      為了方便演示,我們在Demo代碼里將輸入數(shù)據(jù)流按照id和product id做了一步keyBy,再與CEP.dynamicPatterns()連接使用。這意味著只有具有相同id和product id的事件會被納入到規(guī)則匹配的考慮中,Key不同的事件之間不會產(chǎn)生匹配。

  4. 實時計算控制臺上,上傳JAR包并部署JAR作業(yè),具體操作詳情請參見部署作業(yè)

    為了讓您可以快速測試使用,您需要下載實時計算Flink版測試JAR包。部署時需要配置的參數(shù)填寫說明如下表所示。

    說明

    由于目前我們上游的Kafka Source暫無數(shù)據(jù),并且數(shù)據(jù)庫中的規(guī)則表為空。因此作業(yè)運行起來之后,暫時會沒有輸出。

    配置項

    說明

    部署模式

    選擇為流模式。

    部署名稱

    填寫對應(yīng)的JAR作業(yè)名稱。

    引擎版本

    從VVR 3.0.3版本(對應(yīng)Flink 1.12版本)開始,VVP支持同時運行多個不同引擎版本的JAR作業(yè)。如果您的作業(yè)已使用了Flink 1.12及更早版本的引擎,您需要按照以下情況進行處理:

    • Flink 1.12版本:停止后啟動作業(yè),系統(tǒng)將自動將引擎升級為vvr-3.0.3-flink-1.12版本。

    • Flink 1.11或Flink 1.10版本:手動將作業(yè)引擎版本升級到vvr-3.0.3-flink-1.12或vvr-4.0.8-flink-1.13版本后重啟作業(yè),否則會在啟動作業(yè)時超時報錯。

    JAR URL

    上傳打包好的JAR包,或者直接上傳我們提供的測試JAR包。

    Entry Point Class

    填寫為com.alibaba.ververica.cep.demo.CepDemo。

    Entry Point Main Arguments

    如果您是自己開發(fā)的作業(yè),已經(jīng)配置了相關(guān)上下游存儲的信息,則此處可以不填寫。但是,如果您是使用的我們提供的測試JAR包,則需要配置該參數(shù)。代碼信息如下。

    --kafkaBrokers YOUR_KAFKA_BROKERS 
    --inputTopic YOUR_KAFKA_TOPIC 
    --inputTopicGroup YOUR_KAFKA_TOPIC_GROUP 
    --jdbcUrl jdbc:mysql://YOUR_DB_URL:port/DATABASE_NAME?user=YOUR_USERNAME&password=YOUR_PASSWORD
    --tableName YOUR_TABLE_NAME  
    --jdbcIntervalMs 3000

    其中涉及的參數(shù)及含義如下:

    • kafkaBrokers:Kafka Broker地址。

    • inputTopic:Kafka Topic名稱。

    • inputTopicGroup:Kafka消費組。

    • jdbcUrl:數(shù)據(jù)庫JDBC連接地址。

      說明

      本示例所使用的JDBC URL中對應(yīng)的賬號和密碼需要為普通賬號和密碼,且密碼里僅支持英文字母和數(shù)字。在實際場景中,您可根據(jù)您的需求在作業(yè)中使用不同的鑒權(quán)方式。

    • tableName:目標表名稱。

    • jdbcIntervalMs:輪詢數(shù)據(jù)庫的時間間隔。

    說明
    • 需要將以上參數(shù)的取值修改為您實際業(yè)務(wù)上下游存儲的信息。

    • 參數(shù)信息長度不要大于1024,且不建議用來傳復(fù)雜參數(shù),復(fù)雜參數(shù)指包括了換行、空格或者其他特殊字符的參數(shù)(僅支持英文字母和數(shù)字)。如果您需要傳入復(fù)雜參數(shù),請使用附加依賴文件來傳輸。

  5. 部署詳情頁簽中的其他配置,添加如下作業(yè)運行參數(shù)。

    kubernetes.application-mode.classpath.include-user-jar: 'true' 
    classloader.resolve-order: parent-first

    運行參數(shù)配置步驟詳情請參見運行參數(shù)配置

  6. 運維中心 > 作業(yè)運維頁面,單擊目標作業(yè)操作列下的啟動

    作業(yè)啟動參數(shù)配置詳情請參見作業(yè)啟動。

步驟四:插入規(guī)則

啟動Flink CEP作業(yè),然后插入規(guī)則1:連續(xù)3條action為0的事件發(fā)生后,下一條事件的action仍非1,其業(yè)務(wù)含義為連續(xù)3次訪問該產(chǎn)品后最后沒有購買。

  1. 使用普通賬號登錄RDS MySQL。

    詳情請參見通過DMS登錄RDS MySQL

  2. 插入動態(tài)更新規(guī)則。

    將JSON字符串與id、version、function類名等拼接后插入到RDS中。

    INSERT INTO rds_demo (
     `id`,
     `version`,
     `pattern`,
     `function`
    ) values(
      '1',
       1,
      '{"name":"end","quantifier":{"consumingStrategy":"SKIP_TILL_NEXT","properties":["SINGLE"],"times":null,"untilCondition":null},"condition":null,"nodes":[{"name":"end","quantifier":{"consumingStrategy":"SKIP_TILL_NEXT","properties":["SINGLE"],"times":null,"untilCondition":null},"condition":{"className":"com.alibaba.ververica.cep.demo.condition.EndCondition","type":"CLASS"},"type":"ATOMIC"},{"name":"start","quantifier":{"consumingStrategy":"SKIP_TILL_NEXT","properties":["LOOPING"],"times":{"from":3,"to":3,"windowTime":null},"untilCondition":null},"condition":{"expression":"action == 0","type":"AVIATOR"},"type":"ATOMIC"}],"edges":[{"source":"start","target":"end","type":"SKIP_TILL_NEXT"}],"window":null,"afterMatchStrategy":{"type":"SKIP_PAST_LAST_EVENT","patternName":null},"type":"COMPOSITE","version":1}',
      'com.alibaba.ververica.cep.demo.dynamic.DemoPatternProcessFunction')
    ;

    為了方便您使用并提高數(shù)據(jù)庫中的Pattern字段的可讀性,實時計算Flink版定義了一套JSON格式的規(guī)則描述,詳情請參見動態(tài)CEP中規(guī)則的JSON格式定義。上述SQL語句中的pattern字段的值就是按照JSON格式的規(guī)則,給出的序列化后的pattern字符串。它的物理意義是去匹配這樣的模式:連續(xù)3條action為0的事件發(fā)生后,下一條事件的action仍非1。

    說明

    在下文的EndCondition對應(yīng)的代碼中,定義了action仍非1。

    • 對應(yīng)的CEP API描述如下。

      Pattern<Event, Event> pattern =
          Pattern.<Event>begin("start", AfterMatchSkipStrategy.skipPastLastEvent())
              .where(new StartCondition("action == 0"))
              .timesOrMore(3)
              .followedBy("end")
              .where(new EndCondition());
    • 對應(yīng)的JSON字符串如下。

      {
        "name": "end",
        "quantifier": {
          "consumingStrategy": "SKIP_TILL_NEXT",
          "properties": [
            "SINGLE"
          ],
          "times": null,
          "untilCondition": null
        },
        "condition": null,
        "nodes": [
          {
            "name": "end",
            "quantifier": {
              "consumingStrategy": "SKIP_TILL_NEXT",
              "properties": [
                "SINGLE"
              ],
              "times": null,
              "untilCondition": null
            },
            "condition": {
              "className": "com.alibaba.ververica.cep.demo.condition.EndCondition",
              "type": "CLASS"
            },
            "type": "ATOMIC"
          },
          {
            "name": "start",
            "quantifier": {
              "consumingStrategy": "SKIP_TILL_NEXT",
              "properties": [
                "LOOPING"
              ],
              "times": {
                "from": 3,
                "to": 3,
                "windowTime": null
              },
              "untilCondition": null
            },
            "condition": {
              "expression": "action == 0",
              "type": "AVIATOR"
            },
            "type": "ATOMIC"
          }
        ],
        "edges": [
          {
            "source": "start",
            "target": "end",
            "type": "SKIP_TILL_NEXT"
          }
        ],
        "window": null,
        "afterMatchStrategy": {
          "type": "SKIP_PAST_LAST_EVENT",
          "patternName": null
        },
        "type": "COMPOSITE",
        "version": 1
      }
  3. 通過Kafka Client向demo_topic中發(fā)送消息。

    在本Demo中,您也可以使用消息隊列Kafka提供的快速體驗消息收發(fā)頁面發(fā)送測試消息。

    1,Ken,0,1,1662022777000
    1,Ken,0,1,1662022778000
    1,Ken,0,1,1662022779000
    1,Ken,0,1,1662022780000

    發(fā)消息

    demo_topic字段說明如下表所示。

    字段

    說明

    id

    用戶ID。

    username

    用戶名。

    action

    用戶動作,取值如下:

    • 0代表瀏覽操作。

    • 1代表購買動作。

    • 2代表分享操作。

    product_id

    商品ID。

    event_time

    該行為發(fā)生的事件時間。

  4. 查看JobManager日志中打印的最新規(guī)則和TaskManager日志中打印的匹配。

    • 在JobManager日志中,通過JDBCPeriodicPatternProcessorDiscoverer關(guān)鍵詞搜索,查看最新規(guī)則。日志規(guī)則

    • 在TaskManager中以.out結(jié)尾的日志文件中,通過A match for Pattern of (id, version): (1, 1)關(guān)鍵詞搜索,查看日志中打印的匹配。作業(yè)日志

步驟五:更新匹配規(guī)則,并查看更新的規(guī)則是否生效

在匹配到相應(yīng)事件并進行處理后,動態(tài)更新規(guī)則1內(nèi)容為連續(xù)5條action為0或為2的事件發(fā)生后,下一條事件的action仍非1,來應(yīng)對流量整體增加的場景,同時插入一條規(guī)則2,它將和規(guī)則1的初始規(guī)則一樣,用于輔助展示多規(guī)則支持等功能。

  1. 使用在RDS控制臺上,更新匹配規(guī)則。

    1. 使用普通賬號登錄RDS MySQL。

      詳情請參見通過DMS登錄RDS MySQL。

    2. 將StartCondition中的action == 0修改為action == 0 || action == 2,并且我們將重復(fù)出現(xiàn)的次數(shù)從>=3改為>=5,對應(yīng)SQL語句如下。

      INSERT INTO rds_demo(`id`, `version`, `pattern`, `function`) values('1', 2, '{"name":"end","quantifier":{"consumingStrategy":"SKIP_TILL_NEXT","properties":["SINGLE"],"times":null,"untilCondition":null},"condition":null,"nodes":[{"name":"end","quantifier":{"consumingStrategy":"SKIP_TILL_NEXT","properties":["SINGLE"],"times":null,"untilCondition":null},"condition":{"className":"com.alibaba.ververica.cep.demo.condition.EndCondition","type":"CLASS"},"type":"ATOMIC"},{"name":"start","quantifier":{"consumingStrategy":"SKIP_TILL_NEXT","properties":["LOOPING"],"times":{"from":5,"to":5,"windowTime":null},"untilCondition":null},"condition":{"expression":"action == 0 || action == 2","type":"AVIATOR"},"type":"ATOMIC"}],"edges":[{"source":"start","target":"end","type":"SKIP_TILL_NEXT"}],"window":null,"afterMatchStrategy":{"type":"SKIP_PAST_LAST_EVENT","patternName":null},"type":"COMPOSITE","version":1}','com.alibaba.ververica.cep.demo.dynamic.DemoPatternProcessFunction');
    3. 再插入一條記錄的id為2新規(guī)則。

      它和規(guī)則1的版本1一樣,其StartCondition仍為action == 0且重復(fù)出現(xiàn)的次數(shù)為>=3。

      INSERT INTO rds_demo(`id`, `version`, `pattern`, `function`) values('2', 1, '{"name":"end","quantifier":{"consumingStrategy":"SKIP_TILL_NEXT","properties":["SINGLE"],"times":null,"untilCondition":null},"condition":null,"nodes":[{"name":"end","quantifier":{"consumingStrategy":"SKIP_TILL_NEXT","properties":["SINGLE"],"times":null,"untilCondition":null},"condition":{"className":"com.alibaba.ververica.cep.demo.condition.EndCondition","type":"CLASS"},"type":"ATOMIC"},{"name":"start","quantifier":{"consumingStrategy":"SKIP_TILL_NEXT","properties":["LOOPING"],"times":{"from":3,"to":3,"windowTime":null},"untilCondition":null},"condition":{"expression":"action == 0","type":"AVIATOR"},"type":"ATOMIC"}],"edges":[{"source":"start","target":"end","type":"SKIP_TILL_NEXT"}],"window":null,"afterMatchStrategy":{"type":"SKIP_PAST_LAST_EVENT","patternName":null},"type":"COMPOSITE","version":1}','com.alibaba.ververica.cep.demo.dynamic.DemoPatternProcessFunction');
  2. 在Kafka控制臺上發(fā)送8條簡單的消息,來觸發(fā)匹配。

    8條簡單的消息示例如下。

    1,Ken,0,1,1662022777000
    1,Ken,0,1,1662022777000
    1,Ken,0,1,1662022777000
    1,Ken,2,1,1662022777000
    1,Ken,0,1,1662022777000
    1,Ken,0,1,1662022777000
    1,Ken,0,1,1662022777000
    1,Ken,2,1,1662022777000
  3. 在TaskManager中以.out結(jié)尾的日志文件中,查看匹配結(jié)果。

    • 如果要搜規(guī)則1在更新為版本2之后的匹配,可以通過A match for Pattern of (id, version): (1, 2)關(guān)鍵詞,查匹配結(jié)果。匹配結(jié)果1

    • 如果要搜規(guī)則2在版本為1的匹配,可以通過A match for Pattern of (id, version): (2, 1)關(guān)鍵詞,查匹配結(jié)果。匹配結(jié)果2

    如上圖中藍框內(nèi)結(jié)果所示,F(xiàn)link CEP作業(yè)按照id為1,version為2的規(guī)則匹配到1次5個action為0或2的事件+1個action非1的1個事件的事件序列后輸出結(jié)果,代表動態(tài)修改的規(guī)則成功生效;而對于id為2,version為1的規(guī)則,如上圖中橙色框內(nèi)結(jié)果所示,F(xiàn)link CEP作業(yè)匹配到2次3個action為0的事件+1個action非1的1個事件的事件序列后輸出結(jié)果,代表動態(tài)新增的規(guī)則也在作業(yè)中被采用。