diff --git a/server/src/utils/job-queue.util.ts b/server/src/utils/job-queue.util.ts index 85f29daff7..47419f2b30 100644 --- a/server/src/utils/job-queue.util.ts +++ b/server/src/utils/job-queue.util.ts @@ -394,20 +394,24 @@ export class WriteBuffer { } private async copyInsert(tableName: string, rows: InsertRow[]) { - const writable = await this - .pgPool`COPY ${this.pgPool(tableName)} (code, data, priority, "runAfter") FROM STDIN WITH (FORMAT csv)`.writable(); - const now = new Date().toISOString(); - for (const row of rows) { - const data = row.data != null ? csvEscape(JSON.stringify(row.data)) : ''; - const priority = row.priority ?? 0; - const runAfter = row.runAfter ? row.runAfter.toISOString() : now; - writable.write(`${row.code},${data},${priority},${runAfter}\n`); + const conn = await this.pgPool.reserve(); + try { + const writable = await conn`COPY ${conn(tableName)} (code, data, priority, "runAfter") FROM STDIN WITH (FORMAT csv)`.writable(); + const now = new Date().toISOString(); + for (const row of rows) { + const data = row.data != null ? csvEscape(JSON.stringify(row.data)) : ''; + const priority = row.priority ?? 0; + const runAfter = row.runAfter ? row.runAfter.toISOString() : now; + writable.write(`${row.code},${data},${priority},${runAfter}\n`); + } + writable.end(); + await new Promise((resolve, reject) => { + writable.on('finish', resolve); + writable.on('error', reject); + }); + } finally { + conn.release(); } - writable.end(); - await new Promise((resolve, reject) => { - writable.on('finish', resolve); - writable.on('error', reject); - }); } private insertChunk(tableName: string, rows: InsertRow[]) {