fengshantao 2019-12-22
Spark针对每个节点(Client、Master、Worker)都称之为一个RpcEndpoint,且都实现RpcEndpoint接口,内部根据不同端点的需求,设计不同的消息和不同的业务处理,如果需要发送(询问)则内部调用Dispatcher的对应方法
说明:
RpcEndpointRef的具体实现类是: NettyRpcEndpointRef
每个RpcEndpoint运行时依赖的上下文环境称为 RpcEnv
PC 端点需要发送消息或者从远程 RPC 端点接收到的消息,分发至对应的指令收件箱/发件箱
// class NettyRpcEnv private[netty] def send(message: RequestMessage): Unit = { // 获取接收者地址信息 val remoteAddr = message.receiver.address if (remoteAddr == address) { // Message to a local RPC endpoint. // 把消息发送到本地的 RPC 端点 (发送到收件箱) try { dispatcher.postOneWayMessage(message) } catch { case e: RpcEnvStoppedException => logWarning(e.getMessage) } } else { // Message to a remote RPC endpoint. // 把消息发送到远程的 RPC 端点. (发送到发件箱) postToOutbox(message.receiver, OneWayOutboxMessage(serialize(message))) } }
一个本地 RpcEndpoint 对应一个收件箱
当我们需要向一个具体的 RpcEndpoint 发送消息时,一般我们需要获取到该RpcEndpoint 的引用,然后通过该引用发送消息
对于当前 RpcEndpoint 来说,一个目标 RpcEndpoint 对应一个当前的发件箱,如果向多个目标 RpcEndpoint 发送信息,则有当前会有多个 OutBox。
当消息放入 Outbox 后,紧接着通过 TransportClient 将消息发送出去。
消息放入发件箱以及发送过程是在同一个线程中进行
一个 OutBox 对应一个 TransportClient,TransportClient 不断轮询OutBox,根据 OutBox 消息的 receiver 信息,请求对应的远程 TransportServer