Flink CEP在實時風控場景的落地與優化
一、Flink CEP 介紹
Flink CEP 是復雜事件處理(Complex Event Processing)的縮寫。它是基于Flink實現的復雜事件處理庫。它的核心功能是識別輸入數據流中符合特定模式,即Pattern的事件序列,并允許用戶針對這些序列進行針對性處理。
1、什么是Flink CEP
這里是一個簡單的例子,可以讓大家對 Flink CEP 做了什么事情有一個基礎了解。
模式
首先,假設我們對模式ABBC感興趣。它代表的實際含義可能是A類事件發生后,連續發生了兩次B類事件,最后發生了一次C類事件。我們不要求這些事件是嚴格連續的,中間可以插入一些無關事件。
事件流
我們針對這種模式使用Flink CEP的API編寫了相關代碼。當Flink CEP作業啟動后,遇到實際輸入事件流,如圖中的事件流,例如d1、a1、b1、b2、d2、c1、e1、a2,針對這樣的事件數據流,Flink嘗試識別定義的匹配ABBC,最終得到匹配結果。例如,這里的匹配結果是a1、b1、b2、c1。
匹配
識別出這樣的匹配后,用戶就可以在作業中定義如何針對這些匹配進行處理。常見的做法是將報警輸入到下游系統或其他地方,然后進行進一步處理。
2、Flink CEP的應用場景
了解了Flink CEP的具體用途后,再來看一下Flink CEP通常用于解決哪些問題。在實際場景中,Flink CEP得益于Flink的分布式特性以及毫秒級處理延遲和豐富的規則表達能力,得到了較多應用。
這里舉三個典型的例子。
實時風控
Flink CEP 可應用于風險用戶檢測,例如讀取并分析用戶行為日志,將 5 分鐘內轉賬次數超過 10 次且金額大于 10000 的客戶識別為異常用戶,并進行針對性的風險提示。
實時營銷
我們可以利用 Flink CEP 來優化營銷場景中的策略。比如檢測用戶行為日志,從而在電商大促時,找到“10 分鐘內,在購物車中添加超過 3 次的商品,但最終沒有付款”的用戶,針對性的調整營銷策略。類似的邏輯也可以應用在實時營銷的反作弊場景中,這已經在錢大媽以及阿里內部有一些具體的落地案例。
物聯網
Flink CEP 可以用于檢測異常狀態并發出告警,比如共享單車被騎出指定區域,且 15 分鐘內沒有回到指定區域時發出風險提示。如果和物聯網傳感器結合,還可以用于檢測工業生產中的流水線異常。比如檢測到三個時間周期內,溫度傳感器都反饋溫度超過設置閾值,就發布報警等等。
二、動態多規則支持
接下來我們介紹阿里云實時計算團隊基于Flink社區的FLIP-200所做的動態多規則支持。
1、動態規則支持的背景
在介紹我們為什么需要動態規則更新前,先看一下右邊這個圖,確定規則究竟包含哪些要素。我們認為 Flink CEP 中的規則,或者我們剛才提到的 Pattern,由三部分組成,即閾值、條件和事實。
下面我們以“五分鐘內通過廣告鏈接訪問某商品超過五次,但最終沒有購買”為例來介紹這三個要素。
- 閾值:例如5次就是我們定義的閾值,可以根據需要改成7次或者10次等。
- 事實:是規則針對的動作,例如通過廣告鏈接訪問某商品以及購買等。
- 條件:用于描述我們如何根據閾值和事實去過濾我們想要的事件或動作。
在明確了這三個組成要素之后,我們可以理解為什么需要支持動態規則更新。頻繁變化的現實場景要求我們對初始規則內容進行調整或添加新的規則。
比如有一個 CEP 作業會在某個用戶在一分鐘內連續進行某操作超過 10 次后將其認為是風險用戶。
在特殊場景,如流量暴增或舉辦某些活動時,預期訪問次數會比平常多一些。10次的閾值就不太合適,我們可能想改成20或者30。在當前的開源 Flink 實現下,如果想實現這一步,只能重新編寫 Java 代碼,然后重啟作業,以使最新的規則生效。
這種做法的問題很明顯:
- 規則生效的時間成本較高。因為要走完整的代碼開發和打包上線等一系列流程。而風控領域的作業通常對延遲比較敏感。對于這些對延遲敏感的作業來說,上述時間成本難以接受。
- 如果規則的時間窗口較長且狀態較大,重啟作業的代價會更高。
因此,我們需要支持動態規則更新,也就是所謂的不重啟Flink作業來更新作業中實際應用的規則。
為了實現這一點,我們列出了兩個關鍵問題。
- 如何讓 Flink 作業不停機地加載新規則。
- 如何解決規則(Pattern)的序列化和反序列化。這是從第一個問題衍生而來。如果想讓作業不停機加載,作業就必須從某個地方拿到動態拿到新的 Pattern,并生成對應的 Pattern 對象在作業中使用。
在其他公司的生產實踐中,我們也看到了針對上述兩個問題,大家提出了一些自己的解決方案。例如修改 Flink CEP 內部實現,即在 Flink CEP Operator 上添加注入規則的接口,使 Flink CEP Operator 在作業運行中可以不停機地加載新規則,以及基于 groovy 引擎動態生成 Pattern 對象,從而解決序列化和反序列化問題。
然而,我們也注意到,這樣的實現方案存在一定的缺點。
- 數據庫壓力增大:通常情況下,規則都存儲在數據庫中。對 Flink CEP Operator進行修改時,會讓 Flink CEP Operator 直接與數據庫交互,拉取最新規則。這樣一來,當 Flink CEP 的作業并發較多時,對于大作業中的每一個 Flink CEP 并發,都需要連接數據庫讀取規則,這會給數據庫帶來額外壓力。
- 拉取規則同一性無法嚴格保證:可能 Subtask1 取得了規則一的某一個版本,而Subtask2 由于網絡問題或其他各種問題,拉取到了規則一的其他版本。這會導致不同并發之間使用了不同的規則,最終導致整個 Flink CEP 作業在邏輯上的不一致。
- 不利于拓展多規則支持:在修改 Flink CEP 并添加注入規則接口時,通常只支持修改單條規則,這并不利于拓展到對多規則的支持。
使用 Groovy 引擎動態生成 Pattern 對象也存在一些有待提高之處。
- 表達能力有限:通常只能配合 Aviator 表達式動態修改閾值,但較難修改規則整體邏輯。
- Groovy 腳本的編輯需要較多編程知識:對于風控策略人員或運營人員來說,他們可能對 Groovy 腳本的語法不太了解,這會產生額外的學習成本。
- 生成規則耗時較長:Groovy 是一個較重量級的引擎,生成規則的耗時相對較長。
2、FLIP-200的提出
在考慮以上背景和問題后,我們在Flink社區提出了 FLIP-200提案,并在阿里按照FLIP-200提案實現了一版 Flink CEP 中動態規則的支持。接下來將詳細介紹我們是如何實現的,以及如何解決之前提到的一些問題。
首先我們新增了 PatternProcessor 接口,用于完整定義Flink CEP中的一條規則。
PatternProcessor 包含 getId,getVersion 等用于獲取該 Pattern 唯一標識符的方法;getTimestamp 等用于獲得時間戳,進行調度的方法;getPattern 對象用于拿到 PatternProcessor 所內嵌的規則;PatternProcessorFunction 用于描述如何處理找到的匹配。除此之外,為了功能的完整性,我們還添加了 PatternProcessorDiscoverer 和 PatternProcessorManager,用于描述如何發現和管理 Processor。
下面介紹一下在動態規則支持中的具體設計。
3、動態規則支持中的具體設計
首先,介紹一下 Flink 的 Operator Coordinator 機制。Operator Coordinator 顧名思義,它負責協調 Flink 作業中的各個 operator。Operator Coordinator 本身運行在 Job Manager 中,它可以向每個下游 Task Manager 中的每個 Operator 發送一些事件。
它之前主要在 Flink 的 Source 和 Sink 中應用,用于發現和分配讀寫的 workload,以確保不會出現過于嚴重的數據傾斜等問題。我們也復用了這一機制實現了 Dynamic CEP Operator Coordinator。它是一個在 JobManager 中運行的線程,它會調用我們剛才提到的 Pattern Processor Discover 接口從數據庫拿到序列化后的最新Pattern(如圖綠色圓圈里的P)。拿到之后,它會發送給下游與之關聯的 Dynamic CEP Operator。這些 Dynamic CEP Operator 會接受 Operator Coordinator發送的事件,并解析和反序列化成實際使用的 Pattern Processor,然后構造對應的 NFA(非確定有限狀態機)。之后即可使用新構造的NFA來處理上游發生的事件,并最終輸出到下游。另外,我們允許一個 CEP Operator 包含多個 NFA,這樣可以比較好的支持多規則。
基于這樣的設計,我們可以實現不停機更新規則內容,且僅有Operator Coordinator 會與外部規則數據庫進行交互,減少了對數據庫的訪問,并且由于Flink框架保證了 Operator 在處理來自 Operator Coordinator 的事件的一致性,我們也保證了各個 Subtask 所使用的規則的一致性。
4、動態規則支持中 Pattern 的序列化和反序列化
接下來為大家介紹動態規則支持:Pattern的序列化和反序列化。
Pattern 本質上是描述了規則匹配時使用的NFA(非確定狀態自動機)的狀態轉換圖, 即根據輸入事件如何從一個狀態轉移到另一個狀態。
有了這樣一個基礎觀察后,我們可以了解到NFA對應一個狀態轉換圖。我們可以稍作簡化,例如將一個復合 Pattern 規則定義為一個圖。在這個圖中,每個節點是一個子Pattern,而“邊”則是事件選擇策略,也就是說,我們如何從子 Pattern1 的匹配中跳轉到 Pattern2 的匹配。我們也可以將每個圖看作一個更大的圖的子節點,從而實現模式的嵌套。也就是說,某個模式的子 Pattern 本身也可以是一個完整的復合Pattern 。
那么我們該如何描述這個圖呢?在設計過程中,我們有一些基礎的想法或設計原則。
- 應具備完整的表達能力。
- 序列化和反序列化相對方便。
- 易于拓展,方便集成。
- 格式應該能讀取、編輯。
基于這些原則,我們最終選擇基于 JSON 來定義一套描述 Pattern 的規范。現在我們給出一個簡單的例子來展示我們所定義的這套 JSON 規范。
首先,我們可以用 Java API 大致定義這樣一個示例Pattern:當滿足 StartCondition 的事件出現大于等于三次之后,如果跟著一個滿足 EndtCondition 的事件,那么我們就認為這是一個匹配。我們看到這個 Pattern 有兩個子 Pattern,第一個是 StartPattern,第二個是 EndPattern。他們定義的一個圖可能類似于下面這樣一個狀態轉換圖。
這里P1對應著 Start Pattern,而P2對應著End Pattern。在這張狀態轉換圖中,有兩條邊,第一條是P1指向自己的邊,代表滿足P1規則的事件可以重復出現。另外也有一條從 StartCondition 指向 EndCondition 的邊。
右圖則是這個狀態轉換圖的 JSON 描述。第一個是 node 字段,它是一個數組,包含每個子 Pattern 的完整描述,比如這里我們用 times 字段表示這個子 Pattern 對應的 Condition,要被滿足大于等于三次。第二個是 edges 字段,它用于記錄邊的信息。
關于各個字段的完整定義、取值以及物理描述,可以參考阿里云官方官網文檔。
用戶可以通過這一套規范來使用 JSON 語法描述一個完整的 Pattern,從而實現 Pattern 的序列化和反序列化。在數據庫中,實際存儲的可能是一個對應的 JSON 字符串,它可以反序列化成一個對應的 Pattern Processor 對象。
5、動態規則支持:拓展Condition
在描述完剛才提到的序列化和反序列化之后,我們將繼續介紹如何將這些功能應用于動態修改 Condition 中的閾值。
Aviator Condition
我們基于 Aviator 表達式引擎定義了 AviatorCondition。它的構造函數會接受一個表達式字符串,并根據輸入的表達式字符串生成 AviatorExpression,然后在 filter 方法中通過反射來解析傳入的事件字段和閾值,執行 AviatorExpression,最后返回 true or false 作為 filter 這個方法的返回結果,用于判斷是否滿足 Aviator Condition。
我們這里舉一個例子,假設有一個叫 Event 的類,它有 price 和 action兩個字段。那么我們就可以構造一個這樣的 AviatorCondition,其參數是一個表達式字符串,這個字符串里描述了對 Event 中事件字段的取值要求。比如我們要求 actinotallow==1&&price>20。如果我們想要更新閾值,就直接修改表達式,變成 actinotallow==0&&price>50。
注意這個字符串是傳入的參數,它也可以在我們剛才介紹的 JSON 格式中定義和描述,所以我們也可以直接編輯數據庫中的字段進行閾值的動態更新。
Groovy Condition
除了 Aviator 之外,為了滿足不同用戶的使用習慣,我們也支持了 Groovy 語法,允許將 Groovy 表達式作為參數生成對應的 Condition。
Custom Args Condition
除此之外,為了更進一步增強表達能力,支持一些高度定制化的需求,我們還實現了 Custom Args Condition,它支持用戶傳入自定義參數,并允許用戶自定義參數的解析邏輯。例如我們可能想自定義解析邏輯支持更復雜的處理需求。我們之前提到的兩個 Condition(Aviator Condition和Groovy Condition)可以看作是 Custom Args Condition 的一個特殊實現。
6、動態規則支持——多規則支持
接下來介紹多規則的支持。
多規則的關鍵點在于如何在同一輸入數據流上使用多條規則。按照開源 Flink CEP 的方案,我們要在一個 Flink 作業中使用多條規則的話,需要定義多個 Pattern Stream(CEP API 提供的接口),對應生成多個 CEP Operator。因此上游 input Source 會向不同的 Flink CEP Operator 發送數據,這也代表著上游的數據需要多次網絡傳遞,從而帶來一些額外的開銷。
我們這里進行了一個優化,允許一個 Dynamic Flink CEP Operator 創建多個 NFA,這樣上游數據只需傳遞一次,避免了額外的拷貝和網絡數據傳輸,降低資源消耗。
7、動態規則支持:Demo
我們在阿里云官網上也提供了一個針對廣告投放中的實時反作弊場景實現的 demo,用于演示使用。相應的代碼也在阿里云上開源。如果大家有興趣,可以去這里的網址查看。
https://help.aliyun.com/document_detail/459880.html
https://github.com/RealtimeCompute/ververica-cep-demo
三、CEP SQL語法增強&性能優化
接下來介紹一下我們在 CEP SQL 方面所做的一些工作。
1、Flink CEP SQL:介紹
Flink CEP SQL 的核心是 MATCH_RECOGNIZE 語法,這是提到的 SQL2016 中定義的一套規范。它主要包含以下幾個部分:
PARTITION BY:用于定義邏輯,例如這里是 PARTITION BY user_name,它相當于 group by user_name 或者 Java API 中的 key_by。
ORDER_BY:用來定義輸入數據的排序方式。因為 CEP 需要在持續數據流中識別特定模式,所以它必須按時間順序進行排列。通常這里的字段是 row_time。
MEASURES:類似于普通SQL中的select操作,用于對識別出的序列進行映射聚合等操作,并定義最終輸出結果。在圖中,我們使用了 FIRST 、LAST 和 COUNT 這些內置函數,對循環模式匹配到的A序列進行了聚合計算。對于B序列,我們只做了一個映射操作。
PATTERN:它是 MATCH_RECOGNIZE 語法的核心,也就是我們所說的Pattern 定義。它類似于正則表達式的語法。例如 A+B 模式表示序列中允許先出現一個或連續多個 A 事件,再緊接著出現一個 B 事件。
DEFINE:它用于設定了A和B兩個模式變量對應的事件匹配條件。例如,A 對應的匹配條件是“event_type為A”。
2、Flink CEP SQL:示例
這里是一個 CEP SQL 的例子。假如我們有一個 Source 表,里面7條數據,username 取值中有 Alice、Bob 等。如果對于這個表來應用我們剛才上一個 PPT 中的A+B模式去做匹配,那么我們得到圖中這樣的結果表。
現在針對Alice進行具體說明:Alice用戶的事件序列包含4個對應事件,由于我們定義了 A+B 的 Pattern 以及 Flink CEP SQL 中使用的默認 After Match Strategy是 SKIP_TO _NEXT,所以實際輸出了三行。第一行對應序列 AAAB,第二行對應序列是 AAB,第三行對應序列是 AB。
3、Flink CEP SQL:語法增強
剛才是基礎的 Flink CEP SQL 語法,接下來介紹在阿里內部針對 CEP SQL 語法所做的一些增強。
輸出帶時間約束模式的匹配超時序列
在用戶行為模式識別的場景中,如果我們希望找到在10分鐘內完成一系列動作的高質量用戶,我們可以通過 WITHIN INTERVAL 語法來指定這個時間約束。
對于 Alice 用戶,它在10分鐘內完成了指定的三個動作,這滿足了我們所說的10分鐘內的時間約束,因此會有一條輸出。對于 Bob 來說,雖然也滿足了 ABC 的模式,但因為滿足條件A的事件是在8:02完成的,而滿足條件 C 的事件是在8:15完成的,相隔時間已經超過了10分鐘,所以它沒有對應輸出。
我們剛才提到的典型場景是不輸出 Bob 的場景。但有時候,我們可能想找到流程中斷的原因,例如為什么 Bob 在10分鐘內只做 A、B 兩件事而沒有做 C,即需要輸出超時的事件匹配序列(在時間限制內未完全匹配的事件序列)。
為此,我們支持了定義 ONE ROW PER MATCH SHOW TIMEOUT MATCHES 語法,它的含義是允許展示匹配超時的事件,而不僅僅是展示匹配成功的事件。在正常的 Java API 中,這些超時序列是通過 Java API 的側輸出流,即所謂 SideOutput輸出到另一個 datastream 來實現的。
在 SQL 中,我們無法使用側輸出流,因此會輸出在結果表中。但如果 C 動作沒有發生,也就是 Bob 在10分鐘內沒有完成這件事情,那么 action_c_time 列會出現 Null 值。這樣便于用戶在后續針對性地過濾出 Bob 這類特殊用戶。
定義事件之間的連續性
Flink CEP 支持不同種類的事件連續性。例如,A.next(B) 要求事件 A 和事件 B 之間必須連續出現,中間不能有任何其他類事件,如 C 類事件等。這類連續性我們稱之為嚴格連續。A.followedBy(B) 則允許 A 和 B 之間可以出現其他事件。這類連續性我們稱之為松散連續。而開源 CEP 只支持嚴格連續。我們通過添加{- x*?-}語法來支持使用松散連續。
其原理是 X 未在 DEFINE 當中定義,那么它代表任意匹配。也就是說,我們允許 A和 B 之間出現任意一個無關事件,對應我們所說的松散連續。
定義循環模式中的連續性和貪婪性
除了剛才的連續性增強外,我們還支持定義循環模式中的連續性和貪婪性。具體來說,循環模式中,例如這里的 A+,是一個單獨的循環模式。我們允許 A 類事件出現一次或多次,默認要求是嚴格連續的 A 類事件,即不能有任何其他類型的事件。同時該模式會貪婪地匹配,盡可能地匹配最長的 A 序列。
我們這里實現了2個增強:
- 支持非貪婪的語義。即嘗試盡可能匹配短序列,而非長序列。
- 允許模式內部的松散連續。
具體語法上,一個問號(例如 A+?)對應非貪婪,兩個問號(例如 A+??)對應的是松散連續且貪婪,而三個問號對(例如 A+???)應的松散連續且非貪婪。
循環模式指定的停止條件(Until)
這對應 Java 的 until 語法。表示允許在循環模式中,例如 A+{B}C 模式在匹配 A 類事件時,如果遇到 B 類事件,那么會立刻終止 A 類循環模式的匹配,進入到下一部分 C類事件匹配中。
組合模式(Group Pattern)
最后一部分增強是組合模式。我們可以將多個模式組合成一個整體用在 next()、followedBy()這些函數中,支持整體的嵌套循環。它對應著括號語法。
AFTER MATCH NO SKIP 策略
最后一個 CEP SQL 的增強則是對 AFTER MATCH NO SKIP 策略的支持。
Flink CEP SQL 中默認的策略是 SKIP_TO_NEXT_ROW,它會丟棄以相同事件開始的所有部分匹配。實際 java API 中默認的是 NO_SKIP 策略,它會把每個成功匹配都輸出出來。
這里有一個例子,我們可以比較這兩個模式的不同。例如對a b+模式來說,如果輸入的是a1、b1、b2、b3,如果是NO_SKIP策略,它會輸出三條匹配結果,即a1b1、a1b2、a1b2b3。而對于SKIP_TO_NEXT來說,它只會出a1b1。原因是我們在找到a1b1匹配之后,所有以a1開頭的匹配都會被丟棄,就是所謂的SKIP_TO_NEXT_ROW 。但 NO_SKIP 是更常見的使用策略,所以我們也拓展了對它的支持。
4、FLink CEP 性能優化
這部分簡要介紹一下我們在內部對 Flink CEP 進行的性能優化。
減少State訪問
通過增加 Cache、優化 OnEvent/ProcessingTime()實現,減少了大約30%的state 訪問,從而使大規模 Flink CEP 作業對 CPU 的消耗更少。
修復 State 泄露
這是針對社區的一個開源bug的修復。主要針對生命周期較短的 key,如果這些 key的相關狀態沒有及時清理,可能會導致 state 不斷增大。當 key 包含一些隨機字段,例如 timestamp 或隨機 ID 時,該問題非常容易出現。
小貼士
如果大家想使用 Flink CEP,盡量使用 ver1.16 以上的版本。在這些較新的版本中,社區有一個關鍵優化,可以減少 Timer 注冊,從而極大幅度地減少作業的 CPU 消耗。
四、風控場景實際案例
最后,介紹一下我們在支持客戶中遇到的風控場景的典型應用。
1、業務類場景應用
交易風控
例如客戶在電商交易或銀行交易中,可能會想檢測一些特殊用戶。如一段時間內某個IP退款次數超過一定金額,就觸發熔斷,禁止該IP進行額外交易。交易風控是我們在支持客戶中遇到的最多的應用場景。
內容風控
例如用戶在x分鐘內發布超過y條帖子,對賬號進行禁言或其他處理等。
物聯網風控
例如,檢測設備異常,如果某個設備連續發生超過10次以上的異常,且超過15分鐘內沒有恢復,則發出報警消息等。
網安風控
檢測到某臺電腦上的日志,例如用戶行為滿足點擊釣魚郵件、下載異常文件、執行隱藏代碼等條件后觸發報警。
2、新功能應用
初次接觸 FlinkCEP 的用戶可能對 Flink CEP 中的部分功能或用法不太熟悉。我們提供了2個比較常用的功能的使用示例,分別是用于獲取子 Pattern 之前匹配的事件 context.getEventsForPattern() 接口,以及 Flink1.16 引入的用于定義相鄰事件之間的時間間隔的新語法:WithinType.Previous_AND_CURRENT 。如果有類似需求的客戶也可以參考。