AWS Lambda + Bedrock + Athena で S3 Tables (Iceberg) に自然言語でクエリするMCPサーバーを構築してみた

はじめに

全社データ技術局データインテグレーションチームに所属している與田龍人です。

Amazon S3 Tables を利用して Iceberg 形式でデータを管理すると、Iceberg テーブルの自動コンパクションやテーブル単位の権限制御が可能になります。これにより、従来の S3 バケット運用に比べてクエリ性能とデータガバナンスの両立が容易になります。

そこで今回は、Claude から自然言語で質問を送ると、自動で対応する SQL クエリを生成し、Athena がそのクエリを実行して結果を JSON と要約付きで返す仕組みを構築します。

Lambda 関数は MCP(Model Context Protocol)サーバーとして動作し、Bedrock の Guardrails や監査ログを経由しながら、Claude(ブラウザ版)から test_connectiontext_to_sqlexecute_queryfetch_query_resultsといったツールを呼び出せるようになっています。

アーキテクチャと狙い

具体的なシステムは以下の流れを想定しています。

  1. Claude から質問送信

  2. SQL クエリ自動生成

  3. Athena でクエリ実行

  4. 結果と要約を返却

Lambda は MCP サーバーとして、Bedrock の Guardrails や監査ログを経由しつつ、ベタ書き SQL を意識せず Iceberg テーブルを操作可能です。質問文・生成 SQL・実行結果はすべて監査ログに残るため、データガバナンスや性能改善に活用できます。

s3tables-mcp-architecture

構築の手順について

前提条件

構築には以下の前提条件を満たしていることを想定しています。

Amazon S3 Tables / Iceberg テーブル

aws_analytics_services_s3tables

出典 : Amazon S3 Tables integration with AWS analytics services overview

Amazon Athena

    • S3 Tables の Iceberg テーブルを Athena からクエリできる状態であること。

    • Athena カタログ(例: AwsDataCatalog)やワークグループ(例: primary)が設定済みであること。

AWS アカウント / 権限

IAM ロールを整える

Lambda が Bedrock と Athena を呼べるように、実行ロールを作成します。まずは信頼ポリシー付きのロールを作成しました。

aws iam create-role     --role-name text2sql-lambda-execution-role     --assume-role-policy-document '{
        "Version": "2012-10-17",
        "Statement": [
            {
                "Effect": "Allow",
                "Principal": {
                    "Service": "lambda.amazonaws.com"
                },
                "Action": "sts:AssumeRole"
            }
        ]
    }'

検証ではAmazonAthena、AmazonS3、AWSGlueServiceRoleなどフルアクセス系のポリシーをまとめて付与しました。本番では最小権限に削ることがベストです。

aws iam attach-role-policy --role-name text2sql-lambda-execution-role --policy-arn arn:aws:iam::aws:policy/AmazonAthenaFullAccess
aws iam attach-role-policy --role-name text2sql-lambda-execution-role --policy-arn arn:aws:iam::aws:policy/AmazonS3FullAccess
aws iam attach-role-policy --role-name text2sql-lambda-execution-role --policy-arn arn:aws:iam::aws:policy/service-role/AWSGlueServiceRole
aws iam attach-role-policy --role-name text2sql-lambda-execution-role --policy-arn arn:aws:iam::aws:policy/service-role/AWSLambdaBasicExecutionRole

Bedrock と Lake Formation へのアクセスはカスタムポリシーで許可しました。

{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Action": [ "bedrock:InvokeModel" ],
      "Resource": "arn:aws:bedrock:us-west-2::foundation-model/anthropic.claude-3-5-sonnet-20240620-v1:0"
    },
    {
      "Effect": "Allow",
      "Action": [ "lakeformation:GetDataAccess" ],
      "Resource": "*"
    }
  ]
}
aws iam put-role-policy     --role-name text2sql-lambda-execution-role     --policy-name LambdaBedrock-S3TablesPolicy     --policy-document file://lambda-custom-policy.json

Lake Formation で Iceberg の権限を発行

Iceberg テーブルは IAM だけではアクセスできないので、Lake Formation でも DESCRIBE / SELECT を付与します。

aws lakeformation grant-permissions   --principal DataLakePrincipalIdentifier=arn:aws:iam::123456789012:role/text2sql-lambda-execution-role   --permissions "DESCRIBE"   --resource '{"Database": {"CatalogId": "123456789012", "Name": "data_catalog_link"}}'
aws lakeformation grant-permissions   --principal DataLakePrincipalIdentifier=arn:aws:iam::123456789012:role/text2sql-lambda-execution-role   --permissions "DESCRIBE"   --resource '{"Database": {"CatalogId": "123456789012:s3tablescatalog/demo-bucket", "Name": "analytics_namespace"}}'
aws lakeformation grant-permissions   --principal DataLakePrincipalIdentifier=arn:aws:iam::123456789012:role/text2sql-lambda-execution-role   --permissions "SELECT" "DESCRIBE"   --resource '{"Table": {"CatalogId": "123456789012:s3tablescatalog/demo-bucket", "DatabaseName": "analytics_namespace", "Name": "slack_messages"}}'

テーブルにテストデータを追加

今回の例ではSlackでのメッセージデータを想定して、締切を 10 月 20 日とするメッセージを 1 レコード Iceberg テーブルに入れておきました。

INSERT INTO "AwsDataCatalog"."MCP_DEMO_DB"."slack_messages"
VALUES (
  'MSG-20251008-001',
  '20251008103015999',
  'message_posted',
  'C1234567890',
  'official-announcements',
  'U000BOSS001',
  '各位、S3Tables MCP の初回リリースは 10月20日 が締切です。遅延厳禁でお願いします。',
  'T1234567890',
  'EVT-20251008-001',
  current_timestamp,
  date(current_timestamp)
);

Lambda コードについて

lambda_function.py では以下のコードを記述しました。詳細はコード内に記載しています。MCPLambdaHandler を使うので、後で依存ライブラリを zip にまとめてアップロードします。

from __future__ import annotations
import json
import os
import time
from typing import Any, Dict, List, Optional
import boto3

# グローバルクライアント: ウォームスタート時に接続を再利用するために生成
AWS_REGION = os.environ.get("AWS_REGION", "us-west-2")
bedrock_runtime = boto3.client("bedrock-runtime", region_name=AWS_REGION)
athena_client = boto3.client("athena", region_name=AWS_REGION)
s3_client = boto3.client("s3", region_name=AWS_REGION)

# Lambda変数。※本番では環境変数に置き換えてください。
DEFAULT_DATABASE = os.environ.get("DATABASE_NAME", "MCP_DEMO_DB")
DEFAULT_TABLE = os.environ.get("TABLE_NAME", "MCP_DEMO_TABLE")
DEFAULT_CATALOG = os.environ.get("ATHENA_CATALOG", "AwsDataCatalog")
DEFAULT_WORKGROUP = os.environ.get("ATHENA_WORKGROUP", "primary")
DEFAULT_OUTPUT_LOCATION = os.environ.get(
    "ATHENA_OUTPUT_LOCATION", "s3://mcp-demo-athena-results/"
)
DEFAULT_BEDROCK_MODEL_ID = os.environ.get(
    "BEDROCK_MODEL_ID", "anthropic.claude-3-5-sonnet-20240620-v1:0"
)

# MCP ハンドラーの初期化。
try:
    from awslabs.mcp_lambda_handler import MCPLambdaHandler
except ImportError:
    class MCPLambdaHandler:  # type: ignore
        def __init__(self, name: str, version: str) -> None:
            self.name = name
            self.version = version
            self.tools: Dict[str, Any] = {}
        def tool(self):
            def decorator(func):
                self.tools[func.__name__] = func
                return func
            return decorator
        def handle_request(self, event: Dict[str, Any], context: Any) -> Dict[str, Any]:
            return {
                "statusCode": 500,
                "body": json.dumps(
                    {"error": "MCP runtime unavailable - deploy inside AWS Lambda."}
                ),
            }
def _get_mcp_handler() -> MCPLambdaHandler:
    return MCPLambdaHandler(name="bedrock-athena-s3tables", version="1.0.0")
mcp = _get_mcp_handler()

# Athena / Bedrock で使う補助関数
def _normalise_catalog(catalog: Optional[str]) -> str:
    return catalog or DEFAULT_CATALOG
def _normalise_database(database: Optional[str]) -> str:
    return database or DEFAULT_DATABASE
def _normalise_table(table: Optional[str]) -> str:
    return table or DEFAULT_TABLE
def _build_prompt(nl_query: str, catalog: str, database: str, table: str) -> str:
    """Bedrock Claude に渡す Text-to-SQL プロンプトを組み立てる。"""
    return (
        "You are an expert SQL assistant for Amazon Athena. "
        "Generate only a SQL query (no commentary) that answers the user request.\n\n"
        "Table information:\n"
        f"- Catalog: {catalog}\n"
        f"- Database: {database}\n"
        f"- Table: {table}\n"
        '- Fully qualified form: "{catalog}"."{database}"."{table}"\n'
        "Columns include: message_id, message_ts, event_type, channel, channel_name, "
        "user, text, team_id, event_id, event_time, processed_date.\n\n"
        "Requirements:\n"
        "1. Output only valid Athena SQL.\n"
        "2. Always use the fully qualified 3-part identifier.\n"
        "3. Use DATE/TIMESTAMP helpers when filtering temporal fields.\n\n"
        f"Natural language request: {nl_query}\n\nSQL query:"
    ).format(catalog=catalog, database=database, table=table)
def _invoke_bedrock(prompt: str) -> str:
    """Bedrock Claude を呼び出し、不要な Markdown を取り除いて SQL を返す。"""
    response = bedrock_runtime.invoke_model(
        modelId=DEFAULT_BEDROCK_MODEL_ID,
        body=json.dumps(
            {
                "anthropic_version": "bedrock-2023-05-31",
                "max_tokens": 800,
                "temperature": 0,
                "messages": [{"role": "user", "content": prompt}],
            }
        ),
    )
    body = json.loads(response["body"].read())
    text = body["content"][0]["text"].strip()
    if text.startswith("```sql"):
        text = text[6:]
    if text.endswith("```"):
        text = text[:-3]
    return text.strip()
def _start_athena_query(sql: str, catalog: str, database: str) -> str:
    """Athena クエリを開始し、実行 ID を返す。"""
    execution = athena_client.start_query_execution(
        QueryString=sql,
        QueryExecutionContext={"Catalog": catalog, "Database": database},
        ResultConfiguration={"OutputLocation": DEFAULT_OUTPUT_LOCATION},
        WorkGroup=DEFAULT_WORKGROUP,
    )
    return execution["QueryExecutionId"]
def _wait_for_query(query_execution_id: str, timeout: int = 60) -> str:
    """Athena のクエリ完了をポーリングで待つ。タイムアウト時は例外を送出。"""
    start = time.time()
    while time.time() - start  List[Dict[str, Any]]:
    """Athena の結果セットを全件取得して返す。"""
    columns: List[str] = []
    next_token: Optional[str] = None
    rows: List[Dict[str, Any]] = []
    while True:
        params: Dict[str, Any] = {
            "QueryExecutionId": query_execution_id,
            "MaxResults": 1000,
        }
        if next_token:
            params["NextToken"] = next_token
        response = athena_client.get_query_results(**params)
        if not columns:
            metadata = response["ResultSet"]["ResultSetMetadata"]["ColumnInfo"]
            columns = [col["Label"] for col in metadata]
        result_rows = response["ResultSet"].get("Rows", [])
        start_idx = 1 if not next_token else 0  # 最初のバッチのみヘッダをスキップ
        for row in result_rows[start_idx:]:
            data = row.get("Data", [])
            rows.append(
                {
                    columns[i] if i  str:
    """結果を要約するために Bedrock Claude を再利用する。"""
    if not results:
        return "該当するレコードは見つかりませんでした。"
    preview = "\n".join(
        f"行{i+1}: {row.get('text', '')}" for i, row in enumerate(results[:10])
    )
    # プロンプトによる出力制御
    prompt = (
        "以下の Athena 結果をもとに、ユーザーの質問に日本語で回答してください。\n\n"
        f"ユーザーの質問: {nl_query}\n\n"
        "結果プレビュー:\n"
        f"{preview}\n\n"
        "出力条件:\n"
        "- 重要ポイントを箇条書きで示す\n"
        "- 固有名詞があれば残す\n"
        "- 200文字以内"
    )
    try:
        response = bedrock_runtime.invoke_model(
            modelId=DEFAULT_BEDROCK_MODEL_ID,
            body=json.dumps(
                {
                    "anthropic_version": "bedrock-2023-05-31",
                    "max_tokens": 400,
                    "temperature": 0.1,
                    "messages": [{"role": "user", "content": prompt}],
                }
            ),
        )
        body = json.loads(response["body"].read())
        return body["content"][0]["text"].strip()
    except Exception:
        snippet = json.dumps(results[0], ensure_ascii=False)[:180]
        return f"回答生成に失敗したため、最初のレコードを返します: {snippet}"

# MCP ツール定義
@mcp.tool()
def test_connection() -> str:
    """環境情報とクエリ実行時の注意点を返す。"""
    return "\n".join(
        [
            "Bedrock + Athena MCP Lambda is ready!",
            "",
            "Configuration:",
            f"  - AWS_REGION: {AWS_REGION}",
            f"  - CATALOG: {DEFAULT_CATALOG}",
            f"  - DATABASE: {DEFAULT_DATABASE}",
            f"  - TABLE: {DEFAULT_TABLE}",
            f"  - WORKGROUP: {DEFAULT_WORKGROUP}",
            f"  - ATHENA_OUTPUT_LOCATION: {DEFAULT_OUTPUT_LOCATION}",
            f"  - BEDROCK_MODEL_ID: {DEFAULT_BEDROCK_MODEL_ID}",
            "",
            "Query tips:",
            "  - 先頭で `USE CATALOG AwsDataCatalog;` や `USE MCP_DEMO_DB;` を宣言すると安全です。",
            "  - Iceberg テーブルは \"Catalog\".\"Database\".\"Table\" 形式の引用符が必須です。",
            "  - 自然文は text_to_sql で SQL を確認してから execute_query を実行すると安全です。",
        ]
    )
@mcp.tool()
def text_to_sql(
    natural_language_query: str,
    catalog: Optional[str] = None,
    database: Optional[str] = None,
    table: Optional[str] = None,
) -> str:
    """自然言語を Athena の SQL に変換して JSON で返す。"""
    if not natural_language_query:
        raise ValueError("natural_language_query is required")
    catalog = _normalise_catalog(catalog)
    database = _normalise_database(database)
    table = _normalise_table(table)
    sql = _invoke_bedrock(_build_prompt(natural_language_query, catalog, database, table))
    return json.dumps(
        {
            "success": True,
            "catalog": catalog,
            "database": database,
            "table": table,
            "sql": sql,
        },
        indent=2,
    )
@mcp.tool()
def execute_query(
    natural_language_query: Optional[str] = None,
    sql_query: Optional[str] = None,
    catalog: Optional[str] = None,
    database: Optional[str] = None,
    table: Optional[str] = None,
) -> str:
    """自然言語または SQL を受け取り、Athena 実行から要約まで行う。"""
    if not natural_language_query and not sql_query:
        raise ValueError("Either natural_language_query or sql_query must be provided")
    catalog = _normalise_catalog(catalog)
    database = _normalise_database(database)
    table = _normalise_table(table)
    sql = sql_query or _invoke_bedrock(
        _build_prompt(natural_language_query or "", catalog, database, table)
    )
    execution_id = _start_athena_query(sql, catalog, database)
    status = _wait_for_query(execution_id)
    if status != "SUCCEEDED":
        reason = athena_client.get_query_execution(QueryExecutionId=execution_id)[
            "QueryExecution"
        ]["Status"].get("StateChangeReason", "Unknown")
        raise RuntimeError(f"Athena query failed ({status}): {reason}")
    rows = _collect_athena_rows(execution_id)
    summary = _summarise_results(natural_language_query or sql, rows)
    return json.dumps(
        {
            "success": True,
            "catalog": catalog,
            "database": database,
            "table": table,
            "sql": sql,
            "query_execution_id": execution_id,
            "row_count": len(rows),
            "rows": rows,
            "summary": summary,
        },
        indent=2,
        ensure_ascii=False,
    )
@mcp.tool()
def fetch_query_results(query_execution_id: str) -> str:
    """Athena の過去実行結果を再取得して返す。"""
    if not query_execution_id:
        raise ValueError("query_execution_id is required")
    status = athena_client.get_query_execution(QueryExecutionId=query_execution_id)[
        "QueryExecution"
    ]["Status"]["State"]
    if status != "SUCCEEDED":
        raise RuntimeError(f"Query execution {query_execution_id} is not successful: {status}")
    rows = _collect_athena_rows(query_execution_id)
    return json.dumps(
        {
            "success": True,
            "row_count": len(rows),
            "rows": rows,
        },
        indent=2,
        ensure_ascii=False,
    )

# Lambda エントリポイント
def lambda_handler(event: Dict[str, Any], context: Any) -> Dict[str, Any]:
    """MCP の JSON-RPC リクエストを routing するエントリポイント。"""
    if "requestContext" in event:
        raw_body = event.get("body", "{}")
        try:
            payload = json.loads(raw_body) if isinstance(raw_body, str) else raw_body
        except json.JSONDecodeError:
            payload = {}
    else:
        payload = event
    if isinstance(payload, dict) and payload.get("method"):
        return mcp.handle_request(event, context)
    raise ValueError("Unsupported request format - MCP JSON-RPC expected")

依存ライブラリを zip にまとめてアップロード

awslabs.mcp_lambda_handler が必要なので、python3 -m pip install で依存をディレクトリに展開し、lambda_function.pyと共にアップロードしました。今回は依存ライブラリが軽量のためレイヤーは使用せずzipでまとめています。

mkdir -p package
cp lambda_function.py package/
python3 -m pip install awslabs-mcp-lambda-handler -t package/
python3 -m pip freeze > package/requirements.txt 
(cd package && zip -r ../lambda-function-mcp.zip .)

Function URL を有効化する

MCP から接続するため、Function URL を作成します。
今回はAPI Key 認証を Lambda 内で実装していますが、本番環境では AWS IAM 認証や API Gateway + Authorizer(Cognito / OIDC 等)の利用を推奨します。

Function URL を作成

認証なし設定(--auth-type NONE)ですが、Lambda 内で API Key 認証を実装しています。

aws lambda create-function-url-config \
  --function-name bedrock-athena-s3tables \
  --auth-type NONE \
  --cors '{"AllowOrigins":["*"],"AllowMethods":["POST"],"AllowHeaders":["content-type"]}' \
  --region us-west-2

パブリックアクセス権限を追加

aws lambda add-permission \
  --function-name bedrock-athena-s3tables \
  --statement-id function-url-public-access \
  --action lambda:InvokeFunctionUrl \
  --principal '*' \
  --function-url-auth-type NONE \
  --region us-west-2

API Key 認証の実装(Lambda 内)

Lambda 関数内で以下のように API Key 認証を実装します。

def lambda_handler(event: Dict[str, Any], context: Any) -> Dict[str, Any]:
    # API Key認証
    query_params = event.get("queryStringParameters", {}) or {}
    api_key = query_params.get("key") or query_params.get("api_key")
    expected_key = os.environ.get("MCP_API_KEY")

    if not expected_key or api_key != expected_key:
        return {
            "statusCode": 401,
            "headers": {"Content-Type": "application/json"},
            "body": json.dumps({"error": "Invalid or missing API key"})
        }

環境変数に MCP_API_KEY を設定

aws lambda update-function-configuration \
  --function-name bedrock-athena-s3tables \
  --environment Variables="{"MCP_API_KEY":"your-secure-api-key-here"}" \
  --region us-west-2

動作確認

Function URL が発行されたら、API Key を付けてステータス 200 が返るか確認します。

API Key あり(200 正常動作):

curl -s -X POST "https://xxxx.lambda-url.us-west-2.on.aws/?api_key=your-secure-api-key-here" \
  -H "Content-Type: application/json" \
  -d '{"jsonrpc":"2.0","id":"health","method":"initialize","params":{}}'

Claude でツールを試す

Claude の UI 左下のアイコン → 設定 → コネクタ → カスタムコネクタを追加 をクリックし、以下を入力します。

  • コネクタ名: s3tables-lambda-mcp

  • リモート MCP サーバー URL:
    https://xxxx.lambda-url.us-west-2.on.aws/?api_key=your-secure-api-key-here

チャットの設定に s3tables-lambda-mcp が表示されます。

「s3tablesに関するMCPプロジェクトの締切がいつまでか分かりますか?」と尋ねると、メッセージを拾って初回リリースが 10 月 20 日だと要約してくれました。

おわりに

これで、Claude から Athena を経由して S3 Tables に自然言語でアクセスできるサーバーレス MCP 環境が完成しました。MCP サーバーを通じて自然文でデータにアクセスできるため、社内のちょっとした「締切いつ?」のような問い合わせにも素早く応答できるようになりました。

今後は、execute_query にクエリバリデーションを追加して安全性を高めたり、SSO や権限管理を導入して Lambda や S3 Tables へのアクセスを厳密に制御してみたいと思います。今回構築した環境をベースに、社内外向けの自然言語検索やデータ活用の参考として役に立てれば幸いです。

参考

AWS MCP Lambda Handler GitHub

Amazon S3 Tables とテーブルバケットの使用




元の記事を確認する

関連記事