LINEのビッグデータプラットフォーム

杜佐民(Neil Tu)氏:ご来場のみなさま、こんにちは。私はNeilと申します。

昨今、ビッグデータは我々の生活にとって重要なものとなっています。まるで新時代のエネルギー源のように、新しい価値ある情報を発掘することで、そして新しいサービスや商品を作ることで、我々の生活はより便利になります。

現在、ビッグデータにはさまざまなソリューションがあります。例えば、AmazonのAWSやGoogleのGCPなど、さまざまなクラウド環境を利用することができます。あるいは、LINEのようにオンプレミス環境を使うことができます。

このようなデータ、あるいは新しい情報の抽出には、プラットフォームが必要です。本日は、LINEがいかにして高効率なデータプラットフォームを構築しているかをご説明いたします。

まず、私の自己紹介をさせてください。

私は、データアーキテクトおよびデータエンジニアで、Hadoop分散システムと環境のエキスパートです。私は5年以上、イメージプロセシング・コンピュータービジョン・パターン認識などに携わってきました。

では、さっそく本日のテーマのお話を始めたいと思います。こちらが今日のアジェンダです。

まずはLINEのデータプラットフォームのご紹介と役割についてお話しいたします。LINEには複数のプラットフォームがありますが、本日は時間に限りがありますので、Pipeline PlatformとAnalysis Platform、つまり分析プラットフォームに話を絞りたいと思います。

最後に、エコシステムのご紹介をいたします。我々の経験をお話しすることで、みなさまのプラットフォームの改善にもつながることを願います。

まず、LINEのData Platformの話から始めます。ご存知かもしれませんが、LINEは日本・韓国・台湾・タイ・インドネシアなど、さまざまな国でたくさんのサービスをローンチしてきました。

したがって、イベントログ・取引データなど、非常に大きなデータが毎秒実行されて、作成されています。我々は、こうしたさまざまなサービスからデータを収集しています。例えば、LINEのタイムライン・LINE NEWS・LINE Pay・広告データなどです。このようなデータを集めたあと、LINEのエンジニア、あるいはデータサイエンティストが、データを活用して新しい商品を作ったり、現行のサービスの機能性を向上させたりすることができます。

いまは、システムインテグレーション・データトラッキング・データランディングを行い、データの正常化を行っております。あるいは数式を使って機械学習・ディープラーニングを利用することで、結果の分析やレコメンデーションが可能になっています。

これらが、我々がすでに利用している一般的な技術です。

ビッグデータを扱うための3つのプラットフォーム

これほどの大きなデータを扱うために、弊社には3つの主要なプラットフォームがあります。

1つ目がTracking Service Platformです。例えばみなさんがLINEアプリを利用する際、「ボタンを押した」「リンクをクリックした」などのログ、あるいはデータが「イベントログ」というかたちで収集されます。そして、このTracking Service Platformにリアルタイムで格納されます。

2つ目がPipeline Platformです。サービスエンジニア用のプラットフォームで、自分のサービスのデータ処理に使います。これは、サービスプロダクションレイヤーで使われます。

3つ目はAnalysis Platformです。名前のとおり、これはデータサイエンティストがデータを分析するために、あるいは機械学習をするために使います。

では、まずはPipeline Platformからお話しいたします。このプラットフォームは、LINEのサービスシステムにつながっています。したがって、このプラットフォームではサービスエンジニアリングのためのさまざま機能が提供されています。

こちらの3つの数字をご覧ください。

Pipeline Platformでは、現在600種類以上のデータタイプを処理しています。これらはすべてLINEサービスのものです。データの規模はおよそ30ペタバイト、最大スループットは毎秒650万メッセージ以上です。

さまざまなデータを扱っているため、データセットの管理が難しいときもあります。Pipeline Platformは、このような大規模なデータストリームのプロセシングを重視しています。

続いて、こちらをご覧ください。

開発者は、自分のデータモデルにフォーカスすることができます。新しい定義がGitHubにコミットされてマージされると、新しいモデルがそれぞれのコンポーネントにデリバリーされます。データプランナー・データサイエンティスト・エンジニアは、自動的に新しいデータモデルが利用可能となります。このプロトコルモデルには後方互換性があるため、過去のバージョンのデータモデルを使用したメッセージであっても最新のモデルを使用したコンポーネントから利用することができます。

また、LINEのオープンソースプロダクトでもあるCentral Dogmaを利用して、データフローの設定変更を動的に検知し、KafkaやFlinkのようなストリーミングコンポーネントへ新しい設定を自動的に反映させることもできます。

こちらがプロトコル化されたData Modelの一例です。

Pipeline PlatformではProtocol Bufferを使ってコンポーネント間に流れるすべてのデータモデルを表現しています。Protocol Bufferには拡張性・互換性があり、そして最も重要な使い勝手のよさが備わっています。

例としてこの8番をご覧ください。これはメッセージが実際に発生されたevent_timeです。カスタマイズされたオプションであるuse_as_timestampをオプションで指定することで、タイムスタンプのフィールドを定義することができます。これを使用することで、メッセージを処理したProcessing Timeとは別に実際にサービス側でイベントが発生した時間を記録することができ、ElasticsearchのindexやHive Partitionに利用可能です。

分析を行うためのAnalysis Platform

Analysis Platformの説明をさせていただきます。このプラットフォームは、データサイエンティストがハイレベルの分析を行うために使います。例えば、このようなデータフローで利用されます。サービスデータをコピーして、こちらのAnalysis Platformに保存します。

続いて、LINEのデータエンジニアがこのデータを処理します。例えば、データフィルタリング、クリーニング、あるいはウェアハウスへの移行などです。この処理が終わると、BIツールと接続してレポートを作成したり、ウェアハウスを提供して、データサイエンティストがデータを分析できるようにします。

最後に、同じ方法で結果をサービスサイドに戻すことができます。そして、サービスサイドはこのデータを使って商品の最適化ができます。

さらに3つの数字をご覧ください。

現在このプラットフォーム上には、Hiveテーブルが1,600以上あり、25ペタバイト以上のデータが格納されています。ユーザー数は500以上で、毎日これだけの人数に使われています。そして、この数字は毎日増え続けています。

もしかしたら「データサイエンティストは何人いるのか?」「なんでユーザーが500人以上いるのか?」と思われるかもしれません。

実はこのAnalysis Platformはオープン化されていて、LINE社員であれば誰でも利用できます。つまり、LINE社員は誰でもこのプラットフォームに個人のアカウントから接続することができます。現在、LINEのData Labsに所属しているデータサイエンティストはおよそ30人ですが、ユーザーはこのとおりです。

このプラットフォームに用いられるツールは、COGNOS・Tableau以外はすべてオープンソースです。

例えば、Hadoop HDFSやMySQLを使ってストレージをしたり、MapReduce・Sparkを使って演算を行ったり、Hive・Prestoを使うことも可能です。

また、Apache Ranger・LDAPはセキュリティ用です。そしてクラスタの管理。Airflow・Jenkinsを使ってスケジュールを管理したり。このようにさまざまなツールがありますが、streaming processing toolなどもこのようにあります。

Analysis Platformのアーキテクチャ

これがLINEのAnalysis Platformのデータ構造です。

データソースは3種類あります。

サービスのアプリの場合、ボタンのクリックあるいはページリードといったイベントログは、リアルタイムでログをとります。

このようなデータはKafkaに送られ、Kafkaを通ってElasticsearchなどに格納されます。このようにお話ししているいまも、ユーザーは、Elasticsearchにクエリを送って結果を取得したり、Kibanaと接続してレポートを作成することができます。

あるいは、Kafkaのデータを利用し、Hadoop HDFSに置くこともできます。これについては、次のスライドでお話しいたします。

データがリレーショナルデータベース……例えばMySQLやOracleのものであれば、Sqoopを使ってデータのコピーができます。あるいは別のHadoopクラスタ、HBase・MongoDB・Redisといったその他ストレージ、FTPのようなファイルシステムでは、処理にMapReduceを利用することができます。

データの準備が終わってHadoop HDFSに格納されたら、今度はData Hubに接続できます。 Data HubはREST APIでHDFSディレクトリにマッピングします。つまり、ユーザーはファイルをダウンロードしたりアップロードしたりするのに、このData Hubを使えます。非常に便利です。

あるいは、Hadoop-clientをインストールでき、Spark・MapReduce・Tezなどを使って仕事を減らすことができます。

最後はPrestoです。Prestoはクエリエンジンで、Facebookが開発したものです。これはHive メタストアを参照して、ツールを使ってこれに接続し、レポーティングができます。

データフローとFlink・NiFiを使う理由

では、Kafkaを用いたデータフローをもう少し詳しく見てみましょう。

前述のとおり、サービスデータベースからデータを取得しています。あるいはHadoopクラスタ、あるいはKafkaです。

リレーショナルデータベース、あるいはHadoopクラスタの場合、たくさんのETLバッチを使ってデータのマイグレーション、コピーを行います。バッチは、15分おきに、あるいは毎時、毎日、毎週、毎月実行できます。

Elasticsearch以外では、KafkaはFlinkとNiFiを使ってリアルタイムでデータを処理します。そこで、「なぜFlinkとNiFiの両方が必要なのか?」と疑問に思われるかもしれません。どちらもデータストリーミングをしますよね。「じゃあ、どちらかでいいんじゃないか」と思われるかもしれません。そこで、なぜLINEでは両方使っているのかをご説明いたします。

こちらにいくつかのユーザーシナリオを表示しました。

まず、Kafkaを通じてデータがElasticsearchに送られ、最後にKibanaを通ったとします。そうすると、このメソッドを使ってユーザーはリアルタイムでデータを取得できます。

しかし、前述のとおり複数のHiveテーブルがあるため、Hive JoinをいまのHiveテーブルで使うのが不便な場合があります。

続いてのシナリオです。Flinkはstreaming processing toolですが、Kafkaのメッセージをリアルタイムで利用、またはデータの処理を行います。

例えば、LINE LIVEのサービスの場合、何人がいま、この時間に動画を視聴しているかを知りたいかもしれません。そうすると、動画の下に数字が表示されていて、リアルタイムで、「+1、-2、+3、-4」と計算をしているかもしれません。このプロセスにはFlinkが非常に有効です。

そのKafkaのデータを使って、Hadoop HDFSにリアルタイムで置きたい場合、まずはソースコードを書く必要があります。そして、そのときにソースコードでファイルを開いて、ファイルにデータを書いて、ファイルを閉じて、最後にHadoop HDFSに置きます。

しかし、ユーザーによってはそこまで複雑なことをしたくないかもしれません。Hiveを通じて、データをなるべく早く読みたいだけかもしれませんし、このリアルタイムデータをほかのHiveテーブルと接続したいと思うかもしれません。ここでNiFiを使います。

ここでは、そこまでソースコードを書く必要がありません。NiFiは、リアルタイムでKafkaデータを使い、HDFS内に小さなファイルを作成します。

たくさんの小さなファイルをHadoopに書くため、この小さなファイルを15分おきにマージする必要があります。これは非常に便利で、ユーザーはHiveクエリを送るだけで、リアルタイムでデータを使えます。現在は、NiFiを使って1秒におよそ18万のデータを処理できます。

NiFiを使う利点

また、NiFiを利用する理由はほかにもあります。レイテンシが低く、スループットが高いこと。そしてSSL Encryptionに対応しているだけでなく、UIが非常に美しいのも1つの理由です。

このUIを使えば、ダイアログを書いて、接続して、基本のフローが生成されます。

2つ目の理由はセキュリティです。認証と承認機能に対応しているため、LDAPあるいはKerberosに接続できます。そして、ユーザーの権限を管理できます。例えば、ユーザーAはリードだけ、ユーザーBはさらに修正ができるといった権限を管理できます。

3つ目がローカルのバックアップです。こちらはとても重要です。NiFiはクラスタであるため、1つのノードが切れてしまった場合、ローカルバックアップを使ってプロセシングを完了させます。そして、ノードが復帰したら、また正常なプロセスに戻ります。

Hadoop上での処理とエコシステム

続いて、Hadoop上、Analysis Platformでどのようにデータを読み、処理しているのかをお話しいたします。Presto、Hadoop HDFS、Hadoop MapReduce、そしてSparkが演算エンジンとストレージです。次にご紹介するツールは、我々が作成したデータエコシステムです。

yanagishimaはPresto・Hive用のWeb UIです。

ユーザーはこれらのツールから簡単にクエリを実行することが可能です。これはLINEエンジニアによって開発されました。GitHubのURLがここにありますので、ぜひご覧ください。

yanagishimaを使い、データベース・テーブルを選択します。テーブル範囲を広げ、そしてクエリを実行するか、クエリをこのedit boxに追加します。もしくは変数をここに設定します。

また、この関数を保存したりクリップボードに入れることもできます。リフォーマットすることも可能です。UDFのレコメンデーションも提供されますので、レコメンデーションを使って、そのパラメータや変数を使い、クエリを実行することもできます。

また、結果をチャートに、もしくはヒストグラムなどに映すこともできます。そして、クエリのヒストリー・関数を同僚に送りたいという場合、ショートカットキーもありますし、このウィンドウで設定することもできます。

もう1つ、LINE Analyticsです。

これはレポーティングのツールです。このツールでは、現在のデータの状態を見ることができます。例えば、現時点でどのニュースがどれだけのページビューを得ているのか、そして、このステータスがどのように変化しているかを見ることができます。セキュリティの問題上、この機密情報・詳細情報に関しては隠しております。

そして、OASISはインタラクティブなData Analyticsツールです。

ここにソースコードを書くことができます。そして実行し、結果をチャートとして表示することも可能です。これはZeppelinというツールに似ています。

こちらに関しては、別のセッションでより詳細にお話ししますので、ぜひご参加ください。

最後がAquariumです。

こちらはデータカタログツールです。非常に多くのHiveテーブルがありますので、たまにデータがどこにあるのか、わかりづらくなってしまいます。また、どういったデータがプラットフォーム上にあるのかもわかりづらくなります。ですが、このツールがあれば、ユーザーがテーブルの状態を知ることができます。

例えば「Analysis」というテーブルがあります。そして「applog」というものもありますし、それぞれのapplogテーブルの説明があります。

さらに、定義も見ることができます。ユーザーは、こういったテーブルの構造を見ることができます。例えばlogというものがありますが、logフォーマットはJSONだということがわかります。

もう1つのよい機能としては、テーブルの系統を知ることができる点です。この例では、chat_historyはapplogテーブルに依存性を持っていることがわかります。

誰でも見られるからこそ求められるセキュリティ

続いて、セキュリティについてご紹介します。先ほども申し上げましたが、このプラットフォームはLINEの従業員全員が共有しています。よって、非常に高いレベルのセキュリティが求められます。

このプラットフォームにアクセスできたとしても、すべてのデータを読むことができるわけではありません。プラットフォーム側で各ユーザーを識別し、アクセス権をコントロールしています。

プラットフォームは、その固有のセキュリティシステムを持っており、LINEの社内のセキュリティシステムからは独立して、独自のLDAP、そしてKerberosがあります。みなさんがLINEの従業員だとしても、このセキュリティをパスできなければ、プラットフォームにアクセスすることはできません。

(このプラットフォームでは)リアルユーザー、ユーザーグループ、システムユーザー、システムグループという4つのIDが管理されています。

ユーザーフローはこのようになっています。

Gateway serverにラップトップからアクセスしたとします。そこから内部のKerberosのアクセスをパスし、クライアントサーバにアクセスします。

ここまでの認証は、Kerberosによって行っています。そのあと、クライアントサーバにアクセスすると、ここからのユーザー認証はプラットフォーム独自のLDAP、Kerberosによって管理されます。そして、アクセス許可はApache Rangerによって行われています。

すべてのIDがパスすると、Hadoopにアクセスできることになります。なぜこのような複雑なID管理が必要なのでしょうか? それは、このプラットフォームには非常に多くの機密情報があるからです。

例えば、LINE Payの情報や課金の情報です。よって、各ユーザーのアクセス権をこのように制限する必要があるのです。

プラットフォームには、いつでもアカウントの登録ができるようになっており、Web UIがありますので、ユーザーはこのUIに接続し、個人情報を見たりパスワードを見ることができます。

その確認が終わると、システムから社内のアクティブディレクトリに行き、そのユーザーの情報を取得します。例えば、社員ID、部署、Emailアドレス、名前などです。

その後、自動的に個人アカウントをLDAP上に作ります。そしてKerberos上にも作ります。そして、HDFSのユーザーホームディレクトリが同時に生成されます。

この作業が終わると、プラットフォームが利用できる状態になります。もちろん、この時点では公開されているパブリックデータ以外は読むことはできません。カントリーコードなどのパブリックデータだけです。

サービスデータを見たい場合には、ワークフローで承認を得なければいけません。プロジェクトマネージャーなどの承認を得たあと、こういったサービスデータを読むことができるようになります。

サーバとクラスタのモニタリング

セキュリティの次は、モニタリングについてお話しします。大きなクラスタ、もしくはプラットフォームを管理する上で、基本的なモニタリングが必要となります。現在、多くのツールを使用しています。

Prometheus、Ganglia、Grafanaや、ユーザーによってはSplunkを使うこともできます。これらを用いて、Java JVM、ネットワークトラフィック、そしてディスク使用率などをモニタリングしています。

本日は、別のモニタリング方法もお見せします。それは、クラスタのモニタリングです。先ほどのスライドにあったものはサーバのモニタリングですが、ここではクラスタのモニタリングについてお話しします。

ファイルのステータス、使用率、そして「小さなファイルはどこにあるのか?」「どのフィアルがEmpty filesなのか?」「誰が作成したファイルなのか?」「このファイルはいつ編集されたのか?」などといった情報をモニタリングする必要があります。

もし、大きなクラスタの管理をしたことがある方であれば、小さなファイルはクラスタにとって非常にやっかいです。なぜかというと、ネームノード(NN)の負荷が上がるからです。ネームノードがダウンすると、クラスタもダウンしてしまいます。

では、いかにしてこれらすべてのモニタリングを行っているかをお見せします。

2つのネームノードが高可用性のために存在し、そして3つのジャーナルノード(JN)があります。みなさん、こちらについてはご存じだと思います。

そして、3つ目のネームノードがあります。このネームノードはほかの2つと似ていて、メタデータについて、ジャーナルノードと同期を取っています。この3つ目のネームノードは常にStandbyです。これがアクティブになることは決してありません。

よって、この3つ目のネームノードのメタデータを使うことができます。どれだけ分析し、モニタリングしたとしても、この3つ目のネームノードが、ネームノード1、ネームノード2に影響を与えることはありません。このように、私たちはモニタリングを行っています。

ビッグデータプラットフォームを運用する上で大切なこと

また、多くの小さなファイルがHDFSクラスタにあるのは非常に危険です。なぜかというと、これは時限爆弾をクラスタ上に抱えているようなものだからです。しかし、心配しないでください。どのようにしてそれをなくすのか、そして、どのようにチューニングしているのかをお見せします。

通常、小さなファイルはネームノードの負荷を上げます。ネームノードは、すべてのファイルのロケーションを管理しなくてはいけないからです。小さなファイルが多すぎると、メタデータ、そしてfsimageのサイズが大きくなってしまいます。これは、ネームノードにとって負荷がかかりすぎます。よって、小さなファイルをできるだけ減らさなくはいけません。

5つのパラメータがあります。

これらは、spark.history、yarn.logに関連するものです。もしクラスタが大きく、非常に多くのジョブが日々実行されている場合、これらのパラメータを設定することをおすすめします。また、それほど多くのログは必要ないので、2日間分だけで十分だと思います。

もう1つがHiveの最適化です。

こちらのコンフィギュレーションによって、Hiveクエリを実行するときに小さなファイルをマージすることができます。平均ファイルサイズがこの閾値よりも小さい場合、マージの関数がトリガーされます。

また、ALTER TABLE、CONCATENATEのコマンドを使って、小さなファイルをマージすることもできます。しかし、このコマンドは現在、ORCファイル、そしてRCファイルのみで対応しています。

ここまで、私たちの経験についてお話をしました。大規模なビッグデータプラットフォームを管理している、運用しているときにはイライラすることもあるでしょう。しかし、心配しないでください。なにが必要かといいますと、忍耐力、結果をどうやって達成するかという試行錯誤、そして、諦めないことです。

私からは以上となります。ご清聴いただき、ありがとうございました。

(会場拍手)