|
|
@ -0,0 +1,161 @@ |
|
|
|
|
|
|
|
|
|
|
|
from contextlib import asynccontextmanager |
|
|
|
import io |
|
|
|
from typing import List |
|
|
|
import numpy as np |
|
|
|
from fastapi import FastAPI, File, Form, HTTPException, UploadFile |
|
|
|
from fastapi.middleware.cors import CORSMiddleware |
|
|
|
from PIL import Image |
|
|
|
from pydantic import BaseModel |
|
|
|
import insightface |
|
|
|
from insightface.app import FaceAnalysis |
|
|
|
from face_store import FaceStore |
|
|
|
|
|
|
|
# Initializing InsightFace model with better detection settings with robust detection |
|
|
|
face_analyzer = FaceAnalysis( |
|
|
|
providers=['CPUExecutionProvider'], |
|
|
|
allowed_modules=['detection', 'recognition'] |
|
|
|
) |
|
|
|
face_analyzer.prepare(ctx_id=0, det_size=(640, 640)) |
|
|
|
|
|
|
|
# Initializing face store |
|
|
|
face_store = FaceStore() |
|
|
|
|
|
|
|
@asynccontextmanager |
|
|
|
async def lifespan(app: FastAPI): |
|
|
|
print("Initializing face recognition system...") |
|
|
|
yield |
|
|
|
print("Cleaning up resources...") |
|
|
|
|
|
|
|
app = FastAPI(lifespan=lifespan) |
|
|
|
|
|
|
|
app.add_middleware( |
|
|
|
CORSMiddleware, |
|
|
|
allow_origins=["*"], |
|
|
|
allow_credentials=True, |
|
|
|
allow_methods=["*"], |
|
|
|
allow_headers=["*"], |
|
|
|
) |
|
|
|
|
|
|
|
class Visitor(BaseModel): |
|
|
|
name: str |
|
|
|
encoded_face: List[float] |
|
|
|
|
|
|
|
class Config: |
|
|
|
from_attributes = True |
|
|
|
|
|
|
|
def get_largest_face(faces): |
|
|
|
"""Select the largest face from detected faces based on bounding box area.""" |
|
|
|
if not faces: |
|
|
|
return None |
|
|
|
|
|
|
|
# Calculating areas of all faces |
|
|
|
areas = [(face, (face.bbox[2] - face.bbox[0]) * (face.bbox[3] - face.bbox[1])) |
|
|
|
for face in faces] |
|
|
|
# Returning face with largest area |
|
|
|
return max(areas, key=lambda x: x[1])[0] |
|
|
|
|
|
|
|
def process_image(image_data: bytes): |
|
|
|
"""Process image and return embedding of the largest face.""" |
|
|
|
try: |
|
|
|
image_stream = io.BytesIO(image_data) |
|
|
|
image_pil = Image.open(image_stream).convert("RGB") |
|
|
|
|
|
|
|
# Resizing image if too large (optional, adjust dimensions as needed) |
|
|
|
max_size = 1920 |
|
|
|
if max(image_pil.size) > max_size: |
|
|
|
ratio = max_size / max(image_pil.size) |
|
|
|
new_size = tuple(int(dim * ratio) for dim in image_pil.size) |
|
|
|
image_pil = image_pil.resize(new_size, Image.Resampling.LANCZOS) |
|
|
|
|
|
|
|
image_np = np.array(image_pil) |
|
|
|
faces = face_analyzer.get(image_np) |
|
|
|
|
|
|
|
if not faces: |
|
|
|
return None, "No face detected" |
|
|
|
|
|
|
|
# Get the largest face |
|
|
|
largest_face = get_largest_face(faces) |
|
|
|
|
|
|
|
# Converting embedding to numpy array to ensure consistent format |
|
|
|
embedding = np.array(largest_face.embedding, dtype=np.float32) |
|
|
|
return embedding, None |
|
|
|
|
|
|
|
except Exception as e: |
|
|
|
return None, f"Error processing image: {str(e)}" |
|
|
|
|
|
|
|
@app.get("/") |
|
|
|
async def health_check(): |
|
|
|
return {"message": "Face recognition API is running"} |
|
|
|
|
|
|
|
@app.post("/api/register") |
|
|
|
async def register_visitor(name: str = Form(...), image: UploadFile = File(...)): |
|
|
|
try: |
|
|
|
image_data = await image.read() |
|
|
|
embedding, error = process_image(image_data) |
|
|
|
|
|
|
|
if error: |
|
|
|
return {"message": error} |
|
|
|
|
|
|
|
# Converting embedding to numpy array if it isn't already |
|
|
|
embedding = np.array(embedding, dtype=np.float32) |
|
|
|
|
|
|
|
# Adding debug logging |
|
|
|
print(f"Registering face for {name}") |
|
|
|
print(f"Embedding shape: {embedding.shape}") |
|
|
|
print(f"Embedding type: {type(embedding)}") |
|
|
|
|
|
|
|
# Checking if face already exists |
|
|
|
existing_match = face_store.search_face(embedding) |
|
|
|
if existing_match: |
|
|
|
return { |
|
|
|
"message": "Visitor already exists", |
|
|
|
"name": existing_match[0] |
|
|
|
} |
|
|
|
|
|
|
|
# Registering new face |
|
|
|
face_store.add_face(name, embedding) |
|
|
|
|
|
|
|
# Verifying registration |
|
|
|
verification = face_store.search_face(embedding) |
|
|
|
if not verification: |
|
|
|
raise HTTPException(status_code=500, detail="Face registration failed verification") |
|
|
|
|
|
|
|
return { |
|
|
|
"message": "Visitor registered successfully", |
|
|
|
"name": name |
|
|
|
} |
|
|
|
|
|
|
|
except Exception as e: |
|
|
|
raise HTTPException(status_code=500, detail=str(e)) |
|
|
|
|
|
|
|
@app.post("/api/search") |
|
|
|
async def search_visitor(image: UploadFile = File(...)): |
|
|
|
try: |
|
|
|
image_data = await image.read() |
|
|
|
embedding, error = process_image(image_data) |
|
|
|
|
|
|
|
if error: |
|
|
|
return {"message": error} |
|
|
|
|
|
|
|
# Converting embedding to numpy array if it isn't already |
|
|
|
embedding = np.array(embedding, dtype=np.float32) |
|
|
|
|
|
|
|
# Adding debug logging |
|
|
|
print(f"Searching for face") |
|
|
|
print(f"Embedding shape: {embedding.shape}") |
|
|
|
print(f"Embedding type: {type(embedding)}") |
|
|
|
|
|
|
|
match = face_store.search_face(embedding) |
|
|
|
if match: |
|
|
|
name, confidence = match |
|
|
|
return { |
|
|
|
"message": "Visitor found", |
|
|
|
"name": name, |
|
|
|
"confidence": confidence |
|
|
|
} |
|
|
|
return {"message": "Visitor not found"} |
|
|
|
|
|
|
|
except Exception as e: |
|
|
|
raise HTTPException(status_code=500, detail=str(e)) |