better serial queue handling

This commit is contained in:
mertalev
2026-02-14 04:42:29 -05:00
parent 0160e6fd5f
commit 0e1f61176a
2 changed files with 17 additions and 12 deletions

View File

@@ -22,7 +22,7 @@ import { ConfigRepository } from 'src/repositories/config.repository';
import { EventRepository } from 'src/repositories/event.repository';
import { LoggingRepository } from 'src/repositories/logging.repository';
import { DB } from 'src/schema';
import { JobCounts, JobItem, JobOf } from 'src/types';
import { ConcurrentQueueName, JobCounts, JobItem, JobOf } from 'src/types';
import { asPostgresConnectionConfig } from 'src/utils/database';
import { getTable, InsertRow, QueueWorker, WriteBuffer } from 'src/utils/job-queue.util';
import { getKeyByValue, getMethodNames, ImmichStartupError } from 'src/utils/misc';
@@ -34,7 +34,20 @@ type JobMapItem = {
label: string;
};
const SERIAL_QUEUES = [
QueueName.FacialRecognition,
QueueName.StorageTemplateMigration,
QueueName.DuplicateDetection,
QueueName.BackupDatabase,
];
export const isConcurrentQueue = (name: QueueName): name is ConcurrentQueueName => !SERIAL_QUEUES.includes(name);
const getClaimBatch = (queueName: QueueName): number => {
if (SERIAL_QUEUES.includes(queueName)) {
return 1;
}
switch (queueName) {
case QueueName.VideoConversion: {
return 1;

View File

@@ -28,8 +28,9 @@ import {
QueueName,
} from 'src/enum';
import { ArgOf } from 'src/repositories/event.repository';
import { isConcurrentQueue } from 'src/repositories/job.repository';
import { BaseService } from 'src/services/base.service';
import { ConcurrentQueueName, JobItem } from 'src/types';
import { JobItem } from 'src/types';
import { handlePromiseError } from 'src/utils/misc';
const asNightlyTasksCron = (config: SystemConfig) => {
@@ -88,7 +89,7 @@ export class QueueService extends BaseService {
this.logger.debug(`Updating queue concurrency settings`);
for (const queueName of Object.values(QueueName)) {
let concurrency = 1;
if (this.isConcurrentQueue(queueName)) {
if (isConcurrentQueue(queueName)) {
concurrency = config.job[queueName].concurrency;
}
this.logger.debug(`Setting ${queueName} concurrency to ${concurrency}`);
@@ -250,15 +251,6 @@ export class QueueService extends BaseService {
}
}
private isConcurrentQueue(name: QueueName): name is ConcurrentQueueName {
return ![
QueueName.FacialRecognition,
QueueName.StorageTemplateMigration,
QueueName.DuplicateDetection,
QueueName.BackupDatabase,
].includes(name);
}
async handleNightlyJobs() {
const config = await this.getConfig({ withCache: false });
const jobs: JobItem[] = [];