feat: ProcessRepository#createSpawnDuplexStream

This commit is contained in:
izzy
2025-11-18 16:51:13 +00:00
parent 7134dd29ca
commit 9cea3d7b2f

View File

@@ -1,9 +1,109 @@
import { Injectable } from '@nestjs/common';
import { ChildProcessWithoutNullStreams, spawn, SpawnOptionsWithoutStdio } from 'node:child_process';
import { Duplex } from 'node:stream';
@Injectable()
export class ProcessRepository {
spawn(command: string, args: readonly string[], options?: SpawnOptionsWithoutStdio): ChildProcessWithoutNullStreams {
spawn(command: string, args?: readonly string[], options?: SpawnOptionsWithoutStdio): ChildProcessWithoutNullStreams {
return spawn(command, args, options);
}
createSpawnDuplexStream(command: string, args?: readonly string[], options?: SpawnOptionsWithoutStdio): Duplex {
let stdinClosed = false;
let drainCallback: undefined | (() => void);
const process = this.spawn(command, args, options);
const duplex = new Duplex({
// duplex -> stdin
write(chunk, encoding, callback) {
// drain the input if process dies
if (stdinClosed) {
return callback();
}
// handle stream backpressure
if (process.stdin.write(chunk, encoding)) {
callback();
} else {
drainCallback = callback;
process.stdin.once('drain', () => {
drainCallback = undefined;
callback();
});
}
},
read() {
// no-op
},
final(callback) {
if (stdinClosed) {
callback();
} else {
process.stdin.end(callback);
}
},
});
// stdout -> duplex
process.stdout.on('data', (chunk) => {
// handle stream backpressure
if (!duplex.push(chunk)) {
process.stdout.pause();
}
});
duplex.on('resume', () => process.stdout.resume());
// end handling
let stdoutClosed = false;
function close(error?: Error) {
stdinClosed = true;
if (error) {
duplex.destroy(error);
} else if (stdoutClosed && typeof process.exitCode === 'number') {
duplex.push(null);
}
}
process.stdout.on('close', () => {
stdoutClosed = true;
close();
});
// error handling
process.on('error', close);
process.stdout.on('error', close);
process.stdin.on('error', (error) => {
if ((error as { code?: 'EPIPE' })?.code === 'EPIPE') {
try {
drainCallback!();
} catch (error) {
close(error as Error);
}
} else {
close(error);
}
});
let stderr = '';
process.stderr.on('data', (chunk) => (stderr += chunk));
process.on('exit', (code) => {
console.info(`${command} exited (${code})`);
if (code !== 0) {
close(new Error(`${command} non-zero exit code (${code})\n${stderr}`));
} else {
close();
}
});
// attach _process to Duplex for testing suite
(duplex as never as { _process: ChildProcessWithoutNullStreams })._process = process;
return duplex;
}
}