2024.12.10
“放置系”なのにサイバー攻撃を監視・検知、「統合ログ管理ツール」とは 最先端のログ管理体制を実現する方法
提供:LINE株式会社
リンクをコピー
記事をブックマーク
吉田真也氏:「LINE Messaging PlatformにおけるHBaseとKafkaのデータパイプラインとその活用例」について、吉田が発表いたします。
こちらが本日の発表内容です。はじめに「HBaseとLINE Messaging Platformにおける活用」について紹介します。そしてそのあと、本日の本題である「HBaseとKafkaのデータパイプラインとその活用例」について紹介します。
それでは、HBaseとLINE Messaging Platformにおける活用についてお話しします。はじめに、私は2018年にLINEに新卒で入社しまして、LINE Messaging PlatformのHBase Unitというチームに所属しています。
こちらの図が、LINE Messaging Platformのサーバーサイドの技術スタックになります。LINEのサーバーサイドは、複数のサーバーサイドアプリケーションで構成されており、それぞれをJavaやSpringを用いて開発しています。
各サーバーサイドアプリケーションは、データストアとして、「Redis」や「HBase」「Kafka」にアクセスします。我々HBase Unitは、LINE Messaging Platformのサーバーサイドで用いられている、HBaseクラスターの構築や運用を行っています。
また、HBaseに対してパッチの開発をしたり、アップストリームにパッチを貢献したりしています。さらに、HBaseクラスターの運用だけではなく、サーバーサイドアプリケーション内におけるHBaseへのデータアクセスロジックや、ストレージ実装などに関しても開発しています。
HBaseクラスターは、「Hadoop」や「ZooKeeper」に依存しているのですが、そうしたHBaseクラスターのためのHadoopクラスターやZooKeeperクラスターの構築や運用も行っています。モニタリングには、「Prometheus」「Grafana」「Elasticsearch」「Kibana」といったようなものを使用しており、プロビジョニングでは「Ansible」を活用しています。
我々のHBaseクラスターに、どのようなデータが保存されているかを簡単に紹介したいと思います。ここには、LINE Messaging Platformのユーザー情報や、ユーザーのデバイス情報、設定情報などが保存されています。また、ユーザー間の友だち関係や、ユーザー間のチャットやグループのメタ情報、そしてユーザーが送受信するメッセージも保存されています。
さらに、ユーザーがメッセージを送信したり受信したりすると、SEND_MESSAGEやRECEIVED_MESSAGEと呼ばれるイベント、我々はこれをトークオペレーションというふうに呼んでいますが、こうしたイベントがユーザごとに発行され、HBaseクラスターに保存されています。
ここで簡単に、HBaseのアーキテクチャについて説明します。HBaseは、先ほど申し上げたとおり、HadoopやZooKeeperに依存しています。HBaseクラスターは、奇数台のController nodeと数十や数百台といったような規模のWorker nodeで構成されます。このスライドのZKQuorumは、ZooKeeperのデーモンで、我々はController nodeでこれを動かします。
NameNode、JournalNode、DataNodeは、それぞれHadoopファイルシステムのデーモンで、NameNodeとJournalNodeをController nodeで動かし、DataNodeをWorker nodeで動かしています。Hadoopファイルシステムでは、ファイルは複数のブロックに分割され、DataNode上に保存されます。各ブロックは64メガバイトや、128メガバイトといったような単位です。
我々は、Replication Factorに3を使用しており、つまり同じブロックが3つのDataNodeに保存されています。NameNodeは、ファイルがどのブロックによって構成されていて、各ブロックがどのDataNodeに保存されているかを管理します。
HMasterとRegionServerは、HBaseのデーモンで、HMasterをController nodeで動かし、RegionServerをWorker nodeで動かしています。Regionというのは、HBaseのテーブルを水平に分割した単位で、HMasterは、各Regionを各RegionServerに割り当てます。
Clientは、アクセスしたいデータに対応したRegionが割り当てられているRegionServerにリクエストを送信します。各RegionServerは、必要に応じてDataNode上のブロックにアクセスし、Clientからのリクエストに応答いたします。
ここで、1台のWorker nodeに障害が発生してしまったとします。その場合でも、3つのレプリカのおかげで、引き続きHadoopファイルシステム上の各ブロックにはアクセスできます。そのため、障害が発生したノード上で動いていたRegionServerに割り当てられていたRegionを、ほかのRegionServerに割り当てたのち、引き続きClientからのリクエストに応答できます。
ここで、HBaseの書き込みのフローについて簡単に説明いたします。Clientは、書き込みしたいデータに対応したRegionが割り当てられているRegionServerにリクエストを送信いたします。
リクエストを受け取ったRegionServerは、Regionごとに存在するメモリ上のデータストア、memstoreを更新いたします。そして、HDFS上のWALファイル、Write Ahead Logファイルに書き込みを追記します。
このWALファイルへの書き込みに成功した段階で、データの永続化に成功したものとみなし、Clientに応答します。WALファイルへの書き込みに失敗した場合は、memstoreをロールバックします。
memstoreが一定のサイズに達するか、一定の時間が経過するごとに、このmemstoreの内容をHDFSにHFileと呼ばれるファイルフォーマットでフラッシュします。
ここで、このRegionServer Aに障害が発生した場合を考えます。その場合、HMasterは、このRegionServer Aに割り当てられていたRegion 1を別のRegionServer、ここではRegionServer Bに再び割り当てます。
先ほど言ったとおり、HDFS上のWALファイルやHFileには、引き続きアクセスが可能ですが、メモリ上のデータストアであるmemstoreの内容が失われてしまいます。そこで、RegionServer Bは、RegionServer AのWALファイルを読み込み、Region 1のmemstoreの状態を、障害が発生する直前まで復旧します。WALファイルは、このようなデータの信頼性のために用いられています。
HBaseには、クラスター間のレプリケーションの機能が標準で備わっています。ここでは、Sourceクラスターから、Destinationクラスターへのレプリケーションをセットアップしたものとし、このレプリケーションについて説明します。
Sourceクラスターでレプリケーションをセットアップした場合、各RegionServerでは、Replication Sourceと呼ばれるスレッドが起動します。このReplication Sourceは、HDFS上のWALファイルを読み込み、WALの各エントリーをReplication Endpointに渡します。
Replication Endpointは、プラガブルなかたちで提供されており、標準のReplication Endpointは、指定されたDestinationクラスターの各RegionServerにWALのエントリーを送信します。WALのエントリーを受け取ったDestinationクラスターのRegionServerは、WALのエントリーをリプレイすることによって、レプリケーションを完了させます。
DestinationクラスターのRegionServerへのリクエスト送信が失敗する場合があります。その場合、Replication Endpointが用いるHBaseクライアントのリトライ処理に加え、Replication Sourceでもリトライ処理が実装されており、Replication Endpointの処理が成功するまで、該当のWALのエントリーをReplication Endpointへと渡し続けます。
Replication Sourceは、最後にReplication Endpointの処理が成功したWALのエントリーのロケーションをReplication offsetとして、ZooKeeper上に保存します。
ここで、このRegionServer Aに障害が発生した場合を考えます。その場合、ほかのRegionServer、ここではRegionServer Bが、ZooKeeper上に保存されているAのReplication offsetを受け取り、そこからレプリケーションの処理を再開することによって、すべてのデータを漏れなくレプリケーションできるような仕組みになっています。
レプリケーションのセットアップは、HBaseの管理ツールである「HBase Shell」などを用いてセットアップでき、HBase Shellを用いる場合、add_peerコマンドによってセットアップが行えます。
ここでは、レプリケーションのIDである1と、Destinationとなるクラスターを指定しています。我々はこのレプリケーションを、Tokyo region内のプロダクションクラスターからBackupクラスターへのレプリケーション、そしてTokyo regionからOsaka regionに存在するディザスタ・リカバリ用のクラスターへのレプリケーションに使用しています。
先ほどReplication Endpointがプラガブルであるという話をしましたが、これについてもう少し詳しく説明します。ここでは、すべてのWALのエントリーをログに出力する「ReplicationEndpoint」を考えてみます。
プラガブルなReplicationEndpointを独自に定義するには、ReplicationEndpointインターフェイスを継承するクラスを定義します。そしてreplicateメソッドをオーバーライドし、この場合ではすべてのWALのエントリーをログに出力しています。replicateメソッドの戻り値は、レプリケーションの処理に成功した場合、trueを返します。
そしてこのReplicationEndpointを用いるためには、このReplicationEndpointのクラスがRegionServerなどのクラスパス上に存在する状況で、HBase Shellのadd_peerコマンドでこのようにクラス名を指定することで、ReplicationEndpointの処理を独自に定義したものに切り替えられます。
ここからは、本日の本題であります、HBaseとKafkaのパイプラインについて説明します。我々は、このパイプラインを2017年に構築いたしました。ここでは、2017年当時の状況と、このパイプラインが構築された背景から説明します。
2017年当時、我々はプロダクションのクラスターに、HBase 0.90.6というバージョンを使用していました。そして統計解析のために、statsクラスターと呼ばれるHBaseクラスターにレプリケーションをセットアップしていました。
このstatsクラスターは別のチームによって管理されており、このクラスターのHBaseのバージョンは0.94でした。我々が用いる0.90.6というバージョンは2012年にリリースされたバージョンで、2017年当時すでにコミュニティからのサポートが得られない、非常に古いバージョンとなっていました。
そこで我々は、2017年当時は比較的新しいバージョンだった1.2.5というバージョンへのマイグレーションを進行していました。マイグレーションは、1.2.5で構成される新しいクラスターを作り、アプリケーションサーバーから古いクラスターと新しいクラスター両方に書き込みます。そして、古いクラスターから新しいクラスターにデータをコピーすることによって、マイグレーションを進行しました。
ここで、古いクラスターへの書き込みを止めるためには、引き続き統計解析が行えるよう、新しいクラスターからstatsクラスターへレプリケーションをセットアップする必要がありました。ですが残念ながら、非互換性の問題により、1.2.5のオフィシャルのレプリケーションが0.94へのレプリケーションをサポートしていませんでした。
これはなぜかと言いますと、こちらがHBaseのバージョンとそのリリース年表です。2013年にリリースされた0.96というバージョンは、コミュニティでは「Singularity」と呼ばれており、通信のためのプロトコルが大幅に変更されました。
また、2015年にリリースされた1.0では、APIのクリーンアップが行われており、これらのバージョンをまたがった通信が行えませんでした。そのため、我々が用いる1.2.5から0.94へのレプリケーションがサポートされていませんでした。
statsクラスターへのレプリケーションが行えない場合、マイグレーションを完了させられません。このレプリケーションの問題を、我々はHBaseとKafkaのデータパイプラインを構築することによって解決しました。
先ほど紹介した「Pluggable replication endpoint」の機能を用い、独自に定義したReplication EndpointがWALのエントリーを独自のプロトコルでKafkaに送信します。
KafkaからWALのエントリーのデータをコンシュームしたReplayerは、0.94のHBaseクライアントとプロトコルを用い、statsクラスターへと書き込みます。
ここで簡単に、Kafkaについて説明します。Kafkaは、Pub/Subモデルのイベントストリーミングミドルウェアです。Kafkaにデータを送信するProducerは、Topicを指定してキー・バリューの形式でデータを送信いたします。
TopicはPartitionと呼ばれる複数の単位に分かれており、keyからPartitionが決定されます。Kafkaからデータを受け取るConsumerは、各Partitionをサブスクライブし、データに対して処理します。
(後半へつづく)
LINE株式会社
2024.12.10
メールのラリー回数でわかる「評価されない人」の特徴 職場での評価を下げる行動5選
2024.12.09
10点満点中7点の部下に言うべきこと 部下を育成できない上司の特徴トップ5
2024.12.09
国内の有名ホテルでは、マグロ丼がなんと1杯「24,000円」 「良いものをより安く」を追いすぎた日本にとって値上げが重要な理由
2024.12.12
会議で発言しやすくなる「心理的安全性」を高めるには ファシリテーションがうまい人の3つの条件
2023.03.21
民間宇宙開発で高まる「飛行機とロケットの衝突」の危機...どうやって回避する?
2024.12.10
職場であえて「不機嫌」を出したほうがいいタイプ NOと言えない人のための人間関係をラクにするヒント
2024.12.12
今までとこれからで、エンジニアに求められる「スキル」の違い AI時代のエンジニアの未来と生存戦略のカギとは
PR | 2024.11.26
なぜ電話営業はなくならない?その要因は「属人化」 通話内容をデータ化するZoomのクラウドサービス活用術
PR | 2024.11.22
「闇雲なAI導入」から脱却せよ Zoom・パーソル・THE GUILD幹部が語る、従業員と顧客体験を高めるAI戦略の要諦
2024.12.11
大企業への転職前に感じた、「なんか違うかも」の違和感の正体 「親が喜ぶ」「モテそう」ではない、自分の判断基準を持つカギ