CLOSE

LINE Messaging Platform におけるHBaseとKafkaのデータパイプラインと活用例(全2記事)

2021.11.18

Brand Topics

PR

LINEはHBaseをどのように活用しているのか LINE Messaging Platformでの取り組み

提供:LINE株式会社

2021年11月10日と11日の2日間、LINE株式会社が主催するエンジニア向け技術カンファレンス「LINE DEVELOPER DAY 2021」がオンラインで開催されました。そこで吉田真也氏が、HBaseとKafkaのデータパイプラインの活用例について紹介しました。まずはHBaseのアーキテクチャについて。

LINE Messaging Platformのサーバーサイドの技術スタック

吉田真也氏:「LINE Messaging PlatformにおけるHBaseとKafkaのデータパイプラインとその活用例」について、吉田が発表いたします。

こちらが本日の発表内容です。はじめに「HBaseとLINE Messaging Platformにおける活用」について紹介します。そしてそのあと、本日の本題である「HBaseとKafkaのデータパイプラインとその活用例」について紹介します。

それでは、HBaseとLINE Messaging Platformにおける活用についてお話しします。はじめに、私は2018年にLINEに新卒で入社しまして、LINE Messaging PlatformのHBase Unitというチームに所属しています。

こちらの図が、LINE Messaging Platformのサーバーサイドの技術スタックになります。LINEのサーバーサイドは、複数のサーバーサイドアプリケーションで構成されており、それぞれをJavaやSpringを用いて開発しています。

各サーバーサイドアプリケーションは、データストアとして、「Redis」や「HBase」「Kafka」にアクセスします。我々HBase Unitは、LINE Messaging Platformのサーバーサイドで用いられている、HBaseクラスターの構築や運用を行っています。

また、HBaseに対してパッチの開発をしたり、アップストリームにパッチを貢献したりしています。さらに、HBaseクラスターの運用だけではなく、サーバーサイドアプリケーション内におけるHBaseへのデータアクセスロジックや、ストレージ実装などに関しても開発しています。

HBaseクラスターは、「Hadoop」や「ZooKeeper」に依存しているのですが、そうしたHBaseクラスターのためのHadoopクラスターやZooKeeperクラスターの構築や運用も行っています。モニタリングには、「Prometheus」「Grafana」「Elasticsearch」「Kibana」といったようなものを使用しており、プロビジョニングでは「Ansible」を活用しています。

HBaseクラスターにどのようなデータが保存されているか

我々のHBaseクラスターに、どのようなデータが保存されているかを簡単に紹介したいと思います。ここには、LINE Messaging Platformのユーザー情報や、ユーザーのデバイス情報、設定情報などが保存されています。また、ユーザー間の友だち関係や、ユーザー間のチャットやグループのメタ情報、そしてユーザーが送受信するメッセージも保存されています。

さらに、ユーザーがメッセージを送信したり受信したりすると、SEND_MESSAGEやRECEIVED_MESSAGEと呼ばれるイベント、我々はこれをトークオペレーションというふうに呼んでいますが、こうしたイベントがユーザごとに発行され、HBaseクラスターに保存されています。

HBaseのアーキテクチャ

ここで簡単に、HBaseのアーキテクチャについて説明します。HBaseは、先ほど申し上げたとおり、HadoopやZooKeeperに依存しています。HBaseクラスターは、奇数台のController nodeと数十や数百台といったような規模のWorker nodeで構成されます。このスライドのZKQuorumは、ZooKeeperのデーモンで、我々はController nodeでこれを動かします。

NameNode、JournalNode、DataNodeは、それぞれHadoopファイルシステムのデーモンで、NameNodeとJournalNodeをController nodeで動かし、DataNodeをWorker nodeで動かしています。Hadoopファイルシステムでは、ファイルは複数のブロックに分割され、DataNode上に保存されます。各ブロックは64メガバイトや、128メガバイトといったような単位です。

我々は、Replication Factorに3を使用しており、つまり同じブロックが3つのDataNodeに保存されています。NameNodeは、ファイルがどのブロックによって構成されていて、各ブロックがどのDataNodeに保存されているかを管理します。

HMasterとRegionServerは、HBaseのデーモンで、HMasterをController nodeで動かし、RegionServerをWorker nodeで動かしています。Regionというのは、HBaseのテーブルを水平に分割した単位で、HMasterは、各Regionを各RegionServerに割り当てます。

Clientは、アクセスしたいデータに対応したRegionが割り当てられているRegionServerにリクエストを送信します。各RegionServerは、必要に応じてDataNode上のブロックにアクセスし、Clientからのリクエストに応答いたします。

ここで、1台のWorker nodeに障害が発生してしまったとします。その場合でも、3つのレプリカのおかげで、引き続きHadoopファイルシステム上の各ブロックにはアクセスできます。そのため、障害が発生したノード上で動いていたRegionServerに割り当てられていたRegionを、ほかのRegionServerに割り当てたのち、引き続きClientからのリクエストに応答できます。

HBaseの書き込みのフロー

ここで、HBaseの書き込みのフローについて簡単に説明いたします。Clientは、書き込みしたいデータに対応したRegionが割り当てられているRegionServerにリクエストを送信いたします。

リクエストを受け取ったRegionServerは、Regionごとに存在するメモリ上のデータストア、memstoreを更新いたします。そして、HDFS上のWALファイル、Write Ahead Logファイルに書き込みを追記します。

このWALファイルへの書き込みに成功した段階で、データの永続化に成功したものとみなし、Clientに応答します。WALファイルへの書き込みに失敗した場合は、memstoreをロールバックします。

memstoreが一定のサイズに達するか、一定の時間が経過するごとに、このmemstoreの内容をHDFSにHFileと呼ばれるファイルフォーマットでフラッシュします。

ここで、このRegionServer Aに障害が発生した場合を考えます。その場合、HMasterは、このRegionServer Aに割り当てられていたRegion 1を別のRegionServer、ここではRegionServer Bに再び割り当てます。

先ほど言ったとおり、HDFS上のWALファイルやHFileには、引き続きアクセスが可能ですが、メモリ上のデータストアであるmemstoreの内容が失われてしまいます。そこで、RegionServer Bは、RegionServer AのWALファイルを読み込み、Region 1のmemstoreの状態を、障害が発生する直前まで復旧します。WALファイルは、このようなデータの信頼性のために用いられています。

HBaseには、クラスター間のレプリケーションの機能が標準で備わっています。ここでは、Sourceクラスターから、Destinationクラスターへのレプリケーションをセットアップしたものとし、このレプリケーションについて説明します。

Sourceクラスターでレプリケーションをセットアップした場合、各RegionServerでは、Replication Sourceと呼ばれるスレッドが起動します。このReplication Sourceは、HDFS上のWALファイルを読み込み、WALの各エントリーをReplication Endpointに渡します。

Replication Endpointは、プラガブルなかたちで提供されており、標準のReplication Endpointは、指定されたDestinationクラスターの各RegionServerにWALのエントリーを送信します。WALのエントリーを受け取ったDestinationクラスターのRegionServerは、WALのエントリーをリプレイすることによって、レプリケーションを完了させます。

DestinationクラスターのRegionServerへのリクエスト送信が失敗する場合があります。その場合、Replication Endpointが用いるHBaseクライアントのリトライ処理に加え、Replication Sourceでもリトライ処理が実装されており、Replication Endpointの処理が成功するまで、該当のWALのエントリーをReplication Endpointへと渡し続けます。

Replication Sourceは、最後にReplication Endpointの処理が成功したWALのエントリーのロケーションをReplication offsetとして、ZooKeeper上に保存します。

ここで、このRegionServer Aに障害が発生した場合を考えます。その場合、ほかのRegionServer、ここではRegionServer Bが、ZooKeeper上に保存されているAのReplication offsetを受け取り、そこからレプリケーションの処理を再開することによって、すべてのデータを漏れなくレプリケーションできるような仕組みになっています。

レプリケーションのセットアップは、HBaseの管理ツールである「HBase Shell」などを用いてセットアップでき、HBase Shellを用いる場合、add_peerコマンドによってセットアップが行えます。

ここでは、レプリケーションのIDである1と、Destinationとなるクラスターを指定しています。我々はこのレプリケーションを、Tokyo region内のプロダクションクラスターからBackupクラスターへのレプリケーション、そしてTokyo regionからOsaka regionに存在するディザスタ・リカバリ用のクラスターへのレプリケーションに使用しています。

先ほどReplication Endpointがプラガブルであるという話をしましたが、これについてもう少し詳しく説明します。ここでは、すべてのWALのエントリーをログに出力する「ReplicationEndpoint」を考えてみます。

プラガブルなReplicationEndpointを独自に定義するには、ReplicationEndpointインターフェイスを継承するクラスを定義します。そしてreplicateメソッドをオーバーライドし、この場合ではすべてのWALのエントリーをログに出力しています。replicateメソッドの戻り値は、レプリケーションの処理に成功した場合、trueを返します。

そしてこのReplicationEndpointを用いるためには、このReplicationEndpointのクラスがRegionServerなどのクラスパス上に存在する状況で、HBase Shellのadd_peerコマンドでこのようにクラス名を指定することで、ReplicationEndpointの処理を独自に定義したものに切り替えられます。

HBaseとKafkaのパイプライン

ここからは、本日の本題であります、HBaseとKafkaのパイプラインについて説明します。我々は、このパイプラインを2017年に構築いたしました。ここでは、2017年当時の状況と、このパイプラインが構築された背景から説明します。

2017年当時、我々はプロダクションのクラスターに、HBase 0.90.6というバージョンを使用していました。そして統計解析のために、statsクラスターと呼ばれるHBaseクラスターにレプリケーションをセットアップしていました。

このstatsクラスターは別のチームによって管理されており、このクラスターのHBaseのバージョンは0.94でした。我々が用いる0.90.6というバージョンは2012年にリリースされたバージョンで、2017年当時すでにコミュニティからのサポートが得られない、非常に古いバージョンとなっていました。

そこで我々は、2017年当時は比較的新しいバージョンだった1.2.5というバージョンへのマイグレーションを進行していました。マイグレーションは、1.2.5で構成される新しいクラスターを作り、アプリケーションサーバーから古いクラスターと新しいクラスター両方に書き込みます。そして、古いクラスターから新しいクラスターにデータをコピーすることによって、マイグレーションを進行しました。

ここで、古いクラスターへの書き込みを止めるためには、引き続き統計解析が行えるよう、新しいクラスターからstatsクラスターへレプリケーションをセットアップする必要がありました。ですが残念ながら、非互換性の問題により、1.2.5のオフィシャルのレプリケーションが0.94へのレプリケーションをサポートしていませんでした。

これはなぜかと言いますと、こちらがHBaseのバージョンとそのリリース年表です。2013年にリリースされた0.96というバージョンは、コミュニティでは「Singularity」と呼ばれており、通信のためのプロトコルが大幅に変更されました。

また、2015年にリリースされた1.0では、APIのクリーンアップが行われており、これらのバージョンをまたがった通信が行えませんでした。そのため、我々が用いる1.2.5から0.94へのレプリケーションがサポートされていませんでした。

statsクラスターへのレプリケーションが行えない場合、マイグレーションを完了させられません。このレプリケーションの問題を、我々はHBaseとKafkaのデータパイプラインを構築することによって解決しました。

先ほど紹介した「Pluggable replication endpoint」の機能を用い、独自に定義したReplication EndpointがWALのエントリーを独自のプロトコルでKafkaに送信します。

KafkaからWALのエントリーのデータをコンシュームしたReplayerは、0.94のHBaseクライアントとプロトコルを用い、statsクラスターへと書き込みます。

ここで簡単に、Kafkaについて説明します。Kafkaは、Pub/Subモデルのイベントストリーミングミドルウェアです。Kafkaにデータを送信するProducerは、Topicを指定してキー・バリューの形式でデータを送信いたします。

TopicはPartitionと呼ばれる複数の単位に分かれており、keyからPartitionが決定されます。Kafkaからデータを受け取るConsumerは、各Partitionをサブスクライブし、データに対して処理します。

後半へつづく

続きを読むには会員登録
(無料)が必要です。

会員登録していただくと、すべての記事が制限なく閲覧でき、
著者フォローや記事の保存機能など、便利な機能がご利用いただけます。

無料会員登録

会員の方はこちら

LINE株式会社

関連タグ:

この記事のスピーカー

同じログの記事

コミュニティ情報

Brand Topics

Brand Topics

  • ランサムウェア攻撃後、わずか2日半でシステム復旧 名古屋港コンテナターミナルが早期復旧できた理由 

人気の記事

人気の記事

新着イベント

ログミーBusinessに
記事掲載しませんか?

イベント・インタビュー・対談 etc.

“編集しない編集”で、
スピーカーの「意図をそのまま」お届け!