refactor(server): events (#13003)

* refactor(server): events

* chore: better type

---------

Co-authored-by: Daniel Dietzler <mail@ddietzler.dev>
This commit is contained in:
Jason Rasmussen
2024-09-30 10:35:11 -04:00
committed by GitHub
parent 95c67949f7
commit a2d457b01d
28 changed files with 260 additions and 259 deletions

View File

@@ -1,6 +1,5 @@
import { Inject, Injectable } from '@nestjs/common';
import { ModuleRef } from '@nestjs/core';
import { EventEmitter2 } from '@nestjs/event-emitter';
import {
OnGatewayConnection,
OnGatewayDisconnect,
@@ -13,16 +12,17 @@ import {
ArgsOf,
ClientEventMap,
EmitEvent,
EmitHandler,
EventItem,
IEventRepository,
ServerEvent,
ServerEventMap,
serverEvents,
ServerEvents,
} from 'src/interfaces/event.interface';
import { ILoggerRepository } from 'src/interfaces/logger.interface';
import { AuthService } from 'src/services/auth.service';
import { Instrumentation } from 'src/utils/instrumentation';
import { handlePromiseError } from 'src/utils/misc';
type EmitHandlers = Partial<{ [T in EmitEvent]: EmitHandler<T>[] }>;
type EmitHandlers = Partial<{ [T in EmitEvent]: Array<EventItem<T>> }>;
@Instrumentation()
@WebSocketGateway({
@@ -39,7 +39,6 @@ export class EventRepository implements OnGatewayConnection, OnGatewayDisconnect
constructor(
private moduleRef: ModuleRef,
private eventEmitter: EventEmitter2,
@Inject(ILoggerRepository) private logger: ILoggerRepository,
) {
this.logger.setContext(EventRepository.name);
@@ -48,14 +47,10 @@ export class EventRepository implements OnGatewayConnection, OnGatewayDisconnect
afterInit(server: Server) {
this.logger.log('Initialized websocket server');
for (const event of Object.values(ServerEvent)) {
if (event === ServerEvent.WEBSOCKET_CONNECT) {
continue;
}
server.on(event, (data: unknown) => {
for (const event of serverEvents) {
server.on(event, (...args: ArgsOf<any>) => {
this.logger.debug(`Server event: ${event} (receive)`);
this.eventEmitter.emit(event, data);
handlePromiseError(this.onEvent({ name: event, args, server: true }), this.logger);
});
}
}
@@ -72,7 +67,7 @@ export class EventRepository implements OnGatewayConnection, OnGatewayDisconnect
if (auth.session) {
await client.join(auth.session.id);
}
this.serverSend(ServerEvent.WEBSOCKET_CONNECT, { userId: auth.user.id });
await this.onEvent({ name: 'websocket.connect', args: [{ userId: auth.user.id }], server: false });
} catch (error: Error | any) {
this.logger.error(`Websocket connection error: ${error}`, error?.stack);
client.emit('error', 'unauthorized');
@@ -85,18 +80,29 @@ export class EventRepository implements OnGatewayConnection, OnGatewayDisconnect
await client.leave(client.nsp.name);
}
on<T extends EmitEvent>(event: T, handler: EmitHandler<T>): void {
on<T extends EmitEvent>(item: EventItem<T>): void {
const event = item.event;
if (!this.emitHandlers[event]) {
this.emitHandlers[event] = [];
}
this.emitHandlers[event].push(handler);
this.emitHandlers[event].push(item);
}
async emit<T extends EmitEvent>(event: T, ...args: ArgsOf<T>): Promise<void> {
const handlers = this.emitHandlers[event] || [];
for (const handler of handlers) {
await handler(...args);
return this.onEvent({ name: event, args, server: false });
}
private async onEvent<T extends EmitEvent>(event: { name: T; args: ArgsOf<T>; server: boolean }): Promise<void> {
const handlers = this.emitHandlers[event.name] || [];
for (const { handler, server } of handlers) {
// exclude handlers that ignore server events
if (!server && event.server) {
continue;
}
await handler(...event.args);
}
}
@@ -108,9 +114,8 @@ export class EventRepository implements OnGatewayConnection, OnGatewayDisconnect
this.server?.emit(event, data);
}
serverSend<E extends keyof ServerEventMap>(event: E, data: ServerEventMap[E]) {
serverSend<T extends ServerEvents>(event: T, ...args: ArgsOf<T>): void {
this.logger.debug(`Server event: ${event} (send)`);
this.server?.serverSideEmit(event, data);
return this.eventEmitter.emit(event, data);
this.server?.serverSideEmit(event, ...args);
}
}