diff --git a/face_store.py b/face_store.py
new file mode 100644
index 0000000..5588996
--- /dev/null
+++ b/face_store.py
@@ -0,0 +1,68 @@
+import faiss
+import numpy as np
+import pickle
+from pathlib import Path
+from typing import List, Optional, Tuple
+
+class FaceStore:
+    def __init__(self, dimension: int = 512):  # 512 for ArcFace embeddings
+        self.dimension = dimension
+        # Use cosine similarity instead of L2 distance
+        self.index = faiss.IndexFlatIP(dimension)  # inner product equals cosine similarity for L2-normalized vectors
+        self.face_data = []
+        self.store_path = Path("face_store.pkl")
+        self.index_path = Path("face_index.faiss")
+        self.load_if_exists()
+
+    def load_if_exists(self):
+        if self.store_path.exists() and self.index_path.exists():
+            # Load face data
+            with open(self.store_path, 'rb') as f:
+                self.face_data = pickle.load(f)
+            # Load FAISS index
+            self.index = faiss.read_index(str(self.index_path))
+
+    def save(self):
+        # Save face data
+        with open(self.store_path, 'wb') as f:
+            pickle.dump(self.face_data, f)
+        # Save FAISS index
+        faiss.write_index(self.index, str(self.index_path))
+
+    def normalize_embedding(self, embedding: np.ndarray) -> np.ndarray:
+        """L2-normalize the embedding and return it as a (1, dimension) float32 array."""
+        embedding = embedding.astype(np.float32)
+        # Reshape to 2D if needed
+        if embedding.ndim == 1:
+            embedding = embedding.reshape(1, -1)
+        # L2-normalize in place
+        faiss.normalize_L2(embedding)
+        return embedding
+
+    def add_face(self, name: str, embedding: np.ndarray) -> None:
+        # Normalize the embedding before adding it to the index
+        normalized_embedding = self.normalize_embedding(embedding)
+        self.face_data.append({"name": name, "embedding": normalized_embedding.flatten()})
+        self.index.add(normalized_embedding)
+        self.save()
+        print(f"Added face for {name}. Total faces: {self.index.ntotal}")
+
+    def search_face(self, embedding: np.ndarray, threshold: float = 0.5) -> Optional[Tuple[str, float]]:
+        if self.index.ntotal == 0:
+            return None
+
+        # Normalize the query embedding
+        normalized_embedding = self.normalize_embedding(embedding)
+
+        # Search by cosine similarity (inner product on normalized vectors)
+        similarities, indices = self.index.search(normalized_embedding, 1)
+        similarity = similarities[0][0]
+
+        print(f"Best match similarity: {similarity}, threshold: {threshold}")
+
+        # For cosine similarity, higher is better and the maximum is 1.0
+        if similarity > threshold:
+            matched_face = self.face_data[indices[0][0]]
+            # Cosine similarity of a matching face is already close to 1, so no rescaling is needed
+            return matched_face["name"], float(similarity)
+        return None
\ No newline at end of file
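
A quick way to sanity-check the store's cosine-similarity behaviour is to feed it synthetic 512-dimensional vectors in place of real ArcFace embeddings. The sketch below is illustrative only: the file name, vectors, and names are made up, and it reads and writes face_store.pkl / face_index.faiss in the working directory just like the real store, so run it somewhere clean.

    # sanity_check_store.py -- illustrative only; synthetic vectors stand in for ArcFace embeddings
    import numpy as np
    from face_store import FaceStore

    rng = np.random.default_rng(42)
    store = FaceStore(dimension=512)

    # Register a fake "visitor" with a random 512-d vector
    alice = rng.normal(size=512).astype(np.float32)
    store.add_face("alice", alice)

    # A lightly perturbed copy should still match (cosine similarity close to 1.0)
    probe = alice + rng.normal(scale=0.05, size=512).astype(np.float32)
    print(store.search_face(probe, threshold=0.5))     # expected: ('alice', ~0.99)

    # An unrelated random vector should fall below the threshold
    stranger = rng.normal(size=512).astype(np.float32)
    print(store.search_face(stranger, threshold=0.5))  # expected: None
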
diff --git a/main.py b/main.py
new file mode 100644
index 0000000..ba7c809
--- /dev/null
+++ b/main.py
@@ -0,0 +1,161 @@
+
+
+from contextlib import asynccontextmanager
+import io
+from typing import List
+import numpy as np
+from fastapi import FastAPI, File, Form, HTTPException, UploadFile
+from fastapi.middleware.cors import CORSMiddleware
+from PIL import Image
+from pydantic import BaseModel
+import insightface
+from insightface.app import FaceAnalysis
+from face_store import FaceStore
+
+# Initialize the InsightFace detection and recognition models with robust detection settings
+face_analyzer = FaceAnalysis(
+    providers=['CPUExecutionProvider'],
+    allowed_modules=['detection', 'recognition']
+)
+face_analyzer.prepare(ctx_id=0, det_size=(640, 640))
+
+# Initialize the face store
+face_store = FaceStore()
+
+@asynccontextmanager
+async def lifespan(app: FastAPI):
+    print("Initializing face recognition system...")
+    yield
+    print("Cleaning up resources...")
+
+app = FastAPI(lifespan=lifespan)
+
+app.add_middleware(
+    CORSMiddleware,
+    allow_origins=["*"],
+    allow_credentials=True,
+    allow_methods=["*"],
+    allow_headers=["*"],
+)
+
+class Visitor(BaseModel):
+    name: str
+    encoded_face: List[float]
+
+    class Config:
+        from_attributes = True
+
+def get_largest_face(faces):
+    """Select the largest face from detected faces based on bounding box area."""
+    if not faces:
+        return None
+
+    # Calculate the area of each detected face
+    areas = [(face, (face.bbox[2] - face.bbox[0]) * (face.bbox[3] - face.bbox[1]))
+             for face in faces]
+    # Return the face with the largest area
+    return max(areas, key=lambda x: x[1])[0]
+
+def process_image(image_data: bytes):
+    """Process image and return embedding of the largest face."""
+    try:
+        image_stream = io.BytesIO(image_data)
+        image_pil = Image.open(image_stream).convert("RGB")
+
+        # Resize the image if it is too large (optional, adjust dimensions as needed)
+        max_size = 1920
+        if max(image_pil.size) > max_size:
+            ratio = max_size / max(image_pil.size)
+            new_size = tuple(int(dim * ratio) for dim in image_pil.size)
+            image_pil = image_pil.resize(new_size, Image.Resampling.LANCZOS)
+
+        image_np = np.array(image_pil)
+        faces = face_analyzer.get(image_np)
+
+        if not faces:
+            return None, "No face detected"
+
+        # Get the largest face
+        largest_face = get_largest_face(faces)
+
+        # Convert the embedding to a numpy array to ensure a consistent format
+        embedding = np.array(largest_face.embedding, dtype=np.float32)
+        return embedding, None
+
+    except Exception as e:
+        return None, f"Error processing image: {str(e)}"
+
+@app.get("/")
+async def health_check():
+    return {"message": "Face recognition API is running"}
+
+@app.post("/api/register")
+async def register_visitor(name: str = Form(...), image: UploadFile = File(...)):
+    try:
+        image_data = await image.read()
+        embedding, error = process_image(image_data)
+
+        if error:
+            return {"message": error}
+
+        # Convert the embedding to a numpy array if it isn't one already
+        embedding = np.array(embedding, dtype=np.float32)
+
+        # Debug logging
+        print(f"Registering face for {name}")
+        print(f"Embedding shape: {embedding.shape}")
+        print(f"Embedding type: {type(embedding)}")
+
+        # Check whether the face already exists
+        existing_match = face_store.search_face(embedding)
+        if existing_match:
+            return {
+                "message": "Visitor already exists",
+                "name": existing_match[0]
+            }
+
+        # Register the new face
+        face_store.add_face(name, embedding)
+
+        # Verify the registration
+        verification = face_store.search_face(embedding)
+        if not verification:
+            raise HTTPException(status_code=500, detail="Face registration failed verification")
+
+        return {
+            "message": "Visitor registered successfully",
+            "name": name
+        }
+
+    except Exception as e:
+        raise HTTPException(status_code=500, detail=str(e))
+
+@app.post("/api/search")
+async def search_visitor(image: UploadFile = File(...)):
+    try:
+        image_data = await image.read()
+        embedding, error = process_image(image_data)
+
+        if error:
+            return {"message": error}
+
+        # Convert the embedding to a numpy array if it isn't one already
+        embedding = np.array(embedding, dtype=np.float32)
+
+        # Debug logging
+        print("Searching for face")
+        print(f"Embedding shape: {embedding.shape}")
+        print(f"Embedding type: {type(embedding)}")
+
+        match = face_store.search_face(embedding)
+        if match:
+            name, confidence = match
+            return {
+                "message": "Visitor found",
+                "name": name,
+                "confidence": confidence
+            }
+        return {"message": "Visitor not found"}
+
+    except Exception as e:
+        raise HTTPException(status_code=500, detail=str(e))
\ No newline at end of file
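
get_largest_face only inspects each detection's bbox attribute, so its selection policy can be checked without running the detector. The stand-in objects below are hypothetical and merely mimic the [x1, y1, x2, y2] bounding-box layout that InsightFace detections expose; note that importing main still loads the InsightFace models and the face store as a side effect.

    # Illustrative check of get_largest_face using stand-in detections
    from types import SimpleNamespace

    from main import get_largest_face  # importing main also initializes the InsightFace models

    small = SimpleNamespace(bbox=[10, 10, 60, 60])     # 50 x 50   -> area 2500
    large = SimpleNamespace(bbox=[100, 50, 300, 250])  # 200 x 200 -> area 40000

    assert get_largest_face([small, large]) is large
    assert get_largest_face([]) is None
    print("largest-face selection works as expected")
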
diff --git a/readme.md b/readme.md
new file mode 100644
index 0000000..90e890c
--- /dev/null
+++ b/readme.md
@@ -0,0 +1,11 @@
+
+# Requirements
+Python >= 3.11
+
+# Setup
+1. Create a virtual environment: `python -m venv venv`
+2. Activate it: `venv\Scripts\activate` (Windows) or `source venv/bin/activate` (Linux/macOS)
+3. Install the dependencies: `pip install -r requirements.txt`
+
+# Run
+uvicorn main:app --reload
\ No newline at end of file
diff --git a/requirements.txt b/requirements.txt
new file mode 100644
index 0000000..4f4cea9
--- /dev/null
+++ b/requirements.txt
@@ -0,0 +1,28 @@
+# Web Framework
+fastapi==0.109.2
+uvicorn==0.27.1
+python-multipart==0.0.7
+pydantic==2.6.1
+python-dotenv==1.0.1
+
+# Image Processing
+Pillow==10.2.0
+
+# Machine Learning & Vector Search
+numpy==1.26.3
+faiss-cpu==1.7.4
+insightface==0.7.3
+onnxruntime==1.16.3
+
+# Optional - for production
+gunicorn==21.2.0
+python-jose==3.3.0
+passlib==1.7.4
+bcrypt==4.1.2
+
+# Database
+sqlalchemy==2.0.25
+aiosqlite==0.19.0
+
+# CORS
+starlette==0.36.3
\ No newline at end of file
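
Once the server is up (see readme.md), the two endpoints can be exercised with a short client script. This is a sketch under assumptions: it uses the third-party requests package (not listed in requirements.txt), assumes uvicorn's default address http://127.0.0.1:8000, and the image file names are placeholders.

    # client_example.py -- illustrative client; install `requests` separately
    import requests

    BASE_URL = "http://127.0.0.1:8000"  # default uvicorn host/port (assumption)

    # Register a visitor from a local photo (placeholder path)
    with open("alice.jpg", "rb") as f:
        resp = requests.post(
            f"{BASE_URL}/api/register",
            data={"name": "Alice"},
            files={"image": ("alice.jpg", f, "image/jpeg")},
        )
    print(resp.json())  # e.g. {"message": "Visitor registered successfully", "name": "Alice"}

    # Search for the same person in another photo (placeholder path)
    with open("alice_2.jpg", "rb") as f:
        resp = requests.post(
            f"{BASE_URL}/api/search",
            files={"image": ("alice_2.jpg", f, "image/jpeg")},
        )
    print(resp.json())  # e.g. {"message": "Visitor found", "name": "Alice", "confidence": ...}
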