DGenerationX 2014-07-02
mina是对nio的具体实现。是目前比较高效和流行的nio框架了。
下面是对使用mina进行通讯的一个简单demo,后面再用mina写一个RPC的简单框架。
mina主要包括:
(使用的mina版本为2.0.0.M4core,具体可见官方网站)
mina也分服务端和客户端(这是肯定的...)
其中服务端为:NioSocketAcceptor
客户端为:NioSocketConnector
类似于Socket的服务端跟客户端Socket。除了这些用来基本通讯的之外,还有一些可以用来处理通讯中的操作类。就是在客户端和服务端的一个个filter。这些filter可以用来进行解码,编码,可以配置日志信息,可以设定序列化类型,等等。
另外为客户端和服务端都可以绑定一个IoHnadler,用来处理连接session在打开,收到信息,关闭等状态时候可以进行的动作。
现在就来使用mina进行一个简单的客户端上传文件的demo的实现:
demo实现的思想为:
客户端跟服务端建立起来连接,客户端每次想服务端传输一定大小的文件内容。(byte的方式),然后服务端接收这些byte,将其output出来,形成文件。客户端发送完毕后,传递一个完毕的标志,这里可以传个字符串”finish“,然后服务器收到这个结束标志,在写文件结束后,再传输个成功的标志给客户端,(字符串”success“)然后客户端关闭连接。
服务端:
代码比较简单。
import java.net.InetSocketAddress; import org.apache.mina.filter.codec.ProtocolCodecFilter; import org.apache.mina.filter.codec.serialization.ObjectSerializationCodecFactory; import org.apache.mina.filter.logging.LoggingFilter; import org.apache.mina.transport.socket.nio.NioSocketAcceptor; public class Main { private static final int PORT = 8080; public static void main(String[] args) throws Exception { //服务端的实例 NioSocketAcceptor accept=new NioSocketAcceptor(); //添加filter,codec为序列化方式。这里为对象序列化方式,即表示传递的是对象。 accept.getFilterChain().addLast("codec", new ProtocolCodecFilter(new ObjectSerializationCodecFactory())); //添加filter,日志信息 accept.getFilterChain().addLast("logging", new LoggingFilter()); //设置服务端的handler accept.setHandler(new FileUploadHandler()); //绑定ip accept.bind(new InetSocketAddress(PORT)); System.out.println("upload server started."); } }
就这样简单的完成了服务端的实现,其实可以复杂一些,其中的log跟code都可以根据需要自己来写和生成,这里只使用mina自带的。
然后是服务端的handler。
import java.io.BufferedOutputStream; import java.io.File; import java.io.FileOutputStream; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.mina.core.service.IoHandlerAdapter; import org.apache.mina.core.session.IoSession; public class FileUploadHandler extends IoHandlerAdapter { private BufferedOutputStream out; private int count; private String fileName = "D:/log/test.jpg"; private static final Log log = LogFactory.getLog(FileUploadHandler.class); public void sessionOpened(IoSession session) throws Exception { System.out.println("server open"); } public void exceptionCaught(IoSession session, Throwable cause) throws Exception { System.out.println("exception"); session.close(true); super.exceptionCaught(session, cause); } public void messageReceived(IoSession session, Object message) { System.out.println("server received"); try { if (message instanceof FileUploadRequest) { //FileUploadRequest 为传递过程中使用的DO。 FileUploadRequest request = (FileUploadRequest) message; System.out.println(request.getFilename()); if (out == null) { //新建一个文件输入对象BufferedOutputStream,随便定义新文件的位置 out = new BufferedOutputStream(new FileOutputStream( "D:/log/" + request.getFilename())); out.write(request.getFileContent()); } else { out.write(request.getFileContent()); } count += request.getFileContent().length; } else if (message instanceof String) { if (((String)message).equals("finish")) { System.out.println("size is"+count); //这里是进行文件传输后,要进行flush和close否则传递的文件不完整。 out.flush(); out.close(); //回执客户端信息,上传文件成功 session.write("success"); } } } catch (Exception e) { e.printStackTrace(); } } public void sessionClosed(IoSession session) throws Exception { System.out.println("server session close"); } }
所有的handler都要继承IoHandlerAdapter,可以查看IoHandlerAdapter,其包括几个关于session状态的方法。按需进行重载即可。
然后就是公用的用来传输的DO:FileUploadRequest简单的POJO
import java.io.Serializable; public class FileUploadRequest implements Serializable { private String hostname; private String filename; private byte[] fileContent; public String getHostname() { return hostname; } public void setHostname(String hostname) { this.hostname = hostname; } public String getFilename() { return filename; } public void setFilename(String filename) { this.filename = filename; } public byte[] getFileContent() { return fileContent; } public void setFileContent(byte[] fileContent) { this.fileContent = fileContent; } }
接下来看下客户端的实现,也很简单:
import java.io.File; import java.io.FileInputStream; import java.io.IOException; import java.net.InetSocketAddress; import nio.upload.server.FileUploadRequest; import org.apache.mina.core.future.ConnectFuture; import org.apache.mina.core.session.IoSession; import org.apache.mina.filter.codec.ProtocolCodecFilter; import org.apache.mina.filter.codec.serialization.ObjectSerializationCodecFactory; import org.apache.mina.filter.logging.LoggingFilter; import org.apache.mina.transport.socket.nio.NioSocketConnector; public class MainClient { private static final int PORT = 8080; /** * @param args * @throws IOException */ public static void main(String[] args) throws Exception { //客户端的实现 NioSocketConnector connector = new NioSocketConnector(); connector.getFilterChain().addLast("codec", new ProtocolCodecFilter(new ObjectSerializationCodecFactory())); connector.getFilterChain().addLast("logging", new LoggingFilter()); FileUploadClientHandler h = new FileUploadClientHandler(); connector.setHandler(h); //本句需要加上,否则无法调用下面的readFuture来从session中读取到服务端返回的信息。 connector.getSessionConfig().setUseReadOperation(true); ConnectFuture cf = connector.connect(new InetSocketAddress("localhost", PORT)); IoSession session; //等待连接成功 cf.awaitUninterruptibly(); session = cf.getSession(); System.out.println("client send begin"); //传递文件开始 String fileName = "test.jpg"; FileInputStream fis = new FileInputStream(new File(fileName)); byte[] a = new byte[1024 * 4]; FileUploadRequest request = new FileUploadRequest(); request.setFilename(fileName); request.setHostname("localhost"); while (fis.read(a, 0, a.length) != -1) { request.setFileContent(a); //像session中写入信息供服务端获得 session.write(request); } //发送完成的标志 session.write(new String("finish")); System.out.println("client send finished and wait success"); //接上面来取得服务端的信息 Object result = session.read().awaitUninterruptibly().getMessage(); if (result.equals("success")) { System.out.println("success!"); //关闭客户端 connector.dispose(); } } }
客户端handler的实现。
import org.apache.mina.core.service.IoHandlerAdapter; import org.apache.mina.core.session.IoSession; public class FileUploadClientHandler extends IoHandlerAdapter { public void sessionOpened(IoSession session) throws Exception { System.out.println("client open"); } public void sessionClosed(IoSession session) throws Exception { System.out.println("client session close"); } public void messageReceived(IoSession session, Object message) throws Exception { System.out.println("thr result is" + message); } }
心得:
注意其中关于文件的操作,outPutStream在传输完之后要进行flush跟close,因此需要客户端给一个结束的标志,才能进行判断。mina的结构也比较清晰,深层次的以后再研究。
PS:
有个小小疑问:在客户端对session不停的write之后,服务端在接收的时候是能保证按照我写入的顺序来读取的吗。估计这其中的传递用的是一个能同步的队列。这样才能保证客户端写的顺序跟服务端读的顺序一致。