Kubernetes Jobによるバッチシステムのリソース最適化

芝田将氏(以下、芝田):よろしくお願いします。今回は、Kubernetes Jobによるバッチシステムのリソース最適化というテーマでお話しさせていただきます。

まず、簡単に自己紹介からさせてください。芝田将と言います。

昨年の4月にサイバーエージェントに新卒入社しまして、現在は、先ほどオリジン監視システム「Procyon」の話をしていた久保と同じチームで働いています。

自分はオープンソースの開発が好きで、業務に関連するソフトウェアだとkube-promptというインタラクティブなKubernetesクライアント、あとはGo言語で書いたrtmp1.0準拠のサーバ実装も公開しています。GitHubは「c-bata」というIDでやっているので、よければチェックしてみてください。

今回のトークでは、AbemaTVのトランスコーダーを例に、バッチシステムの設計パターンの紹介と、トランスコーダーの改修で培った知見の共有をします。逆に今回話さないこととしましては、動画のトランスコードパラメーターの話やKubernetes自体の解説は行いません。それでは本題に入っていきます。

AbemaTVのトランスコーダーの役割

バッチシステムの設計を考える際には、そのシステムの役割やバッチシステムが担当するタスクの内容が設計方針に大きく影響します。そのために、まずはAbemaTVのトランスコーダーがどういうことを行っているのか、簡単にお話しします。

この図は、AbemaTVの動画コンテンツの納品から、視聴ユーザーに映像が届けられるまでの大まかな構成を表しています。AbemaTVでは生配信とかも行ってはいるんですが、トランスコーダーは、コンテンツプロバイダさんからいただいた動画ファイルの変換を担当します。

具体的には、AbemaTVのコンテンツ運用者がコンテンツプロバイダさんからいただいた動画をマスタリングして、AbemaTVにとって使いやすい形式に変換します。それをGoogle Cloud Storageにアップロードして、アップロードが終わったら、トランスコーダーに対してトランスコードのリクエストを送ります。

トランスコーダーはそれを受け取ると、Google Cloud Storageに上がっている動画マスターファイルから、配信に必要なメタデータや動画ファイルを生成します。トランスコードが終わると、コンテンツ運用者がまた番組を実際に編成して、配信サーバーがその編成に従って配布していく流れになります。

トランスコーダーのタスク

それでは、トランスコーダーの細かいタスクの内容について見ていきます。

まず、入力された動画ファイルから、配信に必要なメタデータを生成します。具体的には、キーフレームの位置を決定したりしています。次に、動画プレイヤーがシークするときに表示するサムネイルの生成を行います。

あと、HTTP Live Streamingで使っているMPEG2-TSというコンテナフォーマットがあるんですけれど、AbemaTVはアダプティブビットレートといって動画プレイヤーがネットワークの状況とかを見ながら、動的に解像度を切り替えていったりするんです。それに対応するためには、複数の解像度を生成する必要がありまして、トランスコーダーは180pから1080pまでの6つの解像度の動画を生成します。

また、HLSではMPEG2-TSが利用されているんですが、MPEG-DASHではFragmented MP4というまた別のコンテナフォーマットが利用されているので、この動画ファイルも生成します。最後に、DRMのパッケージ処理。これは、コンテンツ保護の処理になります。

バッチシステムとしての特徴

それでは、バッチシステムとしての特徴を考えてみます。

まず、動画トランスコードは非常に多くの計算リソースを消費します。AbemaTVでは、先ほどお話ししたとおり、解像度の異なる動画を複数生成するために、1コンテンツに対して複数回トランスコードを行います。扱っている番組数も非常に多くて、1日あたり200件。それもだいたい尺平均1時間の動画が納品されるので、それを変換していかなくてはいけません。

あと、運用の都合上、トランスコードにかかる実行時間を抑えたいのもあります。コンテンツ運用者がトランスコードをかけて、それが終わったらやっと編成に入れるんですけれど、何か動画に問題があって「やっぱり編成した動画を差し替えたい」みたいなときに、動画のトランスコードの時間が短いとそれが間に合うケースがあったりして、コンテンツ運用者から非常に喜ばれたりします。なので、並列に実行して実行時間を抑えることができる場合は、並列に実行したい。

並列処理の一般的なパターンとして、バッチシステムに限りませんが、Fan-Outパターンがあります。

Fan-OutにはQueueとWorkerという2つのコンポーネントが存在して、Queueには、トランスコーダーがやるべき仕事をタスクという単位で分割して追加していきます。

一方で、Workerが複数台あって、何もしていなければQueueからタスクを取り出してそれを処理して、その処理が終わってまた手が空いたら、また新しいタスクをQueueから取り出して処理していきます。Workerが複数台あれば、Queueに追加されたタスクは並列に処理されます。

初期のトランスコーダーの設計は、このFan-Outパターンを利用した非常にシンプルなものでした。

トランスコードのリクエストが走ると、タスクを作成して、MongoDBにタスクの詳細を保存します。あと、タスクのIDをRedisのQueueに入れておくことで、Workerがそれを取り出して処理していくんです。Workerとして利用しているのは、Kubernetes上で動いているPodです。

Fan-Outパターンによって設計されたこのトランスコーダーは、非常にシンプルな構成でした。Worker Podの処理が何か間に合わなくなってきても、Worker Podの数を増やせば並列度も上がりますし、処理のスピードも上がる。Queueに溜まっていたタスクを速いスピードで処理することもできます。

トランスコーダーが抱えていた課題

シンプルなシステムなんですが、これまでAbemaTVの動画変換を支えてきてくれました。ただ、課題がなかったわけではありません。Worker Podとして利用しているPodのリソースは一律固定です。ただ、実際に必要な処理量は、担当するタスクによって大きく変化します。

例えば、1080pの非常に解像度の高い動画をトランスコードするのと、180p・240pみたいな解像度の低い動画を生成するのでは、必要なリソースに差があります。Worker Podの性能としては、1080pのトランスコードにも耐えるぐらいのリソースを振らないといけないんですが、その場合、180pの生成、あとはメタデータ生成みたいにリソースをあまり必要としないものでは、性能をちゃんと使い切ることができず無駄が発生します。

これは無視できる場合もあるとは思うんですが、トランスコーダーはバッチシステムの中でも非常に処理量の多い、多くの計算リソースを必要とするシステムです。なので、これまでお金で解決してきたところもあったんですが、リソース効率の悪さの改善に取り組むことにしました。

リソース改善のためにやったこと

それでは、リソース問題の改善のためのアプローチについて見ていきます。

リソースを改善するためのアプローチは、いくつかありました。例えば、Kubernetes上のPodのリソース制御の仕組みを利用することや、Fan-Outパターンとはまた異なるアーキテクチャを選定することも挙げられます。最終的には、タイトルにあったとおり、Kubernetes Jobを用いたアプローチを取ったんですが、他の選択肢もある中でどう比較して採用に至ったかをお話しします。

まず最初に紹介するのは、KubernetesのPodのリソース制御の仕組みを使った方法です。これは、Kubernetesを利用している方だとご存じの方も多いとは思います。Workerとして利用しているKubernetesのPodは、2つのパラメータを指定することで柔軟にリソースの割り当てを制御できます。それが、requestsとlimitsの2つです。

requestsで指定したリソース量は、KubernetesによってPodへの割り当てが最低限保証されます。一方で、これを超えるリソースが必要になった場合にも、まだリソースをPodに対して割り当てることができて、その最大量をlimitsで指定することができます。逆に、このlimitsも超えちゃうと、例えばメモリをlimitsも超えて使っちゃったケースでは、OOM Killerとかが動作してPodが殺されてしまいます。

なので、解決策の1つとしては、requestsをすごく小さく、limitsを大きく設定すれば、Kubernetes側でリソースを柔軟に制御できるようになります。

図にするとこのような感じです。

みなさんから見て画面左側は、Worker Pod 1とWorker Pod 2があるんですけど、Worker Pod 1の方が重たい処理をしていて、Worker Pod 2はそんなに重たい処理をしていません。なので、Worker Pod 1はlimitsギリギリまでリソースを消費しています。

ただ、そのタスクの処理が終わって、次のタスクをQueueから取り出しました、となったときにWorker Pod 2が次はすごく重たい処理をして、Worker Pod 1はそんなに重たい処理をしていないとなった場合には、Kubernetes側でWorker Pod 1のリソース量を減らして、Worker Pod 2はlimitsギリギリまで上げて、みたいなことが制御できます。

ただ、この方法には1つ問題がありました。同じNode内の複数のWorker Podが、それぞれみんな一斉に1080pのトランスコードや重たい処理をし始めると、Nodeのリソースが当然足りなくなります。Nodeのリソースが足りなくなると、それから溢れたWorker Podは一部Nodeから追い出されて、別のNodeに再配置されます。

時間のかかる1080pとかの動画のトランスコードが、追い出しによって途中で停止されて、また最初からやり直さないといけないというのは、トランスコード時間がまた非常に長くなってしまいます。コンテンツ運用チームのことを考えると、あまりそういう運用の負担を上げることはやりたくないので、この追い出しのリスクを回避するためには、requestsとlimitsは分けずに同じ値に設定する必要がありました。

Priority Queueパターンのメリットとデメリット

では次に、Priority Queueパターンという、Fan-Outパターンとはまた異なったアーキテクチャの紹介をします。Priority Queueパターンでは、Fan-Outパターンとは違って、Queueが複数存在して、それに対応するWorker Podも複数存在します。

タスクが作成された時には、そのタスクの役割に応じて詰めるQueueを切り替えます。重たい処理は高いパフォーマンスを持つWorker Podに処理させて、軽い処理はあまりリソースを振っていないWorker Podに処理させたりすることができます。

Priority Queueパターンのメリットとデメリットを挙げてみました。

1つは、Fan-Outパターンと大きくアーキテクチャは変わらないので、これまでFan-Outパターンで設計していた自分たちにとっては、すごく魅力的な選択肢になります。シンプルなアーキテクチャをある程度は保ったまま、タスクに割り当てるリソースを細かく調整することができました。

一方、デメリットとしては、当然Fan-Outパターンに比べるとシステムは複雑になってしまいます。細かい調整をするためにWorker Podの数を増やしていったり、Queueを増やしていったりするとそれにコストがかかってしまいますし、管理にもコストがかかります。なので、リソース調整の選択肢としては十分に有力なんですが、他のパターンも検討することにしました。

Queuing Chainパターン

次に検討するのは、Queuing Chainパターンです。Queuing Chainパターンでは、タスクごとに専用のQueueとWorker Pod群を用意して処理していきます。

Priority Queueパターンに比べると、少しコンポーネントの数も多くはなってしまうんですが、各工程がQueueでつながれていることで、非常に疎結合なシステムになります。

メリットとデメリットを挙げると、このようになります。

タスクごとに専用のWorkerが用意されて、Priority Queueパターンよりも細いリソース調整が可能になりました。また、大きなメリットしては、疎結合なことから、問題発生時の調査や対応が容易になるケースもあります。

ただ、デメリットとしては、コンポーネントがやや多くなってしまいます。タスクを追加するたびに、そのタスクを処理するためのWorker Pod群やQueueを新しく追加しないといけません。改修コストは当然大きくなっていきます。

こういった問題に関しては、Workflow EngineのようなQueueやWorkerの追加を抽象化する仕組みを導入することも検討できました。有名なWorkflow Engineとしては、Spotifyが出しているLuigiやTreasure Dataさんのdigdag、あとはKubernetesとの親和性が高いArgoも考えられました。

ただ、運用を考えると、これらのWorkflow Engineの導入は慎重になる必要があります。あと、Workflow Engineの調査コストも無視できません。トランスコーダーは、最初にお見せしたとおり、スライド1枚に収まるぐらいのシンプルなworkflowを組んでいるので、これらのWorkflow Engineの導入はややオーバースペックなんじゃないかという印象を受けました。

Kubernetes Jobを検討

それで最後に検討したのが、Kubernetes Jobの利用です。

JobはPodの立ち上げから完了までの管理を担当します。なので、バッチ処理のようなシーンで利用されることを想定された仕組みです。AbemaTVの開局当時、開発初期にはまだKubernetesにJobがなくて、当時は導入を検討することができなかったんですが、これは今問題となっていたリソース消費の問題の解決に使えるんじゃないかということで、検討を始めました。

それでは、Jobを使うと、最初に紹介したFan-Outパターンのアーキテクチャがどのように変わるかお見せします。アーキテクチャを図にすると、このようになります。

特徴的な点としては、事前にWorker Podを複数台用意しておく必要はありません。処理が必要になったときに、その場でPodを立ち上げれば大丈夫です。

なので、事前にWorkerを用意しておかないといけなかったFan-Outパターンとは違って、必要になったときに、必要なリソースだけを指定してPodを起動できます。requestsとかlimitsを分けなくても割り当てに柔軟な制御が可能で、requestsとlimitsが同じなので、先ほどあった追い出しのリスクも回避できます。

Priority QueueやQueuing Chainパターンも、選択肢として非常に有力ではあったんですが、システムの規模・複雑さを抑えつつ、割り当てリソースを最適化できるKubernetes Jobによるアプローチを選定しました。

Kubernetes Jobにまつわる知見

それでは実際に、Kubernetes Jobを使ってシステムを設計していく際に、Jobのどういう機能は使えて、どういう機能は使えず自分たちで実装したのか、トランスコーダー改修で培った知見の共有をします。

トランスコーダーの処理の流れを、このようにもう一度書き出しました。

タスクという単位に分割すると、四角く囲われたのがタスクという単位です。縦の流れは順序を保って実行される必要があって、横に並んでいるのは並列に実行可能なタスクです。例えば、MPEG2-TSの各解像度の生成は並列に生成可能なものです。

このworkflowをKubernetes Jobベースで組み上げる際に、いくつか気になった点がありました。まず1つは、タスクの失敗時のハンドリング。Retryの仕組みです。

Fan-Outパターンを用いた初代トランスコーダーの実装では、Workerとして利用しているPod群が使い回されるので、自分たちでRetry回数を保存したり、実装を加える必要がありました。

ただ、Kubernetes Jobには比較的充実したRetry処理が組み込まれていて、失敗時のリスタート回数も指定できます。

自分たちのRetry管理のための実装をなくすことができて、ここはKubernetes Jobの仕組みに乗っかることで、非常に楽に解決することができました。

ではもう1つ、サブタスク群の完了管理です。

複数の解像度のMPEG2-TSの生成時に、180pから1080pまでの6つの動画のトランスコードを並列に実行して、それらの完了を待ってからFragmented MP4の生成に移れます、みたいなケース。これらは、サブタスク群の完了を同期する仕組みが必要でした。

Kubernetes Jobには、並列実行のための仕組みが1つ用意されています。それは、parallelismというパラメータで指定できるものです。

.spec.parallelismというパラメータを整数値で指定することで、その指定した数のPodが立ち上がって処理していきます。また、並列に立ち上げたPodの中で、いくつのPodが正常に終了すれば、全体としてこのJobが成功したと判断するかを指定できる仕組みもあります。

ただ、これは自分たちのユースケースで利用できる柔軟な仕組みではありませんでした。複数のPodが立ち上がる際に、これらのそれぞれの割り当てリソースを細かく調整することはできません。また、Podごとに実行コマンドを変えることもできないので、別々の処理をさせたい。これは180pの処理をさせて、これは1080pの処理をさせたいみたいなケースでは、もう少しコマンド実行に頼らない別の仕組みを考える必要があり、それは自分たちで実装すると複雑になってしまうので、parallelismの利用は諦めました。

Kubernetes Jobを利用して

そこで、自前のサブタスク群の完了を管理する仕組みを、このように用意しました。

Job MBR Manager(MBR: Multi BitRate)というJobを1つ用意して、そのJobが180pから1080pまでの動画を生成する、またJobを6つ生成します。Managerは、そこで立ち上げたJobが正常にすべて終了することを監視してチェックして、問題がなければ次のタスク呼び出し、Fragmented MP4の生成に移る感じです。

ここで悩ましいのが、Job MBR Managerが子Jobの状態を知るために、各子Jobから親Jobに対してステータスを通知する仕組みを考えなくてはいけません。例えば、それはmongoとかどこかに保存して、Job MBR Managerからポーリングするということも可能ですが、そういうのはできるだけ避けたいということで、KubernetesのGo言語のクライアントclient-goを使って、子Jobの状態をそれぞれ監視するようにしました。

Clientset.BatchV1().Jobs().Watch()がそれに利用できます。

ただ、これも課題がないわけではなくて、内部的にはHTTPのChunked Transfer Encodingを使って、ポーリングとかではなくてストリーミングでステータスが監視できる仕組みなんですけれど、当然セッションが切れることもあるので、その際は適切にリコネクトして、ウォッチを再開する実装を入れないといけません。

なので、自分たちの書くプログラムは少し複雑になって、もしかしたらそれが何らかのバグにつながってしまうリスクもありました。実際、KubernetesのドキュメントにもJobパターンという章があって、今回のケースみたいに単一のJobが複数のPodを適切にハンドリングする仕組みは複雑になると言及されています。

なので、複雑なworkflowを組む場合には、他の選択肢を検討することも必要かと思います。

Kubernetes Jobを使ってトランスコーダーのworkflowを作った際に、使ったJobの機能、使わなかったJobの機能を紹介しました。RetryにはrestartPolicyを使って、完了同期は自前で作りました。

それでは、まとめに入ります。AbemaTVのトランスコーダーを例に、各種バッチシステムの設計パターンの紹介やKubernetes Jobをベースにしたバッチシステムの設計知見・プラクティスの共有を行いました。みなさんがバッチシステムの設計を担当する際には、お役に立てば幸いです。以上で発表を終わります。ご清聴ありがとうございました。