From 19f1d64ce84a852479b705ad285267552ba28293 Mon Sep 17 00:00:00 2001 From: Suman991 Date: Fri, 17 Apr 2026 17:31:23 +0530 Subject: [PATCH] added sse for conversations --- package-lock.json | 63 +++++++++++++++++ package.json | 3 + src/auth/guards/jwt-auth.guard.ts | 5 ++ src/auth/strategies/jwt.strategy.ts | 17 +++++ src/conversations/conversations.controller.ts | 12 ++-- src/conversations/conversations.service.ts | 68 +++++++++++++++---- 6 files changed, 149 insertions(+), 19 deletions(-) create mode 100644 src/auth/guards/jwt-auth.guard.ts create mode 100644 src/auth/strategies/jwt.strategy.ts diff --git a/package-lock.json b/package-lock.json index 5eb98a6..e87ca5d 100644 --- a/package-lock.json +++ b/package-lock.json @@ -15,6 +15,7 @@ "@nestjs/jwt": "^11.0.2", "@nestjs/mapped-types": "*", "@nestjs/mongoose": "^11.0.4", + "@nestjs/passport": "^11.0.5", "@nestjs/platform-express": "^11.0.1", "@nestjs/platform-socket.io": "^11.1.19", "@nestjs/swagger": "^11.3.0", @@ -22,6 +23,8 @@ "class-transformer": "^0.5.1", "class-validator": "^0.15.1", "mongoose": "^9.4.1", + "passport": "^0.7.0", + "passport-jwt": "^4.0.1", "reflect-metadata": "^0.2.2", "rxjs": "^7.8.1", "socket.io": "^4.8.3" @@ -2428,6 +2431,16 @@ "rxjs": "^7.0.0" } }, + "node_modules/@nestjs/passport": { + "version": "11.0.5", + "resolved": "https://registry.npmjs.org/@nestjs/passport/-/passport-11.0.5.tgz", + "integrity": "sha512-ulQX6mbjlws92PIM15Naes4F4p2JoxGnIJuUsdXQPT+Oo2sqQmENEZXM7eYuimocfHnKlcfZOuyzbA33LwUlOQ==", + "license": "MIT", + "peerDependencies": { + "@nestjs/common": "^10.0.0 || ^11.0.0", + "passport": "^0.5.0 || ^0.6.0 || ^0.7.0" + } + }, "node_modules/@nestjs/platform-express": { "version": "11.1.19", "resolved": "https://registry.npmjs.org/@nestjs/platform-express/-/platform-express-11.1.19.tgz", @@ -8460,6 +8473,42 @@ "node": ">= 0.8" } }, + "node_modules/passport": { + "version": "0.7.0", + "resolved": "https://registry.npmjs.org/passport/-/passport-0.7.0.tgz", + "integrity": "sha512-cPLl+qZpSc+ireUvt+IzqbED1cHHkDoVYMo30jbJIdOOjQ1MQYZBPiNvmi8UM6lJuOpTPXJGZQk0DtC4y61MYQ==", + "license": "MIT", + "dependencies": { + "passport-strategy": "1.x.x", + "pause": "0.0.1", + "utils-merge": "^1.0.1" + }, + "engines": { + "node": ">= 0.4.0" + }, + "funding": { + "type": "github", + "url": "https://github.com/sponsors/jaredhanson" + } + }, + "node_modules/passport-jwt": { + "version": "4.0.1", + "resolved": "https://registry.npmjs.org/passport-jwt/-/passport-jwt-4.0.1.tgz", + "integrity": "sha512-UCKMDYhNuGOBE9/9Ycuoyh7vP6jpeTp/+sfMJl7nLff/t6dps+iaeE0hhNkKN8/HZHcJ7lCdOyDxHdDoxoSvdQ==", + "license": "MIT", + "dependencies": { + "jsonwebtoken": "^9.0.0", + "passport-strategy": "^1.0.0" + } + }, + "node_modules/passport-strategy": { + "version": "1.0.0", + "resolved": "https://registry.npmjs.org/passport-strategy/-/passport-strategy-1.0.0.tgz", + "integrity": "sha512-CB97UUvDKJde2V0KDWWB3lyf6PC3FaZP7YxZ2G8OAtn9p4HI9j9JLP9qjOGZFvyl8uwNT8qM+hGnz/n16NI7oA==", + "engines": { + "node": ">= 0.4.0" + } + }, "node_modules/path-exists": { "version": "4.0.0", "resolved": "https://registry.npmjs.org/path-exists/-/path-exists-4.0.0.tgz", @@ -8537,6 +8586,11 @@ "node": ">=8" } }, + "node_modules/pause": { + "version": "0.0.1", + "resolved": "https://registry.npmjs.org/pause/-/pause-0.0.1.tgz", + "integrity": "sha512-KG8UEiEVkR3wGEb4m5yZkVCzigAD+cVEJck2CzYZO37ZGJfctvVptVO192MwrtPhzONn6go8ylnOdMhKqi4nfg==" + }, "node_modules/picocolors": { "version": "1.1.1", "resolved": "https://registry.npmjs.org/picocolors/-/picocolors-1.1.1.tgz", @@ -10271,6 +10325,15 @@ "integrity": "sha512-EPD5q1uXyFxJpCrLnCc1nHnq3gOa6DZBocAIiI2TaSCA7VCJ1UJDMagCzIkXNsUYfD1daK//LTEQ8xiIbrHtcw==", "license": "MIT" }, + "node_modules/utils-merge": { + "version": "1.0.1", + "resolved": "https://registry.npmjs.org/utils-merge/-/utils-merge-1.0.1.tgz", + "integrity": "sha512-pMZTvIkT1d+TFGvDOqodOclx0QWkkgi6Tdoa8gC8ffGAAqz9pzPTZWAybbsHHoED/ztMtkv/VoYTYyShUn81hA==", + "license": "MIT", + "engines": { + "node": ">= 0.4.0" + } + }, "node_modules/v8-compile-cache-lib": { "version": "3.0.1", "resolved": "https://registry.npmjs.org/v8-compile-cache-lib/-/v8-compile-cache-lib-3.0.1.tgz", diff --git a/package.json b/package.json index 86ed12c..a0028c5 100644 --- a/package.json +++ b/package.json @@ -26,6 +26,7 @@ "@nestjs/jwt": "^11.0.2", "@nestjs/mapped-types": "*", "@nestjs/mongoose": "^11.0.4", + "@nestjs/passport": "^11.0.5", "@nestjs/platform-express": "^11.0.1", "@nestjs/platform-socket.io": "^11.1.19", "@nestjs/swagger": "^11.3.0", @@ -33,6 +34,8 @@ "class-transformer": "^0.5.1", "class-validator": "^0.15.1", "mongoose": "^9.4.1", + "passport": "^0.7.0", + "passport-jwt": "^4.0.1", "reflect-metadata": "^0.2.2", "rxjs": "^7.8.1", "socket.io": "^4.8.3" diff --git a/src/auth/guards/jwt-auth.guard.ts b/src/auth/guards/jwt-auth.guard.ts new file mode 100644 index 0000000..255f967 --- /dev/null +++ b/src/auth/guards/jwt-auth.guard.ts @@ -0,0 +1,5 @@ +import { Injectable } from "@nestjs/common"; +import { AuthGuard } from "@nestjs/passport"; + +@Injectable() +export class JwtAuthGuard extends AuthGuard('jwt'){} \ No newline at end of file diff --git a/src/auth/strategies/jwt.strategy.ts b/src/auth/strategies/jwt.strategy.ts new file mode 100644 index 0000000..bbb0c0f --- /dev/null +++ b/src/auth/strategies/jwt.strategy.ts @@ -0,0 +1,17 @@ +import { Injectable } from '@nestjs/common'; +import { PassportStrategy } from '@nestjs/passport'; +import { ExtractJwt, Strategy } from 'passport-jwt'; + +@Injectable() +export class JwtStrategy extends PassportStrategy(Strategy) { + constructor() { + super({ + jwtFromRequest: ExtractJwt.fromAuthHeaderAsBearerToken(), + secretOrKey: process.env.JWT_SECRET, + }); + } + + validate(payload: any) { + return { sub: payload.sub, username: payload.username }; // attached to req.user + } +} diff --git a/src/conversations/conversations.controller.ts b/src/conversations/conversations.controller.ts index 6784eb4..f4d5493 100644 --- a/src/conversations/conversations.controller.ts +++ b/src/conversations/conversations.controller.ts @@ -5,13 +5,13 @@ import { Body, Patch, Param, - Delete, + Sse, + Query, } from '@nestjs/common'; import { ConversationsService } from './conversations.service'; import { CreateConversationDto } from './dto/create-conversation.dto'; -import { UpdateConversationDto } from './dto/update-conversation.dto'; import { ApiBody, ApiOperation, ApiParam, ApiResponse } from '@nestjs/swagger'; -import path from 'path'; +import { Observable } from 'rxjs/internal/Observable'; @Controller('conversations') export class ConversationsController { @@ -23,6 +23,11 @@ export class ConversationsController { return await this.conversationsService.create(createConversationDto); } + @Sse('stream') + stream(@Query('userId') userId: string):Observable { + return this.conversationsService.createSseStream(userId); + } + @ApiOperation({ summary: 'Get all conversations where the user is in' }) @Get(':userId') async findAll(@Param('userId') userId: string) { @@ -88,5 +93,4 @@ export class ConversationsController { ) { return this.conversationsService.removeParticipant(convId, userId); } - } diff --git a/src/conversations/conversations.service.ts b/src/conversations/conversations.service.ts index 0171681..775c6de 100644 --- a/src/conversations/conversations.service.ts +++ b/src/conversations/conversations.service.ts @@ -4,7 +4,6 @@ import { NotFoundException, } from '@nestjs/common'; import { CreateConversationDto } from './dto/create-conversation.dto'; -import { UpdateConversationDto } from './dto/update-conversation.dto'; import { Conversation, ConversationDocument, @@ -13,6 +12,7 @@ import { import { InjectModel } from '@nestjs/mongoose'; import { Model } from 'mongoose'; import { UserDocument } from 'src/users/schemas/user.schema'; +import { Observable, Subject } from 'rxjs'; type PopulatedConversation = Omit & { participants: UserDocument[]; @@ -24,6 +24,30 @@ export class ConversationsService { @InjectModel(Conversation.name) private conversationModel: Model, ) {} + + private sseClients = new Map>(); + + createSseStream(userId: string): Observable { + const subject = new Subject(); + this.sseClients.set(userId, subject); + + // cleanup when client disconnects + return new Observable((observer) => { + const sub = subject.subscribe(observer); + return () => { + sub.unsubscribe(); + this.sseClients.delete(userId); + }; + }); + } + + pushToUser(userId: string, event: string, data: object) { + const subject = this.sseClients.get(userId); + if (!subject) return; + + subject.next({ type: event, data } as MessageEvent); + } + async create( createConversationDto: CreateConversationDto, ): Promise { @@ -60,13 +84,20 @@ export class ConversationsService { throw new ConflictException('Can not add users to P2P chat'); } - return await this.conversationModel.findByIdAndUpdate( - convId, - { - $addToSet: { participants: userId }, // prevent duplicate addition - }, - { new: true }, - ).populate<{ participants: UserDocument[] }>('participants', 'name'); + const updated = await this.conversationModel + .findByIdAndUpdate( + convId, + { + $addToSet: { participants: userId }, // prevent duplicate addition + }, + { new: true }, + ) + .populate<{ participants: UserDocument[] }>('participants', 'name'); + + // push to target user if online + this.pushToUser(userId, 'groupInvite', { conversation: updated }); + + return updated; } async removeParticipant( @@ -81,12 +112,19 @@ export class ConversationsService { throw new ConflictException('Can not remove users from P2P chat'); } - return await this.conversationModel.findByIdAndUpdate( - conversationId, - { - $pull: { participants: userId }, - }, - { new: true }, - ).populate<{ participants: UserDocument[] }>('participants', 'name');; + const updated = await this.conversationModel + .findByIdAndUpdate( + conversationId, + { + $pull: { participants: userId }, + }, + { new: true }, + ) + .populate<{ participants: UserDocument[] }>('participants', 'name'); + + // push to removed user if online + this.pushToUser(userId, 'groupRemoved', { roomId: conversationId }); + + return updated; } }