2024.12.19
システムの穴を運用でカバーしようとしてミス多発… バグが大量発生、決算が合わない状態から業務効率化を実現するまで
リンクをコピー
記事をブックマーク
上新卓也氏:これでLogical Planにキャッシュを使うプランが含まれてきたので、その次の処理としてはOptimizerですね。
これは今までプランの書き換えなどはやってこなかったんですが、ここからプランをガシガシと書き換えていってより効率のよい処理ができるようにプランを最適化していきます。
Optimizerはheuristicsとcostベース、経験則とコストベースでプランを書き換えていきます。今ここに6つ代表的なものを挙げていますが、すでに50以上のルールが組み込まれています。それらを自動的に利用することができます。
ただ、50以上もあってもすべてのケースに有効とは限りません。特殊なワークロードやクエリを実行したら、もしかするとあまり役に立たないかもしれません。そんな場合には、独自のOptimizerルールや、次のところで説明するPlannerを自分で実装して、Sparkの処理の中に組み込んでいくことができます。
ルールの実装については実際のルールを見て……まあ正直ちょっと難しいんですけれども(笑)、これを組んで、登録自体は……まず最初の候補としてはSparkSessionExtensionsという機能があります。
これはSparkSessionを起動するときに、OptimizerとPlannerにかぎらずあと2、3個差し込みができるんですが、先ほどざっと言ったフローの中にいくつかプラグインのポイントがありまして、ここにはいろいろなルールや、PlannerについてはStrategyを差し込んでいくことができます。
以前のSpark SummitでExperimentalMethodsを利用する例を解説しているビデオがありますので、興味がある人がこちらのリンクを見てみるとビデオが公開されています。
ここまででLogicalフェーズについて説明してきました。ここから先は、Physicalフェーズになります。
まずはじめがPlannerです。これは何をするかと言うと、今までLogical PlanだったのがPhysical Planになります。
Logical Planは何をしたいのかというのを表していたんですが、Physical Planになると今Logical Planで定義してある「何をしたいのか」というのをSparkがどのように実行するのかというHOWに変換します。WHATからHOWという感じですね。Sparkをどうやって使ったら実行できるかというものに変換します。
この変換の際に複数の実行方法がある場合があります。例えばJoinをすることになると、よくあるbroadcast hash joinであったり、sort merge joinであったりという選択肢、ほかにもいっぱいやり方はありますが、このような選択肢があります。
Spark SQLはこの中からどちらがコストが低いかを推定して、コストが低くなるものを選択します。broadcast hash joinとsort merge joinが候補に上がったとしたら、普通はbroadcast hash joinのほうがコストが低いのでそちらを使うようになります。
broadcast hash joinはさっきもちょっと言いましたが、通常はsort merge joinのようなシャッフルを伴うJoinですね、これを使わせるためにはtableの統計情報が必要になります。
まず1個目は設定値がありまして、spark.sql.autoBroadcastJoinThresholdというのがあります。tableの情報などから推定したデータサイズがこのThresholdより小さくなったときに、Broadcastを選択するというような動きをします。
なので、もし各サーバーが大きなメモリを持っているとすると、この値をググっと大きくしてBroadcastを使うようにしたほうが、より高速な動きが期待できます。
クエリの出力、tableの大きさというのは統計情報がありますが、この統計情報を最新に保つためにANALYZE TABLEコマンドを使ってそのtableの統計情報を確実に最新になるように調整していく必要があります。
最後に、強制的にBroadcastさせる方法もあります。それがこのbroadcastJoinヒントというものです。片方のデータが確実にメモリに載るということがわかっているにもかかわらず、なんか知らないけどSpark SQLのほうがどうしてもシャッフル系のJoinを選んでしまうという場合には、このbroadcastJoinヒントをクエリの中に付けることでSpark SQLに確実にbroadcastJoinをさせるということができるようになります。
もう1個Joinについて、これはわりとよく知られていますが、Joinの条件の中にはequal条件を付けるというかたちですね。下のように範囲の、大なり小なり的なやつだけだとどうなるかと言うと、上の例のように2つのtableを二重ループするというような動きになります。
直積やデカルト積とか、そういうあれになってしまって、これは非常に効率が悪いんですけれども、equalの条件があればhashを使って絞り込みができたり、equalでつながっているJoinキーをもとに各ノードにデータを分散しても大丈夫なのでJoinの速度は速くなり、パフォーマンスは上がります。
最後にQuery Executionです。
これは最終的に選択されたPhysical PlanをもとにRDDを実際に駆動するというところですね。ここで登場人物が3人、主要な役割をする人が出てきます。
1個目がMemory Manager、2個目がCode Generator、3個目がTungsten Engineです。
Memory Managerはメモリの利用状況を追跡してtaskやoperator間のメモリを調整します。Code GeneratorはPhysical PlanをコンパイルしてJavaコードを生成するものです。
ちなみにその生成されたJavaコードは例えばボクシングとかアンボクシングとか有名なボトルネックになりがちなやつがあると思うんですけど、ああいうやつとか、メソッドのディスパッチとかのコストができるだけ小さくなるように作られているのでかなり速いです。
最後にTungsten Engineですけれども、これはCPUやメモリに対して効率的なバイナリデータフォーマット、データ構造を提供しています。
Memory Managerはメモリをどれだけ持っているかというのを管理するんですけれども、これは基本的にSparkのexecutorのJVMヒープサイズのことになります。(それ)を管理します。
しかし、Sparkはすべてのメモリを管理するわけではなくて、例えばNettyとかParquetとかの内部で使っているバッファサイズはMemory Managerは管理していません。こういったexecutor.memoryとかmemory.fractionという設定を監視外のメモリのためにちょっと余裕を持った設定に抑えておかないと、YARNにkillされたりとかいうことになりますね。
もう1つ便利なのがoff-heapメモリを使うことです。off-heapメモリを使うとGCのオーバーヘッドが抑えられるといった利点がありますが、これはデフォルトでオフになってます。なのでこの設定を有効にして、そのとなりのoffHeap.sizeをクエリに必要なメモリ分だけ設定してあげると、GCなどのオーバーヘッドが抑えられるようになって速くなる可能性があります。
これを設定するとデータの大部分がoff-heapメモリにいくんですが、これが先ほどのJavaHeapメモリの外になるので、さっきのexecutor.memoryの設定をそれに合わせてちょっと低く設定しても大丈夫ということになります。
次に、Whole Stage Code Generationです。
普通よくあるのは、こういうプランができたときに、それぞれ1個ずつCode Generationをしてメソッドコールをして順番に実行していくみたいなやり方です。
Spark SQLはこれらを1個のステージ内にあるこういったオペレーターを全部一緒にCode Generationして、こういうシンプルなかたちのコードにして、よりコンパクトで高速なJavaコードになるようになっています。
ただ、これはちょっと欠点があります。
JVMのことに詳しい方はご存知かもしれませんが、あるメソッドのバイトコードのサイズが8キロバイトを超えると、JITコンパイラがネイティブコンパイルするのを諦めてしまってそのまま……なんだっけ?
(誰かが発言する)あ、そう! ありがとうございます(笑)。interpreter modeで実行してしまうのでかなり遅くなるんですね。
複雑なプランや入り組んだクエリを作ってしまうと、大きなメソッドを作ってしまう場合があります。WholeStage Codegenをオフにしたほうがむしろ速いという場合があります。
ここにhugeMethodLimitっていう設定がありますが、この値を設定すると、この設定値より大きなメソッドが見つかったらWholeStage Codegenをオフにして実行します。
様子を見てもし大きなメソッドができていてパフォーマンスがあまり速くない場合にはこのhugeMethodLimitの設定を8キロバイトとかにしておくとそれ以上の大きなメソッドになるとWholeStage Codegenは諦めるような動きになります。
ここまでがSpark SQLの中身の話ですね。
あとは実行、今のプランの前後にはもちろんデータの入出力があって、それがちょっと分離されているんですけれども、データ入力、データ出力が遅い場合にはどんなにSparkががんばってもそこがボトルネックになってしまって全体が遅くなる可能性があります。
一応これらの問題を改善するためにいろいろがんばっていまして、その1つはScanのVectorizationですね。例えばParquetやORCであればカラムナフォーマットというファイルフォーマットになっています。そういったデータがそのまま読めればより効率的になります。
また、そういったカラムナフォーマットであったりするとJVMがSIMDなどのハードウェアの機能を利用しやすくなるので、いろいろ利点があります。以前のリリースでParquetのリーダーをVectorizedの実装をしたところ、約10倍くらい速くなったことがあります。
もう1個は、先ほどもすこし触れたPartitioningとBucketingです。
データソースはファイルベースのもの、Parquet、ORC、CSV、JSONとかの場合はPartitioningとBucketingを検討するようにしてください。
これによって、不要なIOとシャッフルを避けられるようになるのでスピードアップを期待することができます。
この辺の例も以前Spark Summitでトークがあったのでこちらのリンクをたどるとビデオを見ることができます。
というわけで、データソースとしては、データの入出力としてはvectrized readingが可能なファイルフォーマットをできるだけ利用するようにしたほうがいいです。今だとParquet、ORCが対応しています。またファイルベースのデータソースであればPartitioningやBucketingを検討してみてください。
以上になります。ありがとうございました。
(会場拍手)
2024.12.20
日本の約10倍がん患者が殺到し、病院はキャパオーバー ジャパンハートが描く医療の未来と、カンボジアに新病院を作る理由
2024.12.19
12万通りの「資格の組み合わせ」の中で厳選された60の項目 532の資格を持つ林雄次氏の新刊『資格のかけ算』の見所
2024.12.16
32歳で成績最下位から1年でトップ営業になれた理由 売るテクニックよりも大事な「あり方」
2023.03.21
民間宇宙開発で高まる「飛行機とロケットの衝突」の危機...どうやって回避する?
PR | 2024.12.20
モンスター化したExcelが、ある日突然崩壊 昭和のガス工事会社を生まれ変わらせた、起死回生のノーコード活用術
2024.12.12
会議で発言しやすくなる「心理的安全性」を高めるには ファシリテーションがうまい人の3つの条件
2024.12.18
「社長以外みんな儲かる給与設計」にした理由 経営者たちが語る、優秀な人材集め・会社を発展させるためのヒント
2024.12.17
面接で「後輩を指導できなさそう」と思われる人の伝え方 歳を重ねるほど重視される経験の「ノウハウ化」
2024.12.13
ファシリテーターは「しゃべらないほうがいい」理由 入山章栄氏が語る、心理的安全性の高い場を作るポイント
2024.12.10
メールのラリー回数でわかる「評価されない人」の特徴 職場での評価を下げる行動5選
Climbers Startup JAPAN EXPO 2024 - 秋 -
2024.11.20 - 2024.11.21
『主体的なキャリア形成』を考える~資格のかけ算について〜
2024.12.07 - 2024.12.07
Startup CTO of the year 2024
2024.11.19 - 2024.11.19
社員の力を引き出す経営戦略〜ひとり一人が自ら成長する組織づくり〜
2024.11.20 - 2024.11.20
「確率思考」で未来を見通す 事業を成功に導く意思決定 ~エビデンス・ベースド・マーケティング思考の調査分析で事業に有効な予測手法とは~
2024.11.05 - 2024.11.05