【Streams API】メモリ効率と低遅延なデータ処理 – iimon TECH BLOG

はじめに

こんにちは。iimonでエンジニアをしている保田です。
本日は、Web標準技術であるStreams APIについて、その基本と実装サンプルまでを解説します。TypeScriptを用いた具体的なコード例を交えながら、Streams APIがなぜ必要で、どのように活用すべきかを理解していきたいと思います。

Streams API とは

まず、Streams APIとは何かを説明します。
Streams APIは、データの流れ(ストリーム)を効率的に処理するためのWeb標準技術です。
従来のアプローチでは、大容量のデータの場合、
全データが揃うまで待ってから処理を開始していました。
一方、Streams APIでは、データをチャンクという小さな単位に分けて、
到着したものから順次処理を開始することができます。
https://developer.mozilla.org/ja/docs/Web/API/Streams_API

Streams APIの3つの構成要素

Streams APIは3つのストリームから構成されています。

1. ReadableStream(読み取り)
– Fetch APIのレスポンスボディはReadableStream
– データのソースとなるストリーム

const response = await fetch('https://example.com/data.json');

if (!response.body) {
  throw new Error('レスポンスボディが存在しません');
}

const stream: ReadableStreamUint8Array> = response.body;

2. TransformStream(変換)
– データを変換・加工するストリーム
– ReadableStreamから出てくるのはUint8Array(バイナリデータ)なので、変換するためにTextDecoderStream()を通す必要がある

const decoder = new TextDecoderStream();
const textStream: ReadableStreamstring> =
  stream.pipeThrough(decoder);

3. WritableStream(書き込み)
– データの出力先となるストリーム
– ファイル保存、DOM要素への出力などを行う

const fileHandle = await window.showSaveFilePicker();
const writable: WritableStream = await fileHandle.createWritable();

主要なメソッド

実装で頻繁に使うメソッドを簡単に紹介します。

ストリームの接続メソッド

pipeThrough(transformStream)
– データを変換しながら次のストリームに渡すメソッド
– TransformStreamを引数に取り、変換後のReadableStreamを返す
– 複数回連結して、データ変換のパイプラインを構築することができる

stream
  .pipeThrough(transform1)  
  .pipeThrough(transform2)  
  

pipeTo(writableStream)
– データを最終的な出力先に送るメソッド
– WritableStreamを引数に取り、処理完了を示すPromiseを返す
– パイプラインの最後に必ず1回だけ使用する

await stream
  .pipeThrough(transform)
  .pipeTo(destination);  

ReadableStreamのメソッド

getReader()

ストリームから直接データを読み取るための リーダー(Reader) を取得するメソッドです。

使い分け
pipeThrough / pipeTo → 自動パイプ処理向け(推奨)
getReader()手動でチャンク単位の処理 をしたい場合

使用時の注意点
– 読み取り中は ストリームがロックされる ため、他の操作ができない
– 処理終了後は releaseLock() でロックを解放する必要がある
– 中断する場合は cancel(reason) でリソースを解放

基本的な使い方

const reader = stream.getReader();
try {
  while (true) {
    
    const { done, value } = await reader.read();

    
    
    if (done) {
      console.log('読み取り完了');
      break;
    }

    
    console.log('受信:', value);
  }
} finally {
  
  reader.releaseLock();
}

cancel(reason)
– ストリームの読み取りを 途中で中止 するメソッドです。
– 不要になったストリームのリソースを解放できます。
reason は省略可能ですが、エラー原因などを渡せます。
pipeThroughpipeTo で接続している場合、キャンセルすると 下流の WritableStream も影響を受けます

await stream.cancel("不要になったため中止");

TransformStreamDefaultControllerのプロパティとメソッド

enqueue(chunk)
– 変換したデータを次のストリームに送出するメソッド
transform()メソッド内で使用

transform(chunk, controller) {
  const processed = processData(chunk);
  controller.enqueue(processed);
}

desiredSize(プロパティ)
– キューの残り容量を示す数値
– 正の値 → まだデータを受け入れ可能
– 0以下 → キューが満杯(後で説明する背圧が発生している状態)

console.log(controller.desiredSize);  
controller.enqueue(data);
console.log(controller.desiredSize);  

Transformerインターフェースのメソッド

transform(chunk, controller)
– 各チャンクに対して実行される変換処理
– 上流から送られてきたデータ(chunk)を受け取る
– 必要に応じて加工・変換する
– 加工したデータを controller.enqueue()下流に送る

transform(chunk: string, controller: TransformStreamDefaultControllernumber>) {
  const number = parseInt(chunk, 10);

  if (!isNaN(number)) {
    controller.enqueue(number);  
  }
}

flush(controller)
flush(controller) は、TransformStream が終了するときに呼ばれる関数
– 変換中に残ったバッファのデータを最後に下流に送る
– ストリームの終端処理を行う

flush(controller: TransformStreamDefaultControllerstring>) {
  if (this.buffer.length > 0) {
    controller.enqueue(this.buffer);  
  }
}

start(controller)(オプション)
– Transformerの初期化処理(バッファの初期化やログ出力など)
– 必要に応じて実装

start(controller: TransformStreamDefaultControllerstring>) {
  console.log('変換処理を開始します');
}

パイプラインの構築

これらのストリームをpipeThrough()pipeTo()で連結することで、データ処理のパイプラインを構築できます。

await stream
  .pipeThrough(decoder)      
  .pipeThrough(jsonParser)   
  .pipeTo(writer);           

なぜ Streams API が必要なのか

従来の処理方法が抱える課題

ここで、従来の方法が抱える2つの問題についてみてみます。

メモリ負荷の増大
– 従来の方法では データ全体を一度にメモリに読み込む
– データサイズが大きいと、メモリを大量に消費してしまう
– 最悪の場合、アプリがクラッシュすることもある

const response = await fetch('large-file.json');
const data = await response.json(); 

初回応答の遅延
– データのダウンロードや読み込みが完了するまで、画面に何も表示されない
– ユーザーは数秒~数十秒間、待たされることになる
– 特に大量データの表示やリアルタイム処理には不向き

const users = await fetchAllUsers(); 
displayUsers(users);                 

Streams APIによる解決

Streams APIは、チャンク単位でデータを処理することで、これらの課題を解決します。

メモリ効率の最適化

  • データを チャンクごとに処理
  • メモリに残るのは、チャンクサイズ × キューサイズだけ
  • 大容量ファイルでも安定して動作
if (!response.body) {
  throw new Error('レスポンスボディが存在しません');
}

await response.body                      
  .pipeThrough(new TextDecoderStream())  
  .pipeThrough(new TransformStream({
    transform(chunk, controller) {
      const processed = processChunk(chunk); 
      controller.enqueue(processed);          
      
    }
  }))
  .pipeTo(writer);                         

低遅延の実現

  • 最初のチャンクが届いた瞬間から処理・表示開始
  • ユーザー体験の大幅な向上
  • プログレッシブレンダリングが可能
if (!response.body) {
  throw new Error('レスポンスボディが存在しません');
}

await response.body                        
  .pipeThrough(decoder)                    
  .pipeTo(new WritableStream({
    write(chunk) {
      displayChunk(chunk);                 
    }
  }));

内部キューと背圧制御

ここから、Streams APIの重要な機能である背圧制御について説明します。

背圧制御とは

背圧制御とは、データの生産速度と消費速度の差を自動的に調整する仕組みです。

例えば、データの生成が速すぎてメモリが溢れないように、また、処理が遅すぎて待機時間が発生しないように、自動的にバランスを取ります。

内部キューの仕組み

各ストリームは、内部にキュー(待ち行列を持っています。
このキューには、highWaterMarkという最大容量が設定されています。

例えば、highWaterMark: 5の場合、最大5個のチャンクを保持できます。

データを生成するReadableStreamと、データを消費するWritableStreamの速度が異なる場合、
このキューが満杯になったり、空になったりします。

背圧制御は、この状態を自動的に検知して、データフローを調整しています。

new ReadableStream({
  
}, {
  highWaterMark: 5  
});

背圧制御の動作パターン

具体的な動作を3つのケースで見てみます。

通常の状態

  • 処理がスムーズ
  • キューに余裕がある
  • 背圧なし
ReadableStream → [キュー: □□□__] → WritableStream
                   (3/5個使用)        ↓
                                   高速処理

処理が遅い

  • キューが満杯になる
  • 背圧発生 → ReadableStreamに停止信号
  • ReadableStreamは一時停止
ReadableStream → [キュー: ■■■■■] → WritableStream
    ↑              (5/5個満杯)        ↓
  停止信号                         低速処理

処理が追いつく

  • キューに空きができる
  • 背圧解除 → ReadableStreamに再開信号
  • ReadableStreamが再開
ReadableStream → [キュー: ■■___] → WritableStream
    ↑              (2/5個使用)        ↓
  再開信号                         処理完了

このように、自動的にペースを調整することで、メモリに溜まるデータ量が制限されます。

desiredSizeプロパティの役割

desiredSizeプロパティは、キューの残り容量を示す重要な指標です。

desiredSizeの計算式

desiredSize = highWaterMark - queueTotalSize





desiredSize =  5  
desiredSize =  2  
desiredSize =  0  
desiredSize = -1  

この値によって、次のセクションで説明するpull()メソッドの呼び出しが制御されます。

pull()とenqueue()による自動調整

背圧制御の核心は、pull()メソッドとdesiredSizeの連携にあります。

仕組みの詳細

  • ReadableStreampull()下流のキューに余裕があるとき (desiredSize > 0) に自動的に呼ばれる
  • controller.enqueue(chunk) でチャンクを追加すると、desiredSize が減少
  • desiredSize になると、pull() は自動的に停止
  • 上流はこれ以上チャンクを作らないので、メモリ使用量が抑えられる
  • 下流の処理が進み、キューに余裕が出るとpull() が再び呼ばれる

つまり、手動でペース調整する必要がなく、
システムが自動的にメモリ使用量を制限してくれます。

実装例

const stream = new ReadableStream({
  
  start(controller) {
    console.log('ストリーム開始');
  },

  
  pull(controller) {
    const chunk = generateData();

    
    controller.enqueue(chunk);

    
    

試してみる

Metropolitan Museum of Art APIから約50万件のオブジェクトID(約5MB)を取得する例で、
実際に試してみました。

従来の方法の場合

type ApiResponse = {
  total: number;
  objectIDs: number[];
};

const fetchAllIds = async (): Promisenumber[]> => {
  const response = await fetch('https://collectionapi.metmuseum.org/public/collection/v1/objects');

  
  const data: ApiResponse = await response.json();

  return data.objectIDs; 
};


const ids = await fetchAllIds(); 
console.log(`取得完了: ${ids.length}件`);

問題点:
– ダウンロード完了まで待機時間が約20秒間程度
– 全データ(約5MB)がメモリに保持される
– 初回表示までの時間が長い

Streams APIを使用した場合

Streams APIでは、データをチャンクごとに処理します。

CustomReadableStream

Fetch APIのレスポンスをReadableStreamとして扱うカスタムクラス。

class CustomReadableStream extends ReadableStreamUint8Array> {
  constructor(url: string) {
    let reader: ReadableStreamDefaultReaderUint8Array> | null = null;

    super({
      
      async start(controller) {
        try {
          
          const response = await fetch(url);
          if (!response.ok) {
            throw new Error(`HTTP Error: ${response.status}`);
          }
          if (!response.body) {
            throw new Error("No response body");
          }
          
          reader = response.body.getReader();
        } catch (error) {
          controller.error(error);
        }
      },

      
      async pull(controller) {
        if (!reader) {
          controller.error(new Error("Reader not initialized"));
          return;
        }

        try {
          
          const { done, value } = await reader.read();
          if (done) {
            
            controller.close();
          } else {
            
            controller.enqueue(value);
          }
        } catch (error) {
          controller.error(error);
        }
      },

      
      async cancel() {
        if (reader) {
          await reader.cancel();
          reader = null;
        }
      },
    });
  }
}

CustomTransformStream

受け取った文字列データを加工して、必要なIDだけを取り出す変換用ストリームのクラス。

class CustomTransformStream extends TransformStreamstring, number> {
  constructor() {
    let buffer = "";
    let isInsideArray = false;

    super({
      


      start() {
        console.log('Transform開始');
      },

      



      transform(chunk, controller) {
        
        buffer += chunk;

        
        if (!isInsideArray) {
          const arrayStartIndex = findArrayStart(buffer);
          if (arrayStartIndex !== null) {
            isInsideArray = true;
            
            buffer = buffer.substring(arrayStartIndex);
          }
        }

        
        if (isInsideArray) {
          
          const ids = extractIds(buffer);

          
          for (const id of ids) {
            controller.enqueue(id);
          }

          
          buffer = cleanBuffer(buffer);
        }
      },

      



      flush(controller) {
        const ids = extractIds(buffer);
        for (const id of ids) {
          controller.enqueue(id);
        }
        console.log('Transform完了');
      },
    });
  }
}

CustomWritableStream

変換されたIDを受け取り、UIにテキストとして表示する書き込み用ストリームのクラス。

class CustomWritableStream extends WritableStreamnumber> {
  constructor(
    onWrite?: (id: number) => void,
    onComplete?: () => void,
    onError?: (error: any) => void,
  ) {
    super({
      


      async write(id) {
        
        if (onWrite) {
          onWrite(id);
        }
      },

      


      close() {
        console.log("処理完了");
        if (onComplete) {
          onComplete();
        }
      },

      


      abort(reason) {
        console.error("エラー:", reason);
        if (onError) {
          onError(reason);
        }
      },
    });
  }
}

パイプラインでの使用例

const readable = new CustomReadableStream(MET_API_URL);  
const decoder = new TextDecoderStream();                 
const transformer = new CustomTransformStream();         
const writable = new CustomWritableStream(
  (id) => console.log(`ID受信: ${id}`),         
  () => console.log('処理完了'),                 
  (error) => console.error('エラー:', error)    
);


try {
  await readable
    .pipeThrough(decoder)      
    .pipeThrough(transformer)  
    .pipeTo(writable);         
} catch (error) {
  console.error('ストリーム処理エラー:', error);
  
}

測定結果

※ 環境によって結果は異なります

項目 従来の方法 Streams API
初回表示 約20秒待機 即座(数十ms)
表示方法 一括表示 逐次表示
メモリ使用量 約5MB増加 一定(チャンクサイズ程度)

Streams APIが適している場面

大容量データの処理リアルタイム性が求められる処理 の2つで利用されることが多いようです。
例:リアルタイムデータ処理、大容量ファイルのダウンロード・アップロードなど

まとめ

本日は、Streams API の基本について解説しました。
今後は、実装の選択肢のひとつとして適した場面があれば、積極的に活用していきたいと思います。

特に、今回紹介した背圧制御の仕組みを理解することで、なぜメモリ効率が良いのか、なぜ安定して動作するのかが理解できました。


ご清聴ありがとうございました!

この技術に興味を持っていただけた方、一緒に開発してみたい方は、ぜひカジュアルにお話しさせてください。

iimon採用サイト / Wantedly / Green




元の記事を確認する

関連記事