自己紹介と後半のアジェンダ

尾野健氏:ここからはスピーカーが代わります。尾野健と言います。Data Engineering1 teamに所属していて、LINEには2019年に入社しました。現在はインジェスチョンパイプラインを開発中です。

後半のアジェンダです。現在のログパイプラインの概要、そのログパイプラインが抱える問題、その問題がIceberg導入によってどのように解決されるか。そしてFlink Iceberg applicationの詳細、最後にプロジェクトの今後の進め方を説明します。

ログパイプラインの概要

それでは現在のログパイプラインの概要です。(スライドを指して)この図は、現在のログパイプラインの最初の段階を示しています。もともとはシンプルなアーキテクチャでした。Kafka、Flink、HDFS上のHive tableという構成で、テーブルはAppend Onlyです。

ユーザーはプロトコルバッファでスキーマを定義し、プロトコルバッファ、またはJSONのメッセージをKafka(Apache Kafka)にプロデュースします。Flinkはそれをコンシュームし、シーケンスファイルフォーマットでレコードをなにも変換せずに、そのまま追記していきます。

ユーザーが送信したデータをそのまま保持しているということで、我々はこれをRAW tableと呼んでいます。ユーザーは特別なSerdeを使ってそれを読むことができます。

そして、Flinkのチェックポインティングの仕組みを使い、exactly-once deliveryをサポートしています。

プロトコルバッファのデシリアライズは高速ではありますが、それでもテーブル数、ユーザー数が増えた結果、Yarnの計算リソースが枯渇し始めました。原因は、シーケンスファイルは1行ごとにデシリアライズを行わなければならないからです。そこで、カラムナフォーマットを導入することになりました。

より高速なカラムナフォーマットとして、ORC tableを導入しました。Watcherというプロセスが、RAW tableのHDFSファイルの更新をパーティションごとに頻繁にチェックしています。

更新を検出すると、Hiveserver2経由でINSERT OVERWRITEクエリを実行し、RAWデータをORCに変換します。ORCファイルなので、ユーザーは特別なSerdeなしにテーブルを読めます。なお、すべてのテーブルはexternal tableとなっています。

このように、もともとのRAW tableのパイプラインは残しつつ、ORC tableへの変換処理を追加した2段階のアーキテクチャとなっています。

ログパイプラインが抱える問題

現在のログパイプラインが抱える問題です。最初の問題は、end-to-endの到達に時間がかかることです。ログが生成されてからテーブルとして読み込めるようになるまで、1時間から2時間程度かかっています。これにはいくつかの理由があります。

小さいファイルが大量にあると、NameNodeのメモリ使用量が増え、いわゆるスモールファイル問題が発生します。これを避けるため、Flink applicationはtruncateをサポートしているBucketingSinkを使い、できるだけファイルを生成しないようにしています。

アワリーで作られているパーティションに対して、RAW tableのシーケンスファイルは1時間ごとにフラッシュされるので、この時点で最大1時間程度のレイテンシーが発生します。さらに、ORC tableへの変換はそのあとに追加で行われ、また、変換すべきテーブル数も多いため、遅いものでは50分後くらいに完了しています。

こうして最大2時間程度のレイテンシーが発生します。最新のデータに基づいて判断したり、サービスを提供するためには、このレイテンシーを短くすることが求められています。

この問題の改善案として、直接ORCファイルを出力するFlink applicationを実装し、提案してみました。FlinkのStreamingFileSinkと、OrcWriterを使用しています。このSinkはORCを直接出力できますが、truncateはサポートしていないので、数分ごとのチェックポイントのたびにファイルがフラッシュ、クローズされます。

その結果、end-to-endレイテンシーは数分に短縮されます。するとファイル数が大量になるため、スモールファイルのコンパクションが必要となります。しかし、このコンパクションの実装が非常に困難であると判明しました。

小さいファイルをコンパクションしたあとには、ファイルの置き換えが必要になります。我々の使っているHiveはACID Transactionを使用していないので、ファイルを置き換える瞬間に読もうとすると、ファイル欠損エラーが発生する可能性があります。

また、遅れてやってくるログがある状況で、どのパーティションをコンパクションすべきかを検出するためには、大量のディレクトリを頻繁にスキャンしなくてはならず、NameNodeへのスキャンのコストも高くなります。つまり、エラーのリスクがあって実装も複雑で、リソースのコストも高いことになります。結果、この提案は不採用になりました。

次にシステムの堅牢さに関する問題があります。最初の問題は、コンポーネントが多すぎることです。ORC変換を追加したため、Flink、HDFSだけではなく、Watcher、Hiveserver2、Hive Metastore、Yarnのすべてが正しく動作していないとパイプラインが停止します。依存するコンポーネントが多いと障害の可能性が高まり、また、障害時の原因調査が難しくなります。

次に、運用の問題があります。テーブル、パーティションの操作は基本的にすべて自動化されていますが、それでもユーザーからの特殊なリクエストの際には、手動のオペレーションが必要です。ログごとに2種類のテーブルがあり、external tableのためメタデータとファイルをそれぞれ処理する必要があって、ミスが発生しやすいです。

そして高負荷の問題です。これは前半でも話したことですが、多くのテーブルがあるため、NameNodeとHive Metastoreに大量のファイル情報とパーティション情報が格納されています。クエリを計画する際や、Watcherがパーティションをチェックする際に、これらに負荷が集中しやすいです。

最後に、スキーマエボリューションの制約の問題があります。ORCのフィールドマッピングのロジックは、ポジションベースを採用しています。Hive3からは名前ベースのマッピングも対応していますが、これまでのシステムの互換性を維持するために、そのままポジションベースを採用しています。

そのため基本的には、table/structに対するフィールドの追加しかできません。削除、挿入、移動は以前のデータを作り直さないとエラーになります。renameは可能ですが、renameをするとSparkのクライアント設定を変更する必要があるため、推奨されていません。ユーザーからは使わなくなったフィールドの削除や、新しいフィールドをわかりやすい位置に挿入したいという要望が時々あります。

Icebergの適用でどう解決されるか

ではIcebergの適用により、これらの問題がどのように解決されるかを見ていきます。最初に、Icebergを使った新たなパイプラインの概要を説明します。基本的には先ほど述べた不採用となったORCを直接出力する、Flinkのアーキテクチャと同じです。異なるのは、FlinkがIceberg tableに対して、直接ORC/Parquetなどのカラムナファイルを書くことです。

また、テーブルのメンテナンスを行う、Table Optimizerというプロセスが存在することです。Flinkはファイルを5分ごとにフラッシュしています。Table Optimizerは、Icebergが提供しているテーブルをメンテナンスするための各Sparkアクションを実行します。

これまで1時間から2時間かかっていたend-to-endのレイテンシーが、5分に短縮されます。Flinkは5分ごとに発生するチェックポイントのタイミングでファイルをフラッシュ、クローズします。したがって、フラッシュした瞬間からカラムナフォーマットのファイルを読めます。

ここで問題になるのはスモールファイルのコンパクションですが、IcebergのSnapshotの仕組みによって、先に見たようなファイル読み込み時の失敗は発生しません。今まさにFlinkが書き込んでおり、ユーザーが読んでいるパーティションに対してでさえ、Sparkがコンパクションを安全に実行できます。

(スライドを指して)図の緑の線はタイムラインを示しており、左から右に時間が進んでいます。Flinkは5分ごとに00、01、02と書き込んでいます。03が作成された時点、つまり、snapshot-010のタイミングでSparkによるコンパクションがFlinkと並列に開始され、00から03のファイルをマージしようとします。

その後、Flinkは04を含むsnapshot-011を作成し、Sparkがコンパクションを完了し、マージされた05を含むsnapshot-012を作成します。snapshot-012が作成される直前に読み込みを開始しても、ここでは以前のようなコンパクションによるファイル消失は発生しません。

Snapshotはエクスパイアされない限りは消えないので、コンパクション後も011以前のSnapshotは存在しており、00から04のファイルも残っています。また、snapshot-012以降は00から03のファイルは含まれないので、05の内容と重複することもありません。

現在のものと比べると、シンプルでスケーラブルなアーキテクチャになります。パイプラインからはHiveserver2、Yarnは不要になったため構成がシンプルになり、障害の要因が減りました。

もちろんTable OptimizerはSpark on Yarnに依存していますが、これが行うのはテーブルメンテナンスであり、そのジョブが遅延したとしてもサービスにとっては致命的ではありません。

またIcebergのhidden partitionの機能により、Hive external tableに対して必要だったパーティション、ファイルを管理するオペレーションは不要になりました。古いレコードを削除するのもDELETEクエリを発行するだけでよく、パーティション、ファイルの削除は不要です。

NameNode、Hive Metastoreへの負荷も軽減されています。これも前半でお話したとおり、Icebergではテーブルやパーティションを構成するファイルの情報は、すべてファイルとして書き込まれています。クエリの実行計画を作る際も、Table Optimizerが各パーティションの状態を確認する際も、NameNode、Hive Metastoreへのスキャンは不要となっています。

そして、これまでできなかったスキーマ変更、フィールドの追加、rename、削除、挿入、移動と、プロトコルバッファスキーマが対応しているすべての変更は可能になります。

renameと同時に移動もサポートしているのは、内部的にフィールドごとにユニークなIDが付与されているからです。ポジションでも名前でもなく、IDをもとに各変更がトレースされています。

ここまでをまとめます。スモールファイル問題、end-to-endレイテンシー、スキーマエボリューション、スケーラビリディの問題、これらはすべてIcebergの導入によって解決されます。

Flink Iceberg applicationの実装

では、Flink Iceberg applicationの実装の詳細を見ていきます。FlinkのDataStream APIを使う場合、それがインプットとして採用しているRowDataタイプに変換しなくてはなりません。レコードタイプとして用いているので、プロトコルバッファに加えてJSONもサポートする必要があります。

Icebergはタイムスタンプやストリング以外のキーを持つマップ型もサポートしているので、Int64をタイムスタンプに変換したり、JSONの文字列キーをキャストする必要があります。

またプロトコルバッファのenumはサポートしていないので、ストリングに変換する必要があります。現在の実装では、いったんこれをAvroに変換しています。これは、別のプロジェクトで実装した、プロトコルバッファをAvroに変換する処理を流用しているからです。Icebergが提供しているFlinkAvroReaderを使えば、AvroをIcebergに対応するRowDataにそのまま変換できます。

もちろん現在の実装でも正しく動作しますが、さらにパフォーマンスを向上させて設計をシンプルにするため、Avroに変換せずに直接RowDataに変換することを検討しています。

Flinkは各フォーマットをFlinkのRow、RowData形式に変換するライブラリを提供しています。JSONはflink-formats/flink-jsonがあり、プロトコルバッファに対するflink-protobufというパッケージもFlink1.15で予定されています。これらのライブラリを活用、もしくは参考にしつつプロトコルバッファ、JSONの直接変換の実装を進めていきたいと考えています。

Flinkアプリの実装

Flinkアプリの実装を詳しく見ていこうと思います。Icebergの場合、Flinkのデータフローグラフが、複数のStreamWriterと単独のFilesCommitterで構成されているのが特徴的です。ConsumerによってKafkaから取り込まれたレコードは、ConverterによってRowDataに変換され、Writerに送られます。

Writerは、受け取ったレコードをIcebergのテーブルの適切なパーティションに、ORCまたはParquetとして書き込みます。Flinkのチェックポイントバリアが各オペレーターに到達するとWriterはファイルをクローズし、パスなどのファイル情報をCommitterに送ります。

Committerは受け取ったファイル情報をもとにManifest fileなどを作成し、新たなSnapshotをIceberg tableにコミットします。このコミットは1度だけ行われるべきなので、Committerはフローの中に1つだけ存在することになります。

このチェックポインティングのどこかで処理が失敗した場合、Flinkは最後に成功したチェックポイントから処理を再度実行するため、データの欠損は発生しません。また、コミットに失敗した更新は、たとえファイルがパーティション内に残っていたとしてもIceberg tableには反映されないので、重複も発生しません。つまり、exactly-onceでデリバリーが行われています。

また、IcebergはFlinkSinkというSinkを提供しています。これはWriterとCommitterで構成されるデータフローグラフをカプセル化しているもので、パフォーマンスに問題がなければFlinkSinkを使うのがシンプルです。しかし、我々はこれを使っていません。

各Consumerは1つのKafkaパーティションをコンシュームしていますが、Kafkaからのデータ流量が多い場合はWriterを増やす必要があります。FlinkSinkの場合、Parallelismを増やしてWriterの数を増やします。

そうすると、異なる並列度のオペレーター間でのストリームの再分配、つまりシャッフルが行われるため、タスクマネージャー間のネットワーク経由のデータ転送が発生してしまいます。パイプラインで処理するテーブルの中には秒間1ギガバイト以上のデータ流量を持つものもあり、不要なネットワーク転送の負荷はできるだけ減らしたいと考えています。

そこで我々は、Parallelismは変えずにWriterの数を増やすため、Splitterというオペレーターを導入しています。これは、データ流量に合わせて稼働するWriterの数を動的に変更する仕組みです。タスクマネージャー間のネットワーク経由のデータ転送は発生しないため、パフォーマンス劣化は生じません。また、急にデータ流量が増えた場合にも対応できます。

ここでは実装の詳細は省きますが、FlinkのSide Outputsという仕組みを活用して実装しています。

今後のプロジェクトの進め方

最後にこのプロジェクトの今後の進め方をお話しします。まずはIceberg tableの安定性やスケーラビリティ、クエリエンジンを検証し、そしてマイグレーションプランを検討していきます。

次に、データパイプラインで使っているテーブルを、Hive tableからIceberg tagleへと置き換えます。そしてインクリメンタルリード、行レベルでの更新、タイムトラベルなどのIceberg特有の機能を適用していきます。

我々の部署では、以下のポジションを募集中です。ご関心のある方は、ぜひサイトにアクセスしてみてください。ご視聴ありがとうございました。