diff --git a/server/src/repositories/job.repository.ts b/server/src/repositories/job.repository.ts index 118e4b2019..9d78c98308 100644 --- a/server/src/repositories/job.repository.ts +++ b/server/src/repositories/job.repository.ts @@ -2,7 +2,7 @@ import { Injectable } from '@nestjs/common'; import { ModuleRef, Reflector } from '@nestjs/core'; import { ClassConstructor } from 'class-transformer'; import { Kysely, sql } from 'kysely'; -import { InjectKysely } from 'nestjs-kysely'; +import { PostgresJSDialect } from 'kysely-postgres-js'; import { setTimeout } from 'node:timers/promises'; import postgres from 'postgres'; import { JobConfig } from 'src/decorators'; @@ -64,7 +64,8 @@ export class JobRepository { private workers: Partial> = {}; private handlers: Partial> = {}; private writeBuffer!: WriteBuffer; - private writePool: postgres.Sql | null = null; + private pool: postgres.Sql | null = null; + private db!: Kysely; private listenConn: postgres.Sql | null = null; private listenReady = false; private pauseState: Partial> = {}; @@ -74,7 +75,6 @@ export class JobRepository { private configRepository: ConfigRepository, private eventRepository: EventRepository, private logger: LoggingRepository, - @InjectKysely() private db: Kysely, ) { this.logger.setContext(JobRepository.name); } @@ -128,8 +128,9 @@ export class JobRepository { } } - this.writePool = this.createPgConnection({ max: 4, connection: { synchronous_commit: 'off' } }); - this.writeBuffer = new WriteBuffer(this.writePool, (queue) => this.notify(queue)); + this.pool = this.createPgConnection({ max: 10, connection: { synchronous_commit: 'off' } }); + this.db = new Kysely({ dialect: new PostgresJSDialect({ postgres: this.pool }) }); + this.writeBuffer = new WriteBuffer(this.pool, (queue) => this.notify(queue)); } async startWorkers() { @@ -140,7 +141,7 @@ export class JobRepository { .updateTable(getTable(this.db, queueName)) .set({ status: JobQueueStatus.Pending, startedAt: null, expiresAt: null }) .where('status', '=', JobQueueStatus.Active) - .where('expiresAt', '<', sql`now()`) + .where('expiresAt', '<', sql`now()`) // needed for multi-instance safety .execute(), ), ); @@ -491,9 +492,9 @@ export class JobRepository { await this.writeBuffer.flush(); } - if (this.writePool) { - await this.writePool.end(); - this.writePool = null; + if (this.pool) { + await this.pool.end(); + this.pool = null; } if (this.listenConn) { await this.listenConn.end(); diff --git a/server/src/services/job.service.ts b/server/src/services/job.service.ts index 2a47745a6c..7d3d5efb26 100644 --- a/server/src/services/job.service.ts +++ b/server/src/services/job.service.ts @@ -53,7 +53,7 @@ export class JobService extends BaseService { const response = await this.jobRepository.run(job); await this.eventRepository.emit('JobSuccess', { job, response }); if (response && typeof response === 'string' && [JobStatus.Success, JobStatus.Skipped].includes(response)) { - await this.onDone(job); + void this.onDone(job).catch((error) => this.logger.error(`Failed to queue follow-up for ${job.name}: ${error}`)); } } catch (error: Error | any) { await this.eventRepository.emit('JobError', { job, error }); diff --git a/server/src/utils/job-queue.util.ts b/server/src/utils/job-queue.util.ts index 47419f2b30..5a0172e3e2 100644 --- a/server/src/utils/job-queue.util.ts +++ b/server/src/utils/job-queue.util.ts @@ -284,7 +284,7 @@ export class QueueWorker { expiresAt: null, }) .where('status', '=', JobQueueStatus.Active) - .where('expiresAt', '<', sql`now()`) + .where('expiresAt', '<', sql`now()`) // needed for multi-instance safety .executeTakeFirst(); }