一人ひとりのユーザーに対して最適な記事を届けるニュースアプリ

米田武氏:株式会社グノシーの米田が発表させていただきます。よろしくお願いします。

最初に自己紹介ですけども、ネット上ではマスタケと呼ばれています。本名は米田武といいます。

GithubとTwitterのアカウントはこんな感じで、去年の3月に大阪大学で数学の修士号、コンピュータサイエンスではなく数学の修士号を取ったあとに、グノシーにジョインしました。

今はその数学のバックグラウンドを活かして、推薦システムのモデルを作るところから、最適化の問題を解いたり、それをユーザーに届けるサーバーサイドのエンジニアリングなど、幅広く担当しています。

簡単に会社の紹介なんですけれども、株式会社グノシーといいます。グノシーは「データとアルゴリズムの会社」とうたっていまして、企業理念は「情報を世界中の人に最適に届ける」と言っています。人工知能を研究する3人の大学院生からスタートした会社で、そういうバックグラウンドを持っています。

代表的なアプリとして、「グノシー」というアプリ、みなさんご存知の方が多いかと思いますが。それ以外にも最近は、「ニュースパス」や「LUCRA」といった新しいメディアであったり、右側でいくと最近はブロックチェーンの研究開発部がけっこうがんばっていて、有名になってきていたりしています。

もう1個がアド領域で、アドネットワークを自社で抱えていたりして、データとアルゴリズムを活かして事業を展開しています。

今回は、僕が担当しているニュースパスというアプリの、推薦システムの話をしたいと思います。「ニュースパスの推薦システムを支えるAWSアーキテクチャ powered by aws」と題しまして、発表させていただきます。

背景として、ニュースパスは「一人ひとりのユーザーに対して最適な記事を届けるニュースアプリ」です。これはけっこうすごくて、アプリをインストールして1クリックした瞬間からパーソナライズが始まります。

ニュースのアプリケーションなので、新着の記事は1日で多いときで1万記事を超えてくると。なので、大量の記事と大量のログから、ユーザー一人ひとりに対して最適な記事を選んで届けなければなりませんので、そのアルゴリズムを開発しています。

ニュースの推薦は難しい

アルゴリズムを開発するだけではなくて、それを高速かつ安定に実行して届けなければなりませんので、それをAWS上で構築していて、そのアーキテクチャの話を今回はしていきたいと思います。

本題に入る前に、ニュース推薦ってけっこう難しい問題として知られていて。主にこういう理由があります。

まず左側なんですけども、ニュースの価値って基本的に、時間減衰します。例えばここにある例でいくと、「〇〇さんが死去」というニュース。ほかには「サッカー日本代表が勝利」。もう1個、「〇〇県で震度5強の地震」があると。こういうニュースは本当に時事性が強いので、例えばもう数時間後には誰もクリックしなくなるとか。「価値が時間減衰する」と僕たちは呼んでるんですけども、こういう推薦をしなければならないアイテムの性質があります。

これがあるので、既存のアルゴリズムが単純に適用不可能であると。例えば機械学習の教師あり学習で使うには、ログが溜まらなければいけないんですけども、ログが溜まるころにはそのニュースそのものの価値が落ちてしまっているので、単純に適用することが不可能であると。

もう1個難しい点があって、推薦システムの界隈では有名なんですけれども、内容ベースの推薦。例えば「水をよく買っているユーザーに対しては水を推薦する」というアルゴリズムは、これはニュースではけっこう難しい点がありまして。例えばここに書いてあるとおり、表面的な言葉の一致度のみで推薦をすると、けっこう質が低くなってしまいます。

具体的な例を言うと、例えば鹿島アントラーズの熱狂的なファンの方が、「鹿島アントラーズが勝利」という記事をクリックしたとします。その方に対して、「鹿島アントラーズの試合を〇〇という芸能人が観戦」というニュースを推薦したとしても、まぁクリックされないだろうと。「鹿島アントラーズ」という表面的な言葉の一致だけでは、なかなかそのアイテムの質が担保されなくて、推薦が難しいというところはあります。

今回お話するのは、そういう難しさを克服したアルゴリズムを作ったんですけども、それを実際にAWS上で構築しましたよ、という話をします。

「局所話題性」と「野次馬のダイナミクス」

大きく2つありまして、1個目は「ユーザー行動の数理モデルとその実現」ということで、ユーザーの行動を具体的にシステム上でモデリングして、それを実際にユーザーの推薦システムに使うというところの構築の話をします。もう1個が、そのユーザーのモデリングを使って、実際に記事を届けるサーバーなりデータレイクなりの話をしていきます。

今回関係するサービスですけど、こんな感じです。

S3やDynamoDBであったり、Dynamo Accelerator、Lambda、Kinesisだったり、EMRだったりします。

本題に入っていきますけれども、ユーザー行動の数理モデルとその実現、ということでお話していきます。

一旦システムの話から離れて、ユーザーの行動心理を考えてみると、みなさんは「読みたいニュース」ってどういうものでしょうか。私たちはここに書いてあるような、「Local Popularity(局所話題性)」が大事だと思っています。

これはどういうことかと言うと、例えば僕がTwitterでクリックしたくなるような記事とか話題のニュースって、僕は機械学習コミュニティの人なので、機械学習コミュニティでよくシェアされてる記事だとか、機械学習のコミュニティの中だけでけっこう流行っている記事だとか。

そういうのを「局所話題性」って僕は呼んでるんですけども、こういうのが大事であろうと。近所のコミュニティで流行してる記事が読みたいというのは、まぁ自然な発想であると思われます。

これは現実の社会でもけっこう起こっていて、例えば近くで大きな事故やニュースが起こったとすると、野次馬ができますよね。それで、野次馬を見た人はその中でなにが起こっているのか知りたい。これを僕は「野次馬のダイナミクス」と呼んでいます。

こうした「局所話題性」とか、「野次馬のダイナミクス」と呼ばれるものを実現する数理モデルを作りたいと。それを数学的に表現するために、いろいろがんばってきました。

ニュースの推薦ってアカデミアでは、もちろんオープンなデータなんてあんまりないので、研究論文が少ないです。なのでゼロからのスタートで、単なる数理的な表現だけではダメで、それをベースに高速な推薦アルゴリズムを作らなければなりませんので、そういうプラクティカルなアルゴリズムを作んなきゃいけないという背景があります。

ニュース記事をベクトル空間に埋め込んだ後にユーザーのベクトルの作成へ

それがこのビジネス要件に現れています。

リアルタイムにユーザーの行動、「野次馬のダイナミクス」が反映されて、かつユーザーからの大量の行動ログをハンドリングしつつ、そのモデルを実現するというビジネス要件があります。

ちょっとAWSの話から遠いですけども、数理モデルの話をします。

まずここでなにをやってるかと言うと、ニュースの記事を連続なベクトル空間で埋め込みます。ここでは本田圭佑さんのニュースや、「六本木で殺人事件」というタイトルのニュースがあります。この本田圭佑さんに関する2つのニュースは、ユークリッド距離が近いように連続なベクトル空間に埋め込まれていて。「六本木で殺人事件」という、「えぇ……」って思うようなベクトルは、そこからは離れた位置になるように置かれています。

この具体的なアルゴリズムは機械学習のコアな話になるので、今日は割愛しますけれども、こんな感じでまずニュースの記事を連続なベクトル空間に埋め込みます。

そのあとに、ユーザーのベクトルを作ります。ユーザーのベクトルは、直近クリックしたM記事の重み付け平均で定義します。

実際にユーザーがクリックの行動を行うと、そのM記事の中から最も古い1記事を除いて、新しい記事のベクトルを取ってきて、重み付け平均を取り直すという操作をします。

具体的になにが起こるか見てみると、わかりやすいんですけども。ここに6人のユーザーさんがいます。

本田圭佑さんのニュースがあり、安倍首相に関するニュースがあります。それぞれクリックを行うと、こういった感じで移動して、「野次馬のダイナミクス」がこのベクトル空間上で表現されていると。

記事の重み付け平均を取ってユーザーのベクトルを作る

こんな感じで表現されました、数理モデルはできました。ただ、これを即時性を担保しながらシステム上で、AWS上で実行したい。

なので、こんな感じのアーキテクチャを作りました。これ、上がユーザーのベクトルの生成フローで、下が記事のベクトルの生成フローになっています。

上の説明をまずしますと、左からクリックのログのストリームが流れてきて、そのクリックのログをLambdaが受け取って、直近M件のクリックのログをグルーピングします。それをDynamoにプットします。プットをトリガーとして、またユーザーをベクトル化するLambdaが起動して……という感じで流れていきます。

より具体的に見ていくと、まずクリックのログの、M件のグルーピングのところを話をします。

1番クリックのログが発生してるところはクライアントのアプリケーションなので、そこからクリックに限らないすべてのログが、Kinesis Streamsに流れてきます。それをfluentdが入ったサーバーが受け取って、クリックのログのみフィルターして、また新たなクリックのログしか流れてこないKinesis Streamsにfluentdが流します。それをトリガーとしたLambdaがユーザーIDをキーにして、最近クリックしたM記事のIDが入ったリストをバリューとして持ったDynamoのレコードをプットします。

具体的にはどうやってプットするかと言うと、アップデートリクエストをします。Dynamoのレコードに対して、ユーザーIDのキーに対してアップデートリクエストすることで、直近M件のクリックログというものを保持するように、アップデートリクエストを行います。

それで、直近M件の記事のログは得られましたと。それをトリガーにして、記事の重み付け平均を取ってユーザーのベクトルを作らなければいけないので、その処理の話をします。

高精度で高速な推薦のシステムを構築したい

まず、先ほどのM件の記事が入ったDynamoDBをトリガーにして、ユーザーをベクトル化するLambdaが起動します。このLambdaは直近M件のクリックのログをストリームから受け取って、かつ裏側で別の、記事のベクトルが入ったDynamoDBにそれを取りにいって、重み付け平均を取って、ユーザーのベクトルのテーブルにプットしにいきます。

ただ、その記事のベクトルを取得しなければならないんですけども、複数のLambdaが同じ記事に関するベクトルを取りにいってしまうと、レイテンシーとコストを考えて、キャッシュしたほうがいいよねという話になります。

そこで、DAXというものを使いました。DynamoDBへのGETリクエスト……まぁGETだけではないんですけども、GETリクエストのキャッシュといえばDynamoDB Accelaratorです。

ユーザーをベクトル化する、Lambdaが使用する記事のベクトルを、DAXを挟むことでキャッシュを行います。DAXはなにをやってるかと言うと、DynamoDBの前段に論理的に置かれるインメモリキャッシュです。DynamoDBにアクセスするのとまったく同じインターフェースでDAXにアクセスすることで、インメモリキャッシュとして使用することができます。

具体的にこのLambdaとDAXがどう会話をしてるかと言うと、Lambdaは単純にDAXに対して、DynamoDBにアクセスするノリで「ベクトルある?」って聞いて、DAXはもしメモリ上になければ、裏側でDynamoDBに取りにいって、メモリに乗せてLambdaに返します。

これによって低レイテンシー・低コストで記事のベクトルをキャッシュして、ユーザーをベクトル化することができました。

今、ユーザーをベクトル化するフローの説明をしたんですけども、記事のベクトルはどう作ってるか。

これは簡単で、単純に記事を持って来るクローラーがCeleryのワーカーに記事のベクトルのもとになるものを渡して、そのワーカーがDynamoDBにプットする、ということをやっています。

これで「野次馬のダイナミクス」を、Celeryのワーカーのところは除けば、ほぼサーバーレスにユーザーをベクトル化するという処理を実現することができました。

それで、ユーザーはベクトル化できました、それを使って高速な推薦システムを作りたいですと。ユーザーの行動の数理モデルは、KinesisとDynamoDBとDAXとLambdaを使って、ほぼサーバーレスに実現することができました。Dynamoのオートスケーリングとかあるので、もう十分に高いスケーラビリティを持って実現できています。

これを用いて、高精度で高速な推薦のシステムを構築したいというモチベーションが次にあります。

推薦リスト生成=流行探し

先ほど言ったように、推薦リスト生成=流行探し。ここにアクセスしたユーザーが、この周りで流行ってる記事ってなんだろうって考えます。そういう流行ってる記事を、ユーザーに対して返すようなサーバーを構築したいですよと。

僕たちのアルゴリズムでは、記事の質、流行り度。流行、鮮度を、ベクトル空間上の密度で表現します。ここでは記事Aと記事Bの分布があります。ユーザーからリクエストがあったときに、そのベクトルの周りでの密度を計算して、その密度の高さによって記事をソートして、リストを作って返すということをします。

具体的に言うと、例えばここにベクトルを持つユーザーがリクエストしてきたとしますと、この点では記事Aのほうが密度が高いので、Bより高スコアで、Bを渡すよりはAを表示してあげたほうが喜ぶよね、というアルゴリズムになってます。

また一方で、こっちにアクセスがあったユーザーに対しては、記事Bのほうが密度が高いので、Bを返すほうがいいよねという判断でアルゴリズムが返して、リストを生成します。

ただこの密度の計算って、具体的な数式の話はしてないので曖昧なんですが、大きいです。“50msec or die.”という言葉があって。50msec以内に推薦システムはリストを返さないとダメだよね、みたいなルールが社内ではあります。

どう実現するかと言うと、これは左から右に向かって、ユーザーのリクエストからリストが返却されるまでの、リクエストの処理のフローです。

まずユーザーからリクエストがあった瞬間に、DynamoDBに先ほど作ったユーザーのベクトルをGETしにいきます。ここではキャッシュを挟まずに……なんでかと言うと、ユーザーの興味の変化の反映を早くするためにキャッシュはしません。キャッシュをせずに、必ず最初のリクエストのときに、DynamoDBにユーザーベクトルをGETしにいきます。

そのあと、推薦とはまた別の社内のロジックがあるので、ほかの必要なデータもDynamoDBから取得しにいきます。ここではキャッシュをしたりしますけども、この部分はDynamoDBのオートスケーリングによって瞬間的な高負荷にも耐えますし、けっこう高速に処理ができます。

そのあとに、メモリ上に乗せられた行列と単純に線形代数をするだけで、先ほどの密度を計算できるように数式を設計してあるので、高速なスコアリングが可能になっています。

行列のデータはだいたいEMR上で作っている

その密度計算の結果を使ってリストをソートして、レスポンスを生成して返却します。平均レスポンスタイムが今であれば、これはアプリケーションロードバランサーのメトリクスの1つのターゲットレスポンスタイムの指標ですけれども、25msec/requestでこのリスト生成を行うことができています。

さっきも言ったんですけども、その行列演算するもととなるデータが、最初からメモリに乗っています。バックグラウンドで乗せているんですけども、その行列のデータをどこから取ってきてるかと言うと、S3から取ってきます。そのS3に置かれている行列のデータはどうやって作ってるかという話を、今からします。

こちらの左側が、先ほどの世界線の話です。

推薦のAPIがあって、そのメモリ上にS3から取ってきた行列のデータが載せられています。それを使って推薦のAPIはユーザーに対してリクエストを返しています。S3から各インスタンスに各行列のデータがばらまかれていて、ユーザーからのリクエストとは非同期でメモリにノードしていて、データ操作は全部S3なのでインスタンスに対してスケールするという感じになっています。

ですが、起動時のロードのタイミングではけっこう高負荷になるので、注意が必要です。

その行列のデータは誰が作ってるかと言うと、だいたいすべてEMR上で作っています。ほぼすべて、EMRを使って作っています。

そのEMRの周りのデータレイクの話をします。EMRのSparkなりPrestoなりHiveなりが用いるデータは、すべてHive Metastoreを参照して取り扱っています。これによってほぼすべてのアプリケーションから、ほぼ共通のインターフェースでアクセスすることができます。

このHive Metastoreって、聞いたことない方もけっこうおられるかと思うので簡単におさらいしておくと、Hive Metastoreというのはテーブルの定義と……ここでHDFSと書いいますが、ここでは実際にはS3のことだと読み替えてください。

データの詳細を隠蔽してアクセスができる

テーブルの定義とS3上の実際のデータの対応に関するメタデータを保管するものを、Hive Metastoreと言います。

それを用いるクライアント、ここではSparkなりHiveなりPrestoというアプリケーションは、S3上のログの実体にアクセスする前にMetastoreから情報を取得して、こういうスキーマのデータがS3のこのパスのところにあるよね、というのをまず取得してからデータを取りにいきます。

そのメタデータそのものは、MySQL内に保持していて。具体的なテーブルの定義の例でいくと、こんな感じになっています。"users"というテーブルがあって、そのテーブルの定義があります。それがS3のこの場所にありますよね、というクリエイト文が打たれていて、実際にSparkなりHiveなりPrestoはこのスキーマを読んでS3にアクセスすることができます、と。

具体的にどういうものに使ってるかと言うと、まずSparkは、機械学習を行っているSparkがSparkSQLを使って、Hive Metastoreを経由してS3のデータにアクセスします。大規模な集計もPrestoを使ってやっているんですが、それもHive Metastoreを使ってアクセスしています。

そのもととなる中間テーブルなり、S3のデータにアクセスするためにパーティションを切らなければいけないんですけども、それもHive Metastoreを経由していっています。

サービスに関わるほぼすべてのログ、ほぼすべてのマスターデータもS3に集約していて、それに対してHive Metastoreの情報を与えておけば、実際のデータのもととなっているMySQLなりに影響を与えず、S3上のデータに対してSQL、SparkSQLなりHiveQLなりでデータにアクセスすることができます。

今言ったことをさらっとまとめてみるとこんな感じです。

データの詳細を隠蔽してアクセスができます。スキーマをあらかじめ定めておいて、それを共有しておいて、変更がある場合にもデータの実体はS3にしかないので、実体に影響を与えずスキーマを変更することができます。例えばDrop Tableでやり放題だったりします。

EMRクラスタに対する負荷は注意しなければいけない

各アプリケーションはほぼ共通のインターフェース、例えばこういうSQLでS3上のデータに対してクエリを書くことができます。

非常に高いスケーラビリティと書いてあるのは、そのデータの大元になるのがS3なので、負荷を心配する必要がほとんどないと。要するに、このデータレイクに対するwriteのリクエストというのはS3へのプットになるので、し放題ですよねという感じになっています。

ただその、実際にそのデータをこねくり回す処理をするEMRクラスタに対する負荷は、けっこう注意しなければならなくて。例えばHive Metastoreを共通利用することで、EMRの用途を分けて管理という選択も取れます。

例えば弊社でいくと、バッチ処理をするためのEMRクラスタとアドホック分析するためのEMRクラスタと、先ほどの行列を作るための推薦システム用のクラスタ。これ全部分かれていて、負荷が分散されていて。ただHive Metastoreは共通利用してるので、共通のインターフェースでアクセスできますよね、という感じになっています。

まとめです。ユーザー行動が成す力学系とそれを支えるアーキテクチャということで、Lambda、Kinesis、Dynamo、DAXをフル活用して、先ほどの「野次馬のダイナミクス」を実際にAWSのシステム上で実現することができました。

もう1個が、その高速な推薦システムを支えるデータレイクとしまして、耐久性の高い推薦API。これはDynamoDBのオートスケーリングなり、S3から非同期で推薦のためのデータをロードするなりという工夫で、耐久性の高い推薦APIになっています。

もう1個が、S3とHive Metastoreを組み合わせてデータレイクを使うことで、データの詳細を隠蔽して利便性が向上すると。かつ1番後ろのバックエンドがS3なので、耐久性・可用性が高いですよねという感じになっています。

以上です。ありがとうございました。

(会場拍手)