HDFSのErasure Codingでインフラコストの削減に取り組むLINE

内田早俊氏:それでは始めます。こんにちは。LINEのData Engineering Center、Data Platform室に所属している内田です。最初に自己紹介から始めたいと思います。私は2018年にLINEに入社して以降、Hadoopを中心にデータプラットフォームの基盤開発や、その運用を行っています。

大規模にHadoopを運用していると、まだ誰も出会ったことのない問題や、新機能のニーズに出会うことがよくあります。そうした問題や、私たちに必要な機能を報告したり、パッチを送ったりすることで、積極的にオープンソースのコミュニティに貢献を行っています。

Hadoopには、大きく分けてストレージの機能を提供しているHDFS(Hadoop Distributed File System)と、コンピュテーションの機能を提供しているYARN(Yet Another Resource Negotiator)という2つのコンポーネントがあります。今日はHDFSについて話したいと思います。

LINEでは、HDFSのErasure Codingと呼ばれる、データをより効率的に保存するための機能を活用して、インフラコストの削減に取り組んでいます。

HDFS Erasure Codingは、Hadoop 3で導入された比較的新しい機能で、これまで私たちもさまざまな問題に出会ってきました。今日はその中でも、2020年末に起こったデータコラプションの問題を例に、私たちがふだん行っているエンジニアリングを紹介したいと思います。

特に後半は、HDFS Erasure Codingの実装に踏み込んだ話になりますが、大規模ならではの問題を解決することの楽しさをお伝えできればと思います。

HDFSについて

はじめに、HDFSについて簡単に紹介したいと思います。HDFSは、Hadoopのサブプロジェクトの1つで、分散型のファイルシステムを提供するミドルウェアです。非常にスケーラブルに設計されていて、数千台規模のサーバー上に、ペタバイト単位のデータを分散して保存できます。

また、データを自動的に複数台のサーバーにレプリケーションすることで、サーバーが死んでもデータが消えないようになっています。それから、大規模なデータを高いスループットで読み書きできるように設計されていて、主にデータ分析のためのストレージとして、世界的にも広く使われています。

HDFSは、マスターのNameNodeと、スレーブのDataNodeの2つのコンポーネントから構成されています。NameNodeはメモリ上で、ファイルシステムなどのメタデータを管理しています。ファイルはブロックに分割され、複数のDataNodeに分散して保存されます。

Clientは、例えばデータを読み込む時、そのデータがどのDataNodeに保存されているかをNameNodeに問い合わせて、該当のDataNodeから直接データを読み込みます。

データを書き込む際も同様に、Clientは直接DataNodeに書き込みますが、その際、DataNodeは複数のDataNodeにブロックをレプリケーションすることで、耐障害性を実現しています。

大規模なHDFS Erasure Codingでのデータ保存

私たちData Platform室では、全社に向けて統一されたHDFSクラスタを提供しています。データサイズ、サーバーの台数ともに、世界的にも非常に大規模なHDFSクラスタを運用しています。特に注目したいのがデータの増加量で、サービスの成長や、新しい分析のニーズに伴い、毎月平均して約10ペタバイトずつ増加しています。

ただし、すべてのデータが毎日活用されているわけではないため、古いデータをより効率的に保存して、インフラコストを削減するために、HDFS Erasure Codingの活用を進めています。

Erasure Coding自体は、RAIDなどで古くから活用されている技術ですが、HDFSではHadoop 3でようやく導入されました。

(スライドを指して)左が従来のレプリケーションによるデータの保存方法で、右がErasure Codingによるデータの保存方法を表した図になります。

いずれの場合も、HDFSではファイルは、まずブロックと呼ばれる単位に分割されます。レプリケーションの場合、そのブロックはデフォルトで3台のDataNodeにコピーされて保存されます。

この場合、1つのブロックだけを保存する場合と比べて、データ量は3倍になります。Erasure Codingではこのブロックを、さらにセルと呼ばれるより細かい単位に分割します。そして、(スライドの)図のRS-3-2と呼ばれるErasure Codingでは、d1、d2、d3という3つのデータセルから、p1、p2という2つのパリティセルを計算し、それらを異なる複数のDataNodeに保存していきます。

データセルから構成されたブロックはデータブロック、パリティセルから構成されたブロックはパリティブロックと呼ばれ、合計で5つのブロックのうち2つのブロックが失われても、残りの3つのブロックから失われた2つのブロックを復元できるのがErasure Codingの大きな特徴です。

そして、このRS-3-2の場合、データ量は元のデータ量と比べて3分の5、つまり約1.7倍で済みます。また、RS-3-2の「3」と「2」という数字は、それぞれデータブロックの数と、パリティブロックの数に対応していて、データブロックの数だけブロックがあれば、残りのブロックを復元できます。LINEでは、HDFSのデフォルトのRS-6-3を使っています。

データのリカバリー方法

データのリカバリー方法について、もう少し詳しく見ていきたいと思います。レプリケーションの場合は単に、同じブロックをほかのDataNodeにコピーすることでリカバリーできますが、Erasure Codingの場合、生きているブロックから失われたブロックを復元する、Erasure Coding Reconstructionと呼ばれる特別な処理を行う必要があります。

図ではDataNode6が、d1、d3、p2を取得してd2とp1を復元し、d2は自分に、p1はDataNode7に保存しています。

(スライドを指して)こちらがErasure Codingとレプリケーションの主な特徴をまとめた表になります。時間の都合上、すべての説明はできませんが、最も重要な点は、Erasure Codingはレプリケーションより、より高い耐障害性を保ちながら、より効率的にデータを保存できるということです。

表のRS-6-3の場合、Storage efficiencyで示されているように、Erasure Codingは、レプリケーションの半分のサイズでデータを保存できます。

大規模にErasure Codingを運用していく上で気をつけなければならないのが、小さなファイルの保存です。レプリケーションの場合は、3つのブロックを生成するだけで済みますが、Erasure CodingのRS-6-3の場合、小さなファイルに対しても、9つのブロックを生成してしまうため、NameNodeの負荷が上がってしまいます。

LINEでは、この小さいファイルによる負荷を避けるために、ファイルサイズをコントロールしやすいコールドデータに対して、Erasure Codingを適用しています。

現在、RS-6-3を使って、約12ペタバイトのデータを保存しています。これはつまり、レプリケーションの場合と比べて、約12ペタバイト分のディスク容量を削減できたことになります。

ユーザーからの報告で判明したデータコラプションの問題

それでは、2020年末に起こったデータコラプションの問題と、その解決方法の話に移りたいと思います。この問題に気づいたきっかけは、ユーザーからの報告によるものでした。このエラーは、ユーザーがErasure CodingされたORCファイルを、Hiveから読もうとした時に出たものです。

このエラーだけでは、HiveやORCの問題なのか、それともErasure Codingの問題なのかが判断できなかったため、すべてのErasure Codingされたファイルを、徹底的に調べることにしました。

幸い、同様のエラーを出すErasure Codingされたファイルの中に、バックアップが残っているものを見つけ出せたため、そのファイルの9つあるブロックを、一つひとつビットワイズに比較してみました。

すると、ある1つのブロックだけ、ビット列が異なっていることがわかりました。さらに、そのブロックのヒストリーを、すべてのDataNodeのログからたどったところ、そのブロックが過去にリコンストラクションされたものであることや、その直前のリコンストラクションで、NullPointerExceptionが発生していたことがわかってきました。

これらのことから、Erasure Coding Reconstructionになにか問題がありそうだと判断しました。

こうした問題が発生した時にはまず、HDFSのコミュニティで、すでにバグ報告がないかを調べます。すると、HDFSのJIRA上で、私たちのオブザベーションと、非常に類似性の高いopen issueを発見しました。

このissueの概要を紹介すると、StripedBlockReconstructorというDataNode上でリコンストラクションのメインロジックを担っているクラスは、バッファープールをスタティックに持っていて、それをリコンストラクションの際に使っています。

そして、そのバッファープールがある条件下で汚染されてしまい、その汚染されたバッファープールを使って、リコンストラクションされたブロックが壊れてしまうという問題です。汚染が起きる条件を理解するために、まず、リコンストラクションの仕組みを見ていきましょう。

リコンストラクションの仕組み

StripedBlockReconstructorは、デフォルトだと64キロバイトのバッファーサイズごとに、リコンストラクションを行います。StripedBlockReconstructorは、リコンストラクションを始める前に、まずStripedBlockReaderと呼ばれるDataNode上のブロックを読むためのオブジェクトを生成します。

その際、Readerはリコンストラクターのバッファープールから、バッファーを取得します。そして、各イテレーションにおいて、リコンストラクターはReaderにソースのDataNodeからデータを読むように命令を出し、Readerはバッファーにデータを読み込みます。

すべてのReaderの読み込みが終わると、リコンストラクターはバッファーのデータを使って、リコンストラクションを行います。リコンストラクションされたデータは、StripedBlockWriterによってターゲットのDataNodeに転送されます。

イテレーションの最後に、バッファーはクリアされます。すべてのイテレーションが終わると、Readerはバッファーをリコンストラクターのバッファープールに戻します。

それでは、バッファープールの汚染がどのようにして起こるのか、見ていきたいと思います。

この問題は、あるStripedBlockReaderがタイムアウトしてしまう場合に起こります。あるReaderがタイムアウトすると、別のDataNodeからデータを読むために、新しいReaderが作られます。リコンストラクションに必要な最小の数のReaderの読み込みが終わると、タイムアウトしたReaderはキャンセルされます。

すると、次のイテレーションで、リコンストラクターがReaderから結果を取得する時にNullPointerExceptionが発生してしまい、リコンストラクターが失敗します。

NullPointerExceptionが発生するのは、タイムアウトが原因でキャンセルされたReaderの取り扱いが悪いからですが、詳細はソースコードを詳しく追う必要があるので省略します。

リコンストラクター終了の際、Readerをクローズする前に、Readerにバッファーをバッファープールに返すように命令します。

しかし、そうするとReaderは、バッファーを返したあともバッファーを持ち続けているので、クローズされる前の間データを読み込み続けてしまい、バッファープールの汚染が発生します。

バッファープールはスタティックに定義されているので、おそらくは別のブロックのリコンストラクションにおいて、別のStripedBlockReconstructorがこの汚染されたバッファープールを使ってリコンストラクションを行ってしまい、結果的に壊れたブロックが生成されてしまいます。以上が、バッファープールの汚染によって壊れたブロックが生成されるシナリオです。

このように私たちは、ソースコードを読み込むことで、このバグが原因でファイルが壊れてしまうということは間違いないと判断し、パッチを当てることにしました。

データコラプション防止のために解決すべき2つの課題

では、パッチを当てればそれで終わりかというと、そうではありません。パッチを当てたあと、データコラプションが起こらなくなったことを、どのように監視するのか。それから、ほかのバグが潜んでいたり、将来的に新たなバグが発生する可能性も踏まえ、今後どのようにしてデータコラプションを防ぐのかという、大きく2つの課題を解決する必要がありました。

1つの目のデータコラプションの検知についてはSparkを使い、Erasure Codingされたファイルのチェックサムを、デイリーでHiveテーブルとして保存し、それを比較することにしました。

実装はすぐに終わりましたが、実際にジョブを走らせてみると、いくつかの予期せぬ問題に出会いました。

1つ目は、Erasure Codingされたファイルのチェックサムを取得する際、DataNode上でファイルディスクリプタのリークが起きるという問題で、2つ目は、誤ったチェックサムの値が返されるという問題です。

いずれもまだ報告されていない問題だったため、私たちはHDFSのソースコードにパッチを送りました。これら2つの問題について、詳しく見ていきましょう。

Erasure Codingされたファイルに対して、getFileChecksumが呼ばれると、図ではDataNode1が、ほかのDataNodeからデータブロックのチェックサムを取得し、それらのブロックから、そのファイルのチェックサムを計算します。

タイムアウトなどが原因で、あるブロックのチェックサムを取得できなかった場合、DataNode1は、生きているブロックからそのブロックをリコンストラクションして、チェックサムを計算します。

ファイルディスクリプタリークの問題は、このリコンストラクションで使われたDataNodeへのコネクションが、クローズされないために起こっていました。

間違ったチェックサムが返ってくる問題も、このリコンストラクションの際に起こっていました。(スライドの)図を例に説明すると、タイムアウトなどが原因で、p1とp2を取得できずにリコンストラクションが失敗した時、getFileChecksumがd3を無視して、d1とd2だけからファイルのチェックサムを計算してしまうことが原因でした。

2つ目の、今後どのようにしてデータコラプションを防ぐのかという問題に関しても、私たちはリコンストラクションの正しさを、リコンストラクションのたびにチェックするバリデーションロジックを実装して、HDFSのソースコードにパッチを送りました。

この機能はすでに、アップストリームにマージされていて、DataNode上のhdfs-site.xmlで、ec.reconstruction.validationというプロパティをtrueに設定することで有効になります。

また、EcInvalidReconstructionTasksというメトリクスも提供していて、バリデーションが失敗した場合には、このメトリクスを使って検知できます。

アイデアは、リコンストラクションされたブロックを使って、もう一度バリデーションのためのリコンストラクションを走らせるというものです。

例えば、(スライドの)図でd2が失われた場合、DataNodeはd1、d3とp1を使ってd2をリコンストラクションしますが、私たちのパッチは、この時リコンストラクションされたd2を使い、続けてd1をリコンストラクションします。

ここで得られたd1、つまり図のd1'がもともとのd1と一致していなければ、リコンストラクションにバグがあるので、リコンストラクションを失敗させます。

これまでに見てきたように、バグは特殊な条件のもとで起きることがほとんどです。ダーティーバッファープールによるデータコラプションの場合、リコンストラクションでタイムアウトが発生してしまうことが引き金になっていましたが、私たちのパッチによって、その条件が解消されるまでリコンストラクションを失敗させ、壊れたブロックの発生を食い止められるようになります。

このバリデーションは、リコンストラクションの正しさを100パーセント保証するものではありませんが、同様のバグに対して、非常に効果的だと考えています。

パッチを当てて以降、データコラプションは検知ゼロ

パッチを当ててもうすぐ1年ほど経ちますが、私たちの監視システムではその後、データコラプションは検知していません。以前は数日に1個のファイルが壊れていたと予想されるため、少なくとも現時点では、データコラプションのバグは解消されたと言えそうです。

しかしながら、より大規模にErasure Codingの正しさを保証しようとすると、今のすべてのErasure Codingされたファイルのチェックサムを定期的に取得するやり方は、ファイル数に応じて実行時間が長くなってしまうため、あまり現実的ではありません。

将来的には、ブロックのチェックサムを、リコンストラクションなどのイベントごとにリアルタイムで取得し、監視する仕組みが必要だと感じています。

大規模な環境でのHDFS Erasure Codingは注意

それではまとめに入りたいと思います。HDFS Erasure Codingは、たいていの場合うまく動作しますが、大規模な環境でバグが顕在することがしばしばあります。その意味で、まだまだ成熟しているとは言い難いと思います。

Erasure Codingを使うことを検討している場合には、できるだけ最新のメンテナンスバージョンのHDFSを使うことと、HDFSのJIRAを定期的に確認することをおすすめします。

最後になりますが、今後もHDFSだけではなく、さまざまなOSSに貢献しながら世界中のエンジニアと一緒に、よりよいデータプラットフォームを作っていきたいと思います。

ありがとうございました。