wdeo0 2013-05-19
Client端的底层通信直接采用了阻塞式IO编程:
/** * A client for an IPC service. IPC calls take a single {@link Writable} as a * parameter, and return a {@link Writable} as their value. A service runs on a * port and is defined by a parameter class and a value class. * Client端的底层通信直接采用了阻塞式IO编程 * * @see Server */ public class Client { public static final Log LOG = LogFactory.getLog(Client.class); // // 客户端维护到服务端的一组连接 private Hashtable<ConnectionId, Connection> connections = new Hashtable<ConnectionId, Connection>(); private Class<? extends Writable> valueClass; // class of call values private int counter; // counter for call ids // 客户端进程是否在运行 private AtomicBoolean running = new AtomicBoolean(true); // if client runs final private Configuration conf; // // Socket工厂,用来创建Socket连接 private SocketFactory socketFactory; // how to create sockets private int refCount = 1; // // 通过配置文件读取ping间隔 final private static String PING_INTERVAL_NAME = "ipc.ping.interval"; // 默认ping间隔为1分钟 final static int DEFAULT_PING_INTERVAL = 60000; // 1 min final static int PING_CALL_ID = -1; ....... }
(1)client运行的流程;
(a)创建代理对象;
(b)代理对象调用相应方法(invoke());
(c)invoke调用client对象的call方法,向服务器发送请求(参数、方法);
(d)再等待call方法的完成;
(c)返回请求结果;
(2)client主要的内部类
(a) Call,表示一次rpc的调用请求
(b)Connection,表示一个client与server之间的连接,一个连接一个线程启动。该类是一个连接管理内部线程类,该内部类是一个连接线程,继承自Thread类。它读取每一个Call调用实例执行后从服务端返回的响应信息,并通知其他调用实例.每一个连接具有一个连接到远程主机的Socket,该Socket能够实现多路复用,使得多个调用复用该Socket,客户端收到的调用得到的响应可能是无序的
(c)ConnectionId:连接的标记(包括server地址,协议,其他一些连接的配置项信息)
(d)ParallelCall:实现并行调用的请求
(e)ParallelResults:并行调用的执行结果
(3)client一次完整请求调用过程
demo示例:
/** * * Description: RPCserver test<br> * * Copyright: Copyright (c) 2013 <br> * Company: www.renren.com * * @author zhuhui{[email protected]} 2013-5-17 * @version 1.0 */ public class RPCserver { /** * @param args */ public static void main(String[] args) { Server server; try { server = RPC.getServer(new HelloProtocalImp(), "127.0.0.1", 9813, 6, true, new Configuration()); server.start(); try { server.join(); } catch (InterruptedException e) { e.printStackTrace(); } } catch (IOException e) { e.printStackTrace(); } } }
第一步: 创建代理对象
第二步:调用业务方法
private static class Invoker implements InvocationHandler { private Client.ConnectionId remoteId; private Client client; private boolean isClosed = false; public Invoker(Class<? extends VersionedProtocol> protocol, InetSocketAddress address, UserGroupInformation ticket, Configuration conf, SocketFactory factory, int rpcTimeout) throws IOException { this.remoteId = Client.ConnectionId.getConnectionId(address, protocol, ticket, rpcTimeout, conf); this.client = CLIENTS.getClient(conf, factory); } public Object invoke(Object proxy, Method method, Object[] args) throws Throwable { final boolean logDebug = LOG.isDebugEnabled(); long startTime = 0; if (logDebug) { startTime = System.currentTimeMillis(); } // 构造一个RPC.Invocation实例作为参数传递给调用程序,执行调用,返回值为value ObjectWritable value = (ObjectWritable) client.call(new Invocation(method, args), remoteId); if (logDebug) { long callTime = System.currentTimeMillis() - startTime; LOG.debug("Call: " + method.getName() + " " + callTime); } return value.get(); } /* close the IPC client that's responsible for this invoker's RPCs */ synchronized private void close() { if (!isClosed) { isClosed = true; CLIENTS.stopClient(client); } }
Invocation 用于封装方法名和参数,作为数据传输层。远程调用的主要关键就是Invocation实现了Writable接口,Invocation在 write(DataOutput out)函数中将调用的methodName写入到out,将调用方法的参数个数写入out ,同时逐个将参数的className写入out,最后将所有参数逐个写入out,这也就决定了通过RPC实现调用的方法中的参数要么是简单类型,要么是 String,要么是实现了Writable接口的类(参数自己知道如何序列化到stream),要么是数组(数组的元素也必须为简单类 型,String,实现了Writable接口的类)。
第三步:client对象的call方法,向服务器发送请求(参数、方法);
客户端Client类提供的最基本的功能就是执行RPC调用,其中,提供了两种调用方式,一种就是串行单个调用,另一种就是并行调用。
这里只分析串行单个调用的实现方法call,如下所示:
/** * Make a call, passing <code>param</code>, to the IPC server defined by * <code>remoteId</code>, returning the value. Throws exceptions if there * are network problems or if the remote code threw an exception. */ public Writable call(Writable param, ConnectionId remoteId) throws InterruptedException, IOException { // // 使用请求参数值构造一个Call实例 Call call = new Call(param); // // 从连接池connections中获取到一个连接(或可能创建一个新的连接) Connection connection = getConnection(remoteId, call); connection.sendParam(call); // send the parameter // 向IPC服务器发送参数 boolean interrupted = false; synchronized (call) { while (!call.done) { try { call.wait(); // wait for the result // 等待IPC服务器响应 // 等待结果的返回,在Call类的callComplete()方法里有notify()方法用于唤醒线程 } catch (InterruptedException ie) { // save the fact that we were interrupted interrupted = true; } } if (interrupted) { // set the interrupt flag now that we are done waiting Thread.currentThread().interrupt(); } if (call.error != null) { if (call.error instanceof RemoteException) { call.error.fillInStackTrace(); throw call.error; } else { // local exception // use the connection because it will reflect an ip change, // unlike // the remoteId throw wrapException(connection.getRemoteAddress(), call.error); } } else { return call.value;// 调用返回的响应值 } } }
(1)获得连接对象getConnection(remoteId, call);
private Connection getConnection(ConnectionId remoteId, Call call) throws IOException, InterruptedException { if (!running.get()) { // the client is stopped throw new IOException("The client is stopped"); } Connection connection; /* * we could avoid this allocation for each RPC by having a connectionsId * object and with set() method. We need to manage the refs for keys in * HashMap properly. For now its ok. */ do { synchronized (connections) { connection = connections.get(remoteId); if (connection == null) { connection = new Connection(remoteId); connections.put(remoteId, connection); } } } while (!connection.addCall(call));// 可见 一个connection 可以有多个调用call connection.setupIOstreams();// 实际进行连接 每个connection都新起一个线程 // we don't invoke the method below inside "synchronized (connections)" // block above. The reason for that is if the server happens to be slow, // it will take longer to establish a connection and that will slow the // entire system down. return connection; }
(2)实际进行连接并启动接收线程 connection.setupIOstreams();
private synchronized void setupIOstreams() throws InterruptedException { short numRetries = 0; final short maxRetries = 15; Random rand = null; while (true) { setupConnection(); //建立连接 InputStream inStream = NetUtils.getInputStream(socket); //获得输入流 OutputStream outStream = NetUtils.getOutputStream(socket); //获得输出流 writeRpcHeader(outStream); if (useSasl) { final InputStream in2 = inStream; final OutputStream out2 = outStream; UserGroupInformation ticket = remoteId.getTicket(); if (authMethod == AuthMethod.KERBEROS) { if (ticket.getRealUser() != null) { ticket = ticket.getRealUser(); } } boolean continueSasl = false; try { continueSasl = ticket.doAs(new PrivilegedExceptionAction<Boolean>() { @Override public Boolean run() throws IOException { return setupSaslConnection(in2, out2); } }); } catch (Exception ex) { if (rand == null) { rand = new Random(); } handleSaslConnectionFailure(numRetries++, maxRetries, ex, rand, ticket); continue; } if (continueSasl) { // Sasl connect is successful. Let's set up Sasl i/o streams. inStream = saslRpcClient.getInputStream(inStream); outStream = saslRpcClient.getOutputStream(outStream); } else { // fall back to simple auth because server told us so. authMethod = AuthMethod.SIMPLE; header = new ConnectionHeader(header.getProtocol(), header.getUgi(), authMethod); useSasl = false; } } this.in = new DataInputStream(new BufferedInputStream (new PingInputStream(inStream)));//将输入流装饰成DataInputStream this.out = new DataOutputStream//将输出流装饰成DataOutputStream (new BufferedOutputStream(outStream)); writeHeader(); // update last activity time touch(); //当连接建立时,启动接受线程等待服务端传回数据,注意:Connection继承了Tread // start the receiver thread after the socket connection has been set up start(); return; } } catch (Throwable t) { if (t instanceof IOException) { markClosed((IOException)t); } else { markClosed(new IOException("Couldn't set up IO streams", t)); } close(); } } //注意:setupConnection(); //建立连接 // start(); Connection继承了Tread 启动接受线程等待服务端传回数据 public void run() { while (waitForWork()) {//wait here for work - read or close connection //等待某个连接实例空闲,如果存在则唤醒它执行一些任务 receiveResponse(); } close(); }
(3)最终建立链接 setupConnection()
private synchronized void setupConnection() throws IOException { short ioFailures = 0; short timeoutFailures = 0; while (true) { try { this.socket = socketFactory.createSocket(); this.socket.setTcpNoDelay(tcpNoDelay); /* * Bind the socket to the host specified in the principal * name of the client, to ensure Server matching address of * the client connection to host name in principal passed. */ if (UserGroupInformation.isSecurityEnabled()) { KerberosInfo krbInfo = remoteId.getProtocol().getAnnotation(KerberosInfo.class); if (krbInfo != null && krbInfo.clientPrincipal() != null) { String host = SecurityUtil.getHostFromPrincipal(remoteId.getTicket().getUserName()); // If host name is a valid local address then bind // socket to it InetAddress localAddr = NetUtils.getLocalInetAddress(host); if (localAddr != null) { this.socket.bind(new InetSocketAddress(localAddr, 0)); } } } // 设置连接超时为20s // connection time out is 20s NetUtils.connect(this.socket, server, 20000); if (rpcTimeout > 0) { pingInterval = rpcTimeout; // rpcTimeout overwrites // pingInterval } this.socket.setSoTimeout(pingInterval); return; } catch (SocketTimeoutException toe) { /* * Check for an address change and update the local * reference. Reset the failure counter if the address was * changed */ /* * 设置最多连接重试为45次。 总共有20s*45 = 15 分钟的重试时间。 */ if (updateAddress()) { timeoutFailures = ioFailures = 0; } /* * The max number of retries is 45, which amounts to 20s*45 * = 15 minutes retries. */ handleConnectionFailure(timeoutFailures++, 45, toe); } catch (IOException ie) { if (updateAddress()) { timeoutFailures = ioFailures = 0; } handleConnectionFailure(ioFailures++, maxRetries, ie); } } }
(4)发送调用参数
public void sendParam(Call call) { if (shouldCloseConnection.get()) { return; } DataOutputBuffer d = null; try { synchronized (this.out) { if (LOG.isDebugEnabled()) LOG.debug(getName() + " sending #" + call.id); // for serializing the // data to be written d = new DataOutputBuffer(); d.writeInt(call.id); call.param.write(d); byte[] data = d.getData(); int dataLength = d.getLength(); out.writeInt(dataLength); // 首先写出数据的长度 out.write(data, 0, dataLength); // 向服务端写数据 out.flush(); } } catch (IOException e) { markClosed(e); } finally { // the buffer is just an in-memory buffer, but it is still // polite to // close early IOUtils.closeStream(d); } }
第四步 receiveResponse接收服务器返回数据
/* * Receive a response. Because only one receiver, so no synchronization * on in. * 接收到响应(因为每次从DataInputStream in中读取响应信息只有一个,无需同步) */ private void receiveResponse() { if (shouldCloseConnection.get()) { return; } touch(); try { int id = in.readInt(); // try to read an id // 阻塞读取id if (LOG.isDebugEnabled()) LOG.debug(getName() + " got value #" + id); Call call = calls.get(id); // 在calls池中找到发送时的那个对象 int state = in.readInt(); // read call status // 阻塞读取call对象的状态 if (state == Status.SUCCESS.state) { Writable value = ReflectionUtils.newInstance(valueClass, conf); value.readFields(in); // read value // 读取数据 // 将读取到的值赋给call对象,同时唤醒Client等待线程 call.setValue(value); calls.remove(id); } else if (state == Status.ERROR.state) { call.setException(new RemoteException(WritableUtils.readString(in), WritableUtils.readString(in))); calls.remove(id); } else if (state == Status.FATAL.state) { // Close the connection markClosed(new RemoteException(WritableUtils.readString(in), WritableUtils.readString(in))); } } catch (IOException e) { markClosed(e); } }
Hadoop的RPC对外的接口其实是同步的,但是,RPC的内部实现其实是异步消息机制。hadoop用线程wait/notify机制 实现异步转同步,发送请求(call)之后wait请求处理完毕,接收完响应(connection.receiveResponse())之后 notify,notify()方法在call.setValue中。但现在有一个问题,一个connection有多个call。可能同时有多个 call在等待接收消息,那么是当client接收到response后,怎样确认它到底是之前哪个request的response呢?这个就是依靠的 connection中的一个HashTable<Integer, Call>了,其中的Integer是用来标识Call,这样就可以将request和response对应上了。
时序图: