2万rpsを処理する行動ログ収集システムをGoで作った話

潮平諒也氏:「2万rpsを処理する行動ログ収集システムをGoで作った話」と題して、発表いたします。

まず自己紹介からですね。潮平諒也と申します。現在はデータインフラ部のデータエンジニアチームで仕事しています。

高専を卒業して、去年度19新卒としてDMMに入社しました。入社したときは、全然Goの経験とかなくて、研修で触り始めて、Go歴が9ヶ月ぐらいという感じです。

今日は、今いるチームに配属されて今年の3月まで携わった「行動ログ収集システムのクラウド移行プロジェクト」についてお話しします。

このプロジェクトで、私はWebブラウザから行動ログを受け取るAPIのほうをメインで作成していました。

行動ログ収集システムの動き

行動ログ収集システムは、DMMサイトやアプリのユーザーの行動を個人にひもづかないかたちで分析用に追跡するものです。行動ログを受け取って適切なかたちに加工して、データ分析基盤へ届けるのが責務です。

行動ログ収集システムを移行する背景です。アプリケーション側の要因としては、まず古いことが挙げられます。5年前ぐらいに作られたNode.js 4のAPIとかが動いていて、デプロイに半日かかるとか、そういう使いにくい状態になっていました。

副次的な理由として、現在、行動ログ収集システムが用途別で2つ存在していて、これらを今回の移行でまとめたいという要望もあります。

次にインフラ側の要求というか背景なんですが、セッションを管理しているKVSがトラフィックが大きくなりすぎて増設できないという理由。それから、LBのSSL終端が性能上、今利用できていなくて終端サーバを別で用意しているのですが、こちらのスケールアウトも容易じゃなかったという理由もあります。

こちらが旧システムの構成図です。行動ログがトラッキングAPIに送信されると、オンプレにあるKVSであるAerospikeから、セッションや経由情報を読み出して、その他の値とともにパラメータをデータに保管して、RabbitMQに渡します。これについては、後ほど説明します。

そのあとに、Erlang Consumerと呼ばれているErlangで書かれた何かがありまして、GeoIPの補完とか消費税の計算とかその他エトセトラを行なっていまして、ここを通ったあとにHadoopのほうへ取り込むというかたちになっていました。

先ほどの説明です。セッションというのがユーザーの1行動を追うための一意な値で、ユーザーがページを移動するたびにTTLが更新されていって、どこまでが1行動なのかがわかるようになっています。このTTLが長期と短期の2種類が存在しています。経由情報というのは、どのシステム経由でカートに商品が追加されて購入されたかを識別するための値です。

クラウド移行に向けて

今回のクラウド移行で解決したいことについてです。今回はトラッキング対象の増加に対応できるようなパフォーマンスが求められているのと、パフォーマンスに起因して、Aerospikeと同等の性能が出るDBが求められています。それから、オンプレに存在する現状動いているセッション等のデータを移行しながら引き継いでいかなければならないという要件がありました。

この問題に対する解決策としてえ、まずパフォーマンス面での解決として、APIにGo言語を採用しました。理由としましては、コンテナネイティブであり、速度が出ると噂されていること。会社全体でGoを使っていきましょうという機運があったのも理由の1つです。また、インフラにGKEを採用していて、今後増えるクラウドに載るであろうサービスたちを同じ基盤上に載せたかったというのも理由になっています。

DBについてですが、選択肢としてDynamoDBとBigtableがありまして、今回はBigtableを採用しました。理由としましては、DynamoDBはちょっと料金が高かったためで、Bigtableを採用するにあたって、今回のシステムはGCP上に構築することが決定しました。

セッションなどのデータの引き継ぎについては、オンプレとクラウド両方に設けられたDBへ読み書きする並行稼動期間を設けて、順次データを移行していくかたちで解決しました。

クラウド化したら、こうなった

こちらが新システムの構成図です。先ほどと同じですね。行動ログの送信が行なわれたあとに、新しく作られたGolangで書かれたトラッキングAPIにまずデータが行って、そのあとにオンプレとBigtableと通信して、セッションなどのデータをもってきます。

オンプレと通信する際は、こちらもGoで作成したマイグレーションAPIというものがありまして、これがオンプレのAerospikeとやり取りして、トラッキングAPIのほうにデータをもっていきます。

データの補完が完了したあとはPub/Subにパブリッシュして、あとはDataflowなどで加工しながら、GCSにいったん保存して、そのあとにDistCpでHadoopに取り込む構成になっています。

各GCPリソースの役割としては、Pub/Subは行動ログのキューイングと一時保存です。Dataflowではいろいろやっていて、JSONからTSVへの変換とか、消費税の計算、スキーマの変換、それから単純にPub/SubからGCSへのデータの移動というのを役割としてもっています。そしてGCS自体は生データの保存とかですね。

GoでAPIを書くにあたっての工夫

みなさん、おまたせしました。Goの話です。ここまでは前置きで、自分がGoのAPIを書くにあたって工夫したところとか苦労したところについてピックアップしてお話ししようと思ったのですが、真ん中の「Interfaceに依存させてモックできるようにした」という話は、作ったあとで「ちょっと今回の発表の趣旨と違うな」ということで今回は割愛します。このあとにスライドなどが公開されるときに、気になる人はご覧ください。

なので今回は、APMについてと、パフォーマンスチューニングについてお話ししようと思います。

APIの動きを追跡するAPM

まずはAPMについてです。今回はパフォーマンスチューニングすることがわかっていたので、そのためにAPIの動きを追跡しておきたいということで、APMを導入しました。フレームワークはOpenCensusのを使っていて、Datadog Exporterを使用してDatadogから見れるようにしています。

今回使ってよかった点は、前のページの構成図にあったトラッキングAPIとマイグレーションAPI間のSpanです。1つのSpanで通信を挟んでいても貫通して追跡できることがかなりよかったなと思っています。あとは、任意のSpan時点でのAPIの状態をコードから登録して表示できるのがよかった点かなと思います。

一つひとつ解説すると、上のほうはコードになっていて、このStartSpanというのを始めたときからspan.End()までの動きがこちらのUIから確認できるようになっています。このSpan自体は入れ子にできるので、各メソッドの呼び出しだとか、先ほどのマイグレーションAPIの動きなどを貫通して追うことができます。

Spanに状態を登録して表示するというところですが、アプリケーション固有の値とかをattributeに登録することで、このようにUIから逐次追うことができます。

パフォーマンスチューニングの工夫

次にパフォーマンスチューニングの話に移ります。今回のパフォーマンスチューニングにおける負荷試験環境は、Googleから提供されている方法を参考にLocustとGKE上に作成しています。

最初の負荷試験では、Podをいくら増やしても3,000rps以上数値が出なくて、このままだと横にスケールしないことがわかりました。目標は2万rps以上捌けて99パーセンタイルのリクエストに100ms以内でレスポンスすることということで、かなり遠くて、当初は絶望していました。

まずアプローチとして、APMを見ると、Bigtableへの書き込みとかPub/SubへのPublishが極端に遅いリクエストがあるのを発見したので、まず行動ログの保存やセッションの読み書きを見直しました。

試したことその1として、Bigtableの呼び出し時に最新の1データだけを取得するフィルターを追加しました。Bigtableの仕様で、同じキーに入ったレコードがGCされるまでに時間がかかります。過去の不要なデータまでかなり読み込んでいたことがわかったので、このフィルターを追加することでスキャン量の減少が見込めます。この赤線の部分ですね。

その2としまして、Pub/SubへのPublishした際の結果を非同期で受け取れるようにしました。結果を待たないことで、レスポンスの改善を図ることが目的でした。

これは改善前のコードで、赤線の部分がPub/Subへの送信部分で、送信結果の取得が緑の線の部分になっているのですが、これをこのように、ゴルーチンを使って非同期で結果だけ受け取るようにしています。

Podの偏りが原因だった

そのほかにもいろいろ試したのですが、捌けるリクエスト数や速度が改善したんですけど1万rpsぐらいだったので、目標にはほど遠いというか倍ぐらい出さないといけないという話でした。

あと相変わらず横にスケールしないという感じでチームで唸っているところに、様子を見ていたSREの方から一言あって、「Locust側のPodが偏っていませんか?」という指摘を受けて、いろいろインフラのところを見直しました。

原因がわかりまして、APIと負荷ツール側のPodの配置をうまく分散するように設定すると、あっさり2rps以上出たというオチです。PodとNodeのスケールアウトで性能が向上することも確認できました。

今回は主に負荷をかける側ですね。Podの配置が特定のNodeに偏っていたことが原因で、おそらくネットワークが詰まっていただろうと推測しています。2万出たときのスクショがこんな感じですね。2万5,000ぐらい出ています。

最終的な構成と今後の課題

その後、1Podでどれだけ性能が出るかをとりあえず改めて測定して、そのあとリニアに性能がスケールすることも確認しました。そして最終的な構成としまして、他サービスも同居することを考慮して、マシンタイプをn1-standard-8することにチームで決定しました。

それより下のマシンタイプであるn1-standard-4やn1-standard-2よりも、ネットワークの帯域幅が大きかったのも理由です。今回、性能が出なかった原因がネットワークだったので、こちらを大きくしておいたほうがいいだろうということで選定しました。最終的に今動いている構成がNode18台で、その上にPodが32台分散されています。

今後の課題として、現在、JSONのデコードがたまに遅いという現象が確認されていて、APMを見るとデコードだけで数秒かかっているリクエストが散見されます。これは原因不明で、現在確認中です。もう1つ、たまにgRPC起因のcontext canceledが発生することも確認できていますが、こちらも原因不明になっています。今後改善予定です。

APMはなにかと役に立つ

今回のまとめに入ります。今回はトラッキングシステムのAPIをGoで実装する例をみなさんに紹介しました。速いらしいという噂がされていたGoだったのですが、ちゃんと期待したパフォーマンスが出ました。

APMはなにかと役に立つので、パフォーマンスチューニング以外にも使いこなせるとかなり楽だなと思いました。それからインフラの設定はちゃんと見直すことと、あと負荷試験のときは負荷ツール側にもちゃんと目を配ってあげようという学びを得ることができました。

以上で発表を終わります。ありがとうございました。