from contextlib import asynccontextmanager
|
|
import io
|
|
from typing import List
|
|
import numpy as np
|
|
from fastapi import FastAPI, File, Form, HTTPException, UploadFile
|
|
from fastapi.middleware.cors import CORSMiddleware
|
|
from PIL import Image
|
|
from pydantic import BaseModel
|
|
import insightface
|
|
from insightface.app import FaceAnalysis
|
|
from face_store import FaceStore
|
|
import time
|
|
|
|
# Module-level InsightFace pipeline: detection + recognition only (skips
# landmark/gender/age modules to reduce load time and memory).
face_analyzer = FaceAnalysis(
    providers=['CPUExecutionProvider'],
    allowed_modules=['detection', 'recognition']
)
# ctx_id=0 with CPUExecutionProvider runs on CPU; 640x640 is the detector's
# input resolution (larger detects smaller faces at higher cost).
face_analyzer.prepare(ctx_id=0, det_size=(640, 640))

# Shared store for registered face embeddings, used by all request handlers.
face_store = FaceStore()
|
|
|
|
@asynccontextmanager
async def lifespan(app: FastAPI):
    """Application lifespan hook: announce startup, serve, announce shutdown."""
    startup_msg = "Initializing face recognition system..."
    shutdown_msg = "Cleaning up resources..."
    print(startup_msg)
    yield
    print(shutdown_msg)
|
|
|
|
app = FastAPI(lifespan=lifespan)

# CORS: wide open for development convenience.
# NOTE(review): browsers reject allow_origins=["*"] combined with
# allow_credentials=True per the CORS spec — pin explicit origins before
# shipping to production.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)
|
|
|
|
class Visitor(BaseModel):
    """Schema for a registered visitor.

    NOTE(review): not referenced by the endpoints visible in this file —
    presumably used by callers or future endpoints; confirm before removing.
    """

    # Display name of the visitor.
    name: str
    # Face embedding vector as plain floats (numpy arrays are converted
    # before serialization).
    encoded_face: List[float]

    class Config:
        # Allow model construction from attribute-style (ORM) objects.
        from_attributes = True
|
|
|
|
def get_largest_face(faces):
    """Pick the face whose bounding box covers the greatest area.

    Args:
        faces: Sequence of detected face objects exposing ``bbox`` as
            ``[x1, y1, x2, y2]``.

    Returns:
        The face with the largest bbox area, or ``None`` for an empty/None
        input. Ties go to the earliest face in the sequence.
    """
    if not faces:
        return None

    def bbox_area(face):
        x1, y1, x2, y2 = face.bbox
        return (x2 - x1) * (y2 - y1)

    return max(faces, key=bbox_area)
|
|
|
|
def process_image(image_data: bytes):
    """Decode an image and return the embedding of its largest detected face.

    Args:
        image_data: Raw image bytes in any format Pillow can open.

    Returns:
        ``(embedding, error)``: a float32 numpy vector and ``None`` on
        success, or ``(None, message)`` on failure. Failures are reported as
        strings rather than raised so handlers can return them directly.
    """
    try:
        image_pil = Image.open(io.BytesIO(image_data)).convert("RGB")

        # Downscale oversized images to bound detection cost; 1920px on the
        # longest side keeps plenty of detail for the 640x640 detector.
        max_size = 1920
        if max(image_pil.size) > max_size:
            ratio = max_size / max(image_pil.size)
            new_size = tuple(int(dim * ratio) for dim in image_pil.size)
            image_pil = image_pil.resize(new_size, Image.Resampling.LANCZOS)

        # InsightFace models follow the cv2 convention and expect BGR input;
        # PIL yields RGB, so flip the channel axis before inference to avoid
        # degraded embeddings from swapped channels.
        image_np = np.array(image_pil)[:, :, ::-1]

        faces = face_analyzer.get(image_np)
        if not faces:
            return None, "No face detected"

        largest_face = get_largest_face(faces)

        # Force a consistent dtype/layout for storage and comparison.
        embedding = np.array(largest_face.embedding, dtype=np.float32)
        return embedding, None

    except Exception as e:
        # Best-effort contract: surface the failure as a message, never raise.
        return None, f"Error processing image: {str(e)}"
|
|
|
|
@app.get("/")
async def health_check():
    """Liveness probe: confirms the service is up and responding."""
    status = {"message": "Face recognition API is running"}
    return status
|
|
|
|
@app.post("/api/register")
async def register_visitor(name: str = Form(...), image: UploadFile = File(...)):
    """Register a visitor from an uploaded face photo.

    Args:
        name: Visitor's display name (form field).
        image: Uploaded image containing the visitor's face.

    Returns:
        A message payload; if the face already matches a stored visitor the
        existing name is returned instead of re-registering.

    Raises:
        HTTPException: 500 on verification failure or unexpected errors.
    """
    try:
        image_data = await image.read()
        embedding, error = process_image(image_data)

        if error:
            # Processing errors (e.g. "No face detected") are user-facing
            # messages, returned with a 200 to match the existing contract.
            return {"message": error}

        # Normalize to a float32 numpy vector for the store.
        embedding = np.array(embedding, dtype=np.float32)

        # Debug logging
        print(f"Registering face for {name}")
        print(f"Embedding shape: {embedding.shape}")
        print(f"Embedding type: {type(embedding)}")

        # Refuse to register a face that already matches a stored visitor.
        existing_match = face_store.search_face(embedding)
        if existing_match:
            return {
                "message": "Visitor already exists",
                "name": existing_match[0]
            }

        face_store.add_face(name, embedding)

        # Read the face back to confirm the store actually persisted it.
        verification = face_store.search_face(embedding)
        if not verification:
            raise HTTPException(status_code=500, detail="Face registration failed verification")

        return {
            "message": "Visitor registered successfully",
            "name": name
        }

    except HTTPException:
        # Bug fix: the broad handler below used to catch the deliberate
        # verification HTTPException and re-wrap it, mangling the detail.
        # Let intentional HTTP errors propagate unchanged.
        raise
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))
|
|
|
|
@app.post("/api/search")
async def search_visitor(image: UploadFile = File(...)):
    """Identify the person in an uploaded photo against the face store.

    Returns a match payload with confidence and timing metrics, a
    "Visitor not found" payload when no match exists, or the processing
    error message when the image yields no usable face.

    Raises:
        HTTPException: 500 on unexpected errors.
    """
    try:
        payload = await image.read()
        t_start = time.time()
        print(f"API request started at: {time.strftime('%H:%M:%S')}")

        embedding, error = process_image(payload)
        if error:
            return {"message": error}

        # Normalize to a float32 numpy vector before searching.
        embedding = np.array(embedding, dtype=np.float32)

        # Debug logging
        print(f"Searching for face")
        print(f"Embedding shape: {embedding.shape}")
        print(f"Embedding type: {type(embedding)}")

        t_search = time.time()
        match = face_store.search_face(embedding)
        search_time = time.time() - t_search
        print(f"Face search took: {search_time:.4f} seconds")

        total_time = time.time() - t_start
        print(f"Total API processing time: {total_time:.4f} seconds")

        # Guard clause: no match → minimal payload with timing only.
        if not match:
            return {
                "message": "Visitor not found",
                "processing_time": total_time
            }

        matched_name, confidence = match
        return {
            "message": "Visitor found",
            "name": matched_name,
            "confidence": confidence,
            "search_time": search_time,
            "processing_time": total_time
        }

    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))
|