From 0e1f61176a2d9b4544c3fb0e17b247511be0cb40 Mon Sep 17 00:00:00 2001 From: mertalev <101130780+mertalev@users.noreply.github.com> Date: Sat, 14 Feb 2026 04:42:29 -0500 Subject: [PATCH] better serial queue handling --- server/src/repositories/job.repository.ts | 15 ++++++++++++++- server/src/services/queue.service.ts | 14 +++----------- 2 files changed, 17 insertions(+), 12 deletions(-) diff --git a/server/src/repositories/job.repository.ts b/server/src/repositories/job.repository.ts index 9d78c98308..c2085f97c0 100644 --- a/server/src/repositories/job.repository.ts +++ b/server/src/repositories/job.repository.ts @@ -22,7 +22,7 @@ import { ConfigRepository } from 'src/repositories/config.repository'; import { EventRepository } from 'src/repositories/event.repository'; import { LoggingRepository } from 'src/repositories/logging.repository'; import { DB } from 'src/schema'; -import { JobCounts, JobItem, JobOf } from 'src/types'; +import { ConcurrentQueueName, JobCounts, JobItem, JobOf } from 'src/types'; import { asPostgresConnectionConfig } from 'src/utils/database'; import { getTable, InsertRow, QueueWorker, WriteBuffer } from 'src/utils/job-queue.util'; import { getKeyByValue, getMethodNames, ImmichStartupError } from 'src/utils/misc'; @@ -34,7 +34,20 @@ type JobMapItem = { label: string; }; +const SERIAL_QUEUES = [ + QueueName.FacialRecognition, + QueueName.StorageTemplateMigration, + QueueName.DuplicateDetection, + QueueName.BackupDatabase, +]; + +export const isConcurrentQueue = (name: QueueName): name is ConcurrentQueueName => !SERIAL_QUEUES.includes(name); + const getClaimBatch = (queueName: QueueName): number => { + if (SERIAL_QUEUES.includes(queueName)) { + return 1; + } + switch (queueName) { case QueueName.VideoConversion: { return 1; diff --git a/server/src/services/queue.service.ts b/server/src/services/queue.service.ts index ef50e1d34a..58dd5a5128 100644 --- a/server/src/services/queue.service.ts +++ b/server/src/services/queue.service.ts @@ -28,8 +28,9 @@ import { QueueName, } from 'src/enum'; import { ArgOf } from 'src/repositories/event.repository'; +import { isConcurrentQueue } from 'src/repositories/job.repository'; import { BaseService } from 'src/services/base.service'; -import { ConcurrentQueueName, JobItem } from 'src/types'; +import { JobItem } from 'src/types'; import { handlePromiseError } from 'src/utils/misc'; const asNightlyTasksCron = (config: SystemConfig) => { @@ -88,7 +89,7 @@ export class QueueService extends BaseService { this.logger.debug(`Updating queue concurrency settings`); for (const queueName of Object.values(QueueName)) { let concurrency = 1; - if (this.isConcurrentQueue(queueName)) { + if (isConcurrentQueue(queueName)) { concurrency = config.job[queueName].concurrency; } this.logger.debug(`Setting ${queueName} concurrency to ${concurrency}`); @@ -250,15 +251,6 @@ export class QueueService extends BaseService { } } - private isConcurrentQueue(name: QueueName): name is ConcurrentQueueName { - return ![ - QueueName.FacialRecognition, - QueueName.StorageTemplateMigration, - QueueName.DuplicateDetection, - QueueName.BackupDatabase, - ].includes(name); - } - async handleNightlyJobs() { const config = await this.getConfig({ withCache: false }); const jobs: JobItem[] = [];