原创文章,转载请注明: 转载自工学1号馆
ipc.RPC类是对底层客户机/服务器网络模型的封装,本篇文章进行详细剖析
RPC类包含如下内部类:
org.apache.hadoop.ipc.RPC.Invocation
org.apache.hadoop.ipc.RPC.ClientCache
org.apache.hadoop.ipc.RPC.Invoker
org.apache.hadoop.ipc.RPC.VersionMismatch
org.apache.hadoop.ipc.RPC.Server
Invoker类
/**
 * Invocation handler placed behind a protocol proxy. Each method call made on
 * the proxy is wrapped in an {@link Invocation} and sent through the cached
 * IPC {@link Client} to the connection identified by {@code remoteId}.
 */
private static class Invoker implements InvocationHandler {
  private Client.ConnectionId remoteId;
  private Client client;
  private boolean isClosed = false;

  /**
   * Resolves the connection id for the remote end and obtains a client from
   * the shared {@code CLIENTS} cache.
   *
   * @param protocol   protocol interface used to build the connection id
   * @param address    address of the remote server
   * @param ticket     user information used to build the connection id
   * @param conf       configuration passed to the connection id and the client cache
   * @param factory    socket factory used as the client cache key
   * @param rpcTimeout timeout value passed to the connection id
   * @throws IOException if the connection id cannot be created
   */
  public Invoker(Class<? extends VersionedProtocol> protocol,
      InetSocketAddress address, UserGroupInformation ticket,
      Configuration conf, SocketFactory factory,
      int rpcTimeout) throws IOException {
    // The connection id is resolved first; the client is only acquired from
    // the cache once that has succeeded.
    this.remoteId = Client.ConnectionId.getConnectionId(
        address, protocol, ticket, rpcTimeout, conf);
    this.client = CLIENTS.getClient(conf, factory);
  }

  /**
   * Sends the call to the remote side and returns the unwrapped result.
   * When debug logging is enabled the elapsed wall-clock time is logged.
   *
   * @param proxy  the proxy instance the method was invoked on (unused)
   * @param method the interface method being called
   * @param args   the call arguments, passed through unchanged
   * @return the value carried by the {@link ObjectWritable} reply
   * @throws Throwable whatever {@code client.call} or the reply unwrapping throws
   */
  public Object invoke(Object proxy, Method method, Object[] args)
      throws Throwable {
    // Sample the debug flag once so timing and logging stay paired.
    final boolean timed = LOG.isDebugEnabled();
    final long begunAt = timed ? System.currentTimeMillis() : 0;

    final Invocation request = new Invocation(method, args);
    final ObjectWritable reply = (ObjectWritable) client.call(request, remoteId);

    if (timed) {
      final long elapsed = System.currentTimeMillis() - begunAt;
      LOG.debug("Call: " + method.getName() + " " + elapsed);
    }
    return reply.get();
  }

  /**
   * Releases the IPC client that serves this invoker's RPCs. Only the first
   * call has any effect; later calls return immediately.
   */
  synchronized private void close() {
    if (isClosed) {
      return;
    }
    isClosed = true;
    CLIENTS.stopClient(client);
  }
}
重点分析invoke方法:
ObjectWritable value = (ObjectWritable) client.call(new Invocation(method, args), remoteId);
call函数原型如下:
public Writable call(Writable param, ConnectionId remoteId) throws InterruptedException, IOException
此方法发起一次远程调用:将Writable类型的参数param发送到由remoteId标识的IPC服务器,并返回Writable类型的结果;如果出现网络问题或远程代码执行出错,就会抛出异常
invoke将函数调用信息打包成序列化的Invocation对象,并通过网络发送给服务器端,服务器端收到该调用信息后,解析出函数名和参数列表等信息,利用java反射机制完成函数调用
Invocation类
/**
 * A single method invocation: the method name together with its parameter
 * types and argument values. Instances are written to and read from the wire
 * through {@link #write(DataOutput)} and {@link #readFields(DataInput)}.
 */
private static class Invocation implements Writable, Configurable {
  private String methodName;
  private Class<?>[] parameterClasses;
  private Object[] parameters;
  private Configuration conf;

  /** Creates an empty instance, to be populated by {@link #readFields(DataInput)}. */
  public Invocation() {}

  /**
   * Captures a call to {@code method} with the given arguments.
   *
   * @param method     the method being called; supplies the name and declared parameter types
   * @param parameters the argument values; may be {@code null}, which is what a
   *                   dynamic proxy passes for a method that takes no arguments
   */
  public Invocation(Method method, Object[] parameters) {
    this.methodName = method.getName();
    // Declared parameter types, in declaration order.
    this.parameterClasses = method.getParameterTypes();
    this.parameters = parameters;
  }

  /** The name of the method invoked. */
  public String getMethodName() { return methodName; }

  /** The parameter classes. */
  public Class<?>[] getParameterClasses() { return parameterClasses; }

  /** The parameter instances. */
  public Object[] getParameters() { return parameters; }

  /**
   * Reads the method name, the argument count and then each argument,
   * recording the declared class reported for every argument.
   */
  public void readFields(DataInput in) throws IOException {
    methodName = UTF8.readString(in);
    parameters = new Object[in.readInt()]; // argument count, read as a 4-byte int
    parameterClasses = new Class<?>[parameters.length];
    ObjectWritable objectWritable = new ObjectWritable();
    for (int i = 0; i < parameters.length; i++) {
      parameters[i] = ObjectWritable.readObject(in, objectWritable, this.conf);
      parameterClasses[i] = objectWritable.getDeclaredClass();
    }
  }

  /**
   * Writes the method name, the number of declared parameters and then each
   * argument together with its declared class.
   */
  public void write(DataOutput out) throws IOException {
    UTF8.writeString(out, methodName);
    out.writeInt(parameterClasses.length);
    for (int i = 0; i < parameterClasses.length; i++) {
      ObjectWritable.writeObject(out, parameters[i], parameterClasses[i],
                                 conf);
    }
  }

  /**
   * Renders the call as {@code name(arg1, arg2, ...)}. A missing argument
   * array is rendered as an empty argument list instead of failing.
   */
  public String toString() {
    StringBuilder buffer = new StringBuilder();
    buffer.append(methodName);
    buffer.append("(");
    // parameters is null for a default-constructed instance and for calls to
    // no-argument methods, so guard before reading its length.
    if (parameters != null) {
      for (int i = 0; i < parameters.length; i++) {
        if (i != 0) {
          buffer.append(", ");
        }
        buffer.append(parameters[i]);
      }
    }
    buffer.append(")");
    return buffer.toString();
  }

  public void setConf(Configuration conf) {
    this.conf = conf;
  }

  public Configuration getConf() {
    return this.conf;
  }
}
ClientCache类
使用socket factory作为hash key来缓存Client对象
/**
 * Caches IPC {@link Client} objects, keyed by the {@link SocketFactory} they
 * were created with.
 */
static private class ClientCache {
  private final Map<SocketFactory, Client> clients =
      new HashMap<SocketFactory, Client>();

  /**
   * Returns the client cached for {@code factory}. If none is cached, a new
   * one is constructed and stored; otherwise the existing client's reference
   * count is incremented.
   *
   * @param conf    configuration handed to a newly constructed client
   * @param factory socket factory used as the cache key
   * @return the cached or newly created client
   */
  private synchronized Client getClient(Configuration conf,
      SocketFactory factory) {
    // NOTE(review): a freshly constructed Client is not incremented here;
    // presumably it starts with a reference count of one -- confirm in Client.
    Client client = clients.get(factory);
    if (client == null) {
      client = new Client(ObjectWritable.class, conf, factory);
      clients.put(factory, client);
    } else {
      client.incCount();
    }
    return client;
  }

  /**
   * Returns the client cached for the default socket factory, creating and
   * caching it if it does not exist yet.
   *
   * @param conf configuration handed to a newly constructed client
   * @return the cached or newly created client
   */
  private synchronized Client getClient(Configuration conf) {
    return getClient(conf, SocketFactory.getDefault());
  }

  /**
   * Releases one reference to {@code client}. When the reference count
   * reaches zero the client is removed from the cache and stopped.
   *
   * @param client the client to release
   */
  private void stopClient(Client client) {
    // Decide under the lock, exactly once, whether this call dropped the last
    // reference. Re-reading the count after leaving the monitor would let two
    // concurrent callers both observe zero and both stop the client.
    final boolean lastReference;
    synchronized (this) {
      client.decCount();
      lastReference = client.isZeroReference();
      if (lastReference) {
        clients.remove(client.getSocketFactory());
      }
    }
    // stop() runs outside the monitor, as in the original code.
    if (lastReference) {
      client.stop();
    }
  }
}
/** Cache of IPC clients used by {@code Invoker} to obtain and release its client. */
private static ClientCache CLIENTS = new ClientCache();

/**
 * Returns the cached client for the default socket factory.
 * Intended for unit tests only.
 */
static Client getClient(Configuration conf) {
  final Client defaultClient = CLIENTS.getClient(conf);
  return defaultClient;
}
Server类
该类的主要作用是构建一个RPC Server
/**
 * An RPC server: receives {@link Invocation} requests and executes them
 * reflectively against a single target instance.
 */
public static class Server extends org.apache.hadoop.ipc.Server {
  private Object instance;
  private boolean verbose;

  /** Construct an RPC server with one handler, non-verbose, and no secret manager.
   * @param instance the instance whose methods will be called
   * @param conf the configuration to use
   * @param bindAddress the address to bind on to listen for connections
   * @param port the port to listen for connections on
   */
  public Server(Object instance, Configuration conf, String bindAddress, int port)
    throws IOException {
    this(instance, conf, bindAddress, port, 1, false, null);
  }

  /**
   * Returns the part of {@code className} after its last dot, or the whole
   * string when it contains no dot.
   */
  private static String classNameBase(String className) {
    // lastIndexOf yields -1 when there is no dot, so substring(0) keeps the
    // whole name; a trailing dot yields the empty string.
    return className.substring(className.lastIndexOf('.') + 1);
  }

  /** Construct an RPC server.
   * @param instance the instance whose methods will be called
   * @param conf the configuration to use
   * @param bindAddress the address to bind on to listen for connections
   * @param port the port to listen for connections on
   * @param numHandlers the number of method handler threads to run
   * @param verbose whether each call should be logged
   * @param secretManager the secret manager handed to the base server; may be null
   */
  public Server(Object instance, Configuration conf, String bindAddress, int port,
                int numHandlers, boolean verbose,
                SecretManager<? extends TokenIdentifier> secretManager)
      throws IOException {
    super(bindAddress, port, Invocation.class, numHandlers, conf,
        classNameBase(instance.getClass().getName()), secretManager);
    this.instance = instance;
    this.verbose = verbose;
  }

  /**
   * Executes one received call: looks the method up on {@code protocol} by
   * name and parameter classes, invokes it on the target instance, records
   * queue and processing times, and wraps the result in an
   * {@link ObjectWritable}.
   *
   * @param protocol     the protocol class the method is resolved against
   * @param param        the received call; must be an {@link Invocation}
   * @param receivedTime the time the call was received, in milliseconds
   * @return the method's return value wrapped with its declared return type
   * @throws IOException the target method's own IOException, or an
   *         IOException carrying the string form and stack trace of any
   *         other failure
   */
  public Writable call(Class<?> protocol, Writable param, long receivedTime)
  throws IOException {
    try {
      Invocation call = (Invocation)param;
      if (verbose) log("Call: " + call);

      Method method =
        protocol.getMethod(call.getMethodName(),
                                 call.getParameterClasses());
      method.setAccessible(true); // bypass Java access checks for the reflective call

      long startTime = System.currentTimeMillis();
      Object value = method.invoke(instance, call.getParameters());
      int processingTime = (int) (System.currentTimeMillis() - startTime); // time spent executing the method
      int qTime = (int) (startTime-receivedTime); // time between receipt and start of execution
      if (LOG.isDebugEnabled()) {
        LOG.debug("Served: " + call.getMethodName() +
                  " queueTime= " + qTime +
                  " procesingTime= " + processingTime);
      }
      rpcMetrics.addRpcQueueTime(qTime);
      rpcMetrics.addRpcProcessingTime(processingTime);
      rpcMetrics.addRpcProcessingTime(call.getMethodName(), processingTime);
      if (verbose) log("Return: "+value);

      return new ObjectWritable(method.getReturnType(), value);

    } catch (InvocationTargetException e) {
      // The target method itself threw: surface IOExceptions unchanged and
      // convert anything else, keeping the original stack trace.
      Throwable target = e.getTargetException();
      if (target instanceof IOException) {
        throw (IOException)target;
      } else {
        IOException ioe = new IOException(target.toString());
        ioe.setStackTrace(target.getStackTrace());
        throw ioe;
      }
    } catch (Throwable e) {
      // Any other failure (lookup, access, cast, metrics) is logged unless it
      // is already an IOException, then rethrown as an IOException.
      if (!(e instanceof IOException)) {
        LOG.error("Unexpected throwable object ", e);
      }
      IOException ioe = new IOException(e.toString());
      ioe.setStackTrace(e.getStackTrace());
      throw ioe;
    }
  }
}
VersionMismatch类
该类是RPC协议版本不匹配时使用的异常(继承自IOException),其中记录了接口名以及客户端和服务端各自的协议版本.
/**
 * Exception describing a protocol version mismatch. It carries the interface
 * name and the client and server versions given at construction.
 */
public static class VersionMismatch extends IOException {
  private String interfaceName;
  private long clientVersion;
  private long serverVersion;

  /**
   * Create a version mismatch exception.
   * @param interfaceName the name of the protocol whose versions differ
   * @param clientVersion the client's version of the protocol
   * @param serverVersion the server's version of the protocol
   */
  public VersionMismatch(String interfaceName, long clientVersion,
                         long serverVersion) {
    super(describe(interfaceName, clientVersion, serverVersion));
    this.interfaceName = interfaceName;
    this.clientVersion = clientVersion;
    this.serverVersion = serverVersion;
  }

  /** Builds the exception message from the interface name and both versions. */
  private static String describe(String name, long client, long server) {
    StringBuilder text = new StringBuilder("Protocol ");
    text.append(name);
    text.append(" version mismatch. (client = ");
    text.append(client);
    text.append(", server = ");
    text.append(server);
    text.append(")");
    return text.toString();
  }

  /**
   * Get the interface name.
   * @return the interface name given at construction, e.g. a Java class name
   *         (eg. org.apache.hadoop.mapred.InterTrackerProtocol)
   */
  public String getInterfaceName() {
    return interfaceName;
  }

  /**
   * Get the client's version of the protocol.
   */
  public long getClientVersion() {
    return clientVersion;
  }

  /**
   * Get the server's version of the protocol.
   */
  public long getServerVersion() {
    return serverVersion;
  }
}

Comments