Improving Spark SQL Performance

吉田啓二氏:LINEの吉田と申します。よろしくお願いします。

LINEでは「OASIS」という内製のBIダッシュボードツールを独自で開発して運用しています。LINEの各社員は、こちらのツール上でSpark SQLのクエリを書くことによって、データの分析やレポートの作成などが行えるようになっています。

私の発表では、このOASIS上でSpark SQLのクエリの性能改善したお話をします。

まずどういった課題感があったのかをお伝えしまして、次に、統計情報を取得することによって性能改善した取り組みについて。次に、独自の最適化ルールを適用することによって性能改善したお話をします。最後に、今後の導入に向けてCost-Based Optimizerの性能検証をしたお話をします。

では、まず最初に課題感についてです。

こちらが現在LINEでのOASISの利用状況のサマリーです。DailyのActive User数が200名ほど。1日あたりに接続されるSparkアプリケーションの数が1,300個ほど。1日あたりに実行されるSpark SQLのクエリの数が7,000個ほどあります。こういった使用状況になっています。

使用しているSparkのバージョンは2.4.0の最新版を使っていまして、使用しているOptimizerはルールベースのものを使っています。

課題としては、どうしてもエンドユーザーが自由にクエリを書いて実行するツールですので、クエリによっては遅いものが存在するといったことが課題になります。

こちらの表が、ある特定の2週間でトータルの実行時間が多いものTop 5です。

例えばランク1のQuery Aというものですと、この2週間の期間で実行された回数が335回、1回あたりの平均実行時間が20分、トータル実行時間が6,800分。こういったかたちでいくつか遅いクエリが存在するといったところが課題としてあります。

本来であれば、エンドユーザーが書いたクエリですので、エンドユーザー自身の努力によって性能改善してもらえればよいんですが、なかなかそれができない事情もいくつかありまして。

まず、そもそもエンドユーザー自身がどうすればクエリの性能改善ができるのかというスキルや知識が不足している場合。あとは、分析が主目的になっているので、パフォーマンスを常に意識してクエリを書く余裕がないといったところ。あるいは、すでにプロダクション環境で動いているクエリやテーブル構成については、途中でそれを大幅に変えることは難しいといった場合です。

そういったことで、本来であればエンドユーザー側で改善してもらいたいんですが、そうはできない事情もいくつかあります。

ですので、今回の取り組みの方針としては、エンドユーザーには作業負荷を一切かけずに、つまりはエンドユーザーが書いたクエリとテーブル構成は所与のものとして、OASISというツール上でできるかぎりクエリの性能改善をがんばるというところが、今回の対応の方針になっています。

統計情報の取得

それでは、まずは統計情報の取得の取り組みについてお話しします。

こちらは、あるクエリの性能を、もともと1,500秒かかっていたところを60秒まで短縮した取り組みになります。

小さくて恐縮なんですけれども、こういったクエリになっています。

あるdb.transactionテーブルというのがありまして、例えば、あるLINEサービスのログデータですとか、そういったものが入っているようなテーブルになります。

このWHERE句でdtカラムが指定されていまして、こちらがパーティションキーになっていて、db.transactionテーブルの中で1週間のパーティションキーのデータを取ってきます。

そういったようなサブクエリが「t」というエイリアスがついていて、それに対して、db.masterテーブルというマスター情報を管理するようなテーブルがあって、そのmasterテーブルとJOINして、GROUP BYしてカウントを取ると。こういったようなクエリになっています。

レコード数としては、このtransactionテーブルの1週間の期間のデータを取ってくる部分は約18億件、masterテーブルのほうは200件といったクエリになっています。これがもともと1,500秒かかっていたクエリになります。

こちらのクエリの統計情報を取ってみますと、こういったかたちになっています。ここで注目すべきは、INNER JOINの方法としてSort-Merge Joinが選択されているところです。

Sort-Merge Joinは何かといいますと、こちらで例を示しています。

Table 1・2、2つのテーブルをJOINする場合を今見ていますが、まずは最初に、各ExecutorがTable1・2のデータを手分けしてスキャンします。そのあとに、JOINするために一度各Executor間で「シャッフル」というデータの再配布を行いまして、JOINのキーが指定されているカラムの値が、同じレコードが同じExecutorで格納されるように、データの再配布が行われます。最後に、各Executor内で自分が持っているTable 1のレコードとTable 2のレコードをJOINして、全体として新しいJOIN後の結果セットを得ると。こういったものがSort-Merge Joinになっています。

このJOINの特徴としては、Table 1・2、両方のデータについてデータのシャッフルが発生している点にあります。Sparkではこのシャッフルの処理が重い処理と言われていて、「いかにシャッフルをなくすか」、もしくは「いかにシャッフルする前にデータボリュームを落としておくか」といったところが1つのチューニングポイントとしてあります。

もう1つのJOINの方法を見ますけれども、こちらはBroadcast Hash Joinというものです。

各Executorが各テーブルから手分けしてデータを、レコードをスキャンするところまでは同じでして。この例ではTable 2のデータサイズが小さい場合を見ていますけれども、その場合に各ExecutorでTable 2の全データ量を持つようなかたちでデータの再配布が行われます。これがBroadcastと言われている処理で。そのあとに、各Executor内で自分が持っているTable 1のレコードとTable 2のレコードをJOINして、全体として新しい結果セットを得ると。こういったものがBroadcast Hash Joinになります。

このJOINの特徴としては、Table 1のデータについてはまったくデータのシャッフルが発生していないという点にありまして。ですので、Table 1のデータ量が多い場合には、その大量のデータのシャッフルを抑えることで、より高速にJOINできる方法になっています。

ただし、各ExecutorでTable 2の全データをメモリ上に持つことになっていますので、このTable 2のデータサイズが小さい場合にかぎり使用できるJOINの方法になっています。

ルールベースのOptimizerであっても、このBroadcast Hash Joinを使うか・使わないかという選択は自動的に判断されるようになっていまして。その判断基準がspark.sql.autoBroadcastJoinThresholdという設定値になっていまして、デフォルトが10MBとなっていて、OASISでも10MBで運用しています。

ですので、先ほどの例で言いますと、このTable 2のデータサイズが10MBを下回っていればBroadcast Hash Joinを選択するし、そうでなければSort-Merge Joinを選択すると。そういった判断がルールベースのOptimizerであっても自動的になされるようになっています。

もともとのクエリに立ち返ってみると、このdb.masterテーブルについては、たかだか200件程度しかない。このデータ量は優にその10MBのしきい値を下回っているにもかかわらず、選択されるJOINの方法はSort-Merge Joinになっているというが課題としてあります。

これがなぜSort-Merge Joinが選択されているのかをもう少し見ます。

こちらのコードは「Sparkアプリケーションがdb.masterテーブルに対してどれぐらいのデータサイズを見積もっているのか」を見るためのコードになっていまして、実際に見てみますと、こういったかたちでものすごく大きな値が出てきます。

これ、64ビットのlong型のデータの最大値になります。

つまりは、Sparkアプリケーションがこのdb.masterテーブルのデータサイズを正しく推定できていないがために、Broadcast Hash Joinが使われずにSort-Merge Joinが使われています。そういった状況になります。

実行時間が1,500秒から60秒に

ですので、今回の対策としては、このdb.masterテーブルについては統計情報を取得して、Sparkアプリケーションが正しくデータサイズを見積もることができるようにしてあげる、というのが対応の方針になります。

こちらのdb.masterテーブルは、エンドユーザーがOASISというツール上でデータを登録しているテーブルになっていまして。OASISではこのOASIS.INSERTOVERWRITEというデータ登録用のAPIを各エンドユーザーに提供していて、各エンドユーザーはこのAPIを使ってデータ登録を行うようにしています。実際、内部的にはSparkのinsertIntoを実行しているんですけれども。

ですので、今回はデータ登録後に統計情報を取得するようにしたいので、このinsertIntoを実施したあとにANALYZE TABLEを実行するというのを、そういったかたちでAPIを修正するという対応を行いました。

これを行うことによって、db.masterテーブルについては、Hive Metastore上に統計情報が入るようになります。

先ほど見たSparkアプリケーションのデータの見積もりも、この統計情報にもとづいて正しくデータサイズを見積もることができます。

この値が先ほどのBroadcast Hash Joinのしきい値を下回っているために、実行計画もBroadcast Hash Joinに変わりました。

それによって実行時間が1,500秒から60秒まで短縮できたという結果になります。

独自の最適化ルールによる性能改善

では、次に独自の最適化ルールを適用することによって性能改善したお話をします。

こちらはもともと6,000秒かかっていたクエリが200秒まで短縮できた例になります。

クエリとしてはこういったかたちになっていまして。db.transactionテーブルがあって、今回はパーティションキーが約半年間のパーティションデータ取ってくると。これがエイリアス「t」という名前がついていました。

これに対して、もう1つ、db.areaテーブルという、areaマスターというテーブルがありまして、各都道府県とかそういった地域情報を管理するようなareaマスターテーブルなんですけれども、そのテーブルとJOINして、GROUP BYしてカウントを取るようなクエリになっています。

こちらもデータ量としては、半年間のトランザクションデータを取ってくる部分が45億件、areaマスターのほうが90件というデータボリュームになっています。

こちらも先ほどと同じような事情で、db.areaマスターテーブルについては統計情報が取られていないがために、Sparkアプリケーションが正しくデータサイズを見積もることができずに、Sort-Merge Joinが選択されてしまってるという状況です。

先ほどの例と違うのが、このdb.areaマスターテーブルが、OASIS上でエンドユーザーが自分で登録しているようなテーブルではなくて、OASISの外にあるバッチ基盤でApache Sqoopを使って、サービス側のMySQLからインポートされているデータであるという点になります。

Hive自体、このhive.stats.autogatherという設定値が有効になっていて、データの登録処理があった場合には、自動的に統計情報が取られる設定になっているんですが、1つ制約があります。この一番下に書いてありますLOAD DATAステートメントについては統計情報が取得されないという制約があります。

Apache Sparkは内部的にそのLOAD DATAを使ってデータ登録処理を行っているので、今回はこの制約条件に合致して、統計情報が取られていないという状況になります。

こちらもちゃんとバッチ基盤側で統計情報を取るように変えてもらえればいいんですが、それは今回、OASISというツールでコントロールできる範囲外ということで、今回はdb.areaマスターテーブルについては統計情報が取られていない状況においても、OASIS上でがんばって、Broadcast Hash Joinをさせたいというのが取り組みの方針になります。

統計情報が立たないテーブルに対してBroadcast Hash Joinをする1つの方法としては、このBROADCASTヒントを使うというものがあります。これはクエリ上にこのBROADCASTヒントを書くことによって、JOINの方法をBroadcast Hash Joinに強制できるというものになっています。

ただし、これをOASIS上で運用するためには、クエリを書いているエンドユーザー一人ひとりに対して、「今回の問題となっているdb.areaマスターテーブルを参照するクエリを書く際は、必ずBROADCASTヒント句を書いてください」といったような運用ルールを周知徹底しなければならないのは現実的に難しいので、今回の取り組みではBROADCASTヒントを使えないという状況になります。

BROADCASTヒントは内部的には、LogicalPlanに変更される際にResolvedHintというプランが挿入されて、それがLogicalPlanがPhysicalPlanに変更される際にBroadcast Hash Joinに変わる仕組みになっています。

こちらは2〜3年前のSpark Summitの発表内容です。SparkのOptimizeに対して、独自の最適化ルールを外部から注入できる方法が紹介されていました。

spark.experimental.extraOptimizationsというフィールドに対して、自分が定義した最適化ルールを持つインスタンスを設定することによって、クエリのLogicalPlanが最適化される際に、その最適化ルールに自分の独自ルールを埋め込むことができます。

今回はこれを使うことによって、先ほど見たとおり、クエリ上はBROADCASTヒントが書かれていないのにもかかわらず、BROADCASTヒントが書かれたときと同じようなLogicalPlanを再現しました。

実際に書いたコードはこうなっています。やっていることはJOINのプランがあって、JOINのほうは右か左かどっちかで、参照しているテーブルがdb.areaマスターテーブルで、なおかつ、それに対してdtパーティションのリテラルが当たっている場合は、このResolvedHintをLogicalPlanに挿入するということが行われています。

こちらが適用したあとのLogicalPlanになるんですけれども、この青い部分がその実際に挿入された部分です。db.areaテーブルを参照しているプランについては、一番上にこのBROADCASTのヒントが挿入されると。

これがPhysicalPlanに変更される際に読み込まれてBroadcast Hash Joinに変わると。

これによって実行時間が6,000秒から200秒まで短縮できました。

Cost-Based Optimizerのメリット

では最後に、今後導入したいと思っていて、このCost-Based Optimizerについてはどういったメリットがあるのか検証しましたので、その内容のご紹介になります。

Cost-Based Optimizerは何かというと、Apache Spark 2.2.0から導入された新しいOptimizerです。それまではルールベースのOptimizerが使われていたんですけれども、ルールベースのほうが事前に定義されたルールに従ってクエリを最適化するという方向性に対して、コストベースのほうは、テーブルやパーティション、統計情報にもとづいて最適化します。つまりは、実際のデータ量にもとづいてクエリを最適化するような方向性がCost-Based Optimizerになっています。

Apache Sparkでこのルールベースを使うかコストベースを使うかは、Sparkの設定値で変更可能でして。ただ、今のSparkの最新版の2.4.0でも、ルールべースがデフォルトで使われるようになっています。

ただ、Dataflix社が提供しているDataflixという分析環境ですとか、そういったところではデフォルトでこのCBOがTrueになっているので、今後その「ルールベースからコストベースへ」という流れが来るのかなというようなことをちょっと感じていまして、それでどういったメリットがあるのかを調べてみた次第です。

2つのメリット

私が見ていたなかで大きく2つメリットがあるなと感じています。1つは「Broadcast Hash Joinをするか・しないかという選択をより精緻化できる」というところと、2つ目が「複数のテーブルをJOINする際に、そのJOINの順番を最適化できる」という、2点になります。

Broadcast Hash Joinの選択については、先ほどの説明でもあったとおり、ルールベースであってもするか・しないかが自動的に選択されるんですけれども、ルールベースの場合はあくまでもそのテーブルとかパーティションのデータサイズに対してするか・しないかを判断すると。

なので、例えばある1億件のテーブルがあって、それに対してWHERE句がついていて、WHERE句の絞り込みで1件まで絞り込まれると。そういった場合には、本来であればBroadcast Hash Joinを使えるべきなんですけれども、ルールベースのほうはそのWHERE句の条件を加味したデータサイズが見積もれていないがために、Sort-Merge Joinが選択されます。

コストベースの場合は、そういった場合に、カラム単位の統計情報を使ってWHERE句の絞り込みを加味した上でどれぐらいのデータサイズになるかというところまで見積もれて、それに対してBroadcast Hash Joinをするか・しないかを選択できる。そういったメリットがあるのが1つと。

JOINの順番の最適化については、次のスライドで説明します。

こういったクエリを考えます。テーブルa・b・c、3つがあって、それぞれを結合するようなクエリになっています。

テーブルaのデータ量が約15億件と、テーブルbが4億件。テーブルcが1件と、こういったテーブルをJOINする場合を考えます。

こういった場合にJOINする順番は2パターンあって。1つ目が、テーブルaとbを最初にJOINして、次にテーブルcをJOINする順番と、もう1つが、テーブルb・cをJOINさせて、最後にテーブルaをJOINするという、2パターンのJOINの方法があります。

ルールベースでどちらが選択されるかを見てみますと、こうなっています。

まず最初にテーブルa・bを結合させて、最後にテーブルcを結合させる。ですので、ルールベースの場合は、大きいテーブルの2つをまずJOINして、その結果また大きい結果セットができあがって、それに対してテーブルcが当たっているという状況になっています。

先ほどのSort-Merge Joinのところでも言いましたが、いかにシャッフルの負荷を下げるのかがパフォーマンス上重要になってきます。ですので、できればトータルのシャッフルされるデータ量をなるべく抑えたい。

そのためには、一番最初のJOINでできるかぎりデータが絞り込まれるような、そういった順番でJOINするのが最適になるんですけれども、ルールベースの場合はそこらへんが判断できなくて、最初に大きいテーブル同士をJOINさせて、そこで大量のシャッフルが発生して、結果、また大きい結果セットができあがって、それに対してまたJOINが発生する、という非効率が発生しています。

Cost-Based Optimizerによってどう変わるか?

それがCost-Based Optimizerを使うことによってどう変わるのかといいますと、JOINの順番が変わりまして、最初にテーブルbとcを結合させてからテーブルaを結合させるというふうに変わると。

これによって、一番最初のJOINでできるかぎりデータが絞り込まれるような方法でJOINされることによって、トータルのシャッフルされるデータ量が減って、それによって実行時間も、ルールベースのほうは1,000秒かかっていたものが、コストベースのほうは100秒まで短縮されたという結果になります。

コストベースのOptimizerがどういった判断基準でそのJOINの順番を決めているのかといいますと、各JOINの順番とパターンについて「cost」というのを算出していまして、そのcostが一番低くなるようなJOINのパターンが選択されるようになっています。

このcostがどう算出されるのかといいますと、あるweight、これはSparkの設定値で変更可能で、デフォルトでは0.7になっています。0.7×各JOIN後の結果セットのレコード数のトータルの値。それ+0.3×各JOIN後の結果セットのデータサイズの合計値がcostとして算出されて、それが一番低いJOINの順番が選択させるようになっています。

ですので、一番最初のJOINでなるべくデータが絞り込まれたり、トータルのシャッフルのデータ量が一番減るようなJOINの順番が選択されやすくなっています。

Cost-Based Optimizerを使うためには、いかにテーブルとかパーティションの統計情報を取るかだけではなくて、いかにカラム単位の統計情報を取るかというのが重要になってきて。なので、いかに統計情報を取得すべきカラムを自動的に特定して、なおかつ、それらのカラムの統計情報を自動的に取るというところの仕組みが必要になってきます。

なので、今後そういった仕組みをしっかり検討して、Cost-Based Optimizerを使えるようにしていきたいなと思っています。

以上です。ありがとうございました。

(会場拍手)