fix copy race

This commit is contained in:
mertalev
2026-02-14 03:32:59 -05:00
parent 6b04fa3f94
commit 839fb61340

View File

@@ -394,20 +394,24 @@ export class WriteBuffer {
}
private async copyInsert(tableName: string, rows: InsertRow[]) {
const writable = await this
.pgPool`COPY ${this.pgPool(tableName)} (code, data, priority, "runAfter") FROM STDIN WITH (FORMAT csv)`.writable();
const now = new Date().toISOString();
for (const row of rows) {
const data = row.data != null ? csvEscape(JSON.stringify(row.data)) : '';
const priority = row.priority ?? 0;
const runAfter = row.runAfter ? row.runAfter.toISOString() : now;
writable.write(`${row.code},${data},${priority},${runAfter}\n`);
const conn = await this.pgPool.reserve();
try {
const writable = await conn`COPY ${conn(tableName)} (code, data, priority, "runAfter") FROM STDIN WITH (FORMAT csv)`.writable();
const now = new Date().toISOString();
for (const row of rows) {
const data = row.data != null ? csvEscape(JSON.stringify(row.data)) : '';
const priority = row.priority ?? 0;
const runAfter = row.runAfter ? row.runAfter.toISOString() : now;
writable.write(`${row.code},${data},${priority},${runAfter}\n`);
}
writable.end();
await new Promise<void>((resolve, reject) => {
writable.on('finish', resolve);
writable.on('error', reject);
});
} finally {
conn.release();
}
writable.end();
await new Promise<void>((resolve, reject) => {
writable.on('finish', resolve);
writable.on('error', reject);
});
}
private insertChunk(tableName: string, rows: InsertRow[]) {