ZadarrienChina 2020-04-26
import { interval } from "rxjs"; import { take } from "rxjs/operators"; const interval$ = interval(1000).pipe(take(3)); interval$.subscribe(value => console.log("Observer A get value: " + value)); setTimeout(() => { interval$.subscribe(value => console.log("Observer B get value: " + value)); }, 1000);
输出
Observer A get value: 0 Observer A get value: 1 Observer B get value: 0 Observer A get value: 2 Observer B get value: 1 Observer B get value: 2
可以看到
import { interval, Subject } from "rxjs"; import { take } from "rxjs/operators"; const interval$ = interval(1000).pipe(take(3)); const subject = new Subject(); const observerA = { next: value => console.log("Observer A get value: " + value), error: error => console.log("Observer A error: " + error), complete: () => console.log("Observer A complete!") }; const observerB = { next: value => console.log("Observer B get value: " + value), error: error => console.log("Observer B error: " + error), complete: () => console.log("Observer B complete!") }; subject.subscribe(observerA); // 添加观察者A interval$.subscribe(subject); // 订阅interval$对象 setTimeout(() => { subject.subscribe(observerB); // 添加观察者B }, 1000);
输出
Observer A get value: 0 Observer A get value: 1 Observer B get value: 1 Observer A get value: 2 Observer B get value: 2 Observer A complete! Observer B complete!
可以看到
除了 Subject 之外,还有BehaviorSubject、ReplaySubject 和 AsyncSubject。
先看一个Subject的例子。
import { Subject } from "rxjs"; const subject = new Subject(); const observerA = { next: value => console.log("Observer A get value: " + value), error: error => console.log("Observer A error: " + error), complete: () => console.log("Observer A complete!") }; const observerB = { next: value => console.log("Observer B get value: " + value), error: error => console.log("Observer B error: " + error), complete: () => console.log("Observer B complete!") }; subject.subscribe(observerA); subject.next(1); subject.next(2); subject.next(3); setTimeout(() => { subject.subscribe(observerB); // 1秒后订阅 }, 1000);
输出
Observer A get value: 1 Observer A get value: 2 Observer A get value: 3
这里的observerB没有订阅。
因为 Subject 对象没有再调用 next() 方法。
这里的Subject 不能保存当前的最新状态。
如果希望 Subject 对象能够保存当前的状态,当新增订阅者的时候,自动把当前最新的值发送给订阅者。
使用 BehaviorSubject。
import { BehaviorSubject } from "rxjs"; const subject = new BehaviorSubject(0); const observerA = { next: value => console.log("Observer A get value: " + value), error: error => console.log("Observer A error: " + error), complete: () => console.log("Observer A complete!") }; const observerB = { next: value => console.log("Observer B get value: " + value), error: error => console.log("Observer B error: " + error), complete: () => console.log("Observer B complete!") }; subject.subscribe(observerA); subject.next(1); subject.next(2); subject.next(3); setTimeout(() => { subject.subscribe(observerB); // 1秒后订阅 }, 1000);
输出
Observer A get value: 0 Observer A get value: 1 Observer A get value: 2 Observer A get value: 3 Observer B get value: 3
同时我们看到const subject = new BehaviorSubject(0);
有一个初始值为0,它用于表示 Subject 对象当前的状态。
在subject.subscribe(observerA);
这条语句执行后,便会输出Observer A get value: 0
。
如果我们希望新增的订阅者,可以接收到数据源最近发送的几个值。
可以使用ReplaySubject。
import { ReplaySubject } from "rxjs"; const subject = new ReplaySubject(2); const observerA = { next: value => console.log("Observer A get value: " + value), error: error => console.log("Observer A error: " + error), complete: () => console.log("Observer A complete!") }; const observerB = { next: value => console.log("Observer B get value: " + value), error: error => console.log("Observer B error: " + error), complete: () => console.log("Observer B complete!") }; subject.subscribe(observerA); subject.next(1); subject.next(2); subject.next(3); setTimeout(() => { subject.subscribe(observerB); // 1秒后订阅 }, 1000);
输出
Observer A get value: 1 Observer A get value: 2 Observer A get value: 3 Observer B get value: 2 Observer B get value: 3
当你把const subject = new ReplaySubject(2);
改为const subject = new ReplaySubject(1);
输出
Observer A get value: 1 Observer A get value: 2 Observer A get value: 3 Observer B get value: 3
AsyncSubject 类似于 last 操作符,它会在 Subject 结束后发出最后一个值。
import { AsyncSubject } from "rxjs"; const subject = new AsyncSubject(); const observerA = { next: value => console.log("Observer A get value: " + value), error: error => console.log("Observer A error: " + error), complete: () => console.log("Observer A complete!") }; const observerB = { next: value => console.log("Observer B get value: " + value), error: error => console.log("Observer B error: " + error), complete: () => console.log("Observer B complete!") }; subject.subscribe(observerA); subject.next(1); subject.next(2); subject.next(3); subject.complete(); setTimeout(() => { subject.subscribe(observerB); // 1秒后订阅 }, 1000);
输出
Observer A get value: 3 Observer A complete! Observer B get value: 3 Observer B complete!
当你注释掉subject.complete();
则什么也不会输出。
因为subject没有结束。
参考:
RxJS Subject