問題

以下のコードを実行するとside effectは何回表示されるか?

var source = new Rx.Subject<number>();
var stream = source.do(i => console.log("side effect"));
    
stream.subscribeOnNext(i => console.log("child1: " + i));
source.onNext(1);
stream.subscribeOnNext(i => console.log("child2: " + i));
source.onNext(2);
        

答え

3回

なぜ2回ではないのか?

Rxでは基本的にオペレータをつないだ時にストリームの接続が適用されるのではなく、Subscribeした時に初めて接続が適用される

つまり、上記コードでいえば、streamというのは「sourceにdoオペレータを噛まします」という設計だけが保持されている状態で、
実際に接続されるのはsubscribeした時。
この時に「sourceにdoオペレータをかましてsubscribeに流す」というストリームが新規作成される

上記コードではsubscribeが2回呼ばれているから、ストリームが2本つくられることになる。

それを図にすると以下になる

       /→do→subscribe
Subject
       \→do→subscribe
        

onNext(1)時点ではストリームが1本存在し、onNext(2)時点ではストリームが2本存在するから、合計で3回流れることになる

このようにSubscribeした段階で値を流したり、ストリームを接続するタイプのObservableシーケンスの性質のことを"Cold"なObservableといいます(ある種の遅延評価と思うと理解しやすいのではないでしょうか)

逆に、Subjectのようにストリームが接続されて無くても値を流したり、ストリームの分岐になるObservableシーケンスの性質のことを"Hot"なObservableといいます(Cold/Hotという対立する文言だが、これらの性質は対立するものではない)

上記のstreamをdoの後で分岐させるには?

publish/share/multicast等の"Hot"に変換するメソッドを使います

var source = new Rx.Subject<number>();
var stream = source.do(i => console.log("side effect")).share();//shareを追加
    
stream.subscribeOnNext(i => console.log("child1: " + i));
source.onNext(1);
stream.subscribeOnNext(i => console.log("child2: " + i));
source.onNext(2);

これにより、ストリームは以下の図になります

                  /→subscribe
Subject→do→share
                  \→subscribe
        

shareの実装について

shareは内部でSubjectを持っており、誰かがshareの結果のobservableをsubscribeした時点で、1回だけsubscribeする動作になっています

これにより、shareより手前のストリームは1回しかsubscribeされず、side effectは2回しか実行されません

このshareが行っている内部でsubscribeを作動させて繋げることをCold→Hot変換といいます

この変換を任意のタイミング(connectの呼び出し)で出来るようにするのがpublishで、他の種類のSubjectでも出来るようにするのがmulticastになります

どれがColdでどれがHotなのか?

基本的にSubjectとかfromEvent/等で作ったイベント系以外はColdと思ってください

また、AsyncSubjectみたいな両方の性質を持った変なのもいるので注意しましょう

実際のコードでの注意点

複数subscribeする可能性のある箇所。特にObservableシーケンスの公開時に注意しましょう

基本的に複数subscribeする可能性のある場所は、Subject直とかを除き、share等を使ってHot変換しておくといいでしょう