Spark 2.4 と3.0の新機能を解説

猿田浩輔氏(以下、猿田):みなさんこんばんは。私からはApache Sparkの現時点での最新フィーチャーリリースである2.4と、今年リリースが期待されているSpark3.0の新機能をいくつかご紹介させていただきたいと思います。

はじめに私の自己紹介を簡単にさせてください。

私はエヌ・ティ・ティ・データの猿田と申します。2015年くらいからApache Sparkのコミッタを務めております。

これまでの私の業務としてはお客様向けにHadoopとかSparkとか、OSS並列分散処理系の導入支援であったり技術開発、テクニカルサポートなどを行なってまいりました。

最近はちょっと毛色が違うんですが、HadoopとかSparkに関わりつつもPersistent Memoryであるとか新しいトレンドのハードウェアの活用に興味を持っております。

本題に入っていく前に、もしかしたら今日いらっしゃっている人の中にはSparkのことをよくご存知ない方もいらっしゃるかもしれないので、簡単にSparkについて説明したいと思います。

Apach Sparkの概要

Sparkはオープンソースの並列分散処理系で、端的に言うとここに絵があるとおり数百ギガとか数テラとかそういった大量のデータを、大量のサーバで並列分散処理することで現実的な時間で目的の結果を得る。そういったことを可能にする処理系になっています。

複数の計算機を用いた並列分散処理だと、障害時のリカバリであるとか、タスクの分割、あとはスケジューリング、そういった本質的なデータ処理とは関係のないところを考えていかなければいけません。そういった面倒なことがたくさんあるわけですが、そういった部分はSparkが面倒見てくれると。こういった処理系になっています。

続いてアプリケーション開発フレームワークという側面からのSparkについてです。

Sparkは開発当初からUnified Analytics Engineという方向性を貫いていて、単純なバッチ処理に利用できるのはもちろん、機械学習やグラフ処理、あとはストリーム処理、そういったさまざまなパラダイムの処理をSparkという1つのプラットフォームの上で実現できるようにさまざまなライブラリが整備されています。

またSpark自体はScalaやJavaで実装されているんですが、それだけではなくてAnalyticsとかだとよくSQLとかPythonとかRとか、そういう言語が使われることが多いと思います。そういった言語でアプリケーション開発や分析処理を実装することも可能になっています。

Sparkのコンポーネントの1つに、Spark SQLと呼ばれるものがあります。これはSpark1.0から導入されたものなんですが、Spark SQLの登場を境にSparkはこのSpark SQLを中心にした処理系に変化していきました。

これによって宣言的なAPIを用いたSQLとかで処理が書けたり、オプティマイザによる最適化が受けられたり。あとは開発言語による性能差が出にくいというのはパフォーマンス上のメリットであるとか利便性とかそういった恩恵が受けられるようになりました。

機械学習とかストリーム処理向けのライブラリもSpark SQLをベースにしたものが整備されていて、例えばMLlibというライブラリが同梱されています。古くからSparkに含まれるライブラリの1つなんですが、このMLlibのspark.mlと呼ばれるパッケージにDataFrameをベースとしたアルゴリズムが実装されています。

ストリーム処理に関しては従来Spark Streamingと呼ばれるストリーム処理系が同梱されていて、Spark2.0からはSpark SQLをベースにしたStructured Streamingというものが新たに同梱されるようになりました。

Spark 2.4 & 3.0の新機能

ここまでSparkの基本について説明してきましたけれども、ここからは2.4と3.0の新機能について説明していきたいと思います。本日は大きく3つトピックを用意しています。

Spark2.3から新たに追加されたKubernetesサポートのアップデート、それからAI向けのユースケースをカバーするための包括的な取り組みである「Project Hydrogen」、そしてSparkの新しいグラフ処理系「Spark Graph」。この3つについて紹介してまいりたいと思います。

さっそく個別に見ていきたいと思います。まず最初はKubernetesサポートについてです。現時点での最新フィーチャーリリースの1つ前、2.3からSpark向けのクラスタマネージャとして従来のYARNとStandaloneとMesosに加えてKubernetesがサポートされるようになりました。

マイクロサービスを構成する基盤としてはかなり活用されているKubernetesですけれども、クラスタリソースの管理の仕組みを大規模データ処理向きの基盤としても活用しようという動きがあるわけです。

このKubernetesサポート、Spark2.3の時点でかなり制約が大きかったんですけれども、2.4以降で少しずつ完成度を高めてきています。先日、もしかしたら参加された方もいらっしゃると思うんですけれども、Hadoop / Spark Conference Japan 2019というイベントがあってですね、事前にアンケートを取らせていただきました。

Sparkをプロダクションで利用している方が400名いらっしゃったんですけれども、その中ですでに23名がKubernetesの上でSparkを動かしているという結果も出ていてですね、登場間もないながらも重要な機能になっています。

Spark2.4でKubernetesサポートがどうなったか、そして3.0でどうなるかというのを特徴的なものを見ていきたいと思います。

まず2.4から見ていきたいと思います。大きな機能追加としてはサポートする開発言語の強化。それからクライアントモードでの動作が挙げられます。2.4ではデータサイエンティストに人気のあるPySparkであったり、SparkR、こういったものがKubernetes上で利用できるようになりました。

またクライアントモードで動作可能になったことで、インタラクティブシェルであったり、最近使われている方多いと思うんですけどノートブック、こういったツールと連携して使えるようになりました。

3.0ではどうなるかと言うとまだまだ開発中のものであったり議論中のものもあって3.0に必ず入るという保証はないんですけれども、一応3.0への導入を目標にしているものはいくつかあります。

ここで主だったものをいくつか挙げていますけれども、1つはKerberosサポートですね。HDFSとかHBaseという分散データストアのOSS、Sparkと組み合わせて利用されるケースが非常に多いんですけれども。これらをセキュアに使いたいといった場合には、HDFSとかHBaseをKerberizedして使うということがよくあります。

こんなふうにKerberizedされたHadoop関連のコンポーネントをSparkのjobから利用するためにはSpark job自体がKerberosに対応していたりとか、あとHDFSとかHBaseのトークンと呼ばれる機能に対応している必要があります。

Sparkを従来YARNなどで動かす場合はすでにこの機能って実装されているんですけれども、Kubernetesサポートでもこの機能が3.0で実装される見込みです。

それからKubernetesではテンプレートからマニフェストを生成してPodの設定を記述できると思うんですけれども、Execulor PodとDriver Podに関してもそれが可能になります。

あとここからはまだ議論中の機能になるんですけれども、リソースの利用状況に応じて動的にExeculor Podを起動したり停止したりするダイナミックリソースアロケーション、これもYARNなんかではもうすでに実装されている機能ですね。

それから、同じノードの上で動くExeculor Podを通して効率的にシャッフルを行うためのExtemal Shuffle Serviceとか、こういった機能が3.0に向けて検討されています。

Project Hydrogen

次のトピックはProject Hydrogenです。Project Hydrogenは何かと言うとSparkでAI、とくにディープラーニングのユースケースをカバーするための包括的な取り組みになっています。

SparkはこれまでもIntelのBigDLであるとかDatabricksのDeepLearningPipelineと組み合わせたり、あとはTensorflow on SparkとかCaffe on Sparkとか既存のフレームワークと組み合わせることでディープラーニングのユースケースとしても活用されてきたという歴史があります。

しかし活用されて行く中でいろいろと弱点も浮き彫りになってきたわけですね。そういった弱点を強化するための取り組みがHydrogenということになります。ここに挙げているとおりHydrogenは大きく3つの取り組みから構成されていて、それぞれBarrier Execution Mode、Accelerator Aware Scheduling、そしてOptimized Data Exchange。この3つです。

今日はSpark2.4で基本的な機能が実装済みであるBarrier Execution Mode、それから3.0の導入に向けて開発が進められているAccelerator Aware Scheduling、この2つについて説明していきたいと思います。

まずはBarrier Execution Modeから見ていきたいと思います。最初に少しだけSparkの処理単位とかスケジューリングの仕組みについておさらいしていきたいと思います。

Sparkではタスクと呼ばれる最小の処理単位があります。複数のExecutorがそれぞれ異なるタスクを処理することでクラスタ全体で分散処理する。こういう仕組みになっています。

Executorへのタスクの割り当てはSparkのスケジューラが行います。ステージは依存関係になっていて、先行するステージに含まれるタスクがすべて完了したら後続のステージがスケジューリングされると。例えばこの図で言うと一番左のステージ1が完了したあとにステージ2のスケジューリングが行われる。こういった動きになります。

Sparkのスケジューラはデータローカリティであるとか、Executorの空いてるコアの数などに基づいて、なるべくExecutorのコアを遊ばせないようにタスクをスケジューリングします。

つまりあるステージに含まれるタスクはそれぞれのタイミングでスケジューリングされるのであって、同じステージに含まれるタスクが一斉にスケジューリングされるとは限りません。タスクが失敗した場合は当該タスクのみリトライされる。こういった方式でスケジューリングが行われます。

Barrier Execution Mode

ここまで見てきたように従来のSparkのスケジューリング方式は各タスクが独立に動作するという前提で作られています。一方で例えば分散ディープラーニングなどではAllReduceなど、複数のタスクで通信しながら協調動作が必要になるものもあります。

そういった類のタスクは同時に計算リソースが割り当てられるようにまとめてスケジューリングされる必要があったり、タスクの一部が失敗したらまとめてほかのタスクも再スケジューリングされなければいけないということが多いはずです。

しかし従来のSparkのスケジューラではこういった協調動作を前提としたタスクのスケジューリングができませんでした。それから協調動作のためにはタスクの同期のメカニズムも必要になってくると思うんですけれども、そもそもそういった仕組みも実装されていませんでした。

そこでSpark2.4からBarrier Execution Modeと呼ばれる新スケジューリング方式が追加されました。

これは何かと言うと協調動作が必要なタスクをまとめてスケジューリングしたりとか、エラー時のリトライなども協調動作が必要な複数のタスクに対してまとめて行われます。それからタスク間での同期のメカニズムなんかも実装されました。

プログラミングインターフェースとしてはRDDの#barrierメソッドを呼び出すことでBarrier Execution Modeのスケジューリング対象となる部分を設定することができます。

またBarrierTaskContext、ここにコード例があると思うんですけれども。BarrierTaskContextの#barrierメソッドを呼び出すことでタスク間の同期点を設定できるというふうになっています。

ちょっと注意が必要なんですけれども、Barrier Execution Modeで提供するのはあくまで協調動作が必要なタスクを包括的にスケジューリングする仕組みと、タスクに同期点を設定する仕組みだけだということですね。

実際の協調動作、例えばタスク間の通信などはアプリ開発者であったり、Barrier Execution Modeをもとにディープラーニングフレームワークを開発する人たちが実装する必要があります。