
2025.02.12
職員一人あたり52時間の残業削減に成功 kintone導入がもたらした富士吉田市の自治体DX“変革”ハウツー
提供:LINE株式会社
リンクをコピー
記事をブックマーク
吉田真也氏:「LINE Messaging PlatformにおけるHBaseとKafkaのデータパイプラインとその活用例」について、吉田が発表いたします。
こちらが本日の発表内容です。はじめに「HBaseとLINE Messaging Platformにおける活用」について紹介します。そしてそのあと、本日の本題である「HBaseとKafkaのデータパイプラインとその活用例」について紹介します。
それでは、HBaseとLINE Messaging Platformにおける活用についてお話しします。はじめに、私は2018年にLINEに新卒で入社しまして、LINE Messaging PlatformのHBase Unitというチームに所属しています。
こちらの図が、LINE Messaging Platformのサーバーサイドの技術スタックになります。LINEのサーバーサイドは、複数のサーバーサイドアプリケーションで構成されており、それぞれをJavaやSpringを用いて開発しています。
各サーバーサイドアプリケーションは、データストアとして、「Redis」や「HBase」「Kafka」にアクセスします。我々HBase Unitは、LINE Messaging Platformのサーバーサイドで用いられている、HBaseクラスターの構築や運用を行っています。
また、HBaseに対してパッチの開発をしたり、アップストリームにパッチを貢献したりしています。さらに、HBaseクラスターの運用だけではなく、サーバーサイドアプリケーション内におけるHBaseへのデータアクセスロジックや、ストレージ実装などに関しても開発しています。
HBaseクラスターは、「Hadoop」や「ZooKeeper」に依存しているのですが、そうしたHBaseクラスターのためのHadoopクラスターやZooKeeperクラスターの構築や運用も行っています。モニタリングには、「Prometheus」「Grafana」「Elasticsearch」「Kibana」といったようなものを使用しており、プロビジョニングでは「Ansible」を活用しています。
我々のHBaseクラスターに、どのようなデータが保存されているかを簡単に紹介したいと思います。ここには、LINE Messaging Platformのユーザー情報や、ユーザーのデバイス情報、設定情報などが保存されています。また、ユーザー間の友だち関係や、ユーザー間のチャットやグループのメタ情報、そしてユーザーが送受信するメッセージも保存されています。
さらに、ユーザーがメッセージを送信したり受信したりすると、SEND_MESSAGEやRECEIVED_MESSAGEと呼ばれるイベント、我々はこれをトークオペレーションというふうに呼んでいますが、こうしたイベントがユーザごとに発行され、HBaseクラスターに保存されています。
ここで簡単に、HBaseのアーキテクチャについて説明します。HBaseは、先ほど申し上げたとおり、HadoopやZooKeeperに依存しています。HBaseクラスターは、奇数台のController nodeと数十や数百台といったような規模のWorker nodeで構成されます。このスライドのZKQuorumは、ZooKeeperのデーモンで、我々はController nodeでこれを動かします。
NameNode、JournalNode、DataNodeは、それぞれHadoopファイルシステムのデーモンで、NameNodeとJournalNodeをController nodeで動かし、DataNodeをWorker nodeで動かしています。Hadoopファイルシステムでは、ファイルは複数のブロックに分割され、DataNode上に保存されます。各ブロックは64メガバイトや、128メガバイトといったような単位です。
我々は、Replication Factorに3を使用しており、つまり同じブロックが3つのDataNodeに保存されています。NameNodeは、ファイルがどのブロックによって構成されていて、各ブロックがどのDataNodeに保存されているかを管理します。
HMasterとRegionServerは、HBaseのデーモンで、HMasterをController nodeで動かし、RegionServerをWorker nodeで動かしています。Regionというのは、HBaseのテーブルを水平に分割した単位で、HMasterは、各Regionを各RegionServerに割り当てます。
Clientは、アクセスしたいデータに対応したRegionが割り当てられているRegionServerにリクエストを送信します。各RegionServerは、必要に応じてDataNode上のブロックにアクセスし、Clientからのリクエストに応答いたします。
ここで、1台のWorker nodeに障害が発生してしまったとします。その場合でも、3つのレプリカのおかげで、引き続きHadoopファイルシステム上の各ブロックにはアクセスできます。そのため、障害が発生したノード上で動いていたRegionServerに割り当てられていたRegionを、ほかのRegionServerに割り当てたのち、引き続きClientからのリクエストに応答できます。
ここで、HBaseの書き込みのフローについて簡単に説明いたします。Clientは、書き込みしたいデータに対応したRegionが割り当てられているRegionServerにリクエストを送信いたします。
リクエストを受け取ったRegionServerは、Regionごとに存在するメモリ上のデータストア、memstoreを更新いたします。そして、HDFS上のWALファイル、Write Ahead Logファイルに書き込みを追記します。
このWALファイルへの書き込みに成功した段階で、データの永続化に成功したものとみなし、Clientに応答します。WALファイルへの書き込みに失敗した場合は、memstoreをロールバックします。
memstoreが一定のサイズに達するか、一定の時間が経過するごとに、このmemstoreの内容をHDFSにHFileと呼ばれるファイルフォーマットでフラッシュします。
ここで、このRegionServer Aに障害が発生した場合を考えます。その場合、HMasterは、このRegionServer Aに割り当てられていたRegion 1を別のRegionServer、ここではRegionServer Bに再び割り当てます。
先ほど言ったとおり、HDFS上のWALファイルやHFileには、引き続きアクセスが可能ですが、メモリ上のデータストアであるmemstoreの内容が失われてしまいます。そこで、RegionServer Bは、RegionServer AのWALファイルを読み込み、Region 1のmemstoreの状態を、障害が発生する直前まで復旧します。WALファイルは、このようなデータの信頼性のために用いられています。
HBaseには、クラスター間のレプリケーションの機能が標準で備わっています。ここでは、Sourceクラスターから、Destinationクラスターへのレプリケーションをセットアップしたものとし、このレプリケーションについて説明します。
Sourceクラスターでレプリケーションをセットアップした場合、各RegionServerでは、Replication Sourceと呼ばれるスレッドが起動します。このReplication Sourceは、HDFS上のWALファイルを読み込み、WALの各エントリーをReplication Endpointに渡します。
Replication Endpointは、プラガブルなかたちで提供されており、標準のReplication Endpointは、指定されたDestinationクラスターの各RegionServerにWALのエントリーを送信します。WALのエントリーを受け取ったDestinationクラスターのRegionServerは、WALのエントリーをリプレイすることによって、レプリケーションを完了させます。
DestinationクラスターのRegionServerへのリクエスト送信が失敗する場合があります。その場合、Replication Endpointが用いるHBaseクライアントのリトライ処理に加え、Replication Sourceでもリトライ処理が実装されており、Replication Endpointの処理が成功するまで、該当のWALのエントリーをReplication Endpointへと渡し続けます。
Replication Sourceは、最後にReplication Endpointの処理が成功したWALのエントリーのロケーションをReplication offsetとして、ZooKeeper上に保存します。
ここで、このRegionServer Aに障害が発生した場合を考えます。その場合、ほかのRegionServer、ここではRegionServer Bが、ZooKeeper上に保存されているAのReplication offsetを受け取り、そこからレプリケーションの処理を再開することによって、すべてのデータを漏れなくレプリケーションできるような仕組みになっています。
レプリケーションのセットアップは、HBaseの管理ツールである「HBase Shell」などを用いてセットアップでき、HBase Shellを用いる場合、add_peerコマンドによってセットアップが行えます。
ここでは、レプリケーションのIDである1と、Destinationとなるクラスターを指定しています。我々はこのレプリケーションを、Tokyo region内のプロダクションクラスターからBackupクラスターへのレプリケーション、そしてTokyo regionからOsaka regionに存在するディザスタ・リカバリ用のクラスターへのレプリケーションに使用しています。
先ほどReplication Endpointがプラガブルであるという話をしましたが、これについてもう少し詳しく説明します。ここでは、すべてのWALのエントリーをログに出力する「ReplicationEndpoint」を考えてみます。
プラガブルなReplicationEndpointを独自に定義するには、ReplicationEndpointインターフェイスを継承するクラスを定義します。そしてreplicateメソッドをオーバーライドし、この場合ではすべてのWALのエントリーをログに出力しています。replicateメソッドの戻り値は、レプリケーションの処理に成功した場合、trueを返します。
そしてこのReplicationEndpointを用いるためには、このReplicationEndpointのクラスがRegionServerなどのクラスパス上に存在する状況で、HBase Shellのadd_peerコマンドでこのようにクラス名を指定することで、ReplicationEndpointの処理を独自に定義したものに切り替えられます。
ここからは、本日の本題であります、HBaseとKafkaのパイプラインについて説明します。我々は、このパイプラインを2017年に構築いたしました。ここでは、2017年当時の状況と、このパイプラインが構築された背景から説明します。
2017年当時、我々はプロダクションのクラスターに、HBase 0.90.6というバージョンを使用していました。そして統計解析のために、statsクラスターと呼ばれるHBaseクラスターにレプリケーションをセットアップしていました。
このstatsクラスターは別のチームによって管理されており、このクラスターのHBaseのバージョンは0.94でした。我々が用いる0.90.6というバージョンは2012年にリリースされたバージョンで、2017年当時すでにコミュニティからのサポートが得られない、非常に古いバージョンとなっていました。
そこで我々は、2017年当時は比較的新しいバージョンだった1.2.5というバージョンへのマイグレーションを進行していました。マイグレーションは、1.2.5で構成される新しいクラスターを作り、アプリケーションサーバーから古いクラスターと新しいクラスター両方に書き込みます。そして、古いクラスターから新しいクラスターにデータをコピーすることによって、マイグレーションを進行しました。
ここで、古いクラスターへの書き込みを止めるためには、引き続き統計解析が行えるよう、新しいクラスターからstatsクラスターへレプリケーションをセットアップする必要がありました。ですが残念ながら、非互換性の問題により、1.2.5のオフィシャルのレプリケーションが0.94へのレプリケーションをサポートしていませんでした。
これはなぜかと言いますと、こちらがHBaseのバージョンとそのリリース年表です。2013年にリリースされた0.96というバージョンは、コミュニティでは「Singularity」と呼ばれており、通信のためのプロトコルが大幅に変更されました。
また、2015年にリリースされた1.0では、APIのクリーンアップが行われており、これらのバージョンをまたがった通信が行えませんでした。そのため、我々が用いる1.2.5から0.94へのレプリケーションがサポートされていませんでした。
statsクラスターへのレプリケーションが行えない場合、マイグレーションを完了させられません。このレプリケーションの問題を、我々はHBaseとKafkaのデータパイプラインを構築することによって解決しました。
先ほど紹介した「Pluggable replication endpoint」の機能を用い、独自に定義したReplication EndpointがWALのエントリーを独自のプロトコルでKafkaに送信します。
KafkaからWALのエントリーのデータをコンシュームしたReplayerは、0.94のHBaseクライアントとプロトコルを用い、statsクラスターへと書き込みます。
ここで簡単に、Kafkaについて説明します。Kafkaは、Pub/Subモデルのイベントストリーミングミドルウェアです。Kafkaにデータを送信するProducerは、Topicを指定してキー・バリューの形式でデータを送信いたします。
TopicはPartitionと呼ばれる複数の単位に分かれており、keyからPartitionが決定されます。Kafkaからデータを受け取るConsumerは、各Partitionをサブスクライブし、データに対して処理します。
(後半へつづく)
LINE株式会社
2025.02.06
すかいらーく創業者が、社長を辞めて75歳で再起業したわけ “あえて長居させるコーヒー店”の経営に込めるこだわり
PR | 2025.02.07
プロジェクトマネージャーは「無理ゲーを攻略するプレイヤー」 仕事を任せられない管理職のためのマネジメントの秘訣
2025.02.04
日本企業にありがちな「生産性の低さ」の原因 メーカーの「ちょっとした改善」で勝負が決まる仕組みの落とし穴
2025.02.05
「納得しないと動けない部下」を変える3つのステップとは マネージャーの悩みを解消する会話のテクニック
2025.02.06
落合陽一氏や松尾豊氏の研究は社会に届いているか? ひろゆき氏が語るアカデミアの課題と展望
2025.01.07
1月から始めたい「日記」を書く習慣 ビジネスパーソンにおすすめな3つの理由
2025.02.05
エンジニアとして成功するための秘訣とは? ひろゆき氏が語る、自由な働き方を叶えるアプリ開発とキャリア戦略
2025.02.03
手帳に書くだけで心が整うメンタルケアのコツ イライラ、モヤモヤ、落ち込んだ時の手帳の使い方
2025.02.03
「昔は富豪的プログラミングなんてできなかった」 21歳で「2ちゃんねる」を生んだひろゆき氏が語る開発の裏側
2025.02.10
A4用紙を持ち歩いて殴り書きでアウトプット コクヨのワークスタイルコンサルタントが語る、2種類のメモ術
【手放すTALK LIVE#45】人と組織のポテンシャルが継承されるソース原理 ~人と組織のポテンシャルが花開く「ソース原理」とは~
2024.12.09 - 2024.12.09
【著者来館】『成果を上げるプレイングマネジャーは「これ」をやらない』出版記念イベント!
2025.01.10 - 2025.01.10
片付けパパ対談【特別編】 整理術×行動術×メモ術で、仕事も人生も自在にデザイン!
2024.12.16 - 2024.12.16
日本を変える 中小企業リーダーズサミット2025
2025.01.30 - 2025.02.12
【必見】納得しない部下を動かす!マネジメントの裏技とは?
2024.12.16 - 2024.12.16