Accelerator Aware Scheduling

猿田浩輔氏(以下、猿田):Barrier Execution Modeのお話はここまでで、次はAccelerator Aware Schedulingですね。最近Project Hydrogenの中ではこの機能の議論が活発で、Spark3.0への導入に向けて議論が進められているという段階になっています。

これは何かと言うとGPUとかFPGAとか、そういったアクセラレータが利用できる計算機を認識してExecutorを起動したり、それからアクセラレータの利用が必要なタスクを適切なExecutorにスケジューリングしたりとか。そういったことを可能にする仕組みになっています。

最近だと機械学習とかディープラーニングでは当たり前のようにGPUなんかが使われていると思うんですけれども、そういった需要に応えるための機能になっています。

今のところの機能ではStandaloneとかYARNとかKubernetesとか、3つのクラスタマネージャを対象として開発が行われる見通しです。Hadoop3.1.2からYARNがGPUをコンピューティングリソースとして管理できる機能が加わったので、YARN向けにはこの機能を利用するという方向で開発が進められています。

アクセラレータがどのくらい積まれていてどのくらい利用できるかというのはYARNなどのクラスタマネージャの機能を使うことで実現できるわけなんですけれども。今Sparkのjobを導入するときにどのくらいのコアとかメモリが必要かという要求を設定するオプションがあるかと思うんですけれども、それと同じようにどんなアクセラレータがどのくらい必要かという要求を設定できるような機能が追加されると思われます。

一方Sparkのスケジューラについても手が入る予定で、現在のSparkのスケジューリングというのはExecutorに割り当てられているコアを認識してタスクのスケジューリングを行います。これと同じようにExecutorにどのくらいのアクセラレータが割り当てられているか認識してスケジューリングすると。こういったスケジューラの改良が加えられる方針です。

アクセラレータスケジューリングの現在のステータスとしてはまず当面はSpark3.0向けにGPU向けのスケジューリング機能が実装される見通しです。

Spark Graph

今日最後のトピックになるんですけれども、最後のトピックはSpark Graphですね。Spark Graphは何かと言うとSparkの新しいグラフ処理ライブラリです。

Spark向けにはこれまでGraphXであるとかGraphFramesなどに代表されるようなグラフ処理ライブラリがあったわけなんですけれども、これらの課題を解決するものとして新たに開発が検討されているものです。

Spark Graphの話をする前に少しだけこれまでのSparkのグラフ処理ライブラリの状況であるとか問題点といったものを見ていきたいと思います。

Sparkには古くからGraphXと呼ばれるグラフ処理向けのライブラリが付属しています。今ではかなりつくりが古くなっているんですね。例えばMLlibだとすでにDataFrameに対応してますし、ストリーム処理向けにもStructured Streamingと呼ばれる新しいDataFrameに対応したストリーム処理系が開発されたんですけれども、GraphXだけはいまだにRDDをベースとしたライブラリになっています。

しかもScala向けのAPIしか整備されていなかったりとか、あとかなり致命的なのがあまりメンテナンスされてない、メンテナンスしている人がいないと、こういった問題もあります。

こういう課題を背景にサードパーティーパッケージでGraphFrames、もしかしたら使われている方もいらっしゃるかもしれないですけれども、DataFrameをベースにしたグラフ処理ライブラリも開発されました。

これはグラフをDataFrameで表現できるので、Spark SQLの最適化の恩恵が受けられるというメリットがあります。なんですけれどもグラフのノードとかエッジのセマンティクスがちょっと弱くてですね。グラフとしては単純なマッチングしか行えないと、こういう欠点もありました。

GraphFramesのデータモデル

最後のところが少しわかりづらいかもしれないので、GraphFramesのデータモデルをもう少し見ていきたいと思います。

GraphFramesではノードやエッジの集合をDataFrameで表現します。DataFrameの各レコードがノード1つエッジ1つに対応するというイメージですね。

DataFrameのレコードとして表現されるわけなので、ノードとかエッジに属性を付与することができます。なんですけれどもノードとかエッジそのものに型を定義することはできません。

例えばこのグラフを例に説明すると、緑のノードは都市、青のノードは人を表しています。つまり各ノードが違った種類のものを表しています。同じようにエッジについても色ごとに種類が異なる関係を表現しています。しかしGraphFramesではエッジとかノードの型を設定できないので、グラフとしてはこれらの種類を区別することができません。

GraphFramesではMotifsと呼ばれる簡単なクエリなんですけれども、このクエリによってグラフマッチングが行えます。なんですけれどもGraphFramesは型が定義できないデータモデルですので、グラフの形状に基づくマッチングしか行えないという欠点もあります。

グラフマッチングの結果はDataFrameとして返されます。このコード例で言うところのmotifsという変数がDataFrameになるわけです。

グラフマッチングの結果はDataFrameとして得られるので、形状に基づいたマッチングよりもうちょっと複雑なことをやろうとすると、得られたDataFrameに対してフィルターなどを適応して該当するエッジとかノードを絞るとか、要するに通常のDataFrameのオペレーションが必要になってきます。

例えば先ほどのグラフから居住者と居住地の関係を表す部分グラフを抽出しようとすると、Motifsで形状に基づくマッチングをしたあとにエッジの種類でフィルタする、つまりエッジの種類が居住になっているものをフィルタするという、こういう操作が必要になってくるわけです。

これはあまり便利ではないということで、もう少し強いセマンティクスのグラフデータモデルをサポートし、高度な分析に利用できるようにと現在考えられているのがSpark Graphです。

Spark GraphではProperty Graphと呼ばれるグラフデータモデルをサポートしています。これはエッジとかノードに型とか属性を定義できるグラフモデルになっています。ノードとかエッジに型が定義できるので、先ほどのグラフにあったようなことなる種類のノードであったりエッジの型というのも区別できます。

さらにGraphFramesではMotifsというクエリ言語が使われていたんですけど、Spark GraphではNeo4jとかだとよく使われているCypherと呼ばれるクエリ言語をサポートする予定です。Cypherを使うと形状だけではなくて、そのノードとかエッジの型とか属性に基づいたクエリを発行できます。なのでより高度なグラフマッチングが行えるようになります。

またマッチングが行えるというだけではなくて、マッチしたノードとかエッジの属性を返却するということも可能になっていて。返却された結果はDataFrameとして表現されます。

例えば先ほどの居住者と居住地の関係を抽出するというクエリもProperty GraphとCypherなら、ここのコード例にあるように型とか属性で制約を与えることでかなりシンプルに表現できるようになります。

2.4に追加されたもの

ここまでKubernetesサポート、そしてProject HydrogenとSpark Graphについて説明してきましたけれども。ほかにも2.4と3.0にはたくさんの機能が追加されてきました。

2.4についてはデータ分析向けの機能を中心に強化されてきました。例えば2.3から導入されたPandas UDFという機能があるんですけれども、2.4では新たにユーザ定義集約関数とWindow関数が定義できるようになりました。

あとビルトイン関数も2.4はかなり充実していて、新しく29個のビルトイン関数が追加されました。主に配列とかマップとか、そういう複雑なデータ型を対象にしたものが多く追加されたという感じですね。

追加されたビルトイン関数の中でも特徴的なのが高階関数になります。高階関数というのは何かと言うと例えば配列の全要素に対して同じ計算を行って、その結果を新しい配列として受け取ったりとか、あるいは配列の中から条件にマッチする要素だけを抜き出して新しい配列を作るとか、そういうふうに配列の各要素を対象にラムダ式などを適用した適用可能な高階関数がいくつか追加されました。

3.0のその他トピックス

Spark3.0でも、2.4では主に配列向けの高階関数が追加されたんですけれども、3.0では主にマップ向けの高階関数がいくつか追加される見通しです。

それからSpark SQLでサポートされるデータフォーマットもいくつか追加されていて、従来からのParquetであるとか、ORC、CSVに加えて、Avroと呼ばれるフォーマットが扱えるようになったりとか。あと画像ファイルも扱えるようになりましたね。

あとStructured Streamingについては2.3ではけっこう大きい機能追加があったんですけど、2.4はそれほどでもないんです。けれどもForeachBatchシンクと 言って、これ便利な機能で、複数のデータストアでの出力などに使えるシンクが新たに利用できるようになりました。

次は3.0で追加される、もしくは変更になるような見通しの機能になるんですけれども、いろいろとあります。まずJava11サポートですね。すでにmasterブランチではマージ済みなので予定通り3.0ではJava11のサポートがなされるのではないかなぁという見通しです。

それからScala2.12のサポートですね。実はSpark2.4からすでにScala2.12で動かせるんですけれども、3.0からはScala2.12での動作がデフォルトになる見通しです。

Hadoop3系のサポートも予定されています。先ほど少し触れたと思うんですけれども、Project HydrogenのAccelerator Aware SchedulingではHadoop3.1.2から導入された機能を利用する方向で検討されているので、Hadoop3系もサポートしていくことになるだろうというふうな見通しです。

実は今masterブランチのpom.xmlをよく見てみると、Hadoop3と連携するようにビルドできるようにはなっているんですけれども。Sparkが依存するHiveのバージョンとちょっと互換性がないという問題があって、Hadoop3もHiveも両方利用するようにビルドすると動かないんですね。エクセプションが投げられます。なのでこれと合わせてHiveのバージョンも1.2.1から2.3.4にアップグレードするという話も進められています。

今日の私からの話は以上です。ありがとうございます。

(会場拍手)