データパイプラインの各構成要素

吉田真也氏:ここからは我々が構築したデータパイプラインの各構成要素についてご説明いたします。はじめにKafkaに送信するデータのプロトコルですが、HBase 0.94のクラスターへのReplayerが1.2.5のClientに依存しないように、Protocol Buffersを用いて独自に定義します。

定義したデータ構造としては、WALのメタ情報、ここではWALのエントリーやHLogKeyといったようなものです。それに加えて、HBaseの変更をあらわすCellというデータ構造を定義します。これらのデータ構造は1.2.5で用いられているプロトコルと同様です。

KafkaにWALのエントリーを送信するReplication Endpointは、独自に定義し、Kafkaの送信先であるTopicは、テーブル名とprefix、suffixを用いて決定します。

Kafkaに送信するkeyには、Regionの識別子であるEncoded region nameや、各CellごとのRowkeyなどを指定します。valueには、WALのエントリを先ほど定義したプロトコルへと変換したものを用い、Kafkaに送信します。

このKafkaReplicationEndpointのセットアップは、HBase Shellのadd_peerコマンドを用い、このように行います。先ほど定義したKafkaReplicationEndpointのクラス名を指定し、Kafkaの接続情報や、Clientの情報、そしてtopic名のprefixやsuffixを指定します。

HBase 0.94のクラスターにデータを送信するReplayerは、KafkaからWALのデータをコンシュームしてき、そのWALのエントリーを0.94のmutationへと変換し、0.94のライブラリを用い、指定されたクラスターに対して書き込みます。

我々のHBaseとKafkaのデータパイプラインは、一般には「変更データキャプチャー」といった名前で知られており、我々のパイプラインの強みとしては、データベースの変更に基づく処理を簡単に実現できたり、HBaseのレプリケーションの実装や、Kafkaのおかげで非常に高い信頼性を実現できています。

一方で留意しなければならない点としては、非同期で処理が行われるため、必然的に遅延が生じる可能性があるほか、その変更が発生したタイミングでのほかのデータに対してアクセスするためには、コンシューマーサイドでアグリゲーションが必要であったり、必要に応じてデータベースにアクセスする必要があります。

パイプラインの仕組みを用いなかった場合

我々のパイプラインの強みについて理解してもらうために、我々のパイプラインの仕組みを用いずに、同様の処理をアプリケーションサーバー内で実装する可能性を考えてみます。

その場合、HBaseへの書き込みに成功したのちに、その情報をKafkaにサーバーサイドアプリケーション内で送信するといったような実装が考えられます。この場合、いくつかの懸念点や問題点が発生します。

1つ目は、HBaseのすべての書き込みパスに対して、Kafkaへの送信のパスが追加できているかどうかを注意深く確認しなければなりません。また、Kafkaに送信が失敗した場合、その際のリトライ処理がどのようになっているか。さらにそのリトライ処理によるアプリケーションへの影響も懸念する必要があります。

また、最も解決が難しい問題点として、Kafkaにデータの送信中やリトライ処理中にアプリケーションサーバーに障害が発生した場合、すべてのHBaseの書き込みを送信できない可能性が存在します。

一方で、我々のパイプラインの仕組みを用いることで、これらの懸念点や問題点がすべて簡単に解決できています。まず、すべてのHBaseへの書き込みパスに対応するかという点ですが、これはReplication Sourceが指定されたテーブルのすべての書き込みを処理の対象とするため、単にReplicationをセットアップするだけで実現できます。

また、Kafkaに対するリトライ処理も、Kafka Clientのリトライ処理に加え、Replication Sourceに実装されているリトライ処理のおかげで、非常に信頼性が高いです。さらに短期間のKafkaの障害であれば、サービスに対してパフォーマンスの影響を及ぼさないということが確認できています。

また最後に、信頼性の問題については、冒頭で紹介したZooKeeper上にReplication offsetを保存し、RegionServerに障害が発生したとしても、ほかのRegionServerがそこからレプリケーションを再開するといったような、レプリケーションのフェイルオーバーのおかげで、RegionServerに障害が発生したとしてもすべてのHBaseの書き込みを漏れなくKafkaに送信できます。

活用例1:レプリケーションやデータのマイグレーションのための活用

ここからは、HBaseとKafkaのパイプラインの活用例について説明をいたします。我々は2017年にこのパイプラインを構築して以来、数年間にわたっていくつかのアプリケーションを開発してきました。現在では20を超えるテーブルに対してこのパイプラインが適用されており、デイリーのピークで秒間120万件のWALのエントリーをKafkaに送信しています。

ここでは、すでに開発が完了した4つの活用例をご紹介いたします。1つ目が、先ほども紹介したようなレプリケーションや、データのマイグレーションのための活用です。

先ほども紹介したように、1.2.5というバージョンから0.94のクラスターへのレプリケーションは、標準ではサポートされていませんし、また、認証が備わっていないHBaseクラスターから「Kerberos」による認証が備わったセキュアなHBaseクラスターへのレプリケーションも、標準ではサポートされていません。こうした標準ではサポートされていないレプリケーションのために、Replayerの実装を適切に行うことによって、こうしたレプリケーションも行えます。

また、HBase以外のほかのミドルウェア、データベースに対してもデータのレプリケーションやマイグレーションのためにデータを送信できます。具体的な例としては、「TiDB」という比較的新しいデータベースへの評価のために、こうしたパイプラインを用い、実データで評価するような活用もできます。

活用例2:「UserSettings」をより使いやすいように

2つ目の活用例は、もう少しサービスやアプリケーションに近い活用例です。「UserSettings」というマイクロサービスを我々は開発しています。このUserSettingsサービスは、ユーザーごとの設定項目をキー・バリューのかたちで管理できるマイクロサービスです。我々はこのサービスをメッセージングプラットフォームだけではなく、社内のほかのサービスにも開放しています。

ほかのサービスは必要に応じ、UserSettingsサービスにリクエストを送信することによって、そのユーザーの最新の設定値を取得できます。

一方でサービスによっては、ユーザーの設定が変更されたタイミングで、追加の処理をしたいといったような要望があります。こうした要望に応えるために、我々はこのパイプラインを活用しています。

HBaseのuser-settingsテーブルのWALをKafkaに送信し、そのWALをコンシュームするConsumerは、よりサービスやアプリケーションが扱いやすいようなデータフォーマットであるsettings eventに変換し、Kafkaに送信します。このsettings eventをコンシュームすることによって、ユーザーの設定の変更に応じた処理をほかのサービスでも実装できます。

活用例3:Near-realtimeでの統計解析

3つ目の活用例は、Near-realtimeでの統計解析です。LINEにおいて、1年のうちでデイリーのピークの3倍から4倍のリクエストがバーストするタイミングが存在します。それは、新年の00時00分のタイミングです。日本では「あめおめLINE」と呼ばれるような文化で知られており、この新年のタイミングで、みんながLINEで新年の挨拶をするために、こうしたリクエストのバーストが発生します。

我々は、毎年このタイミングで厳戒態勢を敷いており、さまざまなメトリクスのモニタリングをしています。その中でも重要なメトリクスの1つに、メッセージの件数があります。このメッセージの件数は非常に重要なメトリクスで、さまざまなコンポーネントの負荷がメッセージの件数に比例して上昇するためです。

そのため、このメッセージの件数を非常に高い解像度で、そしてリアルタイムに近いかたちでモニタリングしたいという要求があります。非常に高い解像度というのは、秒間や100ミリ秒ごとにといったタイムレンジです。

また、リアルタイムに近いというのは、数秒のディレイのうちにこうした正確なメトリクスが観測できるということを意味します。このメッセージの件数のメトリクスのために、我々はパイプラインを活用しています。アプリケーションサーバーは、ユーザーがメッセージを送信するとSEND_MESSAGEというイベントをHBaseのoperationテーブルに保存します。

HBaseのoperationテーブルのWALをKafkaに送信し、このSEND_MESSAGEの件数をカウントするConsumerを動かします。このConsumerは、SEND_MESSAGEのタイムスタンプをもとに、100ミリ秒といったようなバケットで件数をカウントします。そしてその件数を「Elasticsearch」へと送信し、それを我々は「Grafana」上で可視化しています。

非常に興味深いデータですので、実際のメトリクスで見てみましょう。こちらは2021年の新年のタイミングでのSEND_MESSAGEの件数です。日本や台湾、タイといったような各国が新年を迎えるタイミングで、リクエストがバーストしていることがわかります。特に日本においては、00時00分のタイミングで、およそ4倍にリクエストがバーストしているということがわかります。

日本のもう少し細かいデータを見てみると、これは11時59分50秒から00時01分までのメトリクスですが、00時00分03秒のタイミングで、およそ40万件のメッセージが送信されているといったようなことがわかります。

また、100ミリ秒といったようなタイムレンジで見ると、このようなかたちになっており、こうした正確なメトリクスが数秒といったようなディレイで、すべての開発者に共有がなされています。

活用例4:不正なユーザーの検出

4つ目の活用例に不正なユーザーの検出があります。LINE Messaging Platform上では、さまざまな不正なユーザーが存在しており、我々はさまざまな観点でこうした不正なユーザーを検出しています。

特に永続化ストレージであるHBaseという観点においては、長期間にわたって大量のデータを生成するような不正なユーザーのパターンがクリティカルです。これは、単にdisk usageの問題だけではなく、HBaseのパフォーマンスに対して、非常に悪い影響を与える可能性があるためです。HBaseのパフォーマンスが悪化すると、非常に多くのユーザーに影響が出てしまいます。

こうした長期間にわたる、大量のデータを生成する不正なユーザーの検出のために、我々はパイプラインを活用しています。不正なユーザーを検出したいテーブルのWALをKafkaに送信し、そのWALをコンシュームするConsumerは、1分や1日、2週間といったようなタイムレンジで、ユーザーごとに書き込みをカウントします。

そして、「Central Dogma」という、弊社がOSSで開発している設定リポジトリ上に保存されたペナルティの条件に従って、不正なユーザーを検出しています。

検出された不正なユーザーは、社内のPenaltyGatewayというコンポーネントへと通報され、我々のHBaseクラスター上にあるユーザーのペナルティなどのデータが保存されているテーブルに、ユーザーのペナルティが保存され、LINEのアプリケーションサーバーは、このペナルティの情報をもとに、不正なユーザーからのリクエストをブロックします。

今後検討されている活用例1:Secondary index

我々は今後も、このHBaseとKafkaのデータパイプラインの活用を広げていくというような考えがあります。ここではSecondary indexとIncremental backupという、今後検討されている活用例について紹介いたします。Secondary indexはMaterialized viewという名前でも知られています。

まず、Secondary indexについてですが、HBaseは普通のインデックス、つまり、rowやcolumnといったようなKeyからValueをルックアップするインデックスのみをサポートしています。

例えば、アリスがボブと友達になった時、アリスをKeyとし、ボブをValueとするレコードをHBase上に保存するという実装を考えます。この場合、アリスをKeyとしてボブをルックアップできますが、ボブからアリスをルックアップするということは行えません。

一方でValueからKeyをルックアップするためのSecondary indexという機能が必要になる場合があります。「Apache Phoenix」というのは、HBase上でSQLを実装するミドルウェアなのですが、このApache Phoenixは、このSecondary indexを機能の一部として提供しています。

ですがApache Phoenixは、SQLを実現するためのものなので、オーバーヘッドが存在していたり、単にSecondary indexのためだけに用いるとオーバーキルになる可能性があります。そこで我々は、こうしたSecondary indexのために、「Redis」や「Cassandra」といったようなミドルウェアを活用しています。

一方で、いくつかの理由により、HBase上でSecondary indexを実現したい場合があります。こうしたSecondary indexを実現するための要素技術として、パイプラインの活用を考えています。キー・バリュー形式のデータをもとにValueからKeyへ変換し、HBase上に書き戻します。このようにすることで、アプリケーションサーバーはValueからKeyをルックアップできます。

実際にこうした機能を実現しようと思うともっと複雑になるのですが、こうした機能の実現の要素技術のためにパイプラインを活用することを検討しています。

今後検討されている活用例2:Incremental backup

続いて、Incremental backupです。まずHBaseが提供する標準のIncremental backupについて説明します。HBaseのIncremental backupは、はじめにフルバックアップ、完全なバックアップを取得し、HDFSやAmazon S3といったようなストレージにアップロードします。

そしてCron jobなどを用い、定期的にIncremental backupを取得します。Incremental backupの取得はMapReduce jobによって行われ、WALファイルを読み込み、これをHFileへと変換します。そしてこのHFileをIncremental backupとしてストレージ上にアップロードします。

このHBaseの標準のIncremental backupには、いくつかのペインポイントが存在します。まずMapReduce jobを動かす必要があるため、クラスターに対して余分な負荷がかかる可能性があります。また、Incremental backupを取得する間、WALファイルの削除が行えず、Cron jobが起動する間隔において、すべてのWALファイルがクラスター上に残ってしまいます。そのため、Disk Usageの問題が生じる可能性があります。

また、Incremental backupからの復元においてもペインポイントが存在し、例えばこの場合では、t2のあとにバグがリリースされ、このバグがリリースされたことによって、データに損害が生じたとします。そのため、このバグがリリースされるタイミングまで、データを巻き戻したいというシナリオを考えます。

その場合、フルバックアップとt1、t2のバックアップからテーブルを復元できますが、t2からバグがリリースされるまでの正常なデータが失われてしまいます。

こうしたペインポイントを解決するために、我々はパイプラインを使用することを検討しています。どのようにIncremental backupを取得するかを説明します。

まず完全なバックアップとなるスナップショットを取得し、ストレージ上にアップロードします。そしてWALをコンシュームするConsumerが適宜ストレージ上にWALをアップロードします。より高速なデータの復元のために、一定間隔ごとにWALをHFileの形式へと変換します。このようにすることで、Incremental backupを取得でき、先ほどのペインポイントを解消できます。

これらのIncremental backupからリストアする場合を考えます。このバグがリリースされたタイミングの直前までリストアすることを検討してみましょう。その場合、フルバックアップのスナップショットとt1とt2のIncremental backupからテーブルを復元し、t2からバグがリリースされるまでのWALをリプレイすることによって、データのロスを最小限に減らし、テーブルをリストアできます。

ぜひ変更データキャプチャーを

ここまでが、我々が開発をしたHBaseとKafkaのデータパイプラインと、その活用例についてでした。我々のパイプラインは、HBaseのWALや、レプリケーションといったHBaseの仕組みを用いて構築しており、LINE Messaging Platform上でこうしたパイプラインを活用しております。

このパイプラインは、データベースの変更に基づいて処理するための、非常に信頼性が高くパワフルな手法です。4つの実際の活用例と、今後検討している2つのユースケースをもとに、この変更データキャプチャーがどのような活用が行えるかということを見てきました。

ぜひみなさんのサービスでも、変更データキャプチャーを実際に実践してみてはいかがでしょうか。以上で、本日の発表を終わります。