誰もが簡単にデータを楽しめるように

徐陽氏:こんにちは、徐陽と申します。私は、Data EngineeringセンターのData Platform室でソフトウェアエンジニアをしています。データパイプラインとバッチインジェスチョンの構築に、これまで5年間取り組んできました。

また、社内で使用する効率化ツールの開発も行っています。私のビジョンは、私たちが提供するサービスで、社内の必要な人が簡単に正しくデータを扱えるようになることです。

本日はまず、LINEのETLバッチの現状と問題点についてお話しします。その上で、私たちの考えをお伝えしたいと思います。その後、考察して解決策として導入したユニファイドマネージドワークフローサービスについて、目的や特徴などを含めて紹介します。最後に、今後の重要な課題についてお話しします。

ETLバッチ

ETLバッチは、当社のデータパイプラインの中でも特に重要な役割を果たしています。毎日、約10万のバッチジョブを200人以上のユーザーが実行しており、レポート、分析、機械学習などの分野で使用するために約4万のテーブルを提供しています。

Data Platform室には、バッチを扱うData ETLチームがあります。このチームは、PythonベースのETLジョブフレームワークである「Azkaban」を使っています。Data ETLチームのメンバーは、Data Platform室以外のチームとコミュニケーションを取る役割を担い、ほかのチームのためにバッチジョブを作成し、管理します。

一方、LINE公式アカウントの担当チームや、データサイエンティストのチームなど、多くのチームは自分たちだけでバッチを処理しています。彼らは慣れ親しんだ言語でコーディングを行い、そして「Airflow」や「OASIS」など、ほかのプラットフォーム上でスケジューリングをしています。

誰もが満足している状況なのか

現在は十分に良い状況と思われます。なぜなら、データプラットフォームのユーザーは、バッチを自由に扱ったり、Data ETLチームの助けを借りたり、そのいずれも選べるからです。しかし実際に誰もが満足している状況なのでしょうか? ユーザーが自分で処理をすると決めた場合、何が起こるかを見てみましょう。

ユーザーはまず、マシンを準備し、スケジューラーを設定して、マシンとスケジューラーの両方を管理する必要があります。そしてデータの検証、タスクのリトライ、アラート、通知など、スケジューラーに必要な機能を設計・開発しなくてはなりません。最後に最も重要なことですが、確実に実行可能で信頼できるコードを書かなくてはなりません。

データエンジニアが、これらのことを行うことは難しくないかもしれませんが、コーディングやデータエンジニアリングの経験が浅い社内のその他のユーザーにとっては、あまりに手間がかかり過ぎます。

その一方で、データプラットフォームの管理者も悩むことが多くあります。不用意なスケジューリングやコンフィギュレーションが原因で不注意によるイレギュラーなジョブの問い合わせがしばしば来ます。

さらに悪いことに、ユーザーから提出される悪いクエリや乱暴な運用によって、データプラットフォームのコンポーネントが不安定になることもあります。

あるチームのOASISジョブの実例

実際のケースを見てみましょう。これは、あるチームのOASISのジョブから抜粋した4つのパラグラフです。このチームでは、Scalaを使って「Spark」ジョブを作成していました。

最初のパラグラフ、Createジョブの最初のパラグラフ、Create tableからいくつかのライブラリを開発していたことがわかります。

そしてCheck partitionとGenerate new partitionのパラグラフでは、適切なパーティションキーを取得し、現在のパーティションを確認してデータ処理のクエリを実行しようとしています。ちなみに、クエリが終了すると小さなファイル同士を結合することも忘れていませんでした。

そして最後に、「Slack」上でジョブのステータスを通知してほしいという要望があります。実際このチームにとっては、左下のパラグラフにある500行のクエリだけが重要なのです。彼らのビジネスにとってはこれだけが重要なのですが、ETLのジョブを行うためだけにしては、それ以外の部分の準備にあまりにも時間がかかります。

そしてOASISは、パラグラフを定期的に実行するよう設定できますが、スケジューラーとして機能させたくても、タスクの依存関係などの機能がありません。OASISは、このスクリーンショットで示されているように、逐次での実行しかできません。

もう1つの重要なポイントは、バージョン管理がチームによって行われていない場合もあるということです。例えばOASISのユーザーは、ファイルをディレクトリツリーでのみ管理できます。

ここで私たちの目標が明確になりました。中央で管理され、かつ、使い勝手のよいETLシステムが必要です。このようなシステムがあれば、データエンジニアリングの負担や暗号化や圧縮の設定、小さなファイルの結合、タスクのリトライやリカバリーなどの運用コストなど、データエンジニアリングの負担を通常のユーザーは負わなくて済むようになります。

同時に、データプラットフォームの管理者は、このプラットフォーム上で実行されるすべてのジョブに、適切な設定や制限を適用できます。

新しいワークフローサービス

このようなあらゆる問題点を考慮し、私たちは社内のエンジニアに向けて、新しいワークフローサービスを構築・提供することを始めました。このサービスには、少なくとも以下の3つの重要な機能が必要となります。

まず第一に、我々データプラットフォーム側は、単一の統一されたプラットフォームを持ち、相異なる種類のスケジューラーやインスタンスを設置することを避けなければなりません。そうすることで、開発・管理・保守を容易に行えます。同時にユーザーは、自分の得意ではないことから解放されます。

そして、一連の標準化された機能をユーザーに提供することで、すべてのジョブとオペレーションが管理されていることを確認できるようにしなくてはなりません。これによって、異なるチームによる開発の重複も避けられます。

そして最後に、ジョブの作成において、コードを使わなくて済むようにしなくてはなりません。ユーザーに高度なコーディングスキルを求めず、クエリが問題なく動作することが確認できれば十分となります。

統一されたプラットフォーム

統一されたプラットフォームをホストするためには、次の3つの特性を満たす必要があると考えています。

まず、高い可用性。システムがダウンタイムなしに安定性と信頼性のある状態でアプリケーションを実行できることが必要です。例えばアップデートや再起動がユーザーのアクセスやオペレーションに影響を与えてはなりません。

2番目は、拡張性です。ユーザーの多様なニーズに対応する必要があるため、システムに機能を定義し、統合することが容易であることです。

3番目は、スケーラビリティです。将来的なワークロードの増加や、さまざまな状況にそなえて、システム全体が容易にスケールアップ、スケールアウトできることが必要です。

私たちの実際のやり方では、統一されたプラットフォームを構築するために、「Kubernetes」上のAirflowという選択をしました。Kubernetesの助けを借りて、高い可用性を得られるからです。そして、Airflowは、拡張可能なプラグインをサポートしています。

また、KubernetesレベルとAirflowレベルの両方で、容易にスケールアップ、スケールアウトが可能です。さらにKubernetesだけでなく、Airflowのコミュニティも非常に活発です。なので、サポートを受けるのも、貢献を提供するのも容易です。

次のポイントは、標準化された機能です。データプラットフォームの管理者である私たちは、ユーザーが好きなようにジョブを実行するのは、危ないし無責任であるということを知っています。不注意な操作や不適切な設定、最適化されていないクエリなどがあると、サービスが不健全なものになってしまいます。

同時に、異なるチームのユーザーが、同じライブラリや関数を開発して、車輪の再発明になってしまうこともあります。だからこそ私たちは、正式に推奨された標準化機能を提供しなくてはなりません。ユーザーの予期せぬ行動から「Hadoop」クラスタを守り、ワークフローサービスを使いやすくする必要があります。

これまでに実現した機能

これまでに実現した機能や、現在取り組んでいるものを紹介します。まずはエンジンです。現在、私たちのデータプラットフォームには、「Hive」「Spark」「Tez」などいくつかのエンジンがあります。

しかし、新しいワークフローサービスでは、そのうち1つだけを使うべきです。そうすることで、一連の標準化された機能性を効率的に開発・提供できるからです。実際には、調査・検討を経て、最新の「Spark 3」をメインの実行エンジンとして使用することにしました。

次はオペレーターです。ETLバッチのタスクタイプを分析したところ、SQLとデータ移動が大半を占めていることがわかりました。また、ジョブ実行時にもオペレーションをコントロールする必要があります。さらには、データ品質です。ユーザーはデータのプロファイリング、検証、異常検知を行えるようにする必要があります。

通知に関しては、ユーザーはSlackとEメールで、ジョブのステータスに関するメッセージを受け取れます。

依存関係に関しては、データの生成時間を保証し、障害発生時にあまり人手をかけないようにするために、上流のジョブに障害が発生しても、ジョブの内部およびジョブ横断的にタスクの依存関係を設定できるようにする必要があります。

最後はSLAです。ユーザーは、データの生成時間とジョブ実行時間を容易にモニタリングおよび検知できるようにSLAを設定できます。

私たちの設計哲学

統一されたプラットフォームと標準化された機能は、開発のためのものですが、コードが不必要なのは、私たちの設計哲学を反映しています。私たちは日々の経験から、次のような結論に至りました。

「ユーザーは、データエンジニアリングではなく自分のビジネスロジックに集中したいと考えている。また、データプラットフォーム管理者は、ユーザーのジョブの予期せぬ動作によるトラブルを防ぎたいと考えている。」

ユーザーがデータ処理の中核となるロジックを提供し、それをどのように実行したいかを私たちに教えてくれれば、それを実行可能な状態にするために、すべてのパーツを組み合わせて、双方の問題を解決できます。

そこで私たちが考え出したのが、「Job as Config」という解決策です。いくつかの種類の設定テンプレートをあらかじめ定義しておきます。ユーザーは必要なものを選んでフォームに入力し、提出します。そして、設定ファイルを解析し、所与の実行時間で実行可能なジョブに変換するのです。

一方、ETLのジョブは、意図的にバックエンドのスケジューラーから独立させています。これによって、ユーザーに独自のミドルウェアを提供可能になります。

また、将来的には、特定のスケジューラーをユーザーから隠すことも可能になります。これによってユーザーは、Airflowのようなスケジューラーの使い方を覚える必要はありません。さらに、スケジューラーの移行が発生した場合にも簡単に対応できます。

設定テンプレート

次に、設定テンプレートの一例を見てみましょう。まずはジョブです。ジョブは、完全なるデータパイプラインを定義します。グローバル全体的な設定も保持しています。マルチテナント環境で適切な権限管理を行うために、"service"で分けて設定するようにしています。"description"の内容はWebページに表示されるので、チームメンバーは詳細を確認しなくても、簡単にこのジョブを知ることができます。

"maintainer"は、誰がこのジョブに責任を持つかを定義します。ユーザーは、NotificationとRetryのフィールドによって、メッセージの送信先とリトライ戦略を決められます。

また、東アジアや東南アジアのさまざまな国にユーザーがいるので、タイムゾーンを設定する機能をサポートして、タイムゾーンの変換による潜在的な問題を解決しています。そして最後に、ジョブはタスクをタスクネームで参照し、それらの依存関係を定義します。

タスクのテンプレート

では、タスクのテンプレートを見てみましょう。Spark SQLやDistCpなどのタスクの種類ごとに、専用のテンプレートを用意しました。この構成設定ファイルのすべてのフィールドで、パラメータ化が可能です。

この場合、2つのパラメータが定義されていることがわかります。これらのパラメータは、下にあるデータ検証とクエリフィールドで使用されます。これは、ユーザーが柔軟にタスクを作成・更新できるという点でパワフルです。

そして、User Defined Function(UDF)をサポートする必要があります。ユーザーは、信頼できる私たちのレポジトリから、ユーザー定義型機能をインポートできます。クエリの実行前と実行後に、データソースと結果テーブルの両方でデータ検証を行えます。また、データ保持などの機能もあります。一度設定すると、タスク実行中に古いデータが自動的に消去されます。

どのようにして変換されるのか

ジョブやタスクの設定ファイルができたら、どのようにして実際の実行可能なジョブに転送されるのでしょうか? 1つの解決策は、「Jinja2」のようなテンプレートツールを利用して、設定ファイルからスクリプトのテンプレートをレンダリングするという方法です。

これは実現可能ですが、提供されたファイルを管理しなければなりません。さらに、開発に追加的な段階が入るためシンプルではありません。

実際には、私たちはジョブを動的に生成します。よく知られているように、AirflowはDAGフォルダをスキャンしてDAGを作成します。そこでDAGスクリプトを作成し、実行時に設定を読み込み、設定が読み込まれ、実行時にDAGオブジェクトが作成されます。Airflow以外の新しいスケジューラーに変更しても、同様の処理を行ってジョブを動的に生成できます。

では、Airflowではどのようなジョブになるのかを確認してみましょう。構成を設定した内容が、実際のDAGに転送されているのがわかります。このジョブを説明するドキュメンテーションもあります。ピンク色の円は、このDAGが2つのSpark SQLタスクで構成されていることを示しており、すべて順調に動作しています。

ワークフローサービスのアーキテクチャ

最後に、私たちのワークフローサービスのアーキテクチャについて説明します。可能な限りシンプルに作りました。コードとユーザー設定は、社内の「GitHub」で管理しています。リリースされると、Kubernetesクラスターに展開されます。

Airflowは、Kubernetes executorで動作するので、「Redis」のような追加のメッセージブローカーや、余分な「Celery」ワーカーを設定する必要はありません。

タスクポッドが作成されると、SparkアプリケーションをYarnに投入します。ジョブが終了すると、Airflowタスクインスタンスは、ユーザーがjob configに設定したとおり、メッセージをSlackまたはメールに送信します。

もちろん、ユーザーはAirflowのサーバーにアクセスして、自分のDAGを確認・操作できます。

本番でリリースしてからの成果

新しいサービスを本番でリリースしてから、いくつかの成果がありました。ETLコードがあちこちに散らばっていた従来の状況だった過去とは逆に、今では1ヶ所に集中して保管し、特定のジョブを簡単に見つけられます。

同時に私たちは、クラスタコンポーネントに影響を与える可能性のある設定の制御を、ユーザーから取り戻しました。例えば私たちは、すべてのORCフォーマットのテーブルに、zlibで圧縮を適用して、パフォーマンスを向上させ、ディスク使用量を削減しています。以前は「ORC」+「Snappy」やその他の組み合わせなどの、ユーザーが設定した組み合わせがありました。

ユーザーも、より生産的な新しいワークスタイルを見つけられています。「Job as Config」のおかげで、ユーザーはビジネスロジックに集中できる時間が増えました。そして、豊富なプログラミングの知識がなくても、ほかのETLジョブを簡単に理解できます。

今後の活動

最後に、私たちの今後の活動についてお話しします。データの質に関しては、現在の機能、特に異常検知機能を強化していきます。また、カラムアナライザーやデータクリーニングなど、より詳細な機能の導入に取り組みます。

データリネージでは、ETLジョブを社内のデータカタログツールと統合して、ユーザーによりよいリネージサービスを提供したいと考えています。

データ接続性については、より多くのデータストレージをサポートし、異なる種類のデータソース間でのETLジョブの実行を行いやすくしたいと考えています。例えば、MySQLとHiveのテーブルから結合されるデータにクエリを行えるようにするなどです。

私たちはここに書いてある人材を採用していきたいと思っています。ご視聴ありがとうございました。