在 Node.js 中通过子进程操作标准输入/输出

翻译:疯狂的技术宅 原文:http://2ality.com/2018/05/child-process-streams.html

在本文中,我们在 Node.js 中把 shell 命令作为子进程运行。然后异步读取这些进程的 stdout 并写入其 stdin。

在子进程中运行 shell 命令

首先从在子进程中运行 shell 命令开始:

const {onExit} = require('@rauschma/stringio');
const {spawn} = require('child_process');

async function main() {
  const filePath = process.argv[2];
  console.log('INPUT: '+filePath);

  const childProcess = spawn('cat', [filePath],
    {stdio: [process.stdin, process.stdout, process.stderr]}); // (A)

  await onExit(childProcess); // (B)

  console.log('### DONE');
}
main();

解释:

  • 我们用了 spawn(),它可以使我们在命令运行时访问命令的 stdin,stdout 和 stderr。
    • 在 A 行中,我们将子进程的 stdin 连接到当前进程的 stdin。
    • B 行等待该过程完成。

等待子进程通过 Promise 退出

函数 onExit()如下所示。

function onExit(childProcess: ChildProcess): Promise<void> {
  return new Promise((resolve, reject) => {
    childProcess.once('exit', (code: number, signal: string) => {
      if (code === 0) {
        resolve(undefined);
      } else {
        reject(new Error('Exit with error code: '+code));
      }
    });
    childProcess.once('error', (err: Error) => {
      reject(err);
    });
  });
}

子进程的实现

以下代码用 @rauschma/stringio 异步写入以 shell 命令运行的子进程的 stdin

const {streamWrite, streamEnd, onExit} = require('@rauschma/stringio');
const {spawn} = require('child_process');

async function main() {
  const sink = spawn('cat', [],
    {stdio: ['pipe', process.stdout, process.stderr]}); // (A)

  writeToWritable(sink.stdin); // (B)
  await onExit(sink);

  console.log('### DONE');
}
main();

async function writeToWritable(writable) {
  await streamWrite(writable, 'First line\n');
  await streamWrite(writable, 'Second line\n');
  await streamEnd(writable);
}

我们为 shell 命令生成一个名为 sink 的独立进程。用 writeToWritable 写入 sink.stdin。它借助 await 异步执行并暂停,以避免缓冲区被消耗太多。 解释:

  • 在A行中,我们告诉 spawn() 通过 sink.stdin'pipe')访问 stdin。 stdout 和 stderr 被转发到 process.stdinprocess.stderr,如前面所述。
  • 在B行中不会 await 写完成。而是 await 子进程 sink 完成。

接下来了解 streamWrite() 的工作原理。

写流操作的 promise

Node.js 写流的操作通常涉及回调(参见文档)。代码如下。

function streamWrite(
  stream: Writable,
  chunk: string|Buffer|Uint8Array,
  encoding='utf8'): Promise<void> {
    return new Promise((resolve, reject) => {
      const errListener = (err: Error) => {
        stream.removeListener('error', errListener);
        reject(err);
      };
      stream.addListener('error', errListener);
      const callback = () => {
        stream.removeListener('error', errListener);
        resolve(undefined);
      };
      stream.write(chunk, encoding, callback);
    });
}

streamEnd()的工作方式是类似的。

从子进程中读取数据

下面的代码使用异步迭代(C行)来读取子进程的 stdout 中的内容:

const {chunksToLinesAsync, chomp} = require('@rauschma/stringio');
const {spawn} = require('child_process');

async function main() {
  const filePath = process.argv[2];
  console.log('INPUT: '+filePath);

  const source = spawn('cat', [filePath],
    {stdio: ['ignore', 'pipe', process.stderr]}); // (A)

  await echoReadable(source.stdout); // (B)

  console.log('### DONE');
}
main();

async function echoReadable(readable) {
  for await (const line of chunksToLinesAsync(readable)) { // (C)
    console.log('LINE: '+chomp(line))
  }
}

解释:

  • A行:我们忽略 stdin,希望通过流访问 stdout 并将 stderr 转发到process.stderr
  • B行:开始 awat 直到 echoReadable() 完成。没有这个 awaitDONE 将会在调用 source.stdout 之前被输出。

在子进程之间进行管道连接

在下面的例子中,函数transform() 将会:

  • source 子进程的 stdout 中读取内容。
    • 将内容写入 sink 子进程的 stdin

换句话说,我们正在实现类似 Unix 管道的功能:

cat someFile.txt | transform() | cat

这是代码:

const {chunksToLinesAsync, streamWrite, streamEnd, onExit}
  = require('@rauschma/stringio');
const {spawn} = require('child_process');

async function main() {
  const filePath = process.argv[2];
  console.log('INPUT: '+filePath);

  const source = spawn('cat', [filePath],
    {stdio: ['ignore', 'pipe', process.stderr]});
  const sink = spawn('cat', [],
    {stdio: ['pipe', process.stdout, process.stderr]});

  transform(source.stdout, sink.stdin);
  await onExit(sink);

  console.log('### DONE');
}
main();

async function transform(readable, writable) {
  for await (const line of chunksToLinesAsync(readable)) {
    await streamWrite(writable, '@ '+line);
  }
  await streamEnd(writable);
}

扩展阅读