You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

174 lines
5.6 KiB

  1. from contextlib import asynccontextmanager
  2. import io
  3. from typing import List
  4. import numpy as np
  5. from fastapi import FastAPI, File, Form, HTTPException, UploadFile
  6. from fastapi.middleware.cors import CORSMiddleware
  7. from PIL import Image
  8. from pydantic import BaseModel
  9. import insightface
  10. from insightface.app import FaceAnalysis
  11. from face_store import FaceStore
  12. import time
  13. # Initializing InsightFace model with better detection settings with robust detection
  14. face_analyzer = FaceAnalysis(
  15. providers=['CPUExecutionProvider'],
  16. allowed_modules=['detection', 'recognition']
  17. )
  18. face_analyzer.prepare(ctx_id=0, det_size=(640, 640))
  19. # Initializing face store
  20. face_store = FaceStore()
  21. @asynccontextmanager
  22. async def lifespan(app: FastAPI):
  23. print("Initializing face recognition system...")
  24. yield
  25. print("Cleaning up resources...")
  26. app = FastAPI(lifespan=lifespan)
  27. app.add_middleware(
  28. CORSMiddleware,
  29. allow_origins=["*"],
  30. allow_credentials=True,
  31. allow_methods=["*"],
  32. allow_headers=["*"],
  33. )
  34. class Visitor(BaseModel):
  35. name: str
  36. encoded_face: List[float]
  37. class Config:
  38. from_attributes = True
  39. def get_largest_face(faces):
  40. """Select the largest face from detected faces based on bounding box area."""
  41. if not faces:
  42. return None
  43. # Calculating areas of all faces
  44. areas = [(face, (face.bbox[2] - face.bbox[0]) * (face.bbox[3] - face.bbox[1]))
  45. for face in faces]
  46. # Returning face with largest area
  47. return max(areas, key=lambda x: x[1])[0]
  48. def process_image(image_data: bytes):
  49. """Process image and return embedding of the largest face."""
  50. try:
  51. image_stream = io.BytesIO(image_data)
  52. image_pil = Image.open(image_stream).convert("RGB")
  53. # Resizing image if too large (optional, adjust dimensions as needed)
  54. max_size = 1920
  55. if max(image_pil.size) > max_size:
  56. ratio = max_size / max(image_pil.size)
  57. new_size = tuple(int(dim * ratio) for dim in image_pil.size)
  58. image_pil = image_pil.resize(new_size, Image.Resampling.LANCZOS)
  59. image_np = np.array(image_pil)
  60. faces = face_analyzer.get(image_np)
  61. if not faces:
  62. return None, "No face detected"
  63. # Get the largest face
  64. largest_face = get_largest_face(faces)
  65. # Converting embedding to numpy array to ensure consistent format
  66. embedding = np.array(largest_face.embedding, dtype=np.float32)
  67. return embedding, None
  68. except Exception as e:
  69. return None, f"Error processing image: {str(e)}"
  70. @app.get("/")
  71. async def health_check():
  72. return {"message": "Face recognition API is running"}
  73. @app.post("/api/register")
  74. async def register_visitor(name: str = Form(...), image: UploadFile = File(...)):
  75. try:
  76. image_data = await image.read()
  77. embedding, error = process_image(image_data)
  78. if error:
  79. return {"message": error}
  80. # Converting embedding to numpy array if it isn't already
  81. embedding = np.array(embedding, dtype=np.float32)
  82. # Adding debug logging
  83. print(f"Registering face for {name}")
  84. print(f"Embedding shape: {embedding.shape}")
  85. print(f"Embedding type: {type(embedding)}")
  86. # Checking if face already exists
  87. existing_match = face_store.search_face(embedding)
  88. if existing_match:
  89. return {
  90. "message": "Visitor already exists",
  91. "name": existing_match[0]
  92. }
  93. # Registering new face
  94. face_store.add_face(name, embedding)
  95. # Verifying registration
  96. verification = face_store.search_face(embedding)
  97. if not verification:
  98. raise HTTPException(status_code=500, detail="Face registration failed verification")
  99. return {
  100. "message": "Visitor registered successfully",
  101. "name": name
  102. }
  103. except Exception as e:
  104. raise HTTPException(status_code=500, detail=str(e))
  105. @app.post("/api/search")
  106. async def search_visitor(image: UploadFile = File(...)):
  107. try:
  108. image_data = await image.read()
  109. start_time = time.time()
  110. print(f"API request started at: {time.strftime('%H:%M:%S')}")
  111. embedding, error = process_image(image_data)
  112. if error:
  113. return {"message": error}
  114. # Converting embedding to numpy array if it isn't already
  115. embedding = np.array(embedding, dtype=np.float32)
  116. # Adding debug logging
  117. print(f"Searching for face")
  118. print(f"Embedding shape: {embedding.shape}")
  119. print(f"Embedding type: {type(embedding)}")
  120. search_start = time.time()
  121. match = face_store.search_face(embedding)
  122. search_time = time.time() - search_start
  123. print(f"Face search took: {search_time:.4f} seconds")
  124. total_time = time.time() - start_time
  125. print(f"Total API processing time: {total_time:.4f} seconds")
  126. if match:
  127. name, confidence = match
  128. return {
  129. "message": "Visitor found",
  130. "name": name,
  131. "confidence": confidence,
  132. "search_time": search_time,
  133. "processing_time": total_time
  134. }
  135. return {
  136. "message": "Visitor not found",
  137. "processing_time": total_time
  138. }
  139. except Exception as e:
  140. raise HTTPException(status_code=500, detail=str(e))