Python時間序列異常檢測ADTK
1. adtk簡介
智能運維AIOps的數(shù)據(jù)基本上都是時間序列形式的,而異常檢測告警是AIOps中重要組成部分。筆者最近在處理時間序列數(shù)據(jù)時有使用到adtk這個python庫,在這里和大家做下分享。
什么是adtk?
adtk(Anomaly Detection Toolkit)是無監(jiān)督異常檢測的python工具包,它提供常用算法和處理函數(shù):
- 簡單有效的異常檢測算法(detector)
- 異常特征加工(transformers)
- 處理流程控制(Pipe)
2. 安裝
- pip install adtk
3. adtk數(shù)據(jù)要求
時間序列的數(shù)據(jù)主要包括時間和相應的指標(如cpu,內存,數(shù)量等)。python中數(shù)據(jù)分析一般都是pandas的DataFrame,adtk要求輸入數(shù)據(jù)的索引必須是DatetimeIndex。
pandas提供了時間序列的時間生成和處理方法。
- pd.date_range
- stamps = pd.date_range("2012-10-08 18:15:05", periods=4, freq="D")
- # DatetimeIndex(['2012-10-08 18:15:05', '2012-10-09 18:15:05',
- # '2012-10-10 18:15:05', '2012-10-11 18:15:05'],
- # dtype='datetime64[ns]', freq='D')
- pd.Timestamp
- tmp = pd.Timestamp("2018-01-05") + pd.Timedelta("1 day")
- print(tmp, tmp.timestamp(), tmp.strftime('%Y-%m-%d'))
- # 2018-01-06 00:00:00 1515196800.0 2018-01-06
- pd.Timestamp( tmp.timestamp(), unit='s', tz='Asia/Shanghai')
- # Timestamp('2018-01-06 08:00:00+0800', tz='Asia/Shanghai')
- pd.to_datetime
adtk提供是validate_series來驗證時間序列數(shù)據(jù)的有效性,如是否按時間順序
- import pandas as pd
- from adtk.data import validate_series
- from adtk.visualization import plot
- df = pd.read_csv('./data/nyc_taxi.csv', index_col="timestamp", parse_dates=True)
- df = validate_series(df)
- plot(df)
4. 異常特征加工(transformers)
adtk中transformers提供了許多時間序列特征加工的方法:
- 一般我們獲取時間序列的特征,通常會按照時間窗口在滑動,采集時間窗口上的統(tǒng)計特征;
- 還有對于季節(jié)性趨勢做分解,區(qū)分哪些是季節(jié)性的部分,哪些是趨勢的部分
- 時間序列降維映射:對于細粒度的時間序列數(shù)據(jù),數(shù)據(jù)量大,對于檢測算法來說效率不高。降維方法能保留時間序列的主要趨勢等特征同時,降低維數(shù),提供時間效率。這個對于用CNN的方式來進行時間序列分類特別有效,adtk主要提供基于pca的降維和重構方法,主要應用于多維時間序列。
4.1 滑動窗口
atdk提供單個寬口RollingAggregate和2個窗口DoubleRollingAggregate的滑動方式。統(tǒng)計特征支持均值,中位數(shù),匯總,最大值,最小值,分位數(shù), 方差,標準差,偏度,峰度,直方圖 等,['mean', 'median', 'sum', 'min', 'max', 'quantile', 'iqr', 'idr', 'count', 'nnz', 'nunique', 'std', 'var', 'skew', 'kurt', 'hist']其中
- 'iqr': 是分位數(shù) 75% 和 25%差值
- 'idr': 是分位數(shù) 90% 和 10%插值
- RollingAggregate
- import pandas as pd
- from adtk.data import validate_series
- from adtk.transformer import RollingAggregate
- from adtk.transformer import DoubleRollingAggregate
- s = pd.read_csv('./data/nyc_taxi.csv', index_col="timestamp", parse_dates=True)
- s = validate_series(s)
- s_transformed = RollingAggregate(agg='quantile',agg_params={"q": [0.25, 0.75]}, window=5).transform(s)
- DoubleRollingAggregate 提供了兩個窗口之間統(tǒng)計特征的差異特征,如前5分鐘和后5分鐘,均值的差值等。agg參數(shù)和RollingAggregate中一致,新增的參數(shù)diff主要衡量差距的函數(shù):
- import pandas as pd
- from adtk.data import validate_series
- from adtk.transformer import DoubleRollingAggregate
- s = pd.read_csv('./data/ec2_cpu_utilization_53ea38.csv', index_col="timestamp", parse_dates=True)
- s = validate_series(s)
- s_transformed = DoubleRollingAggregate(
- agg="median",
- window=5,
- diff="diff").transform(s)
- 'diff': 后減去前
- 'rel_diff': Relative difference between values of aggregated metric (right minus left divided left). Only applicable if the aggregated metric is scalar.
- 'abs_rel_diff': (后-前)/前, 相對差值
- 'l1': l1正則
- 'l2': l2正則
4.2 季節(jié)性拆解
時間序列可拆解成趨勢性,季節(jié)性和殘差部分。atdk中ClassicSeasonalDecomposition提供了這三個部分拆解,并移除趨勢和季節(jié)性部分,返回殘差部分。
- freq: 設置季節(jié)性的周期
- trend:可以設置是否保留趨勢性
- from adtk.transformer import ClassicSeasonalDecomposition
- s = pd.read_csv('./data/nyc_taxi.csv', index_col="timestamp", parse_dates=True)
- s = validate_series(s)
- s_transformed = ClassicSeasonalDecomposition().fit_transform(s)
- s_transformed = ClassicSeasonalDecomposition(trend=True).fit_transform(s)
4.3 降維和重構
adtk提供的pca對數(shù)據(jù)進行降維到主成分PcaProjection和重構方法PcaReconstruction。
- df = pd.read_csv('./data/generator.csv', index_col="Time", parse_dates=True)
- df = validate_series(df)
- from adtk.transformer import PcaProjection
- s = PcaProjection(k=1).fit_transform(df)
- plot(pd.concat([df, s], axis=1), ts_linewidth=1, ts_markersize=3, curve_group=[("Speed (kRPM)", "Power (kW)"), "pc0"]);
- from adtk.transformer import PcaReconstruction
- df_transformed = PcaReconstruction(k=1).fit_transform(df).rename(columns={"Speed (kRPM)": "Speed reconstruction (kRPM)", "Power (kW)": "Power reconstruction (kW)"})
- plot(pd.concat([df, df_transformed], axis=1), ts_linewidth=1, ts_markersize=3, curve_group=[("Speed (kRPM)", "Power (kW)"), ("Speed reconstruction (kRPM)", "Power reconstruction (kW)")]);
- ../_images/notebooks_demo_99_0.png
5. 異常檢測算法(detector)
adtk提供的主要是無監(jiān)督或者基于規(guī)則的時間序列檢測算法,可以用于常規(guī)的異常檢測。
- 檢測離群點 離群點是和普通數(shù)據(jù)差異極大的數(shù)據(jù)點。adtk主要提供了包括 adtk.detector.ThresholdAD adtk.detector.QuantileAD adtk.detector.InterQuartileRangeAD adtk.detector.GeneralizedESDTestAD的檢測算法。
- ThresholdAD
- adtk.detector.ThresholdAD(low=None, high=None)
- 參數(shù):
- low:下限,小于此值,視為異常
- high:上限,大于此值,視為異常
- 原理:通過認為設定上下限來識別異常
- 總結:固定閾值算法
- from adtk.detector import ThresholdAD
- threshold_ad = ThresholdAD(high=30, low=15)
- anomalies = threshold_ad.detect(s)
- QuantileAD
- adtk.detector.QuantileAD(low=None, high=None)
- 參數(shù):
- low:分位下限,范圍(0,1),當low=0.25時,表示Q1
- high:分位上限,范圍(0,1),當low=0.25時,表示Q3
- 原理:通過歷史數(shù)據(jù)計算出給定low與high對應的分位值Q_low,Q_high,小于Q_low或大于Q_high,視為異常
- 總結:分位閾值算法
- from adtk.detector import QuantileAD
- quantile_ad = QuantileAD(high=0.99, low=0.01)
- anomalies = quantile_ad.fit_detect(s)
- InterQuartileRangeAD
- adtk.detector.InterQuartileRangeAD(c=3.0)
- 參數(shù):
- c:分位距的系數(shù),用來確定上下限,可為float,也可為(float,float)
- 原理:
- 當c為float時,通過歷史數(shù)據(jù)計算出 Q3+c*IQR 作為上限值,大于上限值視為異常
- 當c=(float1,float2)時,通過歷史數(shù)據(jù)計算出 (Q1-c1*IQR, Q3+c2*IQR) 作為正常范圍,不在正常范圍視為異常
- 總結:箱線圖算法
- from adtk.detector import InterQuartileRangeAD
- iqr_ad = InterQuartileRangeAD(c=1.5)
- anomalies = iqr_ad.fit_detect(s)
- GeneralizedESDTestAD
- adtk.detector.GeneralizedESDTestAD(alpha=0.05)
- 參數(shù):
- alpha:顯著性水平 (Significance level),alpha越小,表示識別出的異常約有把握是真異常
- 原理:將樣本點的值與樣本的均值作差后除以樣本標準差,取最大值,通過t分布計算閾值,對比閾值確定異常點
- 計算步驟簡述:
- 設置顯著水平alpha,通常取0.05
- 指定離群比例h,若h=5%,則表示50各樣本中存在離群點數(shù)為2
- 計算數(shù)據(jù)集的均值mu與標準差sigma,將所有樣本與均值作差,取絕對值,再除以標準差,找出最大值,得到esd_1
- 在剩下的樣本點中,重復步驟3,可以得到h個esd值
- 為每個esd值計算critical value: lambda_i (采用t分布計算)
- 統(tǒng)計每個esd是否大于lambda_i,大于的認為你是異常
- from adtk.detector import GeneralizedESDTestAD
- esd_ad = GeneralizedESDTestAD(alpha=0.3)
- anomalies = esd_ad.fit_detect(s)
- 突變:Spike and Level Shift 異常的表現(xiàn)形式不是離群點,而是通過和臨近點的比較,即突增或者突降。adtk提供adtk.detector.PersistAD 和 adtk.detector.LevelShiftAD檢測方法
- PersistAD
- adtk.detector.PersistAD(window=1, c=3.0, side='both', min_periods=None, agg='median')
- 參數(shù):
- window:參考窗長度,可為int, str
- c:分位距倍數(shù),用于確定上下限范圍
- side:檢測范圍,為'positive'時檢測突增,為'negative'時檢測突降,為'both'時突增突降都檢測
- min_periods:參考窗中最小個數(shù),小于此個數(shù)將會報異常,默認為None,表示每個時間點都得有值
- agg:參考窗中的統(tǒng)計量計算方式,因為當前值是與參考窗中產生的統(tǒng)計量作比較,所以得將參考窗中的數(shù)據(jù)計算成統(tǒng)計量,默認'median',表示去參考窗的中位值
- 原理:
- 用滑動窗口遍歷歷史數(shù)據(jù),將窗口后的一位數(shù)據(jù)與參考窗中的統(tǒng)計量做差,得到一個新的時間序列s1;
- 計算s1的(Q1-c*IQR, Q3+c*IQR) 作為正常范圍;
- 若當前值與它參考窗中的統(tǒng)計量之差,不在2中的正常范圍內,視為異常。
- 調參:
- window:越大,模型越不敏感,不容易被突刺干擾
- c:越大,對于波動大的數(shù)據(jù),正常范圍放大較大,對于波動較小的數(shù)據(jù),正常范圍放大較小
- min_periods:對缺失值的容忍程度,越大,越不允許有太多的缺失值
- agg:統(tǒng)計量的聚合方式,跟統(tǒng)計量的特性有關,如 'median'不容易受極端值影響
- 總結:先計算一條新的時間序列,再用箱線圖作異常檢測
- from adtk.detector import PersistAD
- persist_ad = PersistAD(c=3.0, side='positive')
- anomalies = persist_ad.fit_detect(s)
- LevelShiftAD
- adtk.detector.LevelShiftAD(window, c=6.0, side='both', min_periods=None)
- 參數(shù):
- window:支持(10,5),表示使用兩個相鄰的滑動窗,左側的窗中的中位值表示參考值,右側窗中的中位值表示當前值
- c:越大,對于波動大的數(shù)據(jù),正常范圍放大較大,對于波動較小的數(shù)據(jù),正常范圍放大較小,默認6.0
- side:檢測范圍,為'positive'時檢測突增,為'negative'時檢測突降,為'both'時突增突降都檢測
- min_periods:參考窗中最小個數(shù),小于此個數(shù)將會報異常,默認為None,表示每個時間點都得有值
- 原理:
- 該模型用于檢測突變情況,相比于PersistAD,其抗抖動能力較強,不容易出現(xiàn)誤報
- from adtk.detector import LevelShiftAD
- level_shift_ad = LevelShiftAD(c=6.0, side='both', window=5)
- anomalies = level_shift_ad.fit_detect(s)
- 季節(jié)性
- adtk.detector.SeasonalAD
- adtk.detector.SeasonalAD(freq=None, side='both', c=3.0, trend=False)
- SeasonalAD主要是根據(jù)ClassicSeasonalDecomposition來處理,判斷。
- 參數(shù):
- freq:季節(jié)性周期
- c:越大,對于波動大的數(shù)據(jù),正常范圍放大較大,對于波動較小的數(shù)據(jù),正常范圍放大較小,默認6.0
- side:檢測范圍,為'positive'時檢測突增,為'negative'時檢測突降,為'both'時突增突降都檢測
- trend: 是否考慮趨勢
- from adtk.detector import SeasonalAD
- seasonal_ad = SeasonalAD(c=3.0, side="both")
- anomalies = seasonal_ad.fit_detect(s)
- plot(s, anomaly=anomalies, ts_markersize=1, anomaly_color='red', anomaly_tag="marker", anomaly_markersize=2);
- pipe 組合算法
- from adtk.pipe import Pipeline
- steps = [
- ("deseasonal", ClassicSeasonalDecomposition()),
- ("quantile_ad", QuantileAD(high=0.995, low=0.005))
- ]
- pipeline = Pipeline(steps)
- anomalies = pipeline.fit_detect(s)
- plot(s, anomaly=anomalies, ts_markersize=1, anomaly_markersize=2, anomaly_tag="marker", anomaly_color='red');
6. 總結
本文介紹了時間序列異常檢測的無監(jiān)督算法工具包ADTK。ADTK提供了簡單的異常檢測算法和時間序列特征加工函數(shù),希望對你有幫助。總結如下:
- adtk要求輸入數(shù)據(jù)為datetimeIndex,validate_series來驗證數(shù)據(jù)有效性,使得時間有序
- adtk單窗口和double窗口滑動,加工統(tǒng)計特征
- adtk分解時間序列的季節(jié)部分,獲得時間序列的殘差部分,可根據(jù)這個判斷異常點
- adtk支持離群點、突變和季節(jié)性異常檢測。通過fit_detect 獲取異常點序列,也可以通過Pipeline聯(lián)通多部異常檢測算法