Chapter 6: Xiao Zhu's Hadoop Notes, Source Code Analysis of IPC, Section 2: Analysis of the Client Class

wdeo0 2013-05-19


On the client side, the underlying communication uses plain blocking I/O:

/**
 * A client for an IPC service. IPC calls take a single {@link Writable} as a
 * parameter, and return a {@link Writable} as their value. A service runs on a
 * port and is defined by a parameter class and a value class.
 * The client side communicates using plain blocking I/O.
 * 
 * @see Server
 */
public class Client {

    public static final Log LOG = LogFactory.getLog(Client.class);
    // The client maintains a set of connections to servers, keyed by ConnectionId
    private Hashtable<ConnectionId, Connection> connections = new Hashtable<ConnectionId, Connection>();

    private Class<? extends Writable> valueClass; // class of call values
    private int counter; // counter for call ids

    // Whether the client is still running
    private AtomicBoolean running = new AtomicBoolean(true); // if client runs
    final private Configuration conf;

    // Socket factory used to create socket connections
    private SocketFactory socketFactory; // how to create sockets
    private int refCount = 1;

    // Configuration key for the ping interval
    final private static String PING_INTERVAL_NAME = "ipc.ping.interval";
    // Default ping interval: 1 minute
    final static int DEFAULT_PING_INTERVAL = 60000; // 1 min
    final static int PING_CALL_ID = -1;
.......

}

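(1) Client workflow:

(a) Create a proxy object;
(b) The proxy object invokes the corresponding method (invoke());
(c) invoke() calls the Client object's call() method, which sends the request (method and parameters) to the server;
(d) Wait for call() to complete;
(e) Return the result of the request.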

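(2) Main inner classes of Client

(a) Call: represents a single RPC call request.
(b) Connection: represents a connection between the client and a server, and each connection runs in its own thread. It is an inner class that manages the connection and extends Thread. It reads the response the server returns for each Call and notifies the thread waiting on that Call. Each connection holds one Socket to the remote host; the socket is multiplexed so that many calls share it, which means responses may reach the client in a different order from the requests.
(c) ConnectionId: identifies a connection (server address, protocol, and other connection settings).
(d) ParallelCall: a single request within a parallel call.
(e) ParallelResults: collects the results of a parallel call.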
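Because Client keeps its connections in a Hashtable<ConnectionId, Connection>, the ConnectionId used as the key has to implement equals() and hashCode() over the fields that identify a connection; otherwise a freshly built id would never match an existing entry. Below is a minimal sketch with a hypothetical DemoConnectionId that keeps only the server address, protocol name and rpcTimeout. The real ConnectionId carries more fields (such as the user ticket), so treat this as an illustration of the idea, not Hadoop's code.

```java
import java.net.InetSocketAddress;
import java.util.Hashtable;
import java.util.Objects;

// Hypothetical, simplified stand-in for Client.ConnectionId; the real class
// also carries the user ticket and further connection settings.
final class DemoConnectionId {
    final InetSocketAddress address;
    final String protocol;
    final int rpcTimeout;

    DemoConnectionId(InetSocketAddress address, String protocol, int rpcTimeout) {
        this.address = address;
        this.protocol = protocol;
        this.rpcTimeout = rpcTimeout;
    }

    // Equal when they describe the same server, protocol and timeout,
    // so lookups in the connection table hit the same entry.
    @Override
    public boolean equals(Object o) {
        if (this == o) return true;
        if (!(o instanceof DemoConnectionId)) return false;
        DemoConnectionId that = (DemoConnectionId) o;
        return rpcTimeout == that.rpcTimeout
                && address.equals(that.address)
                && protocol.equals(that.protocol);
    }

    // Must agree with equals(): equal ids produce equal hash codes.
    @Override
    public int hashCode() {
        return Objects.hash(address, protocol, rpcTimeout);
    }

    public static void main(String[] args) {
        Hashtable<DemoConnectionId, String> connections = new Hashtable<>();
        InetSocketAddress addr = new InetSocketAddress("127.0.0.1", 9813);
        connections.put(new DemoConnectionId(addr, "HelloProtocal", 0), "conn-1");
        // A newly built but equal id finds the existing entry.
        System.out.println(connections.get(new DemoConnectionId(addr, "HelloProtocal", 0))); // prints conn-1
    }
}
```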

(3) A complete client request, step by step

Demo example:

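import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.ipc.Server;

/**
 * 
 * Description: RPCserver test<br>
 * 
 * Copyright: Copyright (c) 2013 <br>
 * Company: www.renren.com
 * 
 * @author zhuhui 2013-5-17
 * @version 1.0
 */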
public class RPCserver {

    /**
     * @param args
     */
    public static void main(String[] args) {
        Server server;
        try {
            server = RPC.getServer(new HelloProtocalImp(), "127.0.0.1", 9813, 6, true, new Configuration());
            server.start();
            try {
                server.join();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        } catch (IOException e) {
            e.printStackTrace();
        }

    }

}

Step 1: Create the proxy object

HelloProtocal rpcInterface=(HelloProtocal)RPC.getProxy(HelloProtocal.class, HelloProtocal.versionID, nameNodeAddr, new Configuration());
(1) The getProxy() method
VersionedProtocol proxy =(VersionedProtocol) Proxy.newProxyInstance(protocol.getClassLoader(), new Class[] { protocol },new Invoker(protocol, addr, ticket, conf, factory, rpcTimeout));
(2) The Invoker(protocol, addr, ticket, conf, factory, rpcTimeout) constructor
Invoker is a class that implements the InvocationHandler interface:
this.remoteId = Client.ConnectionId.getConnectionId(address, protocol,ticket, rpcTimeout, conf);
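this.client = CLIENTS.getClient(conf, factory);
CLIENTS is a cache of Client objects (RPC.ClientCache) keyed by SocketFactory, so proxies created with the same socket factory share one Client and its connections.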
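getProxy() relies on the JDK dynamic proxy mechanism: every method called on the proxy is routed to the InvocationHandler's invoke(), which in Hadoop is Invoker.invoke(). The minimal sketch below shows that routing in isolation. The Greeter interface and the LoggingInvoker handler are hypothetical, and the handler formats the call locally instead of sending it through client.call().

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.util.Arrays;

// Hypothetical protocol interface standing in for HelloProtocal.
interface Greeter {
    String sayHello(String name);
}

// Plays the role of RPC.Invoker, but instead of calling client.call(...)
// over the network it just formats the method name and arguments locally.
class LoggingInvoker implements InvocationHandler {
    String lastMethod;

    @Override
    public Object invoke(Object proxy, Method method, Object[] args) {
        lastMethod = method.getName();
        // A real RPC invoker would wrap (method, args) and send them to the server here.
        return method.getName() + Arrays.toString(args);
    }
}

class ProxyDemo {
    // Same shape as the Proxy.newProxyInstance(...) call inside RPC.getProxy().
    static Greeter newProxy(LoggingInvoker handler) {
        return (Greeter) Proxy.newProxyInstance(
                Greeter.class.getClassLoader(),
                new Class<?>[] { Greeter.class },
                handler);
    }

    public static void main(String[] args) {
        Greeter g = newProxy(new LoggingInvoker());
        System.out.println(g.sayHello("hadoop")); // prints sayHello[hadoop]
    }
}
```

The caller only ever sees the Greeter interface; whether the work happens locally or on a remote server is decided entirely by the handler.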

Step 2: Invoke a business method

private static class Invoker implements InvocationHandler {
    private Client.ConnectionId remoteId;
    private Client client;
    private boolean isClosed = false;

    public Invoker(Class<? extends VersionedProtocol> protocol,
        InetSocketAddress address, UserGroupInformation ticket,
        Configuration conf, SocketFactory factory,
        int rpcTimeout) throws IOException {
      this.remoteId = Client.ConnectionId.getConnectionId(address, protocol,
          ticket, rpcTimeout, conf);
      this.client = CLIENTS.getClient(conf, factory);
    }

    public Object invoke(Object proxy, Method method, Object[] args)
      throws Throwable {
      final boolean logDebug = LOG.isDebugEnabled();
      long startTime = 0;
      if (logDebug) {
        startTime = System.currentTimeMillis();
      }

      // Wrap the method and arguments in an RPC.Invocation and pass it to client.call() to perform the remote call; the result comes back as value
      ObjectWritable value = (ObjectWritable) client.call(new Invocation(method, args), remoteId);

      if (logDebug) {
        long callTime = System.currentTimeMillis() - startTime;
        LOG.debug("Call: " + method.getName() + " " + callTime);
      }
      return value.get();
    }
    
    /* close the IPC client that's responsible for this invoker's RPCs */ 
    synchronized private void close() {
      if (!isClosed) {
        isClosed = true;
        CLIENTS.stopClient(client);
      }
    }
  }
ObjectWritable value = (ObjectWritable) client.call(new Invocation(method, args), remoteId);

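Invocation wraps the method name and the arguments and serves as the data-transfer object. The key to the remote call is that Invocation implements the Writable interface. In write(DataOutput out), Invocation writes the method name to out, then the number of parameters, and then, for each parameter in turn, its declared class name followed by its value (via ObjectWritable.writeObject()). This means that the parameters of a method invoked through RPC must be primitive types, String, classes that implement Writable (they know how to serialize themselves to a stream), or arrays whose elements are themselves primitives, Strings, or Writable classes.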
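To make that layout concrete, here is a minimal, hypothetical sketch (DemoInvocation is not a Hadoop class) that writes a method name, a parameter count, and a class-name/value pair per parameter, then reads it back. It assumes only int and String parameters and uses DataOutput.writeUTF in place of Hadoop's UTF8 and ObjectWritable encodings, so the bytes differ from the real wire format.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.List;

// Hypothetical imitation of RPC.Invocation's layout: method name, parameter
// count, then (declared class name, value) for each parameter.
class DemoInvocation {
    static byte[] write(String methodName, Object... params) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            DataOutputStream out = new DataOutputStream(bytes);
            out.writeUTF(methodName);          // method name
            out.writeInt(params.length);       // parameter count
            for (Object p : params) {
                if (p instanceof Integer) {
                    out.writeUTF("int");       // declared class name ...
                    out.writeInt((Integer) p); // ... followed by the value
                } else if (p instanceof String) {
                    out.writeUTF("java.lang.String");
                    out.writeUTF((String) p);
                } else {
                    throw new IllegalArgumentException("unsupported parameter: " + p);
                }
            }
            out.flush();
            return bytes.toByteArray();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Decodes the bytes back into [methodName, param1, param2, ...].
    static List<Object> read(byte[] data) {
        try {
            DataInputStream in = new DataInputStream(new ByteArrayInputStream(data));
            List<Object> result = new ArrayList<>();
            result.add(in.readUTF());
            int count = in.readInt();
            for (int i = 0; i < count; i++) {
                String type = in.readUTF();    // the class name tells the reader how to decode
                if (type.equals("int")) {
                    result.add(in.readInt());
                } else if (type.equals("java.lang.String")) {
                    result.add(in.readUTF());
                } else {
                    throw new IOException("unknown parameter type " + type);
                }
            }
            return result;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(read(write("sayHello", "hadoop", 42))); // prints [sayHello, hadoop, 42]
    }
}
```

Writing the class name before each value is what lets the receiving side decode parameters without knowing the method signature in advance, and it is also why only self-describing types can be used.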

Step 3: The Client's call() method sends the request (method and parameters) to the server

The most basic function of the Client class is to execute RPC calls. It offers two styles of call: a single (serial) call and a parallel call.

Here we analyze only the single-call implementation, call():

/**
     * Make a call, passing <code>param</code>, to the IPC server defined by
     * <code>remoteId</code>, returning the value. Throws exceptions if there
     * are network problems or if the remote code threw an exception.
     */
    public Writable call(Writable param, ConnectionId remoteId) throws InterruptedException, IOException {
        // Build a Call instance from the request parameter
        Call call = new Call(param);
        // Get a connection from the connections pool (possibly creating a new one)
        Connection connection = getConnection(remoteId, call);

        connection.sendParam(call); // send the parameter to the IPC server
        boolean interrupted = false;
        synchronized (call) {
            while (!call.done) {
                try {
                    call.wait(); // wait for the result from the IPC server;
                    // Call.callComplete() calls notify() to wake this thread up
                } catch (InterruptedException ie) {
                    // save the fact that we were interrupted
                    interrupted = true;
                }
            }

            if (interrupted) {
                // set the interrupt flag now that we are done waiting
                Thread.currentThread().interrupt();
            }

            if (call.error != null) {
                if (call.error instanceof RemoteException) {
                    call.error.fillInStackTrace();
                    throw call.error;
                } else { // local exception
                    // use the connection because it will reflect an ip change,
                    // unlike the remoteId
                    throw wrapException(connection.getRemoteAddress(), call.error);
                }
            } else {
                return call.value; // the value returned by the call
            }
        }
    }
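The heart of call() is a guarded wait: the caller blocks on the Call's monitor until call.done becomes true, and an interrupt is remembered and restored instead of abandoning the call. The sketch below reproduces that pattern with a hypothetical DemoCall class; a second thread plays the role of the connection's receiver thread calling setValue(). It uses notifyAll() where Hadoop's callComplete() uses notify().

```java
// Hypothetical, simplified version of Client.Call: the caller thread waits
// on the Call's monitor until a receiver thread stores a value and notifies.
class DemoCall {
    final int id;
    private boolean done;
    private Object value;

    DemoCall(int id) {
        this.id = id;
    }

    // Called by the receiver thread (compare Call.setValue()/callComplete()).
    synchronized void setValue(Object v) {
        value = v;
        done = true;
        notifyAll(); // wake up the thread blocked in waitForValue()
    }

    // Called by the caller thread (compare the wait loop in Client.call()).
    synchronized Object waitForValue() {
        boolean interrupted = false;
        while (!done) {                // loop also guards against spurious wakeups
            try {
                wait();
            } catch (InterruptedException ie) {
                interrupted = true;    // remember it, but keep waiting for the result
            }
        }
        if (interrupted) {
            Thread.currentThread().interrupt(); // restore the interrupt flag
        }
        return value;
    }

    public static void main(String[] args) throws InterruptedException {
        DemoCall call = new DemoCall(1);
        Thread receiver = new Thread(() -> call.setValue("pong"));
        receiver.start();
        System.out.println(call.waitForValue()); // prints pong
        receiver.join();
    }
}
```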

(1) Obtain a connection object: getConnection(remoteId, call)

private Connection getConnection(ConnectionId remoteId, Call call) throws IOException, InterruptedException {
        if (!running.get()) {
            // the client is stopped
            throw new IOException("The client is stopped");
        }
        Connection connection;
        /*
         * we could avoid this allocation for each RPC by having a connectionsId
         * object and with set() method. We need to manage the refs for keys in
         * HashMap properly. For now its ok.
         */
        do {
            synchronized (connections) {
                connection = connections.get(remoteId);
                if (connection == null) {
                    connection = new Connection(remoteId);
                    connections.put(remoteId, connection);
                }
            }
        } while (!connection.addCall(call)); // addCall() fails if the connection is closing; one connection can carry many calls
        connection.setupIOstreams(); // actually connect; each connection runs its own receiver thread

        // we don't invoke the method below inside "synchronized (connections)"
        // block above. The reason for that is if the server happens to be slow,
        // it will take longer to establish a connection and that will slow the
        // entire system down.
        return connection;
    }
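The lookup-or-create loop can be isolated as follows. This is a hypothetical sketch (DemoPool, Conn and close() are illustrative names): the table is locked only while looking up or inserting, addCall() refuses calls on a closing connection so the loop tries again, and the slow connect step would happen after the lock is released, as the comment in getConnection() explains.

```java
import java.util.HashMap;
import java.util.Map;

class DemoPool {
    static final class Conn {
        private boolean closing;
        private int calls;

        // Like Connection.addCall(): refuse new calls once the connection is closing.
        synchronized boolean addCall() {
            if (closing) {
                return false;
            }
            calls++;
            return true;
        }

        synchronized void markClosing() {
            closing = true;
        }

        synchronized int callCount() {
            return calls;
        }
    }

    private final Map<String, Conn> connections = new HashMap<>();

    Conn getConnection(String remoteId) {
        Conn conn;
        do {
            synchronized (connections) { // hold the lock only for lookup/insert
                conn = connections.get(remoteId);
                if (conn == null) {
                    conn = new Conn();
                    connections.put(remoteId, conn);
                }
            }
        } while (!conn.addCall());       // closing connection: look it up again
        // The slow connect step (setupIOstreams() in Hadoop) would go here, outside the lock.
        return conn;
    }

    // Simulates a connection shutting down: it is marked closing and removed
    // from the table, so the next getConnection() creates a fresh one.
    void close(String remoteId) {
        synchronized (connections) {
            Conn conn = connections.remove(remoteId);
            if (conn != null) {
                conn.markClosing();
            }
        }
    }

    public static void main(String[] args) {
        DemoPool pool = new DemoPool();
        Conn a = pool.getConnection("namenode:9000");
        Conn b = pool.getConnection("namenode:9000");
        System.out.println(a == b); // prints true
    }
}
```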

(2) Actually connect and start the receiver thread: connection.setupIOstreams()

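private synchronized void setupIOstreams() throws InterruptedException {
      if (socket != null || shouldCloseConnection.get()) {
        return; // already connected, or the connection is being closed
      }
      try {
        short numRetries = 0;
        final short maxRetries = 15;
        Random rand = null;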
        while (true) {

          setupConnection(); // establish the socket connection

          InputStream inStream = NetUtils.getInputStream(socket); // get the input stream
          OutputStream outStream = NetUtils.getOutputStream(socket); // get the output stream
          writeRpcHeader(outStream);
          if (useSasl) {
            final InputStream in2 = inStream;
            final OutputStream out2 = outStream;
            UserGroupInformation ticket = remoteId.getTicket();
            if (authMethod == AuthMethod.KERBEROS) {
              if (ticket.getRealUser() != null) {
                ticket = ticket.getRealUser();
              }
            }
            boolean continueSasl = false;
            try { 
              continueSasl = 
                ticket.doAs(new PrivilegedExceptionAction<Boolean>() {
                  @Override
                  public Boolean run() throws IOException {
                    return setupSaslConnection(in2, out2);
                  }
                }); 
            } catch (Exception ex) {
              if (rand == null) {
                rand = new Random();
              }
              handleSaslConnectionFailure(numRetries++, maxRetries, ex, rand,
                   ticket);
              continue;
            }
            if (continueSasl) {
              // Sasl connect is successful. Let's set up Sasl i/o streams.
              inStream = saslRpcClient.getInputStream(inStream);
              outStream = saslRpcClient.getOutputStream(outStream);
            } else {
              // fall back to simple auth because server told us so.
              authMethod = AuthMethod.SIMPLE;
              header = new ConnectionHeader(header.getProtocol(),
                  header.getUgi(), authMethod);
              useSasl = false;
            }
          }
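          // wrap the input stream as a DataInputStream; PingInputStream sends
          // a ping to the server when a read times out
          this.in = new DataInputStream(new BufferedInputStream
              (new PingInputStream(inStream)));
          // wrap the output stream as a DataOutputStream
          this.out = new DataOutputStream
          (new BufferedOutputStream(outStream));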

          writeHeader();

          // update last activity time
          touch();
          // start the receiver thread after the socket connection has been set up;
          // it waits for data returned by the server (note: Connection extends Thread)
          start();
          return;
        }
      } catch (Throwable t) {
        if (t instanceof IOException) {
          markClosed((IOException)t);
        } else {
          markClosed(new IOException("Couldn't set up IO streams", t));
        }
        close();
      }
    }

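Note: setupConnection() establishes the connection, and start() launches the receiver thread (Connection extends Thread). Its run() method waits for data returned from the server: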
 
 public void run() {
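      while (waitForWork()) { // wait here for work - read or close connection
        // waitForWork() blocks until this connection has pending calls or its idle time
        // runs out, and returns false once the connection should be closed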
        receiveResponse();
      }
      close();
}
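The receiver loop can be sketched as below, assuming a simplified waitForWork() that waits on the connection's monitor until a call is pending or an idle timeout expires. DemoReceiver and its methods are hypothetical; Hadoop's version also checks whether the client is still running and whether the connection was marked closed, and it removes a call only when that call's response arrives.

```java
import java.util.ArrayDeque;
import java.util.Queue;

// Hypothetical, simplified sketch of Connection.run()/waitForWork().
class DemoReceiver {
    private final Queue<Integer> pendingCalls = new ArrayDeque<>();
    private final long maxIdleMillis;

    DemoReceiver(long maxIdleMillis) {
        this.maxIdleMillis = maxIdleMillis;
    }

    // Sender side: register a call and wake up the receiver thread.
    synchronized void addCall(int id) {
        pendingCalls.add(id);
        notifyAll();
    }

    // Returns true when a call is pending; false after maxIdleMillis without
    // work (meaning "close the connection") or if the thread is interrupted.
    synchronized boolean waitForWork() {
        long deadline = System.currentTimeMillis() + maxIdleMillis;
        while (pendingCalls.isEmpty()) {
            long remaining = deadline - System.currentTimeMillis();
            if (remaining <= 0) {
                return false;
            }
            try {
                wait(remaining);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return false;
            }
        }
        return true;
    }

    // Stand-in for receiveResponse(): consume one pending call.
    synchronized int receiveOne() {
        return pendingCalls.remove();
    }

    // Same shape as Connection.run(): keep receiving while there is work.
    int run() {
        int handled = 0;
        while (waitForWork()) {
            receiveOne();
            handled++;
        }
        return handled; // a real Connection would call close() here
    }
}
```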

(3) Finally, establish the socket connection: setupConnection()

  

private synchronized void setupConnection() throws IOException {
            short ioFailures = 0;
            short timeoutFailures = 0;
            while (true) {
                try {
                    this.socket = socketFactory.createSocket();
                    this.socket.setTcpNoDelay(tcpNoDelay);

                    /*
                     * Bind the socket to the host specified in the principal
                     * name of the client, to ensure Server matching address of
                     * the client connection to host name in principal passed.
                     */
                    if (UserGroupInformation.isSecurityEnabled()) {
                        KerberosInfo krbInfo = remoteId.getProtocol().getAnnotation(KerberosInfo.class);
                        if (krbInfo != null && krbInfo.clientPrincipal() != null) {
                            String host = SecurityUtil.getHostFromPrincipal(remoteId.getTicket().getUserName());

                            // If host name is a valid local address then bind
                            // socket to it
                            InetAddress localAddr = NetUtils.getLocalInetAddress(host);
                            if (localAddr != null) {
                                this.socket.bind(new InetSocketAddress(localAddr, 0));
                            }
                        }
                    }
                    // connection time out is 20s
                    NetUtils.connect(this.socket, server, 20000);
                    if (rpcTimeout > 0) {
                        pingInterval = rpcTimeout; // rpcTimeout overwrites pingInterval
                    }

                    this.socket.setSoTimeout(pingInterval);
                    return;
                } catch (SocketTimeoutException toe) {
                    /*
                     * Check for an address change and update the local
                     * reference. Reset the failure counter if the address was
                     * changed
                     */
                    if (updateAddress()) {
                        timeoutFailures = ioFailures = 0;
                    }
                    /*
                     * The max number of retries is 45, which amounts to 20s*45
                     * = 15 minutes retries.
                     */
                    handleConnectionFailure(timeoutFailures++, 45, toe);
                } catch (IOException ie) {
                    if (updateAddress()) {
                        timeoutFailures = ioFailures = 0;
                    }
                    handleConnectionFailure(ioFailures++, maxRetries, ie);
                }
            }
        }
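Stripped of the Kerberos binding and the address updates, the loop above is "connect with a timeout, set a read timeout, retry a bounded number of times". Here is a hypothetical sketch with plain java.net.Socket; DemoConnector, the fixed 100 ms back-off and the parameter values in demo() are assumptions, not Hadoop's settings.

```java
import java.io.IOException;
import java.io.InterruptedIOException;
import java.io.UncheckedIOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.net.Socket;

class DemoConnector {
    // Connects with a connect timeout, then sets the read timeout; on failure
    // it retries up to maxRetries times before rethrowing the last exception.
    static Socket connect(InetSocketAddress server, int connectTimeoutMs,
                          int readTimeoutMs, int maxRetries) throws IOException {
        int failures = 0;
        while (true) {
            Socket socket = new Socket();
            try {
                socket.setTcpNoDelay(true);
                socket.connect(server, connectTimeoutMs); // SocketTimeoutException on timeout
                socket.setSoTimeout(readTimeoutMs);       // blocking reads time out after this
                return socket;
            } catch (IOException e) {                     // also covers SocketTimeoutException
                socket.close();
                if (failures++ >= maxRetries) {
                    throw e;                              // retries exhausted
                }
                try {
                    Thread.sleep(100);                    // fixed back-off (an assumption)
                } catch (InterruptedException ie) {
                    Thread.currentThread().interrupt();
                    throw new InterruptedIOException("interrupted while retrying");
                }
            }
        }
    }

    // Connects to a throwaway local listener and returns the socket's read timeout.
    static int demo() {
        InetAddress loopback = InetAddress.getLoopbackAddress();
        try (ServerSocket listener = new ServerSocket(0, 1, loopback);
             Socket socket = connect(new InetSocketAddress(loopback, listener.getLocalPort()),
                                     20000, 60000, 3)) {
            return socket.getSoTimeout();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints 60000
    }
}
```

Setting the read timeout to the ping interval means a blocking read wakes up periodically instead of hanging indefinitely, which is what gives PingInputStream the chance to send a ping while the client waits.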

(4) Send the call parameters: sendParam()

public void sendParam(Call call) {
            if (shouldCloseConnection.get()) {
                return;
            }

            DataOutputBuffer d = null;
            try {
                synchronized (this.out) {
                    if (LOG.isDebugEnabled())
                        LOG.debug(getName() + " sending #" + call.id);

                    // for serializing the data to be written
                    d = new DataOutputBuffer();
                    d.writeInt(call.id);
                    call.param.write(d);
                    byte[] data = d.getData();
                    int dataLength = d.getLength();

                    out.writeInt(dataLength); // write the data length first
                    out.write(data, 0, dataLength); // then write the data to the server
                    out.flush();
                }
            } catch (IOException e) {
                markClosed(e);
            } finally {
                // the buffer is just an in-memory buffer, but it is still
                // polite to close early
                IOUtils.closeStream(d);
            }
        }
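sendParam() frames each request as a length prefix followed by the call id and the serialized parameter, buffering the payload first so that its length is known before anything is written to the socket. Below is a minimal sketch of the same framing, with a String standing in for the Writable parameter; DemoFraming and unframe() are hypothetical, and unframe() shows what a reader on the other side would do.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

// Hypothetical sketch of the framing: [int length][int call id][parameter],
// where length counts the id plus the parameter bytes.
class DemoFraming {
    static byte[] frame(int callId, String param) {
        try {
            // Serialize id + param into an in-memory buffer first
            // (the role DataOutputBuffer plays in sendParam()).
            ByteArrayOutputStream buf = new ByteArrayOutputStream();
            DataOutputStream d = new DataOutputStream(buf);
            d.writeInt(callId);
            d.writeUTF(param);
            d.flush();
            byte[] data = buf.toByteArray();

            ByteArrayOutputStream wire = new ByteArrayOutputStream();
            DataOutputStream out = new DataOutputStream(wire);
            out.writeInt(data.length); // length prefix first
            out.write(data);           // then the payload
            out.flush();
            return wire.toByteArray();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Reader side: read the length, then exactly that many bytes.
    static String unframe(byte[] wire) {
        try {
            DataInputStream in = new DataInputStream(new ByteArrayInputStream(wire));
            byte[] data = new byte[in.readInt()];
            in.readFully(data);
            DataInputStream payload = new DataInputStream(new ByteArrayInputStream(data));
            return payload.readInt() + ":" + payload.readUTF();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(unframe(frame(5, "hi"))); // prints 5:hi
    }
}
```

The length prefix lets the receiver read exactly one request off a shared stream, even though many calls are written to the same socket.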

Step 4: receiveResponse() receives the data returned by the server

/*
         * Receive a response. Because only one receiver, so no synchronization
         * on in.
         */
        private void receiveResponse() {
            if (shouldCloseConnection.get()) {
                return;
            }
            touch();

            try {
                int id = in.readInt(); // try to read an id (blocks until data arrives)

                if (LOG.isDebugEnabled())
                    LOG.debug(getName() + " got value #" + id);

                Call call = calls.get(id); // find the Call that was registered when the request was sent

                int state = in.readInt(); // read call status (blocking)
                if (state == Status.SUCCESS.state) {
                    Writable value = ReflectionUtils.newInstance(valueClass, conf);
                    value.readFields(in); // read value
                    // store the value in the call and wake up the waiting client thread
                    call.setValue(value);
                    calls.remove(id);
                } else if (state == Status.ERROR.state) {
                    call.setException(new RemoteException(WritableUtils.readString(in), WritableUtils.readString(in)));
                    calls.remove(id);
                } else if (state == Status.FATAL.state) {
                    // Close the connection
                    markClosed(new RemoteException(WritableUtils.readString(in), WritableUtils.readString(in)));
                }
            } catch (IOException e) {
                markClosed(e);
            }
        }
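The decoding logic above reads a call id, a status, and then either a value or two error strings. The hypothetical sketch below mirrors that order. It assumes the status codes 0, 1 and -1 for SUCCESS, ERROR and FATAL (as in Hadoop 1.x's Status enum), uses readUTF() in place of WritableUtils.readString(), and uses an int in place of the Writable value.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

class DemoResponse {
    // Assumed to match Hadoop 1.x's Status enum values.
    static final int SUCCESS = 0, ERROR = 1, FATAL = -1;

    static String decode(byte[] bytes) {
        try {
            DataInputStream in = new DataInputStream(new ByteArrayInputStream(bytes));
            int id = in.readInt();    // which call this response answers
            int state = in.readInt(); // call status
            if (state == SUCCESS) {
                return "call " + id + " value=" + in.readInt();
            }
            if (state != ERROR && state != FATAL) {
                throw new IOException("unknown status " + state);
            }
            String errorClass = in.readUTF(); // remote exception class name
            String message = in.readUTF();    // remote exception message
            return state == ERROR
                    ? "call " + id + " failed: " + errorClass + ": " + message // only this call fails
                    : "connection closed: " + errorClass + ": " + message;     // FATAL closes the connection
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Builds a SUCCESS response carrying an int value.
    static byte[] success(int id, int value) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            DataOutputStream out = new DataOutputStream(bytes);
            out.writeInt(id);
            out.writeInt(SUCCESS);
            out.writeInt(value);
            out.flush();
            return bytes.toByteArray();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Builds an ERROR or FATAL response carrying the exception class and message.
    static byte[] error(int id, int state, String errorClass, String message) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            DataOutputStream out = new DataOutputStream(bytes);
            out.writeInt(id);
            out.writeInt(state);
            out.writeUTF(errorClass);
            out.writeUTF(message);
            out.flush();
            return bytes.toByteArray();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(decode(success(3, 42))); // prints call 3 value=42
        System.out.println(decode(error(4, ERROR, "java.io.IOException", "boom")));
    }
}
```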

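Hadoop RPC exposes a synchronous interface, but internally it is implemented with asynchronous messaging. Hadoop turns the asynchronous exchange into a synchronous one with the thread wait/notify mechanism: after sending a request (a Call), the caller waits until the request has been processed, and once the response has been received (connection.receiveResponse()) the caller is notified; the notify() happens in call.setValue(). One problem remains: a connection carries multiple calls, so several calls may be waiting for responses at the same time. When the client receives a response, how does it know which request the response belongs to? The answer is the Hashtable<Integer, Call> inside the connection, whose Integer key identifies each Call (its id); the id sent with the request comes back with the response, so requests and responses can be matched.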
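The id-based matching can be shown without any networking. In this hypothetical sketch (DemoCallTable and Pending are illustrative names), each request gets an id from a counter and is parked in a Hashtable<Integer, ...>; responses may then arrive in any order and are routed by id.

```java
import java.util.Hashtable;
import java.util.concurrent.atomic.AtomicInteger;

class DemoCallTable {
    static final class Pending {
        final String request;
        String response;

        Pending(String request) {
            this.request = request;
        }
    }

    private final AtomicInteger counter = new AtomicInteger();
    // Outstanding calls keyed by call id, like Connection.calls
    private final Hashtable<Integer, Pending> calls = new Hashtable<>();

    // Sender side: assign a fresh id and park the call until its response arrives.
    int send(String request) {
        int id = counter.getAndIncrement();
        calls.put(id, new Pending(request));
        return id;
    }

    // Receiver side: the id read from the response selects the matching call.
    Pending receive(int id, String response) {
        Pending call = calls.remove(id);
        if (call == null) {
            throw new IllegalStateException("no outstanding call with id " + id);
        }
        call.response = response; // a real client would also wake the waiting thread here
        return call;
    }

    int outstanding() {
        return calls.size();
    }

    public static void main(String[] args) {
        DemoCallTable table = new DemoCallTable();
        int first = table.send("requestA");
        int second = table.send("requestB");
        // Responses arrive out of order, but each one finds its own request.
        System.out.println(table.receive(second, "responseB").request); // prints requestB
        System.out.println(table.receive(first, "responseA").request);  // prints requestA
    }
}
```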

Sequence diagram: [figure not preserved]
