added sse for conversations

This commit is contained in:
Suman991 2026-04-17 17:31:23 +05:30
parent f65f2676d2
commit 19f1d64ce8
6 changed files with 149 additions and 19 deletions

63
package-lock.json generated
View File

@ -15,6 +15,7 @@
"@nestjs/jwt": "^11.0.2", "@nestjs/jwt": "^11.0.2",
"@nestjs/mapped-types": "*", "@nestjs/mapped-types": "*",
"@nestjs/mongoose": "^11.0.4", "@nestjs/mongoose": "^11.0.4",
"@nestjs/passport": "^11.0.5",
"@nestjs/platform-express": "^11.0.1", "@nestjs/platform-express": "^11.0.1",
"@nestjs/platform-socket.io": "^11.1.19", "@nestjs/platform-socket.io": "^11.1.19",
"@nestjs/swagger": "^11.3.0", "@nestjs/swagger": "^11.3.0",
@ -22,6 +23,8 @@
"class-transformer": "^0.5.1", "class-transformer": "^0.5.1",
"class-validator": "^0.15.1", "class-validator": "^0.15.1",
"mongoose": "^9.4.1", "mongoose": "^9.4.1",
"passport": "^0.7.0",
"passport-jwt": "^4.0.1",
"reflect-metadata": "^0.2.2", "reflect-metadata": "^0.2.2",
"rxjs": "^7.8.1", "rxjs": "^7.8.1",
"socket.io": "^4.8.3" "socket.io": "^4.8.3"
@ -2428,6 +2431,16 @@
"rxjs": "^7.0.0" "rxjs": "^7.0.0"
} }
}, },
"node_modules/@nestjs/passport": {
"version": "11.0.5",
"resolved": "https://registry.npmjs.org/@nestjs/passport/-/passport-11.0.5.tgz",
"integrity": "sha512-ulQX6mbjlws92PIM15Naes4F4p2JoxGnIJuUsdXQPT+Oo2sqQmENEZXM7eYuimocfHnKlcfZOuyzbA33LwUlOQ==",
"license": "MIT",
"peerDependencies": {
"@nestjs/common": "^10.0.0 || ^11.0.0",
"passport": "^0.5.0 || ^0.6.0 || ^0.7.0"
}
},
"node_modules/@nestjs/platform-express": { "node_modules/@nestjs/platform-express": {
"version": "11.1.19", "version": "11.1.19",
"resolved": "https://registry.npmjs.org/@nestjs/platform-express/-/platform-express-11.1.19.tgz", "resolved": "https://registry.npmjs.org/@nestjs/platform-express/-/platform-express-11.1.19.tgz",
@ -8460,6 +8473,42 @@
"node": ">= 0.8" "node": ">= 0.8"
} }
}, },
"node_modules/passport": {
"version": "0.7.0",
"resolved": "https://registry.npmjs.org/passport/-/passport-0.7.0.tgz",
"integrity": "sha512-cPLl+qZpSc+ireUvt+IzqbED1cHHkDoVYMo30jbJIdOOjQ1MQYZBPiNvmi8UM6lJuOpTPXJGZQk0DtC4y61MYQ==",
"license": "MIT",
"dependencies": {
"passport-strategy": "1.x.x",
"pause": "0.0.1",
"utils-merge": "^1.0.1"
},
"engines": {
"node": ">= 0.4.0"
},
"funding": {
"type": "github",
"url": "https://github.com/sponsors/jaredhanson"
}
},
"node_modules/passport-jwt": {
"version": "4.0.1",
"resolved": "https://registry.npmjs.org/passport-jwt/-/passport-jwt-4.0.1.tgz",
"integrity": "sha512-UCKMDYhNuGOBE9/9Ycuoyh7vP6jpeTp/+sfMJl7nLff/t6dps+iaeE0hhNkKN8/HZHcJ7lCdOyDxHdDoxoSvdQ==",
"license": "MIT",
"dependencies": {
"jsonwebtoken": "^9.0.0",
"passport-strategy": "^1.0.0"
}
},
"node_modules/passport-strategy": {
"version": "1.0.0",
"resolved": "https://registry.npmjs.org/passport-strategy/-/passport-strategy-1.0.0.tgz",
"integrity": "sha512-CB97UUvDKJde2V0KDWWB3lyf6PC3FaZP7YxZ2G8OAtn9p4HI9j9JLP9qjOGZFvyl8uwNT8qM+hGnz/n16NI7oA==",
"engines": {
"node": ">= 0.4.0"
}
},
"node_modules/path-exists": { "node_modules/path-exists": {
"version": "4.0.0", "version": "4.0.0",
"resolved": "https://registry.npmjs.org/path-exists/-/path-exists-4.0.0.tgz", "resolved": "https://registry.npmjs.org/path-exists/-/path-exists-4.0.0.tgz",
@ -8537,6 +8586,11 @@
"node": ">=8" "node": ">=8"
} }
}, },
"node_modules/pause": {
"version": "0.0.1",
"resolved": "https://registry.npmjs.org/pause/-/pause-0.0.1.tgz",
"integrity": "sha512-KG8UEiEVkR3wGEb4m5yZkVCzigAD+cVEJck2CzYZO37ZGJfctvVptVO192MwrtPhzONn6go8ylnOdMhKqi4nfg=="
},
"node_modules/picocolors": { "node_modules/picocolors": {
"version": "1.1.1", "version": "1.1.1",
"resolved": "https://registry.npmjs.org/picocolors/-/picocolors-1.1.1.tgz", "resolved": "https://registry.npmjs.org/picocolors/-/picocolors-1.1.1.tgz",
@ -10271,6 +10325,15 @@
"integrity": "sha512-EPD5q1uXyFxJpCrLnCc1nHnq3gOa6DZBocAIiI2TaSCA7VCJ1UJDMagCzIkXNsUYfD1daK//LTEQ8xiIbrHtcw==", "integrity": "sha512-EPD5q1uXyFxJpCrLnCc1nHnq3gOa6DZBocAIiI2TaSCA7VCJ1UJDMagCzIkXNsUYfD1daK//LTEQ8xiIbrHtcw==",
"license": "MIT" "license": "MIT"
}, },
"node_modules/utils-merge": {
"version": "1.0.1",
"resolved": "https://registry.npmjs.org/utils-merge/-/utils-merge-1.0.1.tgz",
"integrity": "sha512-pMZTvIkT1d+TFGvDOqodOclx0QWkkgi6Tdoa8gC8ffGAAqz9pzPTZWAybbsHHoED/ztMtkv/VoYTYyShUn81hA==",
"license": "MIT",
"engines": {
"node": ">= 0.4.0"
}
},
"node_modules/v8-compile-cache-lib": { "node_modules/v8-compile-cache-lib": {
"version": "3.0.1", "version": "3.0.1",
"resolved": "https://registry.npmjs.org/v8-compile-cache-lib/-/v8-compile-cache-lib-3.0.1.tgz", "resolved": "https://registry.npmjs.org/v8-compile-cache-lib/-/v8-compile-cache-lib-3.0.1.tgz",

View File

@ -26,6 +26,7 @@
"@nestjs/jwt": "^11.0.2", "@nestjs/jwt": "^11.0.2",
"@nestjs/mapped-types": "*", "@nestjs/mapped-types": "*",
"@nestjs/mongoose": "^11.0.4", "@nestjs/mongoose": "^11.0.4",
"@nestjs/passport": "^11.0.5",
"@nestjs/platform-express": "^11.0.1", "@nestjs/platform-express": "^11.0.1",
"@nestjs/platform-socket.io": "^11.1.19", "@nestjs/platform-socket.io": "^11.1.19",
"@nestjs/swagger": "^11.3.0", "@nestjs/swagger": "^11.3.0",
@ -33,6 +34,8 @@
"class-transformer": "^0.5.1", "class-transformer": "^0.5.1",
"class-validator": "^0.15.1", "class-validator": "^0.15.1",
"mongoose": "^9.4.1", "mongoose": "^9.4.1",
"passport": "^0.7.0",
"passport-jwt": "^4.0.1",
"reflect-metadata": "^0.2.2", "reflect-metadata": "^0.2.2",
"rxjs": "^7.8.1", "rxjs": "^7.8.1",
"socket.io": "^4.8.3" "socket.io": "^4.8.3"

View File

@ -0,0 +1,5 @@
import { Injectable } from "@nestjs/common";
import { AuthGuard } from "@nestjs/passport";
@Injectable()
export class JwtAuthGuard extends AuthGuard('jwt'){}

View File

@ -0,0 +1,17 @@
import { Injectable } from '@nestjs/common';
import { PassportStrategy } from '@nestjs/passport';
import { ExtractJwt, Strategy } from 'passport-jwt';
@Injectable()
export class JwtStrategy extends PassportStrategy(Strategy) {
constructor() {
super({
jwtFromRequest: ExtractJwt.fromAuthHeaderAsBearerToken(),
secretOrKey: process.env.JWT_SECRET,
});
}
validate(payload: any) {
return { sub: payload.sub, username: payload.username }; // attached to req.user
}
}

View File

@ -5,13 +5,13 @@ import {
Body, Body,
Patch, Patch,
Param, Param,
Delete, Sse,
Query,
} from '@nestjs/common'; } from '@nestjs/common';
import { ConversationsService } from './conversations.service'; import { ConversationsService } from './conversations.service';
import { CreateConversationDto } from './dto/create-conversation.dto'; import { CreateConversationDto } from './dto/create-conversation.dto';
import { UpdateConversationDto } from './dto/update-conversation.dto';
import { ApiBody, ApiOperation, ApiParam, ApiResponse } from '@nestjs/swagger'; import { ApiBody, ApiOperation, ApiParam, ApiResponse } from '@nestjs/swagger';
import path from 'path'; import { Observable } from 'rxjs/internal/Observable';
@Controller('conversations') @Controller('conversations')
export class ConversationsController { export class ConversationsController {
@ -23,6 +23,11 @@ export class ConversationsController {
return await this.conversationsService.create(createConversationDto); return await this.conversationsService.create(createConversationDto);
} }
@Sse('stream')
stream(@Query('userId') userId: string):Observable<MessageEvent> {
return this.conversationsService.createSseStream(userId);
}
@ApiOperation({ summary: 'Get all conversations where the user is in' }) @ApiOperation({ summary: 'Get all conversations where the user is in' })
@Get(':userId') @Get(':userId')
async findAll(@Param('userId') userId: string) { async findAll(@Param('userId') userId: string) {
@ -88,5 +93,4 @@ export class ConversationsController {
) { ) {
return this.conversationsService.removeParticipant(convId, userId); return this.conversationsService.removeParticipant(convId, userId);
} }
} }

View File

@ -4,7 +4,6 @@ import {
NotFoundException, NotFoundException,
} from '@nestjs/common'; } from '@nestjs/common';
import { CreateConversationDto } from './dto/create-conversation.dto'; import { CreateConversationDto } from './dto/create-conversation.dto';
import { UpdateConversationDto } from './dto/update-conversation.dto';
import { import {
Conversation, Conversation,
ConversationDocument, ConversationDocument,
@ -13,6 +12,7 @@ import {
import { InjectModel } from '@nestjs/mongoose'; import { InjectModel } from '@nestjs/mongoose';
import { Model } from 'mongoose'; import { Model } from 'mongoose';
import { UserDocument } from 'src/users/schemas/user.schema'; import { UserDocument } from 'src/users/schemas/user.schema';
import { Observable, Subject } from 'rxjs';
type PopulatedConversation = Omit<ConversationDocument, 'participants'> & { type PopulatedConversation = Omit<ConversationDocument, 'participants'> & {
participants: UserDocument[]; participants: UserDocument[];
@ -24,6 +24,30 @@ export class ConversationsService {
@InjectModel(Conversation.name) @InjectModel(Conversation.name)
private conversationModel: Model<ConversationDocument>, private conversationModel: Model<ConversationDocument>,
) {} ) {}
private sseClients = new Map<string, Subject<MessageEvent>>();
createSseStream(userId: string): Observable<MessageEvent> {
const subject = new Subject<MessageEvent>();
this.sseClients.set(userId, subject);
// cleanup when client disconnects
return new Observable((observer) => {
const sub = subject.subscribe(observer);
return () => {
sub.unsubscribe();
this.sseClients.delete(userId);
};
});
}
pushToUser(userId: string, event: string, data: object) {
const subject = this.sseClients.get(userId);
if (!subject) return;
subject.next({ type: event, data } as MessageEvent);
}
async create( async create(
createConversationDto: CreateConversationDto, createConversationDto: CreateConversationDto,
): Promise<ConversationDocument> { ): Promise<ConversationDocument> {
@ -60,13 +84,20 @@ export class ConversationsService {
throw new ConflictException('Can not add users to P2P chat'); throw new ConflictException('Can not add users to P2P chat');
} }
return await this.conversationModel.findByIdAndUpdate( const updated = await this.conversationModel
.findByIdAndUpdate(
convId, convId,
{ {
$addToSet: { participants: userId }, // prevent duplicate addition $addToSet: { participants: userId }, // prevent duplicate addition
}, },
{ new: true }, { new: true },
).populate<{ participants: UserDocument[] }>('participants', 'name'); )
.populate<{ participants: UserDocument[] }>('participants', 'name');
// push to target user if online
this.pushToUser(userId, 'groupInvite', { conversation: updated });
return updated;
} }
async removeParticipant( async removeParticipant(
@ -81,12 +112,19 @@ export class ConversationsService {
throw new ConflictException('Can not remove users from P2P chat'); throw new ConflictException('Can not remove users from P2P chat');
} }
return await this.conversationModel.findByIdAndUpdate( const updated = await this.conversationModel
.findByIdAndUpdate(
conversationId, conversationId,
{ {
$pull: { participants: userId }, $pull: { participants: userId },
}, },
{ new: true }, { new: true },
).populate<{ participants: UserDocument[] }>('participants', 'name');; )
.populate<{ participants: UserDocument[] }>('participants', 'name');
// push to removed user if online
this.pushToUser(userId, 'groupRemoved', { roomId: conversationId });
return updated;
} }
} }