This commit is contained in:
mertalev
2026-02-14 02:12:40 -05:00
parent b9fceeef75
commit 8bfacda3da
4 changed files with 163 additions and 51 deletions

View File

@@ -52,9 +52,9 @@ const getClaimBatch = (queueName: QueueName): number => {
const STATUS_FILTER = {
[QueueJobStatus.Active]: JobQueueStatus.Active,
[QueueJobStatus.Failed]: JobQueueStatus.Failed,
[QueueJobStatus.Failed]: null as null, // failures are in a separate table
[QueueJobStatus.Waiting]: JobQueueStatus.Pending,
[QueueJobStatus.Complete]: null, // completed jobs are deleted
[QueueJobStatus.Complete]: null as null, // completed jobs are deleted
[QueueJobStatus.Delayed]: JobQueueStatus.Pending, // delayed = pending with future run_after
[QueueJobStatus.Paused]: JobQueueStatus.Pending, // paused queue has pending jobs
};
@@ -150,6 +150,8 @@ export class JobRepository {
queueName,
stallTimeout: 5 * 60 * 1000, // 5 min
claimBatch: getClaimBatch(queueName),
maxRetries: 5,
backoffBaseMs: 30_000,
concurrency: 1,
db: this.db,
onJob: (job) => this.eventRepository.emit('JobRun', queueName, job),
@@ -220,26 +222,33 @@ export class JobRepository {
}
clear(name: QueueName, _type: QueueCleanType) {
return this.db.deleteFrom(getTable(this.db, name)).where('status', '=', JobQueueStatus.Failed).execute();
return this.db.deleteFrom('job_failures').where('queueName', '=', name).execute();
}
async getJobCounts(name: QueueName): Promise<JobCounts> {
const result = await this.db
.selectFrom(getTable(this.db, name))
.select((eb) => ['status', eb.fn.countAll<number>().as('count')])
.groupBy('status')
.execute();
const [statusResult, failedResult] = await Promise.all([
this.db
.selectFrom(getTable(this.db, name))
.select((eb) => ['status', eb.fn.countAll<number>().as('count')])
.groupBy('status')
.execute(),
this.db
.selectFrom('job_failures')
.select((eb) => eb.fn.countAll<number>().as('count'))
.where('queueName', '=', name)
.executeTakeFirst(),
]);
const counts: JobCounts = {
active: 0,
completed: 0,
failed: 0,
failed: Number(failedResult?.count ?? 0),
delayed: 0,
waiting: 0,
paused: 0,
};
for (const row of result) {
for (const row of statusResult) {
switch (row.status) {
case JobQueueStatus.Pending: {
counts.waiting = Number(row.count);
@@ -249,10 +258,6 @@ export class JobRepository {
counts.active = Number(row.count);
break;
}
case JobQueueStatus.Failed: {
counts.failed = Number(row.count);
break;
}
}
}
@@ -312,32 +317,58 @@ export class JobRepository {
}
async searchJobs(name: QueueName, dto: QueueJobSearchDto): Promise<QueueJobResponseDto[]> {
const requestedStatuses = dto.status ?? Object.values(QueueJobStatus);
const includeFailed = requestedStatuses.includes(QueueJobStatus.Failed);
const statuses: JobQueueStatus[] = [];
for (const status of dto.status ?? Object.values(QueueJobStatus)) {
for (const status of requestedStatuses) {
const mapped = STATUS_FILTER[status];
if (mapped !== null && !statuses.includes(mapped)) {
statuses.push(mapped);
}
}
if (statuses.length === 0) {
return [];
const results: QueueJobResponseDto[] = [];
if (statuses.length > 0) {
const rows = await this.db
.selectFrom(getTable(this.db, name))
.select(['id', 'code', 'data', 'runAfter'])
.where('status', 'in', statuses)
.orderBy('id', 'desc')
.limit(1000)
.execute();
for (const row of rows) {
results.push({
id: String(row.id),
name: JOB_CODE_TO_NAME[row.code],
data: (row.data ?? {}) as object,
timestamp: new Date(row.runAfter).getTime(),
});
}
}
const rows = await this.db
.selectFrom(getTable(this.db, name))
.select(['id', 'code', 'data', 'runAfter'])
.where('status', 'in', statuses)
.orderBy('id', 'desc')
.limit(1000)
.execute();
if (includeFailed) {
const failedRows = await this.db
.selectFrom('job_failures')
.select(['id', 'code', 'data', 'failedAt'])
.where('queueName', '=', name)
.orderBy('id', 'desc')
.limit(1000)
.execute();
return rows.map((row) => ({
id: String(row.id),
name: JOB_CODE_TO_NAME[row.code],
data: (row.data ?? {}) as object,
timestamp: new Date(row.runAfter).getTime(),
}));
for (const row of failedRows) {
results.push({
id: `f-${row.id}`,
name: JOB_CODE_TO_NAME[row.code],
data: (row.data ?? {}) as object,
timestamp: new Date(row.failedAt).getTime(),
});
}
}
return results;
}
private getJobOptions(item: JobItem): { dedupKey?: string; priority?: number; delay?: number } | null {

View File

@@ -42,6 +42,7 @@ import { AuditTable } from 'src/schema/tables/audit.table';
import { FaceSearchTable } from 'src/schema/tables/face-search.table';
import { GeodataPlacesTable } from 'src/schema/tables/geodata-places.table';
import {
JobFailuresTable,
JobQueueMetaTable,
JobsBackgroundTaskTable,
JobsBackupDatabaseTable,
@@ -175,6 +176,7 @@ export class ImmichDatabase {
JobsWorkflowTable,
JobsEditorTable,
JobQueueMetaTable,
JobFailuresTable,
];
functions = [
@@ -312,4 +314,5 @@ export interface DB {
jobs_workflow: JobsWorkflowTable;
jobs_editor: JobsEditorTable;
job_queue_meta: JobQueueMetaTable;
job_failures: JobFailuresTable;
}

View File

@@ -1,4 +1,4 @@
import { JobCode, JobQueueStatus } from 'src/enum';
import { JobCode, JobQueueStatus, QueueName } from 'src/enum';
import { Column, ConfigurationParameter, Generated, Index, PrimaryColumn, Table } from 'src/sql-tools';
export type JobTable = {
@@ -9,8 +9,17 @@ export type JobTable = {
code: JobCode;
priority: Generated<number>;
status: Generated<JobQueueStatus>;
retries: Generated<number>;
data: unknown;
dedupKey: string | null;
};
export type JobFailureTable = {
id: Generated<number>;
failedAt: Generated<Date>;
queueName: string;
code: JobCode;
data: unknown;
error: string | null;
};
@@ -37,14 +46,14 @@ function defineJobTable(name: string) {
@Column({ type: 'smallint', default: 0 })
status!: Generated<JobQueueStatus>;
@Column({ type: 'smallint', default: 0 })
retries!: Generated<number>;
@Column({ type: 'jsonb', nullable: true })
data!: unknown;
@Column({ type: 'text', nullable: true })
dedupKey!: string | null;
@Column({ type: 'text', nullable: true })
error!: string | null;
}
const decorated = [
@@ -55,9 +64,9 @@ function defineJobTable(name: string) {
name: `IDX_${name}_dedup`,
columns: ['dedupKey'],
unique: true,
where: `"dedupKey" IS NOT NULL AND status = 0`,
where: `"dedupKey" IS NOT NULL`,
}),
Index({ name: `IDX_${name}_pending`, expression: 'priority DESC, id ASC', where: 'status = 0' }),
Index({ name: `IDX_${name}_pending`, expression: 'priority DESC, id ASC' }),
Table(name),
].reduce((cls, dec) => dec(cls) || cls, JobTable);
Object.defineProperty(decorated, 'name', { value: name });
@@ -111,3 +120,26 @@ export class JobQueueMetaTable {
@Column({ type: 'boolean', default: false })
isPaused!: Generated<boolean>;
}
// Dead-letter table for permanently failed jobs
@Table('job_failures')
@Index({ name: 'IDX_job_failures_queue', columns: ['queueName'] })
export class JobFailuresTable {
@PrimaryColumn({ type: 'bigint', identity: true })
id!: Generated<number>;
@Column({ type: 'timestamp with time zone', default: () => 'now()' })
failedAt!: Generated<Date>;
@Column({ type: 'text' })
queueName!: QueueName;
@Column({ type: 'smallint' })
code!: JobCode;
@Column({ type: 'jsonb', nullable: true })
data!: unknown;
@Column({ type: 'text', nullable: true })
error!: string | null;
}

View File

@@ -28,15 +28,21 @@ export class QueueWorker {
private heartbeatTimer: ReturnType<typeof setInterval> | null = null;
private sweepTimer: ReturnType<typeof setTimeout> | null = null;
private readonly queueName: QueueName;
private readonly table: ReturnType<typeof getTable>;
private readonly stallTimeout: number;
private readonly claimBatch: number;
private readonly maxRetries: number;
private readonly backoffBaseMs: number;
private readonly db: Kysely<DB>;
private readonly onJobFn: (job: JobItem) => Promise<unknown>;
constructor(options: QueueWorkerOptions) {
this.queueName = options.queueName;
this.stallTimeout = options.stallTimeout;
this.claimBatch = options.claimBatch;
this.maxRetries = options.maxRetries;
this.backoffBaseMs = options.backoffBaseMs;
this.concurrency = options.concurrency;
this.db = options.db;
this.table = getTable(this.db, options.queueName);
@@ -133,7 +139,7 @@ export class QueueWorker {
}
await this.onJobFn({ name: jobName, data: row.data } as JobItem);
// Success: delete completed job and try to fetch next
const next = this.stopped ? undefined : await this.completeAndFetch(row.id, true).catch(() => undefined);
const next = this.stopped ? undefined : await this.completeAndFetch(row.id).catch(() => undefined);
this.activeJobs.delete(row.id);
if (next) {
void this.processJob(next);
@@ -142,9 +148,11 @@ export class QueueWorker {
this.hasPending = false;
}
} catch (error: unknown) {
// Failure: mark as failed and try to fetch next
const errorMsg = error instanceof Error ? error.message : String(error);
const next = await this.completeAndFetch(row.id, false, errorMsg).catch(() => undefined);
const next =
row.retries < this.maxRetries
? await this.retryAndFetch(row.id, row.retries).catch(() => undefined)
: await this.deadLetterAndFetch(row, errorMsg).catch(() => undefined);
this.activeJobs.delete(row.id);
if (next) {
void this.processJob(next);
@@ -192,19 +200,55 @@ export class QueueWorker {
}
/**
* Atomically complete a job (delete on success, mark failed on failure) and claim the next one.
* Uses a CTE to combine operations in a single round-trip.
* Atomically delete a completed job and claim the next one.
*/
private completeAndFetch(jobId: number, success: boolean, errorMsg?: string) {
const query = this.db.with('mark', (qb) =>
success
? qb.deleteFrom(this.table).where('id', '=', jobId)
: qb
.updateTable(this.table)
.set({ status: JobQueueStatus.Failed, error: errorMsg ?? null })
.where('id', '=', jobId),
private completeAndFetch(jobId: number) {
const prefix = this.db.with('mark', (qb: any) => qb.deleteFrom(this.table).where('id', '=', jobId));
return this.claimNext(prefix as any);
}
/**
* Atomically retry a failed job (reset to pending with backoff) and claim the next ready one.
*/
private retryAndFetch(jobId: number, retries: number) {
const backoffMs = this.backoffBaseMs * 5 ** retries;
const prefix = this.db.with('mark', (qb) =>
qb
.updateTable(this.table)
.set({
status: JobQueueStatus.Pending,
retries: retries + 1,
runAfter: sql<Date>`now() + ${sql.lit(`'${backoffMs} milliseconds'`)}::interval`,
startedAt: null,
expiresAt: null,
})
.where('id', '=', jobId),
);
return query
return this.claimNext(prefix as any);
}
/**
* Atomically delete a permanently failed job, log it to the dead-letter table, and claim the next one.
*/
private deadLetterAndFetch(row: Selectable<JobTable>, errorMsg: string) {
const prefix = this.db
.with('mark', (qb) => qb.deleteFrom(this.table).where('id', '=', row.id))
.with('logged', (qb) =>
qb.insertInto('job_failures').values({
queueName: this.queueName,
code: row.code,
data: row.data,
error: errorMsg,
}),
);
return this.claimNext(prefix as any);
}
/**
* Shared suffix: claim the next pending job. Appended after prefix CTEs (mark, logged, etc.).
*/
private claimNext(prefix: Kysely<DB>) {
return prefix
.with('next', (qb) =>
qb
.selectFrom(this.table)
@@ -390,7 +434,7 @@ export class WriteBuffer {
${dedupKey}::text[],
${runAfter}::timestamptz[]
)
ON CONFLICT ("dedupKey") WHERE "dedupKey" IS NOT NULL AND status = ${JobQueueStatus.Pending}
ON CONFLICT ("dedupKey") WHERE "dedupKey" IS NOT NULL
DO NOTHING
`;
}
@@ -428,6 +472,8 @@ interface QueueWorkerOptions {
queueName: QueueName;
stallTimeout: number;
claimBatch: number;
maxRetries: number;
backoffBaseMs: number;
concurrency: number;
db: Kysely<DB>;
onJob: (job: JobItem) => Promise<unknown>;