极品小肥羊 2015-05-19
我们在开发一些RPC调用的程序时,通常会涉及到对象的序列化/反序列化的问题,比如一个“Person”对象从Client端通过TCP方式发送到Server端;因为TCP协议(UDP等这种低级协议)只能发送字节流,所以需要应用层将Java对象序列化成字节流,数据接收端再反序列化成Java对象即可。“序列化”一定会涉及到编码(encoding,format),目前我们可选择的编码方式:
1)使用JSON,将java对象转换成JSON结构化字符串。在web应用、移动开发方面等,基于Http协议下,这是常用的,因为JSON的可读性较强。性能稍差。
2)基于XML,和JSON一样,数据在序列化成字节流之前,都转换成字符串。可读性强,性能差,异构系统、open api类型的应用中常用。
3)使用JAVA内置的编码和序列化机制,可移植性强,性能稍差。无法跨平台(语言)。
4)其他开源的序列化/反序列化框架,比如Apache Avro,Apache Thrift,这两个框架和Protobuf相比,性能非常接近,而且设计原理如出一辙;其中Avro在大数据存储(RPC数据交换,本地存储)时比较常用;Thrift的亮点在于内置了RPC机制,所以在开发一些RPC交互式应用时,Client和Server端的开发与部署都非常简单。
评价一个序列化框架的优缺点,大概有2个方面:1)结果数据大小,原则上说,序列化后的数据尺寸越小,传输效率越高。 2)结构复杂度,这会影响序列化/反序列化的效率,结构越复杂,越耗时。
Protobuf是一个高性能、易扩展的序列化框架,它的性能测试有关数据可以参看官方文档。通常在TCP Socket通讯(RPC调用)相关的应用中使用;它本身非常简单,易于开发,而且结合Netty框架可以非常便捷的实现一个RPC应用程序,同时Netty也为Protobuf解决了有关Socket通讯中“半包、粘包”等问题(反序列化时,字节成帧)。
1、安装Protobuf
从“https://developers.google.com/protocol-buffers/docs/downloads”下载安装包,windows下的使用不再赘言;在linux或者mac下,下载tar.gz的压缩包,解压后执行:
$ ./configure $ make $ make check $ make install
此后,可以通过“protoc --version”查看是否安装成功了,安装过程不需要配置环境变量。安装主要是为了能够使用命令编译proto文件,实际部署环境并不需要。
2、样例
Protobuf需要一个schema声明文件,后缀为“.proto”的文本文件,内容样例如下:
option java_package = "com.test.protobuf"; option java_outer_classname="PersonProtos"; message Person { required string name = 1; required int32 id = 2; optional string email = 3; enum PhoneType { MOBILE = 0; HOME = 1; WORK = 2; } message PhoneNumber { required string number = 1; optional PhoneType type = 2 [default = HOME]; } repeated PhoneNumber phone = 4; }
如果你曾经使用过thrift、avro,你会发现它们都需要一个类似的schema文件,只是结构规则不同罢了。特别备注:protbuf和thrift的声明文件相似度极高。
“message”表示,声明一个“类”,即java中的class。message中可以内嵌message,就像java的内部类一样。一个message有多个filed,“required string name = 1”则表示:name字段在序列化、反序列化时为第一个字段,string类型,“required”表示这个字段的值是必选;可以看出每个filed都至少有着三个部分组成,其中filed的“位置index”全局唯一。“optional”表示这个filed是可选的(允许为null)。“repeated”表示这个filed是一个集合(list)。也可以通过[default = ]为一个“optional”的filed指定默认值。
我们可以在一个.proto文件中声明多个“message”,不过大部分情况下我们把互相继承或者依赖的类写入一个.proto文件,将那些没有关联关系的类分别写入不同的文件,这样便于管理。
我们可以在.proto文件的头部声明一些额外的信息,比如“java_package”表示当“generate code”时将生成的java代码放入指定的package中。“java_outer_classname”表示生成的java类的名称。
然后执行如下命令,生成JAVA代码:
protoc --java_out=./ Persion.proto
通过“--java_out”指定生成JAVA代码保存的目录,后面紧跟“.proto”文件的路径。此后我们看到生成 了Package和一个PersonProto.java文件,我们只需要把此java文件复制到项目中即可。
3、JAVA实例
1)pom.xml
<dependency> <groupId>com.google.protobuf</groupId> <artifactId>protobuf-java</artifactId> <version>2.6.1</version> </dependency>
2)测试:
PersonProtos.Person.Builder personBuilder = PersonProtos.Person.newBuilder(); personBuilder.setEmail("[email protected]"); personBuilder.setId(1000); PersonProtos.Person.PhoneNumber.Builder phone = PersonProtos.Person.PhoneNumber.newBuilder(); phone.setNumber("18610000000"); personBuilder.setName("张三"); personBuilder.addPhones(phone); PersonProtos.Person person = personBuilder.build();
获得到person实例后,我们可以通过如下方式,将person对象序列化、反序列化。
//第一种方式 //序列化 byte[] data = person.toByteArray();//获取字节数组,适用于SOCKET或者保存在磁盘。 //反序列化 PersonProtos.Person result = PersonProtos.Person.parseFrom(data); System.out.println(result.getEmail());
这种方式,适用于很多场景,Protobuf会根据自己的encoding方式,将JAVA对象序列化成字节数组。同时Protobuf也可以从字节数组中重新decoding,得到Java新的实例。
//第二种序列化:粘包,将一个或者多个protobuf对象字节写入stream。 ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream(); //生成一个由:[字节长度][字节数据]组成的package。特别适合RPC场景 person.writeDelimitedTo(byteArrayOutputStream); //反序列化,从steam中读取一个或者多个protobuf字节对象 ByteArrayInputStream byteArrayInputStream = new ByteArrayInputStream(byteArrayOutputStream.toByteArray()); result = PersonProtos.Person.parseDelimitedFrom(byteArrayInputStream); System.out.println(result.getEmail());
第二种方式,是RPC调用中、Socket传输时适用,在序列化的字节数组之前,添加一个varint32的数字表示字节数组的长度;那么在反序列化时,可以通过先读取varint,然后再依次读取此长度的字节;这种方式有效的解决了socket传输时如何“拆包”“封包”的问题。在Netty中,适用了同样的技巧。
//第三种序列化,写入文件或者Socket FileOutputStream fileOutputStream = new FileOutputStream(new File("/test.dt")); person.writeTo(fileOutputStream); fileOutputStream.close(); FileInputStream fileInputStream = new FileInputStream(new File("/test.dt")); result = PersonProtos.Person.parseFrom(fileInputStream); System.out.println(result);
第三种方式,比较少用。但是比较通用,意思为将序列化的字节数组写入到OutputStream中,具体的拆包工作,交给了高层框架。
4、protobuf入门介绍
以上述Person.proto文件为例:
message Person { required string name = 1; required int32 id = 2; optional string email = 3; }
声明了三个filed,每个filed都“规则”、“类型”、“字段名称”和一个“唯一的数字tag”。
1)其中“规则”可以为如下几个值:
“required”:表示此字段值必填,一个结构良好的message至少有一个flied为“required”。
“optional”:表示此字段值为可选的。对于此类型的字段,可以通过default来指定默认值,这是一个良好的设计习惯。
optional int32 page = 3 [default = 10];
如果没有指定默认值,在encoding时protobuf将会用一个特殊的默认值来替代。对于string,默认值为空,bool类型默认为false,数字类型默认位0,对于enum则默认值为枚举列表的第一个值。
“repeated”:表示这个字段的值可以允许被重复多次,如果转换成JAVA代码,此filed数据结构为list,有序的。可以在“repeated”类型的filed后使用“packed”--压缩,提高数据传输的效率。
repeated int32 numbers = 4 [packed=true];
特别需要注意:当你指定一个filed位required时,需要慎重考虑这个filed是否永远都是“必须的”。将一个required调整为optional,需要同时重新部署数据通讯的Client和Server端,否则将会对解析带来问题。
2)可以在一个.proto文件中,同时声明多个message,这样是允许的。
3)为message或者filed添加注释,风格和JAVA一样:
optional int32 page = 3;// Which page number do we want?
4)数据类型与JAVA对应关系:
protobuf | java |
double | double |
float | float |
int32 | int |
int64 | long |
bool | boolean |
string | String |
bytes | ByteString |
其中“ByteString”是Protobuf自定义的JAVA API。
5)枚举:和JAVA中Enum API一致,如果开发者希望某个filed的值只能在一些限定的列表中,可以将次filed声明为enum类型。Protobuf中,enum类型的每个值是一个int32的数字,不像JAVA中那样enum可以定义的非常复杂。如果enum中有些值是相同的,可以将“allow_alias”设定为true。
message Person { required Type type = 1; enum Type { option allow_alias = true; TEACHER = 0; STUDENT = 1; OTHER = 1;//the same as STUDENT } }
6)import:如果当前.proto文件中引用了其他proto文件的message类型,那么可以在此文件的开头声明import。
import "other_protos.proto";
不过这会引入一个小小的麻烦,如果你的“other_protos.proto”文件变更了目录,需要连带修改其他文件。
7)嵌入message:类似于java的内部类,即在message中,嵌入其他message。如Person.proto例子中的PhoneNumber。
8)更新message类型:如果一个现有的message类型无法满足当前的需要,比如你需要新增一个filed,但是仍然希望使用生成的旧代码来解析。
(1)不要修改现有fileds的数字tag,即字段的index数字。
(2)新增字段必须为optional或者repeated类型,同时还要为它们设置“default”值,这意味着“old”代码序列化的messages能够被“new”代码解析。“new”代码生成的数据也能被“old”代码解析,对于“old”代码而言,那些没有被声明的filed将会在解析式忽略。
(3)非“required”filed可以被删除,但是它的“数字tag”不能被其他字段重用。
(4)int32、uint32、int64、uint64、bool,是互相兼容的,它们可以从一个类型修改成另外一个,而不会对程序带来错误。参见源码WireFormat.FiledType
(5)sint32和sint64是兼容的,但和其他数字类型是不兼容的。
(6)string和bytes是兼容的,只要为UTF-8编码的。注意protobuf中string默认是UTF-8编码的。
(7)optional与repeated是兼容的。如果输入的数据格式是repeated,但是client希望接受的数据是optional,对于原生类型,那么client将会使用repeated的最后一个值,对于message类型,client将会merge这些输入的数据。
(8)修改“default”值通常不会有任何问题,只要保证这个默认值不会被真正的使用。
9)Map结构:
map<key_type, value_type> map = 3;
其中key_type可以为任何“整形”或者string类型,value_type可以为任意类型,只要JAVA API能够支持。map类型不能被“repeated”、“optional”或者“required”修饰,传输过程中无法确保map中数据的顺序,
对于文本格式,map是按照key排序。
10)如下为一些有用的选项:
(1)java_package:在.proto文件的顶部设定,指定生成JAVA文件时类所在的package。
option java_package = "com.example.foo";
(2)java_outer_classname:在.proto文件的顶部设定,指定生成JAVA文件时类的名字。一个.proto文件只会生成一个JAVA类。
option java_outer_classname = "FooProtos";
(3)packed:对于repeated类型有效,指定输入的数据是否“压缩”。
5、protobuf序列化原理:
其实protobuf的序列化原理并不是什么高超的“绝技”:如果你曾经了解过thrift、avro,或者从事过socket通信,那么你对protobuf的序列化方式并不感到惊奇;如下为protobuf的序列化format:
[serializedSize]{[int32(tag,type)][value]...}
对于一个message,序列化时首先就算这个message所有filed序列化需要占用的字节长度,计算这个长度是非常简单的,因为protobuf中每种类型的filed所占用的字节数是已知的(bytes、string除外),只需要累加即可。这个长度就是serializedSize,32为integer,在protobuf的某些序列化方式中可能使用varint32(一个压缩的、根据数字区间,使用不同字节长度的int);此后是filed列表输出,每个filed输出包含int32(tag,type)和value的字节数组,从上文我们知道每个filed都有一个唯一的数字tag表示它的index位置,type为字段的类型,tag和type分别占用一个int的高位、低位字节;如果filed为string、bytes类型,还会在value之前额外的补充添加一个varint32类型的数字,表示string、bytes的字节长度。
那么在反序列化的时候,首先读取一个32为的int表示serializedSize,然后读取serializedSize个字节保存在一个bytebuffer中,即读取一个完整的package。然后读取一个int32数字,从这个数字中解析出tag和type,如果type为string、bytes,然后补充读取一个varint32就知道了string的字节长度了,此后根据type或者字节长度,读取后续的字节数组并转换成java type。重复上述操作,直到整个package解析完毕。
protobuf的这种序列化format,极大的介绍了输入、输出的数据大小,而且复杂度非常低,从而性能较高。
6、protobuf与Netty编程:
1)Netty Server端样例
public class ProtobufNettyServerTestMain { public static void main(String[] args) { //bossGroup : NIO selector threadPool EventLoopGroup bossGroup = new NioEventLoopGroup(); //workerGroup : socket data read-write worker threadPool EventLoopGroup workerGroup = new NioEventLoopGroup(); try { ServerBootstrap bootstrap = new ServerBootstrap(); bootstrap.group(bossGroup,workerGroup) .channel(NioServerSocketChannel.class) .childHandler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel ch) throws Exception { ch.pipeline().addLast(new ProtobufVarint32FrameDecoder()) .addLast(new ProtobufDecoder(PersonProtos.Person.getDefaultInstance())) .addLast(new ProtobufVarint32LengthFieldPrepender()) .addLast(new ProtobufEncoder()) .addLast(new ProtobufServerHandler());//自定义handler } }).childOption(ChannelOption.TCP_NODELAY,true); System.out.println("begin"); //bind到本地的18080端口 ChannelFuture future = bootstrap.bind(18080).sync(); //阻塞,直到channel.close future.channel().closeFuture().sync(); System.out.println("end"); } catch (Exception e) { e.printStackTrace(); } finally { //辅助线程优雅退出 workerGroup.shutdownGracefully(); bossGroup.shutdownGracefully(); } } }
备注:channel内部维护一个pipeline,类似一个filter链表一样,所有的socket读写都会经过,对于write操作(outbound)会从pipeline列表的last-->first方向依次调用Encoder处理器;对于read操作(inbound)会从first-->last依次调用Decoder处理器。此外Encoder处理对于read操作不起效,Decoder处理器对write操作不起效,原理 稍后在Netty相关章节介绍。
ProtobufEncoder:非常简单,内部直接使用了message.toByteArray()将字节数据放入bytebuf中输出(out中,交由下一个encoder处理)。
ProtobufVarint32LengthFieldPrepender:因为ProtobufEncoder只是将message的各个filed按照规则输出了,并没有serializedSize,所以socket无法判定package(封包)。这个Encoder的作用就是在ProtobufEncoder生成的字节数组前,prepender一个varint32数字,表示serializedSize。
ProtobufVarint32FrameDecoder:这个decoder和Prepender做的工作正好对应,作用就是“成帧”,根据seriaziedSize读取足额的字节数组--一个完整的package。
ProtobufDecoder:和ProtobufEncoder对应,这个Decoder需要指定一个默认的instance,decoder将会解析byteArray,并根据format规则为此instance中的各个filed赋值。
2)ProtobufServerHandler.java
发送Protobuf数据和接收client发送的数据。一个自定义的处理器,通常我们的业务会在这里处理。
public class ProtobufServerHandler extends ChannelInboundHandlerAdapter { @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { PersonProtos.Person person = (PersonProtos.Person)msg; //经过pipeline的各个decoder,到此Person类型已经可以断定 System.out.println(person.getEmail()); ChannelFuture future = ctx.writeAndFlush(build()); //发送数据之后,我们手动关闭channel,这个关闭是异步的,当数据发送完毕后执行。 future.addListener(ChannelFutureListener.CLOSE); } /** * 构建一个Protobuf实例,测试 * @return */ public MessageLite build() { PersonProtos.Person.Builder personBuilder = PersonProtos.Person.newBuilder(); personBuilder.setEmail("[email protected]"); personBuilder.setId(1000); PersonProtos.Person.PhoneNumber.Builder phone = PersonProtos.Person.PhoneNumber.newBuilder(); phone.setNumber("18610000000"); personBuilder.setName("张三"); personBuilder.addPhones(phone); return personBuilder.build(); } }
3)Netty Client样例
public class ProtobufNettyClientTestMain { public static void main(String[] args) throws Exception{ EventLoopGroup workerGroup = new NioEventLoopGroup(); try { Bootstrap bootstrap = new Bootstrap(); bootstrap.group(workerGroup) .channel(NioSocketChannel.class) .option(ChannelOption.CONNECT_TIMEOUT_MILLIS,10000) .handler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel ch) throws Exception { System.out.println("initChannel"); ch.pipeline().addLast(new ProtobufVarint32FrameDecoder()) .addLast(new ProtobufDecoder(PersonProtos.Person.getDefaultInstance())) .addLast(new ProtobufVarint32LengthFieldPrepender()) .addLast(new ProtobufEncoder()) .addLast(new ProtobufClientHandler()); } }); ChannelFuture future = bootstrap.connect(new InetSocketAddress("127.0.0.1", 18080)); System.out.println("begin"); future.channel().closeFuture().sync(); System.out.println("Closed"); } catch (Exception e) { e.printStackTrace(); } finally { workerGroup.shutdownGracefully(); } } }
4)ProtobufClientHandler.java
public class ProtobufClientHandler extends ChannelInboundHandlerAdapter { @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { //当channel就绪后,我们首先通过client发送一个数据。 ctx.writeAndFlush(build()); } @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { PersonProtos.Person person = (PersonProtos.Person)msg; System.out.println(person.getEmail()); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { cause.printStackTrace();; ctx.close(); } public MessageLite build() { PersonProtos.Person.Builder personBuilder = PersonProtos.Person.newBuilder(); personBuilder.setEmail("[email protected]"); personBuilder.setId(1000); PersonProtos.Person.PhoneNumber.Builder phone = PersonProtos.Person.PhoneNumber.newBuilder(); phone.setNumber("18610000000"); personBuilder.setName("李四"); personBuilder.addPhones(phone); return personBuilder.build(); } }
关于Netty的相关技术,请参考其他文档。
到此为止,我们基本上对protobuf使用方式,有了初步的了解。祝大家好运!