はじめに
こんにちは。iimonでエンジニアをしている保田です。
本日は、Web標準技術であるStreams APIについて、その基本と実装サンプルまでを解説します。TypeScriptを用いた具体的なコード例を交えながら、Streams APIがなぜ必要で、どのように活用すべきかを理解していきたいと思います。
Streams API とは
まず、Streams APIとは何かを説明します。
Streams APIは、データの流れ(ストリーム)を効率的に処理するためのWeb標準技術です。
従来のアプローチでは、大容量のデータの場合、
全データが揃うまで待ってから処理を開始していました。
一方、Streams APIでは、データをチャンクという小さな単位に分けて、
到着したものから順次処理を開始することができます。
https://developer.mozilla.org/ja/docs/Web/API/Streams_API
Streams APIの3つの構成要素
Streams APIは3つのストリームから構成されています。
1. ReadableStream(読み取り)
– Fetch APIのレスポンスボディはReadableStream
– データのソースとなるストリーム
const response = await fetch('https://example.com/data.json'); if (!response.body) { throw new Error('レスポンスボディが存在しません'); } const stream: ReadableStreamUint8Array> = response.body;
2. TransformStream(変換)
– データを変換・加工するストリーム
– ReadableStreamから出てくるのはUint8Array(バイナリデータ)なので、変換するためにTextDecoderStream()を通す必要がある
const decoder = new TextDecoderStream(); const textStream: ReadableStreamstring> = stream.pipeThrough(decoder);
3. WritableStream(書き込み)
– データの出力先となるストリーム
– ファイル保存、DOM要素への出力などを行う
const fileHandle = await window.showSaveFilePicker(); const writable: WritableStream = await fileHandle.createWritable();
主要なメソッド
実装で頻繁に使うメソッドを簡単に紹介します。
ストリームの接続メソッド
pipeThrough(transformStream)
– データを変換しながら次のストリームに渡すメソッド
– TransformStreamを引数に取り、変換後のReadableStreamを返す
– 複数回連結して、データ変換のパイプラインを構築することができる
stream .pipeThrough(transform1) .pipeThrough(transform2)
pipeTo(writableStream)
– データを最終的な出力先に送るメソッド
– WritableStreamを引数に取り、処理完了を示すPromiseを返す
– パイプラインの最後に必ず1回だけ使用する
await stream
.pipeThrough(transform)
.pipeTo(destination);
ReadableStreamのメソッド
getReader()
ストリームから直接データを読み取るための リーダー(Reader) を取得するメソッドです。
使い分け
– pipeThrough / pipeTo → 自動パイプ処理向け(推奨)
– getReader() → 手動でチャンク単位の処理 をしたい場合
使用時の注意点
– 読み取り中は ストリームがロックされる ため、他の操作ができない
– 処理終了後は releaseLock() でロックを解放する必要がある
– 中断する場合は cancel(reason) でリソースを解放
基本的な使い方
const reader = stream.getReader(); try { while (true) { const { done, value } = await reader.read(); if (done) { console.log('読み取り完了'); break; } console.log('受信:', value); } } finally { reader.releaseLock(); }
cancel(reason)
– ストリームの読み取りを 途中で中止 するメソッドです。
– 不要になったストリームのリソースを解放できます。
– reason は省略可能ですが、エラー原因などを渡せます。
– pipeThrough や pipeTo で接続している場合、キャンセルすると 下流の WritableStream も影響を受けます。
await stream.cancel("不要になったため中止");
TransformStreamDefaultControllerのプロパティとメソッド
enqueue(chunk)
– 変換したデータを次のストリームに送出するメソッド
– transform()メソッド内で使用
transform(chunk, controller) { const processed = processData(chunk); controller.enqueue(processed); }
desiredSize(プロパティ)
– キューの残り容量を示す数値
– 正の値 → まだデータを受け入れ可能
– 0以下 → キューが満杯(後で説明する背圧が発生している状態)
console.log(controller.desiredSize); controller.enqueue(data); console.log(controller.desiredSize);
Transformerインターフェースのメソッド
transform(chunk, controller)
– 各チャンクに対して実行される変換処理
– 上流から送られてきたデータ(chunk)を受け取る
– 必要に応じて加工・変換する
– 加工したデータを controller.enqueue() で下流に送る
transform(chunk: string, controller: TransformStreamDefaultControllernumber>) { const number = parseInt(chunk, 10); if (!isNaN(number)) { controller.enqueue(number); } }
flush(controller)
– flush(controller) は、TransformStream が終了するときに呼ばれる関数
– 変換中に残ったバッファのデータを最後に下流に送る
– ストリームの終端処理を行う
flush(controller: TransformStreamDefaultControllerstring>) { if (this.buffer.length > 0) { controller.enqueue(this.buffer); } }
start(controller)(オプション)
– Transformerの初期化処理(バッファの初期化やログ出力など)
– 必要に応じて実装
start(controller: TransformStreamDefaultControllerstring>) { console.log('変換処理を開始します'); }
パイプラインの構築
これらのストリームをpipeThrough()とpipeTo()で連結することで、データ処理のパイプラインを構築できます。
await stream
.pipeThrough(decoder)
.pipeThrough(jsonParser)
.pipeTo(writer);
なぜ Streams API が必要なのか
従来の処理方法が抱える課題
ここで、従来の方法が抱える2つの問題についてみてみます。
メモリ負荷の増大
– 従来の方法では データ全体を一度にメモリに読み込む
– データサイズが大きいと、メモリを大量に消費してしまう
– 最悪の場合、アプリがクラッシュすることもある
const response = await fetch('large-file.json'); const data = await response.json();
初回応答の遅延
– データのダウンロードや読み込みが完了するまで、画面に何も表示されない
– ユーザーは数秒~数十秒間、待たされることになる
– 特に大量データの表示やリアルタイム処理には不向き
const users = await fetchAllUsers(); displayUsers(users);
Streams APIによる解決
Streams APIは、チャンク単位でデータを処理することで、これらの課題を解決します。
メモリ効率の最適化
- データを チャンクごとに処理
- メモリに残るのは、チャンクサイズ × キューサイズだけ
- 大容量ファイルでも安定して動作
if (!response.body) { throw new Error('レスポンスボディが存在しません'); } await response.body .pipeThrough(new TextDecoderStream()) .pipeThrough(new TransformStream({ transform(chunk, controller) { const processed = processChunk(chunk); controller.enqueue(processed); } })) .pipeTo(writer);
低遅延の実現
if (!response.body) { throw new Error('レスポンスボディが存在しません'); } await response.body .pipeThrough(decoder) .pipeTo(new WritableStream({ write(chunk) { displayChunk(chunk); } }));
内部キューと背圧制御
ここから、Streams APIの重要な機能である背圧制御について説明します。
背圧制御とは
背圧制御とは、データの生産速度と消費速度の差を自動的に調整する仕組みです。
例えば、データの生成が速すぎてメモリが溢れないように、また、処理が遅すぎて待機時間が発生しないように、自動的にバランスを取ります。
内部キューの仕組み
各ストリームは、内部にキュー(待ち行列)を持っています。
このキューには、highWaterMarkという最大容量が設定されています。
例えば、highWaterMark: 5の場合、最大5個のチャンクを保持できます。
データを生成するReadableStreamと、データを消費するWritableStreamの速度が異なる場合、
このキューが満杯になったり、空になったりします。
背圧制御は、この状態を自動的に検知して、データフローを調整しています。
new ReadableStream({ }, { highWaterMark: 5 });
背圧制御の動作パターン
具体的な動作を3つのケースで見てみます。
通常の状態
- 処理がスムーズ
- キューに余裕がある
- 背圧なし
ReadableStream → [キュー: □□□__] → WritableStream
(3/5個使用) ↓
高速処理
処理が遅い
- キューが満杯になる
- 背圧発生 → ReadableStreamに停止信号
- ReadableStreamは一時停止
ReadableStream → [キュー: ■■■■■] → WritableStream
↑ (5/5個満杯) ↓
停止信号 低速処理
処理が追いつく
- キューに空きができる
- 背圧解除 → ReadableStreamに再開信号
- ReadableStreamが再開
ReadableStream → [キュー: ■■___] → WritableStream
↑ (2/5個使用) ↓
再開信号 処理完了
このように、自動的にペースを調整することで、メモリに溜まるデータ量が制限されます。
desiredSizeプロパティの役割
desiredSizeプロパティは、キューの残り容量を示す重要な指標です。
desiredSizeの計算式
desiredSize = highWaterMark - queueTotalSize desiredSize = 5 desiredSize = 2 desiredSize = 0 desiredSize = -1
この値によって、次のセクションで説明するpull()メソッドの呼び出しが制御されます。
pull()とenqueue()による自動調整
背圧制御の核心は、pull()メソッドとdesiredSizeの連携にあります。
仕組みの詳細
ReadableStreamのpull()は 下流のキューに余裕があるとき (desiredSize > 0) に自動的に呼ばれるcontroller.enqueue(chunk)でチャンクを追加すると、desiredSize が減少desiredSize になると、pull() は自動的に停止- 上流はこれ以上チャンクを作らないので、メモリ使用量が抑えられる
- 下流の処理が進み、キューに余裕が出るとpull() が再び呼ばれる
つまり、手動でペース調整する必要がなく、
システムが自動的にメモリ使用量を制限してくれます。
実装例
const stream = new ReadableStream({ start(controller) { console.log('ストリーム開始'); }, pull(controller) { const chunk = generateData(); controller.enqueue(chunk);
試してみる
Metropolitan Museum of Art APIから約50万件のオブジェクトID(約5MB)を取得する例で、
実際に試してみました。
従来の方法の場合
type ApiResponse = { total: number; objectIDs: number[]; }; const fetchAllIds = async (): Promisenumber[]> => { const response = await fetch('https://collectionapi.metmuseum.org/public/collection/v1/objects'); const data: ApiResponse = await response.json(); return data.objectIDs; }; const ids = await fetchAllIds(); console.log(`取得完了: ${ids.length}件`);
問題点:
– ダウンロード完了まで待機時間が約20秒間程度
– 全データ(約5MB)がメモリに保持される
– 初回表示までの時間が長い
Streams APIを使用した場合
Streams APIでは、データをチャンクごとに処理します。
CustomReadableStream
Fetch APIのレスポンスをReadableStreamとして扱うカスタムクラス。
class CustomReadableStream extends ReadableStreamUint8Array> { constructor(url: string) { let reader: ReadableStreamDefaultReaderUint8Array> | null = null; super({ async start(controller) { try { const response = await fetch(url); if (!response.ok) { throw new Error(`HTTP Error: ${response.status}`); } if (!response.body) { throw new Error("No response body"); } reader = response.body.getReader(); } catch (error) { controller.error(error); } }, async pull(controller) { if (!reader) { controller.error(new Error("Reader not initialized")); return; } try { const { done, value } = await reader.read(); if (done) { controller.close(); } else { controller.enqueue(value); } } catch (error) { controller.error(error); } }, async cancel() { if (reader) { await reader.cancel(); reader = null; } }, }); } }
CustomTransformStream
受け取った文字列データを加工して、必要なIDだけを取り出す変換用ストリームのクラス。
class CustomTransformStream extends TransformStreamstring, number> { constructor() { let buffer = ""; let isInsideArray = false; super({ start() { console.log('Transform開始'); }, transform(chunk, controller) { buffer += chunk; if (!isInsideArray) { const arrayStartIndex = findArrayStart(buffer); if (arrayStartIndex !== null) { isInsideArray = true; buffer = buffer.substring(arrayStartIndex); } } if (isInsideArray) { const ids = extractIds(buffer); for (const id of ids) { controller.enqueue(id); } buffer = cleanBuffer(buffer); } }, flush(controller) { const ids = extractIds(buffer); for (const id of ids) { controller.enqueue(id); } console.log('Transform完了'); }, }); } }
CustomWritableStream
変換されたIDを受け取り、UIにテキストとして表示する書き込み用ストリームのクラス。
class CustomWritableStream extends WritableStreamnumber> { constructor( onWrite?: (id: number) => void, onComplete?: () => void, onError?: (error: any) => void, ) { super({ async write(id) { if (onWrite) { onWrite(id); } }, close() { console.log("処理完了"); if (onComplete) { onComplete(); } }, abort(reason) { console.error("エラー:", reason); if (onError) { onError(reason); } }, }); } }
パイプラインでの使用例
const readable = new CustomReadableStream(MET_API_URL); const decoder = new TextDecoderStream(); const transformer = new CustomTransformStream(); const writable = new CustomWritableStream( (id) => console.log(`ID受信: ${id}`), () => console.log('処理完了'), (error) => console.error('エラー:', error) ); try { await readable .pipeThrough(decoder) .pipeThrough(transformer) .pipeTo(writable); } catch (error) { console.error('ストリーム処理エラー:', error); }
測定結果
※ 環境によって結果は異なります
| 項目 | 従来の方法 | Streams API |
|---|---|---|
| 初回表示 | 約20秒待機 | 即座(数十ms) |
| 表示方法 | 一括表示 | 逐次表示 |
| メモリ使用量 | 約5MB増加 | 一定(チャンクサイズ程度) |
Streams APIが適している場面
大容量データの処理 と リアルタイム性が求められる処理 の2つで利用されることが多いようです。
例:リアルタイムデータ処理、大容量ファイルのダウンロード・アップロードなど
まとめ
本日は、Streams API の基本について解説しました。
今後は、実装の選択肢のひとつとして適した場面があれば、積極的に活用していきたいと思います。
特に、今回紹介した背圧制御の仕組みを理解することで、なぜメモリ効率が良いのか、なぜ安定して動作するのかが理解できました。
ご清聴ありがとうございました!
この技術に興味を持っていただけた方、一緒に開発してみたい方は、ぜひカジュアルにお話しさせてください。
iimon採用サイト / Wantedly / Green