This commit is contained in:
mertalev
2026-02-14 00:48:36 -05:00
parent 880b2ab665
commit 27de6e7c1a
2 changed files with 22 additions and 13 deletions

View File

@@ -127,12 +127,12 @@ export class JobRepository {
throw new ImmichStartupError(errorMessage);
}
}
this.writePool = this.createPgConnection({ max: 4, connection: { synchronous_commit: 'off' } });
this.writeBuffer = new WriteBuffer(this.writePool, (queue) => this.notify(queue));
}
async startWorkers() {
this.writePool = this.createPgConnection({ max: 4, connection: { synchronous_commit: 'off' } });
this.writeBuffer = new WriteBuffer(this.writePool, (queue) => this.notify(queue));
// Startup sweep: reset any active jobs from a previous crash
await Promise.all(
Object.values(QueueName).map((queueName) =>
@@ -145,7 +145,6 @@ export class JobRepository {
),
);
// Create workers
for (const queueName of Object.values(QueueName)) {
this.workers[queueName] = new QueueWorker({
queueName,
@@ -157,7 +156,6 @@ export class JobRepository {
});
}
// Setup LISTEN/NOTIFY, sync pause state, and trigger initial fetch
await this.setupListen();
}
@@ -465,16 +463,13 @@ export class JobRepository {
}
async onShutdown(): Promise<void> {
// Stop workers
const shutdownPromises = Object.values(this.workers).map((worker) => worker.shutdown());
await Promise.all(shutdownPromises);
// Flush write buffer
if (this.writeBuffer) {
await this.writeBuffer.flush();
}
// Close dedicated connections
if (this.writePool) {
await this.writePool.end();
this.writePool = null;

View File

@@ -208,6 +208,7 @@ export class QueueWorker {
.with('next', (qb) =>
qb
.selectFrom(this.table)
.select('id')
.where('status', '=', JobQueueStatus.Pending)
.where('runAfter', '<=', sql<Date>`now()`)
.orderBy('priority', 'desc')
@@ -367,14 +368,27 @@ export class WriteBuffer {
private insertChunk(tableName: string, rows: InsertRow[]) {
const now = new Date().toISOString();
const code = [];
const data = [];
const priority = [];
const dedupKey = [];
const runAfter = [];
for (const row of rows) {
code.push(row.code);
data.push(row.data ?? null);
priority.push(row.priority ?? 0);
dedupKey.push(row.dedupKey);
runAfter.push(row.runAfter?.toISOString() ?? now);
}
return this.pgPool`
INSERT INTO ${this.pgPool(tableName)} (code, data, priority, "dedupKey", "runAfter")
SELECT * FROM unnest(
${rows.map((r) => r.code)}::smallint[],
${rows.map((r) => (r.data != null ? JSON.stringify(r.data) : null))}::jsonb[],
${rows.map((r) => r.priority ?? 0)}::smallint[],
${rows.map((r) => r.dedupKey)}::text[],
${rows.map((r) => r.runAfter?.toISOString() ?? now)}::timestamptz[]
${code}::smallint[],
${data as any}::jsonb[],
${priority}::smallint[],
${dedupKey}::text[],
${runAfter}::timestamptz[]
)
ON CONFLICT ("dedupKey") WHERE "dedupKey" IS NOT NULL AND status = ${JobQueueStatus.Pending}
DO NOTHING