Rx勉強会
複数のイベントや非同期処理を扱うコードは、処理の順番を扱うために状態マシンを定義する必要があり、その結果コードがすぐに複雑になってしまいます。
加えて、そのコードはそれぞれの処理について成功や失敗を扱う必要があります。
これらの処理は通常の制御フローとは違う処理になり、理解とメンテナンスが難しいコードが生まれてしまいます
RxJSはそれらの計算を扱うのに最高のライブラリとなります。
RxJSは、これらの非同期の計算を扱うための読みやすく、かつ統合・編集しやすいAPIを提供します
var input = document.getElementById('input'); var dictionarySuggest = Rx.Observable.fromEvent(input, 'keyup') .map(function () { return input.value; }) .filter(function (text) { return !!text; }) .distinctUntilChanged() .throttle(250) .flatMapLatest(searchWikipedia) .subscribe( function (results) { list = []; list.concat(results.map(createItem)); }, function (err) { logError(err); } );
このサンプルは、一般的なUIで使われるユーザーの入力に合わせてサジェストする機能の実装例です
RxJSは既存のkeyup
イベントからObservableシーケンスを作成します
そして、このイベント(訳注: Observableシーケンスのこと)を前回と違う値が来た時だけ発火するように、フィルターや変換をシーケンスに重ねていきます
(keyup
イベントはキーを打つすべてのタイミングで作動します。つまり、←や→キーをおしてカーソルを動かし、入力テキストが変化しないケースも含みます)
次に、throttle
オペレータを使い、直前のイベントから250ms経過しないと発動しないようにします
(ユーザーがタイプ中は、サジェストを探索してもすぐに捨てられるため、探索する高コストな処理を省きます)
今までの書き方では、throttle相当の処理をタイマーのコールバックで実装していました。
このタイマーは例外を飛ばす可能性があります(幾つかのタイマーは実行数に制限がある)(訳注: この文どう考えても蛇足な気が)
ユーザーの入力イベントがフィルタリングされたら、次に辞書の探索を開始します。
これらの処理は一般的に重たい処理となります(別の環境にあるサーバにリクエストを投げる等)。この処理も非同期で行います
flatMap
/selectMany
は複数の非同期ストリームを結合するのに役立ちます。
この結合は各ストリームの成功だけではなく、例外(訳注: onErrorのこと)も同様に結合されます
今までの書き方では、複数のコールバックを用意して、各コールバック内で例外処理を行っていました
もし、辞書探索中にユーザーが新たなキー入力を行った場合、辞書探索の結果はもはや見る必要はありません。
ユーザーがより長い単語をタイプ中に古いサジェスト結果を見るのは混乱の元だからです
flatMapLatest
オペレータは新しいkeyup
イベントを検知したら辞書の処理を無視するように働きます
最終的な処理の結果のObervableをsubscribe
します。subscribe
に渡した2つの関数は、以下の時のみ実行されます。
対象のアプリケーション/ライブラリ中に含まれる非同期/イベントベースの処理が少ない、またはそれらの処理を統合・編集する処理が少なく、 RxJSのコスト(学習コスト及びRxJSライブラリ再配布コスト)が自前で実装するコストよりも重たくなる場合
いくつかの既存のライブラリはJavaScriptエコシステムに非同期処理をサポートします。
これらのライブラリは強力ですが、一つのメッセージを返すことに最適化されています。
そして、大抵のライブラリは非同期処理中に複数のメッセージを結果として送信することはサポートしていません
RxJsのメッセージングは以下の形で表されます: onNext
* (onCompleted
|onError
)?
つまり、RxJsは終了までに複数のメッセージを送信できます。このことはRxJSは一つのメッセージを返すのと、2つ以上のメッセージを返すのとの両方の処理に適切に利用可能であることを示しています
var fs = require('fs'); var Rx = require('rx'); // Read/write from stream implementation function readAsync(fd, chunkSize) { /* impl */ } function appendAsync(fd, buffer) { /* impl */ } function encrypt(buffer) { /* impl */} //open a 4GB file for asynchronous reading in blocks of 64K var inFile = fs.openSync('4GBfile.txt', 'r+'); var outFile = fs.openSync('Encrypted.txt', 'w+'); readAsync(inFile, 2 << 15) .map(encrypt) .flatMap(function (data) { return appendAsync(outFile, data); }) .subscribe( function () { }, function (err) { console.log('An error occurred while encrypting the file: %s', err.message); fs.closeSync(inFile); fs.closeSync(outFile); }, function () { console.log('Successfully encrypted the file.'); fs.closeSync(inFile); fs.closeSync(outFile); } );
このサンプルでは、4GBのファイルをすべて読み込み、暗号化して他のファイルに保存しています
ファイルのすべてを読み込んで、暗号化してすべてを書き出すのは高コストな処理になるでしょう
代わりに、RxJSが複数のメッセージを扱えるという点を利用します。
ファイルを64Kブロックごとに非同期で読み取ります。読み取りの結果、byte配列のobervableシーケンスができます。
次に各ブロックをそれぞれ暗号化します(このサンプルではファイルの部分部分を暗号化できると仮定します)。
ブロックの暗号化が終了すると、即座に次の他のファイルに保存するパイプラインに送信されます。
appendAsync
は複数のメッセージを処理できる非同期処理です(訳注: ここで先ほど送信された暗号化済みメッセージが保存される)
対象のアプリケーション/ライブラリ中に複数のメッセージを処理するケースが少なく、 RxJSのコスト(学習コスト及びRxJSライブラリ再配布コスト)が自前で実装するコストよりも重たくなる場合