原创文章,转载请注明: 转载自工学1号馆
这篇文章简单的介绍RPC的概念以及Hadoop RPC 框架的使用方法
RPC基本概念
RPC(Remote Procedure Call Protocol)——远程过程调用协议,它是一种通过网络从远程计算机程序上请求服务,而不需要了解底层网络技术的协议。RPC协议假定某些传输协议的存在,如TCP或UDP,为通信程序之间携带信息数据。在OSI网络通信模型中,RPC跨越了传输层和应用层。RPC使得开发包括网络分布式多程序在内的应用程序更加容易。
RPC采用客户机/服务器模式。请求程序就是一个客户机,而服务提供程序就是一个服务器。首先,客户机调用进程发送一个有进程参数的调用信息到服务进程,然后等待应答信息。在服务器端,进程保持睡眠状态直到调用信息的到达为止。当一个调用信息到达,服务器获得进程参数,计算结果,发送答复信息,然后等待下一个调用信息,最后,客户端调用进程接收答复信息,获得进程结果,然后调用执行继续进行。
Hadoop RPC基本框架
简单来说,Hadoop RPC = 动态代理 + 定制好的二进制流。如果不关注细节,从用户的角度来看,它的结构大致像下图:
Hadoop的RPC主要是通过Java的动态代理(Dynamic Proxy)与反射(Reflect)实现,源代码在org.apache.hadoop.ipc下,有以下几个主要类:
RPC:实现了一个简单的RPC模型
Client:RPC服务的客户端
Server:服务端的抽象类
RPC.Server:服务端的具体类
VersionedProtocol:所有的使用RPC服务的类都要实现该接口,在创建代理时,用来判断代理对象是否创建正确。
在后面的文章中将详细剖析各个类的源代码和运行机制
Hadoop RPC 的使用
Hadoop RPC 主要对外提供了2种接口:
public static VersionedProtocol getProxy/waitForProxy():构建一个客户端代理对象(该对象实现了某个协议),用于向服务器发送RPC请求
public static Server getServer():为某个协议(实际上是Java接口)实例构造一个服务器对象,用于处理客户端发送的请求
Hadoop RPC使用方法可分为以下几个步骤:
步骤1、定义RPC协议
Hadoop中所定义的RPC接口都需要继承VersionedProtocol接口,它描述了协议的版本信息
interface ClientProtocol extends org.apache.hadoop.ipc.VersionedProtocol { public static final long versionID = 1L; String echo(String value) throws IOException; int add(int v1, int v2) throws IOException; }
步骤2、实现RPC协议
Hadoop RPC协议通常是一个java接口,用户需要实现该接口
public static class ClientProtocolImpl implements ClientProtocol { @Override public long getProtocolVersion(String protocol, long clientVersion) throws IOException { return ClientProtocol.versionID; } @Override public String echo(String value) throws IOException { return value; } @Override public int add(int v1, int v2) throws IOException { return v1 + v2; } }
步骤3:构造并启动RPC Server
server = RPC.getServer(new ClientProtocolImpl(), serverHost, serverPort, numHandlers, false, conf); server.start();
serverHost:服务器的host
serverPort:服务器的监听端口号
numHandlers:服务器端请求的线程数目
步骤4:构造RPC Client,并发送RPC请求
proxy = (ClientProtocol)RPC.getProxy(ClientProtocol.class, ClientProtocol.versionID, addr, conf); int result = proxy.add(5, 6); String echoResult = Proxy.echo("result");
在这里可以使用java动态代理的例子来简单的理解一下:
public interface CalculatorProtocol { //定义一个接口 public int add(int a, int b); public int subtract(int a, int b); } public class DynamicProxyExample { public static void main(String[] args) { CalculatorProtocol server = new Server(); InvocationHandler handler = new CalculatorHandler(server); CalculatorProtocol client = (CalculatorProtocol)Proxy.newProxyInstance(server.getClass().getClassLoader(), server.getClass().getInterfaces(), handler); int r = client.add(3, 5); System.out.println("3 + 5 = " + r); r = client.subtract(7, 2); System.out.println("7 - 2 = " + r); } } class Server implements CalculatorProtocol { //实现接口协议 @Override public int add(int a, int b) { return a + b; } @Override public int subtract(int a, int b) { return a - b; } } class CalculatorHandler implements InvocationHandler { //实现调用处理器接口 private Object objOriginal; public CalculatorHandler(Object obj) { this.objOriginal = obj; } @Override public Object invoke(Object proxy, Method method, Object[] args) throws Throwable { //可添加一些预处理 Object result = method.invoke(this.objOriginal, args); //可添加一些后续处理 return result; } }
Comments