Spark SQLの仕組みとパフォーマンスチューニング

上新卓也氏:それでは発表を始めます。『Deep Dive into Spark SQL with Advanced Performance Tuning』ということで、Spark SQLの内部の詳細とそれらを応用したパフォーマンスチューニングについてお話します。

Databricksでソフトウェアエンジニアとして働いています。Apache Sparkのコミッターで、TwitterやGitHubは基本的にueshinというアカウントで活動しています。

DatabricksはApache Sparkの開発初期メンバーによって設立された会社で、Sparkをベースとしたデータサイエンスであったりエンジニアリングであったりを統合的に扱うUnified Analytics Platformを提供しています。このクラウドサービスはAzureやAWSで利用可能ですので、興味のある方はご利用ください。

それではSpark SQLの話に移っていきます。Spark SQLはSparkのコンポーネントの1つで、スケーラブルで効率的にリレーショナル処理を行うためのものになっています。

Spark SQLはさまざまな異なるデータソース、例えばCassandraやKafkaやRDBMSなどにあるデータや、またファイルベースのさまざまなファイルフォーマット、Parquet、ORC、CSV、JSONなどのデータを読み込んで統合して解析や処理をすることができます。

APIとしてはSQLをはじめとしてそれ以外にもPython、Scala、Java、Rといった言語から利用できるAPIを提供しています。

Spark SQL is not only SQL

実際のところはSpark SQLとは言うんですが、SQLのためだけのものではありません。Sparkのエコシステムの新しいコアとなりつつあります。

ざっくりこんな感じになっています。

Spark SQLは一番上のところにある、SQLやDataFrame、Datasetといったこの辺がUser APIとして提供してあります。

そのクエリが、その下にあるCatalyst OptimizationやTungsten execution engineと呼ばれるもので実行されます。データについてはData Source Connectorsがありまして、標準でビルトインで提供してあるものと、APIは公開されているので独自で実装することもできるようになっています。

現在、DataSource APIのバージョン1はすでに提供されているんですけれども、より洗練されたかたちとしてv2の実装が進行中です。Spark3.0までにはみなさんが利用できるようになる見込みになっています。

この濃い青い部分がSpark SQLの中身です。この上にStructured Streaming、ストリーミング処理のライブラリや、機械学習のためのMLlib、あとはグラフ処理のためのGraphFrameなど、その他いろいろなプログラム、ライブラリが実装されています。

SQLやDataFrame、Dataset APIを使って書いた、みなさんが作るSparkのアプリケーションもこれから説明するSparkSQLの最適化であったり、そういった高速化のためのいろいろな機能の恩恵を受けることができるようになっています。

Spark SQLの処理の流れ

Spark SQLというのは本質的に何かと言うと、みなさんが定義するクエリからSparkを駆動するためのRDDのコンパイラですね。クエリからRDDに変換するためのコンパイラが本質的な動きになります。のちほど1個1個詳細に追っていきますが、ざっくりと処理の流れを見てみます。

まず先ほどのUser APIが一番左にあって、ここで何をしたいのかをみなさんに定義していただきます。そのAPIで作ったプランがここにあって、これがいわゆるASTとか、Abstract Syntax Treeと呼ばれるものです。この時点では例えばどの部分がtableを表すとかそういうことはまだわからないかもしれない。

そのあとのAnalyzerであったり、ここにMetadata Catalogがありますけれども、このあたりを使ってそれぞれのツリーのノードがどういうことを表しているのかを解析します。そのあとにOptimizer、Plannerがheuristics、つまり経験則、もしくはコストに基づいたルールで実行を最適化していきます。

最後にできあがったPhysical Planに対して、これの動きを実際にRDDで動かすためのJavaコードを吐いて、それをコンパイルしてJVM上で実行するというような流れで処理を行なっています。

間にちょいちょいチューニングの話が入ってくるんですけれども、一番基本的な話ではこのプランを見て解釈してそれをチューニングしていくというのが基本的なSpark SQLのチューニングの話になります。

このプランはSQLでEXPLAINコマンドであったりとか、Dataset、DataFrameAPIであればexplain()というメソッドが提供されているのでそれを使ってプランを見るというかたちになります。

今日の発表ではプランの見方などには触れませんが、この間あったHadoop / Spark Conferenceのときに弊社のマネージャーが来ていまして、その人が実際の例に沿ってプランの見方やそれに対するいろいろな解釈を用意しています。そのスライドが公開されていますので、興味がある方はHadoop Conferenceで探して見てみてください。

Declarative APIs

詳細について1個1個見ていきます。

一番はじめの左のほうにあるやつがUser APIですね。

Spark SQLのUser APIは宣言的APIです。何をしたいのか、何がほしいのかを宣言的に組み立てます。まあSQLとかはそういう言語ですよね。

SQLのAPIとしては標準SQLの2003、もしくはHiveQLを利用することができます。あとそれ以外にDataset、DataFrameのAPIがありますけれども、これはSQLで表現しきれない範囲であってももっといろんな処理を書くことができるようになっています。

Dataset、DataFrameについてはさっきもありましたが、Java、Scala、Python、R、それぞれの言語に組み込まれた普通のライブラリのようなインターフェイスを持っていまして、かなり便利に使うことができます。

ちなみにDatasetについてはJavaとScalaのコンパイラの機能を使っている関係上、PythonやRではサポートされていません。SQLとかDataset、DataFrameを使ってプログラムを書いていくと自動的に裏側ではUnresolvedなLogical Planというのができます。

Metadata Catalog

この次の処理としてはAnalyzerがMetadata Catalogを使ってどのノードが何をしている処理を表すのかというのを解決していきます。

MetadataのCatalogには4種類あります。

1つ目はHive metastoreですね。これはtableのためのカタログです。設定によってPersistentにすることができるので、Sparkアプリケーションを1回落としてまた起動して続きをやるみたいなこともできるようになっています。

次2つはtemporary view managerとglobal temporary view managerで、これは違いとしてはSession-localであるかCross-sessionであるかと。sessionをまたいで使えるかという違いがあります。

最後にfunction registryというのは関数ですね、関数のカタログになっています。これはSession-localなので、UDFとかを定義してあるsessionで登録しても別のsessionでは新たに登録し直さないといけないので、気をつけてください。

CatalogについてPerformance Tipsですけれども、Hive metastoreからPartition metadataを取得するのに非常に時間がかかる場合があります。この場合にはまず1個目はとりあえずHive metastoreのアップグレードをしてみてください。新しいバージョンではパフォーマンスが改善しているので、できるだけ新しいバージョンを使ったほうがいいです。

可能であればCardinalityの高いパーティションカラムを避ける、ですね。パーティションというのはまたあとでもう1回説明しますが、データのセレクトをするときに余計なファイルを読まないようにするような機能があります。Hiveでもあるので、ご存知の方も多いと思います。

それがあるんですけれども、あまりにもCardinalityの高いパーティションがあるとHiveのMetastoreのパフォーマンスが落ちるので、あんまり高すぎると良くないという感じですね。

せっかくパーティションを使っているのであったら、すべてのパーティションを使うのではなく、パーティションの絞り込みを行うような条件をクエリに付けるようにしてください。

Cache Manager

これで、MetadataのCatalogからtableであったり関数であったりとか、そういうものをいろいろと解決していくわけです。

次ですね。Optimizerの処理の前にCache Managerが入ります。キャッシュをまず先に差し込みます。

Cache Managerはクエリのプランを調べて、そのプランの一部がキャッシュのプランと一致した場合に、そのプランの一致した部分をキャッシュのプランに置き換えます。

Cache ManagerはCross-sessionなので別のsessionで意図せずキャッシュを使っていた、みたいなことがあって。基本は速くなるかもしれませんが、「あれ、おかしいな」みたいなことになるかもしれないので、これは気をつけたほうがいいかなと思います。

キャッシュデータはキャッシュを使うクエリが初めて実行されたときに作成されます。また、依存しているtableやviewのデータが更新されるとキャッシュを一旦破棄して、次回利用時に再度キャッシュを作るということをします。

これは依存しているtableのデータの更新があったにもかかわらず前のキャッシュを使っていると、なんかtableと整合性が取れないよ、みたいなことになるので、もとになるデータが更新されたらキャッシュをクリアするというようなことですね。

ここでTipsなんですけれども、実はキャッシュは必ずしも速いというわけではないんですね。キャッシュは基本的にメモリにあるんですけど、メモリがいっぱいになってしまうとディスクに書いてしまいます。メモリを1回クリアして、次に利用するときにまたディスクから読み直すということをします。そのため、場合によってはキャッシュではなく、それまでのクエリをもう1回実行したほうが速い場合があります。

なのでメモリがいっぱいあるからいいやとガシガシとキャッシュを使っていると、むしろ遅くなる場合があるので、必要のないキャッシュはしないようにして、ピンポイントで「ここは絶対必要」みたいなところにピッピッピッと置いていくのがいいかなと思います。