This commit is contained in:
mertalev
2026-02-14 07:08:25 -05:00
parent 1989a8bec2
commit 3af4b8d7a7
6 changed files with 19 additions and 33 deletions

View File

@@ -143,7 +143,11 @@ export class JobRepository {
this.pool = this.createPgConnection({ max: 20, connection: { synchronous_commit: 'off' } });
this.db = new Kysely<DB>({ dialect: new PostgresJSDialect({ postgres: this.pool }) });
this.writeBuffer = new WriteBuffer(this.pool, (queue) => this.notify(queue));
this.writeBuffer = new WriteBuffer(
this.pool,
(queue) => this.notify(queue),
(error) => this.logger.error(`Failed to flush job write buffer: ${error}`),
);
}
async startWorkers() {
@@ -287,9 +291,9 @@ export class JobRepository {
return (this.handlers[name] as JobMapItem).queueName;
}
queueAll(items: JobItem[]): Promise<void> {
queueAll(items: JobItem[]): void {
if (items.length === 0) {
return Promise.resolve();
return;
}
const bufferItems: { queue: QueueName; row: InsertRow }[] = [];
@@ -308,11 +312,11 @@ export class JobRepository {
});
}
return this.writeBuffer.add(bufferItems);
this.writeBuffer.add(bufferItems);
}
queue(item: JobItem): Promise<void> {
return this.queueAll([item]);
queue(item: JobItem): void {
this.queueAll([item]);
}
async waitForQueueCompletion(...queues: QueueName[]): Promise<void> {

View File

@@ -29,7 +29,6 @@ import {
UnsupportedPostgresError,
} from 'src/utils/database-backups';
import { ImmichFileResponse } from 'src/utils/file';
import { handlePromiseError } from 'src/utils/misc';
@Injectable()
export class DatabaseBackupService {
@@ -68,7 +67,7 @@ export class DatabaseBackupService {
this.cronRepository.create({
name: 'backupDatabase',
expression: database.cronExpression,
onTick: () => handlePromiseError(this.jobRepository.queue({ name: JobName.DatabaseBackup }), this.logger),
onTick: () => this.jobRepository.queue({ name: JobName.DatabaseBackup }),
start: database.enabled,
});
}

View File

@@ -47,7 +47,7 @@ export class LibraryService extends BaseService {
this.cronRepository.create({
name: CronJob.LibraryScan,
expression: scan.cronExpression,
onTick: () => handlePromiseError(this.jobRepository.queue({ name: JobName.LibraryScanQueueAll }), this.logger),
onTick: () => this.jobRepository.queue({ name: JobName.LibraryScanQueueAll }),
start: scan.enabled,
});
}

View File

@@ -135,7 +135,7 @@ export class StorageTemplateService extends BaseService {
@OnEvent({ name: 'AssetMetadataExtracted' })
onAssetMetadataExtracted({ source, assetId }: ArgOf<'AssetMetadataExtracted'>) {
void this.jobRepository.queue({ name: JobName.StorageTemplateMigrationSingle, data: { source, id: assetId } });
this.jobRepository.queue({ name: JobName.StorageTemplateMigrationSingle, data: { source, id: assetId } });
}
@OnJob({ name: JobName.StorageTemplateMigrationSingle, queue: QueueName.StorageTemplateMigration })

View File

@@ -325,15 +325,15 @@ export class QueueWorker {
export class WriteBuffer {
private buffers = Object.fromEntries(Object.values(QueueName).map((name) => [name as QueueName, [] as InsertRow[]]));
private pending: Deferred | null = null;
private timer: ReturnType<typeof setTimeout> | null = null;
constructor(
private pgPool: postgres.Sql,
private notify: (queue: QueueName) => Promise<unknown>,
private onFlushError?: (error: unknown) => void,
) {}
async add(items: { queue: QueueName; row: InsertRow }[]): Promise<void> {
add(items: { queue: QueueName; row: InsertRow }[]): void {
if (items.length === 0) {
return;
}
@@ -342,10 +342,8 @@ export class WriteBuffer {
this.buffers[queue].push(row);
}
if (!this.timer) {
this.pending = createDeferred();
this.timer = setTimeout(() => void this.flush(), 10);
this.timer = setTimeout(() => void this.flush().catch((error) => this.onFlushError?.(error)), 10);
}
return this.pending!.promise;
}
async flush(): Promise<void> {
@@ -353,8 +351,6 @@ export class WriteBuffer {
clearTimeout(this.timer);
this.timer = null;
}
const deferred = this.pending;
this.pending = null;
const promises: Promise<unknown>[] = [];
@@ -385,12 +381,7 @@ export class WriteBuffer {
}
}
try {
await Promise.all(promises);
deferred?.resolve();
} catch (error) {
deferred?.reject(error);
}
await Promise.all(promises);
}
private async copyInsert(tableName: string, rows: InsertRow[]) {
@@ -466,13 +457,6 @@ const QUEUE_TABLE = {
[QueueName.Editor]: 'jobs_editor',
} as const;
const createDeferred = (): Deferred => {
let resolve!: () => void;
let reject!: (error: unknown) => void;
const promise = new Promise<void>((_resolve, _reject) => ((resolve = _resolve), (reject = _reject)));
return { promise, resolve, reject };
};
interface QueueWorkerOptions {
queueName: QueueName;
stallTimeout: number;
@@ -484,4 +468,3 @@ interface QueueWorkerOptions {
onJob: (job: JobItem) => Promise<unknown>;
}
type Deferred = { promise: Promise<void>; resolve: () => void; reject: (error: unknown) => void };

View File

@@ -12,8 +12,8 @@ export const newJobRepositoryMock = (): Mocked<RepositoryInterface<JobRepository
pause: vitest.fn(),
resume: vitest.fn(),
searchJobs: vitest.fn(),
queue: vitest.fn().mockImplementation(() => Promise.resolve()),
queueAll: vitest.fn().mockImplementation(() => Promise.resolve()),
queue: vitest.fn(),
queueAll: vitest.fn(),
isActive: vitest.fn(),
isPaused: vitest.fn(),
getJobCounts: vitest.fn(),