見出し画像

こんにちは。株式会社ラキールで DX 基盤開発を行う LaKeel DX Engine Group の澁井です。
普段は LaKeel Synergy Logic という自社 API Gateway を開発しています。

社内ウェビナー(ウェブセミナー)で発表した Node.js Stream API の利用方法について、その内容の一部を公開します。

Node.js Streamとは?

Node.js の Stream API は、ストリーミングデータを扱うためのインターフェースです。
Stream の特徴は、データを小さな単位に分けて、それを読み込むごとにイベントが発行される点にあります。
データを分割して処理するため、メモリ消費を抑えることができます。また、読み込んだ端から処理を行えるため、場合によっては処理時間の短縮も期待できます。非同期 I/O の根幹をなす技術であり、Node.js の開発を行う上では習得必須となります。
本記事では、Node.js Stream の概念と、基本的な活用方法について説明します。

Stream を使う場合と使わない場合の違い

Node.js のプロセスでファイルを読み込んで処理をする、というケースを想定してみます。

Stream を使わない場合
メモリ上にファイルの内容を全て読みこみ、読み込み完了後にイベントが1回だけ発火します。
ファイルサイズ分のメモリを消費することになる上に、読み込み完了するまでデータを扱うことはできません。

Stream を使う場合
新しいデータが読み込まれるたびにイベントが発火するため、処理を開始するのに読み込み完了を待つ必要がありません。
また、読み込んだ少量のデータ分だけメモリを利用するため、効率的にデータを取り扱えます。

Stream の種類

Stream のモジュールには、4つの抽象クラスが用意されています。
Node.js のすべての Stream は、この4つのいずれかの実装になります。

  • Readable: データの読み取り用

  • Writable: データの書き込み用(コンシューマー)

  • Duplex: Readable かつ Writable な Stream

  • Transform: 読み取り、書き込み時にデータ変更可能な二重 Stream


※本記事では、基本的な Readable / Writable ストリームについて解説します。

Readable

データの読み込み用の Stream です。
大元のリソースから、内部バッファに少しずつデータを読み込みます。
Stream の内部バッファからデータを取り出す方法には、non-flowing モード / flowing モードの2種類があります。

non-flowing
read() メソッドを明示的に呼び出して、内部バッファからデータを取り出します。
一般的な使い方としては、Stream の 'readable' イベントにリスナーをアタッチして、リソースが空になるまでデータの取り出しを繰り返し行います。

読み込んだファイルの内容を 50byte ずつコンソールに出力する例

import { createReadStream, Readable } from 'fs';

const readable: Readable = createReadStream('./hello.txt');
readable.on('readable', (): void => {
    let data: Buffer;
    while((data = readable.read(50)) !== null) {
        console.log(data.toString());
    }
});

※このモードでは、Stream の内部バッファからの読み取り量を逐次調整することができます。その必要がない場合は、以下に示すように flowing モードを利用するか、単純に Stream 同士を接続してください。

flowing
Stream の 'data' イベントにリスナーをアタッチすることで、自動的に flowing モードに切り替わります。
このモードでは、データが到着するとすぐに 'data' イベントが発火し、Stream の内部バッファデータがリスナーに引き渡されます。

import { createReadStream, ReadStream } from 'fs';

const readable: ReadStream = createReadStream('./hello.txt', highWaterMark: 50);
readable.on('data', (data: Buffer): void => {
    console.log(data.toString());
});

Writable

データの書き込み用 Stream です。
write() メソッドを利用して、Stream にデータをプッシュします。

Readable Streamである標準出力から、ファイルに書き込む例を紹介します。
fs モジュールの createWriteStream は、プッシュされたデータを指定したファイルに書き込むことができます。

import { createWriteStream, WriteStream } from 'fs';

const writable: WriteStream = createWriteStream('./sample.txt');
process.stdin.on('data', (data: Buffer) => {
    writable.write(data);
});

Stream の接続

ここまで、Stream の基本的な動作を確認するために、手動で内部バッファリングを行う手法を紹介してきました。しかし、特殊なケースを除けば、これらを手動で行うことは一般的ではありません。
これから示す pipe() / pipeline() メソッドを利用する方法をおすすめします。

pipe() / pipeline()

これらのメソッドは、Readable Stream の内部バッファから flowing モードでデータを取り出し、指定した Writable Stream にすべてのデータをプッシュします。
簡略化して記述できるうえに、データの流れが自動的に管理されるので、効率的にデータを処理することができます。

Stream の接続イメージ

Readable である標準出力から、ファイルに書き込む例

import { createWriteStream, WriteStream } from 'fs';

const writable: WriteStream = createWriteStream('./sample.txt');

/**
 * pipe() を利用する場合
 */
process.stdin.pipe(writable);

/**
 * pipeline() を利用する場合
 */
pipeline(process.stdin, writable, (err: any) => {});

※ pipe() メソッドは、Readable Stream 内部でエラーが発生した場合に、Writable Stream を閉じるための後始末を書く必要があります。
これを放っておくとメモリリークする可能性があるため、pipeline を利用することをおすすめします。

pipe() 利用時にエラーが発生した場合の後始末の例

readable.on('error', (err: any) => {
    writable.end();  // Writable Stream を閉じ、finish イベントを発火させる。
}).pipe(writable);

また、Node.js v.15.0.0 以降では、Streams Promises API が使えるようになりました。async/await で処理を書く場合は、pipeline() を利用してください。

import { pipeline } from 'stream/promises';

try {
    await pipeline(readable, writable);
} catch (e) {
    // エラーハンドリング
}

Node.js Streams Promises API

バックプレッシャーと highWaterMark

各 Stream は、上流からデータを内部バッファに読み込んでいきます。
もしも、それを処理するよりも速く Stream にデータが流れ込んできたらどうなるでしょうか。
内部バッファの量が肥大化し、最悪の場合、メモリ不足に陥ってプロセスが終了してしまう可能性があります。

これを回避するために、Stream では内部バッファのしきい値を設定することができます。
データ量がこのしきい値を超えた時に、一時的にリソースからのデータの読み取りを停止させることで、内部バッファが溢れることを防ぎます。

この流量制御の仕組みをバックプレッシャーと呼び、しきい値を highWaterMark と呼びます。

バックプレッシャーのイメージ

highWaterMark はあくまでしきい値であり、物理的なメモリの制約ではありません。
例えば、消費の遅い Writable Stream に対して、highWaterMark を超える量のデータを write() で書き込んでいくことも可能です。

import { createReadStream, createWriteStream, ReadStream, WriteStream } from "fs";

const readable: ReadStream = createReadStream('./sample.txt');
const writable: WriteStream = createWriteStream('./copy.txt', { highWaterMark: 64 });

readable.on('readable', (): void => {
    let data: Buffer;

    // highWaterMark 64byte の Writable Stream に、1KBのデータを書き込む
    while((data = readable.read(1024)) !== null) {
        writable.write(data)
    }
});

pipe() / pipeline() メソッドでは、設定された highWaterMark にもとづいたバックプレッシャーを自動的に行ってくれます。
このメソッドを利用する限り、開発者はバックプレッシャーについてほとんど意識する必要はありません。
Node.js 公式のドキュメントにも詳しく記載されているので、興味がある方はこちらもご確認ください。

Node.js Stream Buffering

Event Emitter について

各 Stream には、処理の状態に応じて発火するイベントが存在します。
イベントにリスナーをアタッチすることでデータの流れを制御したり、特定の状態の時に任意の処理を実行できます。

Readable ストリームのイベント

イベントにリスナーをアタッチするためには、on() メソッドを利用します。

const let: string = '';
readable.on('data', async (data: Buffer): Promise<void> => {
    result += data.toString();
    // データの取り出しを一時停止
    readable.pause();

    await sleep();
    
    // データの取り出し再開
    readable.resume();
})
.on('pause', () => {
    // pause() 実行時に実行される。
})
.on('resume', () => {
    // resume() 実行時に実行される
})
.on('end', async (): Promise<void> => {
    await fs.promises.writeFile(`./result.txt`, result);
})

Writable ストリームのイベント


writable.on('pipe', () => {
    // readable.pipe()が呼びされたときに実行する。
})
.on('finish', () => {
    // writable.end()が呼びされたときに実行する。
})
.on('close', () => {
    console.log('end writable stream.')
})

readable.pipe(writable);

性能比較

弊社では、マイクロサービス間で数 MB ~ 数百 MB の圧縮ファイルをやり取りする要件がありました。Stream を利用せずにやり取りしようとすると、当然のようにプロセスが OOM で異常終了してしまいます。

axios や node-fetch、undici などのHTTP クライアントライブラリの多くは、リクエストボディに Readable Stream を渡すことができるようになっています。これを利用することで、メモリを気にせずに大容量のデータを HTTP で送受信することが可能です。

シンプルな Express のサーバーを立ち上げて、Stream を利用する場合と利用しない場合を比較してみます。
undici ライブラリを利用して、ボディにファイルデータを含めて HTTP 通信を行ってみます。

メモリ使用量の違い

サンプルで使用したコードは以下の通りです。
一方では fs.promises.readFile() メソッドで、ファイルデータを全て読み込んでから送信し、もう一方では、Stream をそのままボディに利用しています。

送信側

import { createReadStream, type ReadStream} from 'fs';
import { readFile } from 'fs/promises';
import undici, { type Dispatcher } from 'undici';

// Stream を利用しない場合(Buffer)
const body: Buffer = await readFile('./request-1024mb.txt');

// Stream を利用する場合
const body: ReadStream = createReadStream('./request-1024mb.txt');

const options: { dispatcher?: Dispatcher } & Omit<Dispatcher.RequestOptions,'origin' | 'path'> = {
    method: 'POST',
    body: body
};

await undici.request('http://localhost:5000/save', options);

受信側

import { type Request, type Response } from 'express';
import { createWriteStream, type WriteStream } from 'fs';
const express = require('express')
const app = express()

const port: number = 5000;

app.post('/save', async (req: Request, res: Response): Promise<void> => {
    const writeStream: WriteStream = createWriteStream('./request');
    req.pipe(writeStream).on('finish', () => {
        res.send()
    })
});

app.listen(port, () => {
    console.log(`App listening on port ${port}`)
});
メモリ使用量の違い

Stream 利用しない場合では、ファイル読み込み時に Array Buffer が肥大化し、プロセス全体に大きな影響を与えます。
一方で Stream の場合は、内部バッファに読み込んだ分だけメモリを利用しているので、OOM の心配はありません。


メモリについての詳細は、以前に紹介した『メモリとはなんぞや ~ Node.js のメモリ管理を知る ~』を読んでみてください。

highWaterMark の設定

highWaterMark で内部バッファの容量を適切に調整することで、処理速度の向上が見込めます。
短絡的に考えれば、物理メモリを潤沢に用意し、highWaterMark の値を大きくするほど、処理速度は向上するように思えます。
あながち間違いではないのですが、書き込み側の処理能力を超える量のデータをつぎ込んでも意味はありません。

先ほどのファイル送信の例で、Readable Stream の highWaterMark 値を変えながら、処理速度を計測してみました。

highWaterMarkごとの処理時間

結果としては、highWaterMark を大きくするほど処理速度は向上しましたが、32KB を超えると変化は見られなくなりました。
Writable Stream 内部のファイル書き込み処理が、Readable Stream の読み込み処理より遅いためです。Readable Stream は、Writable Stream の処理速度に合わせて、読み込みを一時停止しながらデータを流し込んでいます。

highWaterMark の値は、Stream 内部でどのような処理を行うかで適正値が異なります。それぞれの処理内容に応じた値を設定してください。
弊社の案件では、1MB ~ 2MB の値が最も高速に処理をすることができました。

まとめ

  • Stream はデータを小さいサイズに分割して取り扱う事ができる。

    • メモリは、分割して内部バッファにため込まれた分だけ利用される。

    • バッファリングの流量制御には highWaterMark を利用する。

  • Readable Stream の内部バッファからデータを少しずつ取り出して、Writable Stream にプッシュして利用する。

    • これを自動的に行うために、Readable / Writable Stream を互いに pipe する。

  • 各 Stream が発火させるイベントにリスナーをアタッチして、任意の処理を実行することができる。

参考資料