guangzhou



shanghai

Recent posts:
Blog index
About
RSS

深入剖析Hadoop RPC基本框架–Hadoop RPC框架与使用

June 21, 2015     Hadoop   673   

原创文章,转载请注明: 转载自工学1号馆

这篇文章简单的介绍RPC的概念以及Hadoop RPC 框架的使用方法

RPC基本概念

RPC(Remote Procedure Call Protocol)——远程过程调用协议,它是一种通过网络从远程计算机程序上请求服务,而不需要了解底层网络技术的协议。RPC协议假定某些传输协议的存在,如TCP或UDP,为通信程序之间携带信息数据。在OSI网络通信模型中,RPC跨越了传输层和应用层。RPC使得开发包括网络分布式多程序在内的应用程序更加容易。

RPC采用客户机/服务器模式。请求程序就是一个客户机,而服务提供程序就是一个服务器。首先,客户机调用进程发送一个有进程参数的调用信息到服务进程,然后等待应答信息。在服务器端,进程保持睡眠状态直到调用信息的到达为止。当一个调用信息到达,服务器获得进程参数,计算结果,发送答复信息,然后等待下一个调用信息,最后,客户端调用进程接收答复信息,获得进程结果,然后调用执行继续进行。

Hadoop RPC基本框架

简单来说,Hadoop RPC = 动态代理 + 定制好的二进制流。如果不关注细节,从用户的角度来看,它的结构大致像下图:

proxyhadoop1

Hadoop的RPC主要是通过Java的动态代理(Dynamic Proxy)与反射(Reflect)实现,源代码在org.apache.hadoop.ipc下,有以下几个主要类:

RPC:实现了一个简单的RPC模型

Client:RPC服务的客户端

Server:服务端的抽象类

RPC.Server:服务端的具体类

VersionedProtocol:所有的使用RPC服务的类都要实现该接口,在创建代理时,用来判断代理对象是否创建正确。

在后面的文章中将详细剖析各个类的源代码和运行机制 :mrgreen:

Hadoop RPC 的使用

Hadoop RPC 主要对外提供了2种接口:

public static VersionedProtocol getProxy/waitForProxy():构建一个客户端代理对象(该对象实现了某个协议),用于向服务器发送RPC请求

public static Server getServer():为某个协议(实际上是Java接口)实例构造一个服务器对象,用于处理客户端发送的请求

Hadoop RPC使用方法可分为以下几个步骤:

步骤1、定义RPC协议

Hadoop中所定义的RPC接口都需要继承VersionedProtocol接口,它描述了协议的版本信息

interface ClientProtocol extends org.apache.hadoop.ipc.VersionedProtocol {
	public static final long versionID = 1L;
	String echo(String value) throws IOException;
	int add(int v1, int v2) throws IOException;
}

步骤2、实现RPC协议

Hadoop RPC协议通常是一个java接口,用户需要实现该接口

public static class ClientProtocolImpl implements ClientProtocol {

	@Override
	public long getProtocolVersion(String protocol, long clientVersion)
				throws IOException {
		return ClientProtocol.versionID;
	}
	@Override
	public String echo(String value) throws IOException {
		return value;
	}

	@Override
	public int add(int v1, int v2) throws IOException {
		return v1 + v2;
	}
}

步骤3:构造并启动RPC Server

server = RPC.getServer(new ClientProtocolImpl(), serverHost, serverPort, numHandlers, false, conf);
server.start();

serverHost:服务器的host

serverPort:服务器的监听端口号

numHandlers:服务器端请求的线程数目

步骤4:构造RPC Client,并发送RPC请求

proxy = (ClientProtocol)RPC.getProxy(ClientProtocol.class, ClientProtocol.versionID, addr, conf);
int result = proxy.add(5, 6);
String echoResult = Proxy.echo("result");

在这里可以使用java动态代理的例子来简单的理解一下:

public interface CalculatorProtocol { //定义一个接口
	public int add(int a, int b);
	public int subtract(int a, int b);
}
public class DynamicProxyExample {
	public static void main(String[] args) {

		CalculatorProtocol server = new Server();
		InvocationHandler handler = new CalculatorHandler(server);
		CalculatorProtocol client = (CalculatorProtocol)Proxy.newProxyInstance(server.getClass().getClassLoader(), 
				server.getClass().getInterfaces(), handler);
		
		int r = client.add(3, 5);
		System.out.println("3 + 5 = " + r);
		r = client.subtract(7, 2);
		System.out.println("7 - 2 = " + r);
				

	}
}

class Server implements CalculatorProtocol {  //实现接口协议
	@Override
	public int add(int a, int b) {
		return a + b;
	}

	@Override
	public int subtract(int a, int b) {
		return a - b;
	}
}

class CalculatorHandler implements InvocationHandler {  //实现调用处理器接口
	private Object objOriginal;

	public CalculatorHandler(Object obj) {
		this.objOriginal = obj;
	}

	@Override
	public Object invoke(Object proxy, Method method, Object[] args)
			throws Throwable {
    //可添加一些预处理
		Object result = method.invoke(this.objOriginal, args);
    //可添加一些后续处理
		return result;
	}
}

 

如果文章对您有帮助,欢迎点击下方按钮打赏作者

Comments

No comments yet.
To verify that you are human, please fill in "七"(required)