ETL処理で気をつけたいこと

吉田健太郎氏(以下、吉田):いろいろなETL処理があるんですけれども、それにはいろいろ気をつけたいことがあります。

例えばいわゆる昔ながらの使い方で実装してしまうと、活用できるまでのリードタイムがかなり長くなってしまうので。いかに最小限のラグで実行できるようにするかが勝負です。

例えば丸1日分のデータを一気に集めてそれを処理するとかなり時間がかかります。そのデータを1分ごとに処理したら短い時間で計算が終わるのは自明です。

そこらへんの概念を切り替えていく感じで、リードタイムをいかに少なくしていくか、いかに計算を短くしてパラレルに進めていくか。そういったところの考慮が必要です。

もちろん思うようにデータが入ってこないこともあったり、プログラムが突然なにかしらの影響で落ちることもよくあることなので、そこらへんのエラーハンドリングを適切に行なう必要があります。

これも自前で行なうとかなりしんどいものがあり、バスタブ曲線的に安定するまで数年以上はかかると思います。すでにEmbulkやDigdagは3年以上経ってますけども、たまに「そういった問題がまだ見つかってしまったか」といったこともあります。

最近すごく減ってきて、ここ1年本当に安定してきていると思ってますが。まあ数年は安定させるまで時間がかかるので、そういったものはうまくできてるものを使うとさっさとメインの業務に集中できるでしょう。

あとデータ検証ですね。破損レコードもあったりするので、つまり文字コードがたまたま崩れているであったり、パケットが微妙に壊れていて文字化けというか、途中でEOF的な感じの記号が入ってきてしまっているとか。

そういうのもあるので、そこらへんEmbulkであればおかしなレコードがあれば、そこはスキップしたうえで次に進むというオプションであったりとか、そういうのが1個でもあったら丸ごとそのファイルを処理しないといった設定もできるので便利です。

そのようにしてエラーレポートされたものは後で自分たちが見るということにして、それ以外のものに関してどんどん処理を進めていくといった使い方もできます。

リードタイムの長さをどうするか

あとバックフィルですね。これは例えばいま(午後)3時20分ですけども、1時間単位で実行している2時のバッチが実はこけていた。ただ3時のがもう始まってしまったとなったときに2時の分だけをうまく実行して、その分だけのデータを入れなおすというものです。

このETL処理のところでは次の処理がどんどんどんどん進んでいくので。前の実行結果によって次の結果が左右されてしまうことがなるべくないような、冪等性を担保する必要があります。

つまり2時のを失敗したら2時のデータだけ入れなおせば、そのあと綺麗にすべて解消されるようなそういう仕組みが必要です。

あと遅れて到着したデータへの考慮です。短い時間で行うとどうしても遅れてくるようなデータ。例えばスマホアプリであればスマホ側にいろいろデータを保管していたりするので実際にサーバに届くまで数分どころでなく、数日遅れることだってよくあります。

そのため、それのワーストの時間に合わせてしまうと、ぜんぜんデータが出てこないで困ってしまうので、一旦速報値を出してそのあと順次確定値を出すようなそんなやり方をすると良い。そんな気をつけるところがあります。

活用できるリードタイムの速さというところでは、例えば直列処理で昔ながらの処理でしていると、まずステップ1のなにかしらの処理を実行します。

実はステップ2、これはステップ1の依存関係はないんですけども、直列処理なのでその次に実行されます。次、ステップ1と2で出てきたデータを使ってステップ3が実行される。となったときに、これだけ時間がかかってしまうわけですね。

でも、もしこれを並列処理で実行するならば、もちろん自前で全部書くととても大変です。なかなか難しいと思うんですけどEmbulkとDigdagを使うとそこらへんが本当に簡単にできるので素晴らしいことです。

Digdagのところでparallel:trueというオプションを指定したうえでステップ1とステップ2の処理を実行する。そうするとマルチコアを活かして並列でガツガツっと実行してくれます。

Digdagのところで最大プロセス数を指定しておけばそれ以上増えることはないので使い過ぎになることも防げます。あとはリソースをちょっとだけ残しておく。そんなこともできます。

そこらへんを並列で実行し終わったらステップ3を2つのもの、もしどちらかが遅れたらそれまで待って、そのうえでステップ3を実行する。そういうのが簡単に書けることは本当に素晴らしいところですね。

エラーハンドイングとレジューム機能

エラーハンドリングとバックフィルができる仕組み。こちらも自前で実装するのは大変ですし、もちろんDigdag以外にもそういったものがいくつかありまして。Airflowとか、そういったものを使ってやってる方もいます。

どちらも共通するのは、コマンド1つで、ある日付のこのタイミングで実行する予定だったものを実行してくださいと頼むとその通りに動いてくれると。

ここらへんを使ううえで必要なのは手動でゴミファイルを削除しないと動かないですとかそういったものは本当にあってはならないので、叩けば必ずうまくいく。そういった冪等性のある設計を行なっておく必要があります。

あとレジューム機能ですね。

先ほどのであればステップ1、2までいって、その直前でAWSのなにかしらの影響でインスタンスが落ちたという場合に、そこまでのステートは持っているので次はステップ3から実行する。

そうすると最初の重たい処理をスキップできるので効率良くできると。そういったチェックポイントとして書き出されたファイルのところから再開できる仕組みを設計する段階で入れておくと良いです。

言い換えると、例えばEmbulkのインプットでMySQLを指定してフィルタでなにかしらの処理をし、アウトプットでTreasureDataであったりどこかに書き出すといった処理をしていたときに、アウトプットのところがたまたまネットワーク的な問題で落ちてしまった。

そういうときにインプットのところをもう1回やり直しになってしまうわけですね。それはけっこう効率悪いですし、取得する予定だったタイミングからもずれてしまったりします。なので、うまく実行できたところは保存しておいて、実行できてなかったところだけやり直そうということが、このレジューム機能でうまく考慮しておきたいポイントです。

効率のいいレジュームというのは失敗したらそのバッチの頭からではなく続きから再開できること。なので、取り込みと書き出しのそれぞれ2つにバッチを分けると良いです。

どうしてもファイルが大きかったりするので、ローカルに置くとすごい容量を食ってしまうということなので、容量を無限大に使えるS3を使っておけばなにも気にすることなく。もちろんリトライがあるのでなにかしら通信失敗があればうまいことやってくれます。いずれにせよかなり安定的なストレージの1つなので、そこをチェックポイントとして使うといいでしょう。

とくにSnapshotの取得、いわゆるデータベースの、例えば商品情報マスタであったりユーザー情報マスタ、そこらへんについてはログで集めるのは難しいので、データベースへ直アクセスすることになると思います。

そこを定期的にがっつり取ってくる、そういったときにもう1回やり直すとデータベースの負荷もかかってしまったり、夜中だったら問題ないが日中叩くとちょっと……、的な問題も起きることもあるので普通に夜中取れるものは取っといた方がいいということですね。

関連してTDでもクエリの実行と書き出しを別のJOBとして実行できるようになりまして。例えば1回DBでクエリ実行します。そのあとResult Outputの機能を使ってS3であったり、どこか別のデータベースであったり、そこらへんに書き出すことができるんですけども。

Result Export機能の使い方

3月か4月TDコマンドの新機能result:exportの紹介をします。これまではクエリを毎回実行しないと、もしくは1回テーブルに書き出して、それをSELECT * FROM (1回書き出したテーブル)といったクエリで別の出力先へ書き出す必要がこれまではありました。しかし今回、JOBの実行結果をIDで指定して、ストレージなどの外部出力先に投げるという機能が増えました。

これは効率の良いレジュームにも関連してかなり使いやすくなります。つまり、クエリの実行は成功したが、後続タスクで失敗してしまった場合に、そこの続きから実行できるということです。

先にこのTDのクエリを実行して、その結果取ってきたJOB IDが「td.last_job_id」というところに入り、digdagの次のステップのブロックで参照できます。設定ファイルについて解説します。「result_path」というところに結果の書き出し先を定義し、一旦変数として入れたものを、sh>:オペレータを使って結果の書き出します。。

これまでの効率悪かったところがとてもよく解消されてます。ぜひとも使っていただきたい機能の1つです。

これはネット上には、『Qiita』であったりブログにまだ書かれてない機能なのでここにいる方くらいしかおそらく知っている方はいらっしゃらない機能なので。ぜひ自慢してください。そのうち書きたいなと思っています。

遅れて届くデータに考慮する

先ほどお話しましたが、遅れて届くデータの考慮をしましょうというところですね。

遅れて届くデータを補正するために前日または前々日のデータをまとめて処理することもあるんですけども、基本的には速報値と確定値の2ストリーム、2パスで進めたほうがいいです。

例えばこの遅れて届くデータに関して、2、3日ぐらい経ってから遅れているデータが3件ぐらい届いたってなったときに、それ以外の例えば3億5000万件+3件同時に計算するかというとなんか微妙ですよね。

そういうときにもし単位時間ごとにある条件によって平均値を求めている場合には、平均値とその母数となった件数両方保管しておくと、差分計算ができるのでおすすめです。

例えば簡単に、すべての合計値が64で20件で平均3.2というなにかしらの値があったとすれば、その3.2というのと合計件数の20件を記録しておきます。そのあと遅れて5と12と6というデータが3件届いた場合にはこのようなかたちで足し合わせた数を合計値のところに足し合わせて、さらに母数の分母のところを3件、20+3ですね。というかたちですることで、3.78という新しい平均値を求められます。

こういった差分の処理をすることで、重たそうに見えても実はそんなに重たくない。本当に0.00何秒かでおわるような処理に、アプローチを変えることでできます。

このときに忘れてはいけないのは平均値だけでなく総件数も更新しないとその次にデータが入ってきたときにおかしくなってしまうので、そこらへんの考慮漏れしないように気をつけましょう、というのがちょっとしたTipsです。

ETL設計のセオリー

ETL設計のセオリーとして、まずはFunctionalというところで、それぞれが単純明快な処理であること。1つのスクリプトで何百行も書かれているような処理だとなにか問題が起きたときにデバックも大変ですし。途中で再実行しようにもどこまでいったのか見るのがけっこう大変なので、それぞれコンパクトな関数を呼ぶだけで済むようなふうにすると良いでしょう。

Reproducibilityということで、データソースが普遍的で再現性のある冪等な動作ができること。これはいわゆるsnapshot_idというので、のちほどご説明します。

あとログ戦略ですね。あとセキュリティ・サニタイズ。個人情報が漏れてしまったら大変なので。

Functionalというところでは、不具合が起きたときも調査を行ないやすくするように短い単位のプログラムに機能分割をしてシンプルにすると良いということです。

Reproducibilityについては、なにかエラーが起きて再実行するだけで同じ正しい結果が導き出せる。二重実行オッケーなそんなかたちですね。

それをどうするかと言うと、日時でなにかしらデータを入れている場合には、取り込むというときに先に入れる予定だったレンジのデータを削除してから入れるといったやり方で一貫性を確保するという方法です。

なのでsnapshot_idを付与するというもので。snapshot_id、本当になんでもいいんですけど、今日であれば20180523、日時であればそういうのでもいいですね。そういうかたちでなにかしら好きなもので作るといいです。

これのsnapshot_keyを使えばほかのFactテーブル同士でジョインの条件に指定すれば、その日のものが取れるので、SQLも変更することなくデータを取って来れます。

Logging戦略とSecurity

次はLogging戦略ですね。

新たな値が来るとレコードが上書き、または削除されていくようなワークロードだと、ロガーでデータを集めようというのは難しいので、スナップショット、いわゆるデータベースのダンプ的なものですね。もちろんサニタイズは必要です。

アプリ側でUPTADE/DELETEはしない増分記録のみの履歴テーブルを持つと、けっこう肥大化してしまうので、そういったのを全部フロント側のアプリでしようとするのではなく、それぞれの責任ですべきことを分けたうえで、それぞれに最適なデータを入れてあげるときれいな仕組みになります。

セキュリティに関しては、分析DBに個人情報が入っていると広くみんなに使ってもらううえで相当まずかったりするので。なにかしらサニタイズ処理をしておくと良いです。

そういうのもだんだん開発しているうちに、データベースのスキーマが変わってきたりするものもあるので、そこらへんに気づける仕組みを作ろうというアプローチがおすすめですね。

あとはプロダクション環境からのデータベースの抽出というところと、そのあとのデータをエンリッチメントさせるような内容性であったり、統計値を出すとか、そこらへんのプログラムを分けたほうが良いです。

なんで分けるかと言いますと、前者のプロダクションDBからのデータ吸い出しというのは秘匿情報も往往にして含まれるので、そこらへんはしっかりとレビューを行ななったうえで「大丈夫だよね!」ということをみんなで確認して取り出したものを、S3であったりどこかに飛ばせるような処理だけ入れたスクリプトを用意しておくと良いでしょう。

後者のところは、「そこらへんで大丈夫なデータになっているはずだから」ということで、素早い開発ができるように、レビューさせる対象の人も減らして素早い開発ができるといいでしょう。

後者が設置される分析基盤はプロダクション以外のネットワークにあったほうが、外部連携や、例えばポートを空けたいとかいろいろあると思いますので、そこらへんの自由度を高めるうえでも分けておいたほうがいいです。

TDを活用してできること

TDを活用することでできること。簡単にまとめに入ってきたいと思います。

ETL処理や基盤を自分で全部整備することは、もちろんAWSやいろんなものを使えばできます。ただそれにはさまざまな苦労が発生するので、既存のいろいろなベストプラクティスを組み合わせていいものを作っていくといいです。

Embulkが解消できることというのは、データ取り込みやフィルタ加工が設定ファイルで容易となる。設定ファイルの中にRubyの1行スクリプトみたいなもの、1ライナー入れることもできるので、それでした表記を変えるとか。そういうこともできます。

その周辺の処理については、別途自前のgemみたいのを作り、そのなかでユニットテストを回して、1個ファンクションを呼べば、その処理が行なわれるとしておいたほうがテストも書きやすいし。

コード量の肥大化、「設定ファイルにプログラムがけっこう入ってるぞ」ってなっててもメンテしづらいので、そこらへんを解消させることに助かると思いますので、別のライブラリを呼ぶようなかたちがいいです。

Embulkが得意なことは、1ファイルであってもページスキャッター機能というのがあって、1ファイルをいい感じに分割して、それぞれ並列に、パラレルのストリームで回してくれることがあるので、そこらへんを使うととてもいいです。

Digdagが解消させることは、スケジュールやエラーハンドリング、再実行を任せることができます。バックフィルもこっちですね。

TreasureCDPが解決することは、大量のデータに耐えられるスケーラブルなシステムによって利用企業は顧客への価値提供最大化に注力できるようになる。つまり面倒くさいことは全部TDに任せて、本当に必要な、いかにユーザーに価値を提供するか。そこに集中するために使ってもらいたいと思っています。

エンジニア観点で言うと、最も負荷が高くて難易度の高いデータプラットフォーム部分のところは、設定ファイルを書くだけで終わりにできるくらい仕事量を減らして、データ活用をどうするかとか、もっといい設計にするにはどうするかとか、そこらへんに工数を割いてもらえるような感じになると素敵だなと思っています。

あとTDはとにかくデータ量がたくさん入るところなので、ほかのRedshiftであったり、Googleさんが出しているBigQueryとか使うと、相当な量を入れて全件なめるとかいった処理をかけるとけっこうな費用になってしまうんですけれども。

TDはその分すごく優しい価格設定になっていて。いくらデータをなめても、データをフルスキャンしようが料金定額です。なので一旦全部のデータ、どんなスキャンするかよくわからないようなデータもすべて入れてあげるといいと思います。

そのうえで各種連携システムごとに、例えばRedshiftが必要であればそこからRedshift用にデータを書き出してもいいでしょうし。S3に一旦書き出したうえでRedshift Spectrumを使ってS3のデータをRedshift経由で読むようなかたちにしてもいいでしょうし。そういったかたちでうまく使い分けるといいと思います。そういったハブとしての使い方がとてもおすすめです。

まとめと質問

まとめです。今回はETLの処理のセオリーをおさえて、ロバストな基盤を作るノウハウを紹介しました。可能な限り利用者の負担を減らして、シンプルにどう解決するか。それを実現できるのがTreasureDataの強みだと感じています。

そしてTDのエコシステムを活用するとさまざまなノウハウを大いに活用できるので、ビジネスの拡大を大いに後押しできてスピード感のある成功を支援できると思ってたりもします。

こんな感じでTDのことが大好きすぎて6年後に転職してきた者からの発表でした。

いい感じに使い分けるといいでしょうということでした。

以上です。ありがとうございました。

(会場拍手)

司会者:吉田様、ありがとうございました。会場から質疑応答のお時間をと思ってますので、なにかご質問があれば。せっかくの機会ですのでお願いできたらと思います。

(会場挙手)

質問者1:「スキーマの変更を検知しましょう」みたいなのが、弊社でもそれがあるとありがたいなというものだったので、もうちょっと詳しく聞きたいです。ツールみたいなのが提供されているのか、こちら側からAPIかなにかで定期的に差分を見てチェックして自分たちで組むみたいな感じなのか。そのあたりを教えていただきたいです。

吉田:そうですね。それについては現在作ろうとしているところです。ちなみに自分のプロジェクトの中で作ろうとしているわけであり、TDとして提供するというわけではないんですけれども一例を紹介しますね。

アプローチとして、GitHubであれば、なにか変更があったときにREST API叩けるようになっていたりします。なのでそこでAPI叩いてもらい、該当するスキーマ系のRubyファイルが変更されたらSlackに通知してね、みたいなので、終わったらSlackに「できました!」みたいなアイコンを置くみたいな。そういった、まずは軽~い感じのでやってみようかなと思ってたりもします。

あとはGitクローンしたうえで定期的にアップデートかけて特定のファイルが更新されたら通知みたいなバッチでもいいかもしれないと思っています。

もしDigdagでやるならば後者の方法で、たぶん2時間もあれば作れるような気がします。Digdagの中のシェルスクリプトオペレーターを使って、Gitのクローンしたのをアップデートして、特定ファイルのタイムスタンプが前回と比べて変わったかどうかを見て、変わってたらSlackに通知する。

できそうですよね? そういったゆるい感じでいまのところは考えていました(笑)。

司会者:ありがとうございます。他にございますでしょうか? 大丈夫ですかね。

ありがとうございました。もう1度拍手をいただけたらと思います。吉田さん、ありがとうございました。

(会場拍手)