分散システムをどうスケーラビリティ高く運用していくか

佐々木海氏(以下、佐々木):よろしくお願いします。Treasure Dataの佐々木といいます。

今日のTech Talkのテーマはデータ分析基盤ということで、Treasure Data自体がデータ分析基盤のプラットフォームを提供しており、その中ではいろんな分散システムが動いています。今日の私の話では、「その分散システムをどうスケーラビリティ高く運用していくか」というテーマで、それを支えるインフラにフォーカスしたいと思います。

私は2015年の10月あたりから現在までの約2年半ぐらい、Treasure Dataでソフトウェアエンジニアとして働いています。基本的にはクエリエンジンという、Treasure Dataで動いている分散クエリ実行エンジンはHiveとPrestoが主にあるんですけど、その2つをメンテするチームに所属しています。

いろんなことをやってきているのですが、最近やった仕事の大きなものとして、このPrestoのスケーラビリティを保つためのインフラを中心とした開発の話を今日のトピックとして話したいと思います。

まず、分散システムってけっこう高可用でハイパフォーマンスが出るようなスケーラビリティの高いシステムを、最近だとビッグデータを分析するような処理エンジンとして提供されているものが多いのですが。言うは易しで、それを毎日の業務として使うのはとても大変なものです。

それをうまい具合に使うためには、実際に運用する人々がいて、そのためのインフラがあります。今日は、「Treasure Data内で使われている分散システムをうまく提供するためのインフラをどのように作ったか」と、それができる前のつらみも含めて話したいと思います。

「Treasure Dataで使われている分散システムになにがあるか・どんなものか」と、それを運用していく上でつらかったことがいっぱいあるのですが、そのつらみを話します。そのつらかったことで解決したこともたくさんある。全部は解決していないのですが、解決したことがたくさんあるので、それをどうやったかという話をします。

Treasure Data内で使われている分散システムについて

佐々木:Treasure Dataのプラットフォームはクラウドサービス、主にAWSで構築され、このセッションでは「その機能を使ってどう解決したか」という話にフォーカスしています。まず「Treasure Dataで使われている分散システムの話となにをやったか」、最後にオートスケーリングを活用した話をします。

Treasure Data内で動いている分散システムは、先程も言ったとおりPrestoとHiveがあります。

Treasure Dataは、データのインポートからデータの保存、そこから分析までをEnd to Endで行うためのプラットフォームをクラウドサービス上で提供していて、そのingestionはモバイルデバイスや、Fluentdを介してサーバからデータを入れることができます。バルクのかたちでバッチ処理としてデータを入れることもできますし、継続的にデータが流れるストリーミングのデータとしてデータを入れることもできます。

データを保存しているのは、Plazmaと呼ばれているコンポーネントで、メタデータをPostgreに、実際のログデータがAmazonのS3に入っている、Treasure Dataが作ったログ分析のためのデータストレージエンジンです。

ここにデータが全部入っており、Plazma上のデータに対してPrestoやHiveでクエリをかけて、コンソールからSQLを書いて分析したり、あるいは今Treasure Dataが最も力を入れているプロダクトであるCDPからのバックエンドとしてPrestoやHiveを使ってデータを分析します。これが主なパイプラインになっています。

このPrestoやHiveがどうなっているかというと、まずTreasure Data内で使われているHiveは、ちょっとバージョンは古いのですが、今は0.13を使っています。monthlyで400万のクエリが動いています。約250兆のレコードと、これはデータサイズで3PBぐらいのデータを処理しています。

Prestoも同様で今0.188というバージョン、を使っています。1,400万クエリで1,050兆ぐらいのレコード、データサイズでいうと19PBのデータを月に処理しています。

プラグイン内のパブリック・クラスをメンテナーに消された話

佐々木:HiveとPrestoの使い分けとしては、Prestoはインメモリのエンジンなのであまりにデータが巨大だと処理できないクエリが出てきます。そういった場合にはHiveにジョブを投げてもらうようにしています。基本的にはデータ量が多いETLのジョブはHiveでやって、どちらかといえばデータが少ない処理、アドホックなクエリをPrestoでやってもらうことにしてもらって使い分けをしています。

HiveもPrestoも最新版と若干乖離があるのですが、コアバージョンのアップグレードはとてもつらい作業の1つで、今日この話はあんまりしませんがまだ完全には解決できていないものです。HadoopやPrestoのバージョンアップをやったことがある人はわかると思いますが、バージョンアップするごとに互換性が壊れます。

Prestoの場合は、頻繁にバージョンが上がるのですが、クエリのsyntaxのcompatibility以上にコードベースのcompatibilityも壊れます。一番つらかったのは、Prestoのバージョンを上げると使っていたクラスがけっこうなくなることです。

メンテナーに「なんでこのクラスなくなったの?」って聞くとまぁ「使ってないから」って言うんです。Prestoはプラグイン機構を提供しているんですけど、そのクラスはプラグインの中で使われているパブリッククラスだったんですね。ただ、リポジトリの中では確かに誰も参照していないようなクラスだったので、メンテナーは使っていないだろうと思って消したらしいのです。

どうしてプラグインで使われているパブリックなクラスを誰も使っていないと思ったのかなというのはすごく不思議に思ったのですが、なかなかそれをわかってもらうためにはコミュニケーションが更に必要だなと思い、今でもこのバージョンアップに関してはつらい思いを続けています。そしてまだ解決できていない問題の1つです。

スケールするとどうしてクエリが速くなるのか

佐々木:HiveとPrestoの簡単な仕組みを説明すると、どちらもマスター・スレーブの方式でクエリを分散させて実行します。

HiveではApplicationMaster、PrestoではCoordinatorと呼ばれるマスターサーバがいて、こいつがクエリの分散実行計画を立てて、タスクを各Workerコンポーネント、Slaveコンポーネントに分散させます。

このSlaveコンポーネント、Hiveの場合はContainer、Prestoの場合はWorkerと呼ばれていて、こいつを水平スケーリングさせることで高速に安定してジョブを実行することができます。

今回はオートスケーリングはこのマスターのほうではなく、「Workerをどうやって簡単に安定してスケールさせるか」という話です。まずその前に、「そもそもなぜここをスケールさせるとクエリが速くなるのか・安定するのか」という話をします。

普通のシングルノードのマシンでこういった分析SQLを書いた場合だと、1つのクエリは、1つのノードで、1つのメモリで、1つのマシン上で動きます。

これを分散させるためにはなにをしなきゃいけないかというと、1つのSQL、ジョブを受け取ったときに「タスクフラグメント」と呼ばれるものに分解します。

例えば、S3からデータを読んでくる部分や、JOINさせる部分、あとはフィルタリングさせたり、データを変換させたりするような個々のタスクに分けます。こいつらをバーっと複数のWorkerノードたちにばら撒くことで、1つのクエリを複数のマシンで実行するということを実現します。

それがなんで速くなるかというと、主に速くなるケースが2タイプあって、まず1つ目はJOINですね。

OLAP処理だとJOINオペレーションはとてもよくあるパターンで、こいつがシングルノードで走らせた場合は、右側も左側もどっちのテーブルも1つのマシンで乗っけて、JOINのキーがマッチするかどうかをループで回したりする必要があります。

これが、2つのテーブルを両方の1つのマシンに乗っけるというメモリやディスクを食うもので、それを1つのCPU、マシンで1個ずつキーをマッチさせていく処理も同様に重たいものになります。

ところが、これを分散させるとどうなるかというと、Shuffleという処理を走らせることができて、右側か左側のテーブルのキーでレコードを分散させてあげると、1つのマシンで処理させるデータを減らすことができます。かつ、同じキーのデータが揃っているので、そこだけでマッチさせることでその部分だけはJOIN処理が完結します。

OLAP処理ではよくあるこのJOINというとても重い処理を非常に効率的にできるのが、この分散クエリエンジンの大きな利点です。

日常業務のなかで水平スケールすべきもの

佐々木:もう1個が、JOINと並んで重たい処理がデータを読み込んでくる部分ですね。

とくにバックエンドがS3などのクラウドストレージだと、クエリエンジンが動いているところからかなり離れた場所にストレージがあることになります。

この場合には、高いレイテンシがデータを読み込むときに発生しますし、全体のクエリの実行に対してデータの読み込みの部分のオーバーヘッドはかなり高くなります。この重いネットワークIOを分散させることができます。

どうやるかというと、読み込んでくるデータをあるキーやある時刻でパーティショニングしておくことで、各Workerノードが読み込んでくるデータを限定させることができます。

こうすることで1つのノードが全データを読み込んでくる必要はなくなって、あるあるWorkerノードは、特定の時刻の範囲のレコードだけを読んだり、特定のキーを持ったレコードだけを読んできたりということができるようになります。

こうすると、テーブルを読み込んでくる、S3からデータを読み込んでくる処理をみんなWorkerに等しく分けることができて、データ処理自体が高速になります。なので、分散クエリ実行エンジンにおいて水平スケールさせるのはとても大事なことです。

ただ、技術的にHadoopやPrestoは水平スケールさせることはできるんですが、それを日常の業務としてあるいはサービスとして提供していくなかでやっていくにはとても重い作業になります。

水平スケーリングさせるようなシステムは多くあります。分散クエリエンジンの水平スケーリングを行う上での課題をいくつか述べますと、例えば普通のWebアプリケーションではWebサーバとデータベースは分かれていると思うのですが、そうすることでWebサーバにはステート、状態が残らないので、彼らを水平にスケールさせることができます。

HadoopやPrestoとの大きな違いはそこになります。データがWebサーバのほうにはないんだけど、HadoopやPrestoにはデータがあるわけですね。なぜかというと、そこでデータを処理するからです。データが残っていると簡単にノードを追加したり減らしたりということがすごく困難になります。このWorkerたちをステートフルな状態にさせないというのが大きな課題になります。

1つずつやると時間がかかってしまうものをシステム化する

佐々木:次にあるのが、インスタンス自体を立ち上げるのに時間がかかる。クラウド上でインスタンスを立ち上げるのは、実際に「よし、スケールさせよう」と思ってサーバを買ってくるよりはずっと速いのですが、やっぱりそれでも何分かかかってしまう。

とくにジョブのワークロードのピークやスパイクがあったときに、それをなんとか対処するためにスケールさせるときに時間が2〜3分かかってしまうと、かなりクラスタがunstableになってしまうので、この時間を早くさせることは重要です。

あとはインスタンスを立ち上げるときに、もちろん空っぽのままで立ち上げるわけにはいかないのでアプリケーションをそこで動かす。ソフトウェアをインストールしたりコンフィグレーションを作ったり、いろんなbootstrap処理があるのですが、こいつがきちんと終わったかどうかを各インスタンスでトラッキングする必要があります。人でやるとこれまたかなり大変なので、簡単にする必要があります。

もう1個コンフィグレーション関連で大変なのが、分散システムだと、さっきもあったとおり、マスターとWorkerスレーブノードで役割が違うんですね。だから入っているソフトウェアも違うかもしれないし、コンフィグレーションも違うかもしれない。

そのときに、どのインスタンスにどんなコンフィグが入っていて、どのインスタンスにどのバージョンのパッケージが入っているかというのをマネージする必要があります。これをやっぱり人手でやると大変なので、うまいことやってあげる必要がある。

次がDeployment Verification。デプロイがうまくいったかどうかというのを確認する必要があるのですが、台数が多いと大変だし、さっきも言ったとおり個々の分散システムでのロールによってどういうチェックをする必要があるかというのはぜんぜん違うので、それをマニュアルでやるのは考えがたいわけです。

リリースマネジメントのためにリバージョンとパッケージを確認する

佐々木:次のGraceful Shutdownに関しては、クエリ実行エンジン特有のものだと思うんですけど、Webサーバと違うのは、1つのリクエストが終わるまでに時間がかなりかかる場合があります。HadoopやHiveのジョブだと何時間もかかりますし、Prestoのジョブでも何十分もかかるものが大半です。

スケールイン・スケールアウトさせる場合、とくにスケールインの場合に、インスタンスをシャットダウンするときクエリがまだ走っているのにインスタンスをシャットダウンしてしまうと、クエリが落ちてしまう。そのジョブをリトライしなきゃいけないと、クエリをサービスとして提供している側としては、クエリ実行エンジンをreliableな状態でサービスとして提供するのは難しくなるわけですね。

なので、クエリを失敗させずにスケールインさせる、ノードをシャットダウンさせる仕組みが必要になります。これもやらないといけないことです。

もう1個が、Treasure DataはAWS以外にもいろんなクラウドの中でサービスを提供しています。また、USリージョン以外にも、東京リージョンであったり、ほかのロケーションでサービスを提供しています。それぞれの中で違うバージョン、違うコンフィグレーションでアプリケーションを立ち上げたときに、それをマネージする必要がやっぱりあります。

「どのリージョンでどのパッケージが動いているか」というのがわかるようになっていないとリリースマネジメントとしてうまくやっていけなくなるので、これを解決する必要があります。

最後、これがAuto Scaling Groupで一番やりたい大きな夢なのですが、今のクラスタのワークロードに対して「どのぐらいのサイズにすればいいか」というのを、手で人が考えるのはこれまたつらい作業なので、これを簡単にさせたい。

以上8つぐらいあるんですけど、分散クエリ実行エンジンをスケーラブルな状態で保つためには、Treasure Dataでの経験として「このぐらいつらいことがありました」「これからこれをどうやって解決したか」「これをどうやって解決したか」「そのために必要な機能とかソフトウェアはなにを使ったか」という話をしていきたいと思います。

エラー時のためにジョブの中間データを残しておく

佐々木:まずは、Stateful Datasource。スケールイン・スケールアウトを自由にさせたいこのクエリ実行エンジンに対して、データ、ステートを持たせない、なくしたくないものを持たせないにはどうしたらいいかという話です。

これは簡単で、さっき出てきたPlazmaというデータストレージシステムで、こいつらにお客様から預かったログデータなどをすべてストアしています。

こいつらからデータを読み取るようにHive、Prestoでデータのコネクタレイヤを作っています。Prestoの場合はもともとConnectorというプラグインが、さっきも言ったよく壊れるプラグインがあるのですが、Plazmz用のConnectorを作っています。

Hiveに関しても、Hadoopのインプットフォーマットをちょっと書き換えて、このPlazmaからデータを読み取れるようにしています。

こうすることでHiveやPrestoで残っているものはジョブの中間データだけになります。最悪「ジョブが失敗した」「クラスタをシャットダウンした」としても、消えるものはログデータではなく、そのジョブの中間データだけになるわけですね。

そうすると、クラスタをシャットダウンしても、もう1回ジョブを新しいクラスタで実行さえすれば、データはPlazmaのほうに残っているのでもう1回まっさらな状態からデータ分析をすることができます。こうすることでクラスタをかなりdisposalにできるのが大きなメリットです。

もう1つ大きなメリットとしては、デプロイが楽になります。どういうことかというと、HiveやPrestoクラスタをBlue-Green Deploymentでリリースすることができます。

データストレージレイヤはもうパーマネントな状態で残っているので、新しいクラスタを立てて、どっちにしてもこのPlazmaからデータを読み取ることは変わらなくて、お客様から見えるクラスタを新しいほうに切り替えてあげればいいだけです。別に新しいほうのクラスタと古いほうのクラスタで状態が違ってもいい。どうせジョブの中間データしか残っていないので状態が変わっていても大丈夫というのが大きなメリットです。