コマンドとクエリを分けるCQRSとは

加藤潤一氏:今日は「CQRSはEvent Sourcingなしで実現できるのか?」という話をします。よろしくお願いします。自己紹介は割愛させてください。

Event Sourcingの事例は、Chatworkでも2016年にNTTデータさんと共同開発したプロジェクトです。AWSのDev Day(AWS Dev Day Tokyo 2017)で話したので、スライドもあります。あとはApache Kafkaを使っているんですけど、NTTデータさんのメンバーが書いた書籍があって、そこでもうちの事例も出ているので、興味がある人はそちらを見てください。

今日の内容ですが、Event Sourcingの話なので、モノとのつながりをトレースできるイベントや出来事などが、ドメイン分析に使えるよという話なのかと思われそうですが、今回はそういう話ではなく、イベントを使ったアーキテクチャに関する話題です。その辺はご承知ください。

CQRS(コマンドとクエリの分離)において、Event Sourcingは必須なのか、あるいは大袈裟ではないのかという話題がよく挙がります。今日はそういったテーマでいきたいと思います。(Event Sourcingの)逆の意味のState Sourcing、つまり状態に基づく仕組みの場合は、CRUD(Create/Read/Update/Delete)を使うことが多いと思うんですけど、それでCQRSできるかどうかを考えてみたいという話です。

CQRSは、最近よく話題になるのでみなさんご存じかと思います。改めて言うと、CQRSはコマンドとクエリの責務を分離するパターンですね。2010年にGreg Youngさんが考案したパターンで、DDDのコミュニティでも、わりと話題になったりしています。もとはMeyer(Bertrand Meyer)さんの、CQSという、パターンをアーキテクチャレベルに適用した話がありました。これは分けて考えたほうがいいのかな、と最近思ったりしています。

コマンドとクエリで、要件が違うから分けましょうという話がよく出ます。ドメインに関してはWrite Stackに閉じ込めて、それ以外の複雑なクエリ要件に関してはRead Stackで対応する。ReadとWriteで責務が違うので完全に分けていくんですが、完全に分けたら、どこかでつながないといけません。そのためには、そのWriteモデルとReadモデルを途中で形式変換する「ReadModel Updater」という仕組みが必要になります。このようにCQRSを使う例として、一貫性やデータ形式、スケーラビリティとかは違うので分けましょう、という話がよく出てきます。

CとQの分離させたときの問題

CとQを分離しない場合の弊害は何だろうと思ったとき、僕も分けないパターンでよくやっていたのでよくわかるのですが、気付いたところで言うと、リポジトリのクエリメソッドが複雑になるんですね。例えばfindBy〇〇とかがいくつもつながったりとか、はたしてこの要件はリポジトリの責務なのかと、みなさんも考えたことあると思います。

あと、困っている人が多いと思うんですけど、N+1クエリが発生しやすいんですよね。クエリでReadモデルを作る必要性があった場合、こういった問い合わせが発生しやすい。例えばホテルの予約みたいなものを取り出して、ホテルの名前とか予約した顧客の名前とかを取ってきて、Readモデルを作る必要性があった場合は、こういったN+1クエリが発生しやすくなります。

あとは、ドメインオブジェクトからViewモデルやReadモデルといったDTOに変換するときに、変換効率みたいなものが落ちてしまうとか。例えばUIに合わせて、必要な項目が実はドメインオブジェクトの一部しか使わない場合に、リポジトリで引き当てたものの、半分ぐらいは捨てられて、名前と住所ぐらいしか残らないということが、画面の構成によってはあると思います。

そういったことが、CとQを分離しないと起こりやすいのかなというのを、実感値として感じているところです。

ReadモデルのCとQは分けてもいいけど、ReadモデルとWriteモデルのテーブル、DBは分けなくてもいいじゃないという人もいます。

実際にこれは困った例で、例えば従業員の入社日を管理したいといったときに、EpocTimeがLong値としてエンコードされてしまう。ドメインオブジェクトでエンコードされたものが、実際に内部データとしてLong値になっています。

これは、クエリ側からそのまま参照しても、Readしたい値表現にはなりません。こういったときに助けになるのが値オブジェクトのメソッドだったりするんですが、ここでクエリ側の値オブジェクトを参照するということは、結局Write側の助けが必要となるわけです。完全に分けて途中でつなぐみたいな発想がないと、こういった分離みたいなことはなかなかできません。

こういったことを回避するためにも、(Write側に読み込ませるため)先ほど言ったReadModel Updaterで事前に形式変換してあげる必要があります。こうしないと、ドメイン側のコマンド側オブジェクトのエンコードしたデータを、クエリ側で扱えません。

Greg Youngさん曰く、このC/Qのモデル間の変換や同期の問題は、イベントで解決すべきだと言っているんですね。他にも仕組みはあると思うんですけれども、「最も適したモデルはイベントの導入であり、イベントはよく知られた統合パターンであり、モデルの同期化に最適なメカニズムを提供します」と言っています。Event Sourcingが必須みたいな印象を受けるんですけど、今回はあえて、Event Sourcingを使わないで統合できないか考えてみた感じですね。

Event Sourcingとは何かというと、過去に起きた出来事=イベントにおいて、真のデータソースは「最新状態」のものではなくて、過去に起きた「イベントの列」が、真のデータソースだよという考え方です。イベントは不変で、追記のみしか行いません。イベントを再生、といってリプレイする考え方があるんですが、そこからの状態にイベントを適用していくと、最新の状態が手に入るという考え方です。

これは「パフォーマンス的にはどうなるの?」「イベントの列が大量にあるとパフォーマンスが劣化しない?」という話があります。それも解決できる仕組みもあったりするので実質パフォーマンス的にもそんなに問題にならないことが多いです。

Event Sourcingの場合は、先ほど言ったように真のデータソースは永続化されたイベントで、そのイベントを使ってReadモデルを構築します。データソースの部分は、Write側がドメインイベントを列として管理できるストレージを使います。例えばApacheのKafkaとかいろんなものに対応しています。KafkaとかKinesisとかが、典型的な例ですかね。これが「大袈裟では?」「大変では?」となるので、他にも方法はないかとなります。

データソースを最新にするには

真のデータソースを最新状態にできないか考えてみます。(スライドで)クライアントをちょっと横長に書いていますが、左から右に流れると思って見てください。左側にRESTのサーバみたいなのがいて、リポジトリの助けを借りてアグリゲートを読み込んで、何か副作用を起こしてその結果を保存する。

Write側には、(スライドの)真ん中左側にWrite用のモデルがあると思いますが、そこに最新状態のものを保存します。そして、Read側のデータベースにもイベントに何か変更が起こったよというのを伝えないといけません。

変更内容を伝搬させる必要があって、Write側(コマンド側)は最新状態でいいんですけど、Read側(クエリ側)には非正規化されたデータが必要です。例えばさっき言ったようなホテルの予約だったら、予約に対してホテル名や顧客名みたいなものを、非正規化されたデータとして構築する必要があります。逆にWrite側は、正規化された集約単位のデータベース設計になります。

なので、これはそれぞれ正規と非正規の橋渡しをするため、何かしら通知しないといけないんですが、この正規と非正規の橋渡しをするのがReadModel Updaterです。ただし、書き込みが起こったあとにイベントをReadModel Updaterに通知はしますが、イベント自体は最新状態のみ永続化する必要があります。

そういう考え方でいくと、まずWrite側のモデルを書き込んだあとに、このキューみたいなところにイベントを書き込みます。これは永続化が揮発しちゃうんですけど、それが読み込まれて非正規化されたデータを作る。それをRead Stackで読み出す。こういった方法を考えたんですけど、この場合でも、2層コミットの弊害と、イベントの追い越し問題が発生してしまいます。

それがどういう問題かというと、アプリケーションからのイベント伝搬方式は、当たり前ですけど、イベントが全順序に並んでいないとうまくReadモデルを構築できません。例えば何かカートにアイテムを追加して、その後数量を変えてアイテムの削除を行なう、といったことは、発生した順序に並んでないとおかしいですよね。

この図の、真ん中にあるグレーの矢印がストリームです。イベントの流れですね。そして左側は2層コミットの問題の図なんですが、これはイベント追加にロックがない場合におきる問題ですね。

例えばステートレスなWebアプリケーションの場合、今イベント3が書き込まれている状態に対して、スレッド1とスレッド2が同時にイベント4を書き込もうとしたとします。他のサーバあるいはスレッドから、同じ前提でイベント4を追加しよとしたときに、ロックがないと、この3に対する変更を2つのイベントが同時に登録されてしまう問題が発生します。そのため、ロックが必要になります。

しかしこの場合、Write用モデルのDBとこのキューの書き込み、キューへの追加が、同一トランザクション上にないと、そういったロックが作れません。例えばWriteのDB側に書き込みが成功したらイベントを追加する、みたいなことをやったとしても、基本的には同じDBではなくて別々のDBなので、レースコンディションが発生してしまいます。アトミックな操作ではないので、割り込みが発生すると、どうしても防ぎようがありません。

別の方法として、Writeモデル側に書き込みが成功したら、そこをエンキューする方法があるんですけど、それには、Writeモデルの書き込みが成功したのをきっかけにしてキューにエンキューするといった処理が必要になります。

もう1個の問題がイベントの追加順序の問題。作業中にイベントの追加順序が狂うということが、実際にあります。僕らがやっているChatworkでも同じことが起きているんですね。

これは他のサーバで、早めにイベントを生成して追加しようとしても、OSやスケジューラにおけるスレッドのスケジューリングが狂ってしまい、早く受け取っているのに、書き込みが遅くなり、そうなるとあとから来たイベントが先に追加される事態になります。

解決策としては、ReadModel Updater側で動的に順番を入れ替える方法でやっているのですが、こういったものを成立させるには、すごく複雑な作業が必要です。この仕組みでイベントを永続化しない前提で考えると、すごく複雑なことが発生してしまいます。

CDC+Outbox方式

そこで、他に方法がないのかといろいろ調べました。見つかったのが、CDCという仕組みで、Change Data Captureという方法です。同じようにこちらがステートレスな前提で、どこのサーバでリクエストを受けても、どこでも同じように処理ができるというものです。Kafkaコミュニティではわりと知られている方法で、このCDCの仕組みを使うと、Write DBでの変更を捉えることができます。

具体的には、Writeモデルと同一トランザクションのOutboxに、ドメインイベントを保存します。一時的な保存でもよくて、それを使って下流に伝播させます。

Red Hatのメンバーが作っていると思うんですけど、オープンソースのプロダクトであるDebeziumがKafkaのコネクタとして実装されていて、いろんなRDBに対応しているんですけど、これを使えばMySQLだとbinlogを読むことができます。

一時的にドメインイベントに書き込んで揮発したとしても、binlogにはヒストリが残った状態になるのでそれをコンシュームします。DBのトランザクションログはイベントと同じような働きをするので、結局それを使って下流に流す。それをやれば、別にこのWriteモデルはステートをもっていても問題はありません。

ただ結局、これはEvent Sourcingになっているので、どうしてもイベントを書いて連携しないと、さっき言った2層コミットの問題がすごく面倒くさいです。

AkkaのES方式

Akkaの方式なんですけど、Akkaはステートフルアプリケーションなので、DDDでいう集約をアクターとして常駐させるんですね。常駐させてステートを持っている状態すが、その集約IDを持つアクターは、クラスタ内で1つしかもてないので、複数使う場合はそれをシャーディングしています。

つまり、クラスタ全体でID=1というのは、1つしかありません。そして必ずどこかのノードで動いていて、万が一停止しても別のサーバへの切り替えが自動的にされているので、問題にはならないという感じです。

そして、クライアントからリクエストがルーティングされてくるので、ここで受け取ったら、アクターがトランザクション管理をしているので、真の状態は、実際にこのアプリケーション空間上のメモリの中にあります。

なので、このアクターが副作用を起こすと、イベントを追記していきます。追記して、例えばノードがダウンしたらリプレイする必要があるので、そのときもう1回読み直すんですね。つまりこのパーティションの中のイベントを読み直すという仕組みになっています。ここをどんどん書いていきます。こっち側はコンシューマになっているので、何台いてもかまいません。

もしKafkaとかでやれば、コンシューマが何台でもリバランスをうまくやってくれるので、このドメインイベントの列をコンシュームして、Readモデルを作っていきます。

まとめると、単純にイベントを伝播させるだけでは、さっきのような問題が発生します。Akkaみたいなものでなく、一般的なステートレスなWebアプリケーションであれば、CDCとOutbox方式をとらないと大変だなという印象です。CDCは、一旦そのMySQLに状態を書き込むので、そのあとにKafkaとかを使ってトランザクションログを読んで、Readモデルを別に作ります。

結局トランザクションログが真のデータソースであり、ステートのみを保存していると言いつつも、ESの一種であるという理解が必要かなと思います。なので外形的にはCQRSでState Sourcingと言っていますが、最新のデータが真のデータソースになるわけではありません。そんな身も蓋もない話で、結局Event Sourcingでないと、CQRSは難しいという結論です。

以上になります。ご清聴ありがとうございました。