Managed Fluentd Clusterの導入

まず複数のチームが、自分たちのログ設定を我々のManaged Fluentd Clusterに適用することを想定してみます。

あるチームが設定を適用する時に他のチームも同じように適用しようとすると、コンフリクトが発生する可能性があります。またあるチームが設定的に間違っている設定を適用しようとした時に、Fluentdのプロセスが壊れてしまってダウンタイムが発生してしまいます。当然それは、一緒にFluentdを共有しているマルチテナントクラスター上の他のチームにも影響してしまいます。

問題を整理してみます。Managed Fluentd Clusterに複数のチームが設定を入れようとすると、チーム間でコンフリクトが発生して、異常なconfigを入れることでプロセスがダウンしてしまいます。また設定を適用する時には、当然Fluentdプロセスを意識してオペレーションする必要があります。

そのため我々は、これらの課題を解決するために次のことを満たすソリューションを提供することにしました。それは、定適用前にちゃんとバリデーションされること。あるチームの設定が他のチームに影響を与えないこと。Fluentdのプロセスダウンは避けること。そしてFluentdに対して、直接オペレーションをしなくても設定を適用できるようにすることです。

そこで我々が開発したのが、Fluentd Config Operatorです。これはKubernetesのカスタムリソースと、それらを扱うオペレーターとして提供されています。今回はFluentdのログに関するオペレーターなのですが、マルチテナントでConfigを扱うケースであれば、同じように活用できると考えています。

このFluentd Config Operatorは、カスタムリソースで記述された設定を自動的にバリデーションします。そして自動的にFluentdの設定に変換して、Configmapとして出力して、もしこのConfigmapに設定更新があった時は、Fluentdにその更新を伝播します。また設定のバリデーションが失敗した場合は、そこで処理をブロックします。

設定適用の流れ

では実際に設定適用がどのようになるのかを、流れとして見ていきます。まず事前にSREチームが設定適用の対象となるFluentdを、カスタムリソースによって指定します。これによって、指定したFluentdはオペレーターの監視対象として設定されます。そしてVerdaサービスの開発者は、自分たちのロギングの設定を書いたCRD、LoggingPipelineをアプライします。

LoggingPipelineカスタムリソースでは、どこからログを取得して、どのようにログを処理し、そしてそれをどこに保存するのかを記載していきます。これがデプロイされると、オペレーターは変更を検知して、reconcileループを開始します。

まずカスタムリソースに記述された内容をもとに、Fluentdのconfigに変換して、バリデーション専用のConfigmapを生成します。次に、そのバリデーション専用のConfigmapをマウントした、バリデーション用のPodを立ち上げます。ここではFluentdのドライラン、もしくはプロセスがreadyになるかどうかを実際に動かして確認するかの、どちらかの方法によってバリデーションします。

バリデーションが仮に成功した場合、オペレーターはカスタムリソースの内容をFluentdの設定にコンパイルして、実際に使われるConfigmapを出力します。もともと用意されている場合は、変更内容を既存のConfigmapにマージします。そしてこのConfigmapが、更新もしくは作成されると、オペレーターはFluentdに対してその内容を通知します。

ここでは、例えばConfig変更が連続した時に、頻繁にFluentdに数値がいかないようにするために、一定時間ごとに更新をバッファリングして通知するような形式を取っています。これによって、開発者が自分たちの設定を適用しなければいけないのをカスタムリソースに記述して、それをアプライするだけになりました。

カスタムリソースの詳細

では概要を掴んだところで、より詳細な部分について少し掘っていきたいと思います。まずはカスタムリソースについての詳細です。LoggingPipelineリソースはログの取得元というのを指定しますが、これは3つのタイプをサポートしています。まずは標準出力、そしてemptyDirです。

Verdaでは、監査ログをemptyDirに出力して、アプリケーションログを標準出力するようにしています。そのため、emptyDirの変えたログファイルについても、読み取るための手段が必要でした。emptyDirは、マウントしているPod外からは数字を見ることができません。ただ、Podがスケジュールされているホスト側からは、参照できます。

ホスト側にあるemptyDirに相当するディレクトリを我々のFluentd Forwarderがマウントすることで、取得できるようにしています。そして最後に、カスタムスニペットもサポートしています。これは主に、ホストパス上に保存されたログを指定するためにサポートしています。

コンパイルについて

次にコンパイルについても軽く説明しておきます。オペレーターはLoggingPipelineリソースに記載された内容をもとに、Fluentdのconfigにコンパイルします。コンパイル時には重要なパラメータを自動的に追加したり、バッファの保存先をオペレーターにとって、強いては開発者にとって都合の良いかたちに、自動的に変換しています。

例として、例えばemptyDirに吐き出されたログを出力して、加工してElasticsearchに送るような設定を想定してみます。まずログの取得元については、emptyDirのディレクトリ名とログファイル名を指定していて、これをもとにForwarder用のConfigとしてコンパイルしています。

そして、このemptyDirのパスは、ホスト側から見えるパスに自動的に変換されています。また、すでに読み込んだログの位置をファイルに記述するような設定だったり、その他重要な設定は、このタイミングで自動的に付与しています。

そして次に、加工処理と宛先についての指定が、Aggregator用のconfigとしてコンパイルされます。加工処理や宛先への送信の処理の設定については、誤って他のチームの設定にも影響を与えてしまう可能性があるため、必ずFluentdのラベルタグでマッピングしています。

そして1つのLoggingPipelineの設定は、必ず1つのラベルタグの中に吐き出されるようになっています。こうすることによって、あるLoggingPipelineの設定は必ずそのロギングリソースの中だけにしか影響を及ぼさないようになっています。そしてAggregatorのPersistentVolumeには、必ずバッファファイルが保存されるようにパスを自動的に保管しています。

このようにコンパイルプロセスではForwarderとAggregator、それぞれ用にConfigファイルというのを出力して、その時にラベルタグでラップしたり、重要な設定というのを自動的に保管したり、バッファファイルやポジションファイルが意図した場所に保存されるように自動的に変換したりしています。

バリデーションについて

次に、バリデーションについても紹介します。実際にPodで動かす前に静的にルールを解析したり、もしくはそのあとは基本的にPodをドライランで実行して検証しています。しかしElasticsearchへの送信設定など、実際に動かさないと疎通のテストができないみたいな問題があるので、そういったものに関してはドライランではなくて実際にPodを稼働させて検証するようにしています。

以上が我々が課題解決のために提供したソリューションについての説明になります。

負荷試験や監視について

次に負荷試験や監視などについても紹介したいと思います。Verdaが扱うログはなかなかの大規模であるため、我々のソリューションが現実の負荷環境でもちゃんと動作するように検証する必要がありました。そこでまず、我々はダークローンチを行って、本番の負荷を実際の環境に影響を与えずにかけることで、検証しました。

具体的には、もともとサイドカーコンテナとして存在していたFluentdの環境には一切手を加えずに、新規で我々のManaged Fluentd Clusterを追加して、ログを送信するようにしました。本番環境では、実際に1日でだいたい250GBほどの負荷がかかるのですが、我々はこのタイミングで高負荷ならではの問題を可視化して解決することで、安全に本番に障害を起こさずにリリースを達成できました。

ここで1つ代表的な高負荷環境で直面した問題について紹介できればと思います。AggregatorノードのCPU使用率がランダムに高負荷になって、すぐに元に戻るといったような事象が発生していました。また高負荷時は、ForwarderにAckを返すことができずに、そのため何度もForwarderがログ送信をリトライするようなことが起こっていました。

そしてAggregatorは、十分なインスタンスが用意されていて、ふだんは低いCPU使用率を維持していました。こういった問題に対して我々は、愚直にFluentdのどのスレッドが高負荷なのかについて見てきました。Fluentdは仕組み上、イベントスレッドというスレッドがリクエストを受け付けて、バッファに書き込んでからAckを返すというような処理になっています。

そしてまさに、このイベントスレッドが高負荷になっていることがわかりました。またTCPのコネクションを監視してみると、高負荷時でも大量のForwarderが1つのAggregatorに接続しているわけではないことがわかりました。つまり、1つのログ送信によってリクエスト受け付け、バッファ書き込み、Ackのフローのどこかが重くなっているということです。

しかしI/Oの負荷が低いこと、また重い加工処理もないことから、一度に送られるログのサイズが大き過ぎるがゆえにログの加工に時間がかかってしまっていると推測しました。よって、このサイズを小さくして細かく分割して送ることによって、無事に解決できました。

ログが巨大で1リクエストに時間がかかってしまうと、スケールアウトしても改善ができないのですが、細かく軽処理のリクエストが大量に増えることについては、シンプルにスケールアウトすることでパフォーマンスを改善可能なので、この施策は妥当であると判断しています。

モニタリングの方法

次に、どのようにモニタリングを行っているのかというところについて紹介します。まずPrometheusを同じクラスター、同じマルチテナントなKubernetesクラスターの中に設置してFluentdやオペレーターなメトリクスを常に監視しています。またVerda内で共通で使用しているVictoriaMetricsに対して、メトリクスをリモートライドしています。

そしてVictoriaMetricsのプラグインであるVM Alertを使って、定期的にアラートルールというのをクエリして、もしマッチした場合はその結果をアラートマネージャーに通知します。そしてアラートマネージャーが、SlackやPagerDutyなどの適切なルーティング先にアラートを飛ばす、といったような構成になっています。

またクラスターの中にいるPrometheus自体も、別のクラスターの外のPrometheusによって監視されています。これによって、仮にこのクラスターの中のPrometheusがダウンしていたとしても、すぐに検知できる構成になっています。監視しているメトリクスは、CPUのメモリ使用率は基本として、プロセスダウン、Podダウン、リスタート回数だったり、ログがちゃんとFluentdから送信されているかどうかであったり、ログの取得スピードより処理するスピードがちゃんと速くなっているかどうかというところ。あとは、バッファに使用しているディスクの使用量であったり、エラー数などをモニタリングしています。

特にログの取得スピードと処理スピードについては定期的に見ていて、適宜チューニングすることでどんどんバッファが溜まって、ログが滞留しないようにしています。仮にログの処理速度が追いついていない場合、宛先のダウンによって長時間バッファリングが必要になってしまうと、その後復旧してもなかなかバッファが捌けてくれませんし、常に一定時間ログの送信が遅れ続けることが発生してしまいます。

プロジェクトの成果とリリース後に出てきた課題

最後に、このプロジェクトの成果とリリース後に出てきた課題について説明したいと思います。これら2つのソリューションを実施することによって、当初のもくろみどおり、Verdaのサービス開発者は完全にFluentdのメンテナンスから解放されて、自分たちのログ設定をLoggingPipelineリソースに記述するだけで設定を安全に適用できるようになりました。

そしてログ集計の信頼性、データ永続性、パフォーマンス面をSREで面倒を見ることによって、品質が改善されました。これに加えて、1クラスターでおよそ172個のコンテナ削減に成功していて、スケジュール効率を上げられました。

しかしリリースしてから、いくつか問題が浮上してきました。まずDockerのJSON Logドライバは、16KB以上のログについては自動的に分割してログを送信する仕様になっています。そのため、例えばJSONログをアプリケーション側で出力しているようなケースを考えた時に、分割されてしまうと、JSONとしては壊れたログがFluentdに届くことになってしまいます。

そしてFluentdのパース処理時に、エラーになって捨てられてしまうような問題が発生してしまいました。これについては、1行でやる16KB以上のログは、そもそもアプリケーション側で出力内容を減らしてもらうか、もしくは適切にアプリケーション側で分割して送ってもらうような対応方針を取っています。

また、ユーザー側のLoggingPipeline設定が間違っていてパースエラーになった場合、Fluentdプロセスダウンの影響はないのですが、ログ自体は捨てられることになってしまいます。これは、Configオペレーターのバリデーションプロセスを検知できないので、このままではユーザーが自身でエラーを検知して対処するすべがありません。

現在は、SREがそれを検知してユーザーに伝達して解決しているのですが、これではスケールするごとにSREの負担が増えてしまうので、エラーハンドリングの設定を用意しようと考えています。

例えばこのように、LoggingPipelineカスタムリソースに対して、エラー時の退避先というのを事前に定義しておきます。そして、FluentdのdeadLetterRoutingの仕組みである@ERRORラベルの中に、各LoggingPipelineカスタムリソース用の退避先の設定を自動で挿入するようにします。こうすることで、まずエラーになったログを捨てられずに保管しておくことが可能になります。

また送信先へのログの流量は、Prometheusメトリクスから出力が可能であるため、このエラー退避先にログが流れた時に、ユーザーに対して自動的に通知することが可能になります。このようにして、パースエラーに対してもログを欠損することなく、かつユーザーに即座に通知できるような仕組みを検討しています。

今後の展望

そして今後の更なる展望としては、Kafkaを使ったアーキテクチャ改善を考えています。ForwarderはKafkaに対してログを飛ばして、それに対してAggregatorはKafkaからログを取るような構成になります。社内には非常に高いサービスレベルで運用されているマネージドなKafkaが存在するため、これを活用することを考えています。

これによって何がうれしいかというと、AggregatorとForwarderの直接の依存関係が消えて、より安全で柔軟なスケールが可能になります。さらにAggregator障害時でも、Forwarderは自分自身が排出されているノードでバッファリングするのではなく、Kafka側にバッファの責務を委譲できるようになります。

さらにクラスターの外からでもログを送って、我々のAggregatorリソースを活用できます。マルチテナントクラスターのログ流量は、Verda全体の内部サービスのログ流量の10分の1程度しかありません。よって、今後より広いスコープでログの標準化を進めるためにも、このマルチテナントKubernetesクラスターの外からログを送れるようになることは、非常に重要なポイントとして認識しています。

こうすることで、例えば個人情報のマスキングであったり、ログの重要度に応じたログの送信先データストアの整理とルーティングなど、このKubernetesクラスターに閉じずに、VMやPM、物理マシンで動いているようなプロセスを含めて、Verdaサービスすべてのワークロードに対してログ集計を標準化していくことを、今まさに計画して実施をしているところです。

これで私の発表は終わります。ご清聴ありがとうございました。