Node.js を使ったシェルスクリプティング
この本のオフライン版 (HTML、PDF、EPUB、MOBI) を購入して、無料のオンライン版をサポートすることができます。
(広告です。ブロックしないでください。)

9 ネイティブ Node.js ストリーム



この章では、Node のネイティブストリームを紹介します。ネイティブストリームは非同期イテレーションをサポートしており、操作が容易になります。本章では主にこれを使用します。

クロスプラットフォームの *Web ストリーム* については、§10 “Node.js での Web ストリームの使用” で説明します。本書では主にこれらを使用します。そのため、必要に応じて本章はスキップできます。

9.1 非同期イテレーションと非同期ジェネレーターのまとめ

非同期イテレーションは、データコンテナの内容を非同期的に取得するためのプロトコルです (つまり、アイテムを取得する前に現在の「タスク」が一時停止される可能性があります)。

非同期ジェネレーターは、非同期イテレーションに役立ちます。たとえば、これは非同期ジェネレーター関数です。

/**
 * @returns an asynchronous iterable
 */
async function* asyncGenerator(asyncIterable) {
  for await (const item of asyncIterable) { // input
    if (···) {
      yield '> ' + item; // output
    }
  }
}

本章の残りの部分では、関数が非同期関数なのか非同期ジェネレーター関数なのかに注意してください。

/** @returns a Promise */
async function asyncFunction() { /*···*/ }

/** @returns an async iterable */
async function* asyncGeneratorFunction() { /*···*/ }

9.2 ストリーム

ストリームは、大量のデータを「分割して征服する」という中心的なアイデアを持つパターンです。データを小さな部分に分割して一度に 1 つの部分を処理すれば、処理できます。

Node.js は、いくつかの種類のストリームをサポートしています。たとえば、

9.2.1 パイプライン

ストリーミングデータを複数の手順で処理するには、ストリームを *パイプライン* (接続) します。

  1. 入力は readable ストリームを介して受信されます。
  2. 各処理手順は、変換ストリームを介して実行されます。
  3. 最後の処理手順には、2 つのオプションがあります。
    • 最新の readable ストリームのデータを writable ストリームに書き込むことができます。つまり、writable ストリームはパイプラインの最後の要素です。
    • 最新の readable ストリームのデータを別の方法で処理できます。

パート (2) はオプションです。

9.2.2 テキストエンコーディング

テキストストリームを作成するときは、常にエンコーディングを指定することをお勧めします。

エンコーディングのデフォルト値は null で、これは 'utf8' と同等です。

9.2.3 ヘルパー関数: readableToString()

このヘルパー関数を時々使用します。どのように機能するかは理解する必要はなく、(おおまかに) 何をするかだけを理解すれば十分です。

import * as stream from 'stream';

/**
 * Reads all the text in a readable stream and returns it as a string,
 * via a Promise.
 * @param {stream.Readable} readable
 */
function readableToString(readable) {
  return new Promise((resolve, reject) => {
    let data = '';
    readable.on('data', function (chunk) {
      data += chunk;
    });
    readable.on('end', function () {
      resolve(data);
    });
    readable.on('error', function (err) {
      reject(err);
    });
  });
}

この関数は、イベントベースの API を介して実装されています。後ほど、これを行うためのより簡単な方法 (非同期イテレーションを使用) を紹介します。

9.2.4 予備的な解説

9.3 Readable ストリーム

9.3.1 Readable ストリームの作成

9.3.1.1 ファイルからの Readable ストリームの作成

fs.createReadStream() を使用して、readable ストリームを作成できます。

import * as fs from 'fs';

const readableStream = fs.createReadStream(
  'tmp/test.txt', {encoding: 'utf8'});

assert.equal(
  await readableToString(readableStream),
  'This is a test!\n');
9.3.1.2 Readable.from(): イテラブルからの Readable ストリームの作成

静的メソッド Readable.from(iterable, options?) は、iterable に含まれるデータを保持する readable ストリームを作成します。iterable は、同期イテラブルまたは非同期イテラブルです。パラメーター options はオプションで、とりわけ、テキストエンコーディングを指定するために使用できます。

import * as stream from 'stream';

function* gen() {
  yield 'One line\n';
  yield 'Another line\n';
}
const readableStream = stream.Readable.from(gen(), {encoding: 'utf8'});
assert.equal(
  await readableToString(readableStream),
  'One line\nAnother line\n');
9.3.1.2.1 文字列からの Readable ストリームの作成

Readable.from() は任意のイテラブルを受け入れるため、文字列をストリームに変換するためにも使用できます。

import {Readable} from 'stream';

const str = 'Some text!';
const readable = Readable.from(str, {encoding: 'utf8'});
assert.equal(
  await readableToString(readable),
  'Some text!');

現時点ではReadable.from() は文字列を他のイテラブルと同様に扱い、コードポイントを反復処理します。これはパフォーマンスの面では理想的ではありませんが、ほとんどのユースケースでは問題ないはずです。Readable.from() は文字列でよく使用されることが予想されるため、将来的に最適化される可能性があります。

9.3.2 for-await-of を使用した Readable ストリームからのチャンクの読み取り

すべての readable ストリームは非同期的に反復可能であるため、for-await-of ループを使用してその内容を読み取ることができます。

import * as fs from 'fs';

async function logChunks(readable) {
  for await (const chunk of readable) {
    console.log(chunk);
  }
}

const readable = fs.createReadStream(
  'tmp/test.txt', {encoding: 'utf8'});
logChunks(readable);

// Output:
// 'This is a test!\n'
9.3.2.1 Readable ストリームの内容を文字列に収集する

次の関数は、本章の冒頭で説明した関数のよりシンプルな再実装です。

import {Readable} from 'stream';

async function readableToString2(readable) {
  let result = '';
  for await (const chunk of readable) {
    result += chunk;
  }
  return result;
}

const readable = Readable.from('Good morning!', {encoding: 'utf8'});
assert.equal(await readableToString2(readable), 'Good morning!');

この場合、Promise を返す必要があったため、非同期関数を使用する必要がありました。

9.3.3 モジュール 'node:readlines' を使用した Readable ストリームからの行の読み取り

組み込みモジュール 'node:readline' を使用すると、readable ストリームから行を読み取ることができます。

import * as fs from 'node:fs';
import * as readline from 'node:readline/promises';

const filePath = process.argv[2]; // first command line argument

const rl = readline.createInterface({
  input: fs.createReadStream(filePath, {encoding: 'utf-8'}),
});
for await (const line of rl) {
  console.log('>', line);
}
rl.close();

9.4 非同期ジェネレーターによる Readable ストリームの変換

非同期イテレーションは、ストリーミングデータを複数の手順で処理するための変換ストリームに代わる洗練された方法を提供します。

要約すると、これらはそのような処理パイプラインの要素です。

readable
→ 最初の非同期ジェネレーター [→ … → 最後の非同期ジェネレーター]
→ readable または非同期関数

9.4.1 非同期イテラブルでチャンクを行番号付きの行に変換する

次の例では、説明したばかりの処理パイプラインの例を示します。

import {Readable} from 'stream';

/**
 * @param chunkIterable An asynchronous or synchronous iterable
 * over “chunks” (arbitrary strings)
 * @returns An asynchronous iterable over “lines”
 * (strings with at most one newline that always appears at the end)
 */
async function* chunksToLines(chunkIterable) {
  let previous = '';
  for await (const chunk of chunkIterable) {
    let startSearch = previous.length;
    previous += chunk;
    while (true) {
      // Works for EOL === '\n' and EOL === '\r\n'
      const eolIndex = previous.indexOf('\n', startSearch);
      if (eolIndex < 0) break;
      // Line includes the EOL
      const line = previous.slice(0, eolIndex+1);
      yield line;
      previous = previous.slice(eolIndex+1);
      startSearch = 0;
    }
  }
  if (previous.length > 0) {
    yield previous;
  }
}

async function* numberLines(lineIterable) {
  let lineNumber = 1;
  for await (const line of lineIterable) {
    yield lineNumber + ' ' + line;
    lineNumber++;
  }
}

async function logLines(lineIterable) {
  for await (const line of lineIterable) {
    console.log(line);
  }
}

const chunks = Readable.from(
  'Text with\nmultiple\nlines.\n',
  {encoding: 'utf8'});
await logLines(numberLines(chunksToLines(chunks))); // (A)

// Output:
// '1 Text with\n'
// '2 multiple\n'
// '3 lines.\n'

処理パイプラインは行 A で設定されます。手順は次のとおりです。

観察

9.5 Writable ストリーム

9.5.1 ファイル用の Writable ストリームの作成

fs.createWriteStream() を使用して、writable ストリームを作成できます。

const writableStream = fs.createWriteStream(
  'tmp/log.txt', {encoding: 'utf8'});

9.5.2 Writable ストリームへの書き込み

このセクションでは、writable ストリームに書き込むためのアプローチについて説明します。

  1. メソッド .write() を介して writable ストリームに直接書き込みます。
  2. モジュール stream の関数 pipeline() を使用して、readable ストリームを writable ストリームにパイプします。

これらのアプローチをデモンストレーションするために、それらを使用して同じ関数 writeIterableToFile() を実装します。

readable ストリームのメソッド .pipe() もパイピングをサポートしていますが、欠点があり、使用しない方がよいでしょう。

9.5.2.1 writable.write(chunk)

ストリームにデータを書き込む場合、役立つ 2 つのコールバックベースのメカニズムがあります。

コールバックを呼び出します。次の例では、これらのメカニズムを Promise 化して、非同期関数で使用できるようにします。

import * as util from 'util';
import * as stream from 'stream';
import * as fs from 'fs';
import {once} from 'events';

const finished = util.promisify(stream.finished); // (A)

async function writeIterableToFile(iterable, filePath) {
  const writable = fs.createWriteStream(filePath, {encoding: 'utf8'});
  for await (const chunk of iterable) {
    if (!writable.write(chunk)) { // (B)
      // Handle backpressure
      await once(writable, 'drain');
    }
  }
  writable.end(); // (C)
  // Wait until done. Throws if there are errors.
  await finished(writable);
}

await writeIterableToFile(
  ['One', ' line of text.\n'], 'tmp/log.txt');
assert.equal(
  fs.readFileSync('tmp/log.txt', {encoding: 'utf8'}),
  'One line of text.\n');

stream.finished() のデフォルトバージョンはコールバックベースですが、util.promisify() を介して Promise ベースのバージョンに変換できます (行 A)。

次の 2 つのパターンを使用しました。

9.5.2.2 stream.pipeline() を使用した readable ストリームの writable ストリームへのパイピング

行 A では、stream.pipeline() の Promise 化されたバージョンを使用して、readable ストリーム readable を writable ストリーム writable にパイプします。

import * as stream from 'stream';
import * as fs from 'fs';
const pipeline = util.promisify(stream.pipeline);

async function writeIterableToFile(iterable, filePath) {
  const readable = stream.Readable.from(
    iterable, {encoding: 'utf8'});
  const writable = fs.createWriteStream(filePath);
  await pipeline(readable, writable); // (A)
}
await writeIterableToFile(
  ['One', ' line of text.\n'], 'tmp/log.txt');
// ···

メソッド readable.pipe() もパイピングをサポートしていますが、注意点 があります。readable がエラーを発生させた場合、writable は自動的に閉じられません。pipeline() にはその注意点がありません。

モジュール os

モジュール buffer

モジュール stream

モジュール fs

このセクションの静的型情報は、Definitely Typed に基づいています。

9.7 この章の参考文献と情報源