166 lines
6.8 KiB
Python
166 lines
6.8 KiB
Python
import cv2
|
|
import time
|
|
from collections import defaultdict
|
|
from backend.models.yolo_manager import YOLOManager
|
|
from backend.models.clip_manager import CLIPManager
|
|
from backend.utils.image_utils import is_blurry, crop_image, convert_cv2_to_pil
|
|
|
|
class VideoProcessor:
    """Detect, track, and classify road objects in a video.

    Pipeline ("best shot" strategy):
      1. YOLO tracks objects across frames with persistent track ids.
      2. For each track, the largest crop seen so far is buffered — a proxy
         for the closest / most detailed view of the object.
      3. When a track leaves the frame (or the video ends), its buffered
         crop is classified once with CLIP against text label candidates.
    """

    # Pixel distance from any frame border at which an object is considered
    # to be leaving the view and is classified early.
    EDGE_MARGIN = 50

    def __init__(self, yolo_model_path="yolov8n.pt", clip_model_id="openai/clip-vit-base-patch32"):
        """Load the YOLO tracker and the CLIP classifier.

        Args:
            yolo_model_path: Weights file for the YOLO detector/tracker.
            clip_model_id: Hugging Face model id for the CLIP model.
        """
        self.yolo = YOLOManager(yolo_model_path)
        self.clip = CLIPManager(clip_model_id)

        # Best shot buffered per live track id.
        # Format: {track_id: {'crop': np.array, 'area': float,
        #                     'frame_idx': int, 'class_id': int,
        #                     'bbox': list, 'processed': bool}}
        self.active_tracks = {}

        # Finalized classification dicts (see _classify_and_store).
        self.final_results = []

        # CLIP text candidates.  The pothole list deliberately includes
        # common false positives (shadow, patch work, manhole) so CLIP can
        # attribute a look-alike crop to them instead of "pothole".
        self.pothole_labels = ["pothole", "shadow", "patch work", "manhole", "road crack"]
        self.sign_labels = ["stop sign", "yield sign", "speed limit 30", "speed limit 40", "speed limit 50", "speed limit 60", "pedestrian crossing", "no u-turn", "traffic light", "keep right"]

        # Number of frames read so far (1-based index of the current frame).
        self.frame_count = 0

    def process_video(self, video_path: str):
        """Run the full track-and-classify pipeline over one video file.

        Args:
            video_path: Path to a video readable by OpenCV.

        Returns:
            List of result dicts (one per classified track), or an empty
            list if the video could not be opened.
        """
        cap = cv2.VideoCapture(video_path)
        if not cap.isOpened():
            print(f"Error opening video: {video_path}")
            return []

        try:
            width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
            height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))

            print(f"Processing video {video_path} ({width}x{height})...")

            while cap.isOpened():
                ret, frame = cap.read()
                if not ret:
                    break

                self.frame_count += 1

                # 1. Run YOLO tracking on the frame.
                results = self.yolo.track(frame)

                current_frame_ids = set()
                # BUG FIX: the original code `continue`d when a frame had no
                # tracked boxes, which skipped the stale-track cleanup below.
                # Tracks that vanished on such a frame were only finalized at
                # the very end of the video and kept their crops in memory.
                # Now an empty frame simply yields an empty id set and the
                # cleanup still runs.
                if results.boxes is not None and results.boxes.id is not None:
                    boxes = results.boxes.xyxy.cpu().numpy()
                    track_ids = results.boxes.id.cpu().numpy()
                    class_ids = results.boxes.cls.cpu().numpy()  # schema depends on training

                    for box, track_id, cls in zip(boxes, track_ids, class_ids):
                        track_id = int(track_id)
                        current_frame_ids.add(track_id)
                        self._update_best_shot(frame, box, track_id, int(cls),
                                               width, height)

                # 2. Finalize tracks that are no longer visible.  Iterate a
                # copy of the keys so entries can be deleted while looping.
                # NOTE(review): a real ByteTrack keeps "lost" tracks alive
                # for a few frames; here a single missed frame ends a track.
                for tid in list(self.active_tracks.keys()):
                    if tid not in current_frame_ids:
                        # It's gone from view (or mostly gone).
                        self._classify_and_store(tid)
                        # Drop processed entries to free the buffered crops.
                        if self.active_tracks[tid].get('processed'):
                            del self.active_tracks[tid]
        finally:
            # BUG FIX: release the capture even if tracking or
            # classification raises mid-loop (was leaked on exception).
            cap.release()

        # 3. Flush any tracks still alive at end-of-video.
        for tid in list(self.active_tracks.keys()):
            self._classify_and_store(tid)

        print("Processing complete.")
        return self.final_results

    def _update_best_shot(self, frame, box, track_id, class_id, width, height):
        """Buffer or refresh the best (largest-area) crop for one detection.

        Also triggers early classification when the box comes within
        EDGE_MARGIN pixels of any frame border (object about to leave).
        """
        x1, y1, x2, y2 = box
        area = (x2 - x1) * (y2 - y1)

        if track_id not in self.active_tracks:
            # First sighting of this track id.
            self.active_tracks[track_id] = {
                'crop': crop_image(frame, box),
                'area': area,
                'frame_idx': self.frame_count,
                'class_id': class_id,
                'bbox': box,
                'processed': False
            }
        elif area > self.active_tracks[track_id]['area'] and not self.active_tracks[track_id]['processed']:
            # Bigger box == closer/more detailed view; adopt as new best shot.
            self.active_tracks[track_id].update({
                'crop': crop_image(frame, box),
                'area': area,
                'frame_idx': self.frame_count,
                'bbox': box
            })

        # Trigger classification if the object is near an edge (leaving
        # frame).  _classify_and_store is idempotent, so repeated triggers
        # while the box hugs the border are harmless.
        margin = self.EDGE_MARGIN
        if x1 < margin or y1 < margin or x2 > width - margin or y2 > height - margin:
            self._classify_and_store(track_id)

    def _classify_and_store(self, track_id):
        """Classify a track's best crop with CLIP and record the result.

        Idempotent: a missing or already-processed track id is a no-op.
        """
        track_data = self.active_tracks.get(track_id)
        if not track_data or track_data.get('processed'):
            return

        crop = track_data['crop']

        # NOTE(review): is_blurry() is imported but unused — blurry crops
        # are currently classified anyway.  Consider skipping them or
        # marking the result as low confidence.

        # Prepare for CLIP.
        pil_image = convert_cv2_to_pil(crop)

        # Classify against BOTH label sets and take the best match, rather
        # than trusting the buffered YOLO class_id (its schema is
        # training-specific and not known here).
        candidates = self.sign_labels + self.pothole_labels
        best_label, score = self.clip.get_best_match(pil_image, candidates, threshold=0.5)

        # Any label outside the sign list (including whatever get_best_match
        # returns for a below-threshold non-match — TODO confirm its
        # contract) falls into "Road Damage".
        obj_type = "Traffic Sign" if best_label in self.sign_labels else "Road Damage"

        result = {
            "id": track_id,
            "type": obj_type,
            "subtype": best_label,
            "confidence": float(score),
            "frame_idx": track_data['frame_idx'],
            # In a real app, save the crop to disk and return a URL here.
        }

        self.final_results.append(result)
        self.active_tracks[track_id]['processed'] = True
|
|
|
|
if __name__ == "__main__":
    # Smoke test: constructing the processor loads both models.
    vp = VideoProcessor()
    # vp.process_video("test_video.mp4")
|