Microsoft Agent Frameworkのワークフローを見てみる

Agent Frameworkとは

Microsoft Agent Frameworkは、AIエージェントとマルチエージェント・ワークフローを構築するためのオープンソースSDK/ランタイムです。
もともと、Microsoft製のAIエージェントフレームワークにはSemantic KernelとAutoGenの2つが存在していましたが、これらの長所を統合する新たなフレームワークとしてリリースされました。

今回は、業務プロセスにAIエージェントを組み込んで自動化した処理を定義することのできるWorkflowsという機能を中心にみていきます。

https://zenn.dev/headwaters/articles/2d8222bf2214ad

Workflowsを見てみる

主要なコンポーネントや機能の実装例を紹介します。(Python)

ワークフローの基本概念

有向グラフ構造アーキテクチャにより、柔軟なワークフローを直感的に定義できます。

ワークフローの構築には以下のコンポーネントを組み合わせていきます。

  • Executor:ワークフロー内の個々の処理単位を表します。 AI エージェントまたはカスタム ロジック コンポーネントを指定できます。 入力メッセージを受信し、特定のタスクを実行し、出力メッセージを生成します。
  • Edge:Executor 間の接続を定義し、メッセージのフローを決定します。
  • Workflows:Executor とEdgeで構成される有向グラフです。 これらは、最初の Executor から始まり、エッジで定義された条件とロジックに基づいてさまざまなパスを進むプロセス全体を定義する

Executor

グラフ構造のノード(四角い部分)にあたります。処理を実行する部分です。

Executorは、Executor基底クラスを継承して作成します。@handlerデコレーターで修飾されたメソッドがExecutorで実行されます。

from agent_framework import (
    Executor,
    WorkflowContext,
    handler,
)

class UpperCase(Executor):

    @handler
    async def to_upper_case(self, text: str, ctx: WorkflowContext[str]) -> None:
        """入力された文字列を大文字に変換し、次のノードに転送します。"""
        
        await ctx.send_message(text.upper())
  • ワークフローは、Messageというオブジェクトでノード間のデータの受け渡しやルーティングの管理をします。

  • ハンドラーは、ctx.send_message()を実行することで、そのExecutorの出力としてMessageを送信し、次のExecutorに連携します。(WorkflowContextは、ハンドラーがワークフローと対話するためのメソッドを提供する役割を持ちます。)

  • ハンドラーの第一引数では、前Executorから送信されるMessageデータを受け取ります。(型を一致させる必要があります)

  • 第二引数のWorkflowContext は、このハンドラーが出力する型でパラメータ化されています。

  • ここでは WorkflowContext[str] となっているため、下流のノードは str 型を受け取ることを想定しています。


また、@executorデコレーターを使用して、関数からExecutorを作成することもできます。

@executor(id="upper_case_executor")
async def upper_case(text: str, ctx: WorkflowContext[str]) -> None:
    await ctx.send_message(text.upper())

Edge

グラフ構造の接続を表すエッジ(線)にあたります。
Executor間のメッセージフローを定義し、データフロー パスを決定します。

エッジはワークフロー定義時に設定するため、ワークフローの構築方法も一緒に見ていきましょう。

ワークフローの構築

ワークフローの構築にはWorkflowBuilderクラスを使用します。

from agent_framework import WorkflowBuilder


processor = DataProcessor()
validator = Validator()
formatter = Formatter()


builder = WorkflowBuilder()
builder.set_start_executor(processor)  
builder.add_edge(processor, validator) 
builder.add_edge(validator, formatter)
workflow = builder.build()

この例では、processor → validator → formatter というダイレクトエッジ(単純な1対1の接続)からなるワークフローを構築しています。

Edgeの種類

その他のエッジパターンは以下の通りです。

特定の条件が満たされた場合にのみアクティブにする
from agent_framework import WorkflowBuilder

builder = WorkflowBuilder()
builder.add_edge(spam_detector, email_processor, condition=lambda result: isinstance(result, SpamResult) and not result.is_spam)
builder.add_edge(spam_detector, spam_handler, condition=lambda result: isinstance(result, SpamResult) and result.is_spam)
builder.set_start_executor(spam_detector)
workflow = builder.build()
条件に基づいて異なる Executor にメッセージをルーティングする
from agent_framework import (
   Case,
   Default,
   WorkflowBuilder,
)

builder = WorkflowBuilder()
builder.set_start_executor(router_executor)
builder.add_switch_case_edge_group(
   router_executor,
   [
       Case(
           condition=lambda message: message.priority  Priority.NORMAL,
           target=executor_a,
       ),
       Case(
           condition=lambda message: message.priority  Priority.HIGH,
           target=executor_b,
       ),
       Default(target=executor_c)
   ],
)
workflow = builder.build()
  • ファンアウト エッジ:
1 つの Executor から複数のターゲットにメッセージを配布する
from agent_framework import WorkflowBuilder

builder = WorkflowBuilder()
builder.set_start_executor(splitter_executor)
builder.add_fan_out_edges(splitter_executor, [worker1, worker2, worker3])
workflow = builder.build()


builder = WorkflowBuilder()
builder.set_start_executor(splitter_executor)
builder.add_fan_out_edges(
   splitter_executor,
   [worker1, worker2, worker3],
   selection_func=lambda message, target_ids: (
       [0] if message.priority == Priority.HIGH else
       [1, 2] if message.priority == Priority.NORMAL else
       list(range(target_count))
   )
)
workflow = builder.build()
  • ファンイン エッジ
複数のソースから 1 つのターゲットにメッセージを収集する
builder.add_fan_in_edge([worker1, worker2, worker3], aggregator_executor)

ワークフローの実行

ワークフロー実行イベント(後述)を非同期で受け取るストリーミング実行と、実行が完了してからまとめて受け取る非ストリーミング実行の両方がサポートされています。

from agent_framework import WorkflowCompletedEvent


async for event in workflow.run_stream(input_message):
    if isinstance(event, WorkflowCompletedEvent):
        print(f"Workflow completed: {event.data}")


events = await workflow.run(input_message)
print(f"Final result: {events.get_completed_event()}")

Event

イベントは、ワークフローの実行のレスポンスとして出力され、実行中の状態や実行結果を監視できます。

組み込みイベント


WorkflowStartedEvent    
WorkflowOutputEvent     
WorkflowErrorEvent      


ExecutorInvokeEvent     
ExecutorCompleteEvent   


RequestInfoEvent        

カスタムイベント

カスタムイベントを定義して出力することで、監視を強化できます。


@dataclass
class ExecutorWarningEvent(WorkflowEvent):
    """カスタムイベント:警告を通知"""
    executor_id: str
    message: str
    severity: str  

@executor(id="validator")
async def validate_input(data: dict, ctx: WorkflowContext[dict]) -> None:
    if not data.get("required_field"):
        
        await ctx.add_event(ExecutorWarningEvent(
            executor_id="validator",
            message="必須フィールドが不足しています",
            severity="warning"
        ))
        
        data["required_field"] = "default"
    
    await ctx.send_message(data)

ハンドラーでctx.add_event()を実行することでイベントを出力できます。


ExecutorにAIエージェントを使用する

Microsoft Agent Frameworkでは、ワークフローの実行にAIエージェントを統合するための複数のオプションが用意されています。

AgentExecutor

AgentExecutorオブジェクトを使用して、エージェントの実行をラップしたExecutorを作成できます。

from agent_framework.azure import AzureOpenAIChatClient
from agent_framework import AgentExecutor


agent = AzureOpenAIChatClient(
    endpoint=AZURE_OPENAI_ENDPOINT,
    deployment_name="gpt-5",
    api_key=API_KEY,
).create_agent(
    instructions="ユーザーの質問に回答してください。",
    name="Your Assistant",
)

agent_executor = AgentExecutor(agent)

この場合、Executorが受信/送信するMessageがエージェントの入力/出力となります。

また、単にエッジにエージェントを指定するだけもAgentExecutorとしてワークフローを構築できます。便利。

writer_agent = AzureOpenAIChatClient(
・・・
)
reviewer_agent = AzureOpenAIChatClient(
・・・
)


builder = WorkflowBuilder()
builder.set_start_executor(writer_agent) 
builder.add_edge(writer_agent, reviewer_agent)
workflow = builder.build()

カスタム Executorの作成

通常のExecutor定義にAIエージェントの実行を組み込むことで、エージェントの実行をカスタマイズしたり、その他の処理と統合したりなどの細かい制御ができます。

from agent_framework import (
    ChatAgent,
    ChatMessage,
    Executor,
    WorkflowContext,
    handler
)

class Writer(Executor):

    agent: ChatAgent

    def __init__(self, chat_client: AzureChatClient, id: str = "writer"):
        
        agent = chat_client.create_agent(
            instructions=(
                "あなたは優れたコンテンツライターです。新しいコンテンツを作成し、フィードバックに基づいて編集を行います。"
            ),
        )
        
        super().__init__(agent=agent, id=id)

    @handler
    async def handle(self, message: ChatMessage, ctx: WorkflowContext[list[ChatMessage]]) -> None:
        """1つのチャットメッセージを処理し、蓄積されたメッセージをワークフロー内の次の executor に転送します。"""
        
        messages: list[ChatMessage] = [message]
        response = await self.agent.run(messages)
        
        messages.extend(response.messages)
        await ctx.send_message(messages)


Shared State

Shared Stateは、ワークフロー全体で共有するKey-Valueストレージです。
tx.send_message()では隣接したExecutor同士でのデータ連携しかできないのに対して、SharedStateではすべての WorkflowContext が同じ SharedState を参照しており、ctx.set_shared_state() / ctx.get_shared_state() でワークフロー横断の情報を保管・取得できます。これにより、特定の Executor が計算した結果やフラグを別の Executor が参照する、といった連携が可能です。
複数のエージェントでコンテキストを共有したい場合にも有効だと思いました。

Shared Stateへの書込み

class FileReadExecutor(Executor):

    @handler
    async def handle(self, file_path: str, ctx: WorkflowContext[str]):
        """ファイルパスを受け取り、ファイルの内容を読み込んで、次の executor にファイル ID を転送します。"""
        
        with open(file_path, 'r') as file:
            file_content = file.read()
        
        file_id = str(uuid.uuid4())
        await ctx.set_shared_state(file_id, file_content)

        
        await ctx.send_message(file_id)

Shared Stateへのアクセス

class WordCountingExecutor(Executor):

    @handler
    async def handle(self, file_id: str, ctx: WorkflowContext[int]):
        
        file_content = await ctx.get_shared_state(file_id)
        if file_content is None:
            raise ValueError("File content state not found")

        await ctx.send_message(len(file_content.split()))

まとめ

今回はMicrosoft Agent FrameworkのWorkflowsについて、ワークフローを構築するためのコンポーネントたちを見ていきました。
他にもおもしろそうな機能があるので、次回はこれらを組み合わせたワークフローを実際に作ってみたいと思います。


元の記事を確認する

関連記事