This commit is contained in:
mertalev
2026-02-12 20:42:38 -05:00
parent 295ab7a11a
commit c5c8fc56a5
11 changed files with 1582 additions and 381 deletions

View File

@@ -35,7 +35,6 @@
},
"dependencies": {
"@extism/extism": "2.0.0-rc13",
"@nestjs/bullmq": "^11.0.1",
"@nestjs/common": "^11.0.4",
"@nestjs/core": "^11.0.4",
"@nestjs/platform-express": "^11.0.4",
@@ -62,7 +61,6 @@
"async-lock": "^1.4.0",
"bcrypt": "^6.0.0",
"body-parser": "^2.2.0",
"bullmq": "^5.51.0",
"chokidar": "^4.0.3",
"class-transformer": "^0.5.1",
"class-validator": "^0.14.0",

View File

@@ -1,4 +1,3 @@
import { BullModule } from '@nestjs/bullmq';
import { Inject, Module, OnModuleDestroy, OnModuleInit, ValidationPipe } from '@nestjs/common';
import { APP_FILTER, APP_GUARD, APP_INTERCEPTOR, APP_PIPE } from '@nestjs/core';
import { ScheduleModule, SchedulerRegistry } from '@nestjs/schedule';
@@ -22,6 +21,7 @@ import { LoggingInterceptor } from 'src/middleware/logging.interceptor';
import { repositories } from 'src/repositories';
import { AppRepository } from 'src/repositories/app.repository';
import { ConfigRepository } from 'src/repositories/config.repository';
import { JobRepository } from 'src/repositories/job.repository';
import { DatabaseRepository } from 'src/repositories/database.repository';
import { EventRepository } from 'src/repositories/event.repository';
import { LoggingRepository } from 'src/repositories/logging.repository';
@@ -49,7 +49,7 @@ const commonMiddleware = [
const apiMiddleware = [FileUploadInterceptor, ...commonMiddleware, { provide: APP_GUARD, useClass: AuthGuard }];
const configRepository = new ConfigRepository();
const { bull, cls, database, otel } = configRepository.getEnv();
const { cls, database, otel } = configRepository.getEnv();
const commonImports = [
ClsModule.forRoot(cls.config),
@@ -57,7 +57,6 @@ const commonImports = [
OpenTelemetryModule.forRoot(otel),
];
const bullImports = [BullModule.forRoot(bull.config), BullModule.registerQueue(...bull.queues)];
export class BaseModule implements OnModuleInit, OnModuleDestroy {
constructor(
@@ -65,6 +64,7 @@ export class BaseModule implements OnModuleInit, OnModuleDestroy {
logger: LoggingRepository,
private authService: AuthService,
private eventRepository: EventRepository,
private jobRepository: JobRepository,
private queueService: QueueService,
private telemetryRepository: TelemetryRepository,
private websocketRepository: WebsocketRepository,
@@ -91,12 +91,13 @@ export class BaseModule implements OnModuleInit, OnModuleDestroy {
async onModuleDestroy() {
await this.eventRepository.emit('AppShutdown');
await this.jobRepository.onShutdown();
await teardownTelemetry();
}
}
@Module({
imports: [...bullImports, ...commonImports, ScheduleModule.forRoot()],
imports: [...commonImports, ScheduleModule.forRoot()],
controllers: [...controllers],
providers: [...common, ...apiMiddleware, { provide: IWorker, useValue: ImmichWorker.Api }],
})
@@ -137,13 +138,13 @@ export class MaintenanceModule {
}
@Module({
imports: [...bullImports, ...commonImports],
imports: [...commonImports],
providers: [...common, { provide: IWorker, useValue: ImmichWorker.Microservices }, SchedulerRegistry],
})
export class MicroservicesModule extends BaseModule {}
@Module({
imports: [...bullImports, ...commonImports],
imports: [...commonImports],
providers: [...common, ...commandsAndQuestions, SchedulerRegistry],
})
export class ImmichAdminModule implements OnModuleDestroy {

View File

@@ -1,6 +1,4 @@
import { RegisterQueueOptions } from '@nestjs/bullmq';
import { Inject, Injectable, Optional } from '@nestjs/common';
import { QueueOptions } from 'bullmq';
import { plainToInstance } from 'class-transformer';
import { validateSync } from 'class-validator';
import { Request, Response } from 'express';
@@ -19,7 +17,6 @@ import {
ImmichWorker,
LogFormat,
LogLevel,
QueueName,
} from 'src/enum';
import { DatabaseConnectionParams, VectorExtension } from 'src/types';
import { setDifference } from 'src/utils/set';
@@ -48,11 +45,6 @@ export interface EnvData {
thirdPartySupportUrl?: string;
};
bull: {
config: QueueOptions;
queues: RegisterQueueOptions[];
};
cls: {
config: ClsModuleOptions;
};
@@ -253,19 +245,6 @@ const getEnv = (): EnvData => {
thirdPartySupportUrl: dto.IMMICH_THIRD_PARTY_SUPPORT_URL,
},
bull: {
config: {
prefix: 'immich_bull',
connection: { ...redisConfig },
defaultJobOptions: {
attempts: 1,
removeOnComplete: true,
removeOnFail: false,
},
},
queues: Object.values(QueueName).map((name) => ({ name })),
},
cls: {
config: {
middleware: {

View File

@@ -1,16 +1,21 @@
import { getQueueToken } from '@nestjs/bullmq';
import { Injectable } from '@nestjs/common';
import { ModuleRef, Reflector } from '@nestjs/core';
import { JobsOptions, Queue, Worker } from 'bullmq';
import { Kysely, sql } from 'kysely';
import { ClassConstructor } from 'class-transformer';
import { setTimeout } from 'node:timers/promises';
import { InjectKysely } from 'nestjs-kysely';
import postgres from 'postgres';
import { JobConfig } from 'src/decorators';
import { QueueJobResponseDto, QueueJobSearchDto } from 'src/dtos/queue.dto';
import { JobName, JobStatus, MetadataKey, QueueCleanType, QueueJobStatus, QueueName } from 'src/enum';
import { ConfigRepository } from 'src/repositories/config.repository';
import { EventRepository } from 'src/repositories/event.repository';
import { QUEUE_TABLE, WriteBuffer } from 'src/repositories/job.write-buffer';
import { charToJobName, jobNameToChar, QueueWorker } from 'src/repositories/job.worker';
import { LoggingRepository } from 'src/repositories/logging.repository';
import { DB } from 'src/schema';
import { JobCounts, JobItem, JobOf } from 'src/types';
import { asPostgresConnectionConfig } from 'src/utils/database';
import { getKeyByValue, getMethodNames, ImmichStartupError } from 'src/utils/misc';
type JobMapItem = {
@@ -20,16 +25,73 @@ type JobMapItem = {
label: string;
};
// Status char codes
const STATUS_PENDING = 'p';
const STATUS_ACTIVE = 'a';
const STATUS_FAILED = 'f';
// Stall timeouts in milliseconds
const STALL_LONG = 60 * 60 * 1000; // 1 hour
const STALL_MEDIUM = 30 * 60 * 1000; // 30 min
const STALL_DEFAULT = 5 * 60 * 1000; // 5 min
const getStallTimeout = (queueName: QueueName): number => {
switch (queueName) {
case QueueName.VideoConversion:
case QueueName.BackupDatabase:
case QueueName.Editor: {
return STALL_LONG;
}
case QueueName.Library:
case QueueName.StorageTemplateMigration: {
return STALL_MEDIUM;
}
default: {
return STALL_DEFAULT;
}
}
};
const getClaimBatch = (queueName: QueueName): number => {
switch (queueName) {
case QueueName.VideoConversion:
case QueueName.BackupDatabase:
case QueueName.StorageTemplateMigration:
case QueueName.Editor:
case QueueName.FacialRecognition:
case QueueName.DuplicateDetection: {
return 1;
}
default: {
return 100; // will be clamped to slotsAvailable by the worker
}
}
};
// Map QueueJobStatus to our "char" status codes
const STATUS_FILTER: Record<QueueJobStatus, string | null> = {
[QueueJobStatus.Active]: STATUS_ACTIVE,
[QueueJobStatus.Failed]: STATUS_FAILED,
[QueueJobStatus.Waiting]: STATUS_PENDING,
[QueueJobStatus.Complete]: null, // completed jobs are deleted
[QueueJobStatus.Delayed]: STATUS_PENDING, // delayed = pending with future run_after
[QueueJobStatus.Paused]: STATUS_PENDING, // paused queue has pending jobs
};
@Injectable()
export class JobRepository {
private workers: Partial<Record<QueueName, Worker>> = {};
private workers: Partial<Record<QueueName, QueueWorker>> = {};
private handlers: Partial<Record<JobName, JobMapItem>> = {};
private writeBuffer!: WriteBuffer;
private listenConn: postgres.Sql | null = null;
private pauseState: Partial<Record<QueueName, boolean>> = {};
constructor(
private moduleRef: ModuleRef,
private configRepository: ConfigRepository,
private eventRepository: EventRepository,
private logger: LoggingRepository,
@InjectKysely() private db: Kysely<DB>,
) {
this.logger.setContext(JobRepository.name);
}
@@ -85,15 +147,53 @@ export class JobRepository {
}
startWorkers() {
const { bull } = this.configRepository.getEnv();
for (const queueName of Object.values(QueueName)) {
this.logger.debug(`Starting worker for queue: ${queueName}`);
this.workers[queueName] = new Worker(
queueName,
(job) => this.eventRepository.emit('JobRun', queueName, job as JobItem),
{ ...bull.config, concurrency: 1 },
);
}
this.writeBuffer = new WriteBuffer(this.db, (queue) => this.notify(queue));
// Startup sweep: reset any active jobs from a previous crash
const startupPromises = Object.values(QueueName).map(async (queueName) => {
const tableName = QUEUE_TABLE[queueName];
await sql`
UPDATE ${sql.table(tableName)}
SET "status" = ${STATUS_PENDING}::"char", "started_at" = NULL, "expires_at" = NULL
WHERE "status" = ${STATUS_ACTIVE}::"char"
`.execute(this.db);
});
// Load pause state and setup workers
void Promise.all(startupPromises).then(async () => {
// Load pause state from DB
const metaRows = await this.db.selectFrom('job_queue_meta').selectAll().execute();
for (const row of metaRows) {
this.pauseState[row.queue_name as QueueName] = row.is_paused;
}
// Create workers
for (const queueName of Object.values(QueueName)) {
const worker = new QueueWorker({
queueName,
tableName: QUEUE_TABLE[queueName],
stallTimeout: getStallTimeout(queueName),
claimBatch: getClaimBatch(queueName),
concurrency: 1,
db: this.db,
onJob: (job) => this.eventRepository.emit('JobRun', queueName, job),
});
if (this.pauseState[queueName]) {
worker.pause();
}
this.workers[queueName] = worker;
}
// Setup LISTEN/NOTIFY
await this.setupListen();
// Trigger initial fetch for all workers
for (const worker of Object.values(this.workers)) {
worker.onNotification();
}
});
}
async run({ name, data }: JobItem) {
@@ -113,44 +213,92 @@ export class JobRepository {
return;
}
worker.concurrency = concurrency;
worker.setConcurrency(concurrency);
}
async isActive(name: QueueName): Promise<boolean> {
const queue = this.getQueue(name);
const count = await queue.getActiveCount();
return count > 0;
isActive(name: QueueName): Promise<boolean> {
const worker = this.workers[name];
return Promise.resolve(worker ? worker.activeJobCount > 0 : false);
}
async isPaused(name: QueueName): Promise<boolean> {
return this.getQueue(name).isPaused();
isPaused(name: QueueName): Promise<boolean> {
return Promise.resolve(this.pauseState[name] ?? false);
}
pause(name: QueueName) {
return this.getQueue(name).pause();
async pause(name: QueueName) {
this.pauseState[name] = true;
await this.db
.insertInto('job_queue_meta')
.values({ queue_name: name, is_paused: true })
.onConflict((oc) => oc.column('queue_name').doUpdateSet({ is_paused: true }))
.execute();
this.workers[name]?.pause();
}
resume(name: QueueName) {
return this.getQueue(name).resume();
async resume(name: QueueName) {
this.pauseState[name] = false;
await this.db
.insertInto('job_queue_meta')
.values({ queue_name: name, is_paused: false })
.onConflict((oc) => oc.column('queue_name').doUpdateSet({ is_paused: false }))
.execute();
this.workers[name]?.resume();
}
empty(name: QueueName) {
return this.getQueue(name).drain();
async empty(name: QueueName) {
const tableName = QUEUE_TABLE[name];
await sql`DELETE FROM ${sql.table(tableName)} WHERE "status" = ${STATUS_PENDING}::"char"`.execute(this.db);
}
clear(name: QueueName, type: QueueCleanType) {
return this.getQueue(name).clean(0, 1000, type);
async clear(name: QueueName, _type: QueueCleanType) {
const tableName = QUEUE_TABLE[name];
await sql`DELETE FROM ${sql.table(tableName)} WHERE "status" = ${STATUS_FAILED}::"char"`.execute(this.db);
}
getJobCounts(name: QueueName): Promise<JobCounts> {
return this.getQueue(name).getJobCounts(
'active',
'completed',
'failed',
'delayed',
'waiting',
'paused',
) as unknown as Promise<JobCounts>;
async getJobCounts(name: QueueName): Promise<JobCounts> {
const tableName = QUEUE_TABLE[name];
const result = await sql<{ status: string; count: string }>`
SELECT "status", count(*)::text as count FROM ${sql.table(tableName)} GROUP BY "status"
`.execute(this.db);
const counts: JobCounts = {
active: 0,
completed: 0,
failed: 0,
delayed: 0,
waiting: 0,
paused: 0,
};
for (const row of result.rows) {
switch (row.status) {
case STATUS_PENDING: {
counts.waiting = Number(row.count);
break;
}
case STATUS_ACTIVE: {
counts.active = Number(row.count);
break;
}
case STATUS_FAILED: {
counts.failed = Number(row.count);
break;
}
}
}
// In-memory active count may be more accurate than DB for in-flight jobs
const worker = this.workers[name];
if (worker) {
counts.active = worker.activeJobCount;
}
if (this.pauseState[name]) {
counts.paused = counts.waiting;
counts.waiting = 0;
}
return counts;
}
private getQueueName(name: JobName) {
@@ -162,31 +310,24 @@ export class JobRepository {
return;
}
const promises = [];
const itemsByQueue = {} as Record<string, (JobItem & { data: any; options: JobsOptions | undefined })[]>;
const bufferItems: { queue: QueueName; row: { name: string; data: unknown; priority: number; dedup_key: string | null; run_after: Date } }[] = [];
for (const item of items) {
const queueName = this.getQueueName(item.name);
const job = {
name: item.name,
data: item.data || {},
options: this.getJobOptions(item) || undefined,
} as JobItem & { data: any; options: JobsOptions | undefined };
if (job.options?.jobId) {
// need to use add() instead of addBulk() for jobId deduplication
promises.push(this.getQueue(queueName).add(item.name, item.data, job.options));
} else {
itemsByQueue[queueName] = itemsByQueue[queueName] || [];
itemsByQueue[queueName].push(job);
}
const options = this.getJobOptions(item);
bufferItems.push({
queue: queueName,
row: {
name: jobNameToChar(item.name),
data: item.data || {},
priority: options?.priority ?? 0,
dedup_key: options?.dedupKey ?? null,
run_after: options?.delay ? new Date(Date.now() + options.delay) : new Date(),
},
});
}
for (const [queueName, jobs] of Object.entries(itemsByQueue)) {
const queue = this.getQueue(queueName as QueueName);
promises.push(queue.addBulk(jobs));
}
await Promise.all(promises);
await this.writeBuffer.add(bufferItems);
}
async queue(item: JobItem): Promise<void> {
@@ -209,29 +350,50 @@ export class JobRepository {
}
async searchJobs(name: QueueName, dto: QueueJobSearchDto): Promise<QueueJobResponseDto[]> {
const jobs = await this.getQueue(name).getJobs(dto.status ?? Object.values(QueueJobStatus), 0, 1000);
return jobs.map((job) => {
const { id, name, timestamp, data } = job;
return { id, name: name as JobName, timestamp, data };
});
const tableName = QUEUE_TABLE[name];
const statuses = dto.status ?? Object.values(QueueJobStatus);
const charStatuses = statuses
.map((s) => STATUS_FILTER[s])
.filter((s): s is string => s !== null);
if (charStatuses.length === 0) {
return [];
}
const uniqueStatuses = [...new Set(charStatuses)];
const rows = await sql<{ id: number; name: string; data: unknown; run_after: Date }>`
SELECT "id", "name", "data", "run_after"
FROM ${sql.table(tableName)}
WHERE "status" = ANY(${sql.val(uniqueStatuses)}::"char"[])
ORDER BY "id" DESC
LIMIT 1000
`.execute(this.db);
return rows.rows.map((row) => ({
id: String(row.id),
name: charToJobName(row.name) ?? (row.name as unknown as JobName),
data: (row.data ?? {}) as object,
timestamp: new Date(row.run_after).getTime(),
}));
}
private getJobOptions(item: JobItem): JobsOptions | null {
private getJobOptions(item: JobItem): { dedupKey?: string; priority?: number; delay?: number } | null {
switch (item.name) {
case JobName.NotifyAlbumUpdate: {
return {
jobId: `${item.data.id}/${item.data.recipientId}`,
dedupKey: `${item.data.id}/${item.data.recipientId}`,
delay: item.data?.delay,
};
}
case JobName.StorageTemplateMigrationSingle: {
return { jobId: item.data.id };
return { dedupKey: item.data.id };
}
case JobName.PersonGenerateThumbnail: {
return { priority: 1 };
}
case JobName.FacialRecognitionQueueAll: {
return { jobId: JobName.FacialRecognitionQueueAll };
return { dedupKey: JobName.FacialRecognitionQueueAll };
}
default: {
return null;
@@ -239,16 +401,52 @@ export class JobRepository {
}
}
private getQueue(queue: QueueName): Queue {
return this.moduleRef.get<Queue>(getQueueToken(queue), { strict: false });
}
/** @deprecated */
// todo: remove this when asset notifications no longer need it.
public async removeJob(name: JobName, jobID: string): Promise<void> {
const existingJob = await this.getQueue(this.getQueueName(name)).getJob(jobID);
if (existingJob) {
await existingJob.remove();
const queueName = this.getQueueName(name);
const tableName = QUEUE_TABLE[queueName];
await sql`DELETE FROM ${sql.table(tableName)} WHERE "id" = ${Number(jobID)}`.execute(this.db);
}
private async setupListen(): Promise<void> {
const { database } = this.configRepository.getEnv();
const pgConfig = asPostgresConnectionConfig(database.config);
this.listenConn = postgres({
host: pgConfig.host,
port: pgConfig.port,
username: pgConfig.username,
password: pgConfig.password as string | undefined,
database: pgConfig.database,
ssl: pgConfig.ssl as boolean | undefined,
max: 1,
});
for (const queueName of Object.values(QueueName)) {
await this.listenConn.listen(`jobs:${queueName}`, () => {
this.workers[queueName]?.onNotification();
});
}
}
private async notify(queue: QueueName): Promise<void> {
await sql`SELECT pg_notify(${`jobs:${queue}`}, '')`.execute(this.db);
}
async onShutdown(): Promise<void> {
// Stop workers
const shutdownPromises = Object.values(this.workers).map((worker) => worker.shutdown());
await Promise.all(shutdownPromises);
// Flush write buffer
if (this.writeBuffer) {
await this.writeBuffer.flush();
}
// Close LISTEN connection
if (this.listenConn) {
await this.listenConn.end();
this.listenConn = null;
}
}
}

View File

@@ -0,0 +1,298 @@
import { Kysely, sql } from 'kysely';
import { JobName, QueueName } from 'src/enum';
import { DB } from 'src/schema';
import { JobItem } from 'src/types';
// Job status codes stored as "char" (single-byte PostgreSQL type)
const STATUS_PENDING = 'p';
const STATUS_ACTIVE = 'a';
const STATUS_FAILED = 'f';
// Bidirectional JobName <-> "char" mapping
const JOB_CHAR: Record<string, string> = {};
const CHAR_JOB: Record<string, JobName> = {};
// Assign sequential character codes starting from 0x01
let charCode = 1;
for (const jobName of Object.values(JobName)) {
const char = String.fromCodePoint(charCode++);
JOB_CHAR[jobName] = char;
CHAR_JOB[char] = jobName;
}
export const jobNameToChar = (name: JobName): string => JOB_CHAR[name];
export const charToJobName = (char: string): JobName | undefined => CHAR_JOB[char];
type JobRow = {
id: number;
name: string;
data: unknown;
priority: number;
status: string;
dedup_key: string | null;
run_after: Date;
started_at: Date | null;
expires_at: Date | null;
error: string | null;
};
export interface QueueWorkerOptions {
queueName: QueueName;
tableName: string;
stallTimeout: number;
claimBatch: number;
concurrency: number;
db: Kysely<DB>;
onJob: (job: JobItem) => Promise<void>;
}
export class QueueWorker {
private concurrency: number;
private activeCount = 0;
private activeJobs = new Map<number, { startedAt: number }>();
private hasPending = true;
private fetching = false;
private paused = false;
private stopped = false;
private heartbeatTimer: ReturnType<typeof setInterval> | null = null;
private readonly queueName: QueueName;
private readonly tableName: string;
private readonly stallTimeout: number;
private readonly claimBatch: number;
private readonly db: Kysely<DB>;
private readonly onJobFn: (job: JobItem) => Promise<void>;
constructor(options: QueueWorkerOptions) {
this.queueName = options.queueName;
this.tableName = options.tableName;
this.stallTimeout = options.stallTimeout;
this.claimBatch = options.claimBatch;
this.concurrency = options.concurrency;
this.db = options.db;
this.onJobFn = options.onJob;
}
get activeJobCount(): number {
return this.activeCount;
}
onNotification(): void {
this.hasPending = true;
void this.tryFetch();
}
setConcurrency(n: number): void {
this.concurrency = n;
void this.tryFetch();
}
pause(): void {
this.paused = true;
}
resume(): void {
this.paused = false;
this.hasPending = true;
void this.tryFetch();
}
async shutdown(): Promise<void> {
this.stopped = true;
this.stopHeartbeat();
// Re-queue active jobs
if (this.activeJobs.size > 0) {
const ids = [...this.activeJobs.keys()];
await sql`
UPDATE ${sql.table(this.tableName)}
SET "status" = ${STATUS_PENDING}::"char", "started_at" = NULL, "expires_at" = NULL
WHERE "id" = ANY(${sql.val(ids)}::bigint[])
`.execute(this.db);
}
}
private get slotsAvailable(): number {
return Math.max(0, this.concurrency - this.activeCount);
}
private async tryFetch(): Promise<void> {
if (this.fetching || this.paused || this.stopped) {
return;
}
this.fetching = true;
try {
while (this.slotsAvailable > 0 && this.hasPending && !this.stopped) {
const limit = Math.min(this.slotsAvailable, this.claimBatch);
const jobs = await this.claim(limit);
if (jobs.length === 0) {
const recovered = await this.recoverStalled();
if (recovered === 0) {
this.hasPending = false;
break;
}
continue;
}
this.activeCount += jobs.length;
for (const job of jobs) {
void this.processJob(job);
}
}
} finally {
this.fetching = false;
}
}
private async processJob(row: JobRow): Promise<void> {
this.activeJobs.set(row.id, { startedAt: Date.now() });
this.startHeartbeat();
try {
const jobName = charToJobName(row.name);
if (!jobName) {
throw new Error(`Unknown job char code: ${row.name.codePointAt(0)}`);
}
await this.onJobFn({ name: jobName, data: row.data } as JobItem);
// Success: delete completed job and try to fetch next
const next = await this.completeAndFetch(row.id, true);
this.activeJobs.delete(row.id);
if (next) {
void this.processJob(next);
} else {
this.activeCount--;
this.hasPending = false;
}
} catch (error: unknown) {
// Failure: mark as failed and try to fetch next
const errorMsg = error instanceof Error ? error.message : String(error);
const next = await this.completeAndFetch(row.id, false, errorMsg);
this.activeJobs.delete(row.id);
if (next) {
void this.processJob(next);
} else {
this.activeCount--;
this.hasPending = false;
}
} finally {
if (this.activeJobs.size === 0) {
this.stopHeartbeat();
}
}
}
/**
* Claim up to `limit` pending jobs using FOR UPDATE SKIP LOCKED
*/
private async claim(limit: number): Promise<JobRow[]> {
const result = await sql<JobRow>`
UPDATE ${sql.table(this.tableName)} SET
"status" = ${STATUS_ACTIVE}::"char",
"started_at" = now(),
"expires_at" = now() + ${sql.lit(`'${this.stallTimeout} milliseconds'`)}::interval
WHERE "id" IN (
SELECT "id" FROM ${sql.table(this.tableName)}
WHERE "status" = ${STATUS_PENDING}::"char" AND "run_after" <= now()
ORDER BY "priority" DESC, "id" ASC
FOR UPDATE SKIP LOCKED
LIMIT ${sql.lit(limit)}
)
RETURNING *
`.execute(this.db);
return result.rows as JobRow[];
}
/**
* Atomically complete a job (delete on success, mark failed on failure) and claim the next one.
* Uses a CTE to combine operations in a single round-trip.
*/
private async completeAndFetch(
jobId: number,
success: boolean,
errorMsg?: string,
): Promise<JobRow | undefined> {
if (success) {
const result = await sql<JobRow>`
WITH completed AS (
DELETE FROM ${sql.table(this.tableName)} WHERE "id" = ${jobId}
),
next AS (
SELECT "id" FROM ${sql.table(this.tableName)}
WHERE "status" = ${STATUS_PENDING}::"char" AND "run_after" <= now()
ORDER BY "priority" DESC, "id" ASC
FOR UPDATE SKIP LOCKED
LIMIT 1
)
UPDATE ${sql.table(this.tableName)} SET
"status" = ${STATUS_ACTIVE}::"char",
"started_at" = now(),
"expires_at" = now() + ${sql.lit(`'${this.stallTimeout} milliseconds'`)}::interval
WHERE "id" = (SELECT "id" FROM next)
RETURNING *
`.execute(this.db);
return (result.rows as JobRow[])[0];
}
const result = await sql<JobRow>`
WITH failed AS (
UPDATE ${sql.table(this.tableName)}
SET "status" = ${STATUS_FAILED}::"char", "error" = ${errorMsg ?? null}
WHERE "id" = ${jobId}
),
next AS (
SELECT "id" FROM ${sql.table(this.tableName)}
WHERE "status" = ${STATUS_PENDING}::"char" AND "run_after" <= now()
ORDER BY "priority" DESC, "id" ASC
FOR UPDATE SKIP LOCKED
LIMIT 1
)
UPDATE ${sql.table(this.tableName)} SET
"status" = ${STATUS_ACTIVE}::"char",
"started_at" = now(),
"expires_at" = now() + ${sql.lit(`'${this.stallTimeout} milliseconds'`)}::interval
WHERE "id" = (SELECT "id" FROM next)
RETURNING *
`.execute(this.db);
return (result.rows as JobRow[])[0];
}
/**
* Recover stalled jobs: reset jobs whose expires_at has passed
*/
private async recoverStalled(): Promise<number> {
const result = await sql`
UPDATE ${sql.table(this.tableName)}
SET "status" = ${STATUS_PENDING}::"char", "started_at" = NULL, "expires_at" = NULL
WHERE "status" = ${STATUS_ACTIVE}::"char" AND "expires_at" < now()
`.execute(this.db);
return Number(result.numAffectedRows ?? 0);
}
/**
* Extend expiry for all active jobs (heartbeat)
*/
private async extendExpiry(): Promise<void> {
if (this.activeJobs.size === 0) {
return;
}
const ids = [...this.activeJobs.keys()];
await sql`
UPDATE ${sql.table(this.tableName)}
SET "expires_at" = now() + ${sql.lit(`'${this.stallTimeout} milliseconds'`)}::interval
WHERE "id" = ANY(${sql.val(ids)}::bigint[])
`.execute(this.db);
}
private startHeartbeat(): void {
if (this.heartbeatTimer) {
return;
}
const interval = Math.max(1000, Math.floor(this.stallTimeout / 2));
this.heartbeatTimer = setInterval(() => void this.extendExpiry(), interval);
}
private stopHeartbeat(): void {
if (this.heartbeatTimer) {
clearInterval(this.heartbeatTimer);
this.heartbeatTimer = null;
}
}
}

View File

@@ -0,0 +1,121 @@
import { Kysely, sql } from 'kysely';
import { QueueName } from 'src/enum';
import { DB } from 'src/schema';
export type InsertRow = {
name: string;
data: unknown;
priority: number;
dedup_key: string | null;
run_after: Date;
};
type QueueTableName = keyof DB & `jobs_${string}`;
export const QUEUE_TABLE: Record<QueueName, QueueTableName> = {
[QueueName.ThumbnailGeneration]: 'jobs_thumbnail_generation',
[QueueName.MetadataExtraction]: 'jobs_metadata_extraction',
[QueueName.VideoConversion]: 'jobs_video_conversion',
[QueueName.FaceDetection]: 'jobs_face_detection',
[QueueName.FacialRecognition]: 'jobs_facial_recognition',
[QueueName.SmartSearch]: 'jobs_smart_search',
[QueueName.DuplicateDetection]: 'jobs_duplicate_detection',
[QueueName.BackgroundTask]: 'jobs_background_task',
[QueueName.StorageTemplateMigration]: 'jobs_storage_template_migration',
[QueueName.Migration]: 'jobs_migration',
[QueueName.Search]: 'jobs_search',
[QueueName.Sidecar]: 'jobs_sidecar',
[QueueName.Library]: 'jobs_library',
[QueueName.Notification]: 'jobs_notification',
[QueueName.BackupDatabase]: 'jobs_backup_database',
[QueueName.Ocr]: 'jobs_ocr',
[QueueName.Workflow]: 'jobs_workflow',
[QueueName.Editor]: 'jobs_editor',
};
type Deferred = { promise: Promise<void>; resolve: () => void };
const createDeferred = (): Deferred => {
let resolve!: () => void;
const promise = new Promise<void>((r) => (resolve = r));
return { promise, resolve };
};
const CHUNK_SIZE = 5000;
export class WriteBuffer {
private buffers = new Map<QueueName, InsertRow[]>();
private pending: Deferred | null = null;
private timer: ReturnType<typeof setTimeout> | null = null;
constructor(
private db: Kysely<DB>,
private notify: (queue: QueueName) => Promise<void>,
) {}
async add(items: { queue: QueueName; row: InsertRow }[]): Promise<void> {
for (const { queue, row } of items) {
let buf = this.buffers.get(queue);
if (!buf) {
buf = [];
this.buffers.set(queue, buf);
}
buf.push(row);
}
if (!this.timer) {
this.pending = createDeferred();
this.timer = setTimeout(() => void this.flush(), 10);
}
return this.pending!.promise;
}
async flush(): Promise<void> {
const snapshot = this.buffers;
this.buffers = new Map();
if (this.timer) {
clearTimeout(this.timer);
this.timer = null;
}
const deferred = this.pending;
this.pending = null;
if (snapshot.size === 0) {
deferred?.resolve();
return;
}
try {
for (const [queue, rows] of snapshot) {
const tableName = QUEUE_TABLE[queue];
for (let i = 0; i < rows.length; i += CHUNK_SIZE) {
const chunk = rows.slice(i, i + CHUNK_SIZE);
await this.insertChunk(tableName, chunk);
}
await this.notify(queue);
}
} finally {
deferred?.resolve();
}
}
private async insertChunk(tableName: string, rows: InsertRow[]): Promise<void> {
const names = rows.map((r) => r.name);
const datas = rows.map((r) => JSON.stringify(r.data));
const priorities = rows.map((r) => r.priority);
const dedupKeys = rows.map((r) => r.dedup_key);
const runAfters = rows.map((r) => r.run_after.toISOString());
await sql`
INSERT INTO ${sql.table(tableName)} ("name", "data", "priority", "dedup_key", "run_after")
SELECT * FROM unnest(
${sql.val(names)}::"char"[],
${sql.val(datas)}::jsonb[],
${sql.val(priorities)}::smallint[],
${sql.val(dedupKeys)}::text[],
${sql.val(runAfters)}::timestamptz[]
)
ON CONFLICT ("dedup_key") WHERE "dedup_key" IS NOT NULL AND "status" = 'p'::"char"
DO NOTHING
`.execute(this.db);
}
}

View File

@@ -41,6 +41,27 @@ import { AssetTable } from 'src/schema/tables/asset.table';
import { AuditTable } from 'src/schema/tables/audit.table';
import { FaceSearchTable } from 'src/schema/tables/face-search.table';
import { GeodataPlacesTable } from 'src/schema/tables/geodata-places.table';
import {
JobQueueMetaTable,
JobsBackgroundTaskTable,
JobsBackupDatabaseTable,
JobsDuplicateDetectionTable,
JobsEditorTable,
JobsFaceDetectionTable,
JobsFacialRecognitionTable,
JobsLibraryTable,
JobsMetadataExtractionTable,
JobsMigrationTable,
JobsNotificationTable,
JobsOcrTable,
JobsSearchTable,
JobsSidecarTable,
JobsSmartSearchTable,
JobsStorageTemplateMigrationTable,
JobsThumbnailGenerationTable,
JobsVideoConversionTable,
JobsWorkflowTable,
} from 'src/schema/tables/job.table';
import { LibraryTable } from 'src/schema/tables/library.table';
import { MemoryAssetAuditTable } from 'src/schema/tables/memory-asset-audit.table';
import { MemoryAssetTable } from 'src/schema/tables/memory-asset.table';
@@ -135,6 +156,25 @@ export class ImmichDatabase {
WorkflowTable,
WorkflowFilterTable,
WorkflowActionTable,
JobsThumbnailGenerationTable,
JobsMetadataExtractionTable,
JobsVideoConversionTable,
JobsFaceDetectionTable,
JobsFacialRecognitionTable,
JobsSmartSearchTable,
JobsDuplicateDetectionTable,
JobsBackgroundTaskTable,
JobsStorageTemplateMigrationTable,
JobsMigrationTable,
JobsSearchTable,
JobsSidecarTable,
JobsLibraryTable,
JobsNotificationTable,
JobsBackupDatabaseTable,
JobsOcrTable,
JobsWorkflowTable,
JobsEditorTable,
JobQueueMetaTable,
];
functions = [
@@ -252,4 +292,24 @@ export interface DB {
workflow: WorkflowTable;
workflow_filter: WorkflowFilterTable;
workflow_action: WorkflowActionTable;
jobs_thumbnail_generation: JobsThumbnailGenerationTable;
jobs_metadata_extraction: JobsMetadataExtractionTable;
jobs_video_conversion: JobsVideoConversionTable;
jobs_face_detection: JobsFaceDetectionTable;
jobs_facial_recognition: JobsFacialRecognitionTable;
jobs_smart_search: JobsSmartSearchTable;
jobs_duplicate_detection: JobsDuplicateDetectionTable;
jobs_background_task: JobsBackgroundTaskTable;
jobs_storage_template_migration: JobsStorageTemplateMigrationTable;
jobs_migration: JobsMigrationTable;
jobs_search: JobsSearchTable;
jobs_sidecar: JobsSidecarTable;
jobs_library: JobsLibraryTable;
jobs_notification: JobsNotificationTable;
jobs_backup_database: JobsBackupDatabaseTable;
jobs_ocr: JobsOcrTable;
jobs_workflow: JobsWorkflowTable;
jobs_editor: JobsEditorTable;
job_queue_meta: JobQueueMetaTable;
}

View File

@@ -0,0 +1,800 @@
import { Column, ConfigurationParameter, Generated, Index, PrimaryColumn, Table } from 'src/sql-tools';
// Job status values stored as "char" (single-byte PostgreSQL type):
// 'p' = pending, 'a' = active, 'c' = completed, 'f' = failed
@Table('jobs_thumbnail_generation')
@Index({ name: 'IDX_jobs_thumbnail_generation_pending', columns: ['priority', 'id'], where: `"status" = 'p'::"char"` })
@Index({
name: 'IDX_jobs_thumbnail_generation_dedup',
columns: ['dedup_key'],
unique: true,
where: `"dedup_key" IS NOT NULL AND "status" = 'p'::"char"`,
})
@ConfigurationParameter({ name: 'autovacuum_vacuum_threshold', value: 100, scope: 'table' })
@ConfigurationParameter({ name: 'autovacuum_vacuum_scale_factor', value: 0.01, scope: 'table' })
@ConfigurationParameter({ name: 'autovacuum_vacuum_cost_delay', value: 0, scope: 'table' })
export class JobsThumbnailGenerationTable {
@PrimaryColumn({ type: 'bigint', identity: true })
id!: Generated<number>;
@Column({ type: '"char"' })
name!: string;
@Column({ type: 'jsonb', nullable: true })
data!: unknown;
@Column({ type: 'smallint', default: 0 })
priority!: Generated<number>;
@Column({ type: '"char"', default: 'p' })
status!: Generated<string>;
@Column({ type: 'text', nullable: true })
dedup_key!: string | null;
@Column({ type: 'timestamp with time zone', default: () => 'now()' })
run_after!: Generated<Date>;
@Column({ type: 'timestamp with time zone', nullable: true })
started_at!: Date | null;
@Column({ type: 'timestamp with time zone', nullable: true })
expires_at!: Date | null;
@Column({ type: 'text', nullable: true })
error!: string | null;
}
@Table('jobs_metadata_extraction')
@Index({ name: 'IDX_jobs_metadata_extraction_pending', columns: ['priority', 'id'], where: `"status" = 'p'::"char"` })
@Index({
name: 'IDX_jobs_metadata_extraction_dedup',
columns: ['dedup_key'],
unique: true,
where: `"dedup_key" IS NOT NULL AND "status" = 'p'::"char"`,
})
@ConfigurationParameter({ name: 'autovacuum_vacuum_threshold', value: 100, scope: 'table' })
@ConfigurationParameter({ name: 'autovacuum_vacuum_scale_factor', value: 0.01, scope: 'table' })
@ConfigurationParameter({ name: 'autovacuum_vacuum_cost_delay', value: 0, scope: 'table' })
export class JobsMetadataExtractionTable {
@PrimaryColumn({ type: 'bigint', identity: true })
id!: Generated<number>;
@Column({ type: '"char"' })
name!: string;
@Column({ type: 'jsonb', nullable: true })
data!: unknown;
@Column({ type: 'smallint', default: 0 })
priority!: Generated<number>;
@Column({ type: '"char"', default: 'p' })
status!: Generated<string>;
@Column({ type: 'text', nullable: true })
dedup_key!: string | null;
@Column({ type: 'timestamp with time zone', default: () => 'now()' })
run_after!: Generated<Date>;
@Column({ type: 'timestamp with time zone', nullable: true })
started_at!: Date | null;
@Column({ type: 'timestamp with time zone', nullable: true })
expires_at!: Date | null;
@Column({ type: 'text', nullable: true })
error!: string | null;
}
@Table('jobs_video_conversion')
@Index({ name: 'IDX_jobs_video_conversion_pending', columns: ['priority', 'id'], where: `"status" = 'p'::"char"` })
@Index({
name: 'IDX_jobs_video_conversion_dedup',
columns: ['dedup_key'],
unique: true,
where: `"dedup_key" IS NOT NULL AND "status" = 'p'::"char"`,
})
@ConfigurationParameter({ name: 'autovacuum_vacuum_threshold', value: 100, scope: 'table' })
@ConfigurationParameter({ name: 'autovacuum_vacuum_scale_factor', value: 0.01, scope: 'table' })
@ConfigurationParameter({ name: 'autovacuum_vacuum_cost_delay', value: 0, scope: 'table' })
export class JobsVideoConversionTable {
@PrimaryColumn({ type: 'bigint', identity: true })
id!: Generated<number>;
@Column({ type: '"char"' })
name!: string;
@Column({ type: 'jsonb', nullable: true })
data!: unknown;
@Column({ type: 'smallint', default: 0 })
priority!: Generated<number>;
@Column({ type: '"char"', default: 'p' })
status!: Generated<string>;
@Column({ type: 'text', nullable: true })
dedup_key!: string | null;
@Column({ type: 'timestamp with time zone', default: () => 'now()' })
run_after!: Generated<Date>;
@Column({ type: 'timestamp with time zone', nullable: true })
started_at!: Date | null;
@Column({ type: 'timestamp with time zone', nullable: true })
expires_at!: Date | null;
@Column({ type: 'text', nullable: true })
error!: string | null;
}
@Table('jobs_face_detection')
@Index({ name: 'IDX_jobs_face_detection_pending', columns: ['priority', 'id'], where: `"status" = 'p'::"char"` })
@Index({
name: 'IDX_jobs_face_detection_dedup',
columns: ['dedup_key'],
unique: true,
where: `"dedup_key" IS NOT NULL AND "status" = 'p'::"char"`,
})
@ConfigurationParameter({ name: 'autovacuum_vacuum_threshold', value: 100, scope: 'table' })
@ConfigurationParameter({ name: 'autovacuum_vacuum_scale_factor', value: 0.01, scope: 'table' })
@ConfigurationParameter({ name: 'autovacuum_vacuum_cost_delay', value: 0, scope: 'table' })
export class JobsFaceDetectionTable {
@PrimaryColumn({ type: 'bigint', identity: true })
id!: Generated<number>;
@Column({ type: '"char"' })
name!: string;
@Column({ type: 'jsonb', nullable: true })
data!: unknown;
@Column({ type: 'smallint', default: 0 })
priority!: Generated<number>;
@Column({ type: '"char"', default: 'p' })
status!: Generated<string>;
@Column({ type: 'text', nullable: true })
dedup_key!: string | null;
@Column({ type: 'timestamp with time zone', default: () => 'now()' })
run_after!: Generated<Date>;
@Column({ type: 'timestamp with time zone', nullable: true })
started_at!: Date | null;
@Column({ type: 'timestamp with time zone', nullable: true })
expires_at!: Date | null;
@Column({ type: 'text', nullable: true })
error!: string | null;
}
@Table('jobs_facial_recognition')
@Index({
name: 'IDX_jobs_facial_recognition_pending',
columns: ['priority', 'id'],
where: `"status" = 'p'::"char"`,
})
@Index({
name: 'IDX_jobs_facial_recognition_dedup',
columns: ['dedup_key'],
unique: true,
where: `"dedup_key" IS NOT NULL AND "status" = 'p'::"char"`,
})
@ConfigurationParameter({ name: 'autovacuum_vacuum_threshold', value: 100, scope: 'table' })
@ConfigurationParameter({ name: 'autovacuum_vacuum_scale_factor', value: 0.01, scope: 'table' })
@ConfigurationParameter({ name: 'autovacuum_vacuum_cost_delay', value: 0, scope: 'table' })
export class JobsFacialRecognitionTable {
@PrimaryColumn({ type: 'bigint', identity: true })
id!: Generated<number>;
@Column({ type: '"char"' })
name!: string;
@Column({ type: 'jsonb', nullable: true })
data!: unknown;
@Column({ type: 'smallint', default: 0 })
priority!: Generated<number>;
@Column({ type: '"char"', default: 'p' })
status!: Generated<string>;
@Column({ type: 'text', nullable: true })
dedup_key!: string | null;
@Column({ type: 'timestamp with time zone', default: () => 'now()' })
run_after!: Generated<Date>;
@Column({ type: 'timestamp with time zone', nullable: true })
started_at!: Date | null;
@Column({ type: 'timestamp with time zone', nullable: true })
expires_at!: Date | null;
@Column({ type: 'text', nullable: true })
error!: string | null;
}
@Table('jobs_smart_search')
@Index({ name: 'IDX_jobs_smart_search_pending', columns: ['priority', 'id'], where: `"status" = 'p'::"char"` })
@Index({
name: 'IDX_jobs_smart_search_dedup',
columns: ['dedup_key'],
unique: true,
where: `"dedup_key" IS NOT NULL AND "status" = 'p'::"char"`,
})
@ConfigurationParameter({ name: 'autovacuum_vacuum_threshold', value: 100, scope: 'table' })
@ConfigurationParameter({ name: 'autovacuum_vacuum_scale_factor', value: 0.01, scope: 'table' })
@ConfigurationParameter({ name: 'autovacuum_vacuum_cost_delay', value: 0, scope: 'table' })
export class JobsSmartSearchTable {
@PrimaryColumn({ type: 'bigint', identity: true })
id!: Generated<number>;
@Column({ type: '"char"' })
name!: string;
@Column({ type: 'jsonb', nullable: true })
data!: unknown;
@Column({ type: 'smallint', default: 0 })
priority!: Generated<number>;
@Column({ type: '"char"', default: 'p' })
status!: Generated<string>;
@Column({ type: 'text', nullable: true })
dedup_key!: string | null;
@Column({ type: 'timestamp with time zone', default: () => 'now()' })
run_after!: Generated<Date>;
@Column({ type: 'timestamp with time zone', nullable: true })
started_at!: Date | null;
@Column({ type: 'timestamp with time zone', nullable: true })
expires_at!: Date | null;
@Column({ type: 'text', nullable: true })
error!: string | null;
}
@Table('jobs_duplicate_detection')
@Index({
name: 'IDX_jobs_duplicate_detection_pending',
columns: ['priority', 'id'],
where: `"status" = 'p'::"char"`,
})
@Index({
name: 'IDX_jobs_duplicate_detection_dedup',
columns: ['dedup_key'],
unique: true,
where: `"dedup_key" IS NOT NULL AND "status" = 'p'::"char"`,
})
@ConfigurationParameter({ name: 'autovacuum_vacuum_threshold', value: 100, scope: 'table' })
@ConfigurationParameter({ name: 'autovacuum_vacuum_scale_factor', value: 0.01, scope: 'table' })
@ConfigurationParameter({ name: 'autovacuum_vacuum_cost_delay', value: 0, scope: 'table' })
export class JobsDuplicateDetectionTable {
@PrimaryColumn({ type: 'bigint', identity: true })
id!: Generated<number>;
@Column({ type: '"char"' })
name!: string;
@Column({ type: 'jsonb', nullable: true })
data!: unknown;
@Column({ type: 'smallint', default: 0 })
priority!: Generated<number>;
@Column({ type: '"char"', default: 'p' })
status!: Generated<string>;
@Column({ type: 'text', nullable: true })
dedup_key!: string | null;
@Column({ type: 'timestamp with time zone', default: () => 'now()' })
run_after!: Generated<Date>;
@Column({ type: 'timestamp with time zone', nullable: true })
started_at!: Date | null;
@Column({ type: 'timestamp with time zone', nullable: true })
expires_at!: Date | null;
@Column({ type: 'text', nullable: true })
error!: string | null;
}
@Table('jobs_background_task')
@Index({ name: 'IDX_jobs_background_task_pending', columns: ['priority', 'id'], where: `"status" = 'p'::"char"` })
@Index({
name: 'IDX_jobs_background_task_dedup',
columns: ['dedup_key'],
unique: true,
where: `"dedup_key" IS NOT NULL AND "status" = 'p'::"char"`,
})
@ConfigurationParameter({ name: 'autovacuum_vacuum_threshold', value: 100, scope: 'table' })
@ConfigurationParameter({ name: 'autovacuum_vacuum_scale_factor', value: 0.01, scope: 'table' })
@ConfigurationParameter({ name: 'autovacuum_vacuum_cost_delay', value: 0, scope: 'table' })
export class JobsBackgroundTaskTable {
@PrimaryColumn({ type: 'bigint', identity: true })
id!: Generated<number>;
@Column({ type: '"char"' })
name!: string;
@Column({ type: 'jsonb', nullable: true })
data!: unknown;
@Column({ type: 'smallint', default: 0 })
priority!: Generated<number>;
@Column({ type: '"char"', default: 'p' })
status!: Generated<string>;
@Column({ type: 'text', nullable: true })
dedup_key!: string | null;
@Column({ type: 'timestamp with time zone', default: () => 'now()' })
run_after!: Generated<Date>;
@Column({ type: 'timestamp with time zone', nullable: true })
started_at!: Date | null;
@Column({ type: 'timestamp with time zone', nullable: true })
expires_at!: Date | null;
@Column({ type: 'text', nullable: true })
error!: string | null;
}
@Table('jobs_storage_template_migration')
@Index({
name: 'IDX_jobs_storage_template_migration_pending',
columns: ['priority', 'id'],
where: `"status" = 'p'::"char"`,
})
@Index({
name: 'IDX_jobs_storage_template_migration_dedup',
columns: ['dedup_key'],
unique: true,
where: `"dedup_key" IS NOT NULL AND "status" = 'p'::"char"`,
})
@ConfigurationParameter({ name: 'autovacuum_vacuum_threshold', value: 100, scope: 'table' })
@ConfigurationParameter({ name: 'autovacuum_vacuum_scale_factor', value: 0.01, scope: 'table' })
@ConfigurationParameter({ name: 'autovacuum_vacuum_cost_delay', value: 0, scope: 'table' })
export class JobsStorageTemplateMigrationTable {
@PrimaryColumn({ type: 'bigint', identity: true })
id!: Generated<number>;
@Column({ type: '"char"' })
name!: string;
@Column({ type: 'jsonb', nullable: true })
data!: unknown;
@Column({ type: 'smallint', default: 0 })
priority!: Generated<number>;
@Column({ type: '"char"', default: 'p' })
status!: Generated<string>;
@Column({ type: 'text', nullable: true })
dedup_key!: string | null;
@Column({ type: 'timestamp with time zone', default: () => 'now()' })
run_after!: Generated<Date>;
@Column({ type: 'timestamp with time zone', nullable: true })
started_at!: Date | null;
@Column({ type: 'timestamp with time zone', nullable: true })
expires_at!: Date | null;
@Column({ type: 'text', nullable: true })
error!: string | null;
}
@Table('jobs_migration')
@Index({ name: 'IDX_jobs_migration_pending', columns: ['priority', 'id'], where: `"status" = 'p'::"char"` })
@Index({
name: 'IDX_jobs_migration_dedup',
columns: ['dedup_key'],
unique: true,
where: `"dedup_key" IS NOT NULL AND "status" = 'p'::"char"`,
})
@ConfigurationParameter({ name: 'autovacuum_vacuum_threshold', value: 100, scope: 'table' })
@ConfigurationParameter({ name: 'autovacuum_vacuum_scale_factor', value: 0.01, scope: 'table' })
@ConfigurationParameter({ name: 'autovacuum_vacuum_cost_delay', value: 0, scope: 'table' })
export class JobsMigrationTable {
@PrimaryColumn({ type: 'bigint', identity: true })
id!: Generated<number>;
@Column({ type: '"char"' })
name!: string;
@Column({ type: 'jsonb', nullable: true })
data!: unknown;
@Column({ type: 'smallint', default: 0 })
priority!: Generated<number>;
@Column({ type: '"char"', default: 'p' })
status!: Generated<string>;
@Column({ type: 'text', nullable: true })
dedup_key!: string | null;
@Column({ type: 'timestamp with time zone', default: () => 'now()' })
run_after!: Generated<Date>;
@Column({ type: 'timestamp with time zone', nullable: true })
started_at!: Date | null;
@Column({ type: 'timestamp with time zone', nullable: true })
expires_at!: Date | null;
@Column({ type: 'text', nullable: true })
error!: string | null;
}
@Table('jobs_search')
@Index({ name: 'IDX_jobs_search_pending', columns: ['priority', 'id'], where: `"status" = 'p'::"char"` })
@Index({
name: 'IDX_jobs_search_dedup',
columns: ['dedup_key'],
unique: true,
where: `"dedup_key" IS NOT NULL AND "status" = 'p'::"char"`,
})
@ConfigurationParameter({ name: 'autovacuum_vacuum_threshold', value: 100, scope: 'table' })
@ConfigurationParameter({ name: 'autovacuum_vacuum_scale_factor', value: 0.01, scope: 'table' })
@ConfigurationParameter({ name: 'autovacuum_vacuum_cost_delay', value: 0, scope: 'table' })
export class JobsSearchTable {
@PrimaryColumn({ type: 'bigint', identity: true })
id!: Generated<number>;
@Column({ type: '"char"' })
name!: string;
@Column({ type: 'jsonb', nullable: true })
data!: unknown;
@Column({ type: 'smallint', default: 0 })
priority!: Generated<number>;
@Column({ type: '"char"', default: 'p' })
status!: Generated<string>;
@Column({ type: 'text', nullable: true })
dedup_key!: string | null;
@Column({ type: 'timestamp with time zone', default: () => 'now()' })
run_after!: Generated<Date>;
@Column({ type: 'timestamp with time zone', nullable: true })
started_at!: Date | null;
@Column({ type: 'timestamp with time zone', nullable: true })
expires_at!: Date | null;
@Column({ type: 'text', nullable: true })
error!: string | null;
}
@Table('jobs_sidecar')
@Index({ name: 'IDX_jobs_sidecar_pending', columns: ['priority', 'id'], where: `"status" = 'p'::"char"` })
@Index({
name: 'IDX_jobs_sidecar_dedup',
columns: ['dedup_key'],
unique: true,
where: `"dedup_key" IS NOT NULL AND "status" = 'p'::"char"`,
})
@ConfigurationParameter({ name: 'autovacuum_vacuum_threshold', value: 100, scope: 'table' })
@ConfigurationParameter({ name: 'autovacuum_vacuum_scale_factor', value: 0.01, scope: 'table' })
@ConfigurationParameter({ name: 'autovacuum_vacuum_cost_delay', value: 0, scope: 'table' })
export class JobsSidecarTable {
@PrimaryColumn({ type: 'bigint', identity: true })
id!: Generated<number>;
@Column({ type: '"char"' })
name!: string;
@Column({ type: 'jsonb', nullable: true })
data!: unknown;
@Column({ type: 'smallint', default: 0 })
priority!: Generated<number>;
@Column({ type: '"char"', default: 'p' })
status!: Generated<string>;
@Column({ type: 'text', nullable: true })
dedup_key!: string | null;
@Column({ type: 'timestamp with time zone', default: () => 'now()' })
run_after!: Generated<Date>;
@Column({ type: 'timestamp with time zone', nullable: true })
started_at!: Date | null;
@Column({ type: 'timestamp with time zone', nullable: true })
expires_at!: Date | null;
@Column({ type: 'text', nullable: true })
error!: string | null;
}
@Table('jobs_library')
@Index({ name: 'IDX_jobs_library_pending', columns: ['priority', 'id'], where: `"status" = 'p'::"char"` })
@Index({
name: 'IDX_jobs_library_dedup',
columns: ['dedup_key'],
unique: true,
where: `"dedup_key" IS NOT NULL AND "status" = 'p'::"char"`,
})
@ConfigurationParameter({ name: 'autovacuum_vacuum_threshold', value: 100, scope: 'table' })
@ConfigurationParameter({ name: 'autovacuum_vacuum_scale_factor', value: 0.01, scope: 'table' })
@ConfigurationParameter({ name: 'autovacuum_vacuum_cost_delay', value: 0, scope: 'table' })
export class JobsLibraryTable {
@PrimaryColumn({ type: 'bigint', identity: true })
id!: Generated<number>;
@Column({ type: '"char"' })
name!: string;
@Column({ type: 'jsonb', nullable: true })
data!: unknown;
@Column({ type: 'smallint', default: 0 })
priority!: Generated<number>;
@Column({ type: '"char"', default: 'p' })
status!: Generated<string>;
@Column({ type: 'text', nullable: true })
dedup_key!: string | null;
@Column({ type: 'timestamp with time zone', default: () => 'now()' })
run_after!: Generated<Date>;
@Column({ type: 'timestamp with time zone', nullable: true })
started_at!: Date | null;
@Column({ type: 'timestamp with time zone', nullable: true })
expires_at!: Date | null;
@Column({ type: 'text', nullable: true })
error!: string | null;
}
@Table('jobs_notification')
@Index({ name: 'IDX_jobs_notification_pending', columns: ['priority', 'id'], where: `"status" = 'p'::"char"` })
@Index({
name: 'IDX_jobs_notification_dedup',
columns: ['dedup_key'],
unique: true,
where: `"dedup_key" IS NOT NULL AND "status" = 'p'::"char"`,
})
@ConfigurationParameter({ name: 'autovacuum_vacuum_threshold', value: 100, scope: 'table' })
@ConfigurationParameter({ name: 'autovacuum_vacuum_scale_factor', value: 0.01, scope: 'table' })
@ConfigurationParameter({ name: 'autovacuum_vacuum_cost_delay', value: 0, scope: 'table' })
export class JobsNotificationTable {
@PrimaryColumn({ type: 'bigint', identity: true })
id!: Generated<number>;
@Column({ type: '"char"' })
name!: string;
@Column({ type: 'jsonb', nullable: true })
data!: unknown;
@Column({ type: 'smallint', default: 0 })
priority!: Generated<number>;
@Column({ type: '"char"', default: 'p' })
status!: Generated<string>;
@Column({ type: 'text', nullable: true })
dedup_key!: string | null;
@Column({ type: 'timestamp with time zone', default: () => 'now()' })
run_after!: Generated<Date>;
@Column({ type: 'timestamp with time zone', nullable: true })
started_at!: Date | null;
@Column({ type: 'timestamp with time zone', nullable: true })
expires_at!: Date | null;
@Column({ type: 'text', nullable: true })
error!: string | null;
}
@Table('jobs_backup_database')
@Index({ name: 'IDX_jobs_backup_database_pending', columns: ['priority', 'id'], where: `"status" = 'p'::"char"` })
@Index({
name: 'IDX_jobs_backup_database_dedup',
columns: ['dedup_key'],
unique: true,
where: `"dedup_key" IS NOT NULL AND "status" = 'p'::"char"`,
})
@ConfigurationParameter({ name: 'autovacuum_vacuum_threshold', value: 100, scope: 'table' })
@ConfigurationParameter({ name: 'autovacuum_vacuum_scale_factor', value: 0.01, scope: 'table' })
@ConfigurationParameter({ name: 'autovacuum_vacuum_cost_delay', value: 0, scope: 'table' })
export class JobsBackupDatabaseTable {
@PrimaryColumn({ type: 'bigint', identity: true })
id!: Generated<number>;
@Column({ type: '"char"' })
name!: string;
@Column({ type: 'jsonb', nullable: true })
data!: unknown;
@Column({ type: 'smallint', default: 0 })
priority!: Generated<number>;
@Column({ type: '"char"', default: 'p' })
status!: Generated<string>;
@Column({ type: 'text', nullable: true })
dedup_key!: string | null;
@Column({ type: 'timestamp with time zone', default: () => 'now()' })
run_after!: Generated<Date>;
@Column({ type: 'timestamp with time zone', nullable: true })
started_at!: Date | null;
@Column({ type: 'timestamp with time zone', nullable: true })
expires_at!: Date | null;
@Column({ type: 'text', nullable: true })
error!: string | null;
}
@Table('jobs_ocr')
@Index({ name: 'IDX_jobs_ocr_pending', columns: ['priority', 'id'], where: `"status" = 'p'::"char"` })
@Index({
name: 'IDX_jobs_ocr_dedup',
columns: ['dedup_key'],
unique: true,
where: `"dedup_key" IS NOT NULL AND "status" = 'p'::"char"`,
})
@ConfigurationParameter({ name: 'autovacuum_vacuum_threshold', value: 100, scope: 'table' })
@ConfigurationParameter({ name: 'autovacuum_vacuum_scale_factor', value: 0.01, scope: 'table' })
@ConfigurationParameter({ name: 'autovacuum_vacuum_cost_delay', value: 0, scope: 'table' })
export class JobsOcrTable {
@PrimaryColumn({ type: 'bigint', identity: true })
id!: Generated<number>;
@Column({ type: '"char"' })
name!: string;
@Column({ type: 'jsonb', nullable: true })
data!: unknown;
@Column({ type: 'smallint', default: 0 })
priority!: Generated<number>;
@Column({ type: '"char"', default: 'p' })
status!: Generated<string>;
@Column({ type: 'text', nullable: true })
dedup_key!: string | null;
@Column({ type: 'timestamp with time zone', default: () => 'now()' })
run_after!: Generated<Date>;
@Column({ type: 'timestamp with time zone', nullable: true })
started_at!: Date | null;
@Column({ type: 'timestamp with time zone', nullable: true })
expires_at!: Date | null;
@Column({ type: 'text', nullable: true })
error!: string | null;
}
@Table('jobs_workflow')
@Index({ name: 'IDX_jobs_workflow_pending', columns: ['priority', 'id'], where: `"status" = 'p'::"char"` })
@Index({
name: 'IDX_jobs_workflow_dedup',
columns: ['dedup_key'],
unique: true,
where: `"dedup_key" IS NOT NULL AND "status" = 'p'::"char"`,
})
@ConfigurationParameter({ name: 'autovacuum_vacuum_threshold', value: 100, scope: 'table' })
@ConfigurationParameter({ name: 'autovacuum_vacuum_scale_factor', value: 0.01, scope: 'table' })
@ConfigurationParameter({ name: 'autovacuum_vacuum_cost_delay', value: 0, scope: 'table' })
export class JobsWorkflowTable {
@PrimaryColumn({ type: 'bigint', identity: true })
id!: Generated<number>;
@Column({ type: '"char"' })
name!: string;
@Column({ type: 'jsonb', nullable: true })
data!: unknown;
@Column({ type: 'smallint', default: 0 })
priority!: Generated<number>;
@Column({ type: '"char"', default: 'p' })
status!: Generated<string>;
@Column({ type: 'text', nullable: true })
dedup_key!: string | null;
@Column({ type: 'timestamp with time zone', default: () => 'now()' })
run_after!: Generated<Date>;
@Column({ type: 'timestamp with time zone', nullable: true })
started_at!: Date | null;
@Column({ type: 'timestamp with time zone', nullable: true })
expires_at!: Date | null;
@Column({ type: 'text', nullable: true })
error!: string | null;
}
@Table('jobs_editor')
@Index({ name: 'IDX_jobs_editor_pending', columns: ['priority', 'id'], where: `"status" = 'p'::"char"` })
@Index({
name: 'IDX_jobs_editor_dedup',
columns: ['dedup_key'],
unique: true,
where: `"dedup_key" IS NOT NULL AND "status" = 'p'::"char"`,
})
@ConfigurationParameter({ name: 'autovacuum_vacuum_threshold', value: 100, scope: 'table' })
@ConfigurationParameter({ name: 'autovacuum_vacuum_scale_factor', value: 0.01, scope: 'table' })
@ConfigurationParameter({ name: 'autovacuum_vacuum_cost_delay', value: 0, scope: 'table' })
export class JobsEditorTable {
@PrimaryColumn({ type: 'bigint', identity: true })
id!: Generated<number>;
@Column({ type: '"char"' })
name!: string;
@Column({ type: 'jsonb', nullable: true })
data!: unknown;
@Column({ type: 'smallint', default: 0 })
priority!: Generated<number>;
@Column({ type: '"char"', default: 'p' })
status!: Generated<string>;
@Column({ type: 'text', nullable: true })
dedup_key!: string | null;
@Column({ type: 'timestamp with time zone', default: () => 'now()' })
run_after!: Generated<Date>;
@Column({ type: 'timestamp with time zone', nullable: true })
started_at!: Date | null;
@Column({ type: 'timestamp with time zone', nullable: true })
expires_at!: Date | null;
@Column({ type: 'text', nullable: true })
error!: string | null;
}
// Queue metadata table
@Table('job_queue_meta')
export class JobQueueMetaTable {
@PrimaryColumn({ type: 'text' })
queue_name!: string;
@Column({ type: 'boolean', default: false })
is_paused!: Generated<boolean>;
}

View File

@@ -9,13 +9,6 @@ const envData: EnvData = {
logFormat: LogFormat.Console,
buildMetadata: {},
bull: {
config: {
connection: {},
prefix: 'immich_bull',
},
queues: [{ name: 'queue-1' }],
},
cls: {
config: {},

View File

@@ -20,5 +20,6 @@ export const newJobRepositoryMock = (): Mocked<RepositoryInterface<JobRepository
clear: vitest.fn(),
waitForQueueCompletion: vitest.fn(),
removeJob: vitest.fn(),
onShutdown: vitest.fn(),
};
};