Optimizer

上新卓也氏:これでLogical Planにキャッシュを使うプランが含まれてきたので、その次の処理としてはOptimizerですね。

これは今までプランの書き換えなどはやってこなかったんですが、ここからプランをガシガシと書き換えていってより効率のよい処理ができるようにプランを最適化していきます。

Optimizerはheuristicsとcostベース、経験則とコストベースでプランを書き換えていきます。今ここに6つ代表的なものを挙げていますが、すでに50以上のルールが組み込まれています。それらを自動的に利用することができます。

ただ、50以上もあってもすべてのケースに有効とは限りません。特殊なワークロードやクエリを実行したら、もしかするとあまり役に立たないかもしれません。そんな場合には、独自のOptimizerルールや、次のところで説明するPlannerを自分で実装して、Sparkの処理の中に組み込んでいくことができます。

ルールの実装については実際のルールを見て……まあ正直ちょっと難しいんですけれども(笑)、これを組んで、登録自体は……まず最初の候補としてはSparkSessionExtensionsという機能があります。

これはSparkSessionを起動するときに、OptimizerとPlannerにかぎらずあと2、3個差し込みができるんですが、先ほどざっと言ったフローの中にいくつかプラグインのポイントがありまして、ここにはいろいろなルールや、PlannerについてはStrategyを差し込んでいくことができます。

以前のSpark SummitでExperimentalMethodsを利用する例を解説しているビデオがありますので、興味がある人がこちらのリンクを見てみるとビデオが公開されています。

Planner

ここまででLogicalフェーズについて説明してきました。ここから先は、Physicalフェーズになります。

まずはじめがPlannerです。これは何をするかと言うと、今までLogical PlanだったのがPhysical Planになります。

Logical Planは何をしたいのかというのを表していたんですが、Physical Planになると今Logical Planで定義してある「何をしたいのか」というのをSparkがどのように実行するのかというHOWに変換します。WHATからHOWという感じですね。Sparkをどうやって使ったら実行できるかというものに変換します。

この変換の際に複数の実行方法がある場合があります。例えばJoinをすることになると、よくあるbroadcast hash joinであったり、sort merge joinであったりという選択肢、ほかにもいっぱいやり方はありますが、このような選択肢があります。

Spark SQLはこの中からどちらがコストが低いかを推定して、コストが低くなるものを選択します。broadcast hash joinとsort merge joinが候補に上がったとしたら、普通はbroadcast hash joinのほうがコストが低いのでそちらを使うようになります。

broadcast hash joinはさっきもちょっと言いましたが、通常はsort merge joinのようなシャッフルを伴うJoinですね、これを使わせるためにはtableの統計情報が必要になります。

まず1個目は設定値がありまして、spark.sql.autoBroadcastJoinThresholdというのがあります。tableの情報などから推定したデータサイズがこのThresholdより小さくなったときに、Broadcastを選択するというような動きをします。

なので、もし各サーバーが大きなメモリを持っているとすると、この値をググっと大きくしてBroadcastを使うようにしたほうが、より高速な動きが期待できます。

クエリの出力、tableの大きさというのは統計情報がありますが、この統計情報を最新に保つためにANALYZE TABLEコマンドを使ってそのtableの統計情報を確実に最新になるように調整していく必要があります。

最後に、強制的にBroadcastさせる方法もあります。それがこのbroadcastJoinヒントというものです。片方のデータが確実にメモリに載るということがわかっているにもかかわらず、なんか知らないけどSpark SQLのほうがどうしてもシャッフル系のJoinを選んでしまうという場合には、このbroadcastJoinヒントをクエリの中に付けることでSpark SQLに確実にbroadcastJoinをさせるということができるようになります。

もう1個Joinについて、これはわりとよく知られていますが、Joinの条件の中にはequal条件を付けるというかたちですね。下のように範囲の、大なり小なり的なやつだけだとどうなるかと言うと、上の例のように2つのtableを二重ループするというような動きになります。

直積やデカルト積とか、そういうあれになってしまって、これは非常に効率が悪いんですけれども、equalの条件があればhashを使って絞り込みができたり、equalでつながっているJoinキーをもとに各ノードにデータを分散しても大丈夫なのでJoinの速度は速くなり、パフォーマンスは上がります。

Query Execution

最後にQuery Executionです。

これは最終的に選択されたPhysical PlanをもとにRDDを実際に駆動するというところですね。ここで登場人物が3人、主要な役割をする人が出てきます。

1個目がMemory Manager、2個目がCode Generator、3個目がTungsten Engineです。

Memory Managerはメモリの利用状況を追跡してtaskやoperator間のメモリを調整します。Code GeneratorはPhysical PlanをコンパイルしてJavaコードを生成するものです。

ちなみにその生成されたJavaコードは例えばボクシングとかアンボクシングとか有名なボトルネックになりがちなやつがあると思うんですけど、ああいうやつとか、メソッドのディスパッチとかのコストができるだけ小さくなるように作られているのでかなり速いです。

最後にTungsten Engineですけれども、これはCPUやメモリに対して効率的なバイナリデータフォーマット、データ構造を提供しています。

Memory Managerはメモリをどれだけ持っているかというのを管理するんですけれども、これは基本的にSparkのexecutorのJVMヒープサイズのことになります。(それ)を管理します。

しかし、Sparkはすべてのメモリを管理するわけではなくて、例えばNettyとかParquetとかの内部で使っているバッファサイズはMemory Managerは管理していません。こういったexecutor.memoryとかmemory.fractionという設定を監視外のメモリのためにちょっと余裕を持った設定に抑えておかないと、YARNにkillされたりとかいうことになりますね。

もう1つ便利なのがoff-heapメモリを使うことです。off-heapメモリを使うとGCのオーバーヘッドが抑えられるといった利点がありますが、これはデフォルトでオフになってます。なのでこの設定を有効にして、そのとなりのoffHeap.sizeをクエリに必要なメモリ分だけ設定してあげると、GCなどのオーバーヘッドが抑えられるようになって速くなる可能性があります。

これを設定するとデータの大部分がoff-heapメモリにいくんですが、これが先ほどのJavaHeapメモリの外になるので、さっきのexecutor.memoryの設定をそれに合わせてちょっと低く設定しても大丈夫ということになります。

Whole Stage Code Generation

次に、Whole Stage Code Generationです。

普通よくあるのは、こういうプランができたときに、それぞれ1個ずつCode Generationをしてメソッドコールをして順番に実行していくみたいなやり方です。

Spark SQLはこれらを1個のステージ内にあるこういったオペレーターを全部一緒にCode Generationして、こういうシンプルなかたちのコードにして、よりコンパクトで高速なJavaコードになるようになっています。

ただ、これはちょっと欠点があります。

JVMのことに詳しい方はご存知かもしれませんが、あるメソッドのバイトコードのサイズが8キロバイトを超えると、JITコンパイラがネイティブコンパイルするのを諦めてしまってそのまま……なんだっけ?

(誰かが発言する)あ、そう! ありがとうございます(笑)。 interpreter modeで実行してしまうのでかなり遅くなるんですね。

複雑なプランや入り組んだクエリを作ってしまうと、大きなメソッドを作ってしまう場合があります。WholeStage Codegenをオフにしたほうがむしろ速いという場合があります。

ここにhugeMethodLimitっていう設定がありますが、この値を設定すると、この設定値より大きなメソッドが見つかったらWholeStage Codegenをオフにして実行します。

様子を見てもし大きなメソッドができていてパフォーマンスがあまり速くない場合にはこのhugeMethodLimitの設定を8キロバイトとかにしておくとそれ以上の大きなメソッドになるとWholeStage Codegenは諦めるような動きになります。

データソースについて

ここまでがSpark SQLの中身の話ですね。

あとは実行、今のプランの前後にはもちろんデータの入出力があって、それがちょっと分離されているんですけれども、データ入力、データ出力が遅い場合にはどんなにSparkががんばってもそこがボトルネックになってしまって全体が遅くなる可能性があります。

一応これらの問題を改善するためにいろいろがんばっていまして、その1つはScanのVectorizationですね。例えばParquetやORCであればカラムナフォーマットというファイルフォーマットになっています。そういったデータがそのまま読めればより効率的になります。

また、そういったカラムナフォーマットであったりするとJVMがSIMDなどのハードウェアの機能を利用しやすくなるので、いろいろ利点があります。以前のリリースでParquetのリーダーをVectorizedの実装をしたところ、約10倍くらい速くなったことがあります。

もう1個は、先ほどもすこし触れたPartitioningとBucketingです。

データソースはファイルベースのもの、Parquet、ORC、CSV、JSONとかの場合はPartitioningとBucketingを検討するようにしてください。

これによって、不要なIOとシャッフルを避けられるようになるのでスピードアップを期待することができます。

この辺の例も以前Spark Summitでトークがあったのでこちらのリンクをたどるとビデオを見ることができます。

というわけで、データソースとしては、データの入出力としてはvectrized readingが可能なファイルフォーマットをできるだけ利用するようにしたほうがいいです。今だとParquet、ORCが対応しています。またファイルベースのデータソースであればPartitioningやBucketingを検討してみてください。

以上になります。ありがとうございました。

(会場拍手)