Embedding空間中的時序異常檢測,你學會了嗎?
1.背景
在安全、反作弊等業務場景下,對流量、用戶行為進行異常檢測是基本的剛需。通常的做法是,在各個業務維度上,對流量、用戶行為進行統計分析,提取出相應的指標特征,然后在時間維度上,對這些指標特征進行建模分析。再利用相關的算法來檢測當前的指標值是否背離了該指標在歷史數據中的分布規律。
2.示例
假設某業務場景下,用戶有100個來源渠道,用戶使用產品時,有10種不同的操作方式,對于用戶的行為,我們可以簡單的撮取出PV、UV、失敗率等指標。那么我們可以建立這樣一個監控:
監控的維度:來源渠道 * 操作方式 = 100 * 10 = 1000個維度
監控的指標:PV、UV、失敗率...
統計周期: 小時
然后針對每個維度、時刻、指標,收集過去30天的數據做為訓練樣本,訓練異常檢測模型(如EllipticEnvelope等),然后對當前時刻的指標值,進行異常檢測。
上面的方法,通過合理的拆分監控維度,一方面可以有效的提高檢測的靈敏度,避免較少的異常流量淹沒在大盤監控在隨機波動中;另一方面,也可以對異常流量進行快速的定位,便于及時處理。
3.問題
上面的方法也存在諸多的限制,比如:
- 監控維度必需是離散、可枚舉的,否則無法建立歷史數據的統計模型;
- 監控維度的粒度必須合適,否則或是靈敏度不足,或是噪聲太多,無法有效檢測異常。
顯然,不是所有的業務場景都能滿足上述的要求。即便是能滿足上述要求的業務場景中,隨著對攻擊者的對抗不斷深入,攻擊者會嘗試降低攻擊的規模,并盡量將攻擊行為分散到更多的維度中,從而躲避我們的檢測手段。
4.解決思路
那么,能否不依賴業務維度拆分,直接對指標進行異常檢測呢?
首先,我們需要把待檢測的每一條日志、數據當做一個獨立的樣本。接下來,不難聯想到,這些樣本都可以映射到某個高維空間中,我們把這個空間叫做樣本空間。可以通過向量化、Embedding等方法,得到樣本在這個空間中的坐標。
樣本在這個空間中的分布必然不是完全隨機的,而是會存在一定的特點(分布特征)。若當前時刻樣本在這個空間中的分布特征與歷史數據中的分布特征不一致,則說明當前樣本存在異常。而分布在差異最大的區域中的樣本,則可以認為是異常樣本。
接下來的問題就變成了如何對這種分布特征進行建模?
最先想到的是,我們可以通過聚類算法,來對樣本進行劃分,再對每個Cluster,提取出統計特征。但在具體實現時還需要考慮以下問題:
- 支持的樣本數量要足夠多;
- 支持的Cluster數量要足夠多;
- 每個Cluster的樣本數量要盡可能均勻;
- Cluster的劃分要盡可能穩定,才能在時間維度上執行異常檢測。
再進一步,其實我們不需要執行完整的聚類算法,我們只需要對樣本空間設置足夠多的采樣點進行采樣,計算出采樣點附近的樣本的統計特征做為采集采樣點的分布特征,再對采樣點的特征進行時間維度的異常檢測,即可完成對整個樣本空間的異常檢測了。
圖片
5.算法實驗
5.1 數據準備
取某業務場景下近30天的用戶行為日志,約160萬條,利用其中的UserAgent信息,對其進行向量化處理。每條日志的向量長度為128維。
向量化算法:
def to_vector(ua):
if isinstance(ua, (list, tuple)):
return [to_vector(c) for c in ua]
else:
vec = np.zeros(128)
for c in ua:
vec[ord(c) % 128] += 1 # UserAgent中的字符絕大多數都是Ascll字符,所以取余128
l2 = np.sqrt(np.sum(vec * vec))
if l2 != 0:
vec /= l2
return vec.tolist()
將清洗好的數據保存到向量DB中備用:
for day in days:
for hour in hours:
event_day = day.strftime("%Y%m%d")
event_hour = "{:02d}".format(hour)
collection = chroma_client.get_or_create_collection(
name="{}_{}_{}".format(name_prefix, event_day, event_hour)
)
sub_df = df_ua_pv[(df_ua_pv.event_day == event_day) & (df_ua_pv.event_hour == event_hour)]
ids = [hashlib.md5(bytes(str(row), "utf-8")).hexdigest() for _, row in sub_df.iterrows()]
docs = [row.ua for _, row in sub_df.iterrows()]
metadatas = [{"pv": row.pv} for _, row in sub_df.iterrows()]
embeddings = [to_vector(row.ua) for _, row in sub_df.iterrows()]
batch_size = 10000
for batch_id in range(0, len(docs), batch_size):
collection.upsert(
ids=ids[batch_id : batch_id + batch_size],
documents=docs[batch_id : batch_id + batch_size],
metadatas=metadatas[batch_id : batch_id + batch_size],
embeddings=embeddings[batch_id : batch_id + batch_size],
)
print("{:>8d} / {}".format(batch_id + batch_size, len(docs)))
collections[event_day + event_hour] = collection
為了更方便的驗證算法的有效性,在數據集中,人工構造了一些異常樣本,包括:
- 個別隨機UA,PV增長:10%, 20%, 50%, 100%, 200%, 500%,1000%;數量:5;min_pv=100。
- 部分相似UA,PV增長:5%,10%,20%, 50%, 100%;數量:10, 20, 50, 100;min_pv=10。
- 生成相似UA,PV同比增長,數量:10, 20, 50, 100。
- 生成相似UA,整體PV不增長,數量:10, 20, 50, 100;min_pv=1。
5.2 算法實現
隨機生成采樣點:
query_ua_list = (
df_ua_pv[(df_ua_pv.event_day == event_day) & (df_ua_pv.event_hour == event_hour)].sample(100)["ua"].to_list()
)
在樣本空間進行鄰近采樣:
results = []
query_ua_vec = to_vector(query_ua_list)
for day in days:
for hour in hours:
res = get_collection(day, hour).query(query_embeddings=query_ua_vec, n_results=n_results)
for i in range(len(query_ua_list)):
for j in range(n_results):
row = [
query_ua_list[i],
res["metadatas"][i][j]["event_day"],
res["metadatas"][i][j]["event_hour"],
res["documents"][i][j],
res["metadatas"][i][j]["pv"],
res["distances"][i][j],
]
if extra_fields:
for field in extra_fields:
row.append(res["metadatas"][i][j].get(field))
results.append(row)
cols = ["ua", "day", "hour", "doc", "pv", "dist"]
if extra_fields:
cols += extra_fields
df_results = pd.DataFrame(results, columns=cols)
定義要檢測的字段:
AREA_EXP = [0, 2, 8]
MODEL_FIELDS = ["pv", "dist"]
MODEL_FIELDS += [f"dens_{i}" for i in AREA_EXP]
MODEL_FIELDS += ["dens_s"]
MODEL_AGGS = {}
for col in MODEL_FIELDS:
MODEL_AGGS[f"{col}_mean"] = (col, "mean")
MODEL_AGGS[f"{col}_std"] = (col, "std")
進行天維度的異常檢測:
df_query_results["dens_s"] = 1 / (df_query_results["dist"] ** 0.5 + 1)
df_res_agg = df_query_results.groupby(["ua", "day"], as_index=False).agg(
pv=("pv", "sum"),
dist=("dist", "mean"),
dens_s=("dens_s", "mean"),
)
for i in AREA_EXP:
df_res_agg["area_{}".format(i)] = (df_res_agg["dist"] * 10) ** i
df_res_agg["dens_{}".format(i)] = df_res_agg["pv"] / df_res_agg["area_{}".format(i)]
df_model = df_res_agg[df_res_agg.day <= last_event_day].groupby("ua").agg(**MODEL_AGGS)
df_check = df_res_agg.join(df_model, notallow="ua")
for col in MODEL_FIELDS:
df_check[f"{col}_sigma"] = (df_check[col] - df_check[f"{col}_mean"]) / df_check[f"{col}_std"]
df_check["dens_avg_sigma"] = df_check[["dens_s_sigma"] + [f"dens_{i}_sigma" for i in AREA_EXP]].mean(axis=1)
df_check["dens_max_sigma"] = df_check[["dens_s_sigma"] + [f"dens_{i}_sigma" for i in AREA_EXP]].max(axis=1)
df_check["dens_min_sigma"] = df_check[["dens_s_sigma"] + [f"dens_{i}_sigma" for i in AREA_EXP]].min(axis=1)
6.實驗效果
6.1 實驗一
個別隨機UA,PV增長:10%, 20%, 50%, 100%, 200%, 500%,1000%;數量:5;min_pv=100。
異常樣本與原始樣本的異常置信度分布對比如下圖,由上到下分別為:
- 天級檢測下異常樣本的置信度分布;
- 天級檢測下正常樣本的置信度分布;
- 小時級檢測下異常樣本的置信度分布;
- 小時級檢測下正常樣本的置信度分布。
圖片
天級檢測不同閾值下的準召情況:
圖片
小時級檢測不同閾值下的準召情況:
圖片
6.2 實驗二
部分相似UA,PV增長:5%,10%,20%, 50%, 100%;數量:5, 10, 20; min_pv=10。
異常樣本與原始樣本的異常置信度分布對比如下圖,由上到下分別為:
- 天級檢測下異常樣本的置信度分布;
- 天級檢測下正常樣本的置信度分布;
- 小時級檢測下異常樣本的置信度分布;
- 小時級檢測下正常樣本的置信度分布。
圖片
天級檢測不同閾值下的準召情況:
圖片
小時級檢測不同閾值下的準召情況:
圖片
6.3 實驗三
生成相似UA,PV同比增長,數量:5, 10, 20, 50, 100。
異常樣本與原始樣本的異常置信度分布對比如下圖,由上到下分別為:
- 天級檢測下異常樣本的置信度分布;
- 天級檢測下正常樣本的置信度分布;
- 小時級檢測下異常樣本的置信度分布;
- 小時級檢測下正常樣本的置信度分布。
圖片
天級檢測不同閾值下的準召情況:
圖片
小時級檢測不同閾值下的準召情況:
圖片
6.4 實驗四
生成相似UA,整體PV不增長,數量:10, 20, 50, 100;min_pv=1。
異常樣本與原始樣本的異常置信度分布對比如下圖,由上到下分別為:
- 天級檢測下異常樣本的置信度分布;
- 天級檢測下正常樣本的置信度分布;
- 小時級檢測下異常樣本的置信度分布;
- 小時級檢測下正常樣本的置信度分布。
圖片
天級檢測不同閾值下的準召情況:
圖片
小時級檢測不同閾值下的準召情況:
圖片
7.總結與展望
通過實驗,驗證了該算法的有效性,但在后續的工程化應用中,還需要結合具體的應用場景進行適當的調整。比如采樣點的數量、采樣點的選取方法、樣本Embedding方法、距離計算方法等。
此外,在實踐中,若要發揮出異常檢測的真正價值,還需要考慮以下問題:
- 檢測到異常后,如何快速定位到異常樣本;
- 異常樣本定位后,如何快速度評估分析,確定異常是否需要進一步處理;
- 若需要進一步處理,如何快速定位到異常樣本來源特征,制定出相應的攻防策略等。