Reproでチーフアーキテクトとして仕事をしている橋立(joker1007)です。
今回、本番環境への導入を視野に入れてApache Icebergを実際に業務データで検証しました。
この記事ではその検証内容と結果について共有したいと思います。
IcebergはOpen Table Formatと呼ばれているデータフォーマットの一つで、クラウド環境で効率よくデータ分析を行うためのデータフォーマットとして開発が進んでいます。
Icebergの様なOpen Table Formatは単一のファイルによるデータ構造ではなく、複数のファイルとそのメタデータの管理機構が合わさったフォーマットになっており、全体のファイル構造が合わさって一つのテーブルとして扱えるというのが特徴です。
以前、Reproで検証したApache HudiもOpen Table Formatの一つですが、IcebergはHudiと比べてシンプルな仕組みになっている印象です。
Open Table Format群が旧来のデータ構造に比べて優れている点の一つとして、分析用途に向いた大規模データを前提にしていながらレコード単位のUPDATEやDELETEに対応しているという点です。大量データ処理に必要なのはイベントログの様にINSERTだけに対応できていれば良いものもあれば、更新が必要なデータもあります。今回の検証で利用したユーザー単位のメタデータなどもそういった性質のデータです。
こういったデータを扱うためには、旧来はデータの投入の後に重複を排除するクエリを書きつつ、定期的にテーブルを洗い替えするメンテナンスを実施する必要がありました。それをフォーマット自体の仕組みと書き込みのための基盤でサポートしてくれるというのがOpen Table Formatのメリットです。
ちなみに、この記事ではIcebergがどういったデータ構造なのかという詳細は解説しませんので、公式ドキュメントと、内部構造について分かり易く解説してくれている記事へのリンクを参考資料として掲載しておきます。
参考資料
Reproでは大体200億件ぐらいのレコードを持つテーブルから1000万件前後を取得するワークロードと、普通にWebリクエストで1件単位で取得するワークロードが混在しています。
そして、そのデータは出来る限り短いリードタイムで利用可能になっていることが望ましいものです。(5分以内ぐらいなら許容可能)
この目的のために、ReproではCassandraをバックエンドとし、テーブルのパーティションキー及びクラスタリングキーを工夫してbucketingなどのテクニックを活用することでなんとか対応できています。
大規模な読み込みワークロードではTrinoを利用し、その他のリクエストでは普通にCassandra Clientから読み取りを行っています。
しかし、Cassandraはバルクロードに向いた構造とは言い難く、現状Trinoの並列読み込みとテーブルのbucketingにより無理矢理なんとかしているという側面があります。
現状対応は出来ているが無駄が多いし、将来的なスケーラビリティに一定の不安がある状態なので、より良い基盤に置き換えていきたいというのが実情です。
しかし、開発メンバーが運用できる基盤にも限りがあるので、できる限り運用負荷を上げないことも大事にしたいところではあります。
そういう訳で、情報を集めていたのですが、今回上記の要求を満たせそうな選択肢としてicebergに注目し検証を行いました。
検証対象として重視したのは以下の要素です。
- 弊社のデータ処理基盤の中核にあるKafkaからの書き込みが簡単であること
- Upsert処理が可能であること
- データ自体をS3に配置可能であること
- 既存基盤との組み合わせを含めて、5分以内にはデータがクエリ可能になる見込みがあること
実際のところやってみないと分からんなというのが4つ目で、Reproのデータ量及び書き込みのワークロードに対して、icebergの書き込みとcommitにかかるノードのリソースコストがどの程度になるのか、commitとcompactionがどれぐらいの実行頻度ならバランスが取れるのか。compactionにどれぐらいの負荷がかかるのか、この辺りを知るために実際のデータで検証が必要でした。
— 以下、社内向けの評価報告を手直ししたものなので文体が変わります —
現行のcassandraへの書き込みと同様にKafkaからレコードを受け取ってicebergテーブルの書き込みを行う。
コミット及びcompaction処理がReproの機能・運用面で現実的な頻度の実行で処理が可能かどうかを検証し、productionで必要になるノード数を見積る。
また、trinoでクエリをする時の簡単なクエリパターンの実験・検討を行い挙動を確認する。
レコードの内容はエンドユーザーに関するメタデータを保持するuser_id+キーバリューというシンプルな構造のレコードであり、Updateも頻繁に発生する。
同一の内容であれば処理をスキップする仕組みは既に前段に構築中であるため、本当に書き込みを必要とするスループットはそこまで高くない。大体秒間6000件程度がカバーできれば現状は対応可能。
書き込み方式
EMR-7.10を利用してFlinkでS3にデータを書き込む。
テーブル定義とメンテナンス
テーブルの管理にはAWS Glue Data Catalogを利用し、spark-sqlを利用したDDLで構築する。
Glue Data Catalogを利用するのは、Catalog管理の運用コストを削減するためと、Glueが持つIcebergのテーブル最適化の機能を利用したいため。
また、icebergは特性としていくつかの定期的なメンテナンスタスクが必要になる。特にFlinkから短いスパンで書き込みを行うと小さいサイズのparquetファイルとUpdateに対応するための削除ファイルが大量に発生するため、定期的にcompactionを行わないと読み取り効率が大きく劣化する。
そのため実行しなければならないのが以下の3つである。メンテナンスタスクは一般的にはsparkから実行する。spark-sqlを利用するのが最も簡単である。
- expire_snapshot: icebergはコミットごとにsnapshot (manifest)を作成するので、無限に蓄積しない様に一定以上古いsnapshotを削除する。
- rewrite_data_files (compaction): merge-on-readのために生成された細切れのparquetファイルやdelete情報ファイルを結合する。
- delete_orphan_files: 上記のタスクによって不要になった実体ファイルを削除する。
削除ファイルについて補足
冒頭で述べた様にIcebergはAnalytics用途に向いたデータフォーマットでありつつUpdate(Delete)をサポートしているのが特徴です。こういったデータフォーマットでUpdateをサポートするには代表的な二つの方法があります。
- Copy on Write: Update要求があった時に該当レコードを含む列指向データフォーマット(基本的にはParquet)のファイルを丸ごと置き換える
- Merge on Read : Update要求に該当するレコードの削除があったことを記録し、新しい値を持つレコードを持った新規ファイルと削除記録ファイルを作成、読み取り時にそれらを自動的にマージする
Merge on Readは頻繁なUpdateが発生する場合に向いた形式で、書き込み時には新しく小さいファイルを作るだけで済むのでFlinkなどからストリームで短いリードタイムでデータを書き込む際に相性が良い方式です。一方で小さいファイルが大量に発生すること、そして削除内容を記録したファイルが必要になるため更にファイル数が増えて読み取り時の無駄も大きくなるのがデメリットです。
Icebergでは、こういった小さいサイズのデータファイルと削除ファイルの増大を一定の範囲で収めるため、定期的にcompactionと呼ばれる処理を実行してParquetファイルをマージし処理効率を維持しなければなりません。
データ規模
全体のデータ量: 2TB程
レコード数: 200億件超
更新頻度: 5000件 / sec
bucketingの結果、バラつきはあるが大体1パーティションが平均1GBぐらいになる様にして検証。(バケットサイズ、ファイルサイズは実運用時には要調整)
spark-sqlで実行する。(実際のテーブル定義から簡易化し、また簡単のために事前の変換オペレーションなども省略してあります)
CREATE TABLE IF NOT EXISTS glue.testdb_production.test_tables ( app_id bigint NOT NULL, user_id bigint NOT NULL, key string NOT NULL, value string ) USING iceberg PARTITIONED BY (bucket(32, app_id), bucket(8, key)) TBLPROPERTIES ( 'write.object-storage.enabled'='true', 'write.delete.mode'='merge-on-read', 'write.update.mode'='merge-on-read', 'write.merge.mode'='merge-on-read', 'history.expire.max-snapshot-age-ms'='86400000' ) LOCATION 's3://repro-experimental-store/production/testdb_production/test_tables'; ALTER TABLE glue.testdb_production.test_tables WRITE ORDERED BY insight_id, key; ALTER TABLE glue.testdb_production.test_tables SET IDENTIFIER FIELDS insight_id, user_id, key;
Flinkで高頻度の書き込むことでsnapshotが大量に増えるためmax-snapshot-age-msを短く設定し、merge-on-readとobject storage向けの最適化(prefixをばらけさせることで書き込みスループットのキャップを回避する)を有効化。
その他のファイルサイズの基準値などはデフォルトを利用。
Icebergのパーティショニングについて補足
Icebergのパーティション情報は旧来のHive形式とは違ってメタデータファイルの中に記録されています。
そして、メタデータは履歴管理が可能なのでIcebergはデータファイルの配置を変更しなくても途中からパーティショニングの形式を変更することができます。
また、Bucketingのための関数やタイムスタンプから年・月・日に分解する関数を噛ませてパーティショニングすることも可能で、クエリ時にはそれを元になったカラムの抽出条件から透過的に処理してくれます。
この仕組みをhidden partitionと呼びます。
上記のテーブル定義では想定されるクエリのパターンに合わせて二つのキーでbucketingする形にしています。
EMR設定
以下のクラスタサイズ、設定内容は試行錯誤の結果落ち着いた値で、実際の作業中は動作を見つつ何度もテーブルを作り直して台数や設定値を調整しています。
書き込みクラスタサイズ: r8g.2xlarge * 3
テーブルメンテナンス用クラスタサイズ: r8g.2xlarge * 20 (メンテナンスタスク実行時のみ必要)
Config:
[ { "Classification": "iceberg-defaults", "Properties": { "iceberg.enabled": "true" } }, { "Classification": "spark", "Properties": { "maximizeResourceAllocation": "true" } }, { "Classification": "spark-defaults", "Properties": { "spark.driver.memory": "8g", "spark.serializer": "org.apache.spark.serializer.KryoSerializer", "spark.executor.memory": "44g", "spark.memory.fraction": "0.2", "spark.memory.storageFraction": "0.3", "spark.executor.extraJavaOptions": "-XX:+UseG1GC -XX:MaxGCPauseMillis=1500 -XX:InitiatingHeapOccupancyPercent=35 -XX:G1ReservePercent=15 -XX:ConcGCThreads=6", "spark.task.cpus": "8", "spark.sql.catalog.glue": "org.apache.iceberg.spark.SparkCatalog", "spark.sql.catalog.glue.warehouse": "s3://aws-glue-repro-iceberg-test2/iceberg_test", "spark.sql.catalog.glue.type": "glue", "spark.sql.defaultCatalog": "glue", "spark.sql.extensions": "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions", "spark.sql.catalog.glue.io-impl": "org.apache.iceberg.aws.s3.S3FileIO" } }, { "Classification": "flink-conf", "Properties": { "glue.enabled": "true", "jobmanager.memory.process.size": "2gb", "kafka.enabled": "true", "taskmanager.memory.managed.fraction": "0.3", "taskmanager.memory.process.size": "40gb", "env.java.opts.taskmanager": "-Xlog:gc -XX:+UseG1GC -XX:MaxGCPauseMillis=1000 -XX:InitiatingHeapOccupancyPercent=25 -XX:G1NewSizePercent=15" "job.autoscaler.enabled": "true", "jobmanager.scheduler": "adaptive", "job.autoscaler.stabilization.interval": "300s", "job.autoscaler.metrics.window": "600s", "job.autoscaler.decision.interval": "60s" } }, { "Classification": "trino-connector-cassandra", "Properties": { "cassandra.client.read-timeout": "5m", "cassandra.consistency-level": "LOCAL_QUORUM", "cassandra.contact-points": "", "cassandra.fetch-size": "1000", "cassandra.load-policy.dc-aware.local-dc": "ap-northeast_3az", "cassandra.load-policy.use-dc-aware": "true", "cassandra.partition-size-for-batch-select": "256", "cassandra.password": "******", "cassandra.security": "PASSWORD", "cassandra.username": "cassandra", "connector.name": "cassandra" } }, { "Classification": "trino-connector-kafka", "Properties": { "connector.name": "kafka", "kafka.confluent-schema-registry-url": " ", "kafka.nodes": " ", "kafka.table-description-supplier": "CONFLUENT" } }, { "Classification": "trino-connector-iceberg", "Properties": { "connector.name": "iceberg", "fs.native-s3.enabled": "true", "iceberg.catalog.type": "glue", "iceberg.max-partitions-per-writer": "1024", "iceberg.object-store-layout.enabled": "true" } } ]
初期構築
Flinkで書き込みを行う前に、既存のデータを元に初期データを投入し本番と同等のサイズのテーブルを構築する。
既存のデータはcassandraに蓄積されているため、これを変換してTrino経由でicebergテーブルに投入する。
INSERT INTO iceberg.testdb_production.test_tables (app_id, user_id, key, value) SELECT app_id, user_id, key, value FROM cassandra.repro.test_tables;
上記のデータ規模を投入するのにr8g.2xlarge 8台のtrinoクラスタで1時間半ぐらいの所要時間で完了する。
Flinkによる書き込み設定
confluentのschema registryを利用したKafkaトピックからのストリーム書き込みを行うため、以下の準備をする。
wget https://repo.maven.apache.org/maven2/org/apache/flink/flink-sql-avro-confluent-registry/1.20.0/flink-sql-avro-confluent-registry-1.20.0.jar
flink-yarn-session -Dparallelism.default=2 -d
flink-sql-client -j flink-sql-avro-confluent-registry-1.20.0.jar -f insert.sql
SET state.backend.type = 'rocksdb'; SET execution.checkpointing.storage = 'filesystem'; SET execution.checkpointing.dir = 's3://repro-experimental-store/production/flink-checkpoints'; SET execution.checkpointing.savepoint-dir = 's3://repro-experimental-store/production/flink-checkpoints'; SET execution.checkpointing.num-retained = '1'; SET execution.checkpointing.interval = '15min'; SET execution.checkpointing.timeout = '10min'; SET execution.checkpointing.min-pause = '1min'; SET execution.checkpointing.externalized-checkpoint-retention = 'RETAIN_ON_CANCELLATION'; SET execution.checkpointing.min-pause = '3min'; SET state.backend.incremental = 'true'; SET table.exec.mini-batch.enabled = 'true'; SET table.exec.mini-batch.allow-latency = '10s'; SET table.exec.mini-batch.size = '10000'; SET table.exec.deduplicate.mini-batch.compact-changes-enabled = 'true'; CREATE CATALOG glue_catalog WITH ( 'type'='iceberg', 'warehouse'='s3://aws-glue-repro-iceberg-test/iceberg_test', 'catalog-impl'='org.apache.iceberg.aws.glue.GlueCatalog', 'io-impl'='org.apache.iceberg.aws.s3.S3FileIO' ); CREATE TABLE IF NOT EXISTS test_tables_kafka ( kafka_key STRING NOT NULL, insight_id INT NOT NULL, user_id BIGINT NOT NULL, key STRING NOT NULL, ts TIMESTAMP(3) METADATA FROM 'timestamp', WATERMARK FOR ts AS ts - INTERVAL '5' SECOND ) WITH ( 'connector' = 'kafka', 'topic' = 'TestTopic', 'properties.bootstrap.servers' = '', 'properties.group.id' = 'flink-iceberg-experiment', 'scan.startup.mode' = 'group-offsets', 'key.format' = 'raw', 'key.fields' = 'kafka_key', 'value.format' = 'avro-confluent', 'value.avro-confluent.url' = ' ', 'value.fields-include' = 'EXCEPT_KEY' ); INSERT INTO glue_catalog.testdb_production.test_tables (app_id, user_id, key, value) SELECT insight_id, user_id, key, value FROM default_catalog.default_database.test_tables_kafka;
特に重要な設定は以下。
execution.checkpointing.interval: checkpointの実行間隔がicebergのcommit間隔になる。今回は調整の結果15分とした。- INSERT文の
OPTIONS('upsert-enabled'='true'): icebergの書き込みに際してupsertモードで書き込みを行う設定。flink SQLのヒント句として設定している。 scan.startup.mode: kafkaトピックのどこから取得を開始するのかを設定する。初期構築をtrinoで実施したので、初起動時はそれに合わせて少し前のtimestampから実行する。一度group.idのoffsetがkafka brokerに記録されたら移行はgroup-offsetsを利用してそこから処理を再開できる。
また、EMR側の設定でflinkのadaptiveスケジューラーを有効にしていて、負荷に合わせて自動でタスク数が増減する。
現状の秒間5000件程度の更新要求であれば1台で余裕で処理できることが分かった。
1タスクで15分に1回コミットなので、15分に凡そ1パーティションに1ファイルづつparquetファイルが増えていく。
24時間で 4(1hで4回) * 24h * 2(data file & delete file) = 96ファイル程増えることになる。
並列数がもっと必要であれば、その分作成されるファイル数も増える。
これを定期的なcompactionで解消できるかどうかを検証した。
メンテナンスタスクの実行
検証開始当初はGlue Data CatalogのTable Optimizationに任せるつもりだったが、compactionの挙動がどうも期待通りではなかったので、expire_snapshotとdelete_orpharn_filesだけGlueの機能に任せて、compactionはEMRを定期的に実行する形にした。
Glue Data CatalogのTable Optimizationは実行自体はちゃんと動くのだが、上記のDDLで作成したパーティションの全てに対してcompactionが行われていない様な挙動を観測した。これでは非常に困るので一旦利用を諦めて現在AWSのサポートに問い合わせ中。
expire_snapshotは12時間に1回実行、delete_orphan_filesは24時間に1回実行し48時間より前のファイルを削除する用に設定した。
compactionはEMRのcommand-runnerステップを利用して24時間に1回以下のshellコマンドを実行する。
spark-sql -e "CALL glue.system.rewrite_data_files(table => 'testdb_production.test_tables', strategy => 'sort', options => map('min-input-files', '50', 'partial-progress.enabled', 'true', 'partial-progress.max-commits', '10', 'max-concurrent-file-group-rewrites', '3', 'remove-dangling-deletes', 'true', 'rewrite-job-order', 'files-desc'));"
compactionオプション解説
min-input-files: あるパーティションのファイル数がこれを越えてたら対象とする。partial-progress.enabled: 進捗の途中でのコミットを有効にする。全てのパーティションの処理が終了する前に処理状況を反映できる。どれぐらいの頻度でコミットするかはpartial-progress.max-commitsとタスク総数によって決まる。max-concurrent-file-group-rewrites: 同時に処理するファイルグループ(パーティション)タスクの数。remove-dangling-deletes: compactionの結果必要なくなったdeleteファイルを削除する。rewrite-job-order: パーティションごとの処理順番を決める。files-descはファイル総数の多い順に処理。
24時間で1パーティションごとに最低でも96ファイル + deletesファイル分の小さいファイルが生成されるので、それをcompactionにより大きなparquetファイルに結合する。
このファイル増加ペースであれば、大体r8g.2xlarge * 20台で1時間〜2時間ぐらいの処理時間でタスクが完了する。
24時間に1度、2時間の所要時間の実行ペースで十分追い付けると分かった。
クエリ方法
icebergを利用しつつ現状のcassandraに対するクエリと同様に数分以内のリードタイムでデータを利用可能にするために、TrinoのKafka connectorとiceberg connectorをUNIONしたviewを構成し、そのviewに対してクエリを行うことで最新のデータだけをKafkaから取得可能になる。
検証実験では、現在時刻から30分以内のtimestampに限定してKafka connectorでクエリを行いそれ以外のデータはicebergからクエリを行う様にする。
Kafkaから読み込むデータ量は直近30分に限定すると、約2.7GBで2700万レコードに相当する。
Kafka上のデータはicebergと同様のパーショニングは行えないが、直近30分程度のデータ量であれば無駄を承知の上でレコードを読んでも大した負荷にならないことが分かった。
10台前後のtrinoクラスタがあれば数秒で読むことが出来るしKafkaクラスタにかかる負荷も許容範囲の小さいものだった。
icebergのcatalogをGlue Catalogに設定して構成していた場合、icebergテーブルが所属しているcatalog及びdatabaseに対してviewを生成するとGlue Data Catalog側に永続化される。そのためクラスタの停止や入れ替えを伴ってもviewを再作成する手間はかからないことも確認できた。
将来的にはApache Flussを活用できたりすると効率が良さそうと考えているが、現状ではTrinoからのクエリがサポートされていないしバックエンドのicebergサポートも計画中という感じなので、今後の展開に注目していきたい。
Flinkによる書き込みは動きがシンプルなので動作自体は非常に軽い。検証に利用したテーブルだけなら1台で十分日中のピークトラフィックにも耐えられる。対象のテーブルが増えても問題無いだろう。
15分単位のupsert書き込みを継続している状態で、compactionの実行が遅れると割と顕著にパフォーマンスに影響を与えることが分かった。
ただ、24時間に1度程度で解消でき、想定される最悪の状況で現在のワークロードに対応可能な範疇なので、運用負荷としては十分に許容範囲で対処できる範囲と言える。
しかし、compactionはかなり処理負荷が高くメモリもCPUリソースもかなり必要になる。特に今のicebergで普及しているテーブル仕様のバージョン(v2)とsparkの実装では、非常に大量の小さなファイル(特にdeleteファイル)が存在すると、compactionに膨大なメモリが必要になるため、compactionが長期に渡って実行されないとテーブルのメンテナンス自体が困難になる可能性があって危険。
という訳でcompactionの所要時間をそれなりに抑えて安定して実行できる様にするには割と大きめのsparkクラスタがいる。テーブルを増やしていけばワンタイムの実行でも多少のコストは必要になるだろう。
テーブルのtarget sizeやバケッティングの数を調整することで、compaction負荷とクエリ負荷のバランスを取ることは出来るのでテーブル設計時にしっかり考える必要がありそう。
クエリについては、Kafka Connectorとの組み合わせが現実的に利用できるぐらいの負荷とレイテンシで結果を返せることが分かった。Glue Data Catalogにviewを保存しておけることが分かった点も大きい。独立したマネージドサービスに永続化できるのでクラスタの再構成とviewの再構成を分離できる。ただし、同時に複数のクエリが発生した時のKafka Brokerへの負荷は別途検証した方が良いかもしれない。
今のReproのユースケースとワークロードを想定した総合的な評価としては、いくつかの懸念事項はあるが十分productionに投入可能であると判断する。書き込み動作の安定性自体は高く、明らかに対処不可能な課題は存在しなかった。
実際にproductionで運用するためには、運用の安定化のためにいくつか必要なものがある。
テーブルチューニング
検証で得た情報を踏まえて、テーブルの設定値(特にパーティショニング)についてよりよいバランスを設定したい。
モニタリング・監視
- flinkがちゃんと動作していることを監視し、マシンリソースと書き込みペースのメトリックを取得できる様にする。
- Flinkのreporterからdatadogに直でメトリック送れる様にしたい
- compactionを定期的に実行するスケジューラー。恐らくEventBridgeとStepFunctionで良いはず。
- 実行の仕組みの中でcompactionの失敗時にアラートを上げられる様にする仕組みが必要。
- icebergのテーブルメタデータによるモニタリング。パーティションごとのファイル数やファイルサイズ合計、利用可能なスナップショットの数など
上記が、今回Icebergの技術検証として行なった内容と評価結果です。
アウトプットとしてある程度シンプルにまとめてありますが、実際はクラスタ構築から実際に書き込みを行って動作確認をするまでに、S3のバケットの整備やIAMの権限の修正でterraformを直したり、実行中にAWSのコストの変動をモニタしたり、productionのミドルウェアをある程度利用するので負荷をかけても大丈夫な時間に実験したりと、それなりに時間がかかっています。
特にcompactionの実行には結構な時間とリソースが必要だったので、ここが安定して動作する状況に持っていくのは結構苦労しました。
やはり、この手のOpen Table FormatをUpdateありきで運用しようとしたら、重要になるのはテーブルのcompactionとパーティション数のバランスだなということを改めて実感できました。
この記事が、Icebergを利用しようとしている誰かの参考になれば幸いです。