2019年にクラスタをHadoop 3にアップグレード

内田早俊氏(以下、内田):LINEのData Platform室の内田です。本日は忙しい中ご参加いただき、ありがとうございます。

LINEでは10年近く大規模なHadoopクラスタを運用していますが、昨年(2019年)クラスタをHadoop 3にアップグレードしました。ディスク使用量を削減するためにHadoop 3で新しく追加されたHDFSのErasure Coding(EC)を、約1年間本番環境で使っています。今回はErasure Codingを使っていたときに、本番環境で実際に起きたトラブルとトラブルシューティングを紹介したいと思います。

はじめに簡単に自己紹介いたします。私はData Platform室という、全社的なデータプラットフォームを開発する組織に所属していて、主にHadoopまわりの運用を担当しています。2018年の2月にLINEに転職して以降、HadoopやKafka、Flinkの運用やそのまわりの開発をしてきました。分散システム全般やプログラムの検証理論に関心があります。

今回のアジェンダですが、Erasure Codingのトラブルシューティングの話の前にLINEのデータプラットフォームと、昨年(2019年)から進めているHadoopクラスタの統合プロジェクトを紹介したいと思います。今回お話しするErasure Codingのトラブルは、この統合の過程で起こった問題です。

Data Platform室が取り扱っているプロダクト

Data Platform室では、LINEの社内ユーザーに対して全社的な統一データプラットフォームを提供しています。各サービスのログを収集するパイプラインの自動化や分析環境の提供、セキュリティなどのデータのガバナンスなどをしています。社内ユーザーにはサービスの開発者やプランナー、マーケターがいて、DAUは1,000人を超えています。

セルフサービスなデータプラットフォームを目指していて、現在はポータルサイトの開発に力を入れています。

私たちはOSSを積極的に活用していますが、ただ、既存のOSSだと要件を満たさないこともあるため、自社独自のツールやAPIの開発にも積極的に力を入れています。いくつか簡単に例を紹介すると、例えばこのYANAGISHIMAはPrestoやHiveのクエリツールでOSSとしてGitHubにも公開されています。個人的には、クエリの結果をWebリンクで簡単に共有できるところが気に入っています。

そしてこのOASISは、Zeppelinライクなノードブックのツールで、SparkやPrestoに対応しています。ノートブックやそれを格納するスペース単位での細かな権限管理やスモールファイルのマージなど、OASIS独自のSpark APIを実装しているのが特徴です。

こちらはData Platform室の規模を表しています。細かい数値は現在と若干異なっていますが、ご覧の通り非常に大規模なプラットフォームを開発・運用していて、大規模ならではの課題がたくさんあります。

Data Platform室がまだなかった時代には、10を超えるHadoopクラスタが存在していました。悪く言うと乱立していました。そのために運用開発コストが高くなっていたり、ユーザーもどのHadoopクラスタを使えばよいのかわからなかったりする問題がありました。これを解決するために、Data Platform室が現在Hadoopクラスタの統合プロジェクトを進めています。

またいろいろなサービスのユーザーが存在するため、リソースアイソレーションやセキュリティなどのデータガバナンス、コストガバナンスが重要になります。またハードウェアのリソースは無限に存在するわけではないので、リソースを有効に活用する必要があります。特にデータサイズは、現在およそ169PBに達していて、日々増加を続けています。

データを削除できれば簡単なのですが、法的な理由で削除できないようなデータをアーカイブする必要があり、そのためにHDFS Erasure Codingを積極的に活用しています。

Hadoopクラスタの統合について

Hadoopクラスタの統合については2019年のLINE DEVELOPER DAYで発表をしています。HDFS Federationという技術を使ってユーザーに対する影響を最小限に抑えて統合しています。ただ、Hadoopのバージョンが違っていたり、Kerberos化されていなかったりするクラスタ同士のFederationは課題も多くて、いくつかの独自パッチを開発する必要がありました。

その中の1つはアップストリームにもコミットされています。LINE DEVELOPER DAYではHDFSでの統合の話がメインでしたが、その後Metastoreをダウンタイムなしで統合するために複数のMetastroreをストリーミングでシンクするコンポーネントを開発して、現在本番環境で運用しています。これについても近い将来お話できればと思っています。

またクラスタの統合と合わせてHadoopクラスタ専用のサーバールームを構築し、既存のサーバーの物理移動も行っています。この時Hadoopのマスターコンポーネントをノーダウンタイムで移行する必要があったのですが、そのときの話を先日公開しました。

この移行の際に、クラスタが一時的に複数のデータセンターに跨って存在してしまうことになってしまって、それが原因でErasure CodingがNetwork Congestionを引き起こしてしまったというのが、今日の発表のメイントピックになります。

それではHDFS Erasure Codingの話に移りたいと思います。

HDFS Erasure Codingの機能

Erasure Coding自体は、古くからある技術なんですが、HadoopにはHadoop 3でようやく導入されました。Erasure Codingを使うと、耐障害性を保ちながら、より効率的にデータを保存できるようになります。簡単に流れを紹介しますと、まずブロックをセルと呼ばれる細かい単位に分割して、6つのデータセルから3つのパリティセルを計算します。

この図のd1からd6までがデータセルで、p1からp3までがパリティセルです。そしてこの図のストレージブロックごとにデータノードにパラレルに書き込みが行われます。この場合、これら9つのブロックのうち3つが失われても、残りの6つからデータを復元できるのが特徴です。レプリケーションの場合には、このようなブロックの分割は行われなくて、単にコピーすることで耐障害性を担保しています。

こちらがレプリケーションとErasure Codingの比較表ですが、ここで重要なポイントとしては、まずErasure Codingはレプリケーションと比べてストレージのefficiencyが2倍という点が挙げられます。つまりECでデータを保存した場合、レプリケーションの半分のディスクしか使われないというメリットがあります。

リカバリーは、レプリケーションの場合、単に残っているデータを1つコピーするだけでよいですが、Erasure Codingの場合、ECリコンストラクションと呼ばれる特別な処理をする必要があるため、非常にコストが高くなっています。

Write/Readパフォーマンスについては、Erasure Codingはストレージブロックごとにパラレルにデータを書き込むことができるので、一般的には速いと言われていますが、より多くのCPUとネットワークを使うので、環境依存とも言えます。

Data Platform室では、コードデータ用にErasure Codingを約1年ほど使っていて、現在10PBのデータを保存しています。つまりレプリケーションの場合と比べて10PBのデータを削減できたことになります。仮に、1台のサーバーのデータ容量が300TBだとすると、30台ぐらいのサーバーを節約できたことになります。

データロスは今のところありませんが、運用していく中でいくつかの本題にぶつかりました。その中でもErasure Codingのリカバリー、つまりリコンストラクションの際に起こった問題を紹介します。

ECリコンストラクションの問題

ECリコンストラクションは、DataNodeがサーバー障害などでダウンしてブロックが失われた際に、残っているブロックからデータを復元する仕組みです。図にあるようにDN4、DN7、DN8がダウンした場合に失われたブロックを、別のDataNode、図だとDN11が復元します。この際、DN11は6つのDataNodeから残っているブロックを集めてくる必要があります。

レプリケーションの場合には、単に1つのブロックをコピーしてくるだけでいいのですが、Erasure Codingの場合は6つのブロックを集めてくる必要があるので、より多くのネットワークトラフィックが発生することになります。

その結果、先ほど触れたサーバー移動の際に、クラスタがデータセンターを跨って存在することが原因で、データセンター間のNetwork Congestionが発生してしまいました。最初ネットワークチームから連絡を受けたのですが、PrometheusでHadoopのメトリクスを確認したところ、Erasure Codingのメトリクスが異常な値を示していることに気づきました。

左が各データノードごとのErasure Codingのリコンストラクションの数で、右がリモートから読んだErasure CodingのB/sです。なので、そこでErasure Codingリコンストラクションをスロットルする方法を調べるために、Hadoopのソースコードを読み始めました。

ECリコンストラクションのしくみ

こちらがECリコンストラクションのインターナルの概要になります。上がNameNode、下がDataNodeです。まずRedundancyMonitorというコンポーネントが定期的にブロックレポートをもとに、足りないブロックがないか、余っているブロックがないかをチェックして、Erasure Codingのブロックが足りない場合には、各DataNodeDescriptorにEC reconstruction tasksを渡します。

一方で、DataNodeのBPServiceActorというコンポーネントが定期的にNameNodeにHeartBeatを送っています。この際xmitsInProgressというメトリクスを送ります。このメトリクスはそのDataNode上で現在走っているレプリケーション、もしくはリコンストラクションタスクの数を表します。

このメトリクスの値をもとに、NameNodeはmaxTransferと呼ばれる、いくつレプリケーションもしくはリコンストラクションタスクをDataNodeに送ることができるのかを示す値を計算して、DataNodeに命令を返します。

それを受け取ったErasureCodingWorkerは、設定であらかじめ決められた数のタスクを並列で実行します。この際、ErasureCodingWorkerはタスクをサブミットする前にxmitsInProgressをインクリメントして、StripedBlockReconstructionはタスクが終わるとxmitsInProgressをデクリメントします。

xmitsInProgressとmaxTransfer

次に、xmitsInProgressとmaxTransferについて詳しく見ていきたいと思います。まずxmitsInProgressですが、このメトリクスはDataNode上で現在走っているレプリケーションもしくはリコンストラクションタスクの数を表します。この際Erasure Codingリコンストラクションタスクにweightがかけられているというのがポイントで、このweightはec.reconstruction.xmits.weightという設定で制御できます。

例えば、レプリケーションタスクがなくて、リコンストラクションタスクが2つある場合は、ここの例にあるようになります。ここでmax(6, 2)は6つのDataNodeからブロックを受け取ってデータを復元して、2つのDataNodeに渡す場合です。max(6, 1)も同様で、この場合ですとxmitsInProgressは6になります。

次にmaxTransferですが、この値はNameNodeがDataNodeに送ることができるレプリケーションもしくはリコンストラクションタスクの数を表します。ここでmaxStreamsはreplication.max-streamsという設定で制御できます。

例えば、maxStreamsが2で、xmitsInProgressが3の場合、-1になるのでタスクを送ることはできませんが、2と0の場合2つのタスクを送れます。

そこで、maxStreamsを1にすることで、Network Congestionを和らげることができるのではないかと考えました。ただ、実際に設定を適用する前に本番環境のDataNodeのxmitsInProgressの値をPrometheusを使って調べたところ、その値がマイナスになっていました。

このxmitsInProgressがマイナスになっていると、maxTransferは意図しない大きな値になってしまいます。実際に、このときxmitsInProgressは-1,000以下だったので、DataNodeには1,000以上のErasure Codingリコンストラクションタスクが送られていた可能性があります。

このようなバグを見つけたときは、まず、すでにコミュニティにissue報告がされていないかやパッチがないかを確認します。この問題に関しては、すでにissueが上がってパッチも送られていましたが、ユニットテストで失敗していて、まだコミットされていない状態でした。そこで私たちは、失敗していたユニットテストを修正したパッチを送りました。

この問題の原因と対策

この問題の原因自体はすごく単純で、このStripedBlockReconstructionは、タスクが終わるとxmitsInProgressをデクリメントするのですが、その際にweightが考慮されていなかったために、意図しない大きな値でデクリメントしてしまっていたのが原因でした。

maxStreamsを下げるだけではまだ十分にNetwork Congestionを防ぐことはできなかったので、実際にはmaxStreams以外の設定も変更しました。上3つのNameNodeの設定は、Erasure Codingだけではなくレプリケーションにも作用する点に注意が必要です。またこの設定は決してベストプラクティスではなくて、データセンターを跨ったクラスタという、特殊な環境でのものであることも注意してください。逆にいうと、このレプリケーションやリコンストラクションのタスクのスピードを上げたいというような場合には、これとは逆の変更をするといいということもわかります。

この図は、先ほどの設定がどのコンポーネントに作用するかを表したものですが、時間の都合上、説明は省略します。ソースコードを読む際に参考にしていただければと思います。

xmitsInProgressのパッチを当てて設定を変更した結果、ECリコンストラクションタスクの数もトラフィックも、下げることに成功しました。ただ当然ではありますが、以前よりもリカバリーに長い時間がかかっているのがわかると思います。

今後の課題

まとめに入ります。Data Platform室では約1年間、Erasure Codingを運用してきましたが、データロスこそないものの、さまざまな問題に遭遇してきました。先ほども見たように設定の中には副作用があるものもあるため、Erasure Coding用のNameServiceを独立させたり、クラスタを分けることを検討しています。

またErasure Codingでアーカイブする対象のデータを、現在だと人間の目で見て確認しているんですが、これをラストアクセスタイムなどに基づいて自動的に判断する仕組みを開発したいと考えています。また、HDFS Archival Storageの機能を使って、1台のデータノードでより多くのデータを保存することも、検証していきたいと思っています。

それから、今回Erasure Codingリコンストラクションタスクをスロットリングするために、さまざまな設定変更をしましたが、その効果が予測しづらいという問題がありました。今後、より大きなスケールでErasure Codingを使っていくためには、HDFS BalancerのようにI/Oベースでトラフィックを制御する仕組みが必要だと考えています。

Erasure Codingの説明が少し駆け足だったので、今回の発表で説明しきれなかった情報については、Clouderaとヤフーの記事を読むことをオススメします。また(スライドに書かれている)本もオススメです。

最後になりますが、Data Platform室では、OSSへのコントリビューションやビッグデータプラットフォームの開発に関心のある方を募集しています。カジュアル面談も実施していますので、興味がある方はぜひ気軽にお声がけください。どうもありがとうございました。

質疑応答

司会者:内田さんご登壇ありがとうございました。ではQ&Aに入りたいと思います。まずErasure Codingについて、どのように利用しているかという質問ですが、お答えいただけますか?

内田:そうですね。コールドデータのみに対して利用しています。その理由としては、説明を省いてしまったのですが、Erasure Codingはスモールファイルの問題にレプリケーションよりも弱い点が挙げられます。

これは、例えば10MBのデータをレプリケーションする場合だと、3つのブロックで済むのですが、Erasure Codingの場合は、9のブロックが作られてしまって、HDFSのメモリを圧迫してしまうという理由があります。なので、基本的にはもうすでに作られたファイルなどを大きなファイルにマージして、マージしたあとでErasure Codingするのが理想的なので、現在ではあまり更新が発生しないデータに対してのみ適用しています。

司会者:はい、ありがとうございます。続いての質問ですが、ISA-Lは使用していますか?

内田:はい。ISA-Lは使用しています。

司会者:ありがとうございます。ではちょっと駆け足となっちゃったんですが、Q&Aは以上です。内田さん本当にありがとうございました。

内田:ありがとうございました。