From 8bfacda3da2c27cc45ae21af7206bd1ce2af5cff Mon Sep 17 00:00:00 2001 From: mertalev <101130780+mertalev@users.noreply.github.com> Date: Sat, 14 Feb 2026 02:12:40 -0500 Subject: [PATCH] retry --- server/src/repositories/job.repository.ts | 91 +++++++++++++++-------- server/src/schema/index.ts | 3 + server/src/schema/tables/job.table.ts | 44 +++++++++-- server/src/utils/job-queue.util.ts | 76 +++++++++++++++---- 4 files changed, 163 insertions(+), 51 deletions(-) diff --git a/server/src/repositories/job.repository.ts b/server/src/repositories/job.repository.ts index 4b41667be2..b935b4212f 100644 --- a/server/src/repositories/job.repository.ts +++ b/server/src/repositories/job.repository.ts @@ -52,9 +52,9 @@ const getClaimBatch = (queueName: QueueName): number => { const STATUS_FILTER = { [QueueJobStatus.Active]: JobQueueStatus.Active, - [QueueJobStatus.Failed]: JobQueueStatus.Failed, + [QueueJobStatus.Failed]: null as null, // failures are in a separate table [QueueJobStatus.Waiting]: JobQueueStatus.Pending, - [QueueJobStatus.Complete]: null, // completed jobs are deleted + [QueueJobStatus.Complete]: null as null, // completed jobs are deleted [QueueJobStatus.Delayed]: JobQueueStatus.Pending, // delayed = pending with future run_after [QueueJobStatus.Paused]: JobQueueStatus.Pending, // paused queue has pending jobs }; @@ -150,6 +150,8 @@ export class JobRepository { queueName, stallTimeout: 5 * 60 * 1000, // 5 min claimBatch: getClaimBatch(queueName), + maxRetries: 5, + backoffBaseMs: 30_000, concurrency: 1, db: this.db, onJob: (job) => this.eventRepository.emit('JobRun', queueName, job), @@ -220,26 +222,33 @@ export class JobRepository { } clear(name: QueueName, _type: QueueCleanType) { - return this.db.deleteFrom(getTable(this.db, name)).where('status', '=', JobQueueStatus.Failed).execute(); + return this.db.deleteFrom('job_failures').where('queueName', '=', name).execute(); } async getJobCounts(name: QueueName): Promise { - const result = await this.db - 
.selectFrom(getTable(this.db, name)) - .select((eb) => ['status', eb.fn.countAll().as('count')]) - .groupBy('status') - .execute(); + const [statusResult, failedResult] = await Promise.all([ + this.db + .selectFrom(getTable(this.db, name)) + .select((eb) => ['status', eb.fn.countAll().as('count')]) + .groupBy('status') + .execute(), + this.db + .selectFrom('job_failures') + .select((eb) => eb.fn.countAll().as('count')) + .where('queueName', '=', name) + .executeTakeFirst(), + ]); const counts: JobCounts = { active: 0, completed: 0, - failed: 0, + failed: Number(failedResult?.count ?? 0), delayed: 0, waiting: 0, paused: 0, }; - for (const row of result) { + for (const row of statusResult) { switch (row.status) { case JobQueueStatus.Pending: { counts.waiting = Number(row.count); @@ -249,10 +258,6 @@ export class JobRepository { counts.active = Number(row.count); break; } - case JobQueueStatus.Failed: { - counts.failed = Number(row.count); - break; - } } } @@ -312,32 +317,58 @@ export class JobRepository { } async searchJobs(name: QueueName, dto: QueueJobSearchDto): Promise { + const requestedStatuses = dto.status ?? Object.values(QueueJobStatus); + const includeFailed = requestedStatuses.includes(QueueJobStatus.Failed); + const statuses: JobQueueStatus[] = []; - for (const status of dto.status ?? Object.values(QueueJobStatus)) { + for (const status of requestedStatuses) { const mapped = STATUS_FILTER[status]; if (mapped !== null && !statuses.includes(mapped)) { statuses.push(mapped); } } - if (statuses.length === 0) { - return []; + const results: QueueJobResponseDto[] = []; + + if (statuses.length > 0) { + const rows = await this.db + .selectFrom(getTable(this.db, name)) + .select(['id', 'code', 'data', 'runAfter']) + .where('status', 'in', statuses) + .orderBy('id', 'desc') + .limit(1000) + .execute(); + + for (const row of rows) { + results.push({ + id: String(row.id), + name: JOB_CODE_TO_NAME[row.code], + data: (row.data ?? 
{}) as object, + timestamp: new Date(row.runAfter).getTime(), + }); + } } - const rows = await this.db - .selectFrom(getTable(this.db, name)) - .select(['id', 'code', 'data', 'runAfter']) - .where('status', 'in', statuses) - .orderBy('id', 'desc') - .limit(1000) - .execute(); + if (includeFailed) { + const failedRows = await this.db + .selectFrom('job_failures') + .select(['id', 'code', 'data', 'failedAt']) + .where('queueName', '=', name) + .orderBy('id', 'desc') + .limit(1000) + .execute(); - return rows.map((row) => ({ - id: String(row.id), - name: JOB_CODE_TO_NAME[row.code], - data: (row.data ?? {}) as object, - timestamp: new Date(row.runAfter).getTime(), - })); + for (const row of failedRows) { + results.push({ + id: `f-${row.id}`, + name: JOB_CODE_TO_NAME[row.code], + data: (row.data ?? {}) as object, + timestamp: new Date(row.failedAt).getTime(), + }); + } + } + + return results; } private getJobOptions(item: JobItem): { dedupKey?: string; priority?: number; delay?: number } | null { diff --git a/server/src/schema/index.ts b/server/src/schema/index.ts index 283f09be13..9ba74cb56e 100644 --- a/server/src/schema/index.ts +++ b/server/src/schema/index.ts @@ -42,6 +42,7 @@ import { AuditTable } from 'src/schema/tables/audit.table'; import { FaceSearchTable } from 'src/schema/tables/face-search.table'; import { GeodataPlacesTable } from 'src/schema/tables/geodata-places.table'; import { + JobFailuresTable, JobQueueMetaTable, JobsBackgroundTaskTable, JobsBackupDatabaseTable, @@ -175,6 +176,7 @@ export class ImmichDatabase { JobsWorkflowTable, JobsEditorTable, JobQueueMetaTable, + JobFailuresTable, ]; functions = [ @@ -312,4 +314,5 @@ export interface DB { jobs_workflow: JobsWorkflowTable; jobs_editor: JobsEditorTable; job_queue_meta: JobQueueMetaTable; + job_failures: JobFailuresTable; } diff --git a/server/src/schema/tables/job.table.ts b/server/src/schema/tables/job.table.ts index 6dcab29f8d..65128f83a1 100644 --- a/server/src/schema/tables/job.table.ts +++ 
b/server/src/schema/tables/job.table.ts @@ -1,4 +1,4 @@ -import { JobCode, JobQueueStatus } from 'src/enum'; +import { JobCode, JobQueueStatus, QueueName } from 'src/enum'; import { Column, ConfigurationParameter, Generated, Index, PrimaryColumn, Table } from 'src/sql-tools'; export type JobTable = { @@ -9,8 +9,17 @@ export type JobTable = { code: JobCode; priority: Generated; status: Generated; + retries: Generated; data: unknown; dedupKey: string | null; +}; + +export type JobFailureTable = { + id: Generated; + failedAt: Generated; + queueName: string; + code: JobCode; + data: unknown; error: string | null; }; @@ -37,14 +46,14 @@ function defineJobTable(name: string) { @Column({ type: 'smallint', default: 0 }) status!: Generated; + @Column({ type: 'smallint', default: 0 }) + retries!: Generated; + @Column({ type: 'jsonb', nullable: true }) data!: unknown; @Column({ type: 'text', nullable: true }) dedupKey!: string | null; - - @Column({ type: 'text', nullable: true }) - error!: string | null; } const decorated = [ @@ -55,9 +64,9 @@ function defineJobTable(name: string) { name: `IDX_${name}_dedup`, columns: ['dedupKey'], unique: true, - where: `"dedupKey" IS NOT NULL AND status = 0`, + where: `"dedupKey" IS NOT NULL`, }), - Index({ name: `IDX_${name}_pending`, expression: 'priority DESC, id ASC', where: 'status = 0' }), + Index({ name: `IDX_${name}_pending`, expression: 'priority DESC, id ASC' }), Table(name), ].reduce((cls, dec) => dec(cls) || cls, JobTable); Object.defineProperty(decorated, 'name', { value: name }); @@ -111,3 +120,26 @@ export class JobQueueMetaTable { @Column({ type: 'boolean', default: false }) isPaused!: Generated; } + +// Dead-letter table for permanently failed jobs +@Table('job_failures') +@Index({ name: 'IDX_job_failures_queue', columns: ['queueName'] }) +export class JobFailuresTable { + @PrimaryColumn({ type: 'bigint', identity: true }) + id!: Generated; + + @Column({ type: 'timestamp with time zone', default: () => 'now()' }) + 
failedAt!: Generated; + + @Column({ type: 'text' }) + queueName!: QueueName; + + @Column({ type: 'smallint' }) + code!: JobCode; + + @Column({ type: 'jsonb', nullable: true }) + data!: unknown; + + @Column({ type: 'text', nullable: true }) + error!: string | null; +} diff --git a/server/src/utils/job-queue.util.ts b/server/src/utils/job-queue.util.ts index 70a4462775..d2841d81b7 100644 --- a/server/src/utils/job-queue.util.ts +++ b/server/src/utils/job-queue.util.ts @@ -28,15 +28,21 @@ export class QueueWorker { private heartbeatTimer: ReturnType | null = null; private sweepTimer: ReturnType | null = null; + private readonly queueName: QueueName; private readonly table: ReturnType; private readonly stallTimeout: number; private readonly claimBatch: number; + private readonly maxRetries: number; + private readonly backoffBaseMs: number; private readonly db: Kysely; private readonly onJobFn: (job: JobItem) => Promise; constructor(options: QueueWorkerOptions) { + this.queueName = options.queueName; this.stallTimeout = options.stallTimeout; this.claimBatch = options.claimBatch; + this.maxRetries = options.maxRetries; + this.backoffBaseMs = options.backoffBaseMs; this.concurrency = options.concurrency; this.db = options.db; this.table = getTable(this.db, options.queueName); @@ -133,7 +139,7 @@ export class QueueWorker { } await this.onJobFn({ name: jobName, data: row.data } as JobItem); // Success: delete completed job and try to fetch next - const next = this.stopped ? undefined : await this.completeAndFetch(row.id, true).catch(() => undefined); + const next = this.stopped ? undefined : await this.completeAndFetch(row.id).catch(() => undefined); this.activeJobs.delete(row.id); if (next) { void this.processJob(next); @@ -142,9 +148,11 @@ export class QueueWorker { this.hasPending = false; } } catch (error: unknown) { - // Failure: mark as failed and try to fetch next const errorMsg = error instanceof Error ? 
error.message : String(error); - const next = await this.completeAndFetch(row.id, false, errorMsg).catch(() => undefined); + const next = + row.retries < this.maxRetries + ? await this.retryAndFetch(row.id, row.retries).catch(() => undefined) + : await this.deadLetterAndFetch(row, errorMsg).catch(() => undefined); this.activeJobs.delete(row.id); if (next) { void this.processJob(next); @@ -192,19 +200,55 @@ export class QueueWorker { } /** - * Atomically complete a job (delete on success, mark failed on failure) and claim the next one. - * Uses a CTE to combine operations in a single round-trip. + * Atomically delete a completed job and claim the next one. */ - private completeAndFetch(jobId: number, success: boolean, errorMsg?: string) { - const query = this.db.with('mark', (qb) => - success - ? qb.deleteFrom(this.table).where('id', '=', jobId) - : qb - .updateTable(this.table) - .set({ status: JobQueueStatus.Failed, error: errorMsg ?? null }) - .where('id', '=', jobId), + private completeAndFetch(jobId: number) { + const prefix = this.db.with('mark', (qb: any) => qb.deleteFrom(this.table).where('id', '=', jobId)); + return this.claimNext(prefix as any); + } + + /** + * Atomically retry a failed job (reset to pending with backoff) and claim the next ready one. The delay is backoffBaseMs * 5^retries; the interval text is passed to sql.lit without surrounding quotes because sql.lit emits its own quoted string literal. + */ + private retryAndFetch(jobId: number, retries: number) { + const backoffMs = this.backoffBaseMs * 5 ** retries; + const prefix = this.db.with('mark', (qb) => + qb + .updateTable(this.table) + .set({ + status: JobQueueStatus.Pending, + retries: retries + 1, + runAfter: sql`now() + ${sql.lit(`${backoffMs} milliseconds`)}::interval`, + startedAt: null, + expiresAt: null, + }) + .where('id', '=', jobId), ); - return query + return this.claimNext(prefix as any); + } + + /** + * Atomically delete a permanently failed job, log it to the dead-letter table, and claim the next one. 
+ */ + private deadLetterAndFetch(row: Selectable, errorMsg: string) { + const prefix = this.db + .with('mark', (qb) => qb.deleteFrom(this.table).where('id', '=', row.id)) + .with('logged', (qb) => + qb.insertInto('job_failures').values({ + queueName: this.queueName, + code: row.code, + data: row.data, + error: errorMsg, + }), + ); + return this.claimNext(prefix as any); + } + + /** + * Shared suffix: claim the next pending job. Appended after prefix CTEs (mark, logged, etc.). + */ + private claimNext(prefix: Kysely) { + return prefix .with('next', (qb) => qb .selectFrom(this.table) @@ -390,7 +434,7 @@ export class WriteBuffer { ${dedupKey}::text[], ${runAfter}::timestamptz[] ) - ON CONFLICT ("dedupKey") WHERE "dedupKey" IS NOT NULL AND status = ${JobQueueStatus.Pending} + ON CONFLICT ("dedupKey") WHERE "dedupKey" IS NOT NULL DO NOTHING `; } @@ -428,6 +472,8 @@ interface QueueWorkerOptions { queueName: QueueName; stallTimeout: number; claimBatch: number; + maxRetries: number; + backoffBaseMs: number; concurrency: number; db: Kysely; onJob: (job: JobItem) => Promise;