fengshantao 2019-12-22
RpcEndpointRef的具体实现类是: NettyRpcEndpointRef
每个RpcEndpoint运行时依赖的上下文环境称为 RpcEnv
PC 端点需要发送消息或者从远程 RPC 端点接收到的消息,分发至对应的指令收件箱/发件箱
// class NettyRpcEnv private[netty] def send(message: RequestMessage): Unit = { // 获取接收者地址信息 val remoteAddr = message.receiver.address if (remoteAddr == address) { // Message to a local RPC endpoint. // 把消息发送到本地的 RPC 端点 (发送到收件箱) try { dispatcher.postOneWayMessage(message) } catch { case e: RpcEnvStoppedException => logWarning(e.getMessage) } } else { // Message to a remote RPC endpoint. // 把消息发送到远程的 RPC 端点. (发送到发件箱) postToOutbox(message.receiver, OneWayOutboxMessage(serialize(message))) } }
一个本地 RpcEndpoint 对应一个收件箱
当我们需要向一个具体的 RpcEndpoint 发送消息时,一般我们需要获取到该RpcEndpoint 的引用,然后通过该引用发送消息
对于当前 RpcEndpoint 来说,一个目标 RpcEndpoint 对应一个当前的发件箱,如果向多个目标 RpcEndpoint 发送信息,则有当前会有多个 OutBox。
当消息放入 Outbox 后,紧接着通过 TransportClient 将消息发送出去。
一个 OutBox 对应一个 TransportClient,TransportClient 不断轮询OutBox,根据 OutBox 消息的 receiver 信息,请求对应的远程 TransportServer