fix bottleneck

This commit is contained in:
mertalev
2026-02-14 03:48:28 -05:00
parent 839fb61340
commit 0160e6fd5f
3 changed files with 12 additions and 11 deletions

View File

@@ -2,7 +2,7 @@ import { Injectable } from '@nestjs/common';
import { ModuleRef, Reflector } from '@nestjs/core';
import { ClassConstructor } from 'class-transformer';
import { Kysely, sql } from 'kysely';
import { InjectKysely } from 'nestjs-kysely';
import { PostgresJSDialect } from 'kysely-postgres-js';
import { setTimeout } from 'node:timers/promises';
import postgres from 'postgres';
import { JobConfig } from 'src/decorators';
@@ -64,7 +64,8 @@ export class JobRepository {
private workers: Partial<Record<QueueName, QueueWorker>> = {};
private handlers: Partial<Record<JobName, JobMapItem>> = {};
private writeBuffer!: WriteBuffer;
private writePool: postgres.Sql | null = null;
private pool: postgres.Sql | null = null;
private db!: Kysely<DB>;
private listenConn: postgres.Sql | null = null;
private listenReady = false;
private pauseState: Partial<Record<QueueName, boolean>> = {};
@@ -74,7 +75,6 @@ export class JobRepository {
private configRepository: ConfigRepository,
private eventRepository: EventRepository,
private logger: LoggingRepository,
@InjectKysely() private db: Kysely<DB>,
) {
this.logger.setContext(JobRepository.name);
}
@@ -128,8 +128,9 @@ export class JobRepository {
}
}
this.writePool = this.createPgConnection({ max: 4, connection: { synchronous_commit: 'off' } });
this.writeBuffer = new WriteBuffer(this.writePool, (queue) => this.notify(queue));
this.pool = this.createPgConnection({ max: 10, connection: { synchronous_commit: 'off' } });
this.db = new Kysely<DB>({ dialect: new PostgresJSDialect({ postgres: this.pool }) });
this.writeBuffer = new WriteBuffer(this.pool, (queue) => this.notify(queue));
}
async startWorkers() {
@@ -140,7 +141,7 @@ export class JobRepository {
.updateTable(getTable(this.db, queueName))
.set({ status: JobQueueStatus.Pending, startedAt: null, expiresAt: null })
.where('status', '=', JobQueueStatus.Active)
.where('expiresAt', '<', sql<Date>`now()`)
.where('expiresAt', '<', sql<Date>`now()`) // needed for multi-instance safety
.execute(),
),
);
@@ -491,9 +492,9 @@ export class JobRepository {
await this.writeBuffer.flush();
}
if (this.writePool) {
await this.writePool.end();
this.writePool = null;
if (this.pool) {
await this.pool.end();
this.pool = null;
}
if (this.listenConn) {
await this.listenConn.end();

View File

@@ -53,7 +53,7 @@ export class JobService extends BaseService {
const response = await this.jobRepository.run(job);
await this.eventRepository.emit('JobSuccess', { job, response });
if (response && typeof response === 'string' && [JobStatus.Success, JobStatus.Skipped].includes(response)) {
await this.onDone(job);
void this.onDone(job).catch((error) => this.logger.error(`Failed to queue follow-up for ${job.name}: ${error}`));
}
} catch (error: Error | any) {
await this.eventRepository.emit('JobError', { job, error });

View File

@@ -284,7 +284,7 @@ export class QueueWorker {
expiresAt: null,
})
.where('status', '=', JobQueueStatus.Active)
.where('expiresAt', '<', sql<Date>`now()`)
.where('expiresAt', '<', sql<Date>`now()`) // needed for multi-instance safety
.executeTakeFirst();
}