Rx勉強会
RxのObservableに関するオペレータやその他もろもろに関する逆引きです。
基本的にRxJS+αなので、各実装でどういう名前になっているかはドキュメントを参照してください
メモ: メッセージ=ストリームから飛んでくる値のこと。イベント=メッセージが飛んでくる+onComplete/onError。ストリーム=Observable。RxJSはPromiseもそのままストリームとして扱えることが多い。
| したいこと | オペレータ名 | 備考 |
|---|---|---|
| ストリームをSubscribe時の動作から作成する | create | |
| ストリームをfor文っぽい形で生成する | generate | |
| 指定した値を返すだけのストリームを作成する | return/just | |
| 指定した値を繰り返すストリームを作成する | repeat | |
| 例外を投げるストリームを作成する | throw | |
| 空のストリームを作成する | empty | onCompleteだけ飛んでくる |
| 何も流れてこないストリームを作成する | never | こちらはonCompleteも飛んでこない |
| DOM要素のイベントからストリームを作成する | fromEvent | Hot Observable |
| fromEventでは扱えないイベントをストリームに変換する | fromEventPattern | Hot Observable |
| ES6のPromiseからストリームに変換する | fromPromise | |
| イテレータからストリームに変換する | from | |
| mapからストリームに変換する | pairs | |
| 各要素がObservable/Promiseなイテレータからストリームに変換する | for | |
| 連続した数字のストリームを作成する | range | |
| 引数に入れた値をそのままストリームにしたい | of | |
| 一定間隔ごとにメッセージを発信するストリームを作成する | interval | |
| 一定時間後に一定間隔でメッセージを発信するストリームを作成する | timer | |
| ある関数を非同期で実行して、結果をストリームにしたい | start | 同期メソッドを非同期に実行するのに使える。変換する場合はtoAsyncを使用 |
| Promiseを返す非同期関数を実行して、結果をストリームにしたい | startAsync | |
| 同期関数を非同期関数に変換したい | toAsync | Observableを返す関数に変換される。そのまま実行する場合はstartを使用 |
| 結果をコールバックするタイプの非同期関数をストリームに変換したい | fromCallback | |
| Node.jsタイプの結果をコールバックするタイプの非同期関数をストリームに変換したい | fromNodeCallback | |
| とても複雑なObservableシーケンスの合成/作成がしたい | when及びand及びthen | |
| それ以外の独自方法でシーケンスを作りたい | Subjectのどれかを使用 |
| したいこと | オペレータ名 | 備考 |
|---|---|---|
| メッセージを変えたい | map/select | 歴史的経緯により、LINQがある言語はselect、無い言語はmapとなる |
| メッセージの特定のプロパティの値が欲しい | pluck | C#/Javaには無い。動的言語ならでは |
| ストリームのイベントのメタ情報をメッセージに付与したい/onComplete/onErrorもメッセージ化したい | materialize | |
| メッセージに前回メッセージからの経過時間を付与したい | timeInterval | |
| メッセージにタイムスタンプを付与したい | timeStamp | |
| メッセージ中のストリームを並列に展開したい | selectMany/flatMap | RxJavaのjavadocが分かりやすい |
| メッセージ中のストリームを直列に展開したい | selectConcat/concatMap | RxJavaのjavadocが分かりやすい |
| メッセージ中のストリームを並列に展開するが、新しいストリームから値が来たら古いストリームを無視したい | selectSwitch/switchMap | RxJavaのjavadocが分かりやすい |
| メッセージごとにストリームを引き延ばしたい | expand |
| したいこと | オペレータ名 | 備考 |
|---|---|---|
| ストリームを条件式でフィルタしたい | filter/where | 歴史的経緯により、LINQがある言語はwhere、無い言語はfilterとなる |
| ストリームを先頭から指定した件数までで切りたい | take | |
| ストリームを先頭から条件式が成立しなくなるまでで切りたい | takeWhile | |
| ストリームを先頭から他のストリームから値が来るまでで切りたい | takeUntil | |
| ストリームを最後から指定した件数までにしたい | takeLast | 最後はOnCompleteが無いと決まらないことに注意。 |
| ストリームのonComplete/onErrorだけを通したい | ignoreelements | |
| ストリームを指定した件数まで飛ばしたい | skip | |
| ストリームを条件式が成立するまで飛ばしたい | skipWhile | |
| ストリームを他のストリームから値が来るまで飛ばしたい | skipUntil | |
| ストリームを最後から指定した件数を飛ばしたい | skipLast | |
| メッセージが変わった時だけメッセージを通したい | distinctUntilChanged | |
| メッセージが高頻度で飛びすぎなのを削りたい | throttle/debounce | throttleは言語によってはdeprecatedなので注意 |
| メッセージを最初の一つだけ通したい | single/first | メッセージが無い時はエラーとなる。singleは2つ以上来た時もはerrorとなる |
| メッセージを最初一つだけ、無い場合は特定の値を通したい | singleOrDefault/firstOrDefault | silgleOrDefaultは2つ以上来た時はerrorとなる |
| メッセージを最後の一つだけ通したい | last | メッセージが無い時はエラーとなる |
| メッセージを最後一つだけ、無い場合は特定の値を通したい | lastOrDefault | |
| メッセージを一定間隔で削りたい | sample/throttleLast | throttleと似た機能 |
| したいこと | オペレータ名 | 備考 |
|---|---|---|
| ストリームにタイムアウトを指定したい | timeout | timeout時にerrorにするか、別のストリームにするかを選べる |
| ストリームを遅延させたい | delay/delayWithSelector | |
| メッセージを前後で束ねて配列にして、新たなメッセージにしたい | buffer | RxJSは条件の型に合わせてbufferWithCount/bufferWithTime等のメソッドを用意している |
| メッセージを前後で束ねてObservableにして、新たなメッセージにしたい | window | RxJSは条件の型に合わせてwindowWithCount/windowWithTime等のメソッドを用意している |
| ストリームを特定の間隔ごとにグループ分けしたい | groupByUntil |
| したいこと | オペレータ名 | 備考 |
|---|---|---|
| 複数のストリームの最後のメッセージを新たなストリームにしたい | forkJoin | |
| ストリームを分岐したい/複数Subscribeしたい | publish/share | Hot変換。shareはSubscribeされた時に自動でconnectする |
| ストリームを分岐したいが、その時に好きなSubjectを使いたい | multicast | Hot変換 |
| ストリームを分岐し、分岐先には直前の値を返したい | publishLast/shareLast | Hot変換だが、cold的要素あり。内部的にはAsyncSubjectをmulticastしたのと同じ |
| ストリームを分岐し、分岐先にはsubscribe時に初期値を与えたい | publishValue/shareValue | Hot変換 |
| ストリームを分岐&キャッシュしたい | replay/shareReplay | Hot変換だが、cold的要素あり。内部的にはReplaySubjectをmulticastしたのと同じ |
| 複数のストリームのうち、一番速くメッセージが来たストリームを選択したい | amb | 1番速くメッセージが来たストリームのメッセージが流れ続ける |
| 複数のストリームに同じメッセージが飛んできているかを判定したい | sequenceEqual | |
| 複数のストリームのうち、どれか一つにメッセージが来たら、他のストリームの直前のメッセージと合わせて流したい | combineLatest | |
| 複数のストリームのうち、すべてに一つメッセージが来たら、合わせて流したい | zip | |
| 複数のストリームのうち、メッセージ同士のタイミングのズレが一定以内なら、合わせて流したい | join/gropuJoin | |
| 複数のストリームをそのままmergeしたい | merge | |
| 終了時に次のストリームを繋げたい | concat |
| したいこと | オペレータ名 | 備考 |
|---|---|---|
| メッセージの合計/平均が取りたい | sum/average | |
| メッセージ全体に対して何らかの計算をして、一つの結果値を得たい | aggregate/reduce | sumやaverage的なことを自作する時に使用。aggregateは言語によってはdeprecated |
| メッセージと前回の結果とで何らかの計算をして次のストリームに流したい | scan | windowやthrottle的なことを自作する時に使用。 |
| ストリームに含まれるメッセージ数を数えたい | count | |
| ストリームに特定のメッセージが含まれているかを調べたい | contains | |
| ストリームにメッセージが1つ以上含まれているかを調べたい | any/some | 条件式を入れることもできる |
| ストリームのすべてのメッセージが条件式に適合するかを調べたい | all/every | |
| ストリーム全体を配列/mapに変換したい | toMap他 | toArray/ToDictionary(C#)/ToList/toSet等々。 |
| ストリーム全体をキーごとにグループ分けしたい | groupBy |
| したいこと | オペレータ名 | 備考 |
|---|---|---|
| subscribe時の処理を特定のタイミングやスレッドで行いたい | subscribeOn | |
| ストリームを特定のスレッドに切り替えたい/非同期で行いたい | observeOn | |
| ある関数を非同期で実行して、結果をストリームにしたい | start | 同期メソッドを非同期に実行するのに使える。変換する場合はtoAsyncを使用 |
| Promiseを返す非同期関数を実行して、結果をストリームにしたい | startAsync | |
| 同期関数を非同期関数に変換したい | toAsync | Observableを返す関数に変換される。そのまま実行する場合はstartを使用 |
| 結果をコールバックするタイプの非同期関数をストリームに変換したい | fromCallback | |
| Node.jsタイプの結果をコールバックするタイプの非同期関数をストリームに変換したい | fromNodeCallback |
| したいこと | オペレータ名 | 備考 |
|---|---|---|
| Subscribe時にストリームを2つから1つ選択したい | if/ifThen | ifThenはIE8用 |
| Subscribe時に多数のストリームから1つ選択したい | case/switchCase | switchCaseはIE8用 |
| Observable作成をSubscribe時まで遅延させたい | defer | |
| Observable作成をSubscribe時まで遅延させ、シーケンスにリソースを紐付けたい | using | リソースはシーケンス終了後、リソース.dispose()が呼び出される |
| したいこと | オペレータ名 | 備考 |
|---|---|---|
| ストリームには影響をあたえないで何か処理したい | do/tap | 派生でdoOnError等がある |
| メソッドチェーンを壊さずに複雑な処理をしたい | let | これ需要あるのかなぁ? |
| エラー時に再度subscribeしたい | retry | |
| エラー時にエラーに対処したい | catch | |
| エラー時に次のストリームを繋げたい | onErrorResumeNext | |
| 終了時に再度subscribeしたい | repeat | |
| 終了またはエラー時に何かしたい | finally |
| 名前 | 機能 | 備考 |
|---|---|---|
| Subject | 基本的なSubject。Hotなobservableで、onNext/onComplete/onErrorをそのままsubscribeしているObserverに伝える | |
| ReplaySubject | Subjectの派生。違う点はメッセージをすべてキャッシュし、新たにsubscribeした時にキャッシュしたメッセージをすべて再送する | |
| BehaviorSubject | Subjectの派生。初期値を持ち、違う値がメッセージとして来た時にその値をキャッシュしてメッセージを流す。Subscribe時に直近の値か初期値を再送する | |
| AsyncSubject | Promise的動きをするSubject。最後のメッセージをキャッシュし、onCompleteが呼び出された時にメッセージを放出してonCompleteを送出する。 | onComplete後にSubscribeした場合もメッセージとonCompleteが送られてくる |
| したいこと | 解決法 |
|---|---|
| ストリームで飛ばす値が無い/voidを飛ばしたい | Unitを飛ばすようにする。Unitの実装が無い言語(RxJS)もあるので、その場合はUnitを実装する |
| .NET3.5以降のeventからストリームを作成する(Rx.NET) | fromEventを使用する |
| Taskからストリームを作成する(Rx.NET) | Task.ToObservableを使用する |
| Futureからストリームを作成する(RxJava) | Observable.fromを使用する |
| 毎フレームのUpdateと同等のストリームを作成する(UniRx) | Observable.EveryUpdateを使用する |