Node.jsによるシェルスクリプティング
この本のオフライン版(HTML、PDF、EPUB、MOBI)を購入して、無料のオンライン版をサポートすることができます。
(広告です。ブロックしないでください。)

10 Node.jsでのWebストリームの使用



Webストリーム は、すべての主要なWebプラットフォーム(Webブラウザ、Node.js、およびDeno)で現在サポートされている_ストリーム_の標準です。(ストリームとは、あらゆる種類のソース(ファイル、サーバーでホストされているデータなど)からデータを順次、小さな単位で読み書きするための抽象化です。)

たとえば、グローバル関数 fetch()(オンラインリソースをダウンロードする)は、Webストリームを持つプロパティ .body を持つレスポンスを非同期的に返します。

この章では、Node.jsのWebストリームについて説明しますが、ここで学ぶことのほとんどは、WebストリームをサポートするすべてのWebプラットフォームに適用されます。

10.1 Webストリームとは?

まずは、Webストリームの基本について概要を説明します。その後、すぐに例に移ります。

ストリームとは、次のようなデータにアクセスするためのデータ構造です。

ストリームの利点の2つは次のとおりです。

Webストリーム(「Web」は省略されることが多い)は、Webブラウザで生まれた比較的新しい標準ですが、現在ではNode.jsとDenoでもサポートされています(このMDN互換性テーブルに示されています)。

Webストリームでは、チャンクは通常次のいずれかです。

10.1.1 ストリームの種類

Webストリームには、主に3種類あります。

ReadableStream、WritableStream、TransformStreamは、テキストまたはバイナリデータの転送に使用できます。この章では、主に前者を行います。バイナリデータ用の_バイトストリーム_については、最後に簡単に説明します。

10.1.2 パイプチェーン

_パイプ_とは、ReadableStreamをWritableStreamに_パイプ_できる操作です。ReadableStreamがデータを生成している限り、この操作はそのデータを読み取り、WritableStreamに書き込みます。2つのストリームだけを接続する場合、ある場所から別の場所にデータを転送する便利な方法が得られます(たとえば、ファイルをコピーする場合)。ただし、2つ以上のストリームを接続して、さまざまな方法でデータを処理できる_パイプチェーン_を取得することもできます。これは、パイプチェーンの例です。

ReadableStreamは、前者を後者の書き込み側にパイプすることでTransformStreamに接続されます。同様に、TransformStreamは、前者の読み取り側を後者の書き込み側にパイプすることで別のTransformStreamに接続されます。また、TransformStreamは、前者の読み取り側を後者にパイプすることでWritableStreamに接続されます。

10.1.3 バックプレッシャー

パイプチェーンの問題の1つは、メンバーが現在処理できる以上のデータを受け取る可能性があることです。 _バックプレッシャー_は、この問題を解決するための手法です。データの受信者が送信者に、受信者が過負荷にならないようにデータの送信を一時的に停止するよう指示できます。

バックプレッシャーを別の見方をすれば、過負荷になっているメンバーからチェーンの先頭まで、パイプチェーンを逆方向に伝わる信号と考えることができます。例として、次のパイプチェーンを考えてみましょう。

ReadableStream -pipeTo-> TransformStream -pipeTo-> WriteableStream

バックプレッシャーはこのチェーンを次のように伝播します。

パイプチェーンの先頭に到達しました。そのため、ReadableStream内にはデータが蓄積されず(これもバッファリングされます)、WriteableStreamは回復する時間があります。回復すると、データを受信できる状態になったことを通知します。この信号もチェーンを逆方向に伝播し、ReadableStreamに到達するとデータ処理が再開されます。

バックプレッシャーのこの最初の説明では、理解を容易にするためにいくつかの詳細が省略されています。これらについては、後ほど説明します。

10.1.4 Node.jsでのWebストリームのサポート

Node.jsでは、Webストリームは2つのソースから利用できます。

現時点では、Node.jsでWebストリームを直接サポートしているAPIは1つだけです - Fetch API

const response = await fetch('https://example.com');
const readableStream = response.body;

他の場合は、モジュール 'node:stream' の次の静的メソッドのいずれかを使用して、Node.jsストリームをWebストリームに変換するか、その逆を行う必要があります。

Webストリームを部分的にサポートするAPIがもう1つあります。FileHandlesには、.readableWebStream()メソッドがあります。

10.2 ReadableStreamからの読み取り

ReadableStreamを使用すると、さまざまなソースからデータのチャンクを読み取ることができます。ReadableStreamには、次の型があります(この型とそのプロパティの説明は、ざっと読んでください。例で出てきたときに改めて説明します)。

interface ReadableStream<TChunk> {
  getReader(): ReadableStreamDefaultReader<TChunk>;
  readonly locked: boolean;
  [Symbol.asyncIterator](): AsyncIterator<TChunk>;

  cancel(reason?: any): Promise<void>;

  pipeTo(
    destination: WritableStream<TChunk>,
    options?: StreamPipeOptions
  ): Promise<void>;
  pipeThrough<TChunk2>(
    transform: ReadableWritablePair<TChunk2, TChunk>,
    options?: StreamPipeOptions
  ): ReadableStream<TChunk2>;
  
  // Not used in this chapter:
  tee(): [ReadableStream<TChunk>, ReadableStream<TChunk>];
}

interface StreamPipeOptions {
  signal?: AbortSignal;
  preventClose?: boolean;
  preventAbort?: boolean;
  preventCancel?: boolean;
}

これらのプロパティの説明

次のサブセクションでは、ReadableStreamを消費する3つの方法について説明します。

10.2.1 リーダーによるReadableStreamの消費

_リーダー_を使用して、ReadableStreamからデータを読み取ることができます。リーダーには、次の型があります(この型とそのプロパティの説明は、ざっと読んでください。例で出てきたときに改めて説明します)。

interface ReadableStreamGenericReader {
  readonly closed: Promise<undefined>;
  cancel(reason?: any): Promise<void>;
}
interface ReadableStreamDefaultReader<TChunk>
  extends ReadableStreamGenericReader
{
  releaseLock(): void;
  read(): Promise<ReadableStreamReadResult<TChunk>>;
}

interface ReadableStreamReadResult<TChunk> {
  done: boolean;
  value: TChunk | undefined;
}

これらのプロパティの説明

ReadableStreamReadResult は、イテレーションの仕組みをご存知であれば、馴染みがあるかもしれません。ReadableStream は iterable に、Reader は iterator に、ReadableStreamReadResult は iterator メソッド .next() が返すオブジェクトに似ています。

次のコードは、Reader を使用するためのプロトコルを示しています。

const reader = readableStream.getReader(); // (A)
assert.equal(readableStream.locked, true); // (B)
try {
  while (true) {
    const {done, value: chunk} = await reader.read(); // (C)
    if (done) break;
    // Use `chunk`
  }
} finally {
  reader.releaseLock(); // (D)
}

Reader の取得。 readableStream から直接読み取ることはできません。最初に *Reader* を取得する必要があります(行 A)。各 ReadableStream は、最大で 1 つの Reader を持つことができます。Reader が取得されると、readableStream はロックされます(行 B)。.getReader() を再び呼び出す前に、.releaseLock() を呼び出す必要があります(行 D)。

チャンクの読み取り。 .read() は、.done.value プロパティを持つオブジェクトの Promise を返します(行 C)。最後のチャンクが読み取られた後、.donetrue になります。このアプローチは、JavaScript の非同期イテレーションの仕組みと似ています。

10.2.1.1 例: ReadableStream を介したファイルの読み取り

次の例では、テキストファイル data.txt からチャンク(文字列)を読み取ります。

import * as fs from 'node:fs';
import {Readable} from 'node:stream';

const nodeReadable = fs.createReadStream(
  'data.txt', {encoding: 'utf-8'});
const webReadableStream = Readable.toWeb(nodeReadable); // (A)

const reader = webReadableStream.getReader();
try {
  while (true) {
    const {done, value} = await reader.read();
    if (done) break;
    console.log(value);
  }
} finally {
  reader.releaseLock();
}
// Output:
// 'Content of text file\n'

Node.js の Readable を Web の ReadableStream に変換しています(行 A)。そして、前述のプロトコルを使用してチャンクを読み取ります。

10.2.1.2 例: ReadableStream の内容で文字列を組み立てる

次の例では、ReadableStream のすべてのチャンクを連結して文字列にし、それを返します。

/**
 * Returns a string with the contents of `readableStream`.
 */
async function readableStreamToString(readableStream) {
  const reader = readableStream.getReader();
  try {
    let result = '';
    while (true) {
      const {done, value} = await reader.read();
      if (done) {
        return result; // (A)
      }
      result += value;
    }
  } finally {
    reader.releaseLock(); // (B)
  }
}

便利なことに、finally 句は、try 句をどのように抜けても常に実行されます。つまり、結果を返す場合(行 A)、ロックは正しく解放されます(行 B)。

10.2.2 非同期イテレーションによる ReadableStream の消費

ReadableStream は、非同期イテレーションを介して消費することもできます。

const iterator = readableStream[Symbol.asyncIterator]();
let exhaustive = false;
try {
  while (true) {
    let chunk;
    ({done: exhaustive, value: chunk} = await iterator.next());
    if (exhaustive) break;
    console.log(chunk);
  }
} finally {
  // If the loop was terminated before we could iterate exhaustively
  // (via an exception or `return`), we must call `iterator.return()`.
  // Check if that was the case.
  if (!exhaustive) {
    iterator.return();
  }
}

ありがたいことに、for-await-of ループは、非同期イテレーションのすべての詳細を処理してくれます。

for await (const chunk of readableStream) {
  console.log(chunk);
}
10.2.2.1 例: 非同期イテレーションを使用してストリームを読み取る

ファイルからテキストを読み取る以前の試みをやり直してみましょう。今回は、Reader の代わりに非同期イテレーションを使用します。

import * as fs from 'node:fs';
import {Readable} from 'node:stream';

const nodeReadable = fs.createReadStream(
  'text-file.txt', {encoding: 'utf-8'});
const webReadableStream = Readable.toWeb(nodeReadable);
for await (const chunk of webReadableStream) {
  console.log(chunk);
}
// Output:
// 'Content of text file'
10.2.2.2 例: ReadableStream の内容で文字列を組み立てる

以前は、Reader を使用して ReadableStream の内容で文字列を組み立てました。非同期イテレーションを使用すると、コードがよりシンプルになります。

/**
 * Returns a string with the contents of `readableStream`.
 */
async function readableStreamToString2(readableStream) {
  let result = '';
  for await (const chunk of readableStream) {
    result += chunk;
  }
  return result;
}
10.2.2.3 注意点: ブラウザは ReadableStream に対する非同期イテレーションをサポートしていません

現時点では、Node.js と Deno は ReadableStream に対する非同期イテレーションをサポートしていますが、Web ブラウザはサポートしていません。バグレポートへのリンクを含むGitHub issueがあります。

ブラウザで非同期イテレーションがどのようにサポートされるかはまだ完全には明らかではないため、ポリフィルよりもラッピングの方が安全な選択肢です。次のコードは、Chromium のバグレポートの提案に基づいています。

async function* getAsyncIterableFor(readableStream) {
  const reader = readableStream.getReader();
  try {
    while (true) {
      const {done, value} = await reader.read();
      if (done) return;
      yield value;
    }
  } finally {
    reader.releaseLock();
  }
}

10.2.3 ReadableStream を WritableStream にパイプする

ReadableStream には、パイプするための 2 つのメソッドがあります。

10.3 ラッピングによってデータソースを ReadableStream に変換する

外部ソースを ReadableStream を介して読み取る場合、アダプターオブジェクトにラップして、そのオブジェクトを ReadableStream コンストラクターに渡すことができます。アダプターオブジェクトは、ReadableStream の *基になるソース* と呼ばれます(キューイング戦略については、バックプレッシャーについて詳しく見ていく際に後述します)。

new ReadableStream(underlyingSource?, queuingStrategy?)

これは、基になるソースの型です(この型とそのプロパティの説明は、例で encountered するまで読み飛ばしても構いません)。

interface UnderlyingSource<TChunk> {
  start?(
    controller: ReadableStreamController<TChunk>
  ): void | Promise<void>;
  pull?(
    controller: ReadableStreamController<TChunk>
  ): void | Promise<void>;
  cancel?(reason?: any): void | Promise<void>;

  // Only used in byte streams and ignored in this section:
  type: 'bytes' | undefined;
  autoAllocateChunkSize: bigint;
}

ReadableStream がこれらのメソッドを呼び出すのは、以下の場合です。

これらのメソッドはそれぞれ Promise を返すことができ、Promise が解決されるまでそれ以上のステップは実行されません。これは、非同期で何かを実行したい場合に便利です。

.start().pull() のパラメーター controller によって、ストリームにアクセスできます。以下の型を持ちます。

type ReadableStreamController<TChunk> =
  | ReadableStreamDefaultController<TChunk>
  | ReadableByteStreamController<TChunk> // ignored here
;

interface ReadableStreamDefaultController<TChunk> {
  enqueue(chunk?: TChunk): void;
  readonly desiredSize: number | null;
  close(): void;
  error(err?: any): void;
}

今のところ、チャンクは文字列です。後ほど、Uint8Array が一般的なバイトストリームについて説明します。メソッドの動作は以下のとおりです。

10.3.1 基になるソースを実装する最初の例

基になるソースを実装する最初の例では、メソッド .start() のみを提供します。.pull() のユースケースについては、次のサブセクションで説明します。

const readableStream = new ReadableStream({
  start(controller) {
    controller.enqueue('First line\n'); // (A)
    controller.enqueue('Second line\n'); // (B)
    controller.close(); // (C)
  },
});
for await (const chunk of readableStream) {
  console.log(chunk);
}

// Output:
// 'First line\n'
// 'Second line\n'

コントローラーを使用して、2 つのチャンクを持つストリームを作成します(行 A と行 B)。ストリームを閉じる(行 C)ことが重要です。そうでないと、for-await-of ループは永遠に終了しません!

このエンキュー方法は完全に安全ではないことに注意してください。内部キューの容量を超えるリスクがあります。そのリスクを回避する方法については、すぐに説明します。

10.3.2 ReadableStream を使用してプッシュソースまたはプルソースをラップする

一般的なシナリオは、プッシュソースまたはプルソースを ReadableStream に変換することです。ソースがプッシュかプルかによって、UnderlyingSource で ReadableStream にどのようにフックするかが決まります。

次に、両方の種類のソースの例を示します。

10.3.2.1 例: バックプレッシャーサポート付きのプッシュソースから ReadableStream を作成する

次の例では、ソケットの周りに ReadableStream をラップします。ソケットはデータをプッシュします(呼び出します)。この例は、Web ストリームの仕様から引用したものです。

function makeReadableBackpressureSocketStream(host, port) {
  const socket = createBackpressureSocket(host, port);

  return new ReadableStream({
    start(controller) {
      socket.ondata = event => {
        controller.enqueue(event.data);

        if (controller.desiredSize <= 0) {
          // The internal queue is full, so propagate
          // the backpressure signal to the underlying source.
          socket.readStop();
        }
      };

      socket.onend = () => controller.close();
      socket.onerror = () => controller.error(
        new Error('The socket errored!'));
    },

    pull() {
      // This is called if the internal queue has been emptied, but the
      // stream’s consumer still wants more data. In that case, restart
      // the flow of data if we have previously paused it.
      socket.readStart();
    },

    cancel() {
      socket.close();
    },
  });
}
10.3.2.2 例: プルソースから ReadableStream を作成する

ツール関数 iterableToReadableStream() は、チャンクの iterable を受け取り、ReadableStream に変換します。

/**
 * @param iterable an iterable (asynchronous or synchronous)
 */
 function iterableToReadableStream(iterable) {
  return new ReadableStream({
    start() {
      if (typeof iterable[Symbol.asyncIterator] === 'function') {
        this.iterator = iterable[Symbol.asyncIterator]();
      } else if (typeof iterable[Symbol.iterator] === 'function') {
        this.iterator = iterable[Symbol.iterator]();
      } else {
        throw new Error('Not an iterable: ' + iterable);
      }
    },

    async pull(controller) {
      if (this.iterator === null) return;
      // Sync iterators return non-Promise values,
      // but `await` doesn’t mind and simply passes them on
      const {value, done} = await this.iterator.next();
      if (done) {
        this.iterator = null;
        controller.close();
        return;
      }
      controller.enqueue(value);
    },

    cancel() {
      this.iterator = null;
      controller.close();
    },
  });
}

非同期ジェネレーター関数を使用して非同期 iterable を作成し、その iterable を ReadableStream に変換してみましょう。

async function* genAsyncIterable() {
  yield 'how';
  yield 'are';
  yield 'you';
}
const readableStream = iterableToReadableStream(genAsyncIterable());
for await (const chunk of readableStream) {
  console.log(chunk);
}

// Output:
// 'how'
// 'are'
// 'you'

iterableToReadableStream() は、同期 iterable でも機能します。

const syncIterable = ['hello', 'everyone'];
const readableStream = iterableToReadableStream(syncIterable);
for await (const chunk of readableStream) {
  console.log(chunk);
}

// Output:
// 'hello'
// 'everyone'

最終的には、この機能を提供する静的ヘルパーメソッド ReadableStream.from() が存在する可能性があります(詳細については、プルリクエストを参照してください)。

10.4 WritableStream への書き込み

WritableStream を使用すると、さまざまなシンクにデータチャンクを書き込むことができます。以下の型を持ちます(この型とそのプロパティの説明は、例で encountered するまで読み飛ばしても構いません)。

interface WritableStream<TChunk> {
  getWriter(): WritableStreamDefaultWriter<TChunk>;
  readonly locked: boolean;

  close(): Promise<void>;
  abort(reason?: any): Promise<void>;
}

これらのプロパティの説明

以下のサブセクションでは、WritableStream にデータを送信する 2 つのアプローチについて説明します。

10.4.1 Writer を介した WritableStream への書き込み

*Writer* を使用して WritableStream に書き込むことができます。以下の型を持ちます(この型とそのプロパティの説明は、例で encountered するまで読み飛ばしても構いません)。

interface WritableStreamDefaultWriter<TChunk> {
  readonly desiredSize: number | null;
  readonly ready: Promise<undefined>;
  write(chunk?: TChunk): Promise<void>;
  releaseLock(): void;

  close(): Promise<void>;
  readonly closed: Promise<undefined>;
  abort(reason?: any): Promise<void>;
}

これらのプロパティの説明

以下のコードは、Writer を使用するためのプロトコルを示しています。

const writer = writableStream.getWriter(); // (A)
assert.equal(writableStream.locked, true); // (B)
try {
  // Writing the chunks (explained later)
} finally {
  writer.releaseLock(); // (C)
}

writableStream に直接書き込むことはできません。最初に *Writer* を取得する必要があります(行 A)。各 WritableStream は、最大で 1 つの Writer を持つことができます。Writer が取得された後、writableStream はロックされます(行 B)。.getWriter() を再度呼び出す前に、.releaseLock() を呼び出す必要があります(行 C)。

チャンクを書き込むには、3 つの方法があります。

10.4.1.1 書き込み方法 1: .write() を待つ(逆圧力の処理が非効率的)

最初の書き込み方法は、.write() の各結果を待つことです。

await writer.write('Chunk 1');
await writer.write('Chunk 2');
await writer.close();

.write() によって返される Promise は、渡されたチャンクが正常に書き込まれたときに履行されます。「正常に書き込まれた」とはどういう意味か正確には、WritableStream の実装方法によって異なります。たとえば、ファイルストリームでは、チャンクはオペレーティングシステムに送信されたものの、まだキャッシュに存在しており、実際にはディスクに書き込まれていない可能性があります。

.close() によって返される Promise は、ストリームが閉じられたときに履行されます。

この書き込み方法の欠点は、書き込みが成功するまで待つと、キューが使用されないことです。結果として、データスループットが低下する可能性があります。

10.4.1.2 書き込み方法 2: .write() の拒否を無視する(逆圧力を無視する)

2 番目の書き込み方法では、.write() によって返される Promise を無視し、.close() によって返される Promise のみ ​​を待ちます。

writer.write('Chunk 1').catch(() => {}); // (A)
writer.write('Chunk 2').catch(() => {}); // (B)
await writer.close(); // reports errors

.write() の同期呼び出しは、チャンクを WritableStream の内部キューに追加します。返された Promise を待たないことで、各チャンクが書き込まれるまで待ちません。ただし、.close() を待つことで、続行する前にキューが空になり、すべての書き込みが成功したことを保証します。

行 A と行 B で .catch() を呼び出すことは、書き込み中に問題が発生した場合に、処理されない Promise の拒否に関する警告を回避するために必要です。このような警告は、多くの場合、コンソールに記録されます。.close() もエラーを報告するため、.write() によって報告されたエラーは無視できます。

Promise の拒否を無視するヘルパー関数を使用することで、上記のコードを改善できます。

ignoreRejections(
  writer.write('Chunk 1'),
  writer.write('Chunk 2'),
);
await writer.close(); // reports errors

function ignoreRejections(...promises) {
  for (const promise of promises) {
    promise.catch(() => {});
  }
}

このアプローチの 1 つの欠点は、逆圧力が無視されることです。キューは書き込むすべてのものを保持するのに十分な大きさであると単純に想定しています。

10.4.1.3 書き込み方法 3: .ready を待つ(逆圧力を効率的に処理する)

この書き込み方法では、Writer ゲッター .ready を待つことで、逆圧力を効率的に処理します。

await writer.ready; // reports errors
// How much room do we have?
console.log(writer.desiredSize);
writer.write('Chunk 1').catch(() => {});

await writer.ready; // reports errors
// How much room do we have?
console.log(writer.desiredSize);
writer.write('Chunk 2').catch(() => {});

await writer.close(); // reports errors

.ready の Promise は、ストリームが逆圧力のある状態から逆圧力のない状態に移行するたびに履行されます。

10.4.1.4 例: Writer を介したファイルへの書き込み

この例では、WritableStream を介してテキストファイル data.txt を作成します。

import * as fs from 'node:fs';
import {Writable} from 'node:stream';

const nodeWritable = fs.createWriteStream(
  'new-file.txt', {encoding: 'utf-8'}); // (A)
const webWritableStream = Writable.toWeb(nodeWritable); // (B)

const writer = webWritableStream.getWriter();
try {
  await writer.write('First line\n');
  await writer.write('Second line\n');
  await writer.close();
} finally {
  writer.releaseLock()
}

行 A では、ファイル data.txt の Node.js ストリームを作成します。行 B では、このストリームを Web ストリームに変換します。次に、Writer を使用して文字列を書き込みます。

10.4.2 WritableStream へのパイプ

Writer を使用する代わりに、ReadableStream をパイプすることで WritableStream に書き込むこともできます。

await readableStream.pipeTo(writableStream);

.pipeTo() によって返される Promise は、パイプが正常に完了すると履行されます。

10.4.2.1 パイプは非同期的に発生する

パイプは、現在のタスクが完了または一時停止した後に実行されます。次のコードはそれを示しています。

const readableStream = new ReadableStream({ // (A)
  start(controller) {
    controller.enqueue('First line\n');
    controller.enqueue('Second line\n');
    controller.close();
  },
});
const writableStream = new WritableStream({ // (B)
  write(chunk) {
    console.log('WRITE: ' + JSON.stringify(chunk));
  },
  close() {
    console.log('CLOSE WritableStream');
  },
});


console.log('Before .pipeTo()');
const promise = readableStream.pipeTo(writableStream); // (C)
promise.then(() => console.log('Promise fulfilled'));
console.log('After .pipeTo()');

// Output:
// 'Before .pipeTo()'
// 'After .pipeTo()'
// 'WRITE: "First line\n"'
// 'WRITE: "Second line\n"'
// 'CLOSE WritableStream'
// 'Promise fulfilled'

行 A では ReadableStream を作成します。行 B では WritableStream を作成します。

.pipeTo()(行 C)がすぐに返されることがわかります。新しいタスクでは、チャンクが読み書きされます。次に、writableStream が閉じられ、最後に promise が履行されます。

10.4.2.2 例: ファイルの WritableStream へのパイプ

次の例では、ファイルの WritableStream を作成し、ReadableStream をパイプします。

const webReadableStream = new ReadableStream({ // (A)
  async start(controller) {
    controller.enqueue('First line\n');
    controller.enqueue('Second line\n');
    controller.close();
  },
});

const nodeWritable = fs.createWriteStream( // (B)
  'data.txt', {encoding: 'utf-8'});
const webWritableStream = Writable.toWeb(nodeWritable); // (C)

await webReadableStream.pipeTo(webWritableStream); // (D)

行 A では、ReadableStream を作成します。行 B では、ファイル data.txt の Node.js ストリームを作成します。行 C では、このストリームを Web ストリームに変換します。行 D では、webReadableStream をファイルの WritableStream にパイプします。

10.4.2.3 例: 2 つの ReadableStream を 1 つの WritableStream に書き込む

次の例では、2 つの ReadableStream を 1 つの WritableStream に書き込みます。

function createReadableStream(prefix) {
  return new ReadableStream({
    async start(controller) {
      controller.enqueue(prefix + 'chunk 1');
      controller.enqueue(prefix + 'chunk 2');
      controller.close();
    },
  });
}

const writableStream = new WritableStream({
  write(chunk) {
    console.log('WRITE ' + JSON.stringify(chunk));
  },
  close() {
    console.log('CLOSE');
  },
  abort(err) {
    console.log('ABORT ' + err);
  },
});

await createReadableStream('Stream 1: ')
  .pipeTo(writableStream, {preventClose: true}); // (A)
await createReadableStream('Stream 2: ')
  .pipeTo(writableStream, {preventClose: true}); // (B)
await writableStream.close();

// Output
// 'WRITE "Stream 1: chunk 1"'
// 'WRITE "Stream 1: chunk 2"'
// 'WRITE "Stream 2: chunk 1"'
// 'WRITE "Stream 2: chunk 2"'
// 'CLOSE'

ReadableStream が閉じられた後、WritableStream を閉じないように .pipeTo() に指示します(行 A と行 B)。したがって、WritableStream は行 A の後も開いたままで、別の ReadableStream をパイプできます。

10.5 データシンクをラッピングによって WritableStream に変換する

WritableStream を介して外部シンクに書き込む場合は、アダプターオブジェクトにラップして、そのオブジェクトを WritableStream コンストラクターに渡すことができます。アダプターオブジェクトは、WritableStream の *基盤となるシンク* と呼ばれます(キューイング戦略については、後で逆圧力を詳しく調べるときに説明します)。

new WritableStream(underlyingSink?, queuingStrategy?)

これは、基盤となるシンクのタイプです(このタイプとそのプロパティの説明はざっと読んでください。例で encountered するときに再び説明します)。

interface UnderlyingSink<TChunk> {
  start?(
    controller: WritableStreamDefaultController
  ): void | Promise<void>;
  write?(
    chunk: TChunk,
    controller: WritableStreamDefaultController
  ): void | Promise<void>;
  close?(): void | Promise<void>;;
  abort?(reason?: any): void | Promise<void>;
}

これらのプロパティの説明

.start().write() のパラメーター controller を使用すると、WritableStream でエラーが発生します。次のタイプがあります。

interface WritableStreamDefaultController {
  readonly signal: AbortSignal;
  error(err?: any): void;
}

10.5.1 例: ReadableStream のトレース

次の例では、ReadableStream がチャンクをどのように生成するかを確認するために、ReadableStream を WritableStream にパイプします。

const readableStream = new ReadableStream({
  start(controller) {
    controller.enqueue('First chunk');
    controller.enqueue('Second chunk');
    controller.close();
  },
});
await readableStream.pipeTo(
  new WritableStream({
    write(chunk) {
      console.log('WRITE ' + JSON.stringify(chunk));
    },
    close() {
      console.log('CLOSE');
    },
    abort(err) {
      console.log('ABORT ' + err);
    },
  })
);
// Output:
// 'WRITE "First chunk"'
// 'WRITE "Second chunk"'
// 'CLOSE'

10.5.2 例: WriteStream に書き込まれたチャンクを文字列に収集する

次の例では、書き込まれたすべてのチャンクを文字列に収集する WriteStream のサブクラスを作成します。メソッド .getString() を介してその文字列にアクセスできます。

class StringWritableStream extends WritableStream {
  #string = '';
  constructor() {
    super({
      // We need to access the `this` of `StringWritableStream`.
      // Hence the arrow function (and not a method).
      write: (chunk) => {
        this.#string += chunk;
      },
    });
  }
  getString() {
    return this.#string;
  }
}
const stringStream = new StringWritableStream();
const writer = stringStream.getWriter();
try {
  await writer.write('How are');
  await writer.write(' you?');
  await writer.close();
} finally {
  writer.releaseLock()
}
assert.equal(
  stringStream.getString(),
  'How are you?'
);

このアプローチの欠点は、2 つの API、つまり WritableStream の API と新しい文字列ストリーム API を混在させていることです。代替案は、それを拡張する代わりに WritableStream に委任することです。

function StringcreateWritableStream() {
  let string = '';
  return {
    stream: new WritableStream({
      write(chunk) {
        string += chunk;
      },
    }),
    getString() {
      return string;
    },
  };
}

const stringStream = StringcreateWritableStream();
const writer = stringStream.stream.getWriter();
try {
  await writer.write('How are');
  await writer.write(' you?');
  await writer.close();
} finally {
  writer.releaseLock()
}
assert.equal(
  stringStream.getString(),
  'How are you?'
);

この機能は、クラスを介して(オブジェクトのファクトリ関数としてではなく)実装することもできます。

10.6 TransformStream の使用

TransformStream は

TransformStream を使用する最も一般的な方法は、「パイプ スルー」することです。

const transformedStream = readableStream.pipeThrough(transformStream);

.pipeThrough() は、readableStreamtransformStream の書き込み側にパイプし、その読み取り側を返します。つまり、readableStream の変換されたバージョンである新しい ReadableStream を作成しました。

.pipeThrough() は、TransformStream だけでなく、次の形状を持つオブジェクトも受け入れます。

interface ReadableWritablePair<RChunk, WChunk> {
  readable: ReadableStream<RChunk>;
  writable: WritableStream<WChunk>;
}

10.6.1 標準 TransformStream

Node.js は、次の標準 TransformStream をサポートしています。

10.6.1.1 例: UTF-8 エンコードされたバイトのストリームのデコード

次の例では、UTF-8 エンコードされたバイトのストリームをデコードします。

const response = await fetch('https://example.com');
const readableByteStream = response.body;
const readableStream = readableByteStream
  .pipeThrough(new TextDecoderStream('utf-8'));
for await (const stringChunk of readableStream) {
  console.log(stringChunk);
}

response.body は、チャンクが Uint8ArrayTypedArrays)のインスタンスである ReadableByteStream です。そのストリームを TextDecoderStream にパイプして、文字列チャンクを持つストリームを取得します。

各バイトチャンクを個別に翻訳する(たとえば、TextDecoder を介して)ことは機能しません。なぜなら、単一の Unicode コードポイントは UTF-8 で最大 4 バイトとしてエンコードされるため、それらのバイトがすべて同じチャンクに含まれているとは限らないからです。

10.6.1.2 例: 標準入力の読み取り可能なテキストストリームの作成

次の Node.js モジュールは、標準入力から送信されたすべてのものをログに記録します。

// echo-stdin.mjs
import {Readable} from 'node:stream';

const webStream = Readable.toWeb(process.stdin)
  .pipeThrough(new TextDecoderStream('utf-8'));
for await (const chunk of webStream) {
  console.log('>>>', chunk);
}

標準入力には、process.stdin に格納されているストリームを介してアクセスできます(process はグローバル Node.js 変数です)。このストリームのエンコーディングを設定せず、Readable.toWeb() を介して変換すると、バイトストリームが取得されます。テキストストリームを取得するために、TextDecoderStream にパイプします。

標準入力を段階的に処理することに注意してください。別のチャンクが使用可能になるとすぐに、ログに記録します。つまり、標準入力が完了するまで待ちません。これは、データが大きい場合、または断続的に送信される場合に役立ちます。

10.7 カスタム TransformStream の実装

Transformer オブジェクトを TransformStream のコンストラクターに渡すことで、カスタム TransformStream を実装できます。このようなオブジェクトには次のタイプがあります(このタイプとそのプロパティの説明はざっと読んでください。例で encountered するときに再び説明します)。

interface Transformer<TInChunk, TOutChunk> {
  start?(
    controller: TransformStreamDefaultController<TOutChunk>
  ): void | Promise<void>;
  transform?(
    chunk: TInChunk,
    controller: TransformStreamDefaultController<TOutChunk>
  ): void | Promise<void>;
  flush?(
    controller: TransformStreamDefaultController<TOutChunk>
  ): void | Promise<void>;
}

これらのプロパティの説明

これらのメソッドはそれぞれ Promise を返すことができ、Promise が解決されるまでそれ以上のステップは実行されません。これは、非同期で何かを実行したい場合に便利です。

パラメータ controller は次の型を持ちます

interface TransformStreamDefaultController<TOutChunk> {
  enqueue(chunk?: TOutChunk): void;
  readonly desiredSize: number | null;
  terminate(): void;
  error(err?: any): void;
}

TransformStream のバックプレッシャーはどうでしょうか?このクラスは、読み取り側(出力)からのバックプレッシャーを書き込み側(入力)に伝播します。変換によってデータ量が大きく変化しないという前提があります。したがって、Transform はバックプレッシャーを無視しても問題ありません。ただし、transformStreamDefaultController.desiredSize を介して検出し、transformer.transform() から Promise を返すことで伝播できます。

10.7.1 例:任意のチャンクのストリームを行のストリームに変換する

TransformStream の次のサブクラスは、任意のチャンクを持つストリームを、各チャンクがテキストの 1 行で構成されるストリームに変換します。つまり、最後のチャンクを除いて、各チャンクは行末(EOL)文字列で終わります。Unix(macOS を含む)では '\n'、Windows では '\r\n' です。

class ChunksToLinesTransformer {
  #previous = '';

  transform(chunk, controller) {
    let startSearch = this.#previous.length;
    this.#previous += chunk;
    while (true) {
      // Works for EOL === '\n' and EOL === '\r\n'
      const eolIndex = this.#previous.indexOf('\n', startSearch);
      if (eolIndex < 0) break;
      // Line includes the EOL
      const line = this.#previous.slice(0, eolIndex+1);
      controller.enqueue(line);
      this.#previous = this.#previous.slice(eolIndex+1);
      startSearch = 0;
    }
  }

  flush(controller) {
    // Clean up and enqueue any text we’re still holding on to
    if (this.#previous.length > 0) {
      controller.enqueue(this.#previous);
    }
  }
}
class ChunksToLinesStream extends TransformStream {
  constructor() {
    super(new ChunksToLinesTransformer());
  }
}

const stream = new ReadableStream({
  async start(controller) {
    controller.enqueue('multiple\nlines of\ntext');
    controller.close();
  },
});
const transformStream = new ChunksToLinesStream();
const transformed = stream.pipeThrough(transformStream);

for await (const line of transformed) {
  console.log('>>>', JSON.stringify(line));
}

// Output:
// '>>> "multiple\n"'
// '>>> "lines of\n"'
// '>>> "text"'

Deno の組み込み TextLineStream は同様の機能を提供することに注意してください。

ヒント:非同期ジェネレーターを使用してこの変換を行うこともできます。ReadableStream を非同期的に反復処理し、行を含む非同期イテラブルを返します。その実装は、§9.4「非同期ジェネレーターによる読み取り可能ストリームの変換」 に示されています。

10.7.2 ヒント:非同期ジェネレーターはストリームの変換にも最適です

ReadableStream は非同期的に反復可能であるため、非同期ジェネレーター を使用して変換できます。これにより、非常にエレガントなコードが実現します

const stream = new ReadableStream({
  async start(controller) {
    controller.enqueue('one');
    controller.enqueue('two');
    controller.enqueue('three');
    controller.close();
  },
});

async function* prefixChunks(prefix, asyncIterable) {
  for await (const chunk of asyncIterable) {
    yield '> ' + chunk;
  }
}

const transformedAsyncIterable = prefixChunks('> ', stream);
for await (const transformedChunk of transformedAsyncIterable) {
  console.log(transformedChunk);
}

// Output:
// '> one'
// '> two'
// '> three'

10.8 バックプレッシャーの詳細

バックプレッシャーを詳しく見てみましょう。次のパイプチェーンを考えてみましょう

rs.pipeThrough(ts).pipeTo(ws);

rs は ReadableStream、ts は TransformStream、ws は WritableStream です。これらは、前の式によって作成された接続です(.pipeThrough.pipeTo を使用して rsts の書き込み側に接続します)

rs -pipeTo-> ts{writable,readable} -pipeTo-> ws

観察

ws の基になるシンクが遅く、ws のバッファが最終的にいっぱいになったと仮定しましょう。次に、次の手順が発生します

この例は、2種類の機能が必要であることを示しています

Web Streams API でこれらの機能がどのように実装されているかを見てみましょう。

10.8.1 バックプレッシャーの通知

バックプレッシャーは、データを受信しているエンティティによって通知されます。Web ストリームには、そのようなエンティティが 2 つあります

どちらの場合も、入力はキューを介してバッファリングされます。バックプレッシャーを適用する信号は、キューがいっぱいになったときです。それがどのように検出できるかを見てみましょう。

これらはキューの場所です

キューの*必要なサイズ*は、キューに残っている容量を示す数値です

したがって、必要なサイズがゼロ以下の場合は、バックプレッシャーを適用する必要があります。キューを含むオブジェクトのゲッター .desiredSize を介して利用できます。

必要なサイズはどのように計算されますか?*キューイング戦略*と呼ばれるものを指定するオブジェクトを介して。 ReadableStreamWritableStream にはデフォルトのキューイング戦略があり、コンストラクターのオプションパラメーターを介してオーバーライドできます。 インターフェース QueuingStrategy には 2 つのプロパティがあります

キューの必要なサイズは、ハイウォーターマークからキューの現在のサイズを引いたものです。

10.8.2 バックプレッシャーへの反応

データを送信するエンティティは、バックプレッシャーをかけることによって、通知されたバックプレッシャーに反応する必要があります。

10.8.2.1 Writer を介して WritableStream に書き込むコード

必要に応じて、チャンクのサイズを writer.desiredSize に基づいて追加することもできます。

10.8.2.2 ReadableStream の基になるソース

ReadableStream に渡すことができる基になるソースオブジェクトは、外部ソースをラップします。ある意味で、それは ReadableStream の前にあるパイプチェーンのメンバーでもあります。

10.8.2.3 WritableStream の基になるシンク

WritableStream に渡すことができる基になるシンクオブジェクトは、外部シンクをラップします。ある意味で、それは WritableStream の後にあるパイプチェーンのメンバーでもあります。

各外部シンクは、異なる方法でバックプレッシャーを通知します(場合によってはまったく通知しません)。基になるシンクは、書き込みが完了したら履行されるメソッド .write() から Promise を返すことによって、バックプレッシャーをかけることができます。 Web Streams 標準に例があります。それがどのように機能するかを示しています。

10.8.2.4 transformStream (.writable .readable)

TransformStream は、前者の基になるシンクと後者の基になるソースを実装することにより、書き込み側を読み取り側に接続します。内部バックプレッシャーが現在アクティブかどうかを示す内部スロット .[[backpressure]] があります。

10.8.2.5 .pipeTo() (ReadableStream WritableStream)

.pipeTo() は、リーダーを介して ReadableStream からチャンクを読み取り、Writer を介して WritableStream に書き込みます。 writer.desiredSize がゼロ以下の場合は常に一時停止します(Web Streams 標準:ReadableStreamPipeTo の手順 15)。

10.9 バイトストリーム

これまでは、チャンクが文字列である*テキストストリーム*のみを扱ってきました。しかし、Web Streams API は、バイナリデータの*バイトストリーム*もサポートしており、チャンクは Uint8Arrays(TypedArrays)です

次に、読み取り可能なバイトストリームを作成する方法を学習します。

10.9.1 読み取り可能なバイトストリーム

ReadableStream コンストラクターによって作成されるストリームの種類は、オプションの最初のパラメーター underlyingSource のオプションのプロパティ .type によって異なります

ReadableStream が 'bytes' モードの場合、何が変わりますか?

デフォルトモードでは、基になるソースはあらゆる種類のチャンクを返すことができます。バイトモードでは、チャンクは ArrayBufferViews、つまり TypedArrays(Uint8Arrays など)または DataViews である必要があります。

さらに、読み取り可能なバイトストリームは 2 種類のリーダーを作成できます

「BYOB」は「Bring Your Own Buffer」の略で、バッファ(ArrayBufferView)を reader.read() に渡すことができることを意味します。その後、その ArrayBufferView はデタッチされ、使用できなくなります。ただし、.read() は、同じ型を持ち、同じ ArrayBuffer の同じ領域にアクセスする新しい ArrayBufferView でデータを返します。

さらに、可読バイトストリームは異なるコントローラーを持ちます。それらはReadableByteStreamControllerReadableStreamDefaultControllerではなく)のインスタンスです。基になるソースにArrayBufferView(TypedArrayまたはDataView)をエンキューすることを強制する以外に、プロパティ.byobRequestを介してReadableStreamBYOBReadersもサポートします。基になるソースは、このプロパティに格納されているBYOBRequestにデータを書き込みます。Web Streams標準には、「ストリームの作成例」セクション.byobRequestを使用する2つの例があります。

10.9.2 例:ランダムデータで満たされた無限の可読バイトストリーム

次の例では、チャンクをランダムデータで満たす無限の可読バイトストリームを作成します(インスピレーション:「Node.jsでのWeb Streams APIの実装」のexample4.mjs)。

import {promisify} from 'node:util';
import {randomFill} from 'node:crypto';
const asyncRandomFill = promisify(randomFill);

const readableByteStream = new ReadableStream({
  type: 'bytes',
  async pull(controller) {
    const byobRequest = controller.byobRequest;
    await asyncRandomFill(byobRequest.view);
    byobRequest.respond(byobRequest.view.byteLength);
  },
});

const reader = readableByteStream.getReader({mode: 'byob'});
const buffer = new Uint8Array(10); // (A)
const firstChunk = await reader.read(buffer); // (B)
console.log(firstChunk);

readableByteStreamは無限であるため、ループすることはできません。そのため、最初のチャンク(行B)のみを読み取ります。

行Aで作成したバッファは転送されるため、行Bの後では読み取れません。

10.9.3 例:可読バイトストリームの圧縮

次の例では、可読バイトストリームを作成し、それをGZIP形式に圧縮するストリームにパイプします。

const readableByteStream = new ReadableStream({
  type: 'bytes',
  start(controller) {
    // 256 zeros
    controller.enqueue(new Uint8Array(256));
    controller.close();
  },
});
const transformedStream = readableByteStream.pipeThrough(
  new CompressionStream('gzip'));
await logChunks(transformedStream);

async function logChunks(readableByteStream) {
  const reader = readableByteStream.getReader();
  try {
    while (true) {
      const {done, value} = await reader.read();
      if (done) break;
      console.log(value);
    }
  } finally {
    reader.releaseLock();
  }
}

10.9.4 例:fetch()を使用したWebページの読み取り

fetch()の結果は、プロパティ.bodyが可読バイトストリームであるレスポンスオブジェクトに解決されます。TextDecoderStreamを使用して、そのバイトストリームをテキストストリームに変換します。

const response = await fetch('https://example.com');
const readableByteStream = response.body;
const readableStream = readableByteStream.pipeThrough(
  new TextDecoderStream('utf-8'));
for await (const stringChunk of readableStream) {
  console.log(stringChunk);
}

10.10 Node.js固有のヘルパー

Node.jsは、_ユーティリティコンシューマー_と呼ばれる以下のヘルパー関数をサポートする唯一のWebプラットフォームです。

import {
  arrayBuffer,
  blob,
  buffer,
  json,
  text,
} from 'node:stream/consumers';

これらの関数は、Web ReadableStreams、Node.js Readables、およびAsyncIteratorsを、以下で履行されるPromiseに変換します。

バイナリデータはUTF-8エンコードされていると想定されます。

import * as streamConsumers from 'node:stream/consumers';

const readableByteStream = new ReadableStream({
  type: 'bytes',
  start(controller) {
    // TextEncoder converts strings to UTF-8 encoded Uint8Arrays
    const encoder = new TextEncoder();
    const view = encoder.encode('"😀"');
    assert.deepEqual(
      view,
      Uint8Array.of(34, 240, 159, 152, 128, 34)
    );
    controller.enqueue(view);
    controller.close();
  },
});
const jsonData = await streamConsumers.json(readableByteStream);
assert.equal(jsonData, '😀');

文字列ストリームは期待どおりに動作します。

import * as streamConsumers from 'node:stream/consumers';

const readableByteStream = new ReadableStream({
  start(controller) {
    controller.enqueue('"😀"');
    controller.close();
  },
});
const jsonData = await streamConsumers.json(readableByteStream);
assert.equal(jsonData, '😀');

10.11 参考文献

このセクションで言及されているすべての資料は、この章のソースでした。

この章では、Web Streams APIのすべての側面を網羅しているわけではありません。詳細については、こちらをご覧ください。

その他の資料