2024.10.10
将来は卵1パックの価格が2倍に? 多くの日本人が知らない世界の新潮流、「動物福祉」とは
提供:LINE株式会社
リンクをコピー
記事をブックマーク
吉田真也氏:「LINE Messaging PlatformにおけるHBaseとKafkaのデータパイプラインとその活用例」について、吉田が発表いたします。
こちらが本日の発表内容です。はじめに「HBaseとLINE Messaging Platformにおける活用」について紹介します。そしてそのあと、本日の本題である「HBaseとKafkaのデータパイプラインとその活用例」について紹介します。
それでは、HBaseとLINE Messaging Platformにおける活用についてお話しします。はじめに、私は2018年にLINEに新卒で入社しまして、LINE Messaging PlatformのHBase Unitというチームに所属しています。
こちらの図が、LINE Messaging Platformのサーバーサイドの技術スタックになります。LINEのサーバーサイドは、複数のサーバーサイドアプリケーションで構成されており、それぞれをJavaやSpringを用いて開発しています。
各サーバーサイドアプリケーションは、データストアとして、「Redis」や「HBase」「Kafka」にアクセスします。我々HBase Unitは、LINE Messaging Platformのサーバーサイドで用いられている、HBaseクラスターの構築や運用を行っています。
また、HBaseに対してパッチの開発をしたり、アップストリームにパッチを貢献したりしています。さらに、HBaseクラスターの運用だけではなく、サーバーサイドアプリケーション内におけるHBaseへのデータアクセスロジックや、ストレージ実装などに関しても開発しています。
HBaseクラスターは、「Hadoop」や「ZooKeeper」に依存しているのですが、そうしたHBaseクラスターのためのHadoopクラスターやZooKeeperクラスターの構築や運用も行っています。モニタリングには、「Prometheus」「Grafana」「Elasticsearch」「Kibana」といったようなものを使用しており、プロビジョニングでは「Ansible」を活用しています。
我々のHBaseクラスターに、どのようなデータが保存されているかを簡単に紹介したいと思います。ここには、LINE Messaging Platformのユーザー情報や、ユーザーのデバイス情報、設定情報などが保存されています。また、ユーザー間の友だち関係や、ユーザー間のチャットやグループのメタ情報、そしてユーザーが送受信するメッセージも保存されています。
さらに、ユーザーがメッセージを送信したり受信したりすると、SEND_MESSAGEやRECEIVED_MESSAGEと呼ばれるイベント、我々はこれをトークオペレーションというふうに呼んでいますが、こうしたイベントがユーザごとに発行され、HBaseクラスターに保存されています。
ここで簡単に、HBaseのアーキテクチャについて説明します。HBaseは、先ほど申し上げたとおり、HadoopやZooKeeperに依存しています。HBaseクラスターは、奇数台のController nodeと数十や数百台といったような規模のWorker nodeで構成されます。このスライドのZKQuorumは、ZooKeeperのデーモンで、我々はController nodeでこれを動かします。
NameNode、JournalNode、DataNodeは、それぞれHadoopファイルシステムのデーモンで、NameNodeとJournalNodeをController nodeで動かし、DataNodeをWorker nodeで動かしています。Hadoopファイルシステムでは、ファイルは複数のブロックに分割され、DataNode上に保存されます。各ブロックは64メガバイトや、128メガバイトといったような単位です。
我々は、Replication Factorに3を使用しており、つまり同じブロックが3つのDataNodeに保存されています。NameNodeは、ファイルがどのブロックによって構成されていて、各ブロックがどのDataNodeに保存されているかを管理します。
HMasterとRegionServerは、HBaseのデーモンで、HMasterをController nodeで動かし、RegionServerをWorker nodeで動かしています。Regionというのは、HBaseのテーブルを水平に分割した単位で、HMasterは、各Regionを各RegionServerに割り当てます。
Clientは、アクセスしたいデータに対応したRegionが割り当てられているRegionServerにリクエストを送信します。各RegionServerは、必要に応じてDataNode上のブロックにアクセスし、Clientからのリクエストに応答いたします。
ここで、1台のWorker nodeに障害が発生してしまったとします。その場合でも、3つのレプリカのおかげで、引き続きHadoopファイルシステム上の各ブロックにはアクセスできます。そのため、障害が発生したノード上で動いていたRegionServerに割り当てられていたRegionを、ほかのRegionServerに割り当てたのち、引き続きClientからのリクエストに応答できます。
ここで、HBaseの書き込みのフローについて簡単に説明いたします。Clientは、書き込みしたいデータに対応したRegionが割り当てられているRegionServerにリクエストを送信いたします。
リクエストを受け取ったRegionServerは、Regionごとに存在するメモリ上のデータストア、memstoreを更新いたします。そして、HDFS上のWALファイル、Write Ahead Logファイルに書き込みを追記します。
このWALファイルへの書き込みに成功した段階で、データの永続化に成功したものとみなし、Clientに応答します。WALファイルへの書き込みに失敗した場合は、memstoreをロールバックします。
memstoreが一定のサイズに達するか、一定の時間が経過するごとに、このmemstoreの内容をHDFSにHFileと呼ばれるファイルフォーマットでフラッシュします。
ここで、このRegionServer Aに障害が発生した場合を考えます。その場合、HMasterは、このRegionServer Aに割り当てられていたRegion 1を別のRegionServer、ここではRegionServer Bに再び割り当てます。
先ほど言ったとおり、HDFS上のWALファイルやHFileには、引き続きアクセスが可能ですが、メモリ上のデータストアであるmemstoreの内容が失われてしまいます。そこで、RegionServer Bは、RegionServer AのWALファイルを読み込み、Region 1のmemstoreの状態を、障害が発生する直前まで復旧します。WALファイルは、このようなデータの信頼性のために用いられています。
HBaseには、クラスター間のレプリケーションの機能が標準で備わっています。ここでは、Sourceクラスターから、Destinationクラスターへのレプリケーションをセットアップしたものとし、このレプリケーションについて説明します。
Sourceクラスターでレプリケーションをセットアップした場合、各RegionServerでは、Replication Sourceと呼ばれるスレッドが起動します。このReplication Sourceは、HDFS上のWALファイルを読み込み、WALの各エントリーをReplication Endpointに渡します。
Replication Endpointは、プラガブルなかたちで提供されており、標準のReplication Endpointは、指定されたDestinationクラスターの各RegionServerにWALのエントリーを送信します。WALのエントリーを受け取ったDestinationクラスターのRegionServerは、WALのエントリーをリプレイすることによって、レプリケーションを完了させます。
DestinationクラスターのRegionServerへのリクエスト送信が失敗する場合があります。その場合、Replication Endpointが用いるHBaseクライアントのリトライ処理に加え、Replication Sourceでもリトライ処理が実装されており、Replication Endpointの処理が成功するまで、該当のWALのエントリーをReplication Endpointへと渡し続けます。
Replication Sourceは、最後にReplication Endpointの処理が成功したWALのエントリーのロケーションをReplication offsetとして、ZooKeeper上に保存します。
ここで、このRegionServer Aに障害が発生した場合を考えます。その場合、ほかのRegionServer、ここではRegionServer Bが、ZooKeeper上に保存されているAのReplication offsetを受け取り、そこからレプリケーションの処理を再開することによって、すべてのデータを漏れなくレプリケーションできるような仕組みになっています。
レプリケーションのセットアップは、HBaseの管理ツールである「HBase Shell」などを用いてセットアップでき、HBase Shellを用いる場合、add_peerコマンドによってセットアップが行えます。
ここでは、レプリケーションのIDである1と、Destinationとなるクラスターを指定しています。我々はこのレプリケーションを、Tokyo region内のプロダクションクラスターからBackupクラスターへのレプリケーション、そしてTokyo regionからOsaka regionに存在するディザスタ・リカバリ用のクラスターへのレプリケーションに使用しています。
先ほどReplication Endpointがプラガブルであるという話をしましたが、これについてもう少し詳しく説明します。ここでは、すべてのWALのエントリーをログに出力する「ReplicationEndpoint」を考えてみます。
プラガブルなReplicationEndpointを独自に定義するには、ReplicationEndpointインターフェイスを継承するクラスを定義します。そしてreplicateメソッドをオーバーライドし、この場合ではすべてのWALのエントリーをログに出力しています。replicateメソッドの戻り値は、レプリケーションの処理に成功した場合、trueを返します。
そしてこのReplicationEndpointを用いるためには、このReplicationEndpointのクラスがRegionServerなどのクラスパス上に存在する状況で、HBase Shellのadd_peerコマンドでこのようにクラス名を指定することで、ReplicationEndpointの処理を独自に定義したものに切り替えられます。
ここからは、本日の本題であります、HBaseとKafkaのパイプラインについて説明します。我々は、このパイプラインを2017年に構築いたしました。ここでは、2017年当時の状況と、このパイプラインが構築された背景から説明します。
2017年当時、我々はプロダクションのクラスターに、HBase 0.90.6というバージョンを使用していました。そして統計解析のために、statsクラスターと呼ばれるHBaseクラスターにレプリケーションをセットアップしていました。
このstatsクラスターは別のチームによって管理されており、このクラスターのHBaseのバージョンは0.94でした。我々が用いる0.90.6というバージョンは2012年にリリースされたバージョンで、2017年当時すでにコミュニティからのサポートが得られない、非常に古いバージョンとなっていました。
そこで我々は、2017年当時は比較的新しいバージョンだった1.2.5というバージョンへのマイグレーションを進行していました。マイグレーションは、1.2.5で構成される新しいクラスターを作り、アプリケーションサーバーから古いクラスターと新しいクラスター両方に書き込みます。そして、古いクラスターから新しいクラスターにデータをコピーすることによって、マイグレーションを進行しました。
ここで、古いクラスターへの書き込みを止めるためには、引き続き統計解析が行えるよう、新しいクラスターからstatsクラスターへレプリケーションをセットアップする必要がありました。ですが残念ながら、非互換性の問題により、1.2.5のオフィシャルのレプリケーションが0.94へのレプリケーションをサポートしていませんでした。
これはなぜかと言いますと、こちらがHBaseのバージョンとそのリリース年表です。2013年にリリースされた0.96というバージョンは、コミュニティでは「Singularity」と呼ばれており、通信のためのプロトコルが大幅に変更されました。
また、2015年にリリースされた1.0では、APIのクリーンアップが行われており、これらのバージョンをまたがった通信が行えませんでした。そのため、我々が用いる1.2.5から0.94へのレプリケーションがサポートされていませんでした。
statsクラスターへのレプリケーションが行えない場合、マイグレーションを完了させられません。このレプリケーションの問題を、我々はHBaseとKafkaのデータパイプラインを構築することによって解決しました。
先ほど紹介した「Pluggable replication endpoint」の機能を用い、独自に定義したReplication EndpointがWALのエントリーを独自のプロトコルでKafkaに送信します。
KafkaからWALのエントリーのデータをコンシュームしたReplayerは、0.94のHBaseクライアントとプロトコルを用い、statsクラスターへと書き込みます。
ここで簡単に、Kafkaについて説明します。Kafkaは、Pub/Subモデルのイベントストリーミングミドルウェアです。Kafkaにデータを送信するProducerは、Topicを指定してキー・バリューの形式でデータを送信いたします。
TopicはPartitionと呼ばれる複数の単位に分かれており、keyからPartitionが決定されます。Kafkaからデータを受け取るConsumerは、各Partitionをサブスクライブし、データに対して処理します。
(後半へつづく)
LINE株式会社
2024.11.13
週3日働いて年収2,000万稼ぐ元印刷屋のおじさん 好きなことだけして楽に稼ぐ3つのパターン
2024.11.11
自分の「本質的な才能」が見つかる一番簡単な質問 他者から「すごい」と思われても意外と気づかないのが才能
2024.11.13
“退職者が出た時の会社の対応”を従業員は見ている 離職防止策の前に見つめ直したい、部下との向き合い方
2024.11.12
自分の人生にプラスに働く「イライラ」は才能 自分の強みや才能につながる“良いイライラ”を見分けるポイント
2023.03.21
民間宇宙開発で高まる「飛行機とロケットの衝突」の危機...どうやって回避する?
2024.11.11
気づいたら借金、倒産して身ぐるみを剥がされる経営者 起業に「立派な動機」を求められる恐ろしさ
2024.11.11
「退職代行」を使われた管理職の本音と葛藤 メディアで話題、利用者が右肩上がり…企業が置かれている現状とは
2024.11.18
20名の会社でGoogleの採用を真似するのはもったいない 人手不足の時代における「脱能力主義」のヒント
2024.11.12
先週まで元気だったのに、突然辞める「びっくり退職」 退職代行サービスの影響も?上司と部下の“すれ違い”が起きる原因
2024.11.14
よってたかってハイリスクのビジネスモデルに仕立て上げるステークホルダー 「社会的理由」が求められる時代の起業戦略