動態 ROI 區域監控與實例分割
區域監控是計算機視覺中的一個熱門應用,它通過監控一段時間內進入和離開特定區域的實體數量來實現。常見的區域監控應用場景包括建筑工地安全、病人監控和禁區控制。
這個區域,通常被稱為感興趣區域(ROI),通常由用戶設置為一個靜態的多邊形形狀,覆蓋攝像機捕捉到的場景的一部分。雖然這種方法在固定位置的攝像機上效果良好,但在感興趣區域或攝像機本身移動的情況下,如自動駕駛車輛或機器人應用中,可能就不太適用。
在本文中,我們考慮專用車道監控的問題。城市中通常有為自行車、公交車和拼車車輛等類型車輛指定的車道。然而,執法限制常常導致錯誤車輛使用或阻塞這些車道。為了解決這個問題,我們將開發一個可以在車輛儀表盤攝像頭上使用的專用車道監控應用。
訓練模型
為了檢測專用車道,我們使用Roboflow訓練一個自定義分割模型。Roboflow是一個在線平臺,幫助用戶快速創建、訓練和部署計算機視覺模型。他們的入門教程是學習平臺關鍵功能的好起點。
與生成邊界框預測的目標檢測模型不同,分割模型預測目標物體的精確像素邊界,提供關于其形狀和大小的更豐富信息。除了物理物體外,它們在檢測空間區域(如專用車道)方面也非常有效。
將我們的數據集上傳到新的Roboflow項目后,我們使用智能多邊形工具來標注專用車道。
我們創建第二個Roboflow項目,這次選擇“目標檢測”作為項目類型。這個模型將檢測車輛。使用邊界框工具上傳并標注圖像數據集。
一旦兩個數據集都被標注并分割為訓練/驗證/測試集,就可以使用Roboflow訓練和托管模型。
過濾檢測結果
除了我們的計算機視覺模型外,我們還需要編寫程序邏輯來過濾檢測結果,只包括在專用車道內行駛的車輛。讓我們用一個簡單的Python腳本來實現這一點。
下面的代碼將我們在Roboflow上托管的模型下載到本地機器,并使用它們對輸入圖像進行推理。
from inference import get_model
import cv2
# load hosted models from Roboflow
vehicle_model = get_model("sg-vehicles/1")
lane_model = get_model("sg-bus-lanes/1")
# process an image
image = cv2.imread("/your/image.png")
vehicle_results = vehicle_model.infer(image)[0]
lane_results = lane_model.infer(image)[0]
我們使用Supervision庫來可視化我們的預測。
import supervision as sv
# convert predictions into supervision detection objects
vehicle_detections = sv.Detections.from_inference(vehicle_results)
lane_detections = sv.Detections.from_inference(lane_results)
# initialize annotators
bounding_box_annotator = sv.BoxAnnotator()
mask_annotator = sv.MaskAnnotator()
label_annotator = sv.LabelAnnotator()
# annotate image
image = bounding_box_annotator.annotate(scene=image, detections=vehicle_detections)
image = label_annotator.annotate(scene=image, detections=vehicle_detections)
image = mask_annotator.annotate(scene=image, detections=lane_detections)
image = label_annotator.annotate(scene=image, detections=lane_detections)
# show image
cv2.imshow("output", image)
如上圖所示,雖然總共檢測到四輛車,但實際上只有一輛車在專用車道內行駛。過濾不需要的車輛的一種方法是只考慮其邊界框與專用車道分割掩碼相交的檢測結果。我們使用以下函數來實現這一點,該函數簡單地檢查掩碼的任何像素是否位于給定窗口內:
import numpy as np
def filter_detection_by_mask(detection, mask):
num_objects = len(detection)
filter_mask = np.empty(num_objects, dtype='bool')
for idx in range(num_objects):
bbox = detection.xyxy[idx].astype(int)
filter_mask[idx] = np.any(mask[bbox[1]:bbox[3], bbox[0]:bbox[2]])
return detection[filter_mask]
# filter detections by mask
lane_mask = lane_detections.mask
lane_mask = np.any(lane_mask, axis=0)
vehicle_detections = filter_detection_by_mask(vehicle_detections, lane_mask)
可視化過濾后的檢測結果,我們觀察到只有白色貨車保留下來。
跟蹤區域入侵
為了測量每輛錯誤車輛在專用車道內行駛的時間長度,我們需要跟蹤其在各幀中的位置。我們使用ByteTrack來實現這一點,ByteTrack是一種多目標跟蹤算法,它在后續幀之間匹配目標檢測并為每個檢測分配一個唯一的跟蹤ID。
我們定義以下函數,該函數接受過濾后的檢測結果、一個ByteTrack實例和一個記錄唯一檢測結果出現幀數的Python字典。在一定幀數后未被檢測到的跟蹤對象將被移除。
import supervision as sv
# ByteTrack
tracker = sv.ByteTrack()
# dict of python tuples
# [0: number of detections, 1: frames since last detection]
tracked_objects = {}
def track_detections(detections, tracker, tracked_objects):
detections = tracker.update_with_detections(detections)
# remove old detections
max_misses = fps
expired_ids = []
for id in tracked_objects:
tracked_objects[id][1] += 1
if tracked_objects[id][1] > max_misses:
expired_ids.append(id)
for id in expired_ids:
tracked_objects.pop(id)
# add or modify based on current detections
for id in list(detections.tracker_id):
if id in tracked_objects:
tracked_objects[id][0] += 1
tracked_objects[id][1] = 0
else:
tracked_objects[id] = [1, 0]
return detections
vehicle_detections = track_detections(vehicle_detections)
我們修改我們的標注腳本,以打印專用車道內每輛車的跟蹤ID和持續時間。總時間可以通過將車輛被跟蹤的幀數乘以視頻幀率(FPS)來計算。
使用Roboflow管道進行視頻推理
現在我們的代碼可以在圖像上運行了,讓我們修改它以處理視頻文件。我們首先將代碼轉換為一個單一的Python類。
class LaneIntrusionModel:
def __init__(self):
self.vehicle_model = get_model("sg-vehicles/1")
self.lane_model = get_model("sg-bus-lanes/1")
self.tracker = sv.ByteTrack()
self.tracker.reset()
self.tracked_objects = {}
self.fps = 30
self.bounding_box_annotator = sv.BoxAnnotator()
self.mask_annotator = sv.MaskAnnotator()
self.label_annotator = sv.LabelAnnotator()
# public methods
def infer(self, image):
vehicle_results = self.vehicle_model.infer(image)
lane_results = self.lane_model.infer(image)
return list(zip(vehicle_results, lane_results))
def process_inference(self, image, vehicle_results, lane_results):
vehicle_detections, lane_detections = self.get_detections(image, vehicle_results, lane_results)
vehicle_detections = self.track_detections(vehicle_detections)
annotated_image = self.annotate_frame(image, vehicle_detections, lane_detections)
return annotated_image
## private methods
def get_detections(self, image, vehicle_results, lane_results):
vehicle_detections = sv.Detections.from_inference(vehicle_results)
lane_detections = sv.Detections.from_inference(lane_results)
lane_mask = lane_detections.mask
if lane_mask is None or lane_mask.shape[0] == 0:
return vehicle_detections[vehicle_detections.class_id == -1], lane_detections
lane_mask = np.any(lane_mask, axis=0)
vehicle_detections = filter_detection_by_mask(vehicle_detections, lane_mask)
return vehicle_detections, lane_detections
def track_detections(self, detections):
detections = self.tracker.update_with_detections(detections)
# remove old detections
max_misses = self.fps
expired_ids = []
for id in self.tracked_objects:
self.tracked_objects[id][1] += 1
if self.tracked_objects[id][1] > max_misses:
expired_ids.append(id)
for id in expired_ids:
self.tracked_objects.pop(id)
# add or modify based on current detections
for id in list(detections.tracker_id):
if id in self.tracked_objects:
self.tracked_objects[id][0] += 1
self.tracked_objects[id][1] = 0
else:
self.tracked_objects[id] = [1, 0]
return detections
def annotate_frame(self, image, vehicle_detections, lane_detections):
if len(vehicle_detections):
labels = []
for id in list(vehicle_detections.tracker_id):
time_tracked = self.tracked_objects[id][0]/self.fps
labels.append(f"vehicle:#{id} time:{time_tracked:.2f}s")
image = self.bounding_box_annotator.annotate(scene=image, detections=vehicle_detections)
image = self.label_annotator.annotate(scene=image, detections=vehicle_detections, labels=labels)
if len(lane_detections):
image = self.mask_annotator.annotate(scene=image, detections=lane_detections)
image = self.label_annotator.annotate(scene=image, detections=lane_detections)
return image
由于深度學習模型的計算需求,實時處理可能具有挑戰性,通常需要專門的硬件(如GPU)。Roboflow提供了InferencePipeline工具包(https://inference.roboflow.com/using_inference/inference_pipeline/),通過在單獨的線程上運行預處理、推理和后處理,并支持批量推理,來提高處理時間。我們首先定義以下回調函數:
from inference.core.interfaces.camera.entities import VideoFrame
from inference import InferencePipeline
from typing import Union, List, Optional, Any
model = LaneIntrusionModel()
def on_video_frame(video_frames: List[VideoFrame]) -> List[Any]:
images = [v.image for v in video_frames]
return model.infer(images)
def on_prediction(
predictions: Union[dict, List[Optional[dict]]],
video_frame: Union[VideoFrame, List[Optional[VideoFrame]]]
) -> None:
if not issubclass(type(predictions), list):
# this is required to support both sequential and batch processing with single code
# if you use only one mode - you may create function that handles with only one type
# of input
predictions = [predictions]
video_frame = [video_frame]
for result, frame in zip(predictions, video_frame):
annotated_image = model.process_inference(frame.image, result[0], result[1])
cv2.imshow("output", annotated_image)
cv2.waitKey(1)
然后,我們使用以下腳本在視頻文件上運行管道:
pipeline = InferencePipeline.init_with_custom_logic(
max_fps=out_fps,
video_reference="/your/video.mp4",
on_video_frame=on_video_frame,
on_prediction=on_prediction,
)
# start the pipeline
pipeline.start()
# wait for the pipeline to finish
pipeline.join()
得到以下結果:
結論
總之,本教程展示了如何使用Roboflow Inference實現實時區域監控。分割模型允許動態檢測圖像場景中的感興趣區域,從而在持續變化的環境中實現區域監控。
除了專用車道監控外,動態區域監控的其他應用場景可能包括:
- 送貨無人機掃描著陸區的包裹
- 閉路電視攝像機監控高低潮之間的海岸線游泳者
實時視頻處理的限制通常將這些應用中的模型類型限制為快速、單階段的檢測器,如YOLO和SSD,這些模型需要強大的訓練數據才能有效。未來的ML優化和硬件工作可能需要將動態區域監控能力引入零樣本檢測模型。