2024.12.10
“放置系”なのにサイバー攻撃を監視・検知、「統合ログ管理ツール」とは 最先端のログ管理体制を実現する方法
リンクをコピー
記事をブックマーク
上新卓也氏:これでLogical Planにキャッシュを使うプランが含まれてきたので、その次の処理としてはOptimizerですね。
これは今までプランの書き換えなどはやってこなかったんですが、ここからプランをガシガシと書き換えていってより効率のよい処理ができるようにプランを最適化していきます。
Optimizerはheuristicsとcostベース、経験則とコストベースでプランを書き換えていきます。今ここに6つ代表的なものを挙げていますが、すでに50以上のルールが組み込まれています。それらを自動的に利用することができます。
ただ、50以上もあってもすべてのケースに有効とは限りません。特殊なワークロードやクエリを実行したら、もしかするとあまり役に立たないかもしれません。そんな場合には、独自のOptimizerルールや、次のところで説明するPlannerを自分で実装して、Sparkの処理の中に組み込んでいくことができます。
ルールの実装については実際のルールを見て……まあ正直ちょっと難しいんですけれども(笑)、これを組んで、登録自体は……まず最初の候補としてはSparkSessionExtensionsという機能があります。
これはSparkSessionを起動するときに、OptimizerとPlannerにかぎらずあと2、3個差し込みができるんですが、先ほどざっと言ったフローの中にいくつかプラグインのポイントがありまして、ここにはいろいろなルールや、PlannerについてはStrategyを差し込んでいくことができます。
以前のSpark SummitでExperimentalMethodsを利用する例を解説しているビデオがありますので、興味がある人がこちらのリンクを見てみるとビデオが公開されています。
ここまででLogicalフェーズについて説明してきました。ここから先は、Physicalフェーズになります。
まずはじめがPlannerです。これは何をするかと言うと、今までLogical PlanだったのがPhysical Planになります。
Logical Planは何をしたいのかというのを表していたんですが、Physical Planになると今Logical Planで定義してある「何をしたいのか」というのをSparkがどのように実行するのかというHOWに変換します。WHATからHOWという感じですね。Sparkをどうやって使ったら実行できるかというものに変換します。
この変換の際に複数の実行方法がある場合があります。例えばJoinをすることになると、よくあるbroadcast hash joinであったり、sort merge joinであったりという選択肢、ほかにもいっぱいやり方はありますが、このような選択肢があります。
Spark SQLはこの中からどちらがコストが低いかを推定して、コストが低くなるものを選択します。broadcast hash joinとsort merge joinが候補に上がったとしたら、普通はbroadcast hash joinのほうがコストが低いのでそちらを使うようになります。
broadcast hash joinはさっきもちょっと言いましたが、通常はsort merge joinのようなシャッフルを伴うJoinですね、これを使わせるためにはtableの統計情報が必要になります。
まず1個目は設定値がありまして、spark.sql.autoBroadcastJoinThresholdというのがあります。tableの情報などから推定したデータサイズがこのThresholdより小さくなったときに、Broadcastを選択するというような動きをします。
なので、もし各サーバーが大きなメモリを持っているとすると、この値をググっと大きくしてBroadcastを使うようにしたほうが、より高速な動きが期待できます。
クエリの出力、tableの大きさというのは統計情報がありますが、この統計情報を最新に保つためにANALYZE TABLEコマンドを使ってそのtableの統計情報を確実に最新になるように調整していく必要があります。
最後に、強制的にBroadcastさせる方法もあります。それがこのbroadcastJoinヒントというものです。片方のデータが確実にメモリに載るということがわかっているにもかかわらず、なんか知らないけどSpark SQLのほうがどうしてもシャッフル系のJoinを選んでしまうという場合には、このbroadcastJoinヒントをクエリの中に付けることでSpark SQLに確実にbroadcastJoinをさせるということができるようになります。
もう1個Joinについて、これはわりとよく知られていますが、Joinの条件の中にはequal条件を付けるというかたちですね。下のように範囲の、大なり小なり的なやつだけだとどうなるかと言うと、上の例のように2つのtableを二重ループするというような動きになります。
直積やデカルト積とか、そういうあれになってしまって、これは非常に効率が悪いんですけれども、equalの条件があればhashを使って絞り込みができたり、equalでつながっているJoinキーをもとに各ノードにデータを分散しても大丈夫なのでJoinの速度は速くなり、パフォーマンスは上がります。
最後にQuery Executionです。
これは最終的に選択されたPhysical PlanをもとにRDDを実際に駆動するというところですね。ここで登場人物が3人、主要な役割をする人が出てきます。
1個目がMemory Manager、2個目がCode Generator、3個目がTungsten Engineです。
Memory Managerはメモリの利用状況を追跡してtaskやoperator間のメモリを調整します。Code GeneratorはPhysical PlanをコンパイルしてJavaコードを生成するものです。
ちなみにその生成されたJavaコードは例えばボクシングとかアンボクシングとか有名なボトルネックになりがちなやつがあると思うんですけど、ああいうやつとか、メソッドのディスパッチとかのコストができるだけ小さくなるように作られているのでかなり速いです。
最後にTungsten Engineですけれども、これはCPUやメモリに対して効率的なバイナリデータフォーマット、データ構造を提供しています。
Memory Managerはメモリをどれだけ持っているかというのを管理するんですけれども、これは基本的にSparkのexecutorのJVMヒープサイズのことになります。(それ)を管理します。
しかし、Sparkはすべてのメモリを管理するわけではなくて、例えばNettyとかParquetとかの内部で使っているバッファサイズはMemory Managerは管理していません。こういったexecutor.memoryとかmemory.fractionという設定を監視外のメモリのためにちょっと余裕を持った設定に抑えておかないと、YARNにkillされたりとかいうことになりますね。
もう1つ便利なのがoff-heapメモリを使うことです。off-heapメモリを使うとGCのオーバーヘッドが抑えられるといった利点がありますが、これはデフォルトでオフになってます。なのでこの設定を有効にして、そのとなりのoffHeap.sizeをクエリに必要なメモリ分だけ設定してあげると、GCなどのオーバーヘッドが抑えられるようになって速くなる可能性があります。
これを設定するとデータの大部分がoff-heapメモリにいくんですが、これが先ほどのJavaHeapメモリの外になるので、さっきのexecutor.memoryの設定をそれに合わせてちょっと低く設定しても大丈夫ということになります。
次に、Whole Stage Code Generationです。
普通よくあるのは、こういうプランができたときに、それぞれ1個ずつCode Generationをしてメソッドコールをして順番に実行していくみたいなやり方です。
Spark SQLはこれらを1個のステージ内にあるこういったオペレーターを全部一緒にCode Generationして、こういうシンプルなかたちのコードにして、よりコンパクトで高速なJavaコードになるようになっています。
ただ、これはちょっと欠点があります。
JVMのことに詳しい方はご存知かもしれませんが、あるメソッドのバイトコードのサイズが8キロバイトを超えると、JITコンパイラがネイティブコンパイルするのを諦めてしまってそのまま……なんだっけ?
(誰かが発言する)あ、そう! ありがとうございます(笑)。interpreter modeで実行してしまうのでかなり遅くなるんですね。
複雑なプランや入り組んだクエリを作ってしまうと、大きなメソッドを作ってしまう場合があります。WholeStage Codegenをオフにしたほうがむしろ速いという場合があります。
ここにhugeMethodLimitっていう設定がありますが、この値を設定すると、この設定値より大きなメソッドが見つかったらWholeStage Codegenをオフにして実行します。
様子を見てもし大きなメソッドができていてパフォーマンスがあまり速くない場合にはこのhugeMethodLimitの設定を8キロバイトとかにしておくとそれ以上の大きなメソッドになるとWholeStage Codegenは諦めるような動きになります。
ここまでがSpark SQLの中身の話ですね。
あとは実行、今のプランの前後にはもちろんデータの入出力があって、それがちょっと分離されているんですけれども、データ入力、データ出力が遅い場合にはどんなにSparkががんばってもそこがボトルネックになってしまって全体が遅くなる可能性があります。
一応これらの問題を改善するためにいろいろがんばっていまして、その1つはScanのVectorizationですね。例えばParquetやORCであればカラムナフォーマットというファイルフォーマットになっています。そういったデータがそのまま読めればより効率的になります。
また、そういったカラムナフォーマットであったりするとJVMがSIMDなどのハードウェアの機能を利用しやすくなるので、いろいろ利点があります。以前のリリースでParquetのリーダーをVectorizedの実装をしたところ、約10倍くらい速くなったことがあります。
もう1個は、先ほどもすこし触れたPartitioningとBucketingです。
データソースはファイルベースのもの、Parquet、ORC、CSV、JSONとかの場合はPartitioningとBucketingを検討するようにしてください。
これによって、不要なIOとシャッフルを避けられるようになるのでスピードアップを期待することができます。
この辺の例も以前Spark Summitでトークがあったのでこちらのリンクをたどるとビデオを見ることができます。
というわけで、データソースとしては、データの入出力としてはvectrized readingが可能なファイルフォーマットをできるだけ利用するようにしたほうがいいです。今だとParquet、ORCが対応しています。またファイルベースのデータソースであればPartitioningやBucketingを検討してみてください。
以上になります。ありがとうございました。
(会場拍手)
2024.12.10
メールのラリー回数でわかる「評価されない人」の特徴 職場での評価を下げる行動5選
2024.12.09
10点満点中7点の部下に言うべきこと 部下を育成できない上司の特徴トップ5
2024.12.09
国内の有名ホテルでは、マグロ丼がなんと1杯「24,000円」 「良いものをより安く」を追いすぎた日本にとって値上げが重要な理由
2024.12.12
会議で発言しやすくなる「心理的安全性」を高めるには ファシリテーションがうまい人の3つの条件
2023.03.21
民間宇宙開発で高まる「飛行機とロケットの衝突」の危機...どうやって回避する?
2024.12.10
職場であえて「不機嫌」を出したほうがいいタイプ NOと言えない人のための人間関係をラクにするヒント
2024.12.12
今までとこれからで、エンジニアに求められる「スキル」の違い AI時代のエンジニアの未来と生存戦略のカギとは
PR | 2024.11.26
なぜ電話営業はなくならない?その要因は「属人化」 通話内容をデータ化するZoomのクラウドサービス活用術
PR | 2024.11.22
「闇雲なAI導入」から脱却せよ Zoom・パーソル・THE GUILD幹部が語る、従業員と顧客体験を高めるAI戦略の要諦
2024.12.11
大企業への転職前に感じた、「なんか違うかも」の違和感の正体 「親が喜ぶ」「モテそう」ではない、自分の判断基準を持つカギ