"""FastAPI service that registers and looks up visitors by face embedding.

Images are decoded with Pillow, faces are detected and embedded with
InsightFace, and embeddings are stored/searched through ``FaceStore``.
"""

from contextlib import asynccontextmanager
import io
from typing import List, Optional, Tuple

import numpy as np
from fastapi import FastAPI, File, Form, HTTPException, UploadFile
from fastapi.middleware.cors import CORSMiddleware
from PIL import Image
from pydantic import BaseModel
import insightface
from insightface.app import FaceAnalysis
from face_store import FaceStore
import time

# Longest image side (in pixels) accepted before the image is downscaled.
MAX_IMAGE_DIMENSION = 1920

# The InsightFace analyzer is created at import time (not inside `lifespan`),
# restricted to the detection and recognition modules and the CPU provider.
face_analyzer = FaceAnalysis(
    providers=['CPUExecutionProvider'],
    allowed_modules=['detection', 'recognition'],
)
face_analyzer.prepare(ctx_id=0, det_size=(640, 640))

# Store used to persist and search face embeddings.
face_store = FaceStore()


@asynccontextmanager
async def lifespan(app: FastAPI):
    """Log application startup and shutdown around the app's lifetime."""
    print("Initializing face recognition system...")
    yield
    print("Cleaning up resources...")


app = FastAPI(lifespan=lifespan)

# NOTE(review): wildcard origins combined with allow_credentials=True appears
# to permit credentialed requests from any site -- confirm this is intended
# and restrict `allow_origins` before exposing the service publicly.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)


class Visitor(BaseModel):
    """A visitor's name together with their face embedding."""

    name: str
    encoded_face: List[float]

    class Config:
        from_attributes = True


def _bbox_area(face) -> float:
    """Return the area of a detected face's bounding box (x1, y1, x2, y2)."""
    x1, y1, x2, y2 = face.bbox[0], face.bbox[1], face.bbox[2], face.bbox[3]
    return (x2 - x1) * (y2 - y1)


def get_largest_face(faces):
    """Select the largest face from detected faces based on bounding box area.

    Returns ``None`` when ``faces`` is empty; on ties the first face wins.
    """
    if not faces:
        return None
    return max(faces, key=_bbox_area)


def process_image(image_data: bytes) -> Tuple[Optional[np.ndarray], Optional[str]]:
    """Process image bytes and return the embedding of the largest face.

    Args:
        image_data: Raw encoded image bytes (any format Pillow can open).

    Returns:
        ``(embedding, None)`` on success, where ``embedding`` is a float32
        numpy array, or ``(None, error_message)`` when no face is detected or
        the image cannot be processed.
    """
    try:
        # Close the decoder's image handle once the RGB copy has been made.
        with Image.open(io.BytesIO(image_data)) as raw_image:
            image_pil = raw_image.convert("RGB")

        # Downscale oversized images, preserving aspect ratio.
        longest_side = max(image_pil.size)
        if longest_side > MAX_IMAGE_DIMENSION:
            ratio = MAX_IMAGE_DIMENSION / longest_side
            # Clamp to 1px: truncation could otherwise yield a zero-sized
            # dimension for extreme aspect ratios and make resize() fail.
            new_size = tuple(max(1, int(dim * ratio)) for dim in image_pil.size)
            image_pil = image_pil.resize(new_size, Image.Resampling.LANCZOS)

        image_np = np.array(image_pil)
        faces = face_analyzer.get(image_np)
        if not faces:
            return None, "No face detected"

        largest_face = get_largest_face(faces)

        # Normalise to a float32 numpy array so callers get a consistent format.
        embedding = np.array(largest_face.embedding, dtype=np.float32)
        return embedding, None
    except Exception as e:
        # Deliberate best-effort boundary: report the failure to the caller
        # as a message instead of raising.
        return None, f"Error processing image: {str(e)}"


@app.get("/")
async def health_check():
    """Report that the API is up."""
    return {"message": "Face recognition API is running"}


@app.post("/api/register")
async def register_visitor(name: str = Form(...), image: UploadFile = File(...)):
    """Register a new visitor from an uploaded face image.

    Returns a message dict: the processing error (if any), an
    "already exists" notice with the matched name, or a success notice.

    Raises:
        HTTPException: 500 if the stored face cannot be found again after
            registration, or if any unexpected error occurs.
    """
    try:
        image_data = await image.read()
        # On success process_image already returns a float32 numpy array.
        embedding, error = process_image(image_data)
        if error:
            return {"message": error}

        # Debug logging
        print(f"Registering face for {name}")
        print(f"Embedding shape: {embedding.shape}")
        print(f"Embedding type: {type(embedding)}")

        # Refuse to register a face that already matches a stored visitor.
        existing_match = face_store.search_face(embedding)
        if existing_match:
            return {
                "message": "Visitor already exists",
                "name": existing_match[0],
            }

        face_store.add_face(name, embedding)

        # Verify the registration by searching for the face just added.
        verification = face_store.search_face(embedding)
        if not verification:
            raise HTTPException(
                status_code=500,
                detail="Face registration failed verification",
            )

        return {
            "message": "Visitor registered successfully",
            "name": name,
        }
    except HTTPException:
        # Pass through deliberate HTTP errors unchanged; re-wrapping them
        # below would mangle their detail message.
        raise
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))


@app.post("/api/search")
async def search_visitor(image: UploadFile = File(...)):
    """Search for a registered visitor matching the uploaded face image.

    Returns a message dict: the processing error (if any), the match with
    its confidence and timing figures, or a "not found" notice.

    Raises:
        HTTPException: 500 if any unexpected error occurs.
    """
    try:
        image_data = await image.read()
        start_time = time.time()
        print(f"API request started at: {time.strftime('%H:%M:%S')}")

        # On success process_image already returns a float32 numpy array.
        embedding, error = process_image(image_data)
        if error:
            return {"message": error}

        # Debug logging
        print("Searching for face")
        print(f"Embedding shape: {embedding.shape}")
        print(f"Embedding type: {type(embedding)}")

        search_start = time.time()
        match = face_store.search_face(embedding)
        search_time = time.time() - search_start
        print(f"Face search took: {search_time:.4f} seconds")

        total_time = time.time() - start_time
        print(f"Total API processing time: {total_time:.4f} seconds")

        if match:
            # NOTE(review): if FaceStore returns `confidence` as a numpy
            # scalar it may need float() to be JSON-encodable -- confirm.
            name, confidence = match
            return {
                "message": "Visitor found",
                "name": name,
                "confidence": confidence,
                "search_time": search_time,
                "processing_time": total_time,
            }

        return {
            "message": "Visitor not found",
            "processing_time": total_time,
        }
    except HTTPException:
        # Keep deliberate HTTP errors intact (consistent with /api/register).
        raise
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))