From 27de6e7c1af7f2eb0ea50feaef58fa8f4080c4b8 Mon Sep 17 00:00:00 2001 From: mertalev <101130780+mertalev@users.noreply.github.com> Date: Sat, 14 Feb 2026 00:48:36 -0500 Subject: [PATCH] fixes --- server/src/repositories/job.repository.ts | 11 +++-------- server/src/utils/job-queue.util.ts | 24 ++++++++++++++++++----- 2 files changed, 22 insertions(+), 13 deletions(-) diff --git a/server/src/repositories/job.repository.ts b/server/src/repositories/job.repository.ts index 920f3c57d3..4b41667be2 100644 --- a/server/src/repositories/job.repository.ts +++ b/server/src/repositories/job.repository.ts @@ -127,12 +127,12 @@ export class JobRepository { throw new ImmichStartupError(errorMessage); } } + + this.writePool = this.createPgConnection({ max: 4, connection: { synchronous_commit: 'off' } }); + this.writeBuffer = new WriteBuffer(this.writePool, (queue) => this.notify(queue)); } async startWorkers() { - this.writePool = this.createPgConnection({ max: 4, connection: { synchronous_commit: 'off' } }); - this.writeBuffer = new WriteBuffer(this.writePool, (queue) => this.notify(queue)); - // Startup sweep: reset any active jobs from a previous crash await Promise.all( Object.values(QueueName).map((queueName) => @@ -145,7 +145,6 @@ export class JobRepository { ), ); - // Create workers for (const queueName of Object.values(QueueName)) { this.workers[queueName] = new QueueWorker({ queueName, @@ -157,7 +156,6 @@ export class JobRepository { }); } - // Setup LISTEN/NOTIFY, sync pause state, and trigger initial fetch await this.setupListen(); } @@ -465,16 +463,13 @@ export class JobRepository { } async onShutdown(): Promise { - // Stop workers const shutdownPromises = Object.values(this.workers).map((worker) => worker.shutdown()); await Promise.all(shutdownPromises); - // Flush write buffer if (this.writeBuffer) { await this.writeBuffer.flush(); } - // Close dedicated connections if (this.writePool) { await this.writePool.end(); this.writePool = null; diff --git a/server/src/utils/job-queue.util.ts b/server/src/utils/job-queue.util.ts index c52c9a77e9..70a4462775 100644 --- a/server/src/utils/job-queue.util.ts +++ b/server/src/utils/job-queue.util.ts @@ -208,6 +208,7 @@ export class QueueWorker { .with('next', (qb) => qb .selectFrom(this.table) + .select('id') .where('status', '=', JobQueueStatus.Pending) .where('runAfter', '<=', sql`now()`) .orderBy('priority', 'desc') @@ -367,14 +368,27 @@ export class WriteBuffer { private insertChunk(tableName: string, rows: InsertRow[]) { const now = new Date().toISOString(); + const code = []; + const data = []; + const priority = []; + const dedupKey = []; + const runAfter = []; + for (const row of rows) { + code.push(row.code); + data.push(row.data ?? null); + priority.push(row.priority ?? 0); + dedupKey.push(row.dedupKey); + runAfter.push(row.runAfter?.toISOString() ?? now); + } + return this.pgPool` INSERT INTO ${this.pgPool(tableName)} (code, data, priority, "dedupKey", "runAfter") SELECT * FROM unnest( - ${rows.map((r) => r.code)}::smallint[], - ${rows.map((r) => (r.data != null ? JSON.stringify(r.data) : null))}::jsonb[], - ${rows.map((r) => r.priority ?? 0)}::smallint[], - ${rows.map((r) => r.dedupKey)}::text[], - ${rows.map((r) => r.runAfter?.toISOString() ?? now)}::timestamptz[] + ${code}::smallint[], + ${data as any}::jsonb[], + ${priority}::smallint[], + ${dedupKey}::text[], + ${runAfter}::timestamptz[] ) ON CONFLICT ("dedupKey") WHERE "dedupKey" IS NOT NULL AND status = ${JobQueueStatus.Pending} DO NOTHING