rxjs随笔2-理解操作符

zhoulu00 2019-11-04

看rxjs文档的时候,我总是被merge -> mergeAll -> mergeMap等类似的名称的操作符困扰。一直在思考这里面是不是有什么联系,结论:merge 与 (mergeAll && mergeMao)没有任何关系,就像这篇文章和白洁没有任何关系一样。
注:果然理解错误了,merge与mergeAll的作用是类似的,不用使用方式不一样。他们都是将接受到的observable进行订阅,不过merge的使用方式是直接将需要订阅的observable对象作为参数然后进行合并,将多个observable对象合并为一个对象然后按照各个数据完成按照时间序列发出值。总之我说的很烂,不知道怎么讲清楚之间的联系。

注:虽然每个操作符都会返回一个新的observables对象,但是每个操作符接受的参数都是上一个操作符返回的值,即上一个操作符完成后返回的新的observables的具体的value,也是上一个操作符中的函数返回的具体的值

merge mergeMap mergeAll

merge:用来合并两个Observables对象,有两种使用方式。函数签名: merge(input: Observable): Observable(注:函数签名很重要,这里能看到merge接受的参数是observable对象。

// RxJS v6+
import { mapTo } from 'rxjs/operators';
import { interval, merge } from 'rxjs';

// 每2.5秒发出值
const first = interval(2500);
// 每2秒发出值
const second = interval(2000);
// 每1.5秒发出值
const third = interval(1500);
// 每1秒发出值
const fourth = interval(1000);

// 从一个 observable 中发出输出值
const example = merge(
  first.pipe(mapTo('FIRST!')),
  second.pipe(mapTo('SECOND!')),
  third.pipe(mapTo('THIRD')),
  fourth.pipe(mapTo('FOURTH'))
);
// 输出: "FOURTH", "THIRD", "SECOND!", "FOURTH", "FIRST!", "THIRD", "FOURTH"
const subscribe = example.subscribe(val => console.log(val));

或者

// RxJS v6+
import { merge } from 'rxjs/operators';
import { interval } from 'rxjs';

// 每2.5秒发出值
const first = interval(2500);
// 每1秒发出值
const second = interval(1000);
// 作为实例方法使用
const example = first.pipe(merge(second));
// 输出: 0,1,0,2....
const subscribe = example.subscribe(val => console.log(val));

mergeAll

mergeAll: 收集并订阅所有的 observables; 函数签名:mergeAll(concurrent: number): Observable
可以说mergeAll与merge没有任何关系。merge的作用是merge(ob1, ob2, ob3)将多个observable对象合并为一个对象,这个对象按照三个对象的完成先后发出数据流。 mergeAll是作为操作符,对上游传出的数据流进行操作,而操作内容就是如果数据为observable对象则订阅该对象。mergeAll接受数字参数,表示同时最多“并发”订阅多少observable对象,剩下的将被暂存等待订阅。其实这里的mergeAll也并不是“订阅”的作用吧,更准确的应该叫做打平observable对象。 比如,observable对象发出的数据流,看起来都应该是具体的数据,但是万一其中某一步经过处理,返回了一个新的observable对象呢?那么下一个操作符接受的参数就是这个新的observable对象,如果你希望得到的是这个新的observable对象中的有效数据,那么就需要使用mergeAll来对齐进行“打平observable”。相当于promise中的,返回数据如果是一个新的promise的话,则会对这个promise进行resolve然后再进行下一个then。

from([
    of(1),
    of(2).pipe(delay(1000)),
    interval(500).pipe(map(value => 'a' + value), take(6)),
    of('forth')
]).pipe(
    mergeAll()
).subscribe(console.log)

merge(
    of(1),
    of(2).pipe(delay(1000)),
    interval(500).pipe(map(value => 'a' + value), take(6)),
    of('forth'))
.subscribe(console.log)

mergeMap

mergeMap函数签名: mergeMap(project: function: Observable, resultSelector: function: any, concurrent: number): Observable
可以简化一下:mergeMap(project: function: Observable): Observable
mergeMap操作符接受一个参数,该参数是一个返回observable对象的函数,该函数接受上游发出的数据作为参数。这一点和concatmap是一样的。
mergeMap操作符,由上游的数据流驱动,内部函数返回一个observable对象,然后对连续生成的observable连续执行merge操作。

相关推荐