MongoDB学习7:Change Strean

大秦铁骑 2020-08-18

1.什么是Change Stream?

Change Stream是MongoDB用于实现变更追踪的解决方案,类似于关系型数据库的触发器,但原理不完全相同

Change Stream触发器
触发方式异步同步(事务保证)
触发位置应用回调事件数据库触发器
触发次数每个订阅事件的客户端1次(触发器)
故障恢复从上一次断点重新触发事务回滚

2.Change Stream实现原理

Change Stream是基于oplog实现的。它在oplog上开启一个tailable cursor来追踪所有复制集上的变更操作,最终调用应用中定义的回调函数
被追踪的变更事件主要包括:

  • insert/update/delete:插入、更新、删除
  • drop:集合被删除
  • rename:集合被重命名
  • dropDatabase:数据库被删除
  • invalidate:drop/rename/dropDatabase将导致invalidate被触发,并关闭change stream

3.Change Stream与可重复读

Change Stream只推送已经在大多数节点上提交的变更操作。级“可重复度”的变更,这个验证是通过{readConcern:"majority"}实现的,因此

  • 未开启 readConcern:"majority" 的集群无法使用 Change Stream
  • 当集群无法满足{w:"majority"}时,不会触发 Change Stream(例如PSA架构中的S因故障宕机)

4.Change Stream 变更过滤

如果只对某些类型的变更时间感兴趣,可以使用聚合管道的过滤步骤过滤事件

var cs = db.collection.watch([{
  $match:{
    operationType:{
     $in:{"insert","delete"}
    }
  }
}])

5.开启Change Stream功能

默认是没有开启Change Stream功能的,在配置文件中设置

replication:
  enableMajorityReadConcern: true

6.Change Stream 故障恢复

假如在一系列写入操作的过程中,订阅Change Stream的应用在接收到“写3”之后与 t0 时刻崩溃,重启后后续变更怎么办?
MongoDB学习7:Change Strean
想要从上次中断的地方继续获取变更流,只需要保留上次变更通知中的_id即可
MongoDB学习7:Change Strean
上图所示是一次Change Stream回调所返回的数据。没跳这样的数据都带有一个_id,这个_id可以用于断点恢复,例如:

var cs = db.collection.watch([],{resumeAfter:<_id>})

即可从上一条通知中断处继续获取后续的变更通知

7.Change Steam使用场景

  • 跨集群的变更复制--在源集群中订阅 Change Stream,一旦得到任何变更立即写入目标集群
  • 微服务联动--当一个微服务变更数据库时,其他微服务得到通知并做出相应的变更
  • 其他任何需要系统联动的场景

8.Change Steam 注意事项

  • Change Steam依赖于oplog,因此中断时间不可超过oplog回收的最大时间
  • 在执行update操作时,如果只更新了部分数据,那么Change Steam通知的也是增量部分
  • 同理,删除数据时也通知的仅是删除数据的_id

相关推荐