本文主要描述了hadoop rpc服务端的初始化和调用过程,相比客户端的初始化,rpc服务端感觉会简单点,但是调用过程却比客户端复杂一些。本文还是以namenode为例,namenode会在执行main方法的时候,创建一个namenode实例,及完成一系列的初始化过程,其中就包括了rpc的初始化过程。
rpc服务端的初始化
上面已经提到我们这里主要借用了namenode的远程服务,先来看看相关代码:
public class NameNode implements NameNodeStatusMXBean {public static void main(String argv[]) throws Exception { NameNode namenode = createNameNode(argv, null);} protected NameNode(Configuration conf, NamenodeRole role)throws IOException { initialize(conf);}protected void initialize(Configuration conf) throws IOException { rpcServer = createRpcServer(conf); startCommonServices(conf); //相当重要}protected NameNodeRpcServer createRpcServer(Configuration conf)throws IOException { return new NameNodeRpcServer(conf, this); }}
我们的linux的终端执行hadoop的启动命令的时候,最终的命令是调用NameNode的main方法,所以我们追踪代码的切入点是NameNode的main方法,方法比较简单,就是调用NameNode的构造函数创建一个NameNode,然后执行初始化方法initialize,这个方法相对来说,是我们关注的重点,包括rpc服务在内的初始化操作都放在这个方法里面。特定于rpc,他执行了两个相关的方法createRpcServer和startCommonServices,第一个方法见名思意,不多说,先简单介绍下后面的方法,该方法的作用就是启动namenode的rpc服务,稍后我给出代码。好的,从上面的代码可以看到,我们的rpcServer功能都放在了类NameNodeRpcServer里面,现在让我们来看看这个类里面相关的代码:
class NameNodeRpcServer implements NamenodeProtocols {public NameNodeRpcServer(Configuration conf, NameNode nn) throws IOException { RPC.setProtocolEngine(conf, ClientNamenodeProtocolPB.class, ProtobufRpcEngine.class); ClientNamenodeProtocolServerSideTranslatorPB clientProtocolServerTranslator = new ClientNamenodeProtocolServerSideTranslatorPB(this); BlockingService clientNNPbService = ClientNamenodeProtocol. newReflectiveBlockingService(clientProtocolServerTranslator); InetSocketAddress rpcAddr = nn.getRpcServerAddress(conf); // fs.defaultFS String bindHost = nn.getRpcServerBindHost(conf); if (bindHost == null) { bindHost = rpcAddr.getHostName(); } LOG.info("RPC server is binding to " + bindHost + ":" + rpcAddr.getPort()); this.clientRpcServer = new RPC.Builder(conf) .setProtocol( org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolPB.class) .setInstance(clientNNPbService).setBindAddress(bindHost) .setPort(rpcAddr.getPort()).setNumHandlers(handlerCount) .setVerbose(false) .setSecretManager(namesystem.getDelegationTokenSecretManager()).build(); // Add all the RPC protocols that the namenode implements DFSUtil.addPBProtocol(conf, HAServiceProtocolPB.class, haPbService, clientRpcServer); DFSUtil.addPBProtocol(conf, NamenodeProtocolPB.class, NNPbService, clientRpcServer); DFSUtil.addPBProtocol(conf, DatanodeProtocolPB.class, dnProtoPbService, clientRpcServer); }}
在NameNodeRpcServer的构造函数里面最重要的一件事情是实例化clientRpcServer,这里面我最想说明的是,NameNode宣称自己实现了三个协议:ClientProtocol、DatanodeProtocol和NamenodeProtocol,在服务端的实现基本上就靠ClientNamenodeProtocolServerSideTranslatorPB之类的类型了,特别在实例化ClientNamenodeProtocolServerSideTranslatorPB的时候有传入一个形参,这个形参就是NameNodeRpcServer实例,看代码:
public ClientNamenodeProtocolServerSideTranslatorPB(ClientProtocol server) throws IOException { this.server = server; } @Override public GetBlockLocationsResponseProto getBlockLocations( RpcController controller, GetBlockLocationsRequestProto req) throws ServiceException { try { LocatedBlocks b = server.getBlockLocations(req.getSrc(), req.getOffset(), req.getLength()); Builder builder = GetBlockLocationsResponseProto .newBuilder(); if (b != null) { builder.setLocations(PBHelper.convert(b)).build(); } return builder.build(); } catch (IOException e) { throw new ServiceException(e); } }
上面代码中的getBlockLocations也一定程度上说明了刚才的观点。
现在让我们回过头看看NameNode中initialize方法中执行的startCommonServices方法,这个方法用来启动clientRpcServer下面的线程,包括listener,handler、response,具体看代码:
public class NameNode implements NameNodeStatusMXBean {private void startCommonServices(Configuration conf) throws IOException { rpcServer.start();}}class NameNodeRpcServer implements NamenodeProtocols { void start() { clientRpcServer.start(); if (serviceRpcServer != null) { serviceRpcServer.start(); } }}public abstract class Server { public synchronized void start() { responder.start(); listener.start(); handlers = new Handler[handlerCount]; for (int i = 0; i < handlerCount; i++) { handlers[i] = new Handler(i); handlers[i].start(); } }}
代码看到这里,启动过程中rpc相关的代码就结束了。
rpc服务端的调用过程
现在让我们来看看rpc被调用的过程,先来认识下Server的关键结构:
public abstract class Server { private Listener listener = null; private Responder responder = null; private Handler[] handlers = null; private class Responder extends Thread { } private class Listener extends Thread { } private class Handler extends Thread { }}
在初始化的时候,就启动listener、responder和handlers下面的所有线程。
其中listener线程里面启动了一个socker服务,专门用来接受客户端的请求,handler下面的线程用来处理具体的请求,responder写请求结果,具体过程可以看下下面的代码:
public abstract class Server { private Listener listener = null; private Responder responder = null; private Handler[] handlers = null; private class Listener extends Thread {public Listener() throws IOException { address = new InetSocketAddress(bindAddress, port); // Create a new server socket and set to non blocking mode acceptChannel = ServerSocketChannel.open(); acceptChannel.configureBlocking(false); // Bind the server socket to the local host and port bind(acceptChannel.socket(), address, backlogLength, conf, portRangeConfig); port = acceptChannel.socket().getLocalPort(); //Could be an ephemeral port // create a selector; selector= Selector.open(); readers = new Reader[readThreads]; for (int i = 0; i < readThreads; i++) { Reader reader = new Reader( "Socket Reader #" + (i + 1) + " for port " + port); readers[i] = reader; reader.start(); } // Register accepts on the server socket with the selector. acceptChannel.register(selector, SelectionKey.OP_ACCEPT); this.setName("IPC Server listener on " + port); this.setDaemon(true); }public void run() { while (running) { doAccept(key); }}void doAccept(SelectionKey key) throws InterruptedException, IOException, OutOfMemoryError { Reader reader = getReader(); Connection c = connectionManager.register(channel); key.attach(c); // so closeCurrentConnection can get the object reader.addConnection(c); }private class Reader extends Thread { public void run() { doRunLoop(); } private synchronized void doRunLoop() { while (running) { Connection conn = pendingConnections.take(); conn.channel.register(readSelector, SelectionKey.OP_READ, conn); } readSelector.select(); doRead(key); } void doRead(SelectionKey key) throws InterruptedException { Connection c = (Connection)key.attachment(); count = c.readAndProcess(); }} } public class Connection {public int readAndProcess(){ processOneRpc(data.array());}private void processOneRpc(byte[] buf){ processRpcRequest(header, dis);}private void processRpcRequest(RpcRequestHeaderProto header, DataInputStream dis) throws WrappedRpcServerException, InterruptedException { Call call = new Call(header.getCallId(), header.getRetryCount(), rpcRequest, this, ProtoUtil.convert(header.getRpcKind()), header .getClientId().toByteArray()); callQueue.put(call);} } private class Handler extends Thread {public void run() { final Call call = callQueue.take(); value = call(call.rpcKind, call.connection.protocolName, call.rpcRequest, call.timestamp); setupResponse(buf, call, returnStatus, detailedErr, value, errorClass, error); responder.doRespond(call);} } private class Responder extends Thread {void doRespond(Call call) throws IOException { processResponse(call.connection.responseQueue, true);}private boolean processResponse(LinkedListresponseQueue, boolean inHandler) throws IOException { int numBytes = channelWrite(channel, call.rpcResponse); done = true;} }}
这里给出了一个比较完整版Server的rpc调用过程,从listener都构造函数开始,在他的构造函数中起了几个reader线程,当监听器收到访问请求的时候,由reader请请求中读取数据,reader中实际上调用的是connection的readAndProcess方法,在这个方法中,会往RPC server中的callQueue添加call对象,之后,handler这个家伙从队列中取出当前call,具体的处理过程,用到了Server类的call方法,这地方有些玄机,仔细跟过代码的人才知道,因为server的实例类不再是org.apache.hadoop.ipc.Server,而是Protobuf的一个实现类,org.apache.hadoop.ipc.RPC.Server,而且call方法是被重写过的,代码如下:
@Override public Writable call(RPC.RpcKind rpcKind, String protocol, Writable rpcRequest, long receiveTime) throws Exception { return getRpcInvoker(rpcKind).call(this, protocol, rpcRequest, receiveTime); }
继续追踪下,差不多就可以到底了:
public class ProtobufRpcEngine implements RpcEngine {public static class Server extends RPC.Server { static class ProtoBufRpcInvoker implements RpcInvoker { public Writable call(RPC.Server server, String protocol, Writable writableRequest, long receiveTime) throws Exception { ProtoClassProtoImpl protocolImpl = getProtocolImpl(server, protoName,clientVersion); BlockingService service = (BlockingService) protocolImpl.protocolImpl; result = service.callBlockingMethod(methodDescriptor, null, param); return new RpcResponseWrapper(result); }}}
这部分的代码也正是hadoop rpc与protobuf结合的地方,这地方在补充一点,protbufImpl就是NameNodeRpcServer初始化的时候,已经准备了,而且看懂ProtoBufRpcInvoker下的call方法,确实也是需要结合NameNodeRpcServer初始化过程来理解的。我朦朦胧胧的懂了。而且这地方的深入会让你看到一些本质的东西,举例的话,你会跟踪到ClientNamenodeProtocolServerSideTranslatorPB,然后是NameNodeRpcServer,再然后是FSNamesystem,最后你发现,服务端对文件系统的操作出自FSNamesystem。
继续回到handler中的run方法,call方法调用完了,就轮到Responder处理返回结果了。
整个过程就是这样了,需要说明点,上面写都东西有些可以确认没问题了,有些是个人结合书的一些总结,不一定对,仅供参考。