RxのObservableに関するオペレータやその他もろもろに関する逆引きです。

基本的にRxJS+αなので、各実装でどういう名前になっているかはドキュメントを参照してください

メモ: メッセージ=ストリームから飛んでくる値のこと。イベント=メッセージが飛んでくる+onComplete/onError。ストリーム=Observable。RxJSはPromiseもそのままストリームとして扱えることが多い。

Observableのオペレータ

生成

したいことオペレータ名備考
ストリームをSubscribe時の動作から作成するcreate
ストリームをfor文っぽい形で生成するgenerate
指定した値を返すだけのストリームを作成するreturn/just
指定した値を繰り返すストリームを作成するrepeat
例外を投げるストリームを作成するthrow
空のストリームを作成するemptyonCompleteだけ飛んでくる
何も流れてこないストリームを作成するneverこちらはonCompleteも飛んでこない
DOM要素のイベントからストリームを作成するfromEventHot Observable
fromEventでは扱えないイベントをストリームに変換するfromEventPatternHot Observable
ES6のPromiseからストリームに変換するfromPromise
イテレータからストリームに変換するfrom
mapからストリームに変換するpairs
各要素がObservable/Promiseなイテレータからストリームに変換するfor
連続した数字のストリームを作成するrange
引数に入れた値をそのままストリームにしたいof
一定間隔ごとにメッセージを発信するストリームを作成するinterval
一定時間後に一定間隔でメッセージを発信するストリームを作成するtimer
ある関数を非同期で実行して、結果をストリームにしたいstart同期メソッドを非同期に実行するのに使える。変換する場合はtoAsyncを使用
Promiseを返す非同期関数を実行して、結果をストリームにしたいstartAsync
同期関数を非同期関数に変換したいtoAsyncObservableを返す関数に変換される。そのまま実行する場合はstartを使用
結果をコールバックするタイプの非同期関数をストリームに変換したいfromCallback
Node.jsタイプの結果をコールバックするタイプの非同期関数をストリームに変換したいfromNodeCallback
とても複雑なObservableシーケンスの合成/作成がしたいwhen及びand及びthen
それ以外の独自方法でシーケンスを作りたいSubjectのどれかを使用

メッセージの変換

したいことオペレータ名備考
メッセージを変えたいmap/select歴史的経緯により、LINQがある言語はselect、無い言語はmapとなる
メッセージの特定のプロパティの値が欲しいpluckC#/Javaには無い。動的言語ならでは
ストリームのイベントのメタ情報をメッセージに付与したい/onComplete/onErrorもメッセージ化したいmaterialize
メッセージに前回メッセージからの経過時間を付与したいtimeInterval
メッセージにタイムスタンプを付与したいtimeStamp
メッセージ中のストリームを並列に展開したいselectMany/flatMapRxJavaのjavadocが分かりやすい
メッセージ中のストリームを直列に展開したいselectConcat/concatMapRxJavaのjavadocが分かりやすい
メッセージ中のストリームを並列に展開するが、新しいストリームから値が来たら古いストリームを無視したいselectSwitch/switchMapRxJavaのjavadocが分かりやすい
メッセージごとにストリームを引き延ばしたいexpand

フィルタ・カット

したいことオペレータ名備考
ストリームを条件式でフィルタしたいfilter/where歴史的経緯により、LINQがある言語はwhere、無い言語はfilterとなる
ストリームを先頭から指定した件数までで切りたいtake
ストリームを先頭から条件式が成立しなくなるまでで切りたいtakeWhile
ストリームを先頭から他のストリームから値が来るまでで切りたいtakeUntil
ストリームを最後から指定した件数までにしたいtakeLast最後はOnCompleteが無いと決まらないことに注意。
ストリームのonComplete/onErrorだけを通したいignoreelements
ストリームを指定した件数まで飛ばしたいskip
ストリームを条件式が成立するまで飛ばしたいskipWhile
ストリームを他のストリームから値が来るまで飛ばしたいskipUntil
ストリームを最後から指定した件数を飛ばしたいskipLast
メッセージが変わった時だけメッセージを通したいdistinctUntilChanged
メッセージが高頻度で飛びすぎなのを削りたいthrottle/debouncethrottleは言語によってはdeprecatedなので注意
メッセージを最初の一つだけ通したいsingle/firstメッセージが無い時はエラーとなる。singleは2つ以上来た時もはerrorとなる
メッセージを最初一つだけ、無い場合は特定の値を通したいsingleOrDefault/firstOrDefaultsilgleOrDefaultは2つ以上来た時はerrorとなる
メッセージを最後の一つだけ通したいlastメッセージが無い時はエラーとなる
メッセージを最後一つだけ、無い場合は特定の値を通したいlastOrDefault
メッセージを一定間隔で削りたいsample/throttleLastthrottleと似た機能

時間や前後

したいことオペレータ名備考
ストリームにタイムアウトを指定したいtimeouttimeout時にerrorにするか、別のストリームにするかを選べる
ストリームを遅延させたいdelay/delayWithSelector
メッセージを前後で束ねて配列にして、新たなメッセージにしたいbufferRxJSは条件の型に合わせてbufferWithCount/bufferWithTime等のメソッドを用意している
メッセージを前後で束ねてObservableにして、新たなメッセージにしたいwindowRxJSは条件の型に合わせてwindowWithCount/windowWithTime等のメソッドを用意している
ストリームを特定の間隔ごとにグループ分けしたいgroupByUntil

ストリームの分岐・合成

したいことオペレータ名備考
複数のストリームの最後のメッセージを新たなストリームにしたいforkJoin
ストリームを分岐したい/複数Subscribeしたいpublish/shareHot変換。shareはSubscribeされた時に自動でconnectする
ストリームを分岐したいが、その時に好きなSubjectを使いたいmulticastHot変換
ストリームを分岐し、分岐先には直前の値を返したいpublishLast/shareLastHot変換だが、cold的要素あり。内部的にはAsyncSubjectをmulticastしたのと同じ
ストリームを分岐し、分岐先にはsubscribe時に初期値を与えたいpublishValue/shareValueHot変換
ストリームを分岐&キャッシュしたいreplay/shareReplayHot変換だが、cold的要素あり。内部的にはReplaySubjectをmulticastしたのと同じ
複数のストリームのうち、一番速くメッセージが来たストリームを選択したいamb1番速くメッセージが来たストリームのメッセージが流れ続ける
複数のストリームに同じメッセージが飛んできているかを判定したいsequenceEqual
複数のストリームのうち、どれか一つにメッセージが来たら、他のストリームの直前のメッセージと合わせて流したいcombineLatest
複数のストリームのうち、すべてに一つメッセージが来たら、合わせて流したいzip
複数のストリームのうち、メッセージ同士のタイミングのズレが一定以内なら、合わせて流したいjoin/gropuJoin
複数のストリームをそのままmergeしたいmerge
終了時に次のストリームを繋げたいconcat

ストリーム全体への処理

したいことオペレータ名備考
メッセージの合計/平均が取りたいsum/average
メッセージ全体に対して何らかの計算をして、一つの結果値を得たいaggregate/reducesumやaverage的なことを自作する時に使用。aggregateは言語によってはdeprecated
メッセージと前回の結果とで何らかの計算をして次のストリームに流したいscanwindowやthrottle的なことを自作する時に使用。
ストリームに含まれるメッセージ数を数えたいcount
ストリームに特定のメッセージが含まれているかを調べたいcontains
ストリームにメッセージが1つ以上含まれているかを調べたいany/some条件式を入れることもできる
ストリームのすべてのメッセージが条件式に適合するかを調べたいall/every
ストリーム全体を配列/mapに変換したいtoMaptoArray/ToDictionary(C#)/ToList/toSet等々。
ストリーム全体をキーごとにグループ分けしたいgroupBy

非同期

したいことオペレータ名備考
subscribe時の処理を特定のタイミングやスレッドで行いたいsubscribeOn
ストリームを特定のスレッドに切り替えたい/非同期で行いたいobserveOn
ある関数を非同期で実行して、結果をストリームにしたいstart同期メソッドを非同期に実行するのに使える。変換する場合はtoAsyncを使用
Promiseを返す非同期関数を実行して、結果をストリームにしたいstartAsync
同期関数を非同期関数に変換したいtoAsyncObservableを返す関数に変換される。そのまま実行する場合はstartを使用
結果をコールバックするタイプの非同期関数をストリームに変換したいfromCallback
Node.jsタイプの結果をコールバックするタイプの非同期関数をストリームに変換したいfromNodeCallback

遅延評価

したいことオペレータ名備考
Subscribe時にストリームを2つから1つ選択したいif/ifThenifThenはIE8用
Subscribe時に多数のストリームから1つ選択したいcase/switchCaseswitchCaseはIE8用
Observable作成をSubscribe時まで遅延させたいdefer
Observable作成をSubscribe時まで遅延させ、シーケンスにリソースを紐付けたいusingリソースはシーケンス終了後、リソース.dispose()が呼び出される

その他

したいことオペレータ名備考
ストリームには影響をあたえないで何か処理したいdo/tap派生でdoOnError等がある
メソッドチェーンを壊さずに複雑な処理をしたいletこれ需要あるのかなぁ?
エラー時に再度subscribeしたいretry
エラー時にエラーに対処したいcatch
エラー時に次のストリームを繋げたいonErrorResumeNext
終了時に再度subscribeしたいrepeat
終了またはエラー時に何かしたいfinally

オペレータ以外のRxの要素

Subject一覧

onComplete後にSubscribeした場合もメッセージとonCompleteが送られてくる
名前機能備考
Subject基本的なSubject。Hotなobservableで、onNext/onComplete/onErrorをそのままsubscribeしているObserverに伝える
ReplaySubjectSubjectの派生。違う点はメッセージをすべてキャッシュし、新たにsubscribeした時にキャッシュしたメッセージをすべて再送する
BehaviorSubjectSubjectの派生。初期値を持ち、違う値がメッセージとして来た時にその値をキャッシュしてメッセージを流す。Subscribe時に直近の値か初期値を再送する
AsyncSubjectPromise的動きをするSubject。最後のメッセージをキャッシュし、onCompleteが呼び出された時にメッセージを放出してonCompleteを送出する。

その他

したいこと解決法
ストリームで飛ばす値が無い/voidを飛ばしたいUnitを飛ばすようにする。Unitの実装が無い言語(RxJS)もあるので、その場合はUnitを実装する
.NET3.5以降のeventからストリームを作成する(Rx.NET)fromEventを使用する
Taskからストリームを作成する(Rx.NET)Task.ToObservableを使用する
Futureからストリームを作成する(RxJava)Observable.fromを使用する
毎フレームのUpdateと同等のストリームを作成する(UniRx)Observable.EveryUpdateを使用する