工学1号馆

home

深入剖析Hadoop RPC基本框架–ipc.RPC类

By Wu Yudong on June 22, 2015

原创文章,转载请注明: 转载自工学1号馆

ipc.RPC类是对底层客户机/服务器网络模型的封装,本篇文章进行详细剖析

RPC类包含如下内部类:

org.apache.hadoop.ipc.RPC.Invocation

org.apache.hadoop.ipc.RPC.ClientCache

org.apache.hadoop.ipc.RPC.Invoker

org.apache.hadoop.ipc.RPC.VersionMismatch

org.apache.hadoop.ipc.RPC.Server

Invoker类

  private static class Invoker implements InvocationHandler {
    private Client.ConnectionId remoteId;
    private Client client;
    private boolean isClosed = false;

    public Invoker(Class<? extends VersionedProtocol> protocol,
        InetSocketAddress address, UserGroupInformation ticket,
        Configuration conf, SocketFactory factory,
        int rpcTimeout) throws IOException {
      this.remoteId = Client.ConnectionId.getConnectionId(address, protocol,
          ticket, rpcTimeout, conf);
      this.client = CLIENTS.getClient(conf, factory);
    }

    public Object invoke(Object proxy, Method method, Object[] args)
      throws Throwable {
      final boolean logDebug = LOG.isDebugEnabled();
      long startTime = 0;
      if (logDebug) {
        startTime = System.currentTimeMillis();
      }

      ObjectWritable value = (ObjectWritable)
        client.call(new Invocation(method, args), remoteId);
      if (logDebug) {
        long callTime = System.currentTimeMillis() - startTime;
        LOG.debug("Call: " + method.getName() + " " + callTime);
      }
      return value.get();
    }
    
    /* close the IPC client that's responsible for this invoker's RPCs */ 
    synchronized private void close() {
      if (!isClosed) {
        isClosed = true;
        CLIENTS.stopClient(client);
      }
    }
  }

重点分析invoke方法:

ObjectWritable value = (ObjectWritable) client.call(new Invocation(method, args), remoteId);

call函数原型如下:

public Writable call(Writable param, ConnectionId remoteId) throws InterruptedException, IOException

此方法返回一个调用,通过Writable类型的参数param到由remoteId定义的IPC服务器,返回Writable类型,如果网络问题或是远程控制代码出问题就会抛出异常

invoke将函数调用信息打包成序列化的Invocation对象,并通过网络发送给服务器端,服务器端收到该调用信息后,解析出函数名和参数列表等信息,利用java反射机制完成函数调用

Invocation类

  /** 一个方法调用,包括方法名和它包含的参数.*/
  private static class Invocation implements Writable, Configurable {
    private String methodName;
    private Class[] parameterClasses;
    private Object[] parameters;
    private Configuration conf;

    public Invocation() {}

    public Invocation(Method method, Object[] parameters) {
      this.methodName = method.getName(); //以String的形式返回类中的方法名称
      this.parameterClasses = method.getParameterTypes();  //按照声明顺序返回 Class 对象的数组,这些对象
           //描述了此 Method 对象所表示的方法的形参类型。
      this.parameters = parameters;
    }

    /** 被调用函数的名字. */
    public String getMethodName() { return methodName; }

    /** 参数类列表. */
    public Class[] getParameterClasses() { return parameterClasses; }

    /** 参数实例列表. */
    public Object[] getParameters() { return parameters; }

    public void readFields(DataInput in) throws IOException {
      methodName = UTF8.readString(in);
      parameters = new Object[in.readInt()];  // 读取四个输入字节并返回一个 int 值。
      parameterClasses = new Class[parameters.length];
      ObjectWritable objectWritable = new ObjectWritable();
      for (int i = 0; i < parameters.length; i++) {
        parameters[i] = ObjectWritable.readObject(in, objectWritable, this.conf);
        parameterClasses[i] = objectWritable.getDeclaredClass();
      }
    }

    public void write(DataOutput out) throws IOException {
      UTF8.writeString(out, methodName);
      out.writeInt(parameterClasses.length);
      for (int i = 0; i < parameterClasses.length; i++) {
        ObjectWritable.writeObject(out, parameters[i], parameterClasses[i],
                                   conf);
      }
    }

    public String toString() {  //重写toString(),以(,,,)的形式输出方法参数列表
      StringBuffer buffer = new StringBuffer();
      buffer.append(methodName);
      buffer.append("(");
      for (int i = 0; i < parameters.length; i++) {
        if (i != 0)
          buffer.append(", ");
        buffer.append(parameters[i]);
      }
      buffer.append(")");
      return buffer.toString();
    }

    public void setConf(Configuration conf) {
      this.conf = conf;
    }

    public Configuration getConf() {
      return this.conf;
    }

  }

ClientCache类

使用socket factory作为hash key来缓存Client对象

static private class ClientCache {
    private Map<SocketFactory, Client> clients =
      new HashMap<SocketFactory, Client>();

    /**
     * 当没有客户端缓存存在的时候,使用一个user-provided SocketFactory 构建&缓存一个IPC client对象.
     */
    private synchronized Client getClient(Configuration conf,
        SocketFactory factory) {
      Client client = clients.get(factory);
      if (client == null) {
        client = new Client(ObjectWritable.class, conf, factory);
        clients.put(factory, client);
      } else {
        client.incCount();
      }
      return client;
    }

    /**
     * 使用默认的SocketFactory构建&缓存一个客户端IPC
     * 没有客户端缓存存在.
     */
    private synchronized Client getClient(Configuration conf) {
      return getClient(conf, SocketFactory.getDefault());
    }

    /**
     * 关闭一个 RPC 客户端连接 
     * 一个 RPC 客户端被关闭当且仅当它的引用数变为0.
     */
    private void stopClient(Client client) {
      synchronized (this) {
        client.decCount();
        if (client.isZeroReference()) {
          clients.remove(client.getSocketFactory());
        }
      }
      if (client.isZeroReference()) {
        client.stop();
      }
    }
  }

  private static ClientCache CLIENTS=new ClientCache();
  
  //仅仅用来进行单元测试
  static Client getClient(Configuration conf) {
    return CLIENTS.getClient(conf);
  }

Server类

该类的主要作用是构建一个RPC Server

 /** An RPC Server. */
  public static class Server extends org.apache.hadoop.ipc.Server {
    private Object instance;
    private boolean verbose;

    /** 构建一个 RPC server.
     * @param instance 那个方法被调用的实例
     * @param conf 使用的配置
     * @param bindAddress 被用来绑定监听连接的地址
     * @param port 监听连接的端口
     */
    public Server(Object instance, Configuration conf, String bindAddress, int port) 
      throws IOException {
      this(instance, conf,  bindAddress, port, 1, false, null);
    }
    
    private static String classNameBase(String className) {
      String[] names = className.split("\\.", -1);
      if (names == null || names.length == 0) {
        return className;
      }
      return names[names.length-1];
    }
    
    /** 构建一个 RPC server.
     * @param instance 那个被调用方法的实例
     * @param conf 使用的配置
     * @param bindAddress 被用来绑定监听连接的地址
     * @param port 监听连接的端口
     * @param numHandlers 处理方法运行线程的数量
     * @param verbose 是否每一个调用都需要日志记录
     */
    public Server(Object instance, Configuration conf, String bindAddress,  int port,
                  int numHandlers, boolean verbose, 
                  SecretManager<? extends TokenIdentifier> secretManager) 
        throws IOException {
      super(bindAddress, port, Invocation.class, numHandlers, conf,
          classNameBase(instance.getClass().getName()), secretManager);
      this.instance = instance;
      this.verbose = verbose;
    }

    public Writable call(Class<?> protocol, Writable param, long receivedTime) 
    throws IOException {
      try {
        Invocation call = (Invocation)param;
        if (verbose) log("Call: " + call);

        Method method =
          protocol.getMethod(call.getMethodName(),
                                   call.getParameterClasses());
        method.setAccessible(true);   //设置类方法可访问

        long startTime = System.currentTimeMillis();
        Object value = method.invoke(instance, call.getParameters());
        int processingTime = (int) (System.currentTimeMillis() - startTime);  //计算处理时间
        int qTime = (int) (startTime-receivedTime);
        if (LOG.isDebugEnabled()) {
          LOG.debug("Served: " + call.getMethodName() +
                    " queueTime= " + qTime +
                    " procesingTime= " + processingTime);
        }
        rpcMetrics.addRpcQueueTime(qTime);
        rpcMetrics.addRpcProcessingTime(processingTime);
        rpcMetrics.addRpcProcessingTime(call.getMethodName(), processingTime);
        if (verbose) log("Return: "+value);

        return new ObjectWritable(method.getReturnType(), value);

      } catch (InvocationTargetException e) {
        Throwable target = e.getTargetException();
        if (target instanceof IOException) {
          throw (IOException)target;
        } else {
          IOException ioe = new IOException(target.toString());
          ioe.setStackTrace(target.getStackTrace());
          throw ioe;
        }
      } catch (Throwable e) {
        if (!(e instanceof IOException)) {
          LOG.error("Unexpected throwable object ", e);
        }
        IOException ioe = new IOException(e.toString());
        ioe.setStackTrace(e.getStackTrace());
        throw ioe;
      }
    }
  }

VersionMismatch类

该类的作用是给RPC协议设置一个失配(匹配失败)版本.

public static class VersionMismatch extends IOException {
    private String interfaceName;  //接口名
    private long clientVersion;   //客户端接口
    private long serverVersion;  //服务端接口
    
    /**
     * 创建一个协议失配异常
     * @param interfaceName 协议失配的名字
     * @param clientVersion 客户端的协议版本
     * @param serverVersion 服务端的协议版本
     */
    public VersionMismatch(String interfaceName, long clientVersion,
                           long serverVersion) {
      super("Protocol " + interfaceName + " version mismatch. (client = " +
            clientVersion + ", server = " + serverVersion + ")");
      this.interfaceName = interfaceName;
      this.clientVersion = clientVersion;
      this.serverVersion = serverVersion;
    }
    
    /**
     * 获取协议名Get the interface name
     * @return 返回java Class名 
     *          (eg. org.apache.hadoop.mapred.InterTrackerProtocol)
     */
    public String getInterfaceName() {
      return interfaceName;
    }
    
    /**
     * 获取客户端的优先版本
     */
    public long getClientVersion() {
      return clientVersion;
    }
    
    /**
     * 获取服务端的认可版本
     */
    public long getServerVersion() {
      return serverVersion;
    }
  }

 

如果文章对您有帮助,欢迎点击下方按钮打赏作者

Comments

No comments yet.
To verify that you are human, please fill in "七"(required)