Rxjs 响应式编程-第五章 使用Schedulers管理时间

chimywang 2019-06-28

Rxjs 响应式编程-第一章:响应式
Rxjs 响应式编程-第二章:序列的深入研究
Rxjs 响应式编程-第三章: 构建并发程序
Rxjs 响应式编程-第四章 构建完整的Web应用程序
Rxjs 响应式编程-第五章 使用Schedulers管理时间
Rxjs 响应式编程-第六章 使用Cycle.js的响应式Web应用程序

使用Schedulers管理时间

自从接触RxJS,就开始在我的项目中使用它。有一段时间我以为我知道如何有效地使用它,但有一个令人烦恼的问题:我怎么知道我使用的运算符是同步还是异步?换句话说,Operators到底什么时候发出通知?这似乎是正确使用RxJS的关键部分,但对我来说感觉有点模糊。

我认为,间隔运算符显然是异步的,所以它在内部使用类似setTimeout的东西来发出项目。但是,如果我使用范围怎么办?它也是异步发射的吗?它会阻止事件循环吗?来自哪里?我到处都在使用这些运算符,但我对它们的内部并发模型知之甚少。

然后我了解了Schedulers。

Schedulers是一种强大的机制,可以精确管理应用程序中的并发性。它们允许您随时更改其并发模型,从而对Observable如何发出通知进行细粒度控制。在本章中,您将学习如何使用调度程序并在常见场景中应用它们。我们将专注于测试,调度程序特别有用,您将学习如何制作自己的Schedulers。

使用Schedulers

Schedulers是一种“安排”将来发生的操作的机制。 RxJS中的每个运算符在内部使用一个Schedulers,选择该Schedulers以在最可能的情况下提供最佳性能。

让我们看看我们如何改变运算符中的Schedulers以及这样做的后果。 首先让我们创建一个包含1,000个整数的数组:

var arr = [];
for (var i=0; i<1000; i++) {
    arr.push(i);
}

然后,我们从arr创建一个Observable并强制它通过订阅它来发出所有通知。 在代码中,我们还保存了发出所有通知所需的时间:

var timeStart = Date.now();
Rx.Observable.from(arr).subscribe(
    function onNext() {},
    function onError() {},
    function onCompleted() {
        console.log('Total time: ' + (Date.now() - timeStart) + 'ms');
    }
);
"Total time: 6ms”

六毫秒 - 不坏! from在内部使用Rx.Scheduler.currentThread,它计划在任何当前工作完成后运行。 一旦启动,它将同步处理所有通知。

在让我们将Scheduler更改为Rx.Scheduler.default

var timeStart = Date.now();
Rx.Observable.from(arr, null, null, Rx.Scheduler.default).subscribe(
    function onNext() {},
    function onError() {},
    function onCompleted() {
        console.log('Total time: ' + (Date.now() - timeStart) + 'ms');
    }
);
"Total time: 5337ms”

哇,我们的代码运行速度比使用currentThread Scheduler慢几千倍。 那是因为默认的Scheduler异步运行每个通知。 我们可以通过在订阅后添加一个简单的日志语句来验证这一点。

使用currentThread Scheduler:

Rx.Observable.from(arr).subscribe( ... );
console.log('Hi there!’);
"Total time: 8ms"
"Hi there!"

使用默认Scheduler:

Rx.Observable.from(arr, null, null, Rx.Scheduler.timeout).subscribe( ... );
console.log('Hi there!’);
"Hi there!"
"Total time: 5423ms"

因为使用默认Schedule的Observer以异步方式发出其项目,所以我们的console.log语句(它是同步的)在Observable甚至开始发出任何通知之前执行。 使用currentThread Scheduler,所有通知都会同步发生,因此只有在Observable发出所有通知时才会执行console.log语句。

因此,Scheduler确实可以改变我们的Observable的工作方式。 在我们的例子中,性能确实受到异步处理一个已经可用的大型阵列的影响。 但我们实际上可以使用Scheduler来提高性能。 例如,我们可以在对Observable执行昂贵的操作之前动态切换Scheduler:

arr.groupBy(function(value) {
    return value % 2 === 0;
})
.map(function(value) {
    return value.observeOn(Rx.Scheduler.default);
})
.map(function(groupedObservable) {
    return expensiveOperation(groupedObservable);
});

在前面的代码中,我们将数组中的所有值分为两组:偶数和非偶数。 groupBy返回一个Observable,它为每个创建的组发出一个Observable。 这里是很酷的部分:在运行之前对每个分组的Observable中的项目进行昂贵的操作,我们使用observeOn将Scheduler切换到默认值,这样昂贵的操作将异步执行,而不是阻塞事件循环

observeOn和subscribeOn

在上一节中,我们使用observeOn运算符来更改某些Observable中的Scheduler。 observeOn和subscribeOn是返回Observable实例副本的运算符,但它使用的Scheduler我们作为参数传递的。

observeOn接受一个Scheduler并返回一个使用该Scheduler的新Observable。 它将使每个onNext调用在新的Scheduler中运行。

subscribeOn强制Observable的订阅和取消订阅工作(而不是通知)在特定的Scheduler上运行。 与observeOn一样,它接受Scheduler作为参数。 例如,当我们在浏览器中运行并在订阅调用中执行重要工作时,却不希望用它来阻止UI线程,subscribeOn非常有用。

基本的Rx Scheduler

让我们在我们刚刚使用的Scheduler中深入了解一下。 RxJS的运算符最常用的是immediate,default和currentThread。

Immediate Scheduler

Immediate Scheduler同步发出来自Observable的通知,因此无论何时在Immediate Scheduler上调度操作,它都将立即执行,从而阻塞该线程。 Rx.Observable.range是内部使用Immediate Scheduler序的运算符之一:

console.log('Before subscription');

Rx.Observable.range(1, 5)
.do(function(a) {
    console.log('Processing value', a);
})
.map(function(value) { return value * value; })
.subscribe(function(value) { console.log('Emitted', value); });

console.log('After subscription');
Before subscription
Processing value 1
Emitted 1
Processing value 2
Emitted 4
Processing value 3
Emitted 9
Processing value 4
Emitted 16
Processing value 5
Emitted 25
After subscription

程序输出按我们期望的顺序发生。 每个console.log语句在当前项的通知之前运行。

何时使用它

Immediate Scheduler非常适合于在每个通知中执行可预测且非常昂贵的操作的Observable。 此外,Observable最终必须调用onCompleted。

Default Scheduler

Default Scheduler以异步方式运行操作。 您可以将其视为setTimeout的等价物,其延迟为零毫秒,从而保持序列中的顺序。 它使用其运行的平台上可用的最有效的异步实现(例如,Node.js中的process.nextTick或浏览器中的setTimeout)。

让我们使用前一个使用了range示例,并使其在默认的Scheduler上运行。 为此,我们将使用observeOn运算符:

console.log('Before subscription');

Rx.Observable.range(1, 5)
.do(function(value) {
    console.log('Processing value', value);
})
.observeOn(Rx.Scheduler.default)
.map(function(value) { return value * value; })
.subscribe(function(value) { console.log('Emitted', value); });

console.log('After subscription');
Before subscription
Processing value 1
Processing value 2
Processing value 3
Processing value 4
Processing value 5
After subscription
Emitted 1
Emitted 4
Emitted 9
Emitted 16
Emitted 25

这个输出有很大的不同。 我们的同步console.log语句输出每个值,但我们使Observable在默认的Scheduler上运行,它会异步生成每个值。 这意味着我们在do运算符中的日志语句在平方值之前处理。

何时使用它

Default Scheduler永远不会阻塞事件循环,因此它非常适合涉及时间的操作,如异步请求。 它也可以在从未完成的Observable中使用,因为它不会在等待新通知时阻塞程序(这可能永远不会发生)。

Current Thread Scheduler

currentThread Scheduler与Immediate Scheduler一样是同步的,但是如果我们使用递归运算符,它会将要执行的操作排入队列,而不是立即执行它们。 递归运算符是一个自己调度另一个运算符的运算符。 一个很好的例子就是repeatrepeat运算符 - 如果没有给出参数 - 将无限期地重复链中的先前Observable序列。

如果对使用Immediate Scheduler的运算符(例如return)调用repeat,则会遇到麻烦。 让我们通过重复值10来尝试这个,然后使用take只取重复的第一个值。 理想情况下,代码将打印10次然后退出:

// Be careful: the code below will freeze your environment!
Rx.Observable.return(10).repeat().take(1)
.subscribe(function(value) {
    console.log(value);
});
Error: Too much recursion

此代码导致无限循环。 在订阅时,如return调用onNext(10)然后onCompleted,这使得repeat再次订阅return。 由于返回正在Immediate Scheduler上运行,因此该过程会重复,导致无限循环并且永远不会结束。

但是如果相反我们通过将它作为第二个参数传递给currentThread Scheduler给return,我们得到:

var scheduler = Rx.Scheduler.currentThread;
Rx.Observable.return(10, scheduler).repeat().take(1)
.subscribe(function(value) {
    console.log(value);
});
10

现在,当repeat重新订阅返回时,新的onNext调用将排队,因为之前的onCompleted仍在发生。 repeat然后返回一个可以使用的一次性对象,它调用onCompleted并通过重复处理取消repeat,最终从subscribe返回调用。

何时使用它

currentThread Scheduler对于涉及递归运算符(如repeat)的操作非常有用,并且通常用于包含嵌套运算符的迭代。

动画调度

对于诸如canvas或DOM动画之类的快速视觉更新,我们可以使用具有非常小时间间隔的interval运算符,或者我们可以在内部使用类似setTimeout的函数来调度通知。

但这两种方法都不理想。 在他们两个中我们都在浏览器上抛出所有这些更新,这可能无法足够快地处理它们。之所以会发生这种情况,是因为浏览器正在尝试渲染一个帧,然后它会收到渲染下一帧的指令,因此它会丢弃当前帧以保持速度。 结果是导致动画的不流畅,卡顿。

浏览器具有处理动画的原生方式,并且它们提供了一个使用它的API,称为requestAnimationFramerequestAnimationFrame允许浏览器通过在最合适的时间排列动画来优化性能,并帮助我们实现更流畅的动画。

有专门的Scheduler处理requestAnimationFrame

RxDOM库附带了一些额外的调度程序,其中一个是requestAnimationFrame Scheduler。

是的,你猜对了。 我们可以使用此Scheduler来改进我们的太空飞船视频游戏。 在其中,我们建立了40ms的刷新速度 - 大约每秒25帧 - 通过在该速度下创建一个interval Observable,然后使用combineLatest以间隔设置的速度更新整个游戏场景(因为它是最快速更新的Observable) )...但谁知道浏览器使用这种技术丢帧了多少帧! 使用requestAnimationFrame可以获得更好的性能。

让我们创建一个使用Rx.Scheduler.requestAnimationFrame作为其调度程序的Observable。 请注意,它与interval运算符的工作方式类似:

ch_schedulers/starfield_raf.js

function animationLoop(scheduler) {
    return Rx.Observable.generate(
        0,
        function() { return true; }, // Keep generating forever
        function(x) { return x + 1; }, // Increment internal value
        function(x) { return x; }, // Value to return on each notification
        Rx.Scheduler.requestAnimationFrame
    ); // Schedule to requestAnimationFrame
}

现在,无论何时我们使用了25 FPS动画,我们都可以使用animationLoop函数。 所以我们的Observable绘制了星星,之前看起来像这样:

spaceship_reactive/spaceship.js

var StarStream = Rx.Observable.range(1, 250)
.map(function() {
    return {
        x: parseInt(Math.random() * canvas.width),
        y: parseInt(Math.random() * canvas.height),
        size: Math.random() * 3 + 1
    };
})
.toArray()
.flatMap(function(arr) {
    return Rx.Observable.interval(SPEED).map(function() {
        return arr.map(function(star) {
            if (star.y >= canvas.height) {
                star.y = 0;
            }
            star.y += 3;
            return star;
        });
    });
});

变成这样:

ch_schedulers/starfield_raf.js

var StarStream = Rx.Observable.range(1, 250)
.map(function() {
    return {
        x: parseInt(Math.random() * canvas.width),
        y: parseInt(Math.random() * canvas.height),
        size: Math.random() * 3 + 1
    };
})
.toArray()
.flatMap(function(arr) {
    return animationLoop().map(function() {
        return arr.map(function(star) {
            if (star.y >= canvas.height) {
                star.y = 0;
            }
            star.y += 3;
            return star;
        });
    });
});

这给了我们一个更流畅的动画。 代码也更简洁!

使用Scheduler进行测试

测试可能是我们可以使用Scheduler的最引人注目的场景之一。 到目前为止,在本书中,我们一直在编写我们的核心代码而不考虑后果。 但是在现实世界的软件项目中,我们将编写测试以确保我们的代码按照我们的意图运行。

测试异步代码很难。 我们经常遇到以下问题之一:

  • 模拟异步事件很复杂且容易出错。 测试的重点是避免bug和错误,但如果你的测试本身有错误,那这显然是有问题的。
  • 如果我们想要准确测试基于时间的功能,自动化测试变得非常缓慢。 例如,如果我们需要准确测试在尝试检索远程文件四秒后调用错误,则每个测试至少需要花费很长时间才能运行结束。 如果我们不断运行我们的测试套件,那将影响我们的开发时间。

TestScheduler

RxJS为我们提供了TestScheduler,一个旨在帮助测试的Scheduler。 TestScheduler允许我们在方便时模拟时间并创建确定性测试,确保它们100%可重复。 除此之外,它允许我们执行需要花费大量时间并将其压缩到瞬间的操作,同时保持测试的准确性。

TestScheduler是VirtualTimeScheduler的专业化。 VirtualTimeSchedulers在“虚拟”时间而不是实时执行操作。 计划的操作进入队列并在虚拟时间内分配一个时刻。 然后,Scheduler在其时钟前进时按顺序运行操作。 因为它是虚拟时间,所以一切都立即运行,而不必等待指定的时间。 我们来看一个例子:

var onNext = Rx.ReactiveTest.onNext;
QUnit.test("Test value order", function(assert) {
    var scheduler = new Rx.TestScheduler();
    var subject = scheduler.createColdObservable(
        onNext(100, 'first'),
        onNext(200, 'second'),
        onNext(300, 'third')
    );
    var result = '';
    subject.subscribe(function(value) { result = value });
    scheduler.advanceBy(100);
    assert.equal(result, 'first');
    scheduler.advanceBy(100);
    assert.equal(result, 'second');
    scheduler.advanceBy(100);
    assert.equal(result, 'third');
});

在前面的代码中,我们测试了来自冷Observable的一些值以正确的顺序到达。 为此,我们在TestScheduler中使用helper方法createColdObservable来创建一个Observable,它回放我们作为参数传递的onNext通知。 在每个通知中,我们指定应该发出通知值的时间。 在此之后,我们订阅此Observable,手动提前调度程序中的虚拟时间,并检查它是否确实发出了预期值。 如果示例在正常时间运行,则需要300毫秒,但由于我们使用TestScheduler来运行Observable,它将立即运行,但完全按照我们的顺序。

写一个真实的测试案例

没有比在现实世界中为时间敏感的任务编写测试更好的方法来理解如何使用虚拟时间来缩短时间。 让我们从我们在缓冲值中制作的地震查看器中恢复一个Observable:

quakes
.pluck('properties')
.map(makeRow)
.bufferWithTime(500)
.filter(function(rows) { return rows.length > 0; })
.map(function(rows) {
    var fragment = document.createDocumentFragment();
    rows.forEach(function(row) {
        fragment.appendChild(row);
    });
    return fragment;
})
.subscribe(function(fragment) {
    table.appendChild(fragment);
});

为了使代码更易于测试,让我们将Observable封装在一个函数中,该函数接受我们在bufferWithTime运算符中使用的Scheduler。在Obpectables中参数化将要测试的Scheduler总是一个好主意。

ch_schedulers/testscheduler.js

function quakeBatches(scheduler) {
    return quakes.pluck('properties')
    .bufferWithTime(500, null, scheduler || null)
    .filter(function(rows) {
        return rows.length > 0;
    });
}

让我们通过采取一些步骤来简化代码,但保持本质。 此代码采用包含属性属性的Observable JSON对象,将它们缓冲到每500毫秒释放的批次中,并过滤掉空的批次。

我们想要验证此代码是否有效,但我们绝对不希望每次运行测试时都等待几秒钟,以确保我们的缓冲按预期工作。 这是虚拟时间和TestScheduler将帮助我们的地方:

ch_schedulers/testscheduler.js

❶ var onNext = Rx.ReactiveTest.onNext;
var onCompleted = Rx.ReactiveTest.onCompleted;
var subscribe = Rx.ReactiveTest.subscribe;
❷ var scheduler = new Rx.TestScheduler();
❸ var quakes = scheduler.createHotObservable(
    onNext(100, { properties: 1 }),
    onNext(300, { properties: 2 }),
    onNext(550, { properties: 3 }),
    onNext(750, { properties: 4 }),
    onNext(1000, { properties: 5 }),
    onCompleted(1100)
);
❹ QUnit.test("Test quake buffering", function(assert) {
    ❺ var results = scheduler.startScheduler(function() {
        return quakeBatches(scheduler)
    }, {
        created: 0,
        subscribed: 0,
        disposed: 1200
    });
    ❻ var messages = results.messages;
    console.log(results.scheduler === scheduler);
    ❼ assert.equal(
        messages[0].toString(),
        onNext(501, [1, 2]).toString()
    );
    assert.equal(
        messages[1].toString(),
        onNext(1001, [3, 4, 5]).toString()
    );
    assert.equal(
        messages[2].toString(),
        onCompleted(1100).toString()
    );
});

让我们一步一步地剖析代码:

  1. 我们首先从ReactiveTest加载一些辅助函数。 这些在虚拟时间内注册onNext,onCompleted和订阅事件。
  2. 我们创建了一个新的TestScheduler,它将推动整个测试。
  3. 我们使用TestScheduler中的方法createHotObservable创建一个假的热Observable,它将在虚拟时间内模拟特定点的通知。 特别是,它在第一秒发出五个通知,并在1100毫秒完成。 每次它发出一个具有特定属性的对象。
  4. 我们可以使用任何测试框架来运行测试。 对于我们的例子,我选择了QUnit。
  5. 我们使用startScheduler方法创建一个使用测试调度程序的Observable。 第一个参数是一个函数,它创建Observable以使用我们的Scheduler运行。 在我们的例子中,我们只返回我们传递TestScheduler的quakeBatches函数。 第二个参数是一个对象,它包含我们想要创建Observable的不同虚拟时间,订阅它并处理它。 对于我们的示例,我们在虚拟时间0开始和订阅,并且我们在1200(虚拟)毫秒处理Observable。
  6. startScheduler方法返回一个带有scheduler和messages属性的对象。 在消息中,我们可以在虚拟时间内找到Observable发出的所有通知。
  7. 我们的第一个断言测试在501毫秒之后(在第一个缓冲时间限制之后),我们的Observable产生值1和2。
    我们的第二个断言测试在1001毫秒后,我们的Observable产生剩余的值3,4和5.最后,我们的第三个断言检查序列是否完全在1100毫秒完成,正如我们在热的Observable地震中所指出的那样。

该代码以非常可靠的方式有效地测试我们的高度异步的Observable,并且无需跳过箍来模拟异步条件。我们只是指定我们希望代码在虚拟时间内作出反应的时间,我们使用测试调度程序来运行整个操作。

总结

Scheduler是RxJS的重要组成部分。 即使您可以在没有明确使用它们的情况下走很长的路,它们也是一种先进的概念,它可以让您在程序中微调并发性。虚拟时间的概念是RxJS独有的,对于测试异步代码等任务非常有用。

在下一章中,我们将使用Cycle.js,这是一种基于称为单向数据流的概念来创建令人惊叹的Web应用程序的反应方式。有了它,我们将使用现代技术创建一个快速的Web应用程序,从而显着改进传统的Web应用程序制作方式。

关注我的微信公众号,更多优质文章定时推送
Rxjs 响应式编程-第五章 使用Schedulers管理时间

相关推荐