diff --git a/server/src/repositories/job.repository.ts b/server/src/repositories/job.repository.ts index 9bb7268eb1..bf076a1cff 100644 --- a/server/src/repositories/job.repository.ts +++ b/server/src/repositories/job.repository.ts @@ -143,7 +143,11 @@ export class JobRepository { this.pool = this.createPgConnection({ max: 20, connection: { synchronous_commit: 'off' } }); this.db = new Kysely({ dialect: new PostgresJSDialect({ postgres: this.pool }) }); - this.writeBuffer = new WriteBuffer(this.pool, (queue) => this.notify(queue)); + this.writeBuffer = new WriteBuffer( + this.pool, + (queue) => this.notify(queue), + (error) => this.logger.error(`Failed to flush job write buffer: ${error}`), + ); } async startWorkers() { @@ -287,9 +291,9 @@ export class JobRepository { return (this.handlers[name] as JobMapItem).queueName; } - queueAll(items: JobItem[]): Promise { + queueAll(items: JobItem[]): void { if (items.length === 0) { - return Promise.resolve(); + return; } const bufferItems: { queue: QueueName; row: InsertRow }[] = []; @@ -308,11 +312,11 @@ export class JobRepository { }); } - return this.writeBuffer.add(bufferItems); + this.writeBuffer.add(bufferItems); } - queue(item: JobItem): Promise { - return this.queueAll([item]); + queue(item: JobItem): void { + this.queueAll([item]); } async waitForQueueCompletion(...queues: QueueName[]): Promise { diff --git a/server/src/services/database-backup.service.ts b/server/src/services/database-backup.service.ts index de7090fa83..379dc2178a 100644 --- a/server/src/services/database-backup.service.ts +++ b/server/src/services/database-backup.service.ts @@ -29,7 +29,6 @@ import { UnsupportedPostgresError, } from 'src/utils/database-backups'; import { ImmichFileResponse } from 'src/utils/file'; -import { handlePromiseError } from 'src/utils/misc'; @Injectable() export class DatabaseBackupService { @@ -68,7 +67,7 @@ export class DatabaseBackupService { this.cronRepository.create({ name: 'backupDatabase', expression: database.cronExpression, - onTick: () => handlePromiseError(this.jobRepository.queue({ name: JobName.DatabaseBackup }), this.logger), + onTick: () => this.jobRepository.queue({ name: JobName.DatabaseBackup }), start: database.enabled, }); } diff --git a/server/src/services/library.service.ts b/server/src/services/library.service.ts index 841fa4743c..10463ebfdb 100644 --- a/server/src/services/library.service.ts +++ b/server/src/services/library.service.ts @@ -47,7 +47,7 @@ export class LibraryService extends BaseService { this.cronRepository.create({ name: CronJob.LibraryScan, expression: scan.cronExpression, - onTick: () => handlePromiseError(this.jobRepository.queue({ name: JobName.LibraryScanQueueAll }), this.logger), + onTick: () => this.jobRepository.queue({ name: JobName.LibraryScanQueueAll }), start: scan.enabled, }); } diff --git a/server/src/services/storage-template.service.ts b/server/src/services/storage-template.service.ts index 4d01493b14..0586a6de1f 100644 --- a/server/src/services/storage-template.service.ts +++ b/server/src/services/storage-template.service.ts @@ -135,7 +135,7 @@ export class StorageTemplateService extends BaseService { @OnEvent({ name: 'AssetMetadataExtracted' }) onAssetMetadataExtracted({ source, assetId }: ArgOf<'AssetMetadataExtracted'>) { - void this.jobRepository.queue({ name: JobName.StorageTemplateMigrationSingle, data: { source, id: assetId } }); + this.jobRepository.queue({ name: JobName.StorageTemplateMigrationSingle, data: { source, id: assetId } }); } @OnJob({ name: JobName.StorageTemplateMigrationSingle, queue: QueueName.StorageTemplateMigration }) diff --git a/server/src/utils/job-queue.util.ts b/server/src/utils/job-queue.util.ts index 5a0172e3e2..be54362618 100644 --- a/server/src/utils/job-queue.util.ts +++ b/server/src/utils/job-queue.util.ts @@ -325,15 +325,15 @@ export class QueueWorker { export class WriteBuffer { private buffers = Object.fromEntries(Object.values(QueueName).map((name) => [name as QueueName, [] as InsertRow[]])); - private pending: Deferred | null = null; private timer: ReturnType | null = null; constructor( private pgPool: postgres.Sql, private notify: (queue: QueueName) => Promise, + private onFlushError?: (error: unknown) => void, ) {} - async add(items: { queue: QueueName; row: InsertRow }[]): Promise { + add(items: { queue: QueueName; row: InsertRow }[]): void { if (items.length === 0) { return; } @@ -342,10 +342,8 @@ export class WriteBuffer { this.buffers[queue].push(row); } if (!this.timer) { - this.pending = createDeferred(); - this.timer = setTimeout(() => void this.flush(), 10); + this.timer = setTimeout(() => void this.flush().catch((error) => this.onFlushError?.(error)), 10); } - return this.pending!.promise; } async flush(): Promise { @@ -353,8 +351,6 @@ export class WriteBuffer { clearTimeout(this.timer); this.timer = null; } - const deferred = this.pending; - this.pending = null; const promises: Promise[] = []; @@ -385,12 +381,7 @@ export class WriteBuffer { } } - try { - await Promise.all(promises); - deferred?.resolve(); - } catch (error) { - deferred?.reject(error); - } + await Promise.all(promises); } private async copyInsert(tableName: string, rows: InsertRow[]) { @@ -466,13 +457,6 @@ const QUEUE_TABLE = { [QueueName.Editor]: 'jobs_editor', } as const; -const createDeferred = (): Deferred => { - let resolve!: () => void; - let reject!: (error: unknown) => void; - const promise = new Promise((_resolve, _reject) => ((resolve = _resolve), (reject = _reject))); - return { promise, resolve, reject }; -}; - interface QueueWorkerOptions { queueName: QueueName; stallTimeout: number; @@ -484,4 +468,3 @@ interface QueueWorkerOptions { onJob: (job: JobItem) => Promise; } -type Deferred = { promise: Promise; resolve: () => void; reject: (error: unknown) => void }; diff --git a/server/test/repositories/job.repository.mock.ts b/server/test/repositories/job.repository.mock.ts index f42dfc3f0e..d374d3e204 100644 --- a/server/test/repositories/job.repository.mock.ts +++ b/server/test/repositories/job.repository.mock.ts @@ -12,8 +12,8 @@ export const newJobRepositoryMock = (): Mocked Promise.resolve()), - queueAll: vitest.fn().mockImplementation(() => Promise.resolve()), + queue: vitest.fn(), + queueAll: vitest.fn(), isActive: vitest.fn(), isPaused: vitest.fn(), getJobCounts: vitest.fn(),