2026-02-09 12:50:10 +05:30

166 lines
6.8 KiB
Python

import cv2
import time
from collections import defaultdict
from backend.models.yolo_manager import YOLOManager
from backend.models.clip_manager import CLIPManager
from backend.utils.image_utils import is_blurry, crop_image, convert_cv2_to_pil
class VideoProcessor:
    """Detect, track, and classify road objects (traffic signs / road damage) in a video.

    Per-frame pipeline:
      1. YOLO detection + tracking assigns a persistent track id to each object.
      2. For every track, the single best (largest-area) crop seen so far is buffered.
      3. When a track nears the frame border, or disappears from view, its buffered
         crop is classified once with CLIP zero-shot against candidate labels.
    """

    # Distance (pixels) from any frame border that triggers early classification,
    # before the object leaves view and the crop gets truncated.
    EDGE_MARGIN = 50

    def __init__(self, yolo_model_path="yolov8n.pt",
                 clip_model_id="openai/clip-vit-base-patch32"):
        """Load the YOLO detector/tracker and the CLIP zero-shot classifier.

        Args:
            yolo_model_path: Weights file/name for the YOLO model.
            clip_model_id: HuggingFace model id of the CLIP checkpoint.
        """
        self.yolo = YOLOManager(yolo_model_path)
        self.clip = CLIPManager(clip_model_id)
        # Best shot buffered per track id.
        # Format: {track_id: {'crop': np.ndarray, 'area': float, 'frame_idx': int,
        #                     'class_id': int, 'bbox': array-like, 'processed': bool}}
        self.active_tracks = {}
        # Classification results accumulated for the current video.
        self.final_results = []
        # CLIP zero-shot candidate labels. Distractors such as "shadow" and
        # "patch work" help reject pothole false positives.
        self.pothole_labels = ["pothole", "shadow", "patch work", "manhole", "road crack"]
        self.sign_labels = ["stop sign", "yield sign", "speed limit 30", "speed limit 40",
                            "speed limit 50", "speed limit 60", "pedestrian crossing",
                            "no u-turn", "traffic light", "keep right"]
        # 1-based index of the frame currently being processed.
        self.frame_count = 0

    def process_video(self, video_path: str):
        """Run the detect → track → classify pipeline over a video file.

        Args:
            video_path: Path to the input video.

        Returns:
            List of result dicts (see ``_classify_and_store``); empty list if
            the video cannot be opened.
        """
        cap = cv2.VideoCapture(video_path)
        if not cap.isOpened():
            print(f"Error opening video: {video_path}")
            return []
        # Bug fix: reset per-video state so calling process_video() twice on
        # the same instance does not mix frame counters, buffered tracks, or
        # results across videos.
        self.frame_count = 0
        self.active_tracks = {}
        self.final_results = []
        width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
        height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
        print(f"Processing video {video_path} ({width}x{height})...")
        try:
            while True:
                ret, frame = cap.read()
                if not ret:
                    break
                self.frame_count += 1
                # 1. Run YOLO detection + tracking.
                results = self.yolo.track(frame)
                if results.boxes is None or results.boxes.id is None:
                    # No tracked detections this frame. NOTE(review): stale-track
                    # cleanup is skipped here (as in the original flow); any
                    # remaining tracks are finalized at end of video.
                    continue
                boxes = results.boxes.xyxy.cpu().numpy()
                track_ids = results.boxes.id.cpu().numpy()
                class_ids = results.boxes.cls.cpu().numpy()  # class index depends on training schema
                current_frame_ids = set()
                for box, track_id, cls in zip(boxes, track_ids, class_ids):
                    track_id = int(track_id)
                    current_frame_ids.add(track_id)
                    self._update_track(track_id, int(cls), frame, box)
                    x1, y1, x2, y2 = box
                    # Trigger classification early when the object is near the
                    # frame border, i.e. about to leave the view.
                    if (x1 < self.EDGE_MARGIN or y1 < self.EDGE_MARGIN
                            or x2 > width - self.EDGE_MARGIN
                            or y2 > height - self.EDGE_MARGIN):
                        self._classify_and_store(track_id)
                self._cleanup_stale_tracks(current_frame_ids)
        finally:
            # Bug fix: always release the capture, even if a model call raises.
            cap.release()
        # Finalize any tracks still alive at end of video.
        for tid in list(self.active_tracks.keys()):
            self._classify_and_store(tid)
        print("Processing complete.")
        return self.final_results

    def _update_track(self, track_id, class_id, frame, box):
        """Create or refresh the buffered best shot for ``track_id``.

        The crop is replaced whenever the new detection has a larger area
        (closer / fuller view) and the track has not been classified yet.
        """
        x1, y1, x2, y2 = box
        area = (x2 - x1) * (y2 - y1)
        entry = self.active_tracks.get(track_id)
        if entry is None:
            self.active_tracks[track_id] = {
                'crop': crop_image(frame, box),
                'area': area,
                'frame_idx': self.frame_count,
                'class_id': class_id,
                'bbox': box,
                'processed': False,
            }
        elif area > entry['area'] and not entry['processed']:
            entry.update({
                'crop': crop_image(frame, box),
                'area': area,
                'frame_idx': self.frame_count,
                'bbox': box,
            })

    def _cleanup_stale_tracks(self, current_frame_ids):
        """Classify and drop tracks that vanished from the current frame.

        NOTE(review): a production tracker (e.g. ByteTrack) keeps "lost" tracks
        alive for a few frames; here a single missed frame finalizes the track.
        """
        # Iterate over a snapshot of the keys: we may delete while looping.
        for tid in list(self.active_tracks.keys()):
            if tid not in current_frame_ids:
                self._classify_and_store(tid)
                # Free the buffered crop once the result has been stored.
                if self.active_tracks[tid].get('processed'):
                    del self.active_tracks[tid]

    def _classify_and_store(self, track_id):
        """Run CLIP zero-shot classification on a track's best crop, exactly once.

        Appends a result dict to ``self.final_results`` and marks the track as
        processed so repeated triggers (edge + disappearance) classify only once.
        """
        track_data = self.active_tracks.get(track_id)
        if not track_data or track_data.get('processed'):
            return
        crop = track_data['crop']
        # NOTE(review): crops could be screened with is_blurry() here to skip
        # or down-weight blurry shots — currently we classify regardless.
        pil_image = convert_cv2_to_pil(crop)
        # Classify against the union of both label sets and let CLIP pick the
        # winner; the winning label determines the coarse object type. The
        # YOLO class_id is kept in the buffer but not trusted for routing.
        candidates = self.sign_labels + self.pothole_labels
        best_label, score = self.clip.get_best_match(pil_image, candidates, threshold=0.5)
        # assumes get_best_match always returns a label from `candidates`; if it
        # can return None below threshold, None maps to "Road Damage" here —
        # TODO confirm against CLIPManager.
        obj_type = "Traffic Sign" if best_label in self.sign_labels else "Road Damage"
        result = {
            "id": track_id,
            "type": obj_type,
            "subtype": best_label,
            "confidence": float(score),
            "frame_idx": track_data['frame_idx'],
            # A real app might persist the crop to disk and store its URL here.
        }
        self.final_results.append(result)
        self.active_tracks[track_id]['processed'] = True
if __name__ == "__main__":
    # Smoke test: constructing the pipeline loads both models.
    video_processor = VideoProcessor()
    # Point this at a real clip to run the full pipeline:
    # video_processor.process_video("test_video.mp4")