「Message Event Tracking」のアーキテクチャ

岡田遥来氏:今日は今回開発した新しいイベントトラッキングシステム、内部では「Message Event Tracking」というプロジェクト名で呼ばれていたんですが、これを開発する上で直面してきた課題や、どういった工夫でそれを乗り越えてきたかについて紹介していきます。

まず、システムのアーキテクチャの全体像はこういった感じになっております。今日の流れですが、管理画面からメッセージ配信をポチッと押してみなさんのスマホに届いて、という時系列に沿ってお話ししていこうと思います。

まず1番目は「Send Messages」とある、管理画面からメッセージを配信する部分。そして2番目は、「Track Events」と書いてありますが、配信したメッセージに対するユーザーリアクションをトラッキングする部分。

そして3番目に、その届いたイベントをストリーム処理して集計していく部分。そして4番目に、バッチ処理を行って集計する部分。そして最後に、この図には記載していないですが、ストリーム処理で集計したデータとバッチ処理で集計したデータを組み合わせて最終的な値を得る部分について。この流れで説明していきます。

そして、そのあと、最後にこのシステムを土台として開発中の新しい機能についても、少しご紹介できればと思っております。

メッセージとメタデータを分離

というわけで、まず1番目、メッセージ配信を行う部分についてです。このように、管理画面からメッセージを配信してそれがみなさんのスマホのLINEアプリに届くまでにも、さまざまなコンポーネントを通過します。

まず、管理画面からメッセージ配信をポチッと押したら、MySQLに格納します。ジョブキューです。Message Delivery Serverと呼ばれるコンポーネントがいて、このコンポーネントがMySQLに溜まったジョブキューをポーリングして、送信可能と判断したメッセージを取得してきます。この取得したメッセージをもとにMessaging APIと呼ばれるAPIにリクエストを送信します。ここまではいたって素直な構成です。

そして、Messaging APIはTalk Serverと呼ばれるLINEのメッセージングのコアを担うサーバにリクエストを送り、そのTalk Serverが公式アカウントの友だちに対してメッセージを届けます。

このとき、Talk Serverにリクエストを送ると同時に、Messaging APIはメッセージに関するメタデータを別途MySQLに保存しておきます。このメタデータには統計情報の集計に後々必要になってくるさまざまな情報が含まれています。

このようにメタデータが必要な集計を行う場合、大きく2つの設計が考えられます。まず1つ目は、配信するメッセージにメタデータ自体を埋め込んでしまって、ユーザーリアクションのイベントがあったときに、そのメタデータをイベントに含めて一緒に送り返す方法です。

もう1つ考えられるのは、メッセージとメタデータは今回のように分離しておいて、ユーザーリアクションのイベントが発生したときにサーバ側でstreaming joinする方法です。

今回、私たちがとったのは後者のメッセージとメタデータを分離する方法です。なぜなら、メッセージにメタデータ自体を埋め込んで送り返すということは、イベントのペイロードが膨らむことを意味します。これはユーザー体験の悪化につながる可能性があるため、避ける必要がありました。

ただし、イベントが起こった際に集計するときにこのメタデータとイベント自体をstreaming joinする必要があるので、アーキテクチャは多少複雑化します。

ユーザーリアクションのイベント計測

みなさんのスマホにメッセージが届いたとして、次はそのLINEアプリ内でユーザーリアクションのイベントの計測を行う部分です。

メッセージ配信機能では1回のメッセージ配信という操作で複数の吹き出しを含めることができます。画面で「Broadcast」と書いているのが1回のメッセージ配信で、「Balloon」と書いているのがそれぞれの吹き出しのことです。

LINEアプリにはEvent Tracking SDKが埋め込まれていて、この吹き出しごとに詳細なメトリクスを収集していきます。「Balloon 1がインプレッションされた」「Balloon 2がクリックされた」といった具合です。

そして、SDKで収集した各種のそういったメトリクスはEvent Trackerと呼ばれるコンポーネントに送信します。Event TrackerはこれをKafkaに送信します。Kafkaに流し込んだイベントはSpark Streaming経由でHDFSに保存します。

また別のBtoBのサービスなんですが、運用型広告サービスのLINE Ads Platformにおいて安定稼働の実績がすでにあるデータパイプラインの基盤を持っていました。したがって、今回のMessage Event Trackingの開発でもこの基盤を一部利用することにしました。

続けて、Kafkaに流し込んだイベントをストリーミング処理していく部分です。Event Processorと呼ばれるKafka consumer applicationがこの役割を担っています。

Event Processorはメッセージ送信時に、先ほど分離して、別途格納したおいたメタデータとイベントを紐づけたのち、RedisとHBaseへデータを保存します。このストリーミング処理を行うEvent Processorは開発において最も工夫が必要となった箇所の1つでした。

Messaging APIで送信時に格納しておいたメタデータをMySQLからルックアップしてイベントとjoinするわけですが、ピーク時のトラフィックは50万イベント毎秒となるため、直接MySQLにクエリを投げるのは避けたい状況でした。

こういった場合、一般的にはキャッシュレイヤーを設けたりします。しかし、今回のシステムではデータの鮮度が重要で、トラフィックの特性としてもキャッシュヒット率が低く、キャッシュを行うのは困難でした。したがって、メタデータのルックアップをある程度量バッチングして、それをまとめてMySQLに問い合わせることでリクエスト数を抑える設計としました。

このようにしてメタデータとイベントを紐づけたのち、集計単位ごとにキーを分けてRedisに保存します。このRedisに保存したデータをLINE公式アカウントの管理画面から直接参照することで統計情報をリアルタイムに提供する算段です。

ユニークユーザー数を計算する仕組み

ここでまず課題となったのが「どのようにユニークユーザー数を計算するか?」ということでした。単純に「何回クリックされた」みたいなイベントが発生した回数であれば当該の集計単位のキーをインクリメントしていけばいいんですが、ユニークユーザー数の計算は一般的にCount-distinct problemと呼ばれて、素朴な方法だと効率よく解きづらい問題です。

なぜかというと、あるユーザーのイベントが飛んできたとして、ユニークユーザーの計算はそのユーザーが今まで出現したかどうかを判定しなきゃいけないわけです。ということは過去に出現していた全ユーザーIDのハッシュセット的なものを持っておかなきゃいけないわけです。なので、ユーザー数に対して線形のメモリ空間が必要になるので効率が悪い。

LINEのようなユーザーベースが大きいサービスだと、これだとメインメモリに保持できる限界を超えてしまって、リアルタイムでユニークユーザー数を提供するのが難しくなってしまいます。

幸い、RedisにはHyperLogLogというアルゴリズムが組み込みで用意されています。ここではHyperLogLogのアルゴリズムの詳細には深入りしないんですが、これはこれですごくおもしろいアルゴリズムなのですが、これはある程度の誤差を許容してユニークユーザー数をconstant time、constant spaceで計算可能な確率的アルゴリズムです。

Redisの場合は標準誤差0.81パーセントみたいな感じになるんですが、今回リアルタイムでメトリクスを提供する用途としては、この誤差は許容範囲だったので利用することにしました。

Redisの書き込みにおいてもう1つのポイントは、1回のユーザーリアクションのイベントの発生で複数のRedisオペレーションを行う必要があることです。

例えば、あるメッセージをユーザーが見てインプレッションイベントが発生した場合、ユニークユーザー数を蓄積するための「PFADD」や、積み上げのイベント数をカウントするための「INCR」て、および無期限に保存することはできないので、TTLを設定するための「EXPIRE」など、いくつものRedisのコマンドを発行する必要があります。

したがって、ここではRedisのLua Scriptingを利用して、1回のネットワークリクエストで複数のオペレーションを発行するかたちとしました。

Event Processor内で行われている処理

ここまで、Redisにリアルタイムのメトリクスを書き込んでいく部分について紹介しました。Event Processorでは、Redisに書き込むのと同時にHBaseにもデータを書いています。HBaseに書いてあるのは生イベントログです。ここで書いているのには理由が3つあります。

まず1つ目に、前述のようにリアルタイムのユニークユーザー数はHyperLogLogを使って計算しているので、誤差を含む可能性があります。これについて生イベントログをバッチ処理することで、リアルタイム性は落ちるけど正確なユニークユーザー数をあわせて提供する目的があります。

2つ目は、将来的に予定している機能のためですね。こちらは最後に少し触れたいと思いますが、あるメッセージをインプレッションしたユーザーの一覧、といったターゲティングを今後提供する予定がありました。したがって、そのときに元データとして利用するためにHBaseに生イベントログを保存しています。これが2つ目です。

そして3つ目は、Redisのデータがなんらかの理由で欠損した場合に、この生イベントログから復旧を行うためです。

以上のように、Event Processorでは、MySQLからのRead、RedisへのPutおよびHBaseへのPutのような、I/Oを伴うストリーム処理を行っています。こういった外部ストレージへのアクセスを伴う処理では処理が失敗した場合の戦略を考える必要があります。

例えば、HBaseで一時的に特定のリージョンサーバへの書き込みが集中した場合、RegionTooBusyExceptionで書き込みが失敗する可能性があります。

これに対してとったのは、別のKafkaのTopicを用意しておいて、それをRetryキューとして使うという設計です。Retryキューに入れたイベントは、一定時間のバックオフを挟んだのち、成功するまで再度もう1回consumeして処理を行います。

このRetryキューイングの実装には、LINEの大規模なKafkaのクラスタがあるんですが、そのKafkaの運用チームがメンテしている「Decaton」と呼ばれる内製のconsumerフレームワークを使っています。

なぜストリーム処理とバッチ処理を併用しているのか

ここまでがこのMessage Event Tracking Systemで行っているストリーミング処理についての解説でした。次はバッチ処理についてご紹介いたします。

まず、なぜこのようにストリーム処理とバッチ処理を併用しているのかについてです。この理由は2つあります。まず1つ目は、先ほど少し触れましたが、HBaseに保存しておいた生ログをもとにバッチ処理によって正確なユニークユーザー数を計算するためです。そしてもう1つは、HBaseに入れた値を永続的な集計値として使いたいからですね。

ストリーム処理によってRedisに集計値を入れていますが、これを無制限のTTLで保持しておいて、永続的に管理画面から参照するには大きなメモリをたくさん積んだマシンがたくさん必要となるので、効率が悪いです。

なので、ユニークユーザー数については生イベントログからMapReduceで集計して、かつ、ユニークユーザー数じゃない積み上げの値についてはLINE Ads Platformのデータライン基盤で集計した値をHBaseへインポートするというかたちにしています。

このようにユニークユーザー数の集計と分けているのには、ユニークユーザー数の集計は、スキーマを最適化したかたちでHBaseを設計しておいて、MapReduceを実行するだけで簡単に集計が効率的にできるようにしているという背景があります。

ここでユニークユーザーの集計のためにMapReduceで行っている処理自体は大変シンプルです。MapperでRowKeyごとにColumn数をカウントして、Reducerでそれを足し上げるだけの処理ですね。HBaseはスキーマレスの柔軟性を持っているので、ユーザーIDをColumnとして使うことで、単純にColumnを数えればユニークユーザーが計算できるようなスキーマ設計としています。

以上までの流れで、ストリーム処理でリアルタイム用のデータがRedisに、そしてバッチ処理によって永続参照用のデータがHBaseにというかたちで揃いました。最後に、これらを組み合わせて管理画面に表示する最終的なデータを生成する部分についてご説明いたします。

HBaseには、メッセージ配信ごとに固有のIDとその日付、「〇日の集計値」「〇日の集計値」といった日付の組をキーにして、各種のクリック数、ユニーククリック数といったメトリクスが入っています。

なので、メッセージを送信した日時から最後のバッチが回るまでのデータについてはHBaseに永続値がすでにある状態なので、まずこれを取ってきます。そして、最後に回ったバッチ以降のイベントについてはRedisから取ってきて、それを足し上げて最終的な集計値としています。

開発中の新しい機能3つ

これでアーキテクチャの解説は以上です。最後にMessage Event Tracking Systemのアーキテクチャの全体像をここに再掲しておきます。

スケーラブルかつ障害に対しても堅牢で、そしてリアルタイムでちゃんと情報を提供できるという、当初の目的を満たす設計となりました。

では、これまでが技術的な話で、最後に、このMessage Event Tracking Systemをベースに現在開発中の新しい機能について、3つ紹介いたします。

まず1つ目はMessage Retargetingと呼ばれる機能です。「Retargeting」という用語はオンライン広告をやっている方だと印象があるかもしれませんが、どういったものかというと、過去に配信したあるメッセージについて、開封やクリックみたいなリアクションをとって興味を示してくれたユーザー一覧というユーザーセグメントを作成して、そのセグメントに対してターゲティングを行って新しくメッセージを配信することで、より価値のある情報を提供できる機能です。

これはLINE Ads Platformのターゲティング基盤ともなっている「LINE DMP」と呼ばれるコンポーネントと連携して開発を行っています。

LINE DMPにこのMessage Event Tracking Systemによって計測したイベントを提供して、LINE DMPがそのイベントが設定された特定の条件に合致するかを判定して、DMPの持っているユーザーセグメントStorageへ蓄積していきます。そして、新規にLINE公式アカウントの管理画面からメッセージを配信するときに、LINE公式アカウントからLINE DMPに「ユーザーセグメントをくれ」というリクエストを送って、ターゲティングを実現します。

そして、2つ目の機能がLINE Ads Platform自体との連携です。これはLINEの提供する広告配信プラットフォームです。

Smart Channelと呼ばれるトーク一覧画面の上部にバナー形式で表示される広告を見かけることがあるかもしれませんが、そちらなどがLINE Ads Platformで配信しているものです。

どのように連携するかというと、あるLINE公式アカウントから過去に配信したメッセージについて興味を示してくれたユーザーのセグメントを作成するんですが、LINE公式アカウントのメッセージのターゲティングを行うだけじゃなくて、LINE Ads Platformの広告のターゲティングとしてもこれを使うのがこの機能です。

そしてもう1つ、これもLINE Ads Platformと関わりがあるのですが、Look Alikeと呼ばれる機能です。Look Alikeというのは広告の界隈ではオーディエンス拡張と呼ばれることもある機能です。

これは、あるユーザーセグメントがあったとき、そのユーザーセグメントに含まれるユーザーに類似した特徴を持つユーザーというセグメントを新たに作成する機能です。LINE Ads Platformでは2017年からすでに提供をしています。

このとき、その素となるセグメントのことをソースセグメントと呼びます。LINE公式アカウントから過去に配信したメッセージについて興味を示してくれたユーザーを、今度はソースセグメントとしてそのユーザーに類似したLINEユーザーという拡張を行ってLINE Ads Platformから広告を配信することで、「潜在的に興味を持ってくれる可能性があるが、まだ友だちになっていない」みたいなLINEユーザーに対してリーチすることが可能となります。

では、ここで総括させてください。これまで見てきたように、いくつかこのMessage Event Tracking Systemの開発では課題がありましたが、これらを乗り越えて、私たちはスケーラブルで堅牢なMessage Event Tracking Systemを作り上げました。さらに、このシステムをベースとしてさまざまな機能が現在開発中となっています。

ご清聴ありがとうございました。発表は以上です。

(会場拍手)