|
|
from contextlib import asynccontextmanager
import io
import time
from typing import List

from fastapi import FastAPI, File, Form, HTTPException, UploadFile
from fastapi.middleware.cors import CORSMiddleware
import insightface
from insightface.app import FaceAnalysis
import numpy as np
from PIL import Image
from pydantic import BaseModel, ConfigDict

from face_store import FaceStore
-
# Module-level InsightFace analyzer, limited to the detection and recognition
# modules and configured with the CPU execution provider.
face_analyzer = FaceAnalysis(
    allowed_modules=['detection', 'recognition'],
    providers=['CPUExecutionProvider'],
)
# Prepare the analyzer with a 640x640 detection size.
face_analyzer.prepare(det_size=(640, 640), ctx_id=0)

# Module-level store the endpoints use to add and search face embeddings.
face_store = FaceStore()
-
@asynccontextmanager
async def lifespan(app: FastAPI):
    """Application lifespan hook: log a message on startup and on shutdown.

    Code before the ``yield`` runs when the context is entered; code after it
    runs when the context exits normally. No resources are acquired or
    released here; only the two log lines are printed.
    """
    startup_message = "Initializing face recognition system..."
    shutdown_message = "Cleaning up resources..."

    print(startup_message)
    yield
    print(shutdown_message)
-
# ASGI application; startup/shutdown logging is delegated to ``lifespan``.
app = FastAPI(lifespan=lifespan)

# CORS is left fully open: any origin, method and header is accepted.
# NOTE(review): wildcard origins together with allow_credentials=True is a
# combination the FastAPI CORS documentation advises against; confirm whether
# credentials are actually needed or list the allowed origins explicitly.
app.add_middleware(
    CORSMiddleware,
    allow_credentials=True,
    allow_origins=["*"],
    allow_methods=["*"],
    allow_headers=["*"],
)
-
class Visitor(BaseModel):
    """Schema for a registered visitor: a display name plus a face embedding."""

    # ``from_attributes`` is a Pydantic v2 setting; v2 expects it in
    # ``model_config`` and deprecates the nested ``class Config`` form.
    model_config = ConfigDict(from_attributes=True)

    # Visitor's name.
    name: str
    # Face embedding as a flat list of floats.
    encoded_face: List[float]
-
def get_largest_face(faces):
    """Return the detected face with the largest bounding-box area.

    Args:
        faces: Collection of detection results. Each item must expose a
            ``bbox`` attribute indexable as ``(x1, y1, x2, y2)``.

    Returns:
        The item whose ``(x2 - x1) * (y2 - y1)`` is greatest (the first such
        item when several tie), or ``None`` when ``faces`` is empty or
        ``None``.
    """
    if not faces:
        return None

    def _bbox_area(face):
        # Width times height of the face's bounding box.
        box = face.bbox
        return (box[2] - box[0]) * (box[3] - box[1])

    # max() with a key avoids materialising an intermediate (face, area) list
    # and, like the list-based version, keeps the first maximal element.
    return max(faces, key=_bbox_area)
-
def process_image(image_data: bytes):
    """Decode an image and return the embedding of its largest face.

    The bytes are decoded with Pillow and converted to RGB. If the longest
    side exceeds 1920 pixels the image is downscaled proportionally. The
    result is passed to the module-level ``face_analyzer`` and the largest
    detected face (by bounding-box area) is selected.

    Args:
        image_data: Raw encoded image bytes.

    Returns:
        An ``(embedding, error)`` tuple. On success ``embedding`` is a
        ``float32`` NumPy array and ``error`` is ``None``. When no face is
        detected, or any step raises, ``embedding`` is ``None`` and ``error``
        is a human-readable message.
    """
    max_size = 1920  # Longest side, in pixels, allowed before downscaling.

    try:
        # Close the decoder handle as soon as the RGB copy exists.
        with Image.open(io.BytesIO(image_data)) as decoded:
            image_pil = decoded.convert("RGB")

        longest_side = max(image_pil.size)
        if longest_side > max_size:
            ratio = max_size / longest_side
            # Clamp to at least 1 px: for extreme aspect ratios the scaled
            # short side would otherwise truncate to 0 and resize() would
            # raise instead of producing an image.
            new_size = tuple(max(1, int(dim * ratio)) for dim in image_pil.size)
            image_pil = image_pil.resize(new_size, Image.Resampling.LANCZOS)

        # NOTE(review): this array is in RGB channel order. InsightFace's own
        # examples feed images loaded with cv2.imread (BGR); confirm which
        # order the models expect. Changing it would alter every embedding,
        # so already-registered faces would need to be re-enrolled.
        image_np = np.array(image_pil)
        faces = face_analyzer.get(image_np)

        if not faces:
            return None, "No face detected"

        largest_face = get_largest_face(faces)

        # Normalise to a float32 array so callers always get one format.
        embedding = np.array(largest_face.embedding, dtype=np.float32)
        return embedding, None

    except Exception as e:
        # Deliberate best-effort boundary: callers receive the failure as a
        # message rather than an exception.
        return None, f"Error processing image: {str(e)}"
-
@app.get("/")
async def health_check():
    """Liveness endpoint: return a fixed message showing the API is up."""
    status = {"message": "Face recognition API is running"}
    return status
-
@app.post("/api/register")
async def register_visitor(name: str = Form(...), image: UploadFile = File(...)):
    """Register a visitor's face under ``name``.

    The uploaded image is run through ``process_image``. If the resulting
    embedding already matches an entry in ``face_store`` the existing name is
    reported and nothing is added; otherwise the embedding is stored and then
    searched for again to verify the registration.

    Args:
        name: Name to associate with the face (form field).
        image: Uploaded image file containing the face.

    Returns:
        A dict with a ``"message"`` key. It also carries ``"name"`` when the
        visitor already exists or was registered successfully. Image
        processing failures are returned as ``{"message": <error>}``.

    Raises:
        HTTPException: Status 500 if the post-registration verification
            search finds nothing, or if any unexpected error occurs.
    """
    try:
        image_data = await image.read()
        embedding, error = process_image(image_data)

        if error:
            return {"message": error}

        # Ensure a float32 array without copying when it already is one.
        embedding = np.asarray(embedding, dtype=np.float32)

        # Debug logging.
        print(f"Registering face for {name}")
        print(f"Embedding shape: {embedding.shape}")
        print(f"Embedding type: {type(embedding)}")

        # Refuse to register a face that already matches a stored one.
        existing_match = face_store.search_face(embedding)
        if existing_match:
            return {
                "message": "Visitor already exists",
                "name": existing_match[0]
            }

        face_store.add_face(name, embedding)

        # Verify the registration by searching for the embedding just added.
        verification = face_store.search_face(embedding)
        if not verification:
            raise HTTPException(status_code=500, detail="Face registration failed verification")

        return {
            "message": "Visitor registered successfully",
            "name": name
        }

    except HTTPException:
        # Let deliberate HTTP errors through unchanged; the generic handler
        # below would otherwise re-wrap them and alter their detail text.
        raise
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e)) from e
-
@app.post("/api/search")
async def search_visitor(image: UploadFile = File(...)):
    """Look up the visitor whose stored face matches the uploaded image.

    The uploaded image is run through ``process_image`` and the resulting
    embedding is searched for in ``face_store``. Timing for the search and
    for the whole request (measured after the upload has been read) is
    printed and included in the response.

    Args:
        image: Uploaded image file containing the face to look up.

    Returns:
        A dict with a ``"message"`` key. On a match it also contains
        ``"name"``, ``"confidence"``, ``"search_time"`` and
        ``"processing_time"``. On no match it contains ``"processing_time"``.
        Image processing failures are returned as ``{"message": <error>}``.

    Raises:
        HTTPException: Status 500 if any unexpected error occurs.
    """
    try:
        image_data = await image.read()
        # perf_counter() is monotonic, so the measured durations cannot be
        # skewed by system clock adjustments the way time.time() can.
        start_time = time.perf_counter()
        print(f"API request started at: {time.strftime('%H:%M:%S')}")

        embedding, error = process_image(image_data)

        if error:
            return {"message": error}

        # Ensure a float32 array without copying when it already is one.
        embedding = np.asarray(embedding, dtype=np.float32)

        # Debug logging.
        print("Searching for face")
        print(f"Embedding shape: {embedding.shape}")
        print(f"Embedding type: {type(embedding)}")

        search_start = time.perf_counter()
        match = face_store.search_face(embedding)
        search_time = time.perf_counter() - search_start
        print(f"Face search took: {search_time:.4f} seconds")

        total_time = time.perf_counter() - start_time
        print(f"Total API processing time: {total_time:.4f} seconds")

        if match:
            name, confidence = match
            # search_face's return type is not visible here. If it yields a
            # NumPy scalar, convert it to a plain Python number so the JSON
            # response encoder can serialise it; other values pass through.
            if isinstance(confidence, np.generic):
                confidence = confidence.item()
            return {
                "message": "Visitor found",
                "name": name,
                "confidence": confidence,
                "search_time": search_time,
                "processing_time": total_time
            }
        return {
            "message": "Visitor not found",
            "processing_time": total_time
        }

    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e)) from e
|