diff --git a/server/src/enum.ts b/server/src/enum.ts index 8f509754da..743fc3b1e7 100644 --- a/server/src/enum.ts +++ b/server/src/enum.ts @@ -569,6 +569,13 @@ export enum QueueName { Editor = 'editor', } +export const JobQueueStatus = { + Pending: 0, + Active: 1, + Failed: 2, +} as const; +export type JobQueueStatus = (typeof JobQueueStatus)[keyof typeof JobQueueStatus]; + export enum QueueJobStatus { Active = 'active', Failed = 'failed', @@ -658,6 +665,12 @@ export enum JobName { WorkflowRun = 'WorkflowRun', } +type JobNameValue = (typeof JobName)[keyof typeof JobName]; +const names = Object.values(JobName); +export const JobCode = Object.fromEntries(names.map((key, i) => [key, i])) as Record; +export const JOB_CODE_TO_NAME = Object.fromEntries(names.map((key, i) => [i, key])) as Record; +export type JobCode = (typeof JobCode)[keyof typeof JobCode]; + export enum QueueCommand { Start = 'start', /** @deprecated Use `updateQueue` instead */ diff --git a/server/src/repositories/job.repository.ts b/server/src/repositories/job.repository.ts index 9bc7c40988..d05b18962d 100644 --- a/server/src/repositories/job.repository.ts +++ b/server/src/repositories/job.repository.ts @@ -1,21 +1,30 @@ import { Injectable } from '@nestjs/common'; import { ModuleRef, Reflector } from '@nestjs/core'; -import { Kysely, sql } from 'kysely'; import { ClassConstructor } from 'class-transformer'; -import { setTimeout } from 'node:timers/promises'; +import { Kysely, sql } from 'kysely'; import { InjectKysely } from 'nestjs-kysely'; +import { setTimeout } from 'node:timers/promises'; import postgres from 'postgres'; import { JobConfig } from 'src/decorators'; import { QueueJobResponseDto, QueueJobSearchDto } from 'src/dtos/queue.dto'; -import { JobName, JobStatus, MetadataKey, QueueCleanType, QueueJobStatus, QueueName } from 'src/enum'; +import { + JOB_CODE_TO_NAME, + JobCode, + JobName, + JobQueueStatus, + JobStatus, + MetadataKey, + QueueCleanType, + QueueJobStatus, + QueueName, +} from 
'src/enum'; import { ConfigRepository } from 'src/repositories/config.repository'; import { EventRepository } from 'src/repositories/event.repository'; -import { QUEUE_TABLE, WriteBuffer } from 'src/repositories/job.write-buffer'; -import { charToJobName, jobNameToChar, QueueWorker } from 'src/repositories/job.worker'; import { LoggingRepository } from 'src/repositories/logging.repository'; import { DB } from 'src/schema'; import { JobCounts, JobItem, JobOf } from 'src/types'; import { asPostgresConnectionConfig } from 'src/utils/database'; +import { getTable, InsertRow, QueueWorker, WriteBuffer } from 'src/utils/job-queue.util'; import { getKeyByValue, getMethodNames, ImmichStartupError } from 'src/utils/misc'; type JobMapItem = { @@ -25,57 +34,29 @@ type JobMapItem = { label: string; }; -// Status char codes -const STATUS_PENDING = 'p'; -const STATUS_ACTIVE = 'a'; -const STATUS_FAILED = 'f'; - -// Stall timeouts in milliseconds -const STALL_LONG = 60 * 60 * 1000; // 1 hour -const STALL_MEDIUM = 30 * 60 * 1000; // 30 min -const STALL_DEFAULT = 5 * 60 * 1000; // 5 min - -const getStallTimeout = (queueName: QueueName): number => { - switch (queueName) { - case QueueName.VideoConversion: - case QueueName.BackupDatabase: - case QueueName.Editor: { - return STALL_LONG; - } - case QueueName.Library: - case QueueName.StorageTemplateMigration: { - return STALL_MEDIUM; - } - default: { - return STALL_DEFAULT; - } - } -}; - const getClaimBatch = (queueName: QueueName): number => { switch (queueName) { - case QueueName.VideoConversion: - case QueueName.BackupDatabase: - case QueueName.StorageTemplateMigration: - case QueueName.Editor: - case QueueName.FacialRecognition: - case QueueName.DuplicateDetection: { + case QueueName.VideoConversion: { return 1; } + case QueueName.FaceDetection: + case QueueName.SmartSearch: + case QueueName.Ocr: { + return 2; + } default: { return 100; // will be clamped to slotsAvailable by the worker } } }; -// Map QueueJobStatus to our "char" 
status codes -const STATUS_FILTER: Record = { - [QueueJobStatus.Active]: STATUS_ACTIVE, - [QueueJobStatus.Failed]: STATUS_FAILED, - [QueueJobStatus.Waiting]: STATUS_PENDING, +const STATUS_FILTER = { + [QueueJobStatus.Active]: JobQueueStatus.Active, + [QueueJobStatus.Failed]: JobQueueStatus.Failed, + [QueueJobStatus.Waiting]: JobQueueStatus.Pending, [QueueJobStatus.Complete]: null, // completed jobs are deleted - [QueueJobStatus.Delayed]: STATUS_PENDING, // delayed = pending with future run_after - [QueueJobStatus.Paused]: STATUS_PENDING, // paused queue has pending jobs + [QueueJobStatus.Delayed]: JobQueueStatus.Pending, // delayed = pending with future run_after + [QueueJobStatus.Paused]: JobQueueStatus.Pending, // paused queue has pending jobs }; @Injectable() @@ -84,6 +65,7 @@ export class JobRepository { private handlers: Partial> = {}; private writeBuffer!: WriteBuffer; private listenConn: postgres.Sql | null = null; + private listenReady = false; private pauseState: Partial> = {}; constructor( @@ -146,57 +128,38 @@ export class JobRepository { } } - startWorkers() { + async startWorkers() { this.writeBuffer = new WriteBuffer(this.db, (queue) => this.notify(queue)); // Startup sweep: reset any active jobs from a previous crash - const startupPromises = Object.values(QueueName).map(async (queueName) => { - const tableName = QUEUE_TABLE[queueName]; - await sql` - UPDATE ${sql.table(tableName)} - SET "status" = ${STATUS_PENDING}::"char", "started_at" = NULL, "expires_at" = NULL - WHERE "status" = ${STATUS_ACTIVE}::"char" - `.execute(this.db); - }); + await Promise.all( + Object.values(QueueName).map((queueName) => + this.db + .updateTable(getTable(this.db, queueName)) + .set({ status: JobQueueStatus.Pending, startedAt: null, expiresAt: null }) + .where('status', '=', JobQueueStatus.Active) + .where('expiresAt', '<', sql`now()`) + .execute(), + ), + ); - // Load pause state and setup workers - void Promise.all(startupPromises).then(async () => { - // Load pause 
state from DB - const metaRows = await this.db.selectFrom('job_queue_meta').selectAll().execute(); - for (const row of metaRows) { - this.pauseState[row.queue_name as QueueName] = row.is_paused; - } + // Create workers + for (const queueName of Object.values(QueueName)) { + this.workers[queueName] = new QueueWorker({ + queueName, + stallTimeout: 5 * 60 * 1000, // 5 min + claimBatch: getClaimBatch(queueName), + concurrency: 1, + db: this.db, + onJob: (job) => this.eventRepository.emit('JobRun', queueName, job), + }); + } - // Create workers - for (const queueName of Object.values(QueueName)) { - const worker = new QueueWorker({ - queueName, - tableName: QUEUE_TABLE[queueName], - stallTimeout: getStallTimeout(queueName), - claimBatch: getClaimBatch(queueName), - concurrency: 1, - db: this.db, - onJob: (job) => this.eventRepository.emit('JobRun', queueName, job), - }); - - if (this.pauseState[queueName]) { - worker.pause(); - } - - this.workers[queueName] = worker; - } - - // Setup LISTEN/NOTIFY - await this.setupListen(); - - // Trigger initial fetch for all workers - for (const worker of Object.values(this.workers)) { - worker.onNotification(); - } - }); + // Setup LISTEN/NOTIFY, sync pause state, and trigger initial fetch + await this.setupListen(); } - async run({ name, data }: JobItem) { + run({ name, data }: JobItem) { const item = this.handlers[name as JobName]; if (!item) { this.logger.warn(`Skipping unknown job: "${name}"`); @@ -216,9 +179,14 @@ export class JobRepository { worker.setConcurrency(concurrency); } - isActive(name: QueueName): Promise { - const worker = this.workers[name]; - return Promise.resolve(worker ? 
worker.activeJobCount > 0 : false); + async isActive(name: QueueName): Promise { + const result = await this.db + .selectFrom(getTable(this.db, name)) + .select('id') + .where('status', '=', JobQueueStatus.Active) + .limit(1) + .executeTakeFirst(); + return result !== undefined; } isPaused(name: QueueName): Promise { @@ -229,37 +197,38 @@ export class JobRepository { this.pauseState[name] = true; await this.db .insertInto('job_queue_meta') - .values({ queue_name: name, is_paused: true }) - .onConflict((oc) => oc.column('queue_name').doUpdateSet({ is_paused: true })) + .values({ queueName: name, isPaused: true }) + .onConflict((oc) => oc.column('queueName').doUpdateSet({ isPaused: true })) .execute(); this.workers[name]?.pause(); + await this.notify(name, 'pause'); } async resume(name: QueueName) { this.pauseState[name] = false; await this.db .insertInto('job_queue_meta') - .values({ queue_name: name, is_paused: false }) - .onConflict((oc) => oc.column('queue_name').doUpdateSet({ is_paused: false })) + .values({ queueName: name, isPaused: false }) + .onConflict((oc) => oc.column('queueName').doUpdateSet({ isPaused: false })) .execute(); this.workers[name]?.resume(); + await this.notify(name, 'resume'); } - async empty(name: QueueName) { - const tableName = QUEUE_TABLE[name]; - await sql`DELETE FROM ${sql.table(tableName)} WHERE "status" = ${STATUS_PENDING}::"char"`.execute(this.db); + empty(name: QueueName) { + return this.db.deleteFrom(getTable(this.db, name)).where('status', '=', JobQueueStatus.Pending).execute(); } - async clear(name: QueueName, _type: QueueCleanType) { - const tableName = QUEUE_TABLE[name]; - await sql`DELETE FROM ${sql.table(tableName)} WHERE "status" = ${STATUS_FAILED}::"char"`.execute(this.db); + clear(name: QueueName, _type: QueueCleanType) { + return this.db.deleteFrom(getTable(this.db, name)).where('status', '=', JobQueueStatus.Failed).execute(); } async getJobCounts(name: QueueName): Promise { - const tableName = QUEUE_TABLE[name]; - 
const result = await sql<{ status: string; count: string }>` - SELECT "status", count(*)::text as count FROM ${sql.table(tableName)} GROUP BY "status" - `.execute(this.db); + const result = await this.db + .selectFrom(getTable(this.db, name)) + .select((eb) => ['status', eb.fn.countAll().as('count')]) + .groupBy('status') + .execute(); const counts: JobCounts = { active: 0, @@ -270,29 +239,23 @@ export class JobRepository { paused: 0, }; - for (const row of result.rows) { + for (const row of result) { switch (row.status) { - case STATUS_PENDING: { + case JobQueueStatus.Pending: { counts.waiting = Number(row.count); break; } - case STATUS_ACTIVE: { + case JobQueueStatus.Active: { counts.active = Number(row.count); break; } - case STATUS_FAILED: { + case JobQueueStatus.Failed: { counts.failed = Number(row.count); break; } } } - // In-memory active count may be more accurate than DB for in-flight jobs - const worker = this.workers[name]; - if (worker) { - counts.active = worker.activeJobCount; - } - if (this.pauseState[name]) { counts.paused = counts.waiting; counts.waiting = 0; @@ -305,32 +268,31 @@ export class JobRepository { return (this.handlers[name] as JobMapItem).queueName; } - async queueAll(items: JobItem[]): Promise { + queueAll(items: JobItem[]): Promise { if (items.length === 0) { - return; + return Promise.resolve(); } - const bufferItems: { queue: QueueName; row: { name: string; data: unknown; priority: number; dedup_key: string | null; run_after: Date } }[] = []; - + const bufferItems: { queue: QueueName; row: InsertRow }[] = []; for (const item of items) { const queueName = this.getQueueName(item.name); const options = this.getJobOptions(item); bufferItems.push({ queue: queueName, row: { - name: jobNameToChar(item.name), - data: item.data || {}, - priority: options?.priority ?? 0, - dedup_key: options?.dedupKey ?? null, - run_after: options?.delay ? new Date(Date.now() + options.delay) : new Date(), + code: JobCode[item.name], + data: item.data ?? 
null, + priority: options?.priority ?? null, + dedupKey: options?.dedupKey ?? null, + runAfter: options?.delay ? new Date(Date.now() + options.delay) : null, }, }); } - await this.writeBuffer.add(bufferItems); + return this.writeBuffer.add(bufferItems); } - async queue(item: JobItem): Promise { + queue(item: JobItem): Promise { return this.queueAll([item]); } @@ -350,31 +312,31 @@ export class JobRepository { } async searchJobs(name: QueueName, dto: QueueJobSearchDto): Promise { - const tableName = QUEUE_TABLE[name]; - const statuses = dto.status ?? Object.values(QueueJobStatus); - const charStatuses = statuses - .map((s) => STATUS_FILTER[s]) - .filter((s): s is string => s !== null); + const statuses: JobQueueStatus[] = []; + for (const status of dto.status ?? Object.values(QueueJobStatus)) { + const mapped = STATUS_FILTER[status]; + if (mapped !== null && !statuses.includes(mapped)) { + statuses.push(mapped); + } + } - if (charStatuses.length === 0) { + if (statuses.length === 0) { return []; } - const uniqueStatuses = [...new Set(charStatuses)]; + const rows = await this.db + .selectFrom(getTable(this.db, name)) + .select(['id', 'code', 'data', 'runAfter']) + .where('status', 'in', statuses) + .orderBy('id', 'desc') + .limit(1000) + .execute(); - const rows = await sql<{ id: number; name: string; data: unknown; run_after: Date }>` - SELECT "id", "name", "data", "run_after" - FROM ${sql.table(tableName)} - WHERE "status" = ANY(${sql.val(uniqueStatuses)}::"char"[]) - ORDER BY "id" DESC - LIMIT 1000 - `.execute(this.db); - - return rows.rows.map((row) => ({ + return rows.map((row) => ({ id: String(row.id), - name: charToJobName(row.name) ?? (row.name as unknown as JobName), + name: JOB_CODE_TO_NAME[row.code], data: (row.data ?? 
{}) as object, - timestamp: new Date(row.run_after).getTime(), + timestamp: new Date(row.runAfter).getTime(), })); } @@ -403,13 +365,20 @@ export class JobRepository { /** @deprecated */ // todo: remove this when asset notifications no longer need it. - public async removeJob(name: JobName, jobID: string): Promise { - const queueName = this.getQueueName(name); - const tableName = QUEUE_TABLE[queueName]; - await sql`DELETE FROM ${sql.table(tableName)} WHERE "id" = ${Number(jobID)}`.execute(this.db); + removeJob(name: JobName, dedupKey: string) { + return this.db + .deleteFrom(getTable(this.db, this.getQueueName(name))) + .where('dedupKey', '=', dedupKey) + .where('status', '=', JobQueueStatus.Pending) + .execute(); } private async setupListen(): Promise { + if (this.listenConn) { + await this.listenConn.end(); + this.listenConn = null; + } + const { database } = this.configRepository.getEnv(); const pgConfig = asPostgresConnectionConfig(database.config); this.listenConn = postgres({ @@ -423,14 +392,69 @@ export class JobRepository { }); for (const queueName of Object.values(QueueName)) { - await this.listenConn.listen(`jobs:${queueName}`, () => { - this.workers[queueName]?.onNotification(); - }); + await this.listenConn.listen( + `jobs:${queueName}`, + (payload) => this.onNotify(queueName, payload), + () => this.onReconnect(), + ); + } + + this.listenReady = true; + await this.syncPauseState(); + for (const worker of Object.values(this.workers)) { + worker.onNotification(); } } - private async notify(queue: QueueName): Promise { - await sql`SELECT pg_notify(${`jobs:${queue}`}, '')`.execute(this.db); + private onNotify(queueName: QueueName, payload: string) { + switch (payload) { + case 'pause': { + this.pauseState[queueName] = true; + this.workers[queueName]?.pause(); + break; + } + case 'resume': { + this.pauseState[queueName] = false; + this.workers[queueName]?.resume(); + break; + } + default: { + this.workers[queueName]?.onNotification(); + break; + } + } + } + 
+ private onReconnect() { + if (!this.listenReady) { + return; + } + this.listenReady = false; + this.logger.log('LISTEN connection re-established, syncing state'); + void this.syncPauseState().then(() => { + for (const worker of Object.values(this.workers)) { + worker.onNotification(); + } + this.listenReady = true; + }); + } + + private async syncPauseState(): Promise { + const metaRows = await this.db.selectFrom('job_queue_meta').selectAll().execute(); + for (const row of metaRows) { + const queueName = row.queueName as QueueName; + const wasPaused = this.pauseState[queueName] ?? false; + this.pauseState[queueName] = row.isPaused; + if (wasPaused && !row.isPaused) { + this.workers[queueName]?.resume(); + } else if (!wasPaused && row.isPaused) { + this.workers[queueName]?.pause(); + } + } + } + + private notify(queue: QueueName, payload = '') { + return sql`SELECT pg_notify(${`jobs:${queue}`}, ${payload})`.execute(this.db); } async onShutdown(): Promise { diff --git a/server/src/repositories/job.worker.ts b/server/src/repositories/job.worker.ts deleted file mode 100644 index 4cb423a6d1..0000000000 --- a/server/src/repositories/job.worker.ts +++ /dev/null @@ -1,298 +0,0 @@ -import { Kysely, sql } from 'kysely'; -import { JobName, QueueName } from 'src/enum'; -import { DB } from 'src/schema'; -import { JobItem } from 'src/types'; - -// Job status codes stored as "char" (single-byte PostgreSQL type) -const STATUS_PENDING = 'p'; -const STATUS_ACTIVE = 'a'; -const STATUS_FAILED = 'f'; - -// Bidirectional JobName <-> "char" mapping -const JOB_CHAR: Record = {}; -const CHAR_JOB: Record = {}; - -// Assign sequential character codes starting from 0x01 -let charCode = 1; -for (const jobName of Object.values(JobName)) { - const char = String.fromCodePoint(charCode++); - JOB_CHAR[jobName] = char; - CHAR_JOB[char] = jobName; -} - -export const jobNameToChar = (name: JobName): string => JOB_CHAR[name]; -export const charToJobName = (char: string): JobName | undefined => 
CHAR_JOB[char]; - -type JobRow = { - id: number; - name: string; - data: unknown; - priority: number; - status: string; - dedup_key: string | null; - run_after: Date; - started_at: Date | null; - expires_at: Date | null; - error: string | null; -}; - -export interface QueueWorkerOptions { - queueName: QueueName; - tableName: string; - stallTimeout: number; - claimBatch: number; - concurrency: number; - db: Kysely; - onJob: (job: JobItem) => Promise; -} - -export class QueueWorker { - private concurrency: number; - private activeCount = 0; - private activeJobs = new Map(); - private hasPending = true; - private fetching = false; - private paused = false; - private stopped = false; - private heartbeatTimer: ReturnType | null = null; - - private readonly queueName: QueueName; - private readonly tableName: string; - private readonly stallTimeout: number; - private readonly claimBatch: number; - private readonly db: Kysely; - private readonly onJobFn: (job: JobItem) => Promise; - - constructor(options: QueueWorkerOptions) { - this.queueName = options.queueName; - this.tableName = options.tableName; - this.stallTimeout = options.stallTimeout; - this.claimBatch = options.claimBatch; - this.concurrency = options.concurrency; - this.db = options.db; - this.onJobFn = options.onJob; - } - - get activeJobCount(): number { - return this.activeCount; - } - - onNotification(): void { - this.hasPending = true; - void this.tryFetch(); - } - - setConcurrency(n: number): void { - this.concurrency = n; - void this.tryFetch(); - } - - pause(): void { - this.paused = true; - } - - resume(): void { - this.paused = false; - this.hasPending = true; - void this.tryFetch(); - } - - async shutdown(): Promise { - this.stopped = true; - this.stopHeartbeat(); - - // Re-queue active jobs - if (this.activeJobs.size > 0) { - const ids = [...this.activeJobs.keys()]; - await sql` - UPDATE ${sql.table(this.tableName)} - SET "status" = ${STATUS_PENDING}::"char", "started_at" = NULL, "expires_at" = NULL 
- WHERE "id" = ANY(${sql.val(ids)}::bigint[]) - `.execute(this.db); - } - } - - private get slotsAvailable(): number { - return Math.max(0, this.concurrency - this.activeCount); - } - - private async tryFetch(): Promise { - if (this.fetching || this.paused || this.stopped) { - return; - } - this.fetching = true; - try { - while (this.slotsAvailable > 0 && this.hasPending && !this.stopped) { - const limit = Math.min(this.slotsAvailable, this.claimBatch); - const jobs = await this.claim(limit); - if (jobs.length === 0) { - const recovered = await this.recoverStalled(); - if (recovered === 0) { - this.hasPending = false; - break; - } - continue; - } - this.activeCount += jobs.length; - for (const job of jobs) { - void this.processJob(job); - } - } - } finally { - this.fetching = false; - } - } - - private async processJob(row: JobRow): Promise { - this.activeJobs.set(row.id, { startedAt: Date.now() }); - this.startHeartbeat(); - try { - const jobName = charToJobName(row.name); - if (!jobName) { - throw new Error(`Unknown job char code: ${row.name.codePointAt(0)}`); - } - await this.onJobFn({ name: jobName, data: row.data } as JobItem); - // Success: delete completed job and try to fetch next - const next = await this.completeAndFetch(row.id, true); - this.activeJobs.delete(row.id); - if (next) { - void this.processJob(next); - } else { - this.activeCount--; - this.hasPending = false; - } - } catch (error: unknown) { - // Failure: mark as failed and try to fetch next - const errorMsg = error instanceof Error ? 
error.message : String(error); - const next = await this.completeAndFetch(row.id, false, errorMsg); - this.activeJobs.delete(row.id); - if (next) { - void this.processJob(next); - } else { - this.activeCount--; - this.hasPending = false; - } - } finally { - if (this.activeJobs.size === 0) { - this.stopHeartbeat(); - } - } - } - - /** - * Claim up to `limit` pending jobs using FOR UPDATE SKIP LOCKED - */ - private async claim(limit: number): Promise { - const result = await sql` - UPDATE ${sql.table(this.tableName)} SET - "status" = ${STATUS_ACTIVE}::"char", - "started_at" = now(), - "expires_at" = now() + ${sql.lit(`'${this.stallTimeout} milliseconds'`)}::interval - WHERE "id" IN ( - SELECT "id" FROM ${sql.table(this.tableName)} - WHERE "status" = ${STATUS_PENDING}::"char" AND "run_after" <= now() - ORDER BY "priority" DESC, "id" ASC - FOR UPDATE SKIP LOCKED - LIMIT ${sql.lit(limit)} - ) - RETURNING * - `.execute(this.db); - return result.rows as JobRow[]; - } - - /** - * Atomically complete a job (delete on success, mark failed on failure) and claim the next one. - * Uses a CTE to combine operations in a single round-trip. 
- */ - private async completeAndFetch( - jobId: number, - success: boolean, - errorMsg?: string, - ): Promise { - if (success) { - const result = await sql` - WITH completed AS ( - DELETE FROM ${sql.table(this.tableName)} WHERE "id" = ${jobId} - ), - next AS ( - SELECT "id" FROM ${sql.table(this.tableName)} - WHERE "status" = ${STATUS_PENDING}::"char" AND "run_after" <= now() - ORDER BY "priority" DESC, "id" ASC - FOR UPDATE SKIP LOCKED - LIMIT 1 - ) - UPDATE ${sql.table(this.tableName)} SET - "status" = ${STATUS_ACTIVE}::"char", - "started_at" = now(), - "expires_at" = now() + ${sql.lit(`'${this.stallTimeout} milliseconds'`)}::interval - WHERE "id" = (SELECT "id" FROM next) - RETURNING * - `.execute(this.db); - return (result.rows as JobRow[])[0]; - } - - const result = await sql` - WITH failed AS ( - UPDATE ${sql.table(this.tableName)} - SET "status" = ${STATUS_FAILED}::"char", "error" = ${errorMsg ?? null} - WHERE "id" = ${jobId} - ), - next AS ( - SELECT "id" FROM ${sql.table(this.tableName)} - WHERE "status" = ${STATUS_PENDING}::"char" AND "run_after" <= now() - ORDER BY "priority" DESC, "id" ASC - FOR UPDATE SKIP LOCKED - LIMIT 1 - ) - UPDATE ${sql.table(this.tableName)} SET - "status" = ${STATUS_ACTIVE}::"char", - "started_at" = now(), - "expires_at" = now() + ${sql.lit(`'${this.stallTimeout} milliseconds'`)}::interval - WHERE "id" = (SELECT "id" FROM next) - RETURNING * - `.execute(this.db); - return (result.rows as JobRow[])[0]; - } - - /** - * Recover stalled jobs: reset jobs whose expires_at has passed - */ - private async recoverStalled(): Promise { - const result = await sql` - UPDATE ${sql.table(this.tableName)} - SET "status" = ${STATUS_PENDING}::"char", "started_at" = NULL, "expires_at" = NULL - WHERE "status" = ${STATUS_ACTIVE}::"char" AND "expires_at" < now() - `.execute(this.db); - return Number(result.numAffectedRows ?? 
0); - } - - /** - * Extend expiry for all active jobs (heartbeat) - */ - private async extendExpiry(): Promise { - if (this.activeJobs.size === 0) { - return; - } - const ids = [...this.activeJobs.keys()]; - await sql` - UPDATE ${sql.table(this.tableName)} - SET "expires_at" = now() + ${sql.lit(`'${this.stallTimeout} milliseconds'`)}::interval - WHERE "id" = ANY(${sql.val(ids)}::bigint[]) - `.execute(this.db); - } - - private startHeartbeat(): void { - if (this.heartbeatTimer) { - return; - } - const interval = Math.max(1000, Math.floor(this.stallTimeout / 2)); - this.heartbeatTimer = setInterval(() => void this.extendExpiry(), interval); - } - - private stopHeartbeat(): void { - if (this.heartbeatTimer) { - clearInterval(this.heartbeatTimer); - this.heartbeatTimer = null; - } - } -} diff --git a/server/src/repositories/job.write-buffer.ts b/server/src/repositories/job.write-buffer.ts deleted file mode 100644 index 3b10dd0e70..0000000000 --- a/server/src/repositories/job.write-buffer.ts +++ /dev/null @@ -1,121 +0,0 @@ -import { Kysely, sql } from 'kysely'; -import { QueueName } from 'src/enum'; -import { DB } from 'src/schema'; - -export type InsertRow = { - name: string; - data: unknown; - priority: number; - dedup_key: string | null; - run_after: Date; -}; - -type QueueTableName = keyof DB & `jobs_${string}`; - -export const QUEUE_TABLE: Record = { - [QueueName.ThumbnailGeneration]: 'jobs_thumbnail_generation', - [QueueName.MetadataExtraction]: 'jobs_metadata_extraction', - [QueueName.VideoConversion]: 'jobs_video_conversion', - [QueueName.FaceDetection]: 'jobs_face_detection', - [QueueName.FacialRecognition]: 'jobs_facial_recognition', - [QueueName.SmartSearch]: 'jobs_smart_search', - [QueueName.DuplicateDetection]: 'jobs_duplicate_detection', - [QueueName.BackgroundTask]: 'jobs_background_task', - [QueueName.StorageTemplateMigration]: 'jobs_storage_template_migration', - [QueueName.Migration]: 'jobs_migration', - [QueueName.Search]: 'jobs_search', - 
[QueueName.Sidecar]: 'jobs_sidecar', - [QueueName.Library]: 'jobs_library', - [QueueName.Notification]: 'jobs_notification', - [QueueName.BackupDatabase]: 'jobs_backup_database', - [QueueName.Ocr]: 'jobs_ocr', - [QueueName.Workflow]: 'jobs_workflow', - [QueueName.Editor]: 'jobs_editor', -}; - -type Deferred = { promise: Promise; resolve: () => void }; - -const createDeferred = (): Deferred => { - let resolve!: () => void; - const promise = new Promise((r) => (resolve = r)); - return { promise, resolve }; -}; - -const CHUNK_SIZE = 5000; - -export class WriteBuffer { - private buffers = new Map(); - private pending: Deferred | null = null; - private timer: ReturnType | null = null; - - constructor( - private db: Kysely, - private notify: (queue: QueueName) => Promise, - ) {} - - async add(items: { queue: QueueName; row: InsertRow }[]): Promise { - for (const { queue, row } of items) { - let buf = this.buffers.get(queue); - if (!buf) { - buf = []; - this.buffers.set(queue, buf); - } - buf.push(row); - } - if (!this.timer) { - this.pending = createDeferred(); - this.timer = setTimeout(() => void this.flush(), 10); - } - return this.pending!.promise; - } - - async flush(): Promise { - const snapshot = this.buffers; - this.buffers = new Map(); - if (this.timer) { - clearTimeout(this.timer); - this.timer = null; - } - const deferred = this.pending; - this.pending = null; - - if (snapshot.size === 0) { - deferred?.resolve(); - return; - } - - try { - for (const [queue, rows] of snapshot) { - const tableName = QUEUE_TABLE[queue]; - for (let i = 0; i < rows.length; i += CHUNK_SIZE) { - const chunk = rows.slice(i, i + CHUNK_SIZE); - await this.insertChunk(tableName, chunk); - } - await this.notify(queue); - } - } finally { - deferred?.resolve(); - } - } - - private async insertChunk(tableName: string, rows: InsertRow[]): Promise { - const names = rows.map((r) => r.name); - const datas = rows.map((r) => JSON.stringify(r.data)); - const priorities = rows.map((r) => r.priority); 
- const dedupKeys = rows.map((r) => r.dedup_key); - const runAfters = rows.map((r) => r.run_after.toISOString()); - - await sql` - INSERT INTO ${sql.table(tableName)} ("name", "data", "priority", "dedup_key", "run_after") - SELECT * FROM unnest( - ${sql.val(names)}::"char"[], - ${sql.val(datas)}::jsonb[], - ${sql.val(priorities)}::smallint[], - ${sql.val(dedupKeys)}::text[], - ${sql.val(runAfters)}::timestamptz[] - ) - ON CONFLICT ("dedup_key") WHERE "dedup_key" IS NOT NULL AND "status" = 'p'::"char" - DO NOTHING - `.execute(this.db); - } -} diff --git a/server/src/schema/tables/job.table.ts b/server/src/schema/tables/job.table.ts index 43e367dd85..927421144b 100644 --- a/server/src/schema/tables/job.table.ts +++ b/server/src/schema/tables/job.table.ts @@ -1,25 +1,12 @@ +import { JobCode, JobQueueStatus } from 'src/enum'; import { Column, ConfigurationParameter, Generated, Index, PrimaryColumn, Table } from 'src/sql-tools'; -// Job status values stored as "char" (single-byte PostgreSQL type): -// 'p' = pending, 'a' = active, 'c' = completed, 'f' = failed - -@Table('jobs_thumbnail_generation') -@Index({ name: 'IDX_jobs_thumbnail_generation_pending', columns: ['priority', 'id'], where: `"status" = 'p'::"char"` }) -@Index({ - name: 'IDX_jobs_thumbnail_generation_dedup', - columns: ['dedup_key'], - unique: true, - where: `"dedup_key" IS NOT NULL AND "status" = 'p'::"char"`, -}) -@ConfigurationParameter({ name: 'autovacuum_vacuum_threshold', value: 100, scope: 'table' }) -@ConfigurationParameter({ name: 'autovacuum_vacuum_scale_factor', value: 0.01, scope: 'table' }) -@ConfigurationParameter({ name: 'autovacuum_vacuum_cost_delay', value: 0, scope: 'table' }) -export class JobsThumbnailGenerationTable { +export class JobTable { @PrimaryColumn({ type: 'bigint', identity: true }) id!: Generated; - @Column({ type: '"char"' }) - name!: string; + @Column({ type: 'smallint' }) + code!: JobCode; @Column({ type: 'jsonb', nullable: true }) data!: unknown; @@ -27,774 +14,89 @@ 
export class JobsThumbnailGenerationTable { @Column({ type: 'smallint', default: 0 }) priority!: Generated; - @Column({ type: '"char"', default: 'p' }) - status!: Generated; - - @Column({ type: 'text', nullable: true }) - dedup_key!: string | null; - - @Column({ type: 'timestamp with time zone', default: () => 'now()' }) - run_after!: Generated; - - @Column({ type: 'timestamp with time zone', nullable: true }) - started_at!: Date | null; - - @Column({ type: 'timestamp with time zone', nullable: true }) - expires_at!: Date | null; - - @Column({ type: 'text', nullable: true }) - error!: string | null; -} - -@Table('jobs_metadata_extraction') -@Index({ name: 'IDX_jobs_metadata_extraction_pending', columns: ['priority', 'id'], where: `"status" = 'p'::"char"` }) -@Index({ - name: 'IDX_jobs_metadata_extraction_dedup', - columns: ['dedup_key'], - unique: true, - where: `"dedup_key" IS NOT NULL AND "status" = 'p'::"char"`, -}) -@ConfigurationParameter({ name: 'autovacuum_vacuum_threshold', value: 100, scope: 'table' }) -@ConfigurationParameter({ name: 'autovacuum_vacuum_scale_factor', value: 0.01, scope: 'table' }) -@ConfigurationParameter({ name: 'autovacuum_vacuum_cost_delay', value: 0, scope: 'table' }) -export class JobsMetadataExtractionTable { - @PrimaryColumn({ type: 'bigint', identity: true }) - id!: Generated; - - @Column({ type: '"char"' }) - name!: string; - - @Column({ type: 'jsonb', nullable: true }) - data!: unknown; - @Column({ type: 'smallint', default: 0 }) - priority!: Generated; - - @Column({ type: '"char"', default: 'p' }) - status!: Generated; + status!: Generated; @Column({ type: 'text', nullable: true }) - dedup_key!: string | null; + dedupKey!: string | null; @Column({ type: 'timestamp with time zone', default: () => 'now()' }) - run_after!: Generated; + runAfter!: Generated; @Column({ type: 'timestamp with time zone', nullable: true }) - started_at!: Date | null; + startedAt!: Date | null; @Column({ type: 'timestamp with time zone', nullable: true 
}) - expires_at!: Date | null; + expiresAt!: Date | null; @Column({ type: 'text', nullable: true }) error!: string | null; } -@Table('jobs_video_conversion') -@Index({ name: 'IDX_jobs_video_conversion_pending', columns: ['priority', 'id'], where: `"status" = 'p'::"char"` }) -@Index({ - name: 'IDX_jobs_video_conversion_dedup', - columns: ['dedup_key'], - unique: true, - where: `"dedup_key" IS NOT NULL AND "status" = 'p'::"char"`, -}) -@ConfigurationParameter({ name: 'autovacuum_vacuum_threshold', value: 100, scope: 'table' }) -@ConfigurationParameter({ name: 'autovacuum_vacuum_scale_factor', value: 0.01, scope: 'table' }) -@ConfigurationParameter({ name: 'autovacuum_vacuum_cost_delay', value: 0, scope: 'table' }) -export class JobsVideoConversionTable { - @PrimaryColumn({ type: 'bigint', identity: true }) - id!: Generated; +function defineJobTable(name: string) { + class NewJobTable extends JobTable {} - @Column({ type: '"char"' }) - name!: string; - - @Column({ type: 'jsonb', nullable: true }) - data!: unknown; - - @Column({ type: 'smallint', default: 0 }) - priority!: Generated; - - @Column({ type: '"char"', default: 'p' }) - status!: Generated; - - @Column({ type: 'text', nullable: true }) - dedup_key!: string | null; - - @Column({ type: 'timestamp with time zone', default: () => 'now()' }) - run_after!: Generated; - - @Column({ type: 'timestamp with time zone', nullable: true }) - started_at!: Date | null; - - @Column({ type: 'timestamp with time zone', nullable: true }) - expires_at!: Date | null; - - @Column({ type: 'text', nullable: true }) - error!: string | null; + const decorated = [ + ConfigurationParameter({ name: 'autovacuum_vacuum_cost_delay', value: 0, scope: 'table' }), + ConfigurationParameter({ name: 'autovacuum_vacuum_scale_factor', value: 0.01, scope: 'table' }), + ConfigurationParameter({ name: 'autovacuum_vacuum_threshold', value: 100, scope: 'table' }), + Index({ + name: `IDX_${name}_dedup`, + columns: ['dedupKey'], + unique: true, + where: 
`"dedupKey" IS NOT NULL AND status = 0`, + }), + Index({ name: `IDX_${name}_pending`, columns: ['priority', 'id'], where: 'status = 0' }), + Table(name), + ].reduce((cls, dec) => dec(cls) || cls, NewJobTable); + Object.defineProperty(decorated, 'name', { value: name }); + return decorated; } -@Table('jobs_face_detection') -@Index({ name: 'IDX_jobs_face_detection_pending', columns: ['priority', 'id'], where: `"status" = 'p'::"char"` }) -@Index({ - name: 'IDX_jobs_face_detection_dedup', - columns: ['dedup_key'], - unique: true, - where: `"dedup_key" IS NOT NULL AND "status" = 'p'::"char"`, -}) -@ConfigurationParameter({ name: 'autovacuum_vacuum_threshold', value: 100, scope: 'table' }) -@ConfigurationParameter({ name: 'autovacuum_vacuum_scale_factor', value: 0.01, scope: 'table' }) -@ConfigurationParameter({ name: 'autovacuum_vacuum_cost_delay', value: 0, scope: 'table' }) -export class JobsFaceDetectionTable { - @PrimaryColumn({ type: 'bigint', identity: true }) - id!: Generated; - - @Column({ type: '"char"' }) - name!: string; - - @Column({ type: 'jsonb', nullable: true }) - data!: unknown; - - @Column({ type: 'smallint', default: 0 }) - priority!: Generated; - - @Column({ type: '"char"', default: 'p' }) - status!: Generated; - - @Column({ type: 'text', nullable: true }) - dedup_key!: string | null; - - @Column({ type: 'timestamp with time zone', default: () => 'now()' }) - run_after!: Generated; - - @Column({ type: 'timestamp with time zone', nullable: true }) - started_at!: Date | null; - - @Column({ type: 'timestamp with time zone', nullable: true }) - expires_at!: Date | null; - - @Column({ type: 'text', nullable: true }) - error!: string | null; -} - -@Table('jobs_facial_recognition') -@Index({ - name: 'IDX_jobs_facial_recognition_pending', - columns: ['priority', 'id'], - where: `"status" = 'p'::"char"`, -}) -@Index({ - name: 'IDX_jobs_facial_recognition_dedup', - columns: ['dedup_key'], - unique: true, - where: `"dedup_key" IS NOT NULL AND "status" = 
'p'::"char"`, -}) -@ConfigurationParameter({ name: 'autovacuum_vacuum_threshold', value: 100, scope: 'table' }) -@ConfigurationParameter({ name: 'autovacuum_vacuum_scale_factor', value: 0.01, scope: 'table' }) -@ConfigurationParameter({ name: 'autovacuum_vacuum_cost_delay', value: 0, scope: 'table' }) -export class JobsFacialRecognitionTable { - @PrimaryColumn({ type: 'bigint', identity: true }) - id!: Generated; - - @Column({ type: '"char"' }) - name!: string; - - @Column({ type: 'jsonb', nullable: true }) - data!: unknown; - - @Column({ type: 'smallint', default: 0 }) - priority!: Generated; - - @Column({ type: '"char"', default: 'p' }) - status!: Generated; - - @Column({ type: 'text', nullable: true }) - dedup_key!: string | null; - - @Column({ type: 'timestamp with time zone', default: () => 'now()' }) - run_after!: Generated; - - @Column({ type: 'timestamp with time zone', nullable: true }) - started_at!: Date | null; - - @Column({ type: 'timestamp with time zone', nullable: true }) - expires_at!: Date | null; - - @Column({ type: 'text', nullable: true }) - error!: string | null; -} - -@Table('jobs_smart_search') -@Index({ name: 'IDX_jobs_smart_search_pending', columns: ['priority', 'id'], where: `"status" = 'p'::"char"` }) -@Index({ - name: 'IDX_jobs_smart_search_dedup', - columns: ['dedup_key'], - unique: true, - where: `"dedup_key" IS NOT NULL AND "status" = 'p'::"char"`, -}) -@ConfigurationParameter({ name: 'autovacuum_vacuum_threshold', value: 100, scope: 'table' }) -@ConfigurationParameter({ name: 'autovacuum_vacuum_scale_factor', value: 0.01, scope: 'table' }) -@ConfigurationParameter({ name: 'autovacuum_vacuum_cost_delay', value: 0, scope: 'table' }) -export class JobsSmartSearchTable { - @PrimaryColumn({ type: 'bigint', identity: true }) - id!: Generated; - - @Column({ type: '"char"' }) - name!: string; - - @Column({ type: 'jsonb', nullable: true }) - data!: unknown; - - @Column({ type: 'smallint', default: 0 }) - priority!: Generated; - - @Column({ 
type: '"char"', default: 'p' }) - status!: Generated; - - @Column({ type: 'text', nullable: true }) - dedup_key!: string | null; - - @Column({ type: 'timestamp with time zone', default: () => 'now()' }) - run_after!: Generated; - - @Column({ type: 'timestamp with time zone', nullable: true }) - started_at!: Date | null; - - @Column({ type: 'timestamp with time zone', nullable: true }) - expires_at!: Date | null; - - @Column({ type: 'text', nullable: true }) - error!: string | null; -} - -@Table('jobs_duplicate_detection') -@Index({ - name: 'IDX_jobs_duplicate_detection_pending', - columns: ['priority', 'id'], - where: `"status" = 'p'::"char"`, -}) -@Index({ - name: 'IDX_jobs_duplicate_detection_dedup', - columns: ['dedup_key'], - unique: true, - where: `"dedup_key" IS NOT NULL AND "status" = 'p'::"char"`, -}) -@ConfigurationParameter({ name: 'autovacuum_vacuum_threshold', value: 100, scope: 'table' }) -@ConfigurationParameter({ name: 'autovacuum_vacuum_scale_factor', value: 0.01, scope: 'table' }) -@ConfigurationParameter({ name: 'autovacuum_vacuum_cost_delay', value: 0, scope: 'table' }) -export class JobsDuplicateDetectionTable { - @PrimaryColumn({ type: 'bigint', identity: true }) - id!: Generated; - - @Column({ type: '"char"' }) - name!: string; - - @Column({ type: 'jsonb', nullable: true }) - data!: unknown; - - @Column({ type: 'smallint', default: 0 }) - priority!: Generated; - - @Column({ type: '"char"', default: 'p' }) - status!: Generated; - - @Column({ type: 'text', nullable: true }) - dedup_key!: string | null; - - @Column({ type: 'timestamp with time zone', default: () => 'now()' }) - run_after!: Generated; - - @Column({ type: 'timestamp with time zone', nullable: true }) - started_at!: Date | null; - - @Column({ type: 'timestamp with time zone', nullable: true }) - expires_at!: Date | null; - - @Column({ type: 'text', nullable: true }) - error!: string | null; -} - -@Table('jobs_background_task') -@Index({ name: 'IDX_jobs_background_task_pending', 
columns: ['priority', 'id'], where: `"status" = 'p'::"char"` }) -@Index({ - name: 'IDX_jobs_background_task_dedup', - columns: ['dedup_key'], - unique: true, - where: `"dedup_key" IS NOT NULL AND "status" = 'p'::"char"`, -}) -@ConfigurationParameter({ name: 'autovacuum_vacuum_threshold', value: 100, scope: 'table' }) -@ConfigurationParameter({ name: 'autovacuum_vacuum_scale_factor', value: 0.01, scope: 'table' }) -@ConfigurationParameter({ name: 'autovacuum_vacuum_cost_delay', value: 0, scope: 'table' }) -export class JobsBackgroundTaskTable { - @PrimaryColumn({ type: 'bigint', identity: true }) - id!: Generated; - - @Column({ type: '"char"' }) - name!: string; - - @Column({ type: 'jsonb', nullable: true }) - data!: unknown; - - @Column({ type: 'smallint', default: 0 }) - priority!: Generated; - - @Column({ type: '"char"', default: 'p' }) - status!: Generated; - - @Column({ type: 'text', nullable: true }) - dedup_key!: string | null; - - @Column({ type: 'timestamp with time zone', default: () => 'now()' }) - run_after!: Generated; - - @Column({ type: 'timestamp with time zone', nullable: true }) - started_at!: Date | null; - - @Column({ type: 'timestamp with time zone', nullable: true }) - expires_at!: Date | null; - - @Column({ type: 'text', nullable: true }) - error!: string | null; -} - -@Table('jobs_storage_template_migration') -@Index({ - name: 'IDX_jobs_storage_template_migration_pending', - columns: ['priority', 'id'], - where: `"status" = 'p'::"char"`, -}) -@Index({ - name: 'IDX_jobs_storage_template_migration_dedup', - columns: ['dedup_key'], - unique: true, - where: `"dedup_key" IS NOT NULL AND "status" = 'p'::"char"`, -}) -@ConfigurationParameter({ name: 'autovacuum_vacuum_threshold', value: 100, scope: 'table' }) -@ConfigurationParameter({ name: 'autovacuum_vacuum_scale_factor', value: 0.01, scope: 'table' }) -@ConfigurationParameter({ name: 'autovacuum_vacuum_cost_delay', value: 0, scope: 'table' }) -export class JobsStorageTemplateMigrationTable { - 
@PrimaryColumn({ type: 'bigint', identity: true }) - id!: Generated; - - @Column({ type: '"char"' }) - name!: string; - - @Column({ type: 'jsonb', nullable: true }) - data!: unknown; - - @Column({ type: 'smallint', default: 0 }) - priority!: Generated; - - @Column({ type: '"char"', default: 'p' }) - status!: Generated; - - @Column({ type: 'text', nullable: true }) - dedup_key!: string | null; - - @Column({ type: 'timestamp with time zone', default: () => 'now()' }) - run_after!: Generated; - - @Column({ type: 'timestamp with time zone', nullable: true }) - started_at!: Date | null; - - @Column({ type: 'timestamp with time zone', nullable: true }) - expires_at!: Date | null; - - @Column({ type: 'text', nullable: true }) - error!: string | null; -} - -@Table('jobs_migration') -@Index({ name: 'IDX_jobs_migration_pending', columns: ['priority', 'id'], where: `"status" = 'p'::"char"` }) -@Index({ - name: 'IDX_jobs_migration_dedup', - columns: ['dedup_key'], - unique: true, - where: `"dedup_key" IS NOT NULL AND "status" = 'p'::"char"`, -}) -@ConfigurationParameter({ name: 'autovacuum_vacuum_threshold', value: 100, scope: 'table' }) -@ConfigurationParameter({ name: 'autovacuum_vacuum_scale_factor', value: 0.01, scope: 'table' }) -@ConfigurationParameter({ name: 'autovacuum_vacuum_cost_delay', value: 0, scope: 'table' }) -export class JobsMigrationTable { - @PrimaryColumn({ type: 'bigint', identity: true }) - id!: Generated; - - @Column({ type: '"char"' }) - name!: string; - - @Column({ type: 'jsonb', nullable: true }) - data!: unknown; - - @Column({ type: 'smallint', default: 0 }) - priority!: Generated; - - @Column({ type: '"char"', default: 'p' }) - status!: Generated; - - @Column({ type: 'text', nullable: true }) - dedup_key!: string | null; - - @Column({ type: 'timestamp with time zone', default: () => 'now()' }) - run_after!: Generated; - - @Column({ type: 'timestamp with time zone', nullable: true }) - started_at!: Date | null; - - @Column({ type: 'timestamp with 
time zone', nullable: true }) - expires_at!: Date | null; - - @Column({ type: 'text', nullable: true }) - error!: string | null; -} - -@Table('jobs_search') -@Index({ name: 'IDX_jobs_search_pending', columns: ['priority', 'id'], where: `"status" = 'p'::"char"` }) -@Index({ - name: 'IDX_jobs_search_dedup', - columns: ['dedup_key'], - unique: true, - where: `"dedup_key" IS NOT NULL AND "status" = 'p'::"char"`, -}) -@ConfigurationParameter({ name: 'autovacuum_vacuum_threshold', value: 100, scope: 'table' }) -@ConfigurationParameter({ name: 'autovacuum_vacuum_scale_factor', value: 0.01, scope: 'table' }) -@ConfigurationParameter({ name: 'autovacuum_vacuum_cost_delay', value: 0, scope: 'table' }) -export class JobsSearchTable { - @PrimaryColumn({ type: 'bigint', identity: true }) - id!: Generated; - - @Column({ type: '"char"' }) - name!: string; - - @Column({ type: 'jsonb', nullable: true }) - data!: unknown; - - @Column({ type: 'smallint', default: 0 }) - priority!: Generated; - - @Column({ type: '"char"', default: 'p' }) - status!: Generated; - - @Column({ type: 'text', nullable: true }) - dedup_key!: string | null; - - @Column({ type: 'timestamp with time zone', default: () => 'now()' }) - run_after!: Generated; - - @Column({ type: 'timestamp with time zone', nullable: true }) - started_at!: Date | null; - - @Column({ type: 'timestamp with time zone', nullable: true }) - expires_at!: Date | null; - - @Column({ type: 'text', nullable: true }) - error!: string | null; -} - -@Table('jobs_sidecar') -@Index({ name: 'IDX_jobs_sidecar_pending', columns: ['priority', 'id'], where: `"status" = 'p'::"char"` }) -@Index({ - name: 'IDX_jobs_sidecar_dedup', - columns: ['dedup_key'], - unique: true, - where: `"dedup_key" IS NOT NULL AND "status" = 'p'::"char"`, -}) -@ConfigurationParameter({ name: 'autovacuum_vacuum_threshold', value: 100, scope: 'table' }) -@ConfigurationParameter({ name: 'autovacuum_vacuum_scale_factor', value: 0.01, scope: 'table' }) -@ConfigurationParameter({ 
name: 'autovacuum_vacuum_cost_delay', value: 0, scope: 'table' }) -export class JobsSidecarTable { - @PrimaryColumn({ type: 'bigint', identity: true }) - id!: Generated; - - @Column({ type: '"char"' }) - name!: string; - - @Column({ type: 'jsonb', nullable: true }) - data!: unknown; - - @Column({ type: 'smallint', default: 0 }) - priority!: Generated; - - @Column({ type: '"char"', default: 'p' }) - status!: Generated; - - @Column({ type: 'text', nullable: true }) - dedup_key!: string | null; - - @Column({ type: 'timestamp with time zone', default: () => 'now()' }) - run_after!: Generated; - - @Column({ type: 'timestamp with time zone', nullable: true }) - started_at!: Date | null; - - @Column({ type: 'timestamp with time zone', nullable: true }) - expires_at!: Date | null; - - @Column({ type: 'text', nullable: true }) - error!: string | null; -} - -@Table('jobs_library') -@Index({ name: 'IDX_jobs_library_pending', columns: ['priority', 'id'], where: `"status" = 'p'::"char"` }) -@Index({ - name: 'IDX_jobs_library_dedup', - columns: ['dedup_key'], - unique: true, - where: `"dedup_key" IS NOT NULL AND "status" = 'p'::"char"`, -}) -@ConfigurationParameter({ name: 'autovacuum_vacuum_threshold', value: 100, scope: 'table' }) -@ConfigurationParameter({ name: 'autovacuum_vacuum_scale_factor', value: 0.01, scope: 'table' }) -@ConfigurationParameter({ name: 'autovacuum_vacuum_cost_delay', value: 0, scope: 'table' }) -export class JobsLibraryTable { - @PrimaryColumn({ type: 'bigint', identity: true }) - id!: Generated; - - @Column({ type: '"char"' }) - name!: string; - - @Column({ type: 'jsonb', nullable: true }) - data!: unknown; - - @Column({ type: 'smallint', default: 0 }) - priority!: Generated; - - @Column({ type: '"char"', default: 'p' }) - status!: Generated; - - @Column({ type: 'text', nullable: true }) - dedup_key!: string | null; - - @Column({ type: 'timestamp with time zone', default: () => 'now()' }) - run_after!: Generated; - - @Column({ type: 'timestamp with 
time zone', nullable: true }) - started_at!: Date | null; - - @Column({ type: 'timestamp with time zone', nullable: true }) - expires_at!: Date | null; - - @Column({ type: 'text', nullable: true }) - error!: string | null; -} - -@Table('jobs_notification') -@Index({ name: 'IDX_jobs_notification_pending', columns: ['priority', 'id'], where: `"status" = 'p'::"char"` }) -@Index({ - name: 'IDX_jobs_notification_dedup', - columns: ['dedup_key'], - unique: true, - where: `"dedup_key" IS NOT NULL AND "status" = 'p'::"char"`, -}) -@ConfigurationParameter({ name: 'autovacuum_vacuum_threshold', value: 100, scope: 'table' }) -@ConfigurationParameter({ name: 'autovacuum_vacuum_scale_factor', value: 0.01, scope: 'table' }) -@ConfigurationParameter({ name: 'autovacuum_vacuum_cost_delay', value: 0, scope: 'table' }) -export class JobsNotificationTable { - @PrimaryColumn({ type: 'bigint', identity: true }) - id!: Generated; - - @Column({ type: '"char"' }) - name!: string; - - @Column({ type: 'jsonb', nullable: true }) - data!: unknown; - - @Column({ type: 'smallint', default: 0 }) - priority!: Generated; - - @Column({ type: '"char"', default: 'p' }) - status!: Generated; - - @Column({ type: 'text', nullable: true }) - dedup_key!: string | null; - - @Column({ type: 'timestamp with time zone', default: () => 'now()' }) - run_after!: Generated; - - @Column({ type: 'timestamp with time zone', nullable: true }) - started_at!: Date | null; - - @Column({ type: 'timestamp with time zone', nullable: true }) - expires_at!: Date | null; - - @Column({ type: 'text', nullable: true }) - error!: string | null; -} - -@Table('jobs_backup_database') -@Index({ name: 'IDX_jobs_backup_database_pending', columns: ['priority', 'id'], where: `"status" = 'p'::"char"` }) -@Index({ - name: 'IDX_jobs_backup_database_dedup', - columns: ['dedup_key'], - unique: true, - where: `"dedup_key" IS NOT NULL AND "status" = 'p'::"char"`, -}) -@ConfigurationParameter({ name: 'autovacuum_vacuum_threshold', value: 100, 
scope: 'table' }) -@ConfigurationParameter({ name: 'autovacuum_vacuum_scale_factor', value: 0.01, scope: 'table' }) -@ConfigurationParameter({ name: 'autovacuum_vacuum_cost_delay', value: 0, scope: 'table' }) -export class JobsBackupDatabaseTable { - @PrimaryColumn({ type: 'bigint', identity: true }) - id!: Generated; - - @Column({ type: '"char"' }) - name!: string; - - @Column({ type: 'jsonb', nullable: true }) - data!: unknown; - - @Column({ type: 'smallint', default: 0 }) - priority!: Generated; - - @Column({ type: '"char"', default: 'p' }) - status!: Generated; - - @Column({ type: 'text', nullable: true }) - dedup_key!: string | null; - - @Column({ type: 'timestamp with time zone', default: () => 'now()' }) - run_after!: Generated; - - @Column({ type: 'timestamp with time zone', nullable: true }) - started_at!: Date | null; - - @Column({ type: 'timestamp with time zone', nullable: true }) - expires_at!: Date | null; - - @Column({ type: 'text', nullable: true }) - error!: string | null; -} - -@Table('jobs_ocr') -@Index({ name: 'IDX_jobs_ocr_pending', columns: ['priority', 'id'], where: `"status" = 'p'::"char"` }) -@Index({ - name: 'IDX_jobs_ocr_dedup', - columns: ['dedup_key'], - unique: true, - where: `"dedup_key" IS NOT NULL AND "status" = 'p'::"char"`, -}) -@ConfigurationParameter({ name: 'autovacuum_vacuum_threshold', value: 100, scope: 'table' }) -@ConfigurationParameter({ name: 'autovacuum_vacuum_scale_factor', value: 0.01, scope: 'table' }) -@ConfigurationParameter({ name: 'autovacuum_vacuum_cost_delay', value: 0, scope: 'table' }) -export class JobsOcrTable { - @PrimaryColumn({ type: 'bigint', identity: true }) - id!: Generated; - - @Column({ type: '"char"' }) - name!: string; - - @Column({ type: 'jsonb', nullable: true }) - data!: unknown; - - @Column({ type: 'smallint', default: 0 }) - priority!: Generated; - - @Column({ type: '"char"', default: 'p' }) - status!: Generated; - - @Column({ type: 'text', nullable: true }) - dedup_key!: string | null; - - 
@Column({ type: 'timestamp with time zone', default: () => 'now()' }) - run_after!: Generated; - - @Column({ type: 'timestamp with time zone', nullable: true }) - started_at!: Date | null; - - @Column({ type: 'timestamp with time zone', nullable: true }) - expires_at!: Date | null; - - @Column({ type: 'text', nullable: true }) - error!: string | null; -} - -@Table('jobs_workflow') -@Index({ name: 'IDX_jobs_workflow_pending', columns: ['priority', 'id'], where: `"status" = 'p'::"char"` }) -@Index({ - name: 'IDX_jobs_workflow_dedup', - columns: ['dedup_key'], - unique: true, - where: `"dedup_key" IS NOT NULL AND "status" = 'p'::"char"`, -}) -@ConfigurationParameter({ name: 'autovacuum_vacuum_threshold', value: 100, scope: 'table' }) -@ConfigurationParameter({ name: 'autovacuum_vacuum_scale_factor', value: 0.01, scope: 'table' }) -@ConfigurationParameter({ name: 'autovacuum_vacuum_cost_delay', value: 0, scope: 'table' }) -export class JobsWorkflowTable { - @PrimaryColumn({ type: 'bigint', identity: true }) - id!: Generated; - - @Column({ type: '"char"' }) - name!: string; - - @Column({ type: 'jsonb', nullable: true }) - data!: unknown; - - @Column({ type: 'smallint', default: 0 }) - priority!: Generated; - - @Column({ type: '"char"', default: 'p' }) - status!: Generated; - - @Column({ type: 'text', nullable: true }) - dedup_key!: string | null; - - @Column({ type: 'timestamp with time zone', default: () => 'now()' }) - run_after!: Generated; - - @Column({ type: 'timestamp with time zone', nullable: true }) - started_at!: Date | null; - - @Column({ type: 'timestamp with time zone', nullable: true }) - expires_at!: Date | null; - - @Column({ type: 'text', nullable: true }) - error!: string | null; -} - -@Table('jobs_editor') -@Index({ name: 'IDX_jobs_editor_pending', columns: ['priority', 'id'], where: `"status" = 'p'::"char"` }) -@Index({ - name: 'IDX_jobs_editor_dedup', - columns: ['dedup_key'], - unique: true, - where: `"dedup_key" IS NOT NULL AND "status" = 
'p'::"char"`, -}) -@ConfigurationParameter({ name: 'autovacuum_vacuum_threshold', value: 100, scope: 'table' }) -@ConfigurationParameter({ name: 'autovacuum_vacuum_scale_factor', value: 0.01, scope: 'table' }) -@ConfigurationParameter({ name: 'autovacuum_vacuum_cost_delay', value: 0, scope: 'table' }) -export class JobsEditorTable { - @PrimaryColumn({ type: 'bigint', identity: true }) - id!: Generated; - - @Column({ type: '"char"' }) - name!: string; - - @Column({ type: 'jsonb', nullable: true }) - data!: unknown; - - @Column({ type: 'smallint', default: 0 }) - priority!: Generated; - - @Column({ type: '"char"', default: 'p' }) - status!: Generated; - - @Column({ type: 'text', nullable: true }) - dedup_key!: string | null; - - @Column({ type: 'timestamp with time zone', default: () => 'now()' }) - run_after!: Generated; - - @Column({ type: 'timestamp with time zone', nullable: true }) - started_at!: Date | null; - - @Column({ type: 'timestamp with time zone', nullable: true }) - expires_at!: Date | null; - - @Column({ type: 'text', nullable: true }) - error!: string | null; -} +export const JobsThumbnailGenerationTable = defineJobTable('jobs_thumbnail_generation'); +export const JobsMetadataExtractionTable = defineJobTable('jobs_metadata_extraction'); +export const JobsVideoConversionTable = defineJobTable('jobs_video_conversion'); +export const JobsFaceDetectionTable = defineJobTable('jobs_face_detection'); +export const JobsFacialRecognitionTable = defineJobTable('jobs_facial_recognition'); +export const JobsSmartSearchTable = defineJobTable('jobs_smart_search'); +export const JobsDuplicateDetectionTable = defineJobTable('jobs_duplicate_detection'); +export const JobsBackgroundTaskTable = defineJobTable('jobs_background_task'); +export const JobsStorageTemplateMigrationTable = defineJobTable('jobs_storage_template_migration'); +export const JobsMigrationTable = defineJobTable('jobs_migration'); +export const JobsSearchTable = defineJobTable('jobs_search'); 
+export const JobsSidecarTable = defineJobTable('jobs_sidecar'); +export const JobsLibraryTable = defineJobTable('jobs_library'); +export const JobsNotificationTable = defineJobTable('jobs_notification'); +export const JobsBackupDatabaseTable = defineJobTable('jobs_backup_database'); +export const JobsOcrTable = defineJobTable('jobs_ocr'); +export const JobsWorkflowTable = defineJobTable('jobs_workflow'); +export const JobsEditorTable = defineJobTable('jobs_editor'); + +export type JobsThumbnailGenerationTable = InstanceType; +export type JobsMetadataExtractionTable = InstanceType; +export type JobsVideoConversionTable = InstanceType; +export type JobsFaceDetectionTable = InstanceType; +export type JobsFacialRecognitionTable = InstanceType; +export type JobsSmartSearchTable = InstanceType; +export type JobsDuplicateDetectionTable = InstanceType; +export type JobsBackgroundTaskTable = InstanceType; +export type JobsStorageTemplateMigrationTable = InstanceType; +export type JobsMigrationTable = InstanceType; +export type JobsSearchTable = InstanceType; +export type JobsSidecarTable = InstanceType; +export type JobsLibraryTable = InstanceType; +export type JobsNotificationTable = InstanceType; +export type JobsBackupDatabaseTable = InstanceType; +export type JobsOcrTable = InstanceType; +export type JobsWorkflowTable = InstanceType; +export type JobsEditorTable = InstanceType; // Queue metadata table @Table('job_queue_meta') export class JobQueueMetaTable { @PrimaryColumn({ type: 'text' }) - queue_name!: string; + queueName!: string; @Column({ type: 'boolean', default: false }) - is_paused!: Generated; + isPaused!: Generated; } diff --git a/server/src/services/queue.service.ts b/server/src/services/queue.service.ts index cdfa2ad2ed..ef50e1d34a 100644 --- a/server/src/services/queue.service.ts +++ b/server/src/services/queue.service.ts @@ -80,7 +80,7 @@ export class QueueService extends BaseService { onBootstrap() { this.jobRepository.setup(this.services); if 
(this.worker === ImmichWorker.Microservices) { - this.jobRepository.startWorkers(); + void this.jobRepository.startWorkers(); } } diff --git a/server/src/utils/job-queue.util.ts b/server/src/utils/job-queue.util.ts new file mode 100644 index 0000000000..584623463e --- /dev/null +++ b/server/src/utils/job-queue.util.ts @@ -0,0 +1,405 @@ +import { Kysely, Selectable, sql } from 'kysely'; +import { JOB_CODE_TO_NAME, JobCode, JobQueueStatus, QueueName } from 'src/enum'; +import { DB } from 'src/schema'; +import { JobTable } from 'src/schema/tables/job.table'; +import { JobItem } from 'src/types'; + +export type InsertRow = { + code: JobCode; + data: unknown; + priority: number | null; + dedupKey: string | null; + runAfter: Date | null; +}; + +export const getTable = (db: Kysely, queueName: QueueName) => db.dynamic.table(QUEUE_TABLE[queueName]).as('t'); + +export class QueueWorker { + activeJobCount = 0; + private concurrency: number; + private activeJobs = new Map(); + private hasPending = true; + private fetching = false; + private paused = false; + private stopped = false; + private heartbeatTimer: ReturnType | null = null; + private sweepTimer: ReturnType | null = null; + + private readonly table: ReturnType; + private readonly stallTimeout: number; + private readonly claimBatch: number; + private readonly db: Kysely; + private readonly onJobFn: (job: JobItem) => Promise; + + constructor(options: QueueWorkerOptions) { + this.stallTimeout = options.stallTimeout; + this.claimBatch = options.claimBatch; + this.concurrency = options.concurrency; + this.db = options.db; + this.table = getTable(this.db, options.queueName); + this.onJobFn = options.onJob; + + // One-shot sweep after stallTimeout to recover jobs orphaned by a crash + // that restarted before their expiry passed + this.sweepTimer = setTimeout(() => this.onNotification(), this.stallTimeout); + } + + onNotification() { + this.hasPending = true; + void this.tryFetch(); + } + + setConcurrency(n: number) { + 
this.concurrency = n; + void this.tryFetch(); + } + + pause() { + this.paused = true; + } + + resume() { + this.paused = false; + this.hasPending = true; + void this.tryFetch(); + } + + shutdown() { + this.stopped = true; + this.stopHeartbeat(); + if (this.sweepTimer) { + clearTimeout(this.sweepTimer); + this.sweepTimer = null; + } + + if (this.activeJobs.size === 0) { + return Promise.resolve(); + } + + // Re-queue active jobs + const ids = [...this.activeJobs.keys()]; + return this.db + .updateTable(this.table) + .set({ + status: JobQueueStatus.Pending, + startedAt: null, + expiresAt: null, + }) + .where('id', 'in', ids) + .execute(); + } + + private get slotsAvailable() { + return Math.max(0, this.concurrency - this.activeJobCount); + } + + private async tryFetch() { + if (this.fetching || this.paused || this.stopped) { + return; + } + this.fetching = true; + try { + while (this.slotsAvailable > 0 && this.hasPending && !this.stopped) { + const limit = Math.min(this.slotsAvailable, this.claimBatch); + const jobs = await this.claim(limit); + if (jobs.length === 0) { + const recovered = await this.recoverStalled(); + if (recovered.numChangedRows === 0n) { + this.hasPending = false; + break; + } + continue; + } + this.activeJobCount += jobs.length; + for (const job of jobs) { + void this.processJob(job); + } + } + } finally { + this.fetching = false; + } + } + + private async processJob(row: Selectable) { + this.activeJobs.set(row.id, { startedAt: Date.now() }); + this.startHeartbeat(); + try { + const jobName = JOB_CODE_TO_NAME[row.code]; + if (!jobName) { + throw new Error(`Unknown job char code: ${row.code}`); + } + await this.onJobFn({ name: jobName, data: row.data } as JobItem); + // Success: delete completed job and try to fetch next + const next = this.stopped ? 
undefined : await this.completeAndFetch(row.id, true).catch(() => undefined); + this.activeJobs.delete(row.id); + if (next) { + void this.processJob(next); + } else { + this.activeJobCount--; + this.hasPending = false; + } + } catch (error: unknown) { + // Failure: mark as failed and try to fetch next + const errorMsg = error instanceof Error ? error.message : String(error); + const next = await this.completeAndFetch(row.id, false, errorMsg).catch(() => undefined); + this.activeJobs.delete(row.id); + if (next) { + void this.processJob(next); + } else { + this.activeJobCount--; + this.hasPending = false; + } + } finally { + if (this.activeJobs.size === 0) { + this.stopHeartbeat(); + } + } + } + + /** + * Claim up to `limit` pending jobs. + * Uses a materialized CTE with FOR NO KEY UPDATE SKIP LOCKED + * to avoid race conditions and excessive locking. + */ + private claim(limit: number) { + return this.db + .with( + (wb) => wb('candidates').materialized(), + (qb) => + qb + .selectFrom(this.table) + .select('id') + .where('status', '=', JobQueueStatus.Pending) + .where('runAfter', '<=', sql`now()`) + .orderBy('priority', 'desc') + .orderBy('id', 'asc') + .limit(limit) + .forNoKeyUpdate() + .skipLocked(), + ) + .updateTable(this.table) + .set({ + status: JobQueueStatus.Active, + startedAt: sql`now()`, + expiresAt: sql`now() + ${sql.lit(`'${this.stallTimeout} milliseconds'`)}::interval`, + }) + .where((eb) => eb('id', 'in', eb.selectFrom('candidates').select('id'))) + .returningAll() + .execute(); + } + + /** + * Atomically complete a job (delete on success, mark failed on failure) and claim the next one. + * Uses a CTE to combine operations in a single round-trip. + */ + private completeAndFetch(jobId: number, success: boolean, errorMsg?: string) { + const query = this.db.with('mark', (qb) => + success + ? qb.deleteFrom(this.table).where('id', '=', jobId) + : qb + .updateTable(this.table) + .set({ status: JobQueueStatus.Failed, error: errorMsg ?? 
null }) + .where('id', '=', jobId), + ); + return query + .with('next', (qb) => + qb + .selectFrom(this.table) + .where('status', '=', JobQueueStatus.Pending) + .where('runAfter', '<=', sql`now()`) + .orderBy('priority', 'desc') + .orderBy('id', 'asc') + .limit(1) + .forNoKeyUpdate() + .skipLocked(), + ) + .updateTable(this.table) + .set({ + status: JobQueueStatus.Active, + startedAt: sql`now()`, + expiresAt: sql`now() + ${sql.lit(`'${this.stallTimeout} milliseconds'`)}::interval`, + }) + .where((eb) => eb('id', '=', eb.selectFrom('next').select('id'))) + .returningAll() + .executeTakeFirst(); + } + + /** + * Recover stalled jobs: reset jobs whose expires_at has passed + */ + private recoverStalled() { + return this.db + .updateTable(this.table) + .set({ + status: JobQueueStatus.Pending, + startedAt: null, + expiresAt: null, + }) + .where('status', '=', JobQueueStatus.Active) + .where('expiresAt', '<', sql`now()`) + .executeTakeFirst(); + } + + /** + * Extend expiry for all active jobs (heartbeat) + */ + private extendExpiry() { + if (this.activeJobs.size === 0) { + return; + } + const ids = [...this.activeJobs.keys()]; + return this.db + .updateTable(this.table) + .set({ + expiresAt: sql`now() + ${sql.lit(`'${this.stallTimeout} milliseconds'`)}::interval`, + }) + .where('id', 'in', ids) + .execute(); + } + + private startHeartbeat() { + if (this.heartbeatTimer) { + return; + } + this.heartbeatTimer = setInterval( + () => this.extendExpiry()?.catch(() => setTimeout(() => this.extendExpiry(), 5000)), + Math.floor(this.stallTimeout / 2), + ); + } + + private stopHeartbeat() { + if (this.heartbeatTimer) { + clearInterval(this.heartbeatTimer); + this.heartbeatTimer = null; + } + } +} + +export class WriteBuffer { + private buffers = Object.fromEntries(Object.values(QueueName).map((name) => [name as QueueName, [] as InsertRow[]])); + private pending: Deferred | null = null; + private timer: ReturnType<typeof setTimeout> | null = null; + + constructor( + private db: Kysely<DB>, + private
notify: (queue: QueueName) => Promise<void>, + ) {} + + async add(items: { queue: QueueName; row: InsertRow }[]): Promise<void> { + if (items.length === 0) { + return; + } + + for (const { queue, row } of items) { + this.buffers[queue].push(row); + } + if (!this.timer) { + this.pending = createDeferred(); + this.timer = setTimeout(() => void this.flush(), 10); + } + return this.pending!.promise; + } + + async flush(): Promise<void> { + if (this.timer) { + clearTimeout(this.timer); + this.timer = null; + } + const deferred = this.pending; + this.pending = null; + + const promises = []; + try { + for (const [queue, rows] of Object.entries(this.buffers)) { + if (rows.length === 0) { + continue; + } + const tableName = QUEUE_TABLE[queue as QueueName]; + promises.push(this.insertChunk(tableName, rows).then(() => this.notify(queue as QueueName))); + rows.length = 0; + } + await Promise.all(promises); + deferred?.resolve(); + } catch (error) { + deferred?.reject(error); + } + } + + private insertChunk(tableName: keyof JobTables, rows: InsertRow[]) { + return this.db + .insertInto(tableName) + .columns(['code', 'data', 'priority', 'dedupKey', 'runAfter']) + .expression((eb) => + eb + .selectFrom( + eb + .fn('unnest', [ + sql`${`{${rows.map((r) => r.code)}}`}::"smallint"[]`, + sql`${`{${rows.map((r) => { + if (!r.data) return null; + const json = JSON.stringify(r.data); + return '"' + json.replace(/\\/g, '\\\\').replace(/"/g, '\\"') + '"'; + })}}`}::jsonb[]`, + sql`${`{${rows.map((r) => r.priority)}}`}::smallint[]`, + sql`${`{${rows.map((r) => r.dedupKey)}}`}::text[]`, + sql`${`{${rows.map((r) => r.runAfter)}}`}::timestamptz[]`, + ]) + .as('v'), + ) + .selectAll(), + ) + .onConflict((oc) => + oc + .column('dedupKey') + .where('dedupKey', 'is not', null) + .where('status', '=', JobQueueStatus.Pending) + .doNothing(), + ) + .execute(); + } +} + +const QUEUE_TABLE = { + [QueueName.ThumbnailGeneration]: 'jobs_thumbnail_generation', + [QueueName.MetadataExtraction]: 'jobs_metadata_extraction', + 
[QueueName.VideoConversion]: 'jobs_video_conversion', + [QueueName.FaceDetection]: 'jobs_face_detection', + [QueueName.FacialRecognition]: 'jobs_facial_recognition', + [QueueName.SmartSearch]: 'jobs_smart_search', + [QueueName.DuplicateDetection]: 'jobs_duplicate_detection', + [QueueName.BackgroundTask]: 'jobs_background_task', + [QueueName.StorageTemplateMigration]: 'jobs_storage_template_migration', + [QueueName.Migration]: 'jobs_migration', + [QueueName.Search]: 'jobs_search', + [QueueName.Sidecar]: 'jobs_sidecar', + [QueueName.Library]: 'jobs_library', + [QueueName.Notification]: 'jobs_notification', + [QueueName.BackupDatabase]: 'jobs_backup_database', + [QueueName.Ocr]: 'jobs_ocr', + [QueueName.Workflow]: 'jobs_workflow', + [QueueName.Editor]: 'jobs_editor', +} as const; + +const createDeferred = (): Deferred => { + let resolve!: () => void; + let reject!: (error: unknown) => void; + const promise = new Promise<void>((_resolve, _reject) => ((resolve = _resolve), (reject = _reject))); + return { promise, resolve, reject }; +}; + +interface QueueWorkerOptions { + queueName: QueueName; + stallTimeout: number; + claimBatch: number; + concurrency: number; + db: Kysely<DB>; + onJob: (job: JobItem) => Promise<void>; +} + +type PickByValue<T, V> = { + [K in keyof T as T[K] extends V ? K : never]: T[K]; +}; + +type JobTables = PickByValue<DB, DB[(typeof QUEUE_TABLE)[QueueName]]>; /* NOTE(review): type arguments lost in extraction; reconstructed from QUEUE_TABLE — confirm against original */ + +type Deferred = { promise: Promise<void>; resolve: () => void; reject: (error: unknown) => void };