zhoulu00 2019-06-28
中后台仪表盘是一个非常复杂,特别是当需要全面屏运用时,数据的实时性需求非常高。WebSocket 不管在什么环境中使用其实都是非常简单,各现代浏览器实现标准都很统一,而且接口也足够简单。
即便是在 Angular 也是如此,只需要简单几行代码就能使用 WebSocket。
const ws = new WebSocket('wss://echo.websocket.org');
ws.onmessage = (e) => {
  console.log('message', e);
}若需要向服务端发送消息,则:
ws.send(`content`);
在 Angular 里绝大多数的人都会根据上述代码进一步拓展,比如统一消息解析、错误处理、多路复用等,并最终将其封装成一个服务类。
事实上,RxJS 也包裹了一个 WebSocket Subject,位于 rxjs/websocket。
假如将上面的示例使用 RxJS 来写,则:
import { webSocket, WebSocketSubject } from 'rxjs/webSocket';
const ws = webSocket('wss://echo.websocket.org');
ws.subscribe(res => {
  console.log('message', res);
});
ws.next(`content`);webSocket 是一个工厂函数,所生产出来的 WebSocketSubject 对象可被多次订阅,若未订阅或取消最后一个订阅时都会导致 WebSocket 连接中断,当再一次订阅时会重新自动连接。
webSocket 除了接收字符串(WebSocket服务远程地址)外,还允许指定更复杂的配置项。
默认情况下,消息是使用 JSON.parse 和 JSON.stringify 对消息格式序列化和反序列化操作,所以不管消息发送或接收都以 JSON 为准,可通过 serializer、deserializer 属性来改变。
若需要关心 WebSocket 什么时候开始或结束(closeObserver),则:
const open$ = new Subject();
const ws = webSocket({
  url: 'wss://echo.websocket.org',
  openObserver: open$
});
// 订阅打开事件
open$.subscribe(() => {});WebSocketSubject 也是 Subject 的变体之一,因此订阅它表示接收消息,反之则利用 next、complete、error 来维护消息的推送。
next 来发送消息complete 会尝试检测是否最后一个订阅,若是将会关闭连接error 相当于原始 close 方法且必须提供 { code: number, reason?: string} 参数,注意 code 务必遵守取值范围可被重放
调用 next 发送消息时若 WebSocket 连接中断(例如:没人订阅时),消息会被缓存当下一次重新连接以后会按顺序发送。这对于异步世界里非常方便,我们只需要确保 Angular 启动前初始化好 WebSocket 不管什么时候订阅接收消息,都可以随时发送也无须等待。
事实上这一点是 RxJS WebSocket 默认情况下是通过 webSocket 所生产的 WebSocketSubject 其本质上是 ReplaySubject 的“重放”能力。当然你可以通过 webSocket 的第二个参数改变这种行为。
一般来说我们不太可能只会一个 Web Socket 服务完成所有的事,然而也不太可能针对每一个业务实例创建一个 webSocket。往往我们会增加一层网关并将这些业务 WebSocket 进行汇总,对于前端始终只需要一个连接,这就是多路复用存在的意义。
而核心是必须要让后端知道,什么时候发送什么消息给什么样的服务。
首先必须先使用 multiplex 方法来创建 Observable 以便订阅某一路消息,它有三个参数来帮助我们区分消息:
subMsg 告知正在订阅哪一路消息unsubMsg 告知取消订阅哪一路消息messageFilter 过滤消息,使订阅者只接收哪一路消息const ws = webSocket('wss://echo.websocket.org');
const user$ = this.ws.multiplex(
    () => ({ type: 'subscribe', tag: 'user' }),
    () => ({ type: 'unsubscribe', tag: 'user' }),
    message => message.type === 'user'
);
user$.subscribe(message => console.log(message));
const todo$ = this.ws.multiplex(
    () => ({ type: 'subscribe', tag: 'todo' }),
    () => ({ type: 'unsubscribe', tag: 'todo' }),
    message => message.type === 'todo'
);
todo$.subscribe(message => console.log(message));user$ 流和 todo$ 流他们共用一个 WebSocket 连接,这便是多路复用。
虽然订阅是通过 multiplex 创建的,然后消息的推送依然还是需要使用 ws.next()。
这原本是对内部一个简单培训,然而我发现竟然极少人会讨论 RxJS 里面 Web Socket 的实现。
其实一直有想着要给 ng-alain 内置 WebSocket,只是就封装角度来讲完全没有价值,因为已经足够优雅。