LINEが取り組んでいるマシンラーニング案件

タティヤーヌパンウォン ウィット氏:Data Labs Machine Learning Developmentチームのウィットと申します。今日は大規模マシンラーニングアプリケーションのための分散処理ライブラリというテーマで、ウィット、洪偉、齋藤の3人体制で発表します。よろしくお願いします。

早速ですが、大規模なマシンラーニングということで、まず我々が取り組んでいるマシンラーニング案件を紹介します。ざっくり3つの領域です。1つ目:クラス分類、2つ目:推薦エンジン、3つ目:表現学習があります。詳細は説明しませんが、それぞれ大規模データを活用しています。

どれくらい大規模なのかをスケールで説明いたしますと、対象ユーザー数はアクティブ、インアクティブを含めてグローバルで8億人超えです。アイテム数では一番大きなSticker Packageで言うと、1,000万パッケージ超えです。そして我々が運用しているマシンラーニングジョブの数は100件超えとなっています。

本セッションの内容

機械学習には、さまざまな仕様がありますが、現在GPUを活用したディープモデルが主流です。大規模データを使ってディープモデルをGPUで学習するにあたっては、いくつか技術的な課題があります。本セッションでは課題を解決するために我々が開発したライブラリ群を紹介いたします。

本セッションは3つに分かれています。1つ目:ghee。大規模データを分散処理するためのライブラリです。2つ目:ghee-models。gheeで実装したモデルを再利用と管理するためのライブラリです。3つ目:cumin。マシンラーニングデータセットを構築するための再現性を向上するためのライブラリです。

gheeを開発した背景

ではパート1、さっそく私から説明いたします。gheeのアジェンダです。まずghee開発の背景を紹介しまして、それからgheeの紹介、最後にgheeの実装について順に発表いたします。

まず背景ですが、冒頭に説明したようにディープモデルが主流となっています。この図は、大規模データをGPUマシンで学習する構成と手順です。左側は全社共通のHadoopクラスタ、右側が機械学習用の計算クラスタです。この構成でマシンラーニングチームはふだん1~4の手順でモデル開発を行なっていました。

まず1番、Hadoopクラスタでデータセットを構築いたします。次に構築したデータセットを計算クラスタにコピーいたします。3番、計算クラスタ上でGPUマシンを確保して、開発環境を整えて、モデル開発とオフライン評価を行います。最後に、モデルおよび推論した結果を計算クラスタからHadoopクラスタへ書き戻します。

このワークフローはシンプルですが、開発案件とデータの数がテラバイト単位で増えていくとさまざまな課題がありました。

ディープモデルの課題

次に課題を説明いたします。一番メインの課題はデータの偏りとデータの重複です。 この図はモデルを分散学習するための初期の構成です。Hadoopクラスタから学習データを事前に配布されたかたちになっています。

本来は各GPUカードで均等に学習データを処理するのが理想なんですが、これを担保するのが難しいのです。配布されたデータは同じものだとしてもGPUにおいてサンプリングなどを行なった場合、データの偏りが発生し、学習の精度が低下いたします。

そしてデータ重複についてなんですが、例えばグリッドサーチ、パラメータ探索を行う場合にこのような構成が複数発生します。そうしますと大量のネットワーク帯域とストレージを使ってしまいます。

またリソース管理について言うと、今までマシンラーニングエンジニアは、手動でGPUマシンを確保したり解放したりしていました。長く占有された場合に単純にGPUの台数が足りなくなります。そしてGPUの使用率についてなんですが、GPUマシンにおいても前処理、サンプリングなどをナイーブに実装しますと、CPU/IOオーバーヘッドの処理の影響でGPUの使用率が低下いたします。

課題を解決するためのライブラリやフレームワーク

以上の課題を踏まえて、我々は機械学習のモデル開発に集中するために次の機能をもつライブラリ、またはフレームワークを導入したいと考えていました。

1つの機能はデータ転送です。Hadoopなどのメインストレージからデータをコピーではなく、ストリームでデータを流す機能です。2つ目、使いやすさです。環境などインフラ周りの設定を吸収してくれるライブラリ、またはインターフェースです。3つ目、分散学習についてですが、手間なく分散学習を実装できること、それから学習データを均等に複数のGPUワーカーへ配布できる機能です。

そんな中、我々は既存のSparkとDaskも検討いたしました。Sparkについては使いやすいですが、データ転送速度はまだ我々の用途には足りませんでした。Daskのデータ転送速度は十分でしたが、まだ使いにくいと感じました。

このため我々は使いやすさとデータの転送速度を重視したいと考え、独自のライブラリgheeというものを開発することになりました。以上が背景です。次はgheeの紹介となります。

gheeとは

gheeはデータを分散処理するためのPythonライブラリです。主な用途として、マシンラーニングアプリケーションを想定しています。ユーザーは次の3つを指定して使います。

1つ目は入出力の仕様。例えばHadoopクラスタのファイルpath、あるいはKafkaのコンシューマトピック。2つ目は実行環境。例えば使いたいCPUリソースのメモリ、あるいはDockerイメージです。3つ目が一番重要なのですが、ユーザーが実行したいPythonのプログラムです。例えば前処理の関数、学習ループなどです。

これらを指定することで機械学習アプリケーションを実行することができます。この裏でgheeはデータ転送とKubernetesのリソース管理などをハンドリングしてくれるので、マシンラーニングエンジニアはモデル開発に集中できます。

次にgheeで書いたプログラムを実行するときにどう動くかを説明いたします。

まず最初に指定された実行環境でKubernetesのPodを立ち上げます。この例で言いますと、Preprosess、PredictまたはTrain、Postprosessの3つを立ち上げます。次に指定されたインプットデータを読み込んで、ストリームでPodへ送信します。3つ目、指定されたユーザープログラムを使って流れてきたデータを処理いたします。この例で言いますと、インプットデータから流れてきたものを前処理します。

最後に処理の結果を次のステージにストリームで送信します。例えばこの例で言いますと、Preprosessしたものが次のTrainのインプットとして流れていきます。このフローを繰り返して、最後に次のステージがない場合はアウトプットストレージへと結果を書き込みます。

gheeで書いたプログラム

次にgheeで書いたプログラムのソースコードを紹介いたしますが、その前に参考のために、こちらは一般的なPyTorchの学習プログラムです。真ん中ですが、普段、データセットはディスクから読むかたちになっています。ではgheeで書いた場合はどうなるか、次のスライドで紹介いたします。

こちらはgheeで書いたプログラムでモデルを学習する例です。注目してほしいのは、下の3行です。ここで入力データ、前処理の関数(Preprosess関数)、そして学習ループ(Train関数)を指定することで、gheeのプログラムを実行します。

Preprosessは一番上で定義していますが、単純に学習batchを作っています。真ん中の学習ループなんですが、先ほどの一般的なPyTorchのプログラムとほぼ変わりません。一番の違いは真ん中あたりのデータセットを構築するときに、ディスクから直接読む代わりにPreprosessの結果で流れてきたものを処理するところです。

データセットが大きくなる場合、こちらはスケールするためにどんな設定になるかを説明いたします。真ん中あたりでタスクというものを作るときに、各Podの設定をすることができます。こちらで言うと、CPU、メモリ、レプリカ、あるいはDockerイメージを設定することができます。

こうしてデータ量に応じて使用する計算リソースを簡単に調整することができます。以上、gheeの紹介でした。最後にgheeの裏の実装について説明いたします。

gheeの内部アーキテクチャ

一番複雑なのはデータの転送です。右側の図はCPU Pod、例えばPreprosessからGPU Podへデータが流れるイメージとなっています。Pod間通信を実現するために我々はZeroMQのPush/Pullの仕組みを使っています。社内ではZeroMQの利用実績があり、速度と安定性が優れていたため採用いたしました。

また複雑な複数Push/Pullを管理するために、我々はTransfer Managerというモジュールを実装しました。最後に並列計算を行うためにすべてのステージはMPIで実装、実行いたします。こうしてMPIベースの並列学習(例えばHorovod)は自然に対応できます。

次にユーザーコードのリモート実行について説明いたします。基本的にユーザーが書いたPythonプログラムをcloudpickleを使ってKubernetesのPod上に送って実行させています。

ここでKubernetes上の実行なんですが、Kubernetes関連の設定を抽象化するために我々は独自ライブラリswimmyというものを社内で開発して採用いたしました。swimmyはPod上の実行機能以外に、Podの状態管理とヘルスチェック機能があります。

下の図は実行スタックです。すべてのKubernetes Podの上にswimmyというエージェントがあり、Pod状態を管理します。その上でmpirunとユーザーコードが実行されます。

我々はgheeの設計は速度と使いやすさ重視だとお話しましたが、実際どうなっているかDaskと簡単に比較してみました。

MovielensをFactorization Machineで学習してみました。1枚のGPUで学習していますが、前段の前処理は16CPU並列でする設定となっています。前処理の重さはnというパラメータで調整できるようになっています。

結果は右のグラフのとおりです。縦軸は処理する時間で、緑はgheeの時間、青はDaskの時間です。ご覧のようにgheeはDaskに負けない速度となっています。また行数について言うと、この実験を実装するためにgheeで書いたコードの行数も、Daskの半分くらいとなっています。

調べますと、ほぼ差分はKubernetesのPod状態管理とデータの読み書き機能となってます。gheeはこうした機能が充実しているため、簡単に実装できました。以上、gheeの発表となります。ご清聴ありがとうございました。