原创文章,转载请注明: 转载自工学1号馆
private static class Invoker implements InvocationHandler { private Client.ConnectionId remoteId; private Client client; private boolean isClosed = false; public Invoker(Class<? extends VersionedProtocol> protocol, InetSocketAddress address, UserGroupInformation ticket, Configuration conf, SocketFactory factory, int rpcTimeout) throws IOException { this.remoteId = Client.ConnectionId.getConnectionId(address, protocol, ticket, rpcTimeout, conf); this.client = CLIENTS.getClient(conf, factory); } public Object invoke(Object proxy, Method method, Object[] args) throws Throwable { final boolean logDebug = LOG.isDebugEnabled(); long startTime = 0; if (logDebug) { startTime = System.currentTimeMillis(); } ObjectWritable value = (ObjectWritable) client.call(new Invocation(method, args), remoteId); if (logDebug) { long callTime = System.currentTimeMillis() - startTime; LOG.debug("Call: " + method.getName() + " " + callTime); } return value.get(); } /* close the IPC client that's responsible for this invoker's RPCs */ synchronized private void close() { if (!isClosed) { isClosed = true; CLIENTS.stopClient(client); } } }
ObjectWritable value = (ObjectWritable) client.call(new Invocation(method, args), remoteId);
public Writable call(Writable param, ConnectionId remoteId) throws InterruptedException, IOException
/** 一个方法调用,包括方法名和它包含的参数.*/ private static class Invocation implements Writable, Configurable { private String methodName; private Class[] parameterClasses; private Object[] parameters; private Configuration conf; public Invocation() {} public Invocation(Method method, Object[] parameters) { this.methodName = method.getName(); //以String的形式返回类中的方法名称 this.parameterClasses = method.getParameterTypes(); //按照声明顺序返回 Class 对象的数组,这些对象 //描述了此 Method 对象所表示的方法的形参类型。 this.parameters = parameters; } /** 被调用函数的名字. */ public String getMethodName() { return methodName; } /** 参数类列表. */ public Class[] getParameterClasses() { return parameterClasses; } /** 参数实例列表. */ public Object[] getParameters() { return parameters; } public void readFields(DataInput in) throws IOException { methodName = UTF8.readString(in); parameters = new Object[in.readInt()]; // 读取四个输入字节并返回一个 int 值。 parameterClasses = new Class[parameters.length]; ObjectWritable objectWritable = new ObjectWritable(); for (int i = 0; i < parameters.length; i++) { parameters[i] = ObjectWritable.readObject(in, objectWritable, this.conf); parameterClasses[i] = objectWritable.getDeclaredClass(); } } public void write(DataOutput out) throws IOException { UTF8.writeString(out, methodName); out.writeInt(parameterClasses.length); for (int i = 0; i < parameterClasses.length; i++) { ObjectWritable.writeObject(out, parameters[i], parameterClasses[i], conf); } } public String toString() { //重写toString(),以(,,,)的形式输出方法参数列表 StringBuffer buffer = new StringBuffer(); buffer.append(methodName); buffer.append("("); for (int i = 0; i < parameters.length; i++) { if (i != 0) buffer.append(", "); buffer.append(parameters[i]); } buffer.append(")"); return buffer.toString(); } public void setConf(Configuration conf) { this.conf = conf; } public Configuration getConf() { return this.conf; } }
使用socket factory作为hash key来缓存Client对象
static private class ClientCache { private Map<SocketFactory, Client> clients = new HashMap<SocketFactory, Client>(); /** * 当没有客户端缓存存在的时候,使用一个user-provided SocketFactory 构建&缓存一个IPC client对象. */ private synchronized Client getClient(Configuration conf, SocketFactory factory) { Client client = clients.get(factory); if (client == null) { client = new Client(ObjectWritable.class, conf, factory); clients.put(factory, client); } else { client.incCount(); } return client; } /** * 使用默认的SocketFactory构建&缓存一个客户端IPC * 没有客户端缓存存在. */ private synchronized Client getClient(Configuration conf) { return getClient(conf, SocketFactory.getDefault()); } /** * 关闭一个 RPC 客户端连接 * 一个 RPC 客户端被关闭当且仅当它的引用数变为0. */ private void stopClient(Client client) { synchronized (this) { client.decCount(); if (client.isZeroReference()) { clients.remove(client.getSocketFactory()); } } if (client.isZeroReference()) { client.stop(); } } } private static ClientCache CLIENTS=new ClientCache(); //仅仅用来进行单元测试 static Client getClient(Configuration conf) { return CLIENTS.getClient(conf); }
该类的主要作用是构建一个RPC Server
/** An RPC Server. */ public static class Server extends org.apache.hadoop.ipc.Server { private Object instance; private boolean verbose; /** 构建一个 RPC server. * @param instance 那个方法被调用的实例 * @param conf 使用的配置 * @param bindAddress 被用来绑定监听连接的地址 * @param port 监听连接的端口 */ public Server(Object instance, Configuration conf, String bindAddress, int port) throws IOException { this(instance, conf, bindAddress, port, 1, false, null); } private static String classNameBase(String className) { String[] names = className.split("\\.", -1); if (names == null || names.length == 0) { return className; } return names[names.length-1]; } /** 构建一个 RPC server. * @param instance 那个被调用方法的实例 * @param conf 使用的配置 * @param bindAddress 被用来绑定监听连接的地址 * @param port 监听连接的端口 * @param numHandlers 处理方法运行线程的数量 * @param verbose 是否每一个调用都需要日志记录 */ public Server(Object instance, Configuration conf, String bindAddress, int port, int numHandlers, boolean verbose, SecretManager<? extends TokenIdentifier> secretManager) throws IOException { super(bindAddress, port, Invocation.class, numHandlers, conf, classNameBase(instance.getClass().getName()), secretManager); this.instance = instance; this.verbose = verbose; } public Writable call(Class<?> protocol, Writable param, long receivedTime) throws IOException { try { Invocation call = (Invocation)param; if (verbose) log("Call: " + call); Method method = protocol.getMethod(call.getMethodName(), call.getParameterClasses()); method.setAccessible(true); //设置类方法可访问 long startTime = System.currentTimeMillis(); Object value = method.invoke(instance, call.getParameters()); int processingTime = (int) (System.currentTimeMillis() - startTime); //计算处理时间 int qTime = (int) (startTime-receivedTime); if (LOG.isDebugEnabled()) { LOG.debug("Served: " + call.getMethodName() + " queueTime= " + qTime + " procesingTime= " + processingTime); } rpcMetrics.addRpcQueueTime(qTime); rpcMetrics.addRpcProcessingTime(processingTime); rpcMetrics.addRpcProcessingTime(call.getMethodName(), processingTime); if (verbose) log("Return: "+value); return new ObjectWritable(method.getReturnType(), value); } catch (InvocationTargetException e) { Throwable target = e.getTargetException(); if (target instanceof IOException) { throw (IOException)target; } else { IOException ioe = new IOException(target.toString()); ioe.setStackTrace(target.getStackTrace()); throw ioe; } } catch (Throwable e) { if (!(e instanceof IOException)) { LOG.error("Unexpected throwable object ", e); } IOException ioe = new IOException(e.toString()); ioe.setStackTrace(e.getStackTrace()); throw ioe; } } }
public static class VersionMismatch extends IOException { private String interfaceName; //接口名 private long clientVersion; //客户端接口 private long serverVersion; //服务端接口 /** * 创建一个协议失配异常 * @param interfaceName 协议失配的名字 * @param clientVersion 客户端的协议版本 * @param serverVersion 服务端的协议版本 */ public VersionMismatch(String interfaceName, long clientVersion, long serverVersion) { super("Protocol " + interfaceName + " version mismatch. (client = " + clientVersion + ", server = " + serverVersion + ")"); this.interfaceName = interfaceName; this.clientVersion = clientVersion; this.serverVersion = serverVersion; } /** * 获取协议名Get the interface name * @return 返回java Class名 * (eg. org.apache.hadoop.mapred.InterTrackerProtocol) */ public String getInterfaceName() { return interfaceName; } /** * 获取客户端的优先版本 */ public long getClientVersion() { return clientVersion; } /** * 获取服务端的认可版本 */ public long getServerVersion() { return serverVersion; } }