2024.10.10
将来は卵1パックの価格が2倍に? 多くの日本人が知らない世界の新潮流、「動物福祉」とは
提供:LINE株式会社
リンクをコピー
記事をブックマーク
吉田真也氏:「LINE Messaging PlatformにおけるHBaseとKafkaのデータパイプラインとその活用例」について、吉田が発表いたします。
こちらが本日の発表内容です。はじめに「HBaseとLINE Messaging Platformにおける活用」について紹介します。そしてそのあと、本日の本題である「HBaseとKafkaのデータパイプラインとその活用例」について紹介します。
それでは、HBaseとLINE Messaging Platformにおける活用についてお話しします。はじめに、私は2018年にLINEに新卒で入社しまして、LINE Messaging PlatformのHBase Unitというチームに所属しています。
こちらの図が、LINE Messaging Platformのサーバーサイドの技術スタックになります。LINEのサーバーサイドは、複数のサーバーサイドアプリケーションで構成されており、それぞれをJavaやSpringを用いて開発しています。
各サーバーサイドアプリケーションは、データストアとして、「Redis」や「HBase」「Kafka」にアクセスします。我々HBase Unitは、LINE Messaging Platformのサーバーサイドで用いられている、HBaseクラスターの構築や運用を行っています。
また、HBaseに対してパッチの開発をしたり、アップストリームにパッチを貢献したりしています。さらに、HBaseクラスターの運用だけではなく、サーバーサイドアプリケーション内におけるHBaseへのデータアクセスロジックや、ストレージ実装などに関しても開発しています。
HBaseクラスターは、「Hadoop」や「ZooKeeper」に依存しているのですが、そうしたHBaseクラスターのためのHadoopクラスターやZooKeeperクラスターの構築や運用も行っています。モニタリングには、「Prometheus」「Grafana」「Elasticsearch」「Kibana」といったようなものを使用しており、プロビジョニングでは「Ansible」を活用しています。
我々のHBaseクラスターに、どのようなデータが保存されているかを簡単に紹介したいと思います。ここには、LINE Messaging Platformのユーザー情報や、ユーザーのデバイス情報、設定情報などが保存されています。また、ユーザー間の友だち関係や、ユーザー間のチャットやグループのメタ情報、そしてユーザーが送受信するメッセージも保存されています。
さらに、ユーザーがメッセージを送信したり受信したりすると、SEND_MESSAGEやRECEIVED_MESSAGEと呼ばれるイベント、我々はこれをトークオペレーションというふうに呼んでいますが、こうしたイベントがユーザごとに発行され、HBaseクラスターに保存されています。
ここで簡単に、HBaseのアーキテクチャについて説明します。HBaseは、先ほど申し上げたとおり、HadoopやZooKeeperに依存しています。HBaseクラスターは、奇数台のController nodeと数十や数百台といったような規模のWorker nodeで構成されます。このスライドのZKQuorumは、ZooKeeperのデーモンで、我々はController nodeでこれを動かします。
NameNode、JournalNode、DataNodeは、それぞれHadoopファイルシステムのデーモンで、NameNodeとJournalNodeをController nodeで動かし、DataNodeをWorker nodeで動かしています。Hadoopファイルシステムでは、ファイルは複数のブロックに分割され、DataNode上に保存されます。各ブロックは64メガバイトや、128メガバイトといったような単位です。
我々は、Replication Factorに3を使用しており、つまり同じブロックが3つのDataNodeに保存されています。NameNodeは、ファイルがどのブロックによって構成されていて、各ブロックがどのDataNodeに保存されているかを管理します。
HMasterとRegionServerは、HBaseのデーモンで、HMasterをController nodeで動かし、RegionServerをWorker nodeで動かしています。Regionというのは、HBaseのテーブルを水平に分割した単位で、HMasterは、各Regionを各RegionServerに割り当てます。
Clientは、アクセスしたいデータに対応したRegionが割り当てられているRegionServerにリクエストを送信します。各RegionServerは、必要に応じてDataNode上のブロックにアクセスし、Clientからのリクエストに応答いたします。
ここで、1台のWorker nodeに障害が発生してしまったとします。その場合でも、3つのレプリカのおかげで、引き続きHadoopファイルシステム上の各ブロックにはアクセスできます。そのため、障害が発生したノード上で動いていたRegionServerに割り当てられていたRegionを、ほかのRegionServerに割り当てたのち、引き続きClientからのリクエストに応答できます。
ここで、HBaseの書き込みのフローについて簡単に説明いたします。Clientは、書き込みしたいデータに対応したRegionが割り当てられているRegionServerにリクエストを送信いたします。
リクエストを受け取ったRegionServerは、Regionごとに存在するメモリ上のデータストア、memstoreを更新いたします。そして、HDFS上のWALファイル、Write Ahead Logファイルに書き込みを追記します。
このWALファイルへの書き込みに成功した段階で、データの永続化に成功したものとみなし、Clientに応答します。WALファイルへの書き込みに失敗した場合は、memstoreをロールバックします。
memstoreが一定のサイズに達するか、一定の時間が経過するごとに、このmemstoreの内容をHDFSにHFileと呼ばれるファイルフォーマットでフラッシュします。
ここで、このRegionServer Aに障害が発生した場合を考えます。その場合、HMasterは、このRegionServer Aに割り当てられていたRegion 1を別のRegionServer、ここではRegionServer Bに再び割り当てます。
先ほど言ったとおり、HDFS上のWALファイルやHFileには、引き続きアクセスが可能ですが、メモリ上のデータストアであるmemstoreの内容が失われてしまいます。そこで、RegionServer Bは、RegionServer AのWALファイルを読み込み、Region 1のmemstoreの状態を、障害が発生する直前まで復旧します。WALファイルは、このようなデータの信頼性のために用いられています。
HBaseには、クラスター間のレプリケーションの機能が標準で備わっています。ここでは、Sourceクラスターから、Destinationクラスターへのレプリケーションをセットアップしたものとし、このレプリケーションについて説明します。
Sourceクラスターでレプリケーションをセットアップした場合、各RegionServerでは、Replication Sourceと呼ばれるスレッドが起動します。このReplication Sourceは、HDFS上のWALファイルを読み込み、WALの各エントリーをReplication Endpointに渡します。
Replication Endpointは、プラガブルなかたちで提供されており、標準のReplication Endpointは、指定されたDestinationクラスターの各RegionServerにWALのエントリーを送信します。WALのエントリーを受け取ったDestinationクラスターのRegionServerは、WALのエントリーをリプレイすることによって、レプリケーションを完了させます。
DestinationクラスターのRegionServerへのリクエスト送信が失敗する場合があります。その場合、Replication Endpointが用いるHBaseクライアントのリトライ処理に加え、Replication Sourceでもリトライ処理が実装されており、Replication Endpointの処理が成功するまで、該当のWALのエントリーをReplication Endpointへと渡し続けます。
Replication Sourceは、最後にReplication Endpointの処理が成功したWALのエントリーのロケーションをReplication offsetとして、ZooKeeper上に保存します。
ここで、このRegionServer Aに障害が発生した場合を考えます。その場合、ほかのRegionServer、ここではRegionServer Bが、ZooKeeper上に保存されているAのReplication offsetを受け取り、そこからレプリケーションの処理を再開することによって、すべてのデータを漏れなくレプリケーションできるような仕組みになっています。
レプリケーションのセットアップは、HBaseの管理ツールである「HBase Shell」などを用いてセットアップでき、HBase Shellを用いる場合、add_peerコマンドによってセットアップが行えます。
ここでは、レプリケーションのIDである1と、Destinationとなるクラスターを指定しています。我々はこのレプリケーションを、Tokyo region内のプロダクションクラスターからBackupクラスターへのレプリケーション、そしてTokyo regionからOsaka regionに存在するディザスタ・リカバリ用のクラスターへのレプリケーションに使用しています。
先ほどReplication Endpointがプラガブルであるという話をしましたが、これについてもう少し詳しく説明します。ここでは、すべてのWALのエントリーをログに出力する「ReplicationEndpoint」を考えてみます。
プラガブルなReplicationEndpointを独自に定義するには、ReplicationEndpointインターフェイスを継承するクラスを定義します。そしてreplicateメソッドをオーバーライドし、この場合ではすべてのWALのエントリーをログに出力しています。replicateメソッドの戻り値は、レプリケーションの処理に成功した場合、trueを返します。
そしてこのReplicationEndpointを用いるためには、このReplicationEndpointのクラスがRegionServerなどのクラスパス上に存在する状況で、HBase Shellのadd_peerコマンドでこのようにクラス名を指定することで、ReplicationEndpointの処理を独自に定義したものに切り替えられます。
ここからは、本日の本題であります、HBaseとKafkaのパイプラインについて説明します。我々は、このパイプラインを2017年に構築いたしました。ここでは、2017年当時の状況と、このパイプラインが構築された背景から説明します。
2017年当時、我々はプロダクションのクラスターに、HBase 0.90.6というバージョンを使用していました。そして統計解析のために、statsクラスターと呼ばれるHBaseクラスターにレプリケーションをセットアップしていました。
このstatsクラスターは別のチームによって管理されており、このクラスターのHBaseのバージョンは0.94でした。我々が用いる0.90.6というバージョンは2012年にリリースされたバージョンで、2017年当時すでにコミュニティからのサポートが得られない、非常に古いバージョンとなっていました。
そこで我々は、2017年当時は比較的新しいバージョンだった1.2.5というバージョンへのマイグレーションを進行していました。マイグレーションは、1.2.5で構成される新しいクラスターを作り、アプリケーションサーバーから古いクラスターと新しいクラスター両方に書き込みます。そして、古いクラスターから新しいクラスターにデータをコピーすることによって、マイグレーションを進行しました。
ここで、古いクラスターへの書き込みを止めるためには、引き続き統計解析が行えるよう、新しいクラスターからstatsクラスターへレプリケーションをセットアップする必要がありました。ですが残念ながら、非互換性の問題により、1.2.5のオフィシャルのレプリケーションが0.94へのレプリケーションをサポートしていませんでした。
これはなぜかと言いますと、こちらがHBaseのバージョンとそのリリース年表です。2013年にリリースされた0.96というバージョンは、コミュニティでは「Singularity」と呼ばれており、通信のためのプロトコルが大幅に変更されました。
また、2015年にリリースされた1.0では、APIのクリーンアップが行われており、これらのバージョンをまたがった通信が行えませんでした。そのため、我々が用いる1.2.5から0.94へのレプリケーションがサポートされていませんでした。
statsクラスターへのレプリケーションが行えない場合、マイグレーションを完了させられません。このレプリケーションの問題を、我々はHBaseとKafkaのデータパイプラインを構築することによって解決しました。
先ほど紹介した「Pluggable replication endpoint」の機能を用い、独自に定義したReplication EndpointがWALのエントリーを独自のプロトコルでKafkaに送信します。
KafkaからWALのエントリーのデータをコンシュームしたReplayerは、0.94のHBaseクライアントとプロトコルを用い、statsクラスターへと書き込みます。
ここで簡単に、Kafkaについて説明します。Kafkaは、Pub/Subモデルのイベントストリーミングミドルウェアです。Kafkaにデータを送信するProducerは、Topicを指定してキー・バリューの形式でデータを送信いたします。
TopicはPartitionと呼ばれる複数の単位に分かれており、keyからPartitionが決定されます。Kafkaからデータを受け取るConsumerは、各Partitionをサブスクライブし、データに対して処理します。
(後半へつづく)
LINE株式会社
2024.11.13
週3日働いて年収2,000万稼ぐ元印刷屋のおじさん 好きなことだけして楽に稼ぐ3つのパターン
2024.11.11
自分の「本質的な才能」が見つかる一番簡単な質問 他者から「すごい」と思われても意外と気づかないのが才能
2024.11.13
“退職者が出た時の会社の対応”を従業員は見ている 離職防止策の前に見つめ直したい、部下との向き合い方
2024.11.12
自分の人生にプラスに働く「イライラ」は才能 自分の強みや才能につながる“良いイライラ”を見分けるポイント
2023.03.21
民間宇宙開発で高まる「飛行機とロケットの衝突」の危機...どうやって回避する?
2024.11.11
気づいたら借金、倒産して身ぐるみを剥がされる経営者 起業に「立派な動機」を求められる恐ろしさ
2024.11.11
「退職代行」を使われた管理職の本音と葛藤 メディアで話題、利用者が右肩上がり…企業が置かれている現状とは
2024.11.18
20名の会社でGoogleの採用を真似するのはもったいない 人手不足の時代における「脱能力主義」のヒント
2024.11.12
先週まで元気だったのに、突然辞める「びっくり退職」 退職代行サービスの影響も?上司と部下の“すれ違い”が起きる原因
2024.11.14
よってたかってハイリスクのビジネスモデルに仕立て上げるステークホルダー 「社会的理由」が求められる時代の起業戦略
2024.11.13
週3日働いて年収2,000万稼ぐ元印刷屋のおじさん 好きなことだけして楽に稼ぐ3つのパターン
2024.11.11
自分の「本質的な才能」が見つかる一番簡単な質問 他者から「すごい」と思われても意外と気づかないのが才能
2024.11.13
“退職者が出た時の会社の対応”を従業員は見ている 離職防止策の前に見つめ直したい、部下との向き合い方
2024.11.12
自分の人生にプラスに働く「イライラ」は才能 自分の強みや才能につながる“良いイライラ”を見分けるポイント
2023.03.21
民間宇宙開発で高まる「飛行機とロケットの衝突」の危機...どうやって回避する?
2024.11.11
気づいたら借金、倒産して身ぐるみを剥がされる経営者 起業に「立派な動機」を求められる恐ろしさ
2024.11.11
「退職代行」を使われた管理職の本音と葛藤 メディアで話題、利用者が右肩上がり…企業が置かれている現状とは
2024.11.18
20名の会社でGoogleの採用を真似するのはもったいない 人手不足の時代における「脱能力主義」のヒント
2024.11.12
先週まで元気だったのに、突然辞める「びっくり退職」 退職代行サービスの影響も?上司と部下の“すれ違い”が起きる原因
2024.11.14
よってたかってハイリスクのビジネスモデルに仕立て上げるステークホルダー 「社会的理由」が求められる時代の起業戦略