fetch()
を使用したWebページの読み取りWebストリーム は、すべての主要なWebプラットフォーム(Webブラウザ、Node.js、およびDeno)で現在サポートされている_ストリーム_の標準です。(ストリームとは、あらゆる種類のソース(ファイル、サーバーでホストされているデータなど)からデータを順次、小さな単位で読み書きするための抽象化です。)
たとえば、グローバル関数 fetch()
(オンラインリソースをダウンロードする)は、Webストリームを持つプロパティ .body
を持つレスポンスを非同期的に返します。
この章では、Node.jsのWebストリームについて説明しますが、ここで学ぶことのほとんどは、WebストリームをサポートするすべてのWebプラットフォームに適用されます。
まずは、Webストリームの基本について概要を説明します。その後、すぐに例に移ります。
ストリームとは、次のようなデータにアクセスするためのデータ構造です。
ストリームの利点の2つは次のとおりです。
ストリームを使用すると、大量のデータを小さな単位(いわゆる_チャンク_)に分割して一度に1つずつ処理できるため、大量のデータを扱うことができます。
異なるデータを処理しながら、同じデータ構造であるストリームを使用できます。これにより、コードを再利用しやすくなります。
Webストリーム(「Web」は省略されることが多い)は、Webブラウザで生まれた比較的新しい標準ですが、現在ではNode.jsとDenoでもサポートされています(このMDN互換性テーブルに示されています)。
Webストリームでは、チャンクは通常次のいずれかです。
Webストリームには、主に3種類あります。
ReadableStreamは、_ソース_からデータを読み取るために使用されます。これを行うコードは、_コンシューマー_と呼ばれます。
WritableStreamは、_シンク_にデータを書き込むために使用されます。これを行うコードは、_プロデューサー_と呼ばれます。
TransformStreamは、2つのストリームで構成されます。
データはTransformStreamに「パイプ」することで変換されます。つまり、書き込み側にデータを書き込み、変換されたデータを読み取り側から読み取ります。次のTransformStreamは、ほとんどのJavaScriptプラットフォームに組み込まれています(詳細は後述)。
TextDecoderStream
は、そのようなデータを文字列に変換します。TextEncoderStream
は、JavaScriptの文字列をUTF-8データに変換します。CompressionStream
は、バイナリデータをGZIPなどの圧縮形式に圧縮します。DecompressionStream
は、GZIPなどの圧縮形式からバイナリデータを解凍します。ReadableStream、WritableStream、TransformStreamは、テキストまたはバイナリデータの転送に使用できます。この章では、主に前者を行います。バイナリデータ用の_バイトストリーム_については、最後に簡単に説明します。
_パイプ_とは、ReadableStreamをWritableStreamに_パイプ_できる操作です。ReadableStreamがデータを生成している限り、この操作はそのデータを読み取り、WritableStreamに書き込みます。2つのストリームだけを接続する場合、ある場所から別の場所にデータを転送する便利な方法が得られます(たとえば、ファイルをコピーする場合)。ただし、2つ以上のストリームを接続して、さまざまな方法でデータを処理できる_パイプチェーン_を取得することもできます。これは、パイプチェーンの例です。
ReadableStreamは、前者を後者の書き込み側にパイプすることでTransformStreamに接続されます。同様に、TransformStreamは、前者の読み取り側を後者の書き込み側にパイプすることで別のTransformStreamに接続されます。また、TransformStreamは、前者の読み取り側を後者にパイプすることでWritableStreamに接続されます。
パイプチェーンの問題の1つは、メンバーが現在処理できる以上のデータを受け取る可能性があることです。 _バックプレッシャー_は、この問題を解決するための手法です。データの受信者が送信者に、受信者が過負荷にならないようにデータの送信を一時的に停止するよう指示できます。
バックプレッシャーを別の見方をすれば、過負荷になっているメンバーからチェーンの先頭まで、パイプチェーンを逆方向に伝わる信号と考えることができます。例として、次のパイプチェーンを考えてみましょう。
ReadableStream -pipeTo-> TransformStream -pipeTo-> WriteableStream
バックプレッシャーはこのチェーンを次のように伝播します。
パイプチェーンの先頭に到達しました。そのため、ReadableStream内にはデータが蓄積されず(これもバッファリングされます)、WriteableStreamは回復する時間があります。回復すると、データを受信できる状態になったことを通知します。この信号もチェーンを逆方向に伝播し、ReadableStreamに到達するとデータ処理が再開されます。
バックプレッシャーのこの最初の説明では、理解を容易にするためにいくつかの詳細が省略されています。これらについては、後ほど説明します。
Node.jsでは、Webストリームは2つのソースから利用できます。
'node:stream/web'
から現時点では、Node.jsでWebストリームを直接サポートしているAPIは1つだけです - Fetch API
const response = await fetch('https://example.com');
const readableStream = response.body;
他の場合は、モジュール 'node:stream'
の次の静的メソッドのいずれかを使用して、Node.jsストリームをWebストリームに変換するか、その逆を行う必要があります。
Readable.toWeb(nodeReadable)
Readable.fromWeb(webReadableStream, options?)
Writable.toWeb(nodeWritable)
Writable.fromWeb(webWritableStream, options?)
Duplex.toWeb(nodeDuplex)
Duplex.fromWeb(webTransformStream, options?)
Webストリームを部分的にサポートするAPIがもう1つあります。FileHandlesには、.readableWebStream()
メソッドがあります。
ReadableStreamを使用すると、さまざまなソースからデータのチャンクを読み取ることができます。ReadableStreamには、次の型があります(この型とそのプロパティの説明は、ざっと読んでください。例で出てきたときに改めて説明します)。
interface ReadableStream<TChunk> {
getReader(): ReadableStreamDefaultReader<TChunk>;
readonly locked: boolean;
Symbol.asyncIterator](): AsyncIterator<TChunk>;
[
cancel(reason?: any): Promise<void>;
pipeTo(
: WritableStream<TChunk>,
destination?: StreamPipeOptions
options: Promise<void>;
)pipeThrough<TChunk2>(
: ReadableWritablePair<TChunk2, TChunk>,
transform?: StreamPipeOptions
options: ReadableStream<TChunk2>;
)
// Not used in this chapter:
tee(): [ReadableStream<TChunk>, ReadableStream<TChunk>];
}
interface StreamPipeOptions {
?: AbortSignal;
signal?: boolean;
preventClose?: boolean;
preventAbort?: boolean;
preventCancel }
これらのプロパティの説明
.getReader()
は、リーダー、つまりReadableStreamから読み取ることができるオブジェクトを返します。リーダーを返すReadableStreamは、イテラブルがイテレーターを返すのと似ています。.locked
:ReadableStreamごとに、一度にアクティブにできるリーダーは1つだけです。1つのリーダーが使用されている間、ReadableStreamはロックされ、.getReader()
を呼び出すことはできません。[Symbol.asyncIterator](https://exploringjs.dokyumento.jp/impatient-js/ch_async-iteration.html)
:このメソッドは、ReadableStreamを非同期的に反復可能にします。これは、現在一部のプラットフォームでのみ実装されています。.cancel(reason)
は、コンシューマーがストリームにもはや関心がないため、ストリームをキャンセルします。 reason
は、ReadableStreamの_基になるソース_の.cancel()
メソッドに渡されます(詳細は後述)。この操作が完了すると、返されたPromiseは履行されます。.pipeTo()
は、ReadableStreamの内容をWritableStreamにフィードします。この操作が完了すると、返されたPromiseは履行されます。 .pipeTo()
は、バックプレッシャー、クローズ、エラーなどがパイプチェーン全体に正しく伝播されるようにします。2番目のパラメーターを介してオプションを指定できます。.signal
を使用すると、このメソッドにAbortSignalを渡すことができ、AbortControllerを介してパイプを中止できます。.preventClose
:true
の場合、ReadableStreamが閉じられたときにWritableStreamが閉じられるのを防ぎます。これは、複数のReadableStreamを同じWritableStreamにパイプする場合に役立ちます。.pipeThrough()
は、ReadableStreamをReadableWritablePair(おおまかに言うとTransformStream、詳細は後述)に接続します。結果のReadableStream(つまり、ReadableWritablePairの読み取り側)を返します。次のサブセクションでは、ReadableStreamを消費する3つの方法について説明します。
_リーダー_を使用して、ReadableStreamからデータを読み取ることができます。リーダーには、次の型があります(この型とそのプロパティの説明は、ざっと読んでください。例で出てきたときに改めて説明します)。
interface ReadableStreamGenericReader {
readonly closed: Promise<undefined>;
cancel(reason?: any): Promise<void>;
}interface ReadableStreamDefaultReader<TChunk>
extends ReadableStreamGenericReader
{releaseLock(): void;
read(): Promise<ReadableStreamReadResult<TChunk>>;
}
interface ReadableStreamReadResult<TChunk> {
: boolean;
done: TChunk | undefined;
value }
これらのプロパティの説明
.closed
:このPromiseは、ストリームが閉じられた後に履行されます。ストリームでエラーが発生した場合、またはストリームが閉じられる前にリーダーのロックが解放された場合、拒否されます。.cancel()
:アクティブなリーダーでは、このメソッドは関連付けられたReadableStreamをキャンセルします。.releaseLock()
は、リーダーを非アクティブ化し、そのストリームのロックを解除します。.read()
は、2つのプロパティを持つReadableStreamReadResult(ラップされたチャンク)のPromiseを返します。
.done
は、チャンクを読み取ることができる限り false
で、最後のチャンクの後には true
になるブール値です。.value
はチャンクです(最後のチャンクの後には undefined
になります)。ReadableStreamReadResult は、イテレーションの仕組みをご存知であれば、馴染みがあるかもしれません。ReadableStream は iterable に、Reader は iterator に、ReadableStreamReadResult は iterator メソッド .next()
が返すオブジェクトに似ています。
次のコードは、Reader を使用するためのプロトコルを示しています。
const reader = readableStream.getReader(); // (A)
.equal(readableStream.locked, true); // (B)
asserttry {
while (true) {
const {done, value: chunk} = await reader.read(); // (C)
if (done) break;
// Use `chunk`
}finally {
} .releaseLock(); // (D)
reader }
Reader の取得。 readableStream
から直接読み取ることはできません。最初に *Reader* を取得する必要があります(行 A)。各 ReadableStream は、最大で 1 つの Reader を持つことができます。Reader が取得されると、readableStream
はロックされます(行 B)。.getReader()
を再び呼び出す前に、.releaseLock()
を呼び出す必要があります(行 D)。
チャンクの読み取り。 .read()
は、.done
と .value
プロパティを持つオブジェクトの Promise を返します(行 C)。最後のチャンクが読み取られた後、.done
は true
になります。このアプローチは、JavaScript の非同期イテレーションの仕組みと似ています。
次の例では、テキストファイル data.txt
からチャンク(文字列)を読み取ります。
import * as fs from 'node:fs';
import {Readable} from 'node:stream';
const nodeReadable = fs.createReadStream(
'data.txt', {encoding: 'utf-8'});
const webReadableStream = Readable.toWeb(nodeReadable); // (A)
const reader = webReadableStream.getReader();
try {
while (true) {
const {done, value} = await reader.read();
if (done) break;
console.log(value);
}finally {
} .releaseLock();
reader
}// Output:
// 'Content of text file\n'
Node.js の Readable を Web の ReadableStream に変換しています(行 A)。そして、前述のプロトコルを使用してチャンクを読み取ります。
次の例では、ReadableStream のすべてのチャンクを連結して文字列にし、それを返します。
/**
* Returns a string with the contents of `readableStream`.
*/
async function readableStreamToString(readableStream) {
const reader = readableStream.getReader();
try {
let result = '';
while (true) {
const {done, value} = await reader.read();
if (done) {
return result; // (A)
}+= value;
result
}finally {
} .releaseLock(); // (B)
reader
} }
便利なことに、finally
句は、try
句をどのように抜けても常に実行されます。つまり、結果を返す場合(行 A)、ロックは正しく解放されます(行 B)。
ReadableStream は、非同期イテレーションを介して消費することもできます。
const iterator = readableStream[Symbol.asyncIterator]();
let exhaustive = false;
try {
while (true) {
let chunk;
done: exhaustive, value: chunk} = await iterator.next());
({if (exhaustive) break;
console.log(chunk);
}finally {
} // If the loop was terminated before we could iterate exhaustively
// (via an exception or `return`), we must call `iterator.return()`.
// Check if that was the case.
if (!exhaustive) {
.return();
iterator
} }
ありがたいことに、for-await-of
ループは、非同期イテレーションのすべての詳細を処理してくれます。
for await (const chunk of readableStream) {
console.log(chunk);
}
ファイルからテキストを読み取る以前の試みをやり直してみましょう。今回は、Reader の代わりに非同期イテレーションを使用します。
import * as fs from 'node:fs';
import {Readable} from 'node:stream';
const nodeReadable = fs.createReadStream(
'text-file.txt', {encoding: 'utf-8'});
const webReadableStream = Readable.toWeb(nodeReadable);
for await (const chunk of webReadableStream) {
console.log(chunk);
}// Output:
// 'Content of text file'
以前は、Reader を使用して ReadableStream の内容で文字列を組み立てました。非同期イテレーションを使用すると、コードがよりシンプルになります。
/**
* Returns a string with the contents of `readableStream`.
*/
async function readableStreamToString2(readableStream) {
let result = '';
for await (const chunk of readableStream) {
+= chunk;
result
}return result;
}
現時点では、Node.js と Deno は ReadableStream に対する非同期イテレーションをサポートしていますが、Web ブラウザはサポートしていません。バグレポートへのリンクを含むGitHub issueがあります。
ブラウザで非同期イテレーションがどのようにサポートされるかはまだ完全には明らかではないため、ポリフィルよりもラッピングの方が安全な選択肢です。次のコードは、Chromium のバグレポートの提案に基づいています。
async function* getAsyncIterableFor(readableStream) {
const reader = readableStream.getReader();
try {
while (true) {
const {done, value} = await reader.read();
if (done) return;
yield value;
}finally {
} .releaseLock();
reader
} }
ReadableStream には、パイプするための 2 つのメソッドがあります。
readableStream.pipeTo(writeableStream)
は、Promise p
を同期的に返します。これは、readableStream
のすべてのチャンクを非同期的に読み取り、writableStream
に書き込みます。完了すると、p
を履行します。
.pipeTo()
の例は、WritableStream を調べていく際に、データ転送の便利な方法として示します。
readableStream.pipeThrough(transformStream)
は、readableStream
を transformStream.writable
にパイプし、transformStream.readable
を返します(すべての TransformStream には、書き込み側と読み取り側を参照するこれらのプロパティがあります)。この操作を別の見方をすると、transformStream
を readableStream
に接続することで、新しい ReadableStream を作成することになります。
.pipeThrough()
の例は、TransformStream を調べていく際に、このメソッドがそれらが使用される主な方法であるため示します。
外部ソースを ReadableStream を介して読み取る場合、アダプターオブジェクトにラップして、そのオブジェクトを ReadableStream
コンストラクターに渡すことができます。アダプターオブジェクトは、ReadableStream の *基になるソース* と呼ばれます(キューイング戦略については、バックプレッシャーについて詳しく見ていく際に後述します)。
new ReadableStream(underlyingSource?, queuingStrategy?)
これは、基になるソースの型です(この型とそのプロパティの説明は、例で encountered するまで読み飛ばしても構いません)。
interface UnderlyingSource<TChunk> {
?(
start: ReadableStreamController<TChunk>
controller: void | Promise<void>;
)?(
pull: ReadableStreamController<TChunk>
controller: void | Promise<void>;
)?(reason?: any): void | Promise<void>;
cancel
// Only used in byte streams and ignored in this section:
: 'bytes' | undefined;
type: bigint;
autoAllocateChunkSize }
ReadableStream がこれらのメソッドを呼び出すのは、以下の場合です。
.start(controller)
は、ReadableStream
のコンストラクターを呼び出した直後に呼び出されます。
.pull(controller)
は、ReadableStream の内部キューに空きがあるたびに呼び出されます。キューが再びいっぱいになるまで繰り返し呼び出されます。このメソッドは、.start()
が完了した後にのみ呼び出されます。.pull()
が何もエンキューしない場合、再び呼び出されることはありません。
.cancel(reason)
は、ReadableStream のコンシューマーが readableStream.cancel()
または reader.cancel()
を介してキャンセルした場合に呼び出されます。reason
は、これらのメソッドに渡された値です。
これらのメソッドはそれぞれ Promise を返すことができ、Promise が解決されるまでそれ以上のステップは実行されません。これは、非同期で何かを実行したい場合に便利です。
.start()
と .pull()
のパラメーター controller
によって、ストリームにアクセスできます。以下の型を持ちます。
type ReadableStreamController<TChunk> =
| ReadableStreamDefaultController<TChunk>
| ReadableByteStreamController<TChunk> // ignored here
;
interface ReadableStreamDefaultController<TChunk> {
enqueue(chunk?: TChunk): void;
readonly desiredSize: number | null;
close(): void;
error(err?: any): void;
}
今のところ、チャンクは文字列です。後ほど、Uint8Array が一般的なバイトストリームについて説明します。メソッドの動作は以下のとおりです。
.enqueue(chunk)
は、chunk
を ReadableStream の内部キューに追加します。.desiredSize
は、.enqueue()
が書き込むキューにどれだけの空きがあるかを示します。キューがいっぱいの場合はゼロ、最大サイズを超えている場合は負になります。したがって、必要なサイズがゼロ以下の場合は、エンキューを停止する必要があります。null
です。.close()
は ReadableStream を閉じます。コンシューマーは引き続きキューを空にすることができますが、その後、ストリームは終了します。基になるソースがこのメソッドを呼び出すことが重要です。そうでない場合、ストリームの読み取りは永遠に終了しません。.error(err)
は、ストリームをエラーモードにします。これ以降のストリームとのすべてのインタラクションは、エラー値 err
で失敗します。基になるソースを実装する最初の例では、メソッド .start()
のみを提供します。.pull()
のユースケースについては、次のサブセクションで説明します。
const readableStream = new ReadableStream({
start(controller) {
.enqueue('First line\n'); // (A)
controller.enqueue('Second line\n'); // (B)
controller.close(); // (C)
controller,
};
})for await (const chunk of readableStream) {
console.log(chunk);
}
// Output:
// 'First line\n'
// 'Second line\n'
コントローラーを使用して、2 つのチャンクを持つストリームを作成します(行 A と行 B)。ストリームを閉じる(行 C)ことが重要です。そうでないと、for-await-of
ループは永遠に終了しません!
このエンキュー方法は完全に安全ではないことに注意してください。内部キューの容量を超えるリスクがあります。そのリスクを回避する方法については、すぐに説明します。
一般的なシナリオは、プッシュソースまたはプルソースを ReadableStream に変換することです。ソースがプッシュかプルかによって、UnderlyingSource で ReadableStream にどのようにフックするかが決まります。
プッシュソース: このようなソースは、新しいデータがあるときに通知します。.start()
を使用して、リスナーとサポートデータ構造を設定します。データが多すぎて必要なサイズが正でなくなった場合は、ソースに一時停止するように指示する必要があります。後で .pull()
が呼び出された場合は、一時停止を解除できます。必要なサイズが正でなくなったことに反応して外部ソースを一時停止することを、*バックプレッシャーの適用* と呼びます。
プルソース: このようなソースに新しいデータを要求します。多くの場合、非同期的に行います。したがって、通常は .start()
ではあまり多くのことはせず、.pull()
が呼び出されるたびにデータを取得します。
次に、両方の種類のソースの例を示します。
次の例では、ソケットの周りに ReadableStream をラップします。ソケットはデータをプッシュします(呼び出します)。この例は、Web ストリームの仕様から引用したものです。
function makeReadableBackpressureSocketStream(host, port) {
const socket = createBackpressureSocket(host, port);
return new ReadableStream({
start(controller) {
.ondata = event => {
socket.enqueue(event.data);
controller
if (controller.desiredSize <= 0) {
// The internal queue is full, so propagate
// the backpressure signal to the underlying source.
.readStop();
socket
};
}
.onend = () => controller.close();
socket.onerror = () => controller.error(
socketnew Error('The socket errored!'));
,
}
pull() {
// This is called if the internal queue has been emptied, but the
// stream’s consumer still wants more data. In that case, restart
// the flow of data if we have previously paused it.
.readStart();
socket,
}
cancel() {
.close();
socket,
};
}) }
ツール関数 iterableToReadableStream()
は、チャンクの iterable を受け取り、ReadableStream に変換します。
/**
* @param iterable an iterable (asynchronous or synchronous)
*/
function iterableToReadableStream(iterable) {
return new ReadableStream({
start() {
if (typeof iterable[Symbol.asyncIterator] === 'function') {
this.iterator = iterable[Symbol.asyncIterator]();
else if (typeof iterable[Symbol.iterator] === 'function') {
} this.iterator = iterable[Symbol.iterator]();
else {
} throw new Error('Not an iterable: ' + iterable);
},
}
async pull(controller) {
if (this.iterator === null) return;
// Sync iterators return non-Promise values,
// but `await` doesn’t mind and simply passes them on
const {value, done} = await this.iterator.next();
if (done) {
this.iterator = null;
.close();
controllerreturn;
}.enqueue(value);
controller,
}
cancel() {
this.iterator = null;
.close();
controller,
};
}) }
非同期ジェネレーター関数を使用して非同期 iterable を作成し、その iterable を ReadableStream に変換してみましょう。
async function* genAsyncIterable() {
yield 'how';
yield 'are';
yield 'you';
}const readableStream = iterableToReadableStream(genAsyncIterable());
for await (const chunk of readableStream) {
console.log(chunk);
}
// Output:
// 'how'
// 'are'
// 'you'
iterableToReadableStream()
は、同期 iterable でも機能します。
const syncIterable = ['hello', 'everyone'];
const readableStream = iterableToReadableStream(syncIterable);
for await (const chunk of readableStream) {
console.log(chunk);
}
// Output:
// 'hello'
// 'everyone'
最終的には、この機能を提供する静的ヘルパーメソッド ReadableStream.from()
が存在する可能性があります(詳細については、プルリクエストを参照してください)。
WritableStream を使用すると、さまざまなシンクにデータチャンクを書き込むことができます。以下の型を持ちます(この型とそのプロパティの説明は、例で encountered するまで読み飛ばしても構いません)。
interface WritableStream<TChunk> {
getWriter(): WritableStreamDefaultWriter<TChunk>;
readonly locked: boolean;
close(): Promise<void>;
abort(reason?: any): Promise<void>;
}
これらのプロパティの説明
.getWriter()
は、WritableStream に書き込むためのオブジェクトである Writer を返します。.locked
: WritableStream ごとに、一度にアクティブにできる Writer は 1 つだけです。1 つの Writer が使用されている間、WritableStream はロックされ、.getWriter()
を呼び出すことはできません。.close()
はストリームを閉じます。.abort()
はストリームを中止します。以下のサブセクションでは、WritableStream にデータを送信する 2 つのアプローチについて説明します。
*Writer* を使用して WritableStream に書き込むことができます。以下の型を持ちます(この型とそのプロパティの説明は、例で encountered するまで読み飛ばしても構いません)。
interface WritableStreamDefaultWriter<TChunk> {
readonly desiredSize: number | null;
readonly ready: Promise<undefined>;
write(chunk?: TChunk): Promise<void>;
releaseLock(): void;
close(): Promise<void>;
readonly closed: Promise<undefined>;
abort(reason?: any): Promise<void>;
}
これらのプロパティの説明
.desiredSize
は、この WriteStream のキューにどれだけの空きがあるかを示します。キューがいっぱいの場合はゼロ、最大サイズを超えている場合は負になります。したがって、必要なサイズがゼロ以下の場合は、書き込みを停止する必要があります。
null
です。.ready
は、必要なサイズが負以外から正に変化したときに履行される Promise を返します。これは、バックプレッシャーがアクティブではなく、データを書き込んでも問題ないことを意味します。必要なサイズが後で負以外に戻ると、新しい保留中の Promise が作成されて返されます。
.write()
は、チャンクをストリームに書き込みます。書き込みが成功した後に履行され、エラーが発生した場合は拒否される Promise を返します。
.releaseLock()
は、ストリームに対する Writer のロックを解放します。
.close()
は、Writer のストリームを閉じるのと同じ効果があります。
.closed
は、ストリームが閉じられたときに履行される Promise を返します。
.abort()
は、Writer のストリームを中止するのと同じ効果があります。
以下のコードは、Writer を使用するためのプロトコルを示しています。
const writer = writableStream.getWriter(); // (A)
.equal(writableStream.locked, true); // (B)
asserttry {
// Writing the chunks (explained later)
finally {
} .releaseLock(); // (C)
writer }
writableStream
に直接書き込むことはできません。最初に *Writer* を取得する必要があります(行 A)。各 WritableStream は、最大で 1 つの Writer を持つことができます。Writer が取得された後、writableStream
はロックされます(行 B)。.getWriter()
を再度呼び出す前に、.releaseLock()
を呼び出す必要があります(行 C)。
チャンクを書き込むには、3 つの方法があります。
.write()
を待つ(逆圧力の処理が非効率的)最初の書き込み方法は、.write()
の各結果を待つことです。
await writer.write('Chunk 1');
await writer.write('Chunk 2');
await writer.close();
.write()
によって返される Promise は、渡されたチャンクが正常に書き込まれたときに履行されます。「正常に書き込まれた」とはどういう意味か正確には、WritableStream の実装方法によって異なります。たとえば、ファイルストリームでは、チャンクはオペレーティングシステムに送信されたものの、まだキャッシュに存在しており、実際にはディスクに書き込まれていない可能性があります。
.close()
によって返される Promise は、ストリームが閉じられたときに履行されます。
この書き込み方法の欠点は、書き込みが成功するまで待つと、キューが使用されないことです。結果として、データスループットが低下する可能性があります。
.write()
の拒否を無視する(逆圧力を無視する)2 番目の書き込み方法では、.write()
によって返される Promise を無視し、.close()
によって返される Promise のみ を待ちます。
.write('Chunk 1').catch(() => {}); // (A)
writer.write('Chunk 2').catch(() => {}); // (B)
writerawait writer.close(); // reports errors
.write()
の同期呼び出しは、チャンクを WritableStream の内部キューに追加します。返された Promise を待たないことで、各チャンクが書き込まれるまで待ちません。ただし、.close()
を待つことで、続行する前にキューが空になり、すべての書き込みが成功したことを保証します。
行 A と行 B で .catch()
を呼び出すことは、書き込み中に問題が発生した場合に、処理されない Promise の拒否に関する警告を回避するために必要です。このような警告は、多くの場合、コンソールに記録されます。.close()
もエラーを報告するため、.write()
によって報告されたエラーは無視できます。
Promise の拒否を無視するヘルパー関数を使用することで、上記のコードを改善できます。
ignoreRejections(
.write('Chunk 1'),
writer.write('Chunk 2'),
writer;
)await writer.close(); // reports errors
function ignoreRejections(...promises) {
for (const promise of promises) {
.catch(() => {});
promise
} }
このアプローチの 1 つの欠点は、逆圧力が無視されることです。キューは書き込むすべてのものを保持するのに十分な大きさであると単純に想定しています。
.ready
を待つ(逆圧力を効率的に処理する)この書き込み方法では、Writer ゲッター .ready
を待つことで、逆圧力を効率的に処理します。
await writer.ready; // reports errors
// How much room do we have?
console.log(writer.desiredSize);
.write('Chunk 1').catch(() => {});
writer
await writer.ready; // reports errors
// How much room do we have?
console.log(writer.desiredSize);
.write('Chunk 2').catch(() => {});
writer
await writer.close(); // reports errors
.ready
の Promise は、ストリームが逆圧力のある状態から逆圧力のない状態に移行するたびに履行されます。
この例では、WritableStream を介してテキストファイル data.txt
を作成します。
import * as fs from 'node:fs';
import {Writable} from 'node:stream';
const nodeWritable = fs.createWriteStream(
'new-file.txt', {encoding: 'utf-8'}); // (A)
const webWritableStream = Writable.toWeb(nodeWritable); // (B)
const writer = webWritableStream.getWriter();
try {
await writer.write('First line\n');
await writer.write('Second line\n');
await writer.close();
finally {
} .releaseLock()
writer }
行 A では、ファイル data.txt
の Node.js ストリームを作成します。行 B では、このストリームを Web ストリームに変換します。次に、Writer を使用して文字列を書き込みます。
Writer を使用する代わりに、ReadableStream をパイプすることで WritableStream に書き込むこともできます。
await readableStream.pipeTo(writableStream);
.pipeTo()
によって返される Promise は、パイプが正常に完了すると履行されます。
パイプは、現在のタスクが完了または一時停止した後に実行されます。次のコードはそれを示しています。
const readableStream = new ReadableStream({ // (A)
start(controller) {
.enqueue('First line\n');
controller.enqueue('Second line\n');
controller.close();
controller,
};
})const writableStream = new WritableStream({ // (B)
write(chunk) {
console.log('WRITE: ' + JSON.stringify(chunk));
,
}close() {
console.log('CLOSE WritableStream');
,
};
})
console.log('Before .pipeTo()');
const promise = readableStream.pipeTo(writableStream); // (C)
.then(() => console.log('Promise fulfilled'));
promiseconsole.log('After .pipeTo()');
// Output:
// 'Before .pipeTo()'
// 'After .pipeTo()'
// 'WRITE: "First line\n"'
// 'WRITE: "Second line\n"'
// 'CLOSE WritableStream'
// 'Promise fulfilled'
行 A では ReadableStream を作成します。行 B では WritableStream を作成します。
.pipeTo()
(行 C)がすぐに返されることがわかります。新しいタスクでは、チャンクが読み書きされます。次に、writableStream
が閉じられ、最後に promise
が履行されます。
次の例では、ファイルの WritableStream を作成し、ReadableStream をパイプします。
const webReadableStream = new ReadableStream({ // (A)
async start(controller) {
.enqueue('First line\n');
controller.enqueue('Second line\n');
controller.close();
controller,
};
})
const nodeWritable = fs.createWriteStream( // (B)
'data.txt', {encoding: 'utf-8'});
const webWritableStream = Writable.toWeb(nodeWritable); // (C)
await webReadableStream.pipeTo(webWritableStream); // (D)
行 A では、ReadableStream を作成します。行 B では、ファイル data.txt
の Node.js ストリームを作成します。行 C では、このストリームを Web ストリームに変換します。行 D では、webReadableStream
をファイルの WritableStream にパイプします。
次の例では、2 つの ReadableStream を 1 つの WritableStream に書き込みます。
function createReadableStream(prefix) {
return new ReadableStream({
async start(controller) {
.enqueue(prefix + 'chunk 1');
controller.enqueue(prefix + 'chunk 2');
controller.close();
controller,
};
})
}
const writableStream = new WritableStream({
write(chunk) {
console.log('WRITE ' + JSON.stringify(chunk));
,
}close() {
console.log('CLOSE');
,
}abort(err) {
console.log('ABORT ' + err);
,
};
})
await createReadableStream('Stream 1: ')
.pipeTo(writableStream, {preventClose: true}); // (A)
await createReadableStream('Stream 2: ')
.pipeTo(writableStream, {preventClose: true}); // (B)
await writableStream.close();
// Output
// 'WRITE "Stream 1: chunk 1"'
// 'WRITE "Stream 1: chunk 2"'
// 'WRITE "Stream 2: chunk 1"'
// 'WRITE "Stream 2: chunk 2"'
// 'CLOSE'
ReadableStream が閉じられた後、WritableStream を閉じないように .pipeTo()
に指示します(行 A と行 B)。したがって、WritableStream は行 A の後も開いたままで、別の ReadableStream をパイプできます。
WritableStream を介して外部シンクに書き込む場合は、アダプターオブジェクトにラップして、そのオブジェクトを WritableStream
コンストラクターに渡すことができます。アダプターオブジェクトは、WritableStream の *基盤となるシンク* と呼ばれます(キューイング戦略については、後で逆圧力を詳しく調べるときに説明します)。
new WritableStream(underlyingSink?, queuingStrategy?)
これは、基盤となるシンクのタイプです(このタイプとそのプロパティの説明はざっと読んでください。例で encountered するときに再び説明します)。
interface UnderlyingSink<TChunk> {
?(
start: WritableStreamDefaultController
controller: void | Promise<void>;
)?(
write: TChunk,
chunk: WritableStreamDefaultController
controller: void | Promise<void>;
)?(): void | Promise<void>;;
close?(reason?: any): void | Promise<void>;
abort }
これらのプロパティの説明
.start(controller)
は、WritableStream
のコンストラクターを呼び出した直後に呼び出されます。非同期処理を行う場合は、Promise を返すことができます。このメソッドでは、書き込みの準備ができます。
.write(chunk, controller)
は、新しいチャンクが外部シンクに書き込まれる準備ができると呼び出されます。逆圧力がなくなった後に履行される Promise を返すことで、逆圧力をかけることができます。
.close()
は、writer.close()
が呼び出され、キューに入れられたすべての書き込みが成功した後に呼び出されます。このメソッドでは、書き込み後にクリーンアップできます。
.abort(reason)
は、writeStream.abort()
または writer.abort()
が呼び出された場合に呼び出されます。reason
は、これらのメソッドに渡される値です。
.start()
と .write()
のパラメーター controller
を使用すると、WritableStream でエラーが発生します。次のタイプがあります。
interface WritableStreamDefaultController {
readonly signal: AbortSignal;
error(err?: any): void;
}
.signal
は AbortSignal で、ストリームが中止されたときに書き込みまたはクローズ操作を中止する場合にリッスンできます。.error(err)
は WritableStream でエラーを発生させます。閉じられ、それとの今後のすべてのインタラクションはエラー値 err
で失敗します。次の例では、ReadableStream がチャンクをどのように生成するかを確認するために、ReadableStream を WritableStream にパイプします。
const readableStream = new ReadableStream({
start(controller) {
.enqueue('First chunk');
controller.enqueue('Second chunk');
controller.close();
controller,
};
})await readableStream.pipeTo(
new WritableStream({
write(chunk) {
console.log('WRITE ' + JSON.stringify(chunk));
,
}close() {
console.log('CLOSE');
,
}abort(err) {
console.log('ABORT ' + err);
,
}
});
)// Output:
// 'WRITE "First chunk"'
// 'WRITE "Second chunk"'
// 'CLOSE'
次の例では、書き込まれたすべてのチャンクを文字列に収集する WriteStream
のサブクラスを作成します。メソッド .getString()
を介してその文字列にアクセスできます。
class StringWritableStream extends WritableStream {
= '';
#string constructor() {
super({
// We need to access the `this` of `StringWritableStream`.
// Hence the arrow function (and not a method).
write: (chunk) => {
this.#string += chunk;
,
};
})
}getString() {
return this.#string;
}
}const stringStream = new StringWritableStream();
const writer = stringStream.getWriter();
try {
await writer.write('How are');
await writer.write(' you?');
await writer.close();
finally {
} .releaseLock()
writer
}.equal(
assert.getString(),
stringStream'How are you?'
; )
このアプローチの欠点は、2 つの API、つまり WritableStream
の API と新しい文字列ストリーム API を混在させていることです。代替案は、それを拡張する代わりに WritableStream に委任することです。
function StringcreateWritableStream() {
let string = '';
return {
stream: new WritableStream({
write(chunk) {
+= chunk;
string ,
},
})getString() {
return string;
,
};
}
}
const stringStream = StringcreateWritableStream();
const writer = stringStream.stream.getWriter();
try {
await writer.write('How are');
await writer.write(' you?');
await writer.close();
finally {
} .releaseLock()
writer
}.equal(
assert.getString(),
stringStream'How are you?'
; )
この機能は、クラスを介して(オブジェクトのファクトリ関数としてではなく)実装することもできます。
TransformStream は
TransformStream を使用する最も一般的な方法は、「パイプ スルー」することです。
const transformedStream = readableStream.pipeThrough(transformStream);
.pipeThrough()
は、readableStream
を transformStream
の書き込み側にパイプし、その読み取り側を返します。つまり、readableStream
の変換されたバージョンである新しい ReadableStream を作成しました。
.pipeThrough()
は、TransformStream だけでなく、次の形状を持つオブジェクトも受け入れます。
interface ReadableWritablePair<RChunk, WChunk> {
: ReadableStream<RChunk>;
readable: WritableStream<WChunk>;
writable }
Node.js は、次の標準 TransformStream をサポートしています。
TextEncoderStream
および TextDecoderStream
TextDecoderStream
は、これらのケースを正しく処理します。TextEncoderStream
、TextDecoderStream
)。CompressionStream
、DecompressionStream
deflate
(ZLIB 圧縮データ形式)、deflate-raw
(DEFLATE アルゴリズム)、gzip
(GZIP ファイル形式)。CompressionStream
、DecompressionStream
)。次の例では、UTF-8 エンコードされたバイトのストリームをデコードします。
const response = await fetch('https://example.com');
const readableByteStream = response.body;
const readableStream = readableByteStream
.pipeThrough(new TextDecoderStream('utf-8'));
for await (const stringChunk of readableStream) {
console.log(stringChunk);
}
response.body
は、チャンクが Uint8Array
(TypedArrays)のインスタンスである ReadableByteStream です。そのストリームを TextDecoderStream
にパイプして、文字列チャンクを持つストリームを取得します。
各バイトチャンクを個別に翻訳する(たとえば、TextDecoder
を介して)ことは機能しません。なぜなら、単一の Unicode コードポイントは UTF-8 で最大 4 バイトとしてエンコードされるため、それらのバイトがすべて同じチャンクに含まれているとは限らないからです。
次の Node.js モジュールは、標準入力から送信されたすべてのものをログに記録します。
// echo-stdin.mjs
import {Readable} from 'node:stream';
const webStream = Readable.toWeb(process.stdin)
.pipeThrough(new TextDecoderStream('utf-8'));
for await (const chunk of webStream) {
console.log('>>>', chunk);
}
標準入力には、process.stdin
に格納されているストリームを介してアクセスできます(process
はグローバル Node.js 変数です)。このストリームのエンコーディングを設定せず、Readable.toWeb()
を介して変換すると、バイトストリームが取得されます。テキストストリームを取得するために、TextDecoderStream にパイプします。
標準入力を段階的に処理することに注意してください。別のチャンクが使用可能になるとすぐに、ログに記録します。つまり、標準入力が完了するまで待ちません。これは、データが大きい場合、または断続的に送信される場合に役立ちます。
Transformer オブジェクトを TransformStream
のコンストラクターに渡すことで、カスタム TransformStream を実装できます。このようなオブジェクトには次のタイプがあります(このタイプとそのプロパティの説明はざっと読んでください。例で encountered するときに再び説明します)。
interface Transformer<TInChunk, TOutChunk> {
?(
start: TransformStreamDefaultController<TOutChunk>
controller: void | Promise<void>;
)?(
transform: TInChunk,
chunk: TransformStreamDefaultController<TOutChunk>
controller: void | Promise<void>;
)?(
flush: TransformStreamDefaultController<TOutChunk>
controller: void | Promise<void>;
) }
これらのプロパティの説明
.start(controller)
は、TransformStream
のコンストラクターを呼び出した直後に呼び出されます。ここでは、変換を開始する前に準備を行うことができます。.transform(chunk, controller)
は、実際の変換を実行します。入力チャンクを受信し、そのパラメーター controller
を使用して、1 つ以上の変換された出力チャンクをキューに入れることができます。何もキューに入れないことも選択できます。.flush(controller)
は、すべての入力チャンクが正常に変換された後に呼び出されます。ここでは、変換が完了した後のクリーンアップを実行できます。これらのメソッドはそれぞれ Promise を返すことができ、Promise が解決されるまでそれ以上のステップは実行されません。これは、非同期で何かを実行したい場合に便利です。
パラメータ controller
は次の型を持ちます
interface TransformStreamDefaultController<TOutChunk> {
enqueue(chunk?: TOutChunk): void;
readonly desiredSize: number | null;
terminate(): void;
error(err?: any): void;
}
.enqueue(chunk)
は、chunk
を TransformStream の読み取り側(出力)に追加します。.desiredSize
は、TransformStream の読み取り側(出力)の内部キューの必要なサイズを返します。.terminate()
は、TransformStream の読み取り側(出力)を閉じ、書き込み側(入力)にエラーを発生させます。トランスフォーマーが書き込み側(入力)の残りのチャンクに興味がなく、それらをスキップしたい場合に使用できます。.error(err)
は、TransformStream にエラーを発生させます。これ以降のすべての操作は、エラー値 err
で失敗します。TransformStream のバックプレッシャーはどうでしょうか?このクラスは、読み取り側(出力)からのバックプレッシャーを書き込み側(入力)に伝播します。変換によってデータ量が大きく変化しないという前提があります。したがって、Transform はバックプレッシャーを無視しても問題ありません。ただし、transformStreamDefaultController.desiredSize
を介して検出し、transformer.transform()
から Promise を返すことで伝播できます。
TransformStream
の次のサブクラスは、任意のチャンクを持つストリームを、各チャンクがテキストの 1 行で構成されるストリームに変換します。つまり、最後のチャンクを除いて、各チャンクは行末(EOL)文字列で終わります。Unix(macOS を含む)では '\n'
、Windows では '\r\n'
です。
class ChunksToLinesTransformer {
= '';
#previous
transform(chunk, controller) {
let startSearch = this.#previous.length;
this.#previous += chunk;
while (true) {
// Works for EOL === '\n' and EOL === '\r\n'
const eolIndex = this.#previous.indexOf('\n', startSearch);
if (eolIndex < 0) break;
// Line includes the EOL
const line = this.#previous.slice(0, eolIndex+1);
.enqueue(line);
controllerthis.#previous = this.#previous.slice(eolIndex+1);
= 0;
startSearch
}
}
flush(controller) {
// Clean up and enqueue any text we’re still holding on to
if (this.#previous.length > 0) {
.enqueue(this.#previous);
controller
}
}
}class ChunksToLinesStream extends TransformStream {
constructor() {
super(new ChunksToLinesTransformer());
}
}
const stream = new ReadableStream({
async start(controller) {
.enqueue('multiple\nlines of\ntext');
controller.close();
controller,
};
})const transformStream = new ChunksToLinesStream();
const transformed = stream.pipeThrough(transformStream);
for await (const line of transformed) {
console.log('>>>', JSON.stringify(line));
}
// Output:
// '>>> "multiple\n"'
// '>>> "lines of\n"'
// '>>> "text"'
Deno の組み込み TextLineStream
は同様の機能を提供することに注意してください。
ヒント:非同期ジェネレーターを使用してこの変換を行うこともできます。ReadableStream を非同期的に反復処理し、行を含む非同期イテラブルを返します。その実装は、§9.4「非同期ジェネレーターによる読み取り可能ストリームの変換」 に示されています。
ReadableStream は非同期的に反復可能であるため、非同期ジェネレーター を使用して変換できます。これにより、非常にエレガントなコードが実現します
const stream = new ReadableStream({
async start(controller) {
.enqueue('one');
controller.enqueue('two');
controller.enqueue('three');
controller.close();
controller,
};
})
async function* prefixChunks(prefix, asyncIterable) {
for await (const chunk of asyncIterable) {
yield '> ' + chunk;
}
}
const transformedAsyncIterable = prefixChunks('> ', stream);
for await (const transformedChunk of transformedAsyncIterable) {
console.log(transformedChunk);
}
// Output:
// '> one'
// '> two'
// '> three'
バックプレッシャーを詳しく見てみましょう。次のパイプチェーンを考えてみましょう
.pipeThrough(ts).pipeTo(ws); rs
rs
は ReadableStream、ts
は TransformStream、ws
は WritableStream です。これらは、前の式によって作成された接続です(.pipeThrough
は .pipeTo
を使用して rs
を ts
の書き込み側に接続します)
rs -pipeTo-> ts{writable,readable} -pipeTo-> ws
観察
rs
の基になるソースは、rs
の前にあるパイプチェーンメンバーと見なすことができます。ws
の基になるシンクは、ws
の後にあるパイプチェーンメンバーと見なすことができます。ws
の基になるシンクが遅く、ws
のバッファが最終的にいっぱいになったと仮定しましょう。次に、次の手順が発生します
ws
はいっぱいであることを通知します。pipeTo
は ts.readable
からの読み取りを停止します。ts.readable
はいっぱいであることを通知します。ts
は、チャンクを ts.writable
から ts.readable
に移動するのを停止します。ts.writable
はいっぱいであることを通知します。pipeTo
は rs
からの読み取りを停止します。rs
は、基になるソースにいっぱいであることを通知します。この例は、2種類の機能が必要であることを示しています
Web Streams API でこれらの機能がどのように実装されているかを見てみましょう。
バックプレッシャーは、データを受信しているエンティティによって通知されます。Web ストリームには、そのようなエンティティが 2 つあります
.write()
を介してデータを受信します。.enqueue()
を呼び出すとデータを受信します。どちらの場合も、入力はキューを介してバッファリングされます。バックプレッシャーを適用する信号は、キューがいっぱいになったときです。それがどのように検出できるかを見てみましょう。
これらはキューの場所です
キューの*必要なサイズ*は、キューに残っている容量を示す数値です
したがって、必要なサイズがゼロ以下の場合は、バックプレッシャーを適用する必要があります。キューを含むオブジェクトのゲッター .desiredSize
を介して利用できます。
必要なサイズはどのように計算されますか?*キューイング戦略*と呼ばれるものを指定するオブジェクトを介して。 ReadableStream
と WritableStream
にはデフォルトのキューイング戦略があり、コンストラクターのオプションパラメーターを介してオーバーライドできます。 インターフェース QueuingStrategy
には 2 つのプロパティがあります
.size(chunk)
は、chunk
のサイズを返します。.highWaterMark
は、キューの最大サイズを指定します。キューの必要なサイズは、ハイウォーターマークからキューの現在のサイズを引いたものです。
データを送信するエンティティは、バックプレッシャーをかけることによって、通知されたバックプレッシャーに反応する必要があります。
writer.ready
の Promise を待つことができます。待っている間、ブロックされ、必要なバックプレッシャーが達成されます。キューに空きができると、Promise は履行されます。 writer.desiredSize
の値がゼロより大きくなると、履行がトリガーされます。
あるいは、writer.write()
によって返された Promise を待つこともできます。そうすれば、キューはいっぱいになりません。
必要に応じて、チャンクのサイズを writer.desiredSize
に基づいて追加することもできます。
ReadableStream に渡すことができる基になるソースオブジェクトは、外部ソースをラップします。ある意味で、それは ReadableStream の前にあるパイプチェーンのメンバーでもあります。
基になるプルソースは、キューに空きがある場合にのみ、新しいデータが要求されます。ない間は、データがプルされないため、バックプレッシャーが自動的にかけられます。
基になるプッシュソースは、何かをエンキューした後、controller.desiredSize
をチェックする必要があります。ゼロ以下の場合、外部ソースを一時停止することによってバックプレッシャーをかける必要があります。
WritableStream に渡すことができる基になるシンクオブジェクトは、外部シンクをラップします。ある意味で、それは WritableStream の後にあるパイプチェーンのメンバーでもあります。
各外部シンクは、異なる方法でバックプレッシャーを通知します(場合によってはまったく通知しません)。基になるシンクは、書き込みが完了したら履行されるメソッド .write()
から Promise を返すことによって、バックプレッシャーをかけることができます。 Web Streams 標準に例があります。それがどのように機能するかを示しています。
.writable
→
.readable
)TransformStream は、前者の基になるシンクと後者の基になるソースを実装することにより、書き込み側を読み取り側に接続します。内部バックプレッシャーが現在アクティブかどうかを示す内部スロット .[[backpressure]]
があります。
書き込み側の基になるシンクのメソッド .write()
は、内部バックプレッシャーがなくなるまで非同期的に待機してから、別のチャンクを TransformStream のトランスフォーマーにフィードします(Web Streams 標準:TransformStreamDefaultSinkWriteAlgorithm
)。その後、トランスフォーマーは TransformStreamDefaultController を介して何かをエンキューする可能性があります。 .write()
は、メソッドが終了すると履行される Promise を返すことに注意してください。それが起こるまで、WriteStream はキューを介して着信書き込みリクエストをバッファリングします。したがって、書き込み側のバックプレッシャーは、そのキューとその必要なサイズを介して通知されます。
TransformStreamDefaultController を介してチャンクがエンキューされ、読み取り側のキューがいっぱいになると、TransformStream のバックプレッシャーがアクティブになります(Web Streams 標準:TransformStreamDefaultControllerEnqueue
)。
Reader から何かが読み取られると、TransformStream のバックプレッシャーが非アクティブになる場合があります(Web Streams 標準:ReadableStreamDefaultReaderRead
)
.pull()
を呼び出す時期かもしれません(Web Streams 標準:.[[PullSteps]]
)。.pull()
は、バックプレッシャーを非アクティブにします(Web Streams 標準:TransformStreamDefaultSourcePullAlgorithm
)。.pipeTo()
(ReadableStream →
WritableStream).pipeTo()
は、リーダーを介して ReadableStream からチャンクを読み取り、Writer を介して WritableStream に書き込みます。 writer.desiredSize
がゼロ以下の場合は常に一時停止します(Web Streams 標準:ReadableStreamPipeTo
の手順 15)。
これまでは、チャンクが文字列である*テキストストリーム*のみを扱ってきました。しかし、Web Streams API は、バイナリデータの*バイトストリーム*もサポートしており、チャンクは Uint8Arrays(TypedArrays)です
ReadableStream
には特別な 'bytes'
モードがあります。WritableStream
自体は、チャンクが文字列か Uint8Arrays かを気にしません。したがって、インスタンスがテキストストリームかバイトストリームかは、基になるシンクが処理できるチャンクの種類によって異なります。TransformStream
が処理できるチャンクの種類も、その Transformer によって異なります。次に、読み取り可能なバイトストリームを作成する方法を学習します。
ReadableStream
コンストラクターによって作成されるストリームの種類は、オプションの最初のパラメーター underlyingSource
のオプションのプロパティ .type
によって異なります
.type
が省略されているか、基になるソースが提供されていない場合、新しいインスタンスはテキストストリームです。
.type
が文字列 'bytes'
の場合、新しいインスタンスはバイトストリームです
const readableByteStream = new ReadableStream({
type: 'bytes',
async start() { /*...*/ }
// ...
; })
ReadableStream が 'bytes'
モードの場合、何が変わりますか?
デフォルトモードでは、基になるソースはあらゆる種類のチャンクを返すことができます。バイトモードでは、チャンクは ArrayBufferViews、つまり TypedArrays(Uint8Arrays など)または DataViews である必要があります。
さらに、読み取り可能なバイトストリームは 2 種類のリーダーを作成できます
.getReader()
は ReadableStreamDefaultReader
のインスタンスを返します。.getReader({mode: 'byob'})
は ReadableStreamBYOBReader
のインスタンスを返します。「BYOB」は「Bring Your Own Buffer」の略で、バッファ(ArrayBufferView)を reader.read()
に渡すことができることを意味します。その後、その ArrayBufferView はデタッチされ、使用できなくなります。ただし、.read()
は、同じ型を持ち、同じ ArrayBuffer の同じ領域にアクセスする新しい ArrayBufferView でデータを返します。
さらに、可読バイトストリームは異なるコントローラーを持ちます。それらはReadableByteStreamController
(ReadableStreamDefaultController
ではなく)のインスタンスです。基になるソースにArrayBufferView(TypedArrayまたはDataView)をエンキューすることを強制する以外に、プロパティ.byobRequest
を介してReadableStreamBYOBReadersもサポートします。基になるソースは、このプロパティに格納されているBYOBRequestにデータを書き込みます。Web Streams標準には、「ストリームの作成例」セクションに.byobRequest
を使用する2つの例があります。
次の例では、チャンクをランダムデータで満たす無限の可読バイトストリームを作成します(インスピレーション:「Node.jsでのWeb Streams APIの実装」のexample4.mjs
)。
import {promisify} from 'node:util';
import {randomFill} from 'node:crypto';
const asyncRandomFill = promisify(randomFill);
const readableByteStream = new ReadableStream({
type: 'bytes',
async pull(controller) {
const byobRequest = controller.byobRequest;
await asyncRandomFill(byobRequest.view);
.respond(byobRequest.view.byteLength);
byobRequest,
};
})
const reader = readableByteStream.getReader({mode: 'byob'});
const buffer = new Uint8Array(10); // (A)
const firstChunk = await reader.read(buffer); // (B)
console.log(firstChunk);
readableByteStream
は無限であるため、ループすることはできません。そのため、最初のチャンク(行B)のみを読み取ります。
行Aで作成したバッファは転送されるため、行Bの後では読み取れません。
次の例では、可読バイトストリームを作成し、それをGZIP形式に圧縮するストリームにパイプします。
const readableByteStream = new ReadableStream({
type: 'bytes',
start(controller) {
// 256 zeros
.enqueue(new Uint8Array(256));
controller.close();
controller,
};
})const transformedStream = readableByteStream.pipeThrough(
new CompressionStream('gzip'));
await logChunks(transformedStream);
async function logChunks(readableByteStream) {
const reader = readableByteStream.getReader();
try {
while (true) {
const {done, value} = await reader.read();
if (done) break;
console.log(value);
}finally {
} .releaseLock();
reader
} }
fetch()
を使用したWebページの読み取りfetch()
の結果は、プロパティ.body
が可読バイトストリームであるレスポンスオブジェクトに解決されます。TextDecoderStream
を使用して、そのバイトストリームをテキストストリームに変換します。
const response = await fetch('https://example.com');
const readableByteStream = response.body;
const readableStream = readableByteStream.pipeThrough(
new TextDecoderStream('utf-8'));
for await (const stringChunk of readableStream) {
console.log(stringChunk);
}
Node.jsは、_ユーティリティコンシューマー_と呼ばれる以下のヘルパー関数をサポートする唯一のWebプラットフォームです。
import {
,
arrayBuffer,
blob,
buffer,
json,
textfrom 'node:stream/consumers'; }
これらの関数は、Web ReadableStreams、Node.js Readables、およびAsyncIteratorsを、以下で履行されるPromiseに変換します。
arrayBuffer()
)blob()
)buffer()
)json()
)text()
)バイナリデータはUTF-8エンコードされていると想定されます。
import * as streamConsumers from 'node:stream/consumers';
const readableByteStream = new ReadableStream({
type: 'bytes',
start(controller) {
// TextEncoder converts strings to UTF-8 encoded Uint8Arrays
const encoder = new TextEncoder();
const view = encoder.encode('"😀"');
.deepEqual(
assert,
viewUint8Array.of(34, 240, 159, 152, 128, 34)
;
).enqueue(view);
controller.close();
controller,
};
})const jsonData = await streamConsumers.json(readableByteStream);
.equal(jsonData, '😀'); assert
文字列ストリームは期待どおりに動作します。
import * as streamConsumers from 'node:stream/consumers';
const readableByteStream = new ReadableStream({
start(controller) {
.enqueue('"😀"');
controller.close();
controller,
};
})const jsonData = await streamConsumers.json(readableByteStream);
.equal(jsonData, '😀'); assert
このセクションで言及されているすべての資料は、この章のソースでした。
この章では、Web Streams APIのすべての側面を網羅しているわけではありません。詳細については、こちらをご覧ください。
その他の資料