This commit is contained in:
mertalev
2026-02-12 21:09:42 -05:00
parent c5c8fc56a5
commit 4ee7a39e7a
7 changed files with 662 additions and 1337 deletions

View File

@@ -569,6 +569,13 @@ export enum QueueName {
Editor = 'editor',
}
export const JobQueueStatus = {
Pending: 0,
Active: 1,
Failed: 2,
} as const;
export type JobQueueStatus = (typeof JobQueueStatus)[keyof typeof JobQueueStatus];
export enum QueueJobStatus {
Active = 'active',
Failed = 'failed',
@@ -658,6 +665,12 @@ export enum JobName {
WorkflowRun = 'WorkflowRun',
}
type JobNameValue = (typeof JobName)[keyof typeof JobName];
const names = Object.values(JobName);
export const JobCode = Object.fromEntries(names.map((key, i) => [key, i])) as Record<JobNameValue, number>;
export const JOB_CODE_TO_NAME = Object.fromEntries(names.map((key, i) => [i, key])) as Record<number, JobNameValue>;
export type JobCode = (typeof JobCode)[keyof typeof JobCode];
export enum QueueCommand {
Start = 'start',
/** @deprecated Use `updateQueue` instead */

View File

@@ -1,21 +1,30 @@
import { Injectable } from '@nestjs/common';
import { ModuleRef, Reflector } from '@nestjs/core';
import { Kysely, sql } from 'kysely';
import { ClassConstructor } from 'class-transformer';
import { setTimeout } from 'node:timers/promises';
import { Kysely, sql } from 'kysely';
import { InjectKysely } from 'nestjs-kysely';
import { setTimeout } from 'node:timers/promises';
import postgres from 'postgres';
import { JobConfig } from 'src/decorators';
import { QueueJobResponseDto, QueueJobSearchDto } from 'src/dtos/queue.dto';
import { JobName, JobStatus, MetadataKey, QueueCleanType, QueueJobStatus, QueueName } from 'src/enum';
import {
JOB_CODE_TO_NAME,
JobCode,
JobName,
JobQueueStatus,
JobStatus,
MetadataKey,
QueueCleanType,
QueueJobStatus,
QueueName,
} from 'src/enum';
import { ConfigRepository } from 'src/repositories/config.repository';
import { EventRepository } from 'src/repositories/event.repository';
import { QUEUE_TABLE, WriteBuffer } from 'src/repositories/job.write-buffer';
import { charToJobName, jobNameToChar, QueueWorker } from 'src/repositories/job.worker';
import { LoggingRepository } from 'src/repositories/logging.repository';
import { DB } from 'src/schema';
import { JobCounts, JobItem, JobOf } from 'src/types';
import { asPostgresConnectionConfig } from 'src/utils/database';
import { getTable, InsertRow, QueueWorker, WriteBuffer } from 'src/utils/job-queue.util';
import { getKeyByValue, getMethodNames, ImmichStartupError } from 'src/utils/misc';
type JobMapItem = {
@@ -25,57 +34,29 @@ type JobMapItem = {
label: string;
};
// Status char codes
const STATUS_PENDING = 'p';
const STATUS_ACTIVE = 'a';
const STATUS_FAILED = 'f';
// Stall timeouts in milliseconds
const STALL_LONG = 60 * 60 * 1000; // 1 hour
const STALL_MEDIUM = 30 * 60 * 1000; // 30 min
const STALL_DEFAULT = 5 * 60 * 1000; // 5 min
const getStallTimeout = (queueName: QueueName): number => {
switch (queueName) {
case QueueName.VideoConversion:
case QueueName.BackupDatabase:
case QueueName.Editor: {
return STALL_LONG;
}
case QueueName.Library:
case QueueName.StorageTemplateMigration: {
return STALL_MEDIUM;
}
default: {
return STALL_DEFAULT;
}
}
};
const getClaimBatch = (queueName: QueueName): number => {
switch (queueName) {
case QueueName.VideoConversion:
case QueueName.BackupDatabase:
case QueueName.StorageTemplateMigration:
case QueueName.Editor:
case QueueName.FacialRecognition:
case QueueName.DuplicateDetection: {
case QueueName.VideoConversion: {
return 1;
}
case QueueName.FaceDetection:
case QueueName.SmartSearch:
case QueueName.Ocr: {
return 2;
}
default: {
return 100; // will be clamped to slotsAvailable by the worker
}
}
};
// Map QueueJobStatus to our "char" status codes
const STATUS_FILTER: Record<QueueJobStatus, string | null> = {
[QueueJobStatus.Active]: STATUS_ACTIVE,
[QueueJobStatus.Failed]: STATUS_FAILED,
[QueueJobStatus.Waiting]: STATUS_PENDING,
const STATUS_FILTER = {
[QueueJobStatus.Active]: JobQueueStatus.Active,
[QueueJobStatus.Failed]: JobQueueStatus.Failed,
[QueueJobStatus.Waiting]: JobQueueStatus.Pending,
[QueueJobStatus.Complete]: null, // completed jobs are deleted
[QueueJobStatus.Delayed]: STATUS_PENDING, // delayed = pending with future run_after
[QueueJobStatus.Paused]: STATUS_PENDING, // paused queue has pending jobs
[QueueJobStatus.Delayed]: JobQueueStatus.Pending, // delayed = pending with future run_after
[QueueJobStatus.Paused]: JobQueueStatus.Pending, // paused queue has pending jobs
};
@Injectable()
@@ -84,6 +65,7 @@ export class JobRepository {
private handlers: Partial<Record<JobName, JobMapItem>> = {};
private writeBuffer!: WriteBuffer;
private listenConn: postgres.Sql | null = null;
private listenReady = false;
private pauseState: Partial<Record<QueueName, boolean>> = {};
constructor(
@@ -146,57 +128,38 @@ export class JobRepository {
}
}
startWorkers() {
async startWorkers() {
this.writeBuffer = new WriteBuffer(this.db, (queue) => this.notify(queue));
// Startup sweep: reset any active jobs from a previous crash
const startupPromises = Object.values(QueueName).map(async (queueName) => {
const tableName = QUEUE_TABLE[queueName];
await sql`
UPDATE ${sql.table(tableName)}
SET "status" = ${STATUS_PENDING}::"char", "started_at" = NULL, "expires_at" = NULL
WHERE "status" = ${STATUS_ACTIVE}::"char"
`.execute(this.db);
});
await Promise.all(
Object.values(QueueName).map((queueName) =>
this.db
.updateTable(getTable(this.db, queueName))
.set({ status: JobQueueStatus.Pending, startedAt: null, expiresAt: null })
.where('status', '=', JobQueueStatus.Active)
.where('expiresAt', '<', sql<Date>`now()`)
.execute(),
),
);
// Load pause state and setup workers
void Promise.all(startupPromises).then(async () => {
// Load pause state from DB
const metaRows = await this.db.selectFrom('job_queue_meta').selectAll().execute();
for (const row of metaRows) {
this.pauseState[row.queue_name as QueueName] = row.is_paused;
}
// Create workers
for (const queueName of Object.values(QueueName)) {
this.workers[queueName] = new QueueWorker({
queueName,
stallTimeout: 5 * 60 * 1000, // 5 min
claimBatch: getClaimBatch(queueName),
concurrency: 1,
db: this.db,
onJob: (job) => this.eventRepository.emit('JobRun', queueName, job),
});
}
// Create workers
for (const queueName of Object.values(QueueName)) {
const worker = new QueueWorker({
queueName,
tableName: QUEUE_TABLE[queueName],
stallTimeout: getStallTimeout(queueName),
claimBatch: getClaimBatch(queueName),
concurrency: 1,
db: this.db,
onJob: (job) => this.eventRepository.emit('JobRun', queueName, job),
});
if (this.pauseState[queueName]) {
worker.pause();
}
this.workers[queueName] = worker;
}
// Setup LISTEN/NOTIFY
await this.setupListen();
// Trigger initial fetch for all workers
for (const worker of Object.values(this.workers)) {
worker.onNotification();
}
});
// Setup LISTEN/NOTIFY, sync pause state, and trigger initial fetch
await this.setupListen();
}
async run({ name, data }: JobItem) {
run({ name, data }: JobItem) {
const item = this.handlers[name as JobName];
if (!item) {
this.logger.warn(`Skipping unknown job: "${name}"`);
@@ -216,9 +179,14 @@ export class JobRepository {
worker.setConcurrency(concurrency);
}
isActive(name: QueueName): Promise<boolean> {
const worker = this.workers[name];
return Promise.resolve(worker ? worker.activeJobCount > 0 : false);
async isActive(name: QueueName): Promise<boolean> {
const result = await this.db
.selectFrom(getTable(this.db, name))
.select('id')
.where('status', '=', JobQueueStatus.Active)
.limit(1)
.executeTakeFirst();
return result !== undefined;
}
isPaused(name: QueueName): Promise<boolean> {
@@ -229,37 +197,38 @@ export class JobRepository {
this.pauseState[name] = true;
await this.db
.insertInto('job_queue_meta')
.values({ queue_name: name, is_paused: true })
.onConflict((oc) => oc.column('queue_name').doUpdateSet({ is_paused: true }))
.values({ queueName: name, isPaused: true })
.onConflict((oc) => oc.column('queueName').doUpdateSet({ isPaused: true }))
.execute();
this.workers[name]?.pause();
await this.notify(name, 'pause');
}
async resume(name: QueueName) {
this.pauseState[name] = false;
await this.db
.insertInto('job_queue_meta')
.values({ queue_name: name, is_paused: false })
.onConflict((oc) => oc.column('queue_name').doUpdateSet({ is_paused: false }))
.values({ queueName: name, isPaused: false })
.onConflict((oc) => oc.column('queueName').doUpdateSet({ isPaused: false }))
.execute();
this.workers[name]?.resume();
await this.notify(name, 'resume');
}
async empty(name: QueueName) {
const tableName = QUEUE_TABLE[name];
await sql`DELETE FROM ${sql.table(tableName)} WHERE "status" = ${STATUS_PENDING}::"char"`.execute(this.db);
empty(name: QueueName) {
return this.db.deleteFrom(getTable(this.db, name)).where('status', '=', JobQueueStatus.Pending).execute();
}
async clear(name: QueueName, _type: QueueCleanType) {
const tableName = QUEUE_TABLE[name];
await sql`DELETE FROM ${sql.table(tableName)} WHERE "status" = ${STATUS_FAILED}::"char"`.execute(this.db);
clear(name: QueueName, _type: QueueCleanType) {
return this.db.deleteFrom(getTable(this.db, name)).where('status', '=', JobQueueStatus.Failed).execute();
}
async getJobCounts(name: QueueName): Promise<JobCounts> {
const tableName = QUEUE_TABLE[name];
const result = await sql<{ status: string; count: string }>`
SELECT "status", count(*)::text as count FROM ${sql.table(tableName)} GROUP BY "status"
`.execute(this.db);
const result = await this.db
.selectFrom(getTable(this.db, name))
.select((eb) => ['status', eb.fn.countAll<number>().as('count')])
.groupBy('status')
.execute();
const counts: JobCounts = {
active: 0,
@@ -270,29 +239,23 @@ export class JobRepository {
paused: 0,
};
for (const row of result.rows) {
for (const row of result) {
switch (row.status) {
case STATUS_PENDING: {
case JobQueueStatus.Pending: {
counts.waiting = Number(row.count);
break;
}
case STATUS_ACTIVE: {
case JobQueueStatus.Active: {
counts.active = Number(row.count);
break;
}
case STATUS_FAILED: {
case JobQueueStatus.Failed: {
counts.failed = Number(row.count);
break;
}
}
}
// In-memory active count may be more accurate than DB for in-flight jobs
const worker = this.workers[name];
if (worker) {
counts.active = worker.activeJobCount;
}
if (this.pauseState[name]) {
counts.paused = counts.waiting;
counts.waiting = 0;
@@ -305,32 +268,31 @@ export class JobRepository {
return (this.handlers[name] as JobMapItem).queueName;
}
async queueAll(items: JobItem[]): Promise<void> {
queueAll(items: JobItem[]): Promise<void> {
if (items.length === 0) {
return;
return Promise.resolve();
}
const bufferItems: { queue: QueueName; row: { name: string; data: unknown; priority: number; dedup_key: string | null; run_after: Date } }[] = [];
const bufferItems: { queue: QueueName; row: InsertRow }[] = [];
for (const item of items) {
const queueName = this.getQueueName(item.name);
const options = this.getJobOptions(item);
bufferItems.push({
queue: queueName,
row: {
name: jobNameToChar(item.name),
data: item.data || {},
priority: options?.priority ?? 0,
dedup_key: options?.dedupKey ?? null,
run_after: options?.delay ? new Date(Date.now() + options.delay) : new Date(),
code: JobCode[item.name],
data: item.data ?? null,
priority: options?.priority ?? null,
dedupKey: options?.dedupKey ?? null,
runAfter: options?.delay ? new Date(Date.now() + options.delay) : null,
},
});
}
await this.writeBuffer.add(bufferItems);
return this.writeBuffer.add(bufferItems);
}
async queue(item: JobItem): Promise<void> {
queue(item: JobItem): Promise<void> {
return this.queueAll([item]);
}
@@ -350,31 +312,31 @@ export class JobRepository {
}
async searchJobs(name: QueueName, dto: QueueJobSearchDto): Promise<QueueJobResponseDto[]> {
const tableName = QUEUE_TABLE[name];
const statuses = dto.status ?? Object.values(QueueJobStatus);
const charStatuses = statuses
.map((s) => STATUS_FILTER[s])
.filter((s): s is string => s !== null);
const statuses: JobQueueStatus[] = [];
for (const status of dto.status ?? Object.values(QueueJobStatus)) {
const mapped = STATUS_FILTER[status];
if (mapped !== null && !statuses.includes(mapped)) {
statuses.push(mapped);
}
}
if (charStatuses.length === 0) {
if (statuses.length === 0) {
return [];
}
const uniqueStatuses = [...new Set(charStatuses)];
const rows = await this.db
.selectFrom(getTable(this.db, name))
.select(['id', 'code', 'data', 'runAfter'])
.where('status', 'in', statuses)
.orderBy('id', 'desc')
.limit(1000)
.execute();
const rows = await sql<{ id: number; name: string; data: unknown; run_after: Date }>`
SELECT "id", "name", "data", "run_after"
FROM ${sql.table(tableName)}
WHERE "status" = ANY(${sql.val(uniqueStatuses)}::"char"[])
ORDER BY "id" DESC
LIMIT 1000
`.execute(this.db);
return rows.rows.map((row) => ({
return rows.map((row) => ({
id: String(row.id),
name: charToJobName(row.name) ?? (row.name as unknown as JobName),
name: JOB_CODE_TO_NAME[row.code],
data: (row.data ?? {}) as object,
timestamp: new Date(row.run_after).getTime(),
timestamp: new Date(row.runAfter).getTime(),
}));
}
@@ -403,13 +365,20 @@ export class JobRepository {
/** @deprecated */
// todo: remove this when asset notifications no longer need it.
public async removeJob(name: JobName, jobID: string): Promise<void> {
const queueName = this.getQueueName(name);
const tableName = QUEUE_TABLE[queueName];
await sql`DELETE FROM ${sql.table(tableName)} WHERE "id" = ${Number(jobID)}`.execute(this.db);
removeJob(name: JobName, dedupKey: string) {
return this.db
.deleteFrom(getTable(this.db, this.getQueueName(name)))
.where('dedupKey', '=', dedupKey)
.where('status', '=', JobQueueStatus.Pending)
.execute();
}
private async setupListen(): Promise<void> {
if (this.listenConn) {
await this.listenConn.end();
this.listenConn = null;
}
const { database } = this.configRepository.getEnv();
const pgConfig = asPostgresConnectionConfig(database.config);
this.listenConn = postgres({
@@ -423,14 +392,69 @@ export class JobRepository {
});
for (const queueName of Object.values(QueueName)) {
await this.listenConn.listen(`jobs:${queueName}`, () => {
this.workers[queueName]?.onNotification();
});
await this.listenConn.listen(
`jobs:${queueName}`,
(payload) => this.onNotify(queueName, payload),
() => this.onReconnect(),
);
}
this.listenReady = true;
await this.syncPauseState();
for (const worker of Object.values(this.workers)) {
worker.onNotification();
}
}
private async notify(queue: QueueName): Promise<void> {
await sql`SELECT pg_notify(${`jobs:${queue}`}, '')`.execute(this.db);
private onNotify(queueName: QueueName, payload: string) {
switch (payload) {
case 'pause': {
this.pauseState[queueName] = true;
this.workers[queueName]?.pause();
break;
}
case 'resume': {
this.pauseState[queueName] = false;
this.workers[queueName]?.resume();
break;
}
default: {
this.workers[queueName]?.onNotification();
break;
}
}
}
private onReconnect() {
if (!this.listenReady) {
return;
}
this.listenReady = false;
this.logger.log('LISTEN connection re-established, syncing state');
void this.syncPauseState().then(() => {
for (const worker of Object.values(this.workers)) {
worker.onNotification();
}
this.listenReady = true;
});
}
private async syncPauseState(): Promise<void> {
const metaRows = await this.db.selectFrom('job_queue_meta').selectAll().execute();
for (const row of metaRows) {
const queueName = row.queueName as QueueName;
const wasPaused = this.pauseState[queueName] ?? false;
this.pauseState[queueName] = row.isPaused;
if (wasPaused && !row.isPaused) {
this.workers[queueName]?.resume();
} else if (!wasPaused && row.isPaused) {
this.workers[queueName]?.pause();
}
}
}
private notify(queue: QueueName, payload = '') {
return sql`SELECT pg_notify(${`jobs:${queue}`}, ${payload})`.execute(this.db);
}
async onShutdown(): Promise<void> {

View File

@@ -1,298 +0,0 @@
import { Kysely, sql } from 'kysely';
import { JobName, QueueName } from 'src/enum';
import { DB } from 'src/schema';
import { JobItem } from 'src/types';
// Job status codes stored as "char" (single-byte PostgreSQL type)
const STATUS_PENDING = 'p';
const STATUS_ACTIVE = 'a';
const STATUS_FAILED = 'f';
// Bidirectional JobName <-> "char" mapping
const JOB_CHAR: Record<string, string> = {};
const CHAR_JOB: Record<string, JobName> = {};
// Assign sequential character codes starting from 0x01
let charCode = 1;
for (const jobName of Object.values(JobName)) {
const char = String.fromCodePoint(charCode++);
JOB_CHAR[jobName] = char;
CHAR_JOB[char] = jobName;
}
export const jobNameToChar = (name: JobName): string => JOB_CHAR[name];
export const charToJobName = (char: string): JobName | undefined => CHAR_JOB[char];
type JobRow = {
id: number;
name: string;
data: unknown;
priority: number;
status: string;
dedup_key: string | null;
run_after: Date;
started_at: Date | null;
expires_at: Date | null;
error: string | null;
};
export interface QueueWorkerOptions {
queueName: QueueName;
tableName: string;
stallTimeout: number;
claimBatch: number;
concurrency: number;
db: Kysely<DB>;
onJob: (job: JobItem) => Promise<void>;
}
export class QueueWorker {
private concurrency: number;
private activeCount = 0;
private activeJobs = new Map<number, { startedAt: number }>();
private hasPending = true;
private fetching = false;
private paused = false;
private stopped = false;
private heartbeatTimer: ReturnType<typeof setInterval> | null = null;
private readonly queueName: QueueName;
private readonly tableName: string;
private readonly stallTimeout: number;
private readonly claimBatch: number;
private readonly db: Kysely<DB>;
private readonly onJobFn: (job: JobItem) => Promise<void>;
constructor(options: QueueWorkerOptions) {
this.queueName = options.queueName;
this.tableName = options.tableName;
this.stallTimeout = options.stallTimeout;
this.claimBatch = options.claimBatch;
this.concurrency = options.concurrency;
this.db = options.db;
this.onJobFn = options.onJob;
}
get activeJobCount(): number {
return this.activeCount;
}
onNotification(): void {
this.hasPending = true;
void this.tryFetch();
}
setConcurrency(n: number): void {
this.concurrency = n;
void this.tryFetch();
}
pause(): void {
this.paused = true;
}
resume(): void {
this.paused = false;
this.hasPending = true;
void this.tryFetch();
}
async shutdown(): Promise<void> {
this.stopped = true;
this.stopHeartbeat();
// Re-queue active jobs
if (this.activeJobs.size > 0) {
const ids = [...this.activeJobs.keys()];
await sql`
UPDATE ${sql.table(this.tableName)}
SET "status" = ${STATUS_PENDING}::"char", "started_at" = NULL, "expires_at" = NULL
WHERE "id" = ANY(${sql.val(ids)}::bigint[])
`.execute(this.db);
}
}
private get slotsAvailable(): number {
return Math.max(0, this.concurrency - this.activeCount);
}
private async tryFetch(): Promise<void> {
if (this.fetching || this.paused || this.stopped) {
return;
}
this.fetching = true;
try {
while (this.slotsAvailable > 0 && this.hasPending && !this.stopped) {
const limit = Math.min(this.slotsAvailable, this.claimBatch);
const jobs = await this.claim(limit);
if (jobs.length === 0) {
const recovered = await this.recoverStalled();
if (recovered === 0) {
this.hasPending = false;
break;
}
continue;
}
this.activeCount += jobs.length;
for (const job of jobs) {
void this.processJob(job);
}
}
} finally {
this.fetching = false;
}
}
private async processJob(row: JobRow): Promise<void> {
this.activeJobs.set(row.id, { startedAt: Date.now() });
this.startHeartbeat();
try {
const jobName = charToJobName(row.name);
if (!jobName) {
throw new Error(`Unknown job char code: ${row.name.codePointAt(0)}`);
}
await this.onJobFn({ name: jobName, data: row.data } as JobItem);
// Success: delete completed job and try to fetch next
const next = await this.completeAndFetch(row.id, true);
this.activeJobs.delete(row.id);
if (next) {
void this.processJob(next);
} else {
this.activeCount--;
this.hasPending = false;
}
} catch (error: unknown) {
// Failure: mark as failed and try to fetch next
const errorMsg = error instanceof Error ? error.message : String(error);
const next = await this.completeAndFetch(row.id, false, errorMsg);
this.activeJobs.delete(row.id);
if (next) {
void this.processJob(next);
} else {
this.activeCount--;
this.hasPending = false;
}
} finally {
if (this.activeJobs.size === 0) {
this.stopHeartbeat();
}
}
}
/**
* Claim up to `limit` pending jobs using FOR UPDATE SKIP LOCKED
*/
private async claim(limit: number): Promise<JobRow[]> {
const result = await sql<JobRow>`
UPDATE ${sql.table(this.tableName)} SET
"status" = ${STATUS_ACTIVE}::"char",
"started_at" = now(),
"expires_at" = now() + ${sql.lit(`'${this.stallTimeout} milliseconds'`)}::interval
WHERE "id" IN (
SELECT "id" FROM ${sql.table(this.tableName)}
WHERE "status" = ${STATUS_PENDING}::"char" AND "run_after" <= now()
ORDER BY "priority" DESC, "id" ASC
FOR UPDATE SKIP LOCKED
LIMIT ${sql.lit(limit)}
)
RETURNING *
`.execute(this.db);
return result.rows as JobRow[];
}
/**
* Atomically complete a job (delete on success, mark failed on failure) and claim the next one.
* Uses a CTE to combine operations in a single round-trip.
*/
private async completeAndFetch(
jobId: number,
success: boolean,
errorMsg?: string,
): Promise<JobRow | undefined> {
if (success) {
const result = await sql<JobRow>`
WITH completed AS (
DELETE FROM ${sql.table(this.tableName)} WHERE "id" = ${jobId}
),
next AS (
SELECT "id" FROM ${sql.table(this.tableName)}
WHERE "status" = ${STATUS_PENDING}::"char" AND "run_after" <= now()
ORDER BY "priority" DESC, "id" ASC
FOR UPDATE SKIP LOCKED
LIMIT 1
)
UPDATE ${sql.table(this.tableName)} SET
"status" = ${STATUS_ACTIVE}::"char",
"started_at" = now(),
"expires_at" = now() + ${sql.lit(`'${this.stallTimeout} milliseconds'`)}::interval
WHERE "id" = (SELECT "id" FROM next)
RETURNING *
`.execute(this.db);
return (result.rows as JobRow[])[0];
}
const result = await sql<JobRow>`
WITH failed AS (
UPDATE ${sql.table(this.tableName)}
SET "status" = ${STATUS_FAILED}::"char", "error" = ${errorMsg ?? null}
WHERE "id" = ${jobId}
),
next AS (
SELECT "id" FROM ${sql.table(this.tableName)}
WHERE "status" = ${STATUS_PENDING}::"char" AND "run_after" <= now()
ORDER BY "priority" DESC, "id" ASC
FOR UPDATE SKIP LOCKED
LIMIT 1
)
UPDATE ${sql.table(this.tableName)} SET
"status" = ${STATUS_ACTIVE}::"char",
"started_at" = now(),
"expires_at" = now() + ${sql.lit(`'${this.stallTimeout} milliseconds'`)}::interval
WHERE "id" = (SELECT "id" FROM next)
RETURNING *
`.execute(this.db);
return (result.rows as JobRow[])[0];
}
/**
* Recover stalled jobs: reset jobs whose expires_at has passed
*/
private async recoverStalled(): Promise<number> {
const result = await sql`
UPDATE ${sql.table(this.tableName)}
SET "status" = ${STATUS_PENDING}::"char", "started_at" = NULL, "expires_at" = NULL
WHERE "status" = ${STATUS_ACTIVE}::"char" AND "expires_at" < now()
`.execute(this.db);
return Number(result.numAffectedRows ?? 0);
}
/**
* Extend expiry for all active jobs (heartbeat)
*/
private async extendExpiry(): Promise<void> {
if (this.activeJobs.size === 0) {
return;
}
const ids = [...this.activeJobs.keys()];
await sql`
UPDATE ${sql.table(this.tableName)}
SET "expires_at" = now() + ${sql.lit(`'${this.stallTimeout} milliseconds'`)}::interval
WHERE "id" = ANY(${sql.val(ids)}::bigint[])
`.execute(this.db);
}
private startHeartbeat(): void {
if (this.heartbeatTimer) {
return;
}
const interval = Math.max(1000, Math.floor(this.stallTimeout / 2));
this.heartbeatTimer = setInterval(() => void this.extendExpiry(), interval);
}
private stopHeartbeat(): void {
if (this.heartbeatTimer) {
clearInterval(this.heartbeatTimer);
this.heartbeatTimer = null;
}
}
}

View File

@@ -1,121 +0,0 @@
import { Kysely, sql } from 'kysely';
import { QueueName } from 'src/enum';
import { DB } from 'src/schema';
export type InsertRow = {
name: string;
data: unknown;
priority: number;
dedup_key: string | null;
run_after: Date;
};
type QueueTableName = keyof DB & `jobs_${string}`;
export const QUEUE_TABLE: Record<QueueName, QueueTableName> = {
[QueueName.ThumbnailGeneration]: 'jobs_thumbnail_generation',
[QueueName.MetadataExtraction]: 'jobs_metadata_extraction',
[QueueName.VideoConversion]: 'jobs_video_conversion',
[QueueName.FaceDetection]: 'jobs_face_detection',
[QueueName.FacialRecognition]: 'jobs_facial_recognition',
[QueueName.SmartSearch]: 'jobs_smart_search',
[QueueName.DuplicateDetection]: 'jobs_duplicate_detection',
[QueueName.BackgroundTask]: 'jobs_background_task',
[QueueName.StorageTemplateMigration]: 'jobs_storage_template_migration',
[QueueName.Migration]: 'jobs_migration',
[QueueName.Search]: 'jobs_search',
[QueueName.Sidecar]: 'jobs_sidecar',
[QueueName.Library]: 'jobs_library',
[QueueName.Notification]: 'jobs_notification',
[QueueName.BackupDatabase]: 'jobs_backup_database',
[QueueName.Ocr]: 'jobs_ocr',
[QueueName.Workflow]: 'jobs_workflow',
[QueueName.Editor]: 'jobs_editor',
};
type Deferred = { promise: Promise<void>; resolve: () => void };
const createDeferred = (): Deferred => {
let resolve!: () => void;
const promise = new Promise<void>((r) => (resolve = r));
return { promise, resolve };
};
const CHUNK_SIZE = 5000;
export class WriteBuffer {
private buffers = new Map<QueueName, InsertRow[]>();
private pending: Deferred | null = null;
private timer: ReturnType<typeof setTimeout> | null = null;
constructor(
private db: Kysely<DB>,
private notify: (queue: QueueName) => Promise<void>,
) {}
async add(items: { queue: QueueName; row: InsertRow }[]): Promise<void> {
for (const { queue, row } of items) {
let buf = this.buffers.get(queue);
if (!buf) {
buf = [];
this.buffers.set(queue, buf);
}
buf.push(row);
}
if (!this.timer) {
this.pending = createDeferred();
this.timer = setTimeout(() => void this.flush(), 10);
}
return this.pending!.promise;
}
async flush(): Promise<void> {
const snapshot = this.buffers;
this.buffers = new Map();
if (this.timer) {
clearTimeout(this.timer);
this.timer = null;
}
const deferred = this.pending;
this.pending = null;
if (snapshot.size === 0) {
deferred?.resolve();
return;
}
try {
for (const [queue, rows] of snapshot) {
const tableName = QUEUE_TABLE[queue];
for (let i = 0; i < rows.length; i += CHUNK_SIZE) {
const chunk = rows.slice(i, i + CHUNK_SIZE);
await this.insertChunk(tableName, chunk);
}
await this.notify(queue);
}
} finally {
deferred?.resolve();
}
}
private async insertChunk(tableName: string, rows: InsertRow[]): Promise<void> {
const names = rows.map((r) => r.name);
const datas = rows.map((r) => JSON.stringify(r.data));
const priorities = rows.map((r) => r.priority);
const dedupKeys = rows.map((r) => r.dedup_key);
const runAfters = rows.map((r) => r.run_after.toISOString());
await sql`
INSERT INTO ${sql.table(tableName)} ("name", "data", "priority", "dedup_key", "run_after")
SELECT * FROM unnest(
${sql.val(names)}::"char"[],
${sql.val(datas)}::jsonb[],
${sql.val(priorities)}::smallint[],
${sql.val(dedupKeys)}::text[],
${sql.val(runAfters)}::timestamptz[]
)
ON CONFLICT ("dedup_key") WHERE "dedup_key" IS NOT NULL AND "status" = 'p'::"char"
DO NOTHING
`.execute(this.db);
}
}

View File

@@ -1,25 +1,12 @@
import { JobCode, JobQueueStatus } from 'src/enum';
import { Column, ConfigurationParameter, Generated, Index, PrimaryColumn, Table } from 'src/sql-tools';
// Job status values stored as "char" (single-byte PostgreSQL type):
// 'p' = pending, 'a' = active, 'c' = completed, 'f' = failed
@Table('jobs_thumbnail_generation')
@Index({ name: 'IDX_jobs_thumbnail_generation_pending', columns: ['priority', 'id'], where: `"status" = 'p'::"char"` })
@Index({
name: 'IDX_jobs_thumbnail_generation_dedup',
columns: ['dedup_key'],
unique: true,
where: `"dedup_key" IS NOT NULL AND "status" = 'p'::"char"`,
})
@ConfigurationParameter({ name: 'autovacuum_vacuum_threshold', value: 100, scope: 'table' })
@ConfigurationParameter({ name: 'autovacuum_vacuum_scale_factor', value: 0.01, scope: 'table' })
@ConfigurationParameter({ name: 'autovacuum_vacuum_cost_delay', value: 0, scope: 'table' })
export class JobsThumbnailGenerationTable {
export class JobTable {
@PrimaryColumn({ type: 'bigint', identity: true })
id!: Generated<number>;
@Column({ type: '"char"' })
name!: string;
@Column({ type: 'smallint' })
code!: JobCode;
@Column({ type: 'jsonb', nullable: true })
data!: unknown;
@@ -27,774 +14,89 @@ export class JobsThumbnailGenerationTable {
@Column({ type: 'smallint', default: 0 })
priority!: Generated<number>;
@Column({ type: '"char"', default: 'p' })
status!: Generated<string>;
@Column({ type: 'text', nullable: true })
dedup_key!: string | null;
@Column({ type: 'timestamp with time zone', default: () => 'now()' })
run_after!: Generated<Date>;
@Column({ type: 'timestamp with time zone', nullable: true })
started_at!: Date | null;
@Column({ type: 'timestamp with time zone', nullable: true })
expires_at!: Date | null;
@Column({ type: 'text', nullable: true })
error!: string | null;
}
@Table('jobs_metadata_extraction')
@Index({ name: 'IDX_jobs_metadata_extraction_pending', columns: ['priority', 'id'], where: `"status" = 'p'::"char"` })
@Index({
name: 'IDX_jobs_metadata_extraction_dedup',
columns: ['dedup_key'],
unique: true,
where: `"dedup_key" IS NOT NULL AND "status" = 'p'::"char"`,
})
@ConfigurationParameter({ name: 'autovacuum_vacuum_threshold', value: 100, scope: 'table' })
@ConfigurationParameter({ name: 'autovacuum_vacuum_scale_factor', value: 0.01, scope: 'table' })
@ConfigurationParameter({ name: 'autovacuum_vacuum_cost_delay', value: 0, scope: 'table' })
export class JobsMetadataExtractionTable {
@PrimaryColumn({ type: 'bigint', identity: true })
id!: Generated<number>;
@Column({ type: '"char"' })
name!: string;
@Column({ type: 'jsonb', nullable: true })
data!: unknown;
@Column({ type: 'smallint', default: 0 })
priority!: Generated<number>;
@Column({ type: '"char"', default: 'p' })
status!: Generated<string>;
status!: Generated<JobQueueStatus>;
@Column({ type: 'text', nullable: true })
dedup_key!: string | null;
dedupKey!: string | null;
@Column({ type: 'timestamp with time zone', default: () => 'now()' })
run_after!: Generated<Date>;
runAfter!: Generated<Date>;
@Column({ type: 'timestamp with time zone', nullable: true })
started_at!: Date | null;
startedAt!: Date | null;
@Column({ type: 'timestamp with time zone', nullable: true })
expires_at!: Date | null;
expiresAt!: Date | null;
@Column({ type: 'text', nullable: true })
error!: string | null;
}
@Table('jobs_video_conversion')
@Index({ name: 'IDX_jobs_video_conversion_pending', columns: ['priority', 'id'], where: `"status" = 'p'::"char"` })
@Index({
name: 'IDX_jobs_video_conversion_dedup',
columns: ['dedup_key'],
unique: true,
where: `"dedup_key" IS NOT NULL AND "status" = 'p'::"char"`,
})
@ConfigurationParameter({ name: 'autovacuum_vacuum_threshold', value: 100, scope: 'table' })
@ConfigurationParameter({ name: 'autovacuum_vacuum_scale_factor', value: 0.01, scope: 'table' })
@ConfigurationParameter({ name: 'autovacuum_vacuum_cost_delay', value: 0, scope: 'table' })
export class JobsVideoConversionTable {
@PrimaryColumn({ type: 'bigint', identity: true })
id!: Generated<number>;
function defineJobTable(name: string) {
class NewJobTable extends JobTable {}
@Column({ type: '"char"' })
name!: string;
@Column({ type: 'jsonb', nullable: true })
data!: unknown;
@Column({ type: 'smallint', default: 0 })
priority!: Generated<number>;
@Column({ type: '"char"', default: 'p' })
status!: Generated<string>;
@Column({ type: 'text', nullable: true })
dedup_key!: string | null;
@Column({ type: 'timestamp with time zone', default: () => 'now()' })
run_after!: Generated<Date>;
@Column({ type: 'timestamp with time zone', nullable: true })
started_at!: Date | null;
@Column({ type: 'timestamp with time zone', nullable: true })
expires_at!: Date | null;
@Column({ type: 'text', nullable: true })
error!: string | null;
const decorated = [
ConfigurationParameter({ name: 'autovacuum_vacuum_cost_delay', value: 0, scope: 'table' }),
ConfigurationParameter({ name: 'autovacuum_vacuum_scale_factor', value: 0.01, scope: 'table' }),
ConfigurationParameter({ name: 'autovacuum_vacuum_threshold', value: 100, scope: 'table' }),
Index({
name: `IDX_${name}_dedup`,
columns: ['dedupKey'],
unique: true,
where: `"dedupKey" IS NOT NULL AND status = 0`,
}),
Index({ name: `IDX_${name}_pending`, columns: ['priority', 'id'], where: 'status = 0' }),
Table(name),
].reduce((cls, dec) => dec(cls) || cls, NewJobTable);
Object.defineProperty(decorated, 'name', { value: name });
return decorated;
}
@Table('jobs_face_detection')
@Index({ name: 'IDX_jobs_face_detection_pending', columns: ['priority', 'id'], where: `"status" = 'p'::"char"` })
@Index({
name: 'IDX_jobs_face_detection_dedup',
columns: ['dedup_key'],
unique: true,
where: `"dedup_key" IS NOT NULL AND "status" = 'p'::"char"`,
})
@ConfigurationParameter({ name: 'autovacuum_vacuum_threshold', value: 100, scope: 'table' })
@ConfigurationParameter({ name: 'autovacuum_vacuum_scale_factor', value: 0.01, scope: 'table' })
@ConfigurationParameter({ name: 'autovacuum_vacuum_cost_delay', value: 0, scope: 'table' })
export class JobsFaceDetectionTable {
@PrimaryColumn({ type: 'bigint', identity: true })
id!: Generated<number>;
@Column({ type: '"char"' })
name!: string;
@Column({ type: 'jsonb', nullable: true })
data!: unknown;
@Column({ type: 'smallint', default: 0 })
priority!: Generated<number>;
@Column({ type: '"char"', default: 'p' })
status!: Generated<string>;
@Column({ type: 'text', nullable: true })
dedup_key!: string | null;
@Column({ type: 'timestamp with time zone', default: () => 'now()' })
run_after!: Generated<Date>;
@Column({ type: 'timestamp with time zone', nullable: true })
started_at!: Date | null;
@Column({ type: 'timestamp with time zone', nullable: true })
expires_at!: Date | null;
@Column({ type: 'text', nullable: true })
error!: string | null;
}
@Table('jobs_facial_recognition')
@Index({
name: 'IDX_jobs_facial_recognition_pending',
columns: ['priority', 'id'],
where: `"status" = 'p'::"char"`,
})
@Index({
name: 'IDX_jobs_facial_recognition_dedup',
columns: ['dedup_key'],
unique: true,
where: `"dedup_key" IS NOT NULL AND "status" = 'p'::"char"`,
})
@ConfigurationParameter({ name: 'autovacuum_vacuum_threshold', value: 100, scope: 'table' })
@ConfigurationParameter({ name: 'autovacuum_vacuum_scale_factor', value: 0.01, scope: 'table' })
@ConfigurationParameter({ name: 'autovacuum_vacuum_cost_delay', value: 0, scope: 'table' })
export class JobsFacialRecognitionTable {
@PrimaryColumn({ type: 'bigint', identity: true })
id!: Generated<number>;
@Column({ type: '"char"' })
name!: string;
@Column({ type: 'jsonb', nullable: true })
data!: unknown;
@Column({ type: 'smallint', default: 0 })
priority!: Generated<number>;
@Column({ type: '"char"', default: 'p' })
status!: Generated<string>;
@Column({ type: 'text', nullable: true })
dedup_key!: string | null;
@Column({ type: 'timestamp with time zone', default: () => 'now()' })
run_after!: Generated<Date>;
@Column({ type: 'timestamp with time zone', nullable: true })
started_at!: Date | null;
@Column({ type: 'timestamp with time zone', nullable: true })
expires_at!: Date | null;
@Column({ type: 'text', nullable: true })
error!: string | null;
}
@Table('jobs_smart_search')
@Index({ name: 'IDX_jobs_smart_search_pending', columns: ['priority', 'id'], where: `"status" = 'p'::"char"` })
@Index({
name: 'IDX_jobs_smart_search_dedup',
columns: ['dedup_key'],
unique: true,
where: `"dedup_key" IS NOT NULL AND "status" = 'p'::"char"`,
})
@ConfigurationParameter({ name: 'autovacuum_vacuum_threshold', value: 100, scope: 'table' })
@ConfigurationParameter({ name: 'autovacuum_vacuum_scale_factor', value: 0.01, scope: 'table' })
@ConfigurationParameter({ name: 'autovacuum_vacuum_cost_delay', value: 0, scope: 'table' })
export class JobsSmartSearchTable {
@PrimaryColumn({ type: 'bigint', identity: true })
id!: Generated<number>;
@Column({ type: '"char"' })
name!: string;
@Column({ type: 'jsonb', nullable: true })
data!: unknown;
@Column({ type: 'smallint', default: 0 })
priority!: Generated<number>;
@Column({ type: '"char"', default: 'p' })
status!: Generated<string>;
@Column({ type: 'text', nullable: true })
dedup_key!: string | null;
@Column({ type: 'timestamp with time zone', default: () => 'now()' })
run_after!: Generated<Date>;
@Column({ type: 'timestamp with time zone', nullable: true })
started_at!: Date | null;
@Column({ type: 'timestamp with time zone', nullable: true })
expires_at!: Date | null;
@Column({ type: 'text', nullable: true })
error!: string | null;
}
@Table('jobs_duplicate_detection')
@Index({
name: 'IDX_jobs_duplicate_detection_pending',
columns: ['priority', 'id'],
where: `"status" = 'p'::"char"`,
})
@Index({
name: 'IDX_jobs_duplicate_detection_dedup',
columns: ['dedup_key'],
unique: true,
where: `"dedup_key" IS NOT NULL AND "status" = 'p'::"char"`,
})
@ConfigurationParameter({ name: 'autovacuum_vacuum_threshold', value: 100, scope: 'table' })
@ConfigurationParameter({ name: 'autovacuum_vacuum_scale_factor', value: 0.01, scope: 'table' })
@ConfigurationParameter({ name: 'autovacuum_vacuum_cost_delay', value: 0, scope: 'table' })
export class JobsDuplicateDetectionTable {
@PrimaryColumn({ type: 'bigint', identity: true })
id!: Generated<number>;
@Column({ type: '"char"' })
name!: string;
@Column({ type: 'jsonb', nullable: true })
data!: unknown;
@Column({ type: 'smallint', default: 0 })
priority!: Generated<number>;
@Column({ type: '"char"', default: 'p' })
status!: Generated<string>;
@Column({ type: 'text', nullable: true })
dedup_key!: string | null;
@Column({ type: 'timestamp with time zone', default: () => 'now()' })
run_after!: Generated<Date>;
@Column({ type: 'timestamp with time zone', nullable: true })
started_at!: Date | null;
@Column({ type: 'timestamp with time zone', nullable: true })
expires_at!: Date | null;
@Column({ type: 'text', nullable: true })
error!: string | null;
}
@Table('jobs_background_task')
@Index({ name: 'IDX_jobs_background_task_pending', columns: ['priority', 'id'], where: `"status" = 'p'::"char"` })
@Index({
name: 'IDX_jobs_background_task_dedup',
columns: ['dedup_key'],
unique: true,
where: `"dedup_key" IS NOT NULL AND "status" = 'p'::"char"`,
})
@ConfigurationParameter({ name: 'autovacuum_vacuum_threshold', value: 100, scope: 'table' })
@ConfigurationParameter({ name: 'autovacuum_vacuum_scale_factor', value: 0.01, scope: 'table' })
@ConfigurationParameter({ name: 'autovacuum_vacuum_cost_delay', value: 0, scope: 'table' })
export class JobsBackgroundTaskTable {
@PrimaryColumn({ type: 'bigint', identity: true })
id!: Generated<number>;
@Column({ type: '"char"' })
name!: string;
@Column({ type: 'jsonb', nullable: true })
data!: unknown;
@Column({ type: 'smallint', default: 0 })
priority!: Generated<number>;
@Column({ type: '"char"', default: 'p' })
status!: Generated<string>;
@Column({ type: 'text', nullable: true })
dedup_key!: string | null;
@Column({ type: 'timestamp with time zone', default: () => 'now()' })
run_after!: Generated<Date>;
@Column({ type: 'timestamp with time zone', nullable: true })
started_at!: Date | null;
@Column({ type: 'timestamp with time zone', nullable: true })
expires_at!: Date | null;
@Column({ type: 'text', nullable: true })
error!: string | null;
}
@Table('jobs_storage_template_migration')
@Index({
name: 'IDX_jobs_storage_template_migration_pending',
columns: ['priority', 'id'],
where: `"status" = 'p'::"char"`,
})
@Index({
name: 'IDX_jobs_storage_template_migration_dedup',
columns: ['dedup_key'],
unique: true,
where: `"dedup_key" IS NOT NULL AND "status" = 'p'::"char"`,
})
@ConfigurationParameter({ name: 'autovacuum_vacuum_threshold', value: 100, scope: 'table' })
@ConfigurationParameter({ name: 'autovacuum_vacuum_scale_factor', value: 0.01, scope: 'table' })
@ConfigurationParameter({ name: 'autovacuum_vacuum_cost_delay', value: 0, scope: 'table' })
export class JobsStorageTemplateMigrationTable {
@PrimaryColumn({ type: 'bigint', identity: true })
id!: Generated<number>;
@Column({ type: '"char"' })
name!: string;
@Column({ type: 'jsonb', nullable: true })
data!: unknown;
@Column({ type: 'smallint', default: 0 })
priority!: Generated<number>;
@Column({ type: '"char"', default: 'p' })
status!: Generated<string>;
@Column({ type: 'text', nullable: true })
dedup_key!: string | null;
@Column({ type: 'timestamp with time zone', default: () => 'now()' })
run_after!: Generated<Date>;
@Column({ type: 'timestamp with time zone', nullable: true })
started_at!: Date | null;
@Column({ type: 'timestamp with time zone', nullable: true })
expires_at!: Date | null;
@Column({ type: 'text', nullable: true })
error!: string | null;
}
@Table('jobs_migration')
@Index({ name: 'IDX_jobs_migration_pending', columns: ['priority', 'id'], where: `"status" = 'p'::"char"` })
@Index({
name: 'IDX_jobs_migration_dedup',
columns: ['dedup_key'],
unique: true,
where: `"dedup_key" IS NOT NULL AND "status" = 'p'::"char"`,
})
@ConfigurationParameter({ name: 'autovacuum_vacuum_threshold', value: 100, scope: 'table' })
@ConfigurationParameter({ name: 'autovacuum_vacuum_scale_factor', value: 0.01, scope: 'table' })
@ConfigurationParameter({ name: 'autovacuum_vacuum_cost_delay', value: 0, scope: 'table' })
export class JobsMigrationTable {
@PrimaryColumn({ type: 'bigint', identity: true })
id!: Generated<number>;
@Column({ type: '"char"' })
name!: string;
@Column({ type: 'jsonb', nullable: true })
data!: unknown;
@Column({ type: 'smallint', default: 0 })
priority!: Generated<number>;
@Column({ type: '"char"', default: 'p' })
status!: Generated<string>;
@Column({ type: 'text', nullable: true })
dedup_key!: string | null;
@Column({ type: 'timestamp with time zone', default: () => 'now()' })
run_after!: Generated<Date>;
@Column({ type: 'timestamp with time zone', nullable: true })
started_at!: Date | null;
@Column({ type: 'timestamp with time zone', nullable: true })
expires_at!: Date | null;
@Column({ type: 'text', nullable: true })
error!: string | null;
}
@Table('jobs_search')
@Index({ name: 'IDX_jobs_search_pending', columns: ['priority', 'id'], where: `"status" = 'p'::"char"` })
@Index({
name: 'IDX_jobs_search_dedup',
columns: ['dedup_key'],
unique: true,
where: `"dedup_key" IS NOT NULL AND "status" = 'p'::"char"`,
})
@ConfigurationParameter({ name: 'autovacuum_vacuum_threshold', value: 100, scope: 'table' })
@ConfigurationParameter({ name: 'autovacuum_vacuum_scale_factor', value: 0.01, scope: 'table' })
@ConfigurationParameter({ name: 'autovacuum_vacuum_cost_delay', value: 0, scope: 'table' })
export class JobsSearchTable {
@PrimaryColumn({ type: 'bigint', identity: true })
id!: Generated<number>;
@Column({ type: '"char"' })
name!: string;
@Column({ type: 'jsonb', nullable: true })
data!: unknown;
@Column({ type: 'smallint', default: 0 })
priority!: Generated<number>;
@Column({ type: '"char"', default: 'p' })
status!: Generated<string>;
@Column({ type: 'text', nullable: true })
dedup_key!: string | null;
@Column({ type: 'timestamp with time zone', default: () => 'now()' })
run_after!: Generated<Date>;
@Column({ type: 'timestamp with time zone', nullable: true })
started_at!: Date | null;
@Column({ type: 'timestamp with time zone', nullable: true })
expires_at!: Date | null;
@Column({ type: 'text', nullable: true })
error!: string | null;
}
@Table('jobs_sidecar')
@Index({ name: 'IDX_jobs_sidecar_pending', columns: ['priority', 'id'], where: `"status" = 'p'::"char"` })
@Index({
name: 'IDX_jobs_sidecar_dedup',
columns: ['dedup_key'],
unique: true,
where: `"dedup_key" IS NOT NULL AND "status" = 'p'::"char"`,
})
@ConfigurationParameter({ name: 'autovacuum_vacuum_threshold', value: 100, scope: 'table' })
@ConfigurationParameter({ name: 'autovacuum_vacuum_scale_factor', value: 0.01, scope: 'table' })
@ConfigurationParameter({ name: 'autovacuum_vacuum_cost_delay', value: 0, scope: 'table' })
export class JobsSidecarTable {
@PrimaryColumn({ type: 'bigint', identity: true })
id!: Generated<number>;
@Column({ type: '"char"' })
name!: string;
@Column({ type: 'jsonb', nullable: true })
data!: unknown;
@Column({ type: 'smallint', default: 0 })
priority!: Generated<number>;
@Column({ type: '"char"', default: 'p' })
status!: Generated<string>;
@Column({ type: 'text', nullable: true })
dedup_key!: string | null;
@Column({ type: 'timestamp with time zone', default: () => 'now()' })
run_after!: Generated<Date>;
@Column({ type: 'timestamp with time zone', nullable: true })
started_at!: Date | null;
@Column({ type: 'timestamp with time zone', nullable: true })
expires_at!: Date | null;
@Column({ type: 'text', nullable: true })
error!: string | null;
}
@Table('jobs_library')
@Index({ name: 'IDX_jobs_library_pending', columns: ['priority', 'id'], where: `"status" = 'p'::"char"` })
@Index({
name: 'IDX_jobs_library_dedup',
columns: ['dedup_key'],
unique: true,
where: `"dedup_key" IS NOT NULL AND "status" = 'p'::"char"`,
})
@ConfigurationParameter({ name: 'autovacuum_vacuum_threshold', value: 100, scope: 'table' })
@ConfigurationParameter({ name: 'autovacuum_vacuum_scale_factor', value: 0.01, scope: 'table' })
@ConfigurationParameter({ name: 'autovacuum_vacuum_cost_delay', value: 0, scope: 'table' })
export class JobsLibraryTable {
@PrimaryColumn({ type: 'bigint', identity: true })
id!: Generated<number>;
@Column({ type: '"char"' })
name!: string;
@Column({ type: 'jsonb', nullable: true })
data!: unknown;
@Column({ type: 'smallint', default: 0 })
priority!: Generated<number>;
@Column({ type: '"char"', default: 'p' })
status!: Generated<string>;
@Column({ type: 'text', nullable: true })
dedup_key!: string | null;
@Column({ type: 'timestamp with time zone', default: () => 'now()' })
run_after!: Generated<Date>;
@Column({ type: 'timestamp with time zone', nullable: true })
started_at!: Date | null;
@Column({ type: 'timestamp with time zone', nullable: true })
expires_at!: Date | null;
@Column({ type: 'text', nullable: true })
error!: string | null;
}
@Table('jobs_notification')
@Index({ name: 'IDX_jobs_notification_pending', columns: ['priority', 'id'], where: `"status" = 'p'::"char"` })
@Index({
name: 'IDX_jobs_notification_dedup',
columns: ['dedup_key'],
unique: true,
where: `"dedup_key" IS NOT NULL AND "status" = 'p'::"char"`,
})
@ConfigurationParameter({ name: 'autovacuum_vacuum_threshold', value: 100, scope: 'table' })
@ConfigurationParameter({ name: 'autovacuum_vacuum_scale_factor', value: 0.01, scope: 'table' })
@ConfigurationParameter({ name: 'autovacuum_vacuum_cost_delay', value: 0, scope: 'table' })
export class JobsNotificationTable {
@PrimaryColumn({ type: 'bigint', identity: true })
id!: Generated<number>;
@Column({ type: '"char"' })
name!: string;
@Column({ type: 'jsonb', nullable: true })
data!: unknown;
@Column({ type: 'smallint', default: 0 })
priority!: Generated<number>;
@Column({ type: '"char"', default: 'p' })
status!: Generated<string>;
@Column({ type: 'text', nullable: true })
dedup_key!: string | null;
@Column({ type: 'timestamp with time zone', default: () => 'now()' })
run_after!: Generated<Date>;
@Column({ type: 'timestamp with time zone', nullable: true })
started_at!: Date | null;
@Column({ type: 'timestamp with time zone', nullable: true })
expires_at!: Date | null;
@Column({ type: 'text', nullable: true })
error!: string | null;
}
@Table('jobs_backup_database')
@Index({ name: 'IDX_jobs_backup_database_pending', columns: ['priority', 'id'], where: `"status" = 'p'::"char"` })
@Index({
name: 'IDX_jobs_backup_database_dedup',
columns: ['dedup_key'],
unique: true,
where: `"dedup_key" IS NOT NULL AND "status" = 'p'::"char"`,
})
@ConfigurationParameter({ name: 'autovacuum_vacuum_threshold', value: 100, scope: 'table' })
@ConfigurationParameter({ name: 'autovacuum_vacuum_scale_factor', value: 0.01, scope: 'table' })
@ConfigurationParameter({ name: 'autovacuum_vacuum_cost_delay', value: 0, scope: 'table' })
export class JobsBackupDatabaseTable {
@PrimaryColumn({ type: 'bigint', identity: true })
id!: Generated<number>;
@Column({ type: '"char"' })
name!: string;
@Column({ type: 'jsonb', nullable: true })
data!: unknown;
@Column({ type: 'smallint', default: 0 })
priority!: Generated<number>;
@Column({ type: '"char"', default: 'p' })
status!: Generated<string>;
@Column({ type: 'text', nullable: true })
dedup_key!: string | null;
@Column({ type: 'timestamp with time zone', default: () => 'now()' })
run_after!: Generated<Date>;
@Column({ type: 'timestamp with time zone', nullable: true })
started_at!: Date | null;
@Column({ type: 'timestamp with time zone', nullable: true })
expires_at!: Date | null;
@Column({ type: 'text', nullable: true })
error!: string | null;
}
@Table('jobs_ocr')
@Index({ name: 'IDX_jobs_ocr_pending', columns: ['priority', 'id'], where: `"status" = 'p'::"char"` })
@Index({
name: 'IDX_jobs_ocr_dedup',
columns: ['dedup_key'],
unique: true,
where: `"dedup_key" IS NOT NULL AND "status" = 'p'::"char"`,
})
@ConfigurationParameter({ name: 'autovacuum_vacuum_threshold', value: 100, scope: 'table' })
@ConfigurationParameter({ name: 'autovacuum_vacuum_scale_factor', value: 0.01, scope: 'table' })
@ConfigurationParameter({ name: 'autovacuum_vacuum_cost_delay', value: 0, scope: 'table' })
export class JobsOcrTable {
@PrimaryColumn({ type: 'bigint', identity: true })
id!: Generated<number>;
@Column({ type: '"char"' })
name!: string;
@Column({ type: 'jsonb', nullable: true })
data!: unknown;
@Column({ type: 'smallint', default: 0 })
priority!: Generated<number>;
@Column({ type: '"char"', default: 'p' })
status!: Generated<string>;
@Column({ type: 'text', nullable: true })
dedup_key!: string | null;
@Column({ type: 'timestamp with time zone', default: () => 'now()' })
run_after!: Generated<Date>;
@Column({ type: 'timestamp with time zone', nullable: true })
started_at!: Date | null;
@Column({ type: 'timestamp with time zone', nullable: true })
expires_at!: Date | null;
@Column({ type: 'text', nullable: true })
error!: string | null;
}
@Table('jobs_workflow')
@Index({ name: 'IDX_jobs_workflow_pending', columns: ['priority', 'id'], where: `"status" = 'p'::"char"` })
@Index({
name: 'IDX_jobs_workflow_dedup',
columns: ['dedup_key'],
unique: true,
where: `"dedup_key" IS NOT NULL AND "status" = 'p'::"char"`,
})
@ConfigurationParameter({ name: 'autovacuum_vacuum_threshold', value: 100, scope: 'table' })
@ConfigurationParameter({ name: 'autovacuum_vacuum_scale_factor', value: 0.01, scope: 'table' })
@ConfigurationParameter({ name: 'autovacuum_vacuum_cost_delay', value: 0, scope: 'table' })
export class JobsWorkflowTable {
@PrimaryColumn({ type: 'bigint', identity: true })
id!: Generated<number>;
@Column({ type: '"char"' })
name!: string;
@Column({ type: 'jsonb', nullable: true })
data!: unknown;
@Column({ type: 'smallint', default: 0 })
priority!: Generated<number>;
@Column({ type: '"char"', default: 'p' })
status!: Generated<string>;
@Column({ type: 'text', nullable: true })
dedup_key!: string | null;
@Column({ type: 'timestamp with time zone', default: () => 'now()' })
run_after!: Generated<Date>;
@Column({ type: 'timestamp with time zone', nullable: true })
started_at!: Date | null;
@Column({ type: 'timestamp with time zone', nullable: true })
expires_at!: Date | null;
@Column({ type: 'text', nullable: true })
error!: string | null;
}
@Table('jobs_editor')
@Index({ name: 'IDX_jobs_editor_pending', columns: ['priority', 'id'], where: `"status" = 'p'::"char"` })
@Index({
name: 'IDX_jobs_editor_dedup',
columns: ['dedup_key'],
unique: true,
where: `"dedup_key" IS NOT NULL AND "status" = 'p'::"char"`,
})
@ConfigurationParameter({ name: 'autovacuum_vacuum_threshold', value: 100, scope: 'table' })
@ConfigurationParameter({ name: 'autovacuum_vacuum_scale_factor', value: 0.01, scope: 'table' })
@ConfigurationParameter({ name: 'autovacuum_vacuum_cost_delay', value: 0, scope: 'table' })
export class JobsEditorTable {
@PrimaryColumn({ type: 'bigint', identity: true })
id!: Generated<number>;
@Column({ type: '"char"' })
name!: string;
@Column({ type: 'jsonb', nullable: true })
data!: unknown;
@Column({ type: 'smallint', default: 0 })
priority!: Generated<number>;
@Column({ type: '"char"', default: 'p' })
status!: Generated<string>;
@Column({ type: 'text', nullable: true })
dedup_key!: string | null;
@Column({ type: 'timestamp with time zone', default: () => 'now()' })
run_after!: Generated<Date>;
@Column({ type: 'timestamp with time zone', nullable: true })
started_at!: Date | null;
@Column({ type: 'timestamp with time zone', nullable: true })
expires_at!: Date | null;
@Column({ type: 'text', nullable: true })
error!: string | null;
}
export const JobsThumbnailGenerationTable = defineJobTable('jobs_thumbnail_generation');
export const JobsMetadataExtractionTable = defineJobTable('jobs_metadata_extraction');
export const JobsVideoConversionTable = defineJobTable('jobs_video_conversion');
export const JobsFaceDetectionTable = defineJobTable('jobs_face_detection');
export const JobsFacialRecognitionTable = defineJobTable('jobs_facial_recognition');
export const JobsSmartSearchTable = defineJobTable('jobs_smart_search');
export const JobsDuplicateDetectionTable = defineJobTable('jobs_duplicate_detection');
export const JobsBackgroundTaskTable = defineJobTable('jobs_background_task');
export const JobsStorageTemplateMigrationTable = defineJobTable('jobs_storage_template_migration');
export const JobsMigrationTable = defineJobTable('jobs_migration');
export const JobsSearchTable = defineJobTable('jobs_search');
export const JobsSidecarTable = defineJobTable('jobs_sidecar');
export const JobsLibraryTable = defineJobTable('jobs_library');
export const JobsNotificationTable = defineJobTable('jobs_notification');
export const JobsBackupDatabaseTable = defineJobTable('jobs_backup_database');
export const JobsOcrTable = defineJobTable('jobs_ocr');
export const JobsWorkflowTable = defineJobTable('jobs_workflow');
export const JobsEditorTable = defineJobTable('jobs_editor');
export type JobsThumbnailGenerationTable = InstanceType<typeof JobsThumbnailGenerationTable>;
export type JobsMetadataExtractionTable = InstanceType<typeof JobsMetadataExtractionTable>;
export type JobsVideoConversionTable = InstanceType<typeof JobsVideoConversionTable>;
export type JobsFaceDetectionTable = InstanceType<typeof JobsFaceDetectionTable>;
export type JobsFacialRecognitionTable = InstanceType<typeof JobsFacialRecognitionTable>;
export type JobsSmartSearchTable = InstanceType<typeof JobsSmartSearchTable>;
export type JobsDuplicateDetectionTable = InstanceType<typeof JobsDuplicateDetectionTable>;
export type JobsBackgroundTaskTable = InstanceType<typeof JobsBackgroundTaskTable>;
export type JobsStorageTemplateMigrationTable = InstanceType<typeof JobsStorageTemplateMigrationTable>;
export type JobsMigrationTable = InstanceType<typeof JobsMigrationTable>;
export type JobsSearchTable = InstanceType<typeof JobsSearchTable>;
export type JobsSidecarTable = InstanceType<typeof JobsSidecarTable>;
export type JobsLibraryTable = InstanceType<typeof JobsLibraryTable>;
export type JobsNotificationTable = InstanceType<typeof JobsNotificationTable>;
export type JobsBackupDatabaseTable = InstanceType<typeof JobsBackupDatabaseTable>;
export type JobsOcrTable = InstanceType<typeof JobsOcrTable>;
export type JobsWorkflowTable = InstanceType<typeof JobsWorkflowTable>;
export type JobsEditorTable = InstanceType<typeof JobsEditorTable>;
// Queue metadata table
@Table('job_queue_meta')
export class JobQueueMetaTable {
@PrimaryColumn({ type: 'text' })
queue_name!: string;
queueName!: string;
@Column({ type: 'boolean', default: false })
is_paused!: Generated<boolean>;
isPaused!: Generated<boolean>;
}

View File

@@ -80,7 +80,7 @@ export class QueueService extends BaseService {
onBootstrap() {
this.jobRepository.setup(this.services);
if (this.worker === ImmichWorker.Microservices) {
this.jobRepository.startWorkers();
void this.jobRepository.startWorkers();
}
}

View File

@@ -0,0 +1,405 @@
import { Kysely, Selectable, sql } from 'kysely';
import { JOB_CODE_TO_NAME, JobCode, JobQueueStatus, QueueName } from 'src/enum';
import { DB } from 'src/schema';
import { JobTable } from 'src/schema/tables/job.table';
import { JobItem } from 'src/types';
export type InsertRow = {
code: JobCode;
data: unknown;
priority: number | null;
dedupKey: string | null;
runAfter: Date | null;
};
export const getTable = (db: Kysely<DB>, queueName: QueueName) => db.dynamic.table(QUEUE_TABLE[queueName]).as('t');
export class QueueWorker {
activeJobCount = 0;
private concurrency: number;
private activeJobs = new Map<number, { startedAt: number }>();
private hasPending = true;
private fetching = false;
private paused = false;
private stopped = false;
private heartbeatTimer: ReturnType<typeof setInterval> | null = null;
private sweepTimer: ReturnType<typeof setTimeout> | null = null;
private readonly table: ReturnType<typeof getTable>;
private readonly stallTimeout: number;
private readonly claimBatch: number;
private readonly db: Kysely<DB>;
private readonly onJobFn: (job: JobItem) => Promise<unknown>;
constructor(options: QueueWorkerOptions) {
this.stallTimeout = options.stallTimeout;
this.claimBatch = options.claimBatch;
this.concurrency = options.concurrency;
this.db = options.db;
this.table = getTable(this.db, options.queueName);
this.onJobFn = options.onJob;
// One-shot sweep after stallTimeout to recover jobs orphaned by a crash
// that restarted before their expiry passed
this.sweepTimer = setTimeout(() => this.onNotification(), this.stallTimeout);
}
onNotification() {
this.hasPending = true;
void this.tryFetch();
}
setConcurrency(n: number) {
this.concurrency = n;
void this.tryFetch();
}
pause() {
this.paused = true;
}
resume() {
this.paused = false;
this.hasPending = true;
void this.tryFetch();
}
shutdown() {
this.stopped = true;
this.stopHeartbeat();
if (this.sweepTimer) {
clearTimeout(this.sweepTimer);
this.sweepTimer = null;
}
if (this.activeJobs.size === 0) {
return Promise.resolve();
}
// Re-queue active jobs
const ids = [...this.activeJobs.keys()];
return this.db
.updateTable(this.table)
.set({
status: JobQueueStatus.Pending,
startedAt: null,
expiresAt: null,
})
.where('id', 'in', ids)
.execute();
}
private get slotsAvailable() {
return Math.max(0, this.concurrency - this.activeJobCount);
}
private async tryFetch() {
if (this.fetching || this.paused || this.stopped) {
return;
}
this.fetching = true;
try {
while (this.slotsAvailable > 0 && this.hasPending && !this.stopped) {
const limit = Math.min(this.slotsAvailable, this.claimBatch);
const jobs = await this.claim(limit);
if (jobs.length === 0) {
const recovered = await this.recoverStalled();
if (recovered.numChangedRows === 0n) {
this.hasPending = false;
break;
}
continue;
}
this.activeJobCount += jobs.length;
for (const job of jobs) {
void this.processJob(job);
}
}
} finally {
this.fetching = false;
}
}
private async processJob(row: Selectable<JobTable>) {
this.activeJobs.set(row.id, { startedAt: Date.now() });
this.startHeartbeat();
try {
const jobName = JOB_CODE_TO_NAME[row.code];
if (!jobName) {
throw new Error(`Unknown job char code: ${row.code}`);
}
await this.onJobFn({ name: jobName, data: row.data } as JobItem);
// Success: delete completed job and try to fetch next
const next = this.stopped ? undefined : await this.completeAndFetch(row.id, true).catch(() => undefined);
this.activeJobs.delete(row.id);
if (next) {
void this.processJob(next);
} else {
this.activeJobCount--;
this.hasPending = false;
}
} catch (error: unknown) {
// Failure: mark as failed and try to fetch next
const errorMsg = error instanceof Error ? error.message : String(error);
const next = await this.completeAndFetch(row.id, false, errorMsg).catch(() => undefined);
this.activeJobs.delete(row.id);
if (next) {
void this.processJob(next);
} else {
this.activeJobCount--;
this.hasPending = false;
}
} finally {
if (this.activeJobs.size === 0) {
this.stopHeartbeat();
}
}
}
/**
* Claim up to `limit` pending jobs.
* Uses a materialized CTE with FOR NO KEY UPDATE SKIP LOCKED
* to avoid race conditions and excessive locking.
*/
private claim(limit: number) {
return this.db
.with(
(wb) => wb('candidates').materialized(),
(qb) =>
qb
.selectFrom(this.table)
.select('id')
.where('status', '=', JobQueueStatus.Pending)
.where('runAfter', '<=', sql<Date>`now()`)
.orderBy('priority', 'desc')
.orderBy('id', 'asc')
.limit(limit)
.forNoKeyUpdate()
.skipLocked(),
)
.updateTable(this.table)
.set({
status: JobQueueStatus.Active,
startedAt: sql<Date>`now()`,
expiresAt: sql<Date>`now() + ${sql.lit(`'${this.stallTimeout} milliseconds'`)}::interval`,
})
.where((eb) => eb('id', 'in', eb.selectFrom('candidates').select('id')))
.returningAll()
.execute();
}
/**
* Atomically complete a job (delete on success, mark failed on failure) and claim the next one.
* Uses a CTE to combine operations in a single round-trip.
*/
private completeAndFetch(jobId: number, success: boolean, errorMsg?: string) {
const query = this.db.with('mark', (qb) =>
success
? qb.deleteFrom(this.table).where('id', '=', jobId)
: qb
.updateTable(this.table)
.set({ status: JobQueueStatus.Failed, error: errorMsg ?? null })
.where('id', '=', jobId),
);
return query
.with('next', (qb) =>
qb
.selectFrom(this.table)
.where('status', '=', JobQueueStatus.Pending)
.where('runAfter', '<=', sql<Date>`now()`)
.orderBy('priority', 'desc')
.orderBy('id', 'asc')
.limit(1)
.forNoKeyUpdate()
.skipLocked(),
)
.updateTable(this.table)
.set({
status: JobQueueStatus.Active,
startedAt: sql<Date>`now()`,
expiresAt: sql<Date>`now() + ${sql.lit(`'${this.stallTimeout} milliseconds'`)}::interval`,
})
.where((eb) => eb('id', '=', eb.selectFrom('next').select('id')))
.returningAll()
.executeTakeFirst();
}
/**
* Recover stalled jobs: reset jobs whose expires_at has passed
*/
private recoverStalled() {
return this.db
.updateTable(this.table)
.set({
status: JobQueueStatus.Pending,
startedAt: null,
expiresAt: null,
})
.where('status', '=', JobQueueStatus.Active)
.where('expiresAt', '<', sql<Date>`now()`)
.executeTakeFirst();
}
/**
* Extend expiry for all active jobs (heartbeat)
*/
private extendExpiry() {
if (this.activeJobs.size === 0) {
return;
}
const ids = [...this.activeJobs.keys()];
return this.db
.updateTable(this.table)
.set({
expiresAt: sql<Date>`now() + ${sql.lit(`'${this.stallTimeout} milliseconds'`)}::interval`,
})
.where('id', 'in', ids)
.execute();
}
private startHeartbeat() {
if (this.heartbeatTimer) {
return;
}
this.heartbeatTimer = setInterval(
() => this.extendExpiry()?.catch(() => setTimeout(() => this.extendExpiry(), 5000)),
Math.floor(this.stallTimeout / 2),
);
}
private stopHeartbeat() {
if (this.heartbeatTimer) {
clearInterval(this.heartbeatTimer);
this.heartbeatTimer = null;
}
}
}
export class WriteBuffer {
private buffers = Object.fromEntries(Object.values(QueueName).map((name) => [name as QueueName, [] as InsertRow[]]));
private pending: Deferred | null = null;
private timer: ReturnType<typeof setTimeout> | null = null;
constructor(
private db: Kysely<DB>,
private notify: (queue: QueueName) => Promise<unknown>,
) {}
async add(items: { queue: QueueName; row: InsertRow }[]): Promise<void> {
if (items.length === 0) {
return;
}
for (const { queue, row } of items) {
this.buffers[queue].push(row);
}
if (!this.timer) {
this.pending = createDeferred();
this.timer = setTimeout(() => void this.flush(), 10);
}
return this.pending!.promise;
}
async flush(): Promise<void> {
if (this.timer) {
clearTimeout(this.timer);
this.timer = null;
}
const deferred = this.pending;
this.pending = null;
const promises = [];
try {
for (const [queue, rows] of Object.entries(this.buffers)) {
if (rows.length === 0) {
continue;
}
const tableName = QUEUE_TABLE[queue as QueueName];
promises.push(this.insertChunk(tableName, rows).then(() => this.notify(queue as QueueName)));
rows.length = 0;
}
await Promise.all(promises);
deferred?.resolve();
} catch (error) {
deferred?.reject(error);
}
}
private insertChunk(tableName: keyof JobTables, rows: InsertRow[]) {
return this.db
.insertInto(tableName)
.columns(['code', 'data', 'priority', 'dedupKey', 'runAfter'])
.expression((eb) =>
eb
.selectFrom(
eb
.fn('unnest', [
sql`${`{${rows.map((r) => r.code)}}`}::"smallint"[]`,
sql`${`{${rows.map((r) => {
if (!r.data) return null;
const json = JSON.stringify(r.data);
return '"' + json.replace(/\\/g, '\\\\').replace(/"/g, '\\"') + '"';
})}}`}::jsonb[]`,
sql`${`{${rows.map((r) => r.priority)}}`}::smallint[]`,
sql`${`{${rows.map((r) => r.dedupKey)}}`}::text[]`,
sql`${`{${rows.map((r) => r.runAfter)}}`}::timestamptz[]`,
])
.as('v'),
)
.selectAll(),
)
.onConflict((oc) =>
oc
.column('dedupKey')
.where('dedupKey', 'is not', null)
.where('status', '=', JobQueueStatus.Pending)
.doNothing(),
)
.execute();
}
}
const QUEUE_TABLE = {
[QueueName.ThumbnailGeneration]: 'jobs_thumbnail_generation',
[QueueName.MetadataExtraction]: 'jobs_metadata_extraction',
[QueueName.VideoConversion]: 'jobs_video_conversion',
[QueueName.FaceDetection]: 'jobs_face_detection',
[QueueName.FacialRecognition]: 'jobs_facial_recognition',
[QueueName.SmartSearch]: 'jobs_smart_search',
[QueueName.DuplicateDetection]: 'jobs_duplicate_detection',
[QueueName.BackgroundTask]: 'jobs_background_task',
[QueueName.StorageTemplateMigration]: 'jobs_storage_template_migration',
[QueueName.Migration]: 'jobs_migration',
[QueueName.Search]: 'jobs_search',
[QueueName.Sidecar]: 'jobs_sidecar',
[QueueName.Library]: 'jobs_library',
[QueueName.Notification]: 'jobs_notification',
[QueueName.BackupDatabase]: 'jobs_backup_database',
[QueueName.Ocr]: 'jobs_ocr',
[QueueName.Workflow]: 'jobs_workflow',
[QueueName.Editor]: 'jobs_editor',
} as const;
const createDeferred = (): Deferred => {
let resolve!: () => void;
let reject!: (error: unknown) => void;
const promise = new Promise<void>((_resolve, _reject) => ((resolve = _resolve), (reject = _reject)));
return { promise, resolve, reject };
};
interface QueueWorkerOptions {
queueName: QueueName;
stallTimeout: number;
claimBatch: number;
concurrency: number;
db: Kysely<DB>;
onJob: (job: JobItem) => Promise<unknown>;
}
type PickByValue<T, V> = {
[K in keyof T as T[K] extends V ? K : never]: T[K];
};
type JobTables = PickByValue<DB, JobTable>;
type Deferred = { promise: Promise<void>; resolve: () => void; reject: (error: unknown) => void };