diff --git a/server/src/repositories/job.repository.ts b/server/src/repositories/job.repository.ts index d05b18962d..920f3c57d3 100644 --- a/server/src/repositories/job.repository.ts +++ b/server/src/repositories/job.repository.ts @@ -64,6 +64,7 @@ export class JobRepository { private workers: Partial> = {}; private handlers: Partial> = {}; private writeBuffer!: WriteBuffer; + private writePool: postgres.Sql | null = null; private listenConn: postgres.Sql | null = null; private listenReady = false; private pauseState: Partial> = {}; @@ -129,7 +130,8 @@ export class JobRepository { } async startWorkers() { - this.writeBuffer = new WriteBuffer(this.db, (queue) => this.notify(queue)); + this.writePool = this.createPgConnection({ max: 4, connection: { synchronous_commit: 'off' } }); + this.writeBuffer = new WriteBuffer(this.writePool, (queue) => this.notify(queue)); // Startup sweep: reset any active jobs from a previous crash await Promise.all( @@ -373,23 +375,28 @@ export class JobRepository { .execute(); } - private async setupListen(): Promise { - if (this.listenConn) { - await this.listenConn.end(); - this.listenConn = null; - } - + private createPgConnection(options?: { max?: number; connection?: Record }) { const { database } = this.configRepository.getEnv(); const pgConfig = asPostgresConnectionConfig(database.config); - this.listenConn = postgres({ + return postgres({ host: pgConfig.host, port: pgConfig.port, username: pgConfig.username, password: pgConfig.password as string | undefined, database: pgConfig.database, ssl: pgConfig.ssl as boolean | undefined, - max: 1, + max: options?.max ?? 1, + connection: options?.connection, }); + } + + private async setupListen(): Promise { + if (this.listenConn) { + await this.listenConn.end(); + this.listenConn = null; + } + + this.listenConn = this.createPgConnection(); for (const queueName of Object.values(QueueName)) { await this.listenConn.listen( @@ -467,7 +474,11 @@ export class JobRepository { await this.writeBuffer.flush(); } - // Close LISTEN connection + // Close dedicated connections + if (this.writePool) { + await this.writePool.end(); + this.writePool = null; + } if (this.listenConn) { await this.listenConn.end(); this.listenConn = null; diff --git a/server/src/utils/job-queue.util.ts b/server/src/utils/job-queue.util.ts index 584623463e..c52c9a77e9 100644 --- a/server/src/utils/job-queue.util.ts +++ b/server/src/utils/job-queue.util.ts @@ -1,9 +1,12 @@ import { Kysely, Selectable, sql } from 'kysely'; +import postgres from 'postgres'; import { JOB_CODE_TO_NAME, JobCode, JobQueueStatus, QueueName } from 'src/enum'; import { DB } from 'src/schema'; import { JobTable } from 'src/schema/tables/job.table'; import { JobItem } from 'src/types'; +const csvEscape = (s: string) => '"' + s.replace(/"/g, '""') + '"'; + export type InsertRow = { code: JobCode; data: unknown; @@ -281,7 +284,7 @@ export class WriteBuffer { private timer: ReturnType | null = null; constructor( - private db: Kysely, + private pgPool: postgres.Sql, private notify: (queue: QueueName) => Promise, ) {} @@ -308,16 +311,36 @@ export class WriteBuffer { const deferred = this.pending; this.pending = null; - const promises = []; - try { - for (const [queue, rows] of Object.entries(this.buffers)) { - if (rows.length === 0) { - continue; - } - const tableName = QUEUE_TABLE[queue as QueueName]; - promises.push(this.insertChunk(tableName, rows).then(() => this.notify(queue as QueueName))); - rows.length = 0; + const promises: Promise[] = []; + + for (const [queue, rows] of Object.entries(this.buffers)) { + if (rows.length === 0) { + continue; } + + const queueName = queue as QueueName; + const tableName = QUEUE_TABLE[queueName]; + + const copyRows: InsertRow[] = []; + const insertRows: InsertRow[] = []; + for (const row of rows) { + if (row.dedupKey) { + insertRows.push(row); + } else { + copyRows.push(row); + } + } + rows.length = 0; + + if (copyRows.length > 0) { + promises.push(this.copyInsert(tableName, copyRows).then(() => this.notify(queueName))); + } + if (insertRows.length > 0) { + promises.push(this.insertChunk(tableName, insertRows).then(() => this.notify(queueName))); + } + } + + try { await Promise.all(promises); deferred?.resolve(); } catch (error) { @@ -325,37 +348,37 @@ export class WriteBuffer { } } - private insertChunk(tableName: keyof JobTables, rows: InsertRow[]) { - return this.db - .insertInto(tableName) - .columns(['code', 'data', 'priority', 'dedupKey', 'runAfter']) - .expression((eb) => - eb - .selectFrom( - eb - .fn('unnest', [ - sql`${`{${rows.map((r) => r.code)}}`}::"smallint"[]`, - sql`${`{${rows.map((r) => { - if (!r.data) return null; - const json = JSON.stringify(r.data); - return '"' + json.replace(/\\/g, '\\\\').replace(/"/g, '\\"') + '"'; - })}}`}::jsonb[]`, - sql`${`{${rows.map((r) => r.priority)}}`}::smallint[]`, - sql`${`{${rows.map((r) => r.dedupKey)}}`}::text[]`, - sql`${`{${rows.map((r) => r.runAfter)}}`}::timestamptz[]`, - ]) - .as('v'), - ) - .selectAll(), + private async copyInsert(tableName: string, rows: InsertRow[]) { + const writable = await this + .pgPool`COPY ${this.pgPool(tableName)} (code, data, priority, "runAfter") FROM STDIN WITH (FORMAT csv)`.writable(); + const now = new Date().toISOString(); + for (const row of rows) { + const data = row.data != null ? csvEscape(JSON.stringify(row.data)) : ''; + const priority = row.priority ?? 0; + const runAfter = row.runAfter ? row.runAfter.toISOString() : now; + writable.write(`${row.code},${data},${priority},${runAfter}\n`); + } + writable.end(); + await new Promise((resolve, reject) => { + writable.on('finish', resolve); + writable.on('error', reject); + }); + } + + private insertChunk(tableName: string, rows: InsertRow[]) { + const now = new Date().toISOString(); + return this.pgPool` + INSERT INTO ${this.pgPool(tableName)} (code, data, priority, "dedupKey", "runAfter") + SELECT * FROM unnest( + ${rows.map((r) => r.code)}::smallint[], + ${rows.map((r) => (r.data != null ? JSON.stringify(r.data) : null))}::jsonb[], + ${rows.map((r) => r.priority ?? 0)}::smallint[], + ${rows.map((r) => r.dedupKey)}::text[], + ${rows.map((r) => r.runAfter?.toISOString() ?? now)}::timestamptz[] ) - .onConflict((oc) => - oc - .column('dedupKey') - .where('dedupKey', 'is not', null) - .where('status', '=', JobQueueStatus.Pending) - .doNothing(), - ) - .execute(); + ON CONFLICT ("dedupKey") WHERE "dedupKey" IS NOT NULL AND status = ${JobQueueStatus.Pending} + DO NOTHING + `; } } @@ -396,10 +419,4 @@ interface QueueWorkerOptions { onJob: (job: JobItem) => Promise; } -type PickByValue = { - [K in keyof T as T[K] extends V ? K : never]: T[K]; -}; - -type JobTables = PickByValue; - type Deferred = { promise: Promise; resolve: () => void; reject: (error: unknown) => void };