readableToString()この章では、Node のネイティブストリームを紹介します。ネイティブストリームは非同期イテレーションをサポートしており、操作が容易になります。本章では主にこれを使用します。
クロスプラットフォームの *Web ストリーム* については、§10 “Node.js での Web ストリームの使用” で説明します。本書では主にこれらを使用します。そのため、必要に応じて本章はスキップできます。
非同期イテレーションは、データコンテナの内容を非同期的に取得するためのプロトコルです (つまり、アイテムを取得する前に現在の「タスク」が一時停止される可能性があります)。
非同期ジェネレーターは、非同期イテレーションに役立ちます。たとえば、これは非同期ジェネレーター関数です。
/**
* @returns an asynchronous iterable
*/
async function* asyncGenerator(asyncIterable) {
for await (const item of asyncIterable) { // input
if (···) {
yield '> ' + item; // output
}
}
}for-await-of ループは、入力 asyncIterable を反復処理します。このループは、通常の非同期関数でも使用できます。yield は、このジェネレーターによって返される非同期イテラブルに値を供給します。本章の残りの部分では、関数が非同期関数なのか非同期ジェネレーター関数なのかに注意してください。
/** @returns a Promise */
async function asyncFunction() { /*···*/ }
/** @returns an async iterable */
async function* asyncGeneratorFunction() { /*···*/ }ストリームは、大量のデータを「分割して征服する」という中心的なアイデアを持つパターンです。データを小さな部分に分割して一度に 1 つの部分を処理すれば、処理できます。
Node.js は、いくつかの種類のストリームをサポートしています。たとえば、
*Readable ストリーム* は、データを読み取ることができるストリームです。言い換えれば、データのソースです。例としては、ファイルの内容を読み取ることができる *readable ファイルストリーム* があります。
*Writable ストリーム* は、データを書き込むことができるストリームです。言い換えれば、データのシンクです。例としては、ファイルにデータを書き込むことができる *writable ファイルストリーム* があります。
*変換ストリーム* は、readable と writable の両方です。writable ストリームとしてデータの一部を受信し、*変換* (変更または破棄) してから、readable ストリームとして出力します。
ストリーミングデータを複数の手順で処理するには、ストリームを *パイプライン* (接続) します。
パート (2) はオプションです。
テキストストリームを作成するときは、常にエンコーディングを指定することをお勧めします。
Node.js のドキュメントには、サポートされているエンコーディングとそのデフォルトのスペルの一覧 があります。たとえば、
'utf8''utf16le''base64'いくつかの異なるスペルも許可されています。Buffer.isEncoding() を使用して、どれが使用できるかを確認できます。
> buffer.Buffer.isEncoding('utf8')
true
> buffer.Buffer.isEncoding('utf-8')
true
> buffer.Buffer.isEncoding('UTF-8')
true
> buffer.Buffer.isEncoding('UTF:8')
falseエンコーディングのデフォルト値は null で、これは 'utf8' と同等です。
readableToString()このヘルパー関数を時々使用します。どのように機能するかは理解する必要はなく、(おおまかに) 何をするかだけを理解すれば十分です。
import * as stream from 'stream';
/**
* Reads all the text in a readable stream and returns it as a string,
* via a Promise.
* @param {stream.Readable} readable
*/
function readableToString(readable) {
return new Promise((resolve, reject) => {
let data = '';
readable.on('data', function (chunk) {
data += chunk;
});
readable.on('end', function () {
resolve(data);
});
readable.on('error', function (err) {
reject(err);
});
});
}この関数は、イベントベースの API を介して実装されています。後ほど、これを行うためのより簡単な方法 (非同期イテレーションを使用) を紹介します。
await が時々見られます。その場合、モジュール内 または非同期関数の本体内にいると想定します。'\n' (LF)'\r\n' (CR LF)os の 定数 EOL を介してアクセスできます。fs.createReadStream() を使用して、readable ストリームを作成できます。
import * as fs from 'fs';
const readableStream = fs.createReadStream(
'tmp/test.txt', {encoding: 'utf8'});
assert.equal(
await readableToString(readableStream),
'This is a test!\n');Readable.from(): イテラブルからの Readable ストリームの作成静的メソッド Readable.from(iterable, options?) は、iterable に含まれるデータを保持する readable ストリームを作成します。iterable は、同期イテラブルまたは非同期イテラブルです。パラメーター options はオプションで、とりわけ、テキストエンコーディングを指定するために使用できます。
import * as stream from 'stream';
function* gen() {
yield 'One line\n';
yield 'Another line\n';
}
const readableStream = stream.Readable.from(gen(), {encoding: 'utf8'});
assert.equal(
await readableToString(readableStream),
'One line\nAnother line\n');Readable.from() は任意のイテラブルを受け入れるため、文字列をストリームに変換するためにも使用できます。
import {Readable} from 'stream';
const str = 'Some text!';
const readable = Readable.from(str, {encoding: 'utf8'});
assert.equal(
await readableToString(readable),
'Some text!');現時点では、Readable.from() は文字列を他のイテラブルと同様に扱い、コードポイントを反復処理します。これはパフォーマンスの面では理想的ではありませんが、ほとんどのユースケースでは問題ないはずです。Readable.from() は文字列でよく使用されることが予想されるため、将来的に最適化される可能性があります。
for-await-of を使用した Readable ストリームからのチャンクの読み取りすべての readable ストリームは非同期的に反復可能であるため、for-await-of ループを使用してその内容を読み取ることができます。
import * as fs from 'fs';
async function logChunks(readable) {
for await (const chunk of readable) {
console.log(chunk);
}
}
const readable = fs.createReadStream(
'tmp/test.txt', {encoding: 'utf8'});
logChunks(readable);
// Output:
// 'This is a test!\n'次の関数は、本章の冒頭で説明した関数のよりシンプルな再実装です。
import {Readable} from 'stream';
async function readableToString2(readable) {
let result = '';
for await (const chunk of readable) {
result += chunk;
}
return result;
}
const readable = Readable.from('Good morning!', {encoding: 'utf8'});
assert.equal(await readableToString2(readable), 'Good morning!');この場合、Promise を返す必要があったため、非同期関数を使用する必要がありました。
'node:readlines' を使用した Readable ストリームからの行の読み取り組み込みモジュール 'node:readline' を使用すると、readable ストリームから行を読み取ることができます。
import * as fs from 'node:fs';
import * as readline from 'node:readline/promises';
const filePath = process.argv[2]; // first command line argument
const rl = readline.createInterface({
input: fs.createReadStream(filePath, {encoding: 'utf-8'}),
});
for await (const line of rl) {
console.log('>', line);
}
rl.close();非同期イテレーションは、ストリーミングデータを複数の手順で処理するための変換ストリームに代わる洗練された方法を提供します。
Readable.from() を介して readable ストリームに変換できます (後で writable ストリームにパイプできます)。要約すると、これらはそのような処理パイプラインの要素です。
次の例では、説明したばかりの処理パイプラインの例を示します。
import {Readable} from 'stream';
/**
* @param chunkIterable An asynchronous or synchronous iterable
* over “chunks” (arbitrary strings)
* @returns An asynchronous iterable over “lines”
* (strings with at most one newline that always appears at the end)
*/
async function* chunksToLines(chunkIterable) {
let previous = '';
for await (const chunk of chunkIterable) {
let startSearch = previous.length;
previous += chunk;
while (true) {
// Works for EOL === '\n' and EOL === '\r\n'
const eolIndex = previous.indexOf('\n', startSearch);
if (eolIndex < 0) break;
// Line includes the EOL
const line = previous.slice(0, eolIndex+1);
yield line;
previous = previous.slice(eolIndex+1);
startSearch = 0;
}
}
if (previous.length > 0) {
yield previous;
}
}
async function* numberLines(lineIterable) {
let lineNumber = 1;
for await (const line of lineIterable) {
yield lineNumber + ' ' + line;
lineNumber++;
}
}
async function logLines(lineIterable) {
for await (const line of lineIterable) {
console.log(line);
}
}
const chunks = Readable.from(
'Text with\nmultiple\nlines.\n',
{encoding: 'utf8'});
await logLines(numberLines(chunksToLines(chunks))); // (A)
// Output:
// '1 Text with\n'
// '2 multiple\n'
// '3 lines.\n'処理パイプラインは行 A で設定されます。手順は次のとおりです。
chunksToLines(): チャンクを含む非同期イテラブルを行を含む非同期イテラブルに変換します。numberLines(): 行を含む非同期イテラブルを行番号付きの行を含む非同期イテラブルに変換します。logLines(): 非同期イテラブルのアイテムをログに記録します。観察
chunksToLines() と numberLines() の入力と出力はどちらも非同期イテラブルです。そのため、これらは非同期ジェネレーターです (async と * で示されます)。logLines() の入力のみが非同期イテラブルです。そのため、これは非同期関数です (async で示されます)。fs.createWriteStream() を使用して、writable ストリームを作成できます。
const writableStream = fs.createWriteStream(
'tmp/log.txt', {encoding: 'utf8'});このセクションでは、writable ストリームに書き込むためのアプローチについて説明します。
.write() を介して writable ストリームに直接書き込みます。stream の関数 pipeline() を使用して、readable ストリームを writable ストリームにパイプします。これらのアプローチをデモンストレーションするために、それらを使用して同じ関数 writeIterableToFile() を実装します。
readable ストリームのメソッド .pipe() もパイピングをサポートしていますが、欠点があり、使用しない方がよいでしょう。
writable.write(chunk)ストリームにデータを書き込む場合、役立つ 2 つのコールバックベースのメカニズムがあります。
'drain' は、バックプレッシャーが終了したことを知らせます。finished() は、ストリームがコールバックを呼び出します。次の例では、これらのメカニズムを Promise 化して、非同期関数で使用できるようにします。
import * as util from 'util';
import * as stream from 'stream';
import * as fs from 'fs';
import {once} from 'events';
const finished = util.promisify(stream.finished); // (A)
async function writeIterableToFile(iterable, filePath) {
const writable = fs.createWriteStream(filePath, {encoding: 'utf8'});
for await (const chunk of iterable) {
if (!writable.write(chunk)) { // (B)
// Handle backpressure
await once(writable, 'drain');
}
}
writable.end(); // (C)
// Wait until done. Throws if there are errors.
await finished(writable);
}
await writeIterableToFile(
['One', ' line of text.\n'], 'tmp/log.txt');
assert.equal(
fs.readFileSync('tmp/log.txt', {encoding: 'utf8'}),
'One line of text.\n');stream.finished() のデフォルトバージョンはコールバックベースですが、util.promisify() を介して Promise ベースのバージョンに変換できます (行 A)。
次の 2 つのパターンを使用しました。
バックプレッシャーを処理しながら writable ストリームに書き込みます (行 B)。
if (!writable.write(chunk)) {
await once(writable, 'drain');
}writable ストリームを閉じて、書き込みが完了するまで待ちます (行 C)。
writable.end();
await finished(writable);stream.pipeline() を使用した readable ストリームの writable ストリームへのパイピング行 A では、stream.pipeline() の Promise 化されたバージョンを使用して、readable ストリーム readable を writable ストリーム writable にパイプします。
import * as stream from 'stream';
import * as fs from 'fs';
const pipeline = util.promisify(stream.pipeline);
async function writeIterableToFile(iterable, filePath) {
const readable = stream.Readable.from(
iterable, {encoding: 'utf8'});
const writable = fs.createWriteStream(filePath);
await pipeline(readable, writable); // (A)
}
await writeIterableToFile(
['One', ' line of text.\n'], 'tmp/log.txt');
// ···readable.pipe(destination)メソッド readable.pipe() もパイピングをサポートしていますが、注意点 があります。readable がエラーを発生させた場合、writable は自動的に閉じられません。pipeline() にはその注意点がありません。
モジュール os
const EOL: string (0.7.8 以降)
現在のプラットフォームで使用される行末文字シーケンスが含まれています。
モジュール buffer
Buffer.isEncoding(encoding: string): boolean (0.9.1 以降)
encoding が、Node.js でサポートされているテキストエンコーディングのいずれかを正しく指定している場合、true を返します。 サポートされているエンコーディング には以下が含まれます。
'utf8''utf16le''ascii''latin1''base64''hex' (各バイトを2つの16進文字として)モジュール stream
Readable.prototype[Symbol.asyncIterator](): AsyncIterableIterator<any> (10.0.0以降)
Readable ストリームは非同期的に反復可能です。たとえば、非同期関数または非同期ジェネレーターで for-await-of ループを使用して反復処理できます。
finished(stream: ReadableStream | WritableStream | ReadWriteStream, callback: (err?: ErrnoException | null) => void): () => Promise<void> (10.0.0以降)
返された Promise は、読み取り/書き込みが完了したとき、またはエラーが発生したときに解決されます。
この Promise 化されたバージョンは、次のように作成されます。
const finished = util.promisify(stream.finished);pipeline(...streams: Array<ReadableStream|ReadWriteStream|WritableStream>): Promise<void> (10.0.0以降)
ストリーム間のパイプです。返された Promise は、パイプラインが完了したとき、またはエラーが発生したときに解決されます。
この Promise 化されたバージョンは、次のように作成されます。
const pipeline = util.promisify(stream.pipeline);Readable.from(iterable: Iterable<any> | AsyncIterable<any>, options?: ReadableOptions): Readable (12.3.0以降)
反復可能オブジェクトを読み取り可能なストリームに変換します。
interface ReadableOptions {
highWaterMark?: number;
encoding?: string;
objectMode?: boolean;
read?(this: Readable, size: number): void;
destroy?(this: Readable, error: Error | null,
callback: (error: Error | null) => void): void;
autoDestroy?: boolean;
}これらのオプションは、Readable コンストラクターのオプションと同じであり、こちらにドキュメント化されています。
モジュール fs
createReadStream(path: string | Buffer | URL, options?: string | {encoding?: string; start?: number}): ReadStream (2.3.0以降)
読み取り可能なストリームを作成します。より多くのオプションが利用可能です。
createWriteStream(path: PathLike, options?: string | {encoding?: string; flags?: string; mode?: number; start?: number}): WriteStream (2.3.0以降)
オプション .flags を使用すると、書き込みまたは追加を行うかどうか、およびファイルが存在する場合と存在しない場合の動作を指定できます。より多くのオプションが利用可能です。
このセクションの静的型情報は、Definitely Typed に基づいています。