idwtwt 2018-08-02
在protobuf序列化的前面,加上一个自定义的头,这个头包含序列化的长度和它的类型。在解压的时候根据包头来反序列化。
假设socket上要传输2个类型的数据,股票行情信息和期权行情信息:
股票的.proto定义:
syntax = "proto3"; package test.model.protobuf; option java_package = "test.model.protobuf"; message StockTick { string stockId = 1; int price = 2; }
期权的.proto定义:
syntax = "proto3"; package test.model.protobuf; option java_package = "test.model.protobuf"; message OptionTick { string optionId = 1; string securityId = 2; int price = 3; }
netty4官方事实上已经实现了protobuf的编解码的插件,但是只能用于传输单一类型的protobuf序列化。我这里截取一段netty代码,熟悉netty的同学马上就能理解它的作用:
@Override protected void initChannel(SocketChannel ch) throws Exception { ChannelPipeline pipeline = ch.pipeline(); pipeline.addLast(new ProtobufVarint32FrameDecoder()); pipeline.addLast(new ProtobufDecoder(StockTickOuterClass.StockTick.getDefaultInstance())); pipeline.addLast(new ProtobufVarint32LengthFieldPrepender()); pipeline.addLast(new ProtobufEncoder()); pipeline.addLast(new CustomProtoServerHandler()); }
看以上代码高亮部分,netty4官方的编解码器必须指定单一的protobuf类型才行。具体每个类的作用:
ProtobufEncoder:用于对Probuf类型序列化。
ProtobufVarint32LengthFieldPrepender:用于在序列化的字节数组前加上一个简单的包头,只包含序列化的字节长度。
ProtobufVarint32FrameDecoder:用于decode前解决半包和粘包问题(利用包头中的包含数组长度来识别半包粘包)
ProtobufDecoder:反序列化指定的Probuf字节数组为protobuf类型。
我们可以参考以上官方的编解码代码,将实现我们客户化的protobuf编解码插件,但是要支持多种不同类型protobuf数据在一个socket上传输:
import com.google.protobuf.MessageLite; import io.netty.buffer.ByteBuf; import io.netty.channel.ChannelHandler.Sharable; import io.netty.channel.ChannelHandlerContext; import io.netty.handler.codec.MessageToByteEncoder; /** * 参考ProtobufVarint32LengthFieldPrepender 和 ProtobufEncoder */ @Sharable public class CustomProtobufEncoder extends MessageToByteEncoder<MessageLite> { HangqingEncoder hangqingEncoder; public CustomProtobufEncoder(HangqingEncoder hangqingEncoder) { this.hangqingEncoder = hangqingEncoder; } @Override protected void encode( ChannelHandlerContext ctx, MessageLite msg, ByteBuf out) throws Exception { byte[] body = msg.toByteArray(); byte[] header = encodeHeader(msg, (short)body.length); out.writeBytes(header); out.writeBytes(body); return; } private byte[] encodeHeader(MessageLite msg, short bodyLength) { byte messageType = 0x0f; if (msg instanceof StockTickOuterClass.StockTick) { messageType = 0x00; } else if (msg instanceof OptionTickOuterClass.OptionTick) { messageType = 0x01; } byte[] header = new byte[4]; header[0] = (byte) (bodyLength & 0xff); header[1] = (byte) ((bodyLength >> 8) & 0xff); header[2] = 0; // 保留字段 header[3] = messageType; return header; } }
CustomProtobufEncoder序列化传入的protobuf类型,并且为它创建了一个4个字节的包头,格式如下
body长度(low) | body长度 (high) | 保留字节 | 类型 |
其中的encodeHeader方法具体的实现要根据你要传输哪些protobuf类型来修改代码,也可以稍加设计避免使用太多的if…else。
import io.netty.buffer.ByteBuf; import io.netty.channel.ChannelHandlerContext; import io.netty.handler.codec.ByteToMessageDecoder; import java.util.List; import com.google.protobuf.MessageLite; /** * 参考ProtobufVarint32FrameDecoder 和 ProtobufDecoder */ public class CustomProtobufDecoder extends ByteToMessageDecoder { @Override protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception { while (in.readableBytes() > 4) { // 如果可读长度小于包头长度,退出。 in.markReaderIndex(); // 获取包头中的body长度 byte low = in.readByte(); byte high = in.readByte(); short s0 = (short) (low & 0xff); short s1 = (short) (high & 0xff); s1 <<= 8; short length = (short) (s0 | s1); // 获取包头中的protobuf类型 in.readByte(); byte dataType = in.readByte(); // 如果可读长度小于body长度,恢复读指针,退出。 if (in.readableBytes() < length) { in.resetReaderIndex(); return; } // 读取body ByteBuf bodyByteBuf = in.readBytes(length); byte[] array; int offset; int readableLen= bodyByteBuf.readableBytes(); if (bodyByteBuf.hasArray()) { array = bodyByteBuf.array(); offset = bodyByteBuf.arrayOffset() + bodyByteBuf.readerIndex(); } else { array = new byte[readableLen]; bodyByteBuf.getBytes(bodyByteBuf.readerIndex(), array, 0, readableLen); offset = 0; } //反序列化 MessageLite result = decodeBody(dataType, array, offset, readableLen); out.add(result); } } public MessageLite decodeBody(byte dataType, byte[] array, int offset, int length) throws Exception { if (dataType == 0x00) { return StockTickOuterClass.StockTick.getDefaultInstance(). getParserForType().parseFrom(array, offset, length); } else if (dataType == 0x01) { return OptionTickOuterClass.OptionTick.getDefaultInstance(). getParserForType().parseFrom(array, offset, length); } return null; // or throw exception } }
CustomProtobufDecoder实现了2个功能,1)通过包头中的长度信息来解决半包和粘包。 2)把消息body反序列化为对应的protobuf类型(根据包头中的类型信息)。
其中的decodeBody方法具体的实现要根据你要传输哪些protobuf类型来修改代码,也可以稍加设计避免使用太多的if…else。
如何把我们自定义的编解码用于netty Server:
@Override protected void initChannel(SocketChannel ch) throws Exception { ChannelPipeline pipeline = ch.pipeline(); pipeline.addLast("decoder",new CustomProtobufDecoder()); pipeline.addLast("encoder",new CustomProtobufEncoder()); pipeline.addLast(new CustomProtoServerHandler()); }