RxJSを使って非同期なレスポンスをリクエストした順に結合する話
RxJS
は非同期処理を高度に抽象化するので、煩雑な手続きをシンプルに書ける場面が多く、その辺にRxJS
のユーザがこのライブラリにのめり込んでいく要因があるように思います。
一方、実地で使っていないとこういったメリットは、説明されてもピンとこないということがあるかも知れません。
そこで本稿では、実際に遭遇した『RxJS
を使っていたから楽だった』コードをベースに、RxJS
のメリットを紹介したいと思います。
最近弊社では、とある検索機能を持ったアプリケーションを作っていました。
(SUUMO
やHome'S
の物件検索のようなものをイメージしてもらうと実像に近いと思います)
ユーザが任意に入力した検索パラメータに応じて、都度APIサーバを叩いて検索結果を表示するような機能です。
クライアントから非同期に発行される検索リクエストを、複数台あるAPIサーバのいずれかに投げてレスポンスをもらう、という構成になっています。
ここで問題になったのが、各検索リクエストと各レスポンスはシーケンシャルに対応している必要がありますが、サーバマシンから見て各検索リクエスト同士は関連性を持っていないので、レスポンスが必ずしもシーケンシャルになるわけではないということです。
説明が分かりづらい気がするので、具体例を上げてみます
- クライアントから検索リクエスト-Aが発行
- APIサーバ-1で処理を開始
- クライアントから検索リクエスト-Bが発行
- APIサーバ-2で処理を開始
- APIサーバ-2からレスポンス-B'が返却
- APIサーバ-1からレスポンス-A'が返却
APIサーバでの処理時間とタイミング如何では、このように『リクエストした順にレスポンスが返ってこない』ようなことが発生し得るかと思います。
(逆にレスポンス-A'
->レスポンス-B'
の順番で返ってくることもあり得ます)
これを解決するためのコードを、まずRxJS
を使わずに書いてみたいと思います。
例えば、リクエストをキーにしたキュー構造(のようなもの)を用意して、先出しのレスポンスが満たされたらpopするような実装を考えてみました。
let queue = []; // パラメータをキーに探索できるミュータブルなキュー
const PENDING = "PENDING";
const request = (parameter)=> {
const query = encode(parameter); // オブジェクトをクエリ文字列に変換する関数だと思ってください
queue.push({ [query]: PENDING });
return fetch(`/endpoint?${query}`)
.then(res => res.json())
.then(response => {
// レスポンスが返る毎に、パラメータをキーにqueueに格納する
queue.forEach(q => {
if (q[query] === PENDING) {
q[query] = response
}
})
// queueを舐めてレスポンスが満たされているものだけ取り出す
const results = [];
for (let i = 0; i < queue.length; i++) {
const element = queue[i];
if (element === PENDING) {
break;
}
results.push(element);
}
results.forEach(() => queue.shift());
return results;
});
};
request({ parameterA: "foo" })
.then(console.log);
何をやっているかよくわからないですね...
コードが悪いだけかも知れないですが、キューの中身を都度書き換えていたりして、不安の残るコードになってしまいました。
キューから取り出したシーケンシャルなレスポンスを、長さの不定な配列に格納して返しているので、インターフェースとしても直感的ではない気がします。
では次に、同じ仕様でRxJS
を使う前提で書き直してみようと思います。
const request_Rx = (parameter: { [key: string]: any }) => {
const query = encode(parameter);
const fetched = fetch(`/endpoint?${query}`)
.then(res => res.json());
return Rx.Observable.fromPromise(fetched).first();
};
request_Rx({ parameterA: "foo" })
.concatAll() // レスポンスをシーケンシャルに結合するオペレータ
.map(res => [res]) // 最初のバージョンとインターフェースを合わせるために配列に入れている
.subscribe(console.log)
ご覧の通り非常にシンプルに書けました。
最初のバージョンで苦労した、非同期に発生するイベントをシーケンシャルに結合する
部分をRxJS
が抽象化してくれているので、ほとんどコードを書かないで済んでいることがわかると思います。
以上、実際に開発中に遭遇した場面を元に、RxJS
の便利さを紹介してみました。
参考になればうれしいです。