jannal 2020-01-16
前置文章:
前一篇文章相对简略地介绍了RPC
服务端的编写,而这篇博文最要介绍服务端(Client
)的实现。RPC
调用一般是面向契约编程的,而Client
的核心功能就是:把契约接口方法的调用抽象为使用Netty
向RPC
服务端通过私有协议发送一个请求。这里最底层的实现依赖于动态代理,因此动态代理是动态实现接口的最简单方式(如果字节码研究得比较深入,可以通过字节码编程实现接口)。需要的依赖如下:
JDK1.8+
Netty:4.1.44.Final
SpringBoot:2.2.2.RELEASE
一般可以通过JDK
动态代理或者Cglib
的字节码增强来实现此功能,为了简单起见,不引入额外的依赖,这里选用JDK
动态代理。这里重新搬出前面提到的契约接口HelloService
:
public interface HelloService { String sayHello(String name); }
接下来需要通过动态代理为此接口添加一个实现:
public class TestDynamicProxy { public static void main(String[] args) throws Exception { Class<HelloService> interfaceKlass = HelloService.class; InvocationHandler handler = new HelloServiceImpl(interfaceKlass); HelloService helloService = (HelloService) Proxy.newProxyInstance(interfaceKlass.getClassLoader(), new Class[]{interfaceKlass}, handler); System.out.println(helloService.sayHello("throwable")); } @RequiredArgsConstructor private static class HelloServiceImpl implements InvocationHandler { private final Class<?> interfaceKlass; @Override public Object invoke(Object proxy, Method method, Object[] args) throws Throwable { // 这里应该根据方法的返回值类型去决定返回结果 return String.format("[%s#%s]方法被调用,参数列表:%s", interfaceKlass.getName(), method.getName(), JSON.toJSONString(args)); } } } // 控制台输出结果 [club.throwable.contract.HelloService#sayHello]方法被调用,参数列表:["throwable"]
这里可以确认两点:
InvocationHandler
实现后会对被代理接口生成一个动态实现类。InvocationHandler
对应实例的invoke()
方法,传入的参数就是当前方法调用的元数据。Client
端需要通过动态代理为契约接口生成一个动态实现类,然后提取契约接口调用方法时候所能提供的元数据,通过这些元数据和Netty
客户端的支持(例如Netty
的Channel
)基于私有RPC
协议组装请求信息并且发送请求。这里先定义一个请求参数提取器接口RequestArgumentExtractor
:
@Data public class RequestArgumentExtractInput { private Class<?> interfaceKlass; private Method method; } @Data public class RequestArgumentExtractOutput { private String interfaceName; private String methodName; private List<String> methodArgumentSignatures; } // 接口 public interface RequestArgumentExtractor { RequestArgumentExtractOutput extract(RequestArgumentExtractInput input); }
简单实现一下,解析结果添加到缓存中,实现类DefaultRequestArgumentExtractor
代码如下:
public class DefaultRequestArgumentExtractor implements RequestArgumentExtractor { private final ConcurrentMap<CacheKey, RequestArgumentExtractOutput> cache = Maps.newConcurrentMap(); @Override public RequestArgumentExtractOutput extract(RequestArgumentExtractInput input) { Class<?> interfaceKlass = input.getInterfaceKlass(); Method method = input.getMethod(); String methodName = method.getName(); Class<?>[] parameterTypes = method.getParameterTypes(); return cache.computeIfAbsent(new CacheKey(interfaceKlass.getName(), methodName, Lists.newArrayList(parameterTypes)), x -> { RequestArgumentExtractOutput output = new RequestArgumentExtractOutput(); output.setInterfaceName(interfaceKlass.getName()); List<String> methodArgumentSignatures = Lists.newArrayList(); for (Class<?> klass : parameterTypes) { methodArgumentSignatures.add(klass.getName()); } output.setMethodArgumentSignatures(methodArgumentSignatures); output.setMethodName(methodName); return output; }); } @RequiredArgsConstructor private static class CacheKey { private final String interfaceName; private final String methodName; private final List<Class<?>> parameterTypes; @Override public boolean equals(Object o) { if (this == o) return true; if (o == null || getClass() != o.getClass()) return false; CacheKey cacheKey = (CacheKey) o; return Objects.equals(interfaceName, cacheKey.interfaceName) && Objects.equals(methodName, cacheKey.methodName) && Objects.equals(parameterTypes, cacheKey.parameterTypes); } @Override public int hashCode() { return Objects.hash(interfaceName, methodName, parameterTypes); } } }
在不考虑重连、断连等情况下,新增一个类ClientChannelHolder
用于保存Netty
客户端的Channel
实例:
public class ClientChannelHolder { public static final AtomicReference<Channel> CHANNEL_REFERENCE = new AtomicReference<>(); }
接着新增一个契约动态代理工厂(工具类)ContractProxyFactory
,用于为契约接口生成代理类实例:
public class ContractProxyFactory { private static final RequestArgumentExtractor EXTRACTOR = new DefaultRequestArgumentExtractor(); private static final ConcurrentMap<Class<?>, Object> CACHE = Maps.newConcurrentMap(); @SuppressWarnings("unchecked") public static <T> T ofProxy(Class<T> interfaceKlass) { // 缓存契约接口的代理类实例 return (T) CACHE.computeIfAbsent(interfaceKlass, x -> Proxy.newProxyInstance(interfaceKlass.getClassLoader(), new Class[]{interfaceKlass}, (target, method, args) -> { RequestArgumentExtractInput input = new RequestArgumentExtractInput(); input.setInterfaceKlass(interfaceKlass); input.setMethod(method); RequestArgumentExtractOutput output = EXTRACTOR.extract(input); // 封装请求参数 RequestMessagePacket packet = new RequestMessagePacket(); packet.setMagicNumber(ProtocolConstant.MAGIC_NUMBER); packet.setVersion(ProtocolConstant.VERSION); packet.setSerialNumber(SerialNumberUtils.X.generateSerialNumber()); packet.setMessageType(MessageType.REQUEST); packet.setInterfaceName(output.getInterfaceName()); packet.setMethodName(output.getMethodName()); packet.setMethodArgumentSignatures(output.getMethodArgumentSignatures().toArray(new String[0])); packet.setMethodArguments(args); Channel channel = ClientChannelHolder.CHANNEL_REFERENCE.get(); // 发起请求 channel.writeAndFlush(packet); // 这里方法返回值需要进行同步处理,相对复杂,后面专门开一篇文章讲解,暂时统一返回字符串 // 如果契约接口的返回值类型不是字符串,这里方法返回后会抛出异常 return String.format("[%s#%s]调用成功,发送了[%s]到NettyServer[%s]", output.getInterfaceName(), output.getMethodName(), JSON.toJSONString(packet), channel.remoteAddress()); })); } }
最后编写客户端ClientApplication
的代码:
@Slf4j public class ClientApplication { public static void main(String[] args) throws Exception { int port = 9092; EventLoopGroup workerGroup = new NioEventLoopGroup(); Bootstrap bootstrap = new Bootstrap(); try { bootstrap.group(workerGroup); bootstrap.channel(NioSocketChannel.class); bootstrap.option(ChannelOption.SO_KEEPALIVE, Boolean.TRUE); bootstrap.option(ChannelOption.TCP_NODELAY, Boolean.TRUE); bootstrap.handler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel ch) throws Exception { ch.pipeline().addLast(new LengthFieldBasedFrameDecoder(1024, 0, 4, 0, 4)); ch.pipeline().addLast(new LengthFieldPrepender(4)); ch.pipeline().addLast(new LoggingHandler(LogLevel.DEBUG)); ch.pipeline().addLast(new RequestMessagePacketEncoder(FastJsonSerializer.X)); ch.pipeline().addLast(new ResponseMessagePacketDecoder()); ch.pipeline().addLast(new SimpleChannelInboundHandler<ResponseMessagePacket>() { @Override protected void channelRead0(ChannelHandlerContext ctx, ResponseMessagePacket packet) throws Exception { Object targetPayload = packet.getPayload(); if (targetPayload instanceof ByteBuf) { ByteBuf byteBuf = (ByteBuf) targetPayload; int readableByteLength = byteBuf.readableBytes(); byte[] bytes = new byte[readableByteLength]; byteBuf.readBytes(bytes); targetPayload = FastJsonSerializer.X.decode(bytes, String.class); byteBuf.release(); } packet.setPayload(targetPayload); log.info("接收到来自服务端的响应消息,消息内容:{}", JSON.toJSONString(packet)); } }); } }); ChannelFuture future = bootstrap.connect("localhost", port).sync(); // 保存Channel实例,暂时不考虑断连重连 ClientChannelHolder.CHANNEL_REFERENCE.set(future.channel()); // 构造契约接口代理类实例 HelloService helloService = ContractProxyFactory.ofProxy(HelloService.class); String result = helloService.sayHello("throwable"); log.info(result); future.channel().closeFuture().sync(); } finally { workerGroup.shutdownGracefully(); } } }
先启动《基于Netty和SpringBoot实现一个轻量级RPC框架-Server篇》一文中的ServerApplication
,再启动ClientApplication
,控制台输出如下:
// 服务端日志 2020-01-16 22:34:51 [main] INFO c.throwable.server.ServerApplication - 启动NettyServer[9092]成功... 2020-01-16 22:36:35 [nioEventLoopGroup-3-1] INFO club.throwable.server.ServerHandler - 服务端接收到:RequestMessagePacket(interfaceName=club.throwable.contract.HelloService, methodName=sayHello, methodArgumentSignatures=[java.lang.String], methodArguments=[PooledUnsafeDirectByteBuf(ridx: 0, widx: 11, cap: 11/144)]) 2020-01-16 22:36:35 [nioEventLoopGroup-3-1] INFO club.throwable.server.ServerHandler - 查找目标实现方法成功,目标类:club.throwable.server.contract.DefaultHelloService,宿主类:club.throwable.server.contract.DefaultHelloService,宿主方法:sayHello 2020-01-16 22:36:35 [nioEventLoopGroup-3-1] INFO club.throwable.server.ServerHandler - 服务端输出:{"attachments":{},"errorCode":200,"magicNumber":10086,"message":"Success","messageType":"RESPONSE","payload":"\"throwable say hello!\"","serialNumber":"63d386214d30410c9e5f04de03d8b2da","version":1} // 客户端日志 2020-01-16 22:36:35 [main] INFO c.throwable.client.ClientApplication - [club.throwable.contract.HelloService#sayHello]调用成功,发送了[{"attachments":{},"interfaceName":"club.throwable.contract.HelloService","magicNumber":10086,"messageType":"REQUEST","methodArgumentSignatures":["java.lang.String"],"methodArguments":["throwable"],"methodName":"sayHello","serialNumber":"63d386214d30410c9e5f04de03d8b2da","version":1}]到NettyServer[localhost/127.0.0.1:9092] 2020-01-16 22:36:35 [nioEventLoopGroup-2-1] INFO c.throwable.client.ClientApplication - 接收到来自服务端的响应消息,消息内容:{"attachments":{},"errorCode":200,"magicNumber":10086,"message":"Success","messageType":"RESPONSE","payload":"\"throwable say hello!\"","serialNumber":"63d386214d30410c9e5f04de03d8b2da","version":1}
Client
端主要负责契约接口调用转换为发送RPC
协议请求这一步,核心技术就是动态代理,在不进行模块封装优化的前提下实现是相对简单的。这里其实Client
端还有一个比较大的技术难题没有解决,上面例子中客户端日志输出如果眼尖的伙伴会发现,Client
端发送RPC
请求的线程(main
线程)和Client
端接收Server
端RPC
响应处理的线程(nioEventLoopGroup-2-1
线程)并不相同,这一点是Netty
处理网络请求之所以能够如此高效的根源(简单来说就是请求和响应是异步的,两个流程本来是互不感知的)。但是更多情况下,我们希望外部请求是同步的,希望发送RPC
请求的线程得到响应结果再返回(这里请求和响应有可能依然是异步流程)。下一篇文章会详细分析一下如果对请求-响应做同步化处理。
Demo
项目地址:
(c-2-d e-a-20200116)