Hadoop RPC遠程過程調(diào)用源碼解析及實例
什么是RPC?
1、RPC(Remote Procedure Call)遠程過程調(diào)用,它允許一臺計算機程序遠程調(diào)用另外一臺計算機的子程序,而不用去關(guān)心底層的網(wǎng)絡(luò)通信細節(jié),對我們來說是透明的。經(jīng)常用于分布式網(wǎng)絡(luò)通信中。
2、Hadoop的進程間交互都是通過RPC來進行的,比如Namenode與Datanode之間,Jobtracker與Tasktracker之間等。
RPC協(xié)議假定某些傳輸協(xié)議的存在,如TCP或UDP,為通信程序之間攜帶信息數(shù)據(jù)。在OSI網(wǎng)絡(luò)通信模型中, RPC跨越了傳輸層和應(yīng)用層。 RPC使得開發(fā)包括網(wǎng)絡(luò)分布式多程序在內(nèi)的應(yīng)用程序更加容易。
RPC采用客戶機/服務(wù)器模式。請求程序就是一個客戶機,而服務(wù)提供程序就是一個服務(wù)器。
首先,客戶機調(diào)用進程發(fā)送一個有進程參數(shù)的調(diào)用信息到服務(wù)進程,然后等待應(yīng)答信息,在服務(wù)器端,進程保持睡眠狀態(tài)直到調(diào)用信息的到達為止。當一個調(diào)用信息到達,服務(wù)器獲得進程參數(shù),計算結(jié)果,發(fā)送答復(fù)信息給client,然后等待下一個調(diào)用信息,最后,客戶端調(diào)用進程接收答復(fù)信息,獲得進程結(jié)果,然后調(diào)用執(zhí)行繼續(xù)進行。
RPC特點
1、透明性:遠程調(diào)用其他機器上的程序,對用戶來說就像是調(diào)用本地方法一樣。
2、高性能:RPC server能夠并發(fā)處理多個來自Client的請求(請求隊列)。3、可控性:jdk中已經(jīng)提供了一個RPC框架–RMI,但是該RPC框架過于重量級并且可控之處比較少,所以Hadoop RPC實現(xiàn)了自定義的RPC框架。
Hadoop RPC通信
1、序列化層:Client與Server端 通信傳遞的信息采用了Hadoop里提供的序列化類或自定義Writable類型。
2、函數(shù)調(diào)用層:Hadoop RPC通過動態(tài)代理以及Java反射機制實現(xiàn)函數(shù)調(diào)用。
3、網(wǎng)絡(luò)傳輸層:Hadoop RPC采用了基于TCP/IP的socket機制。
4、服務(wù)器端框架層:RPC Server利用Java NIO以及采用了事件驅(qū)動的I/O模型,提高RPC Server的并發(fā)處理能力。
Hadoop的整個體系結(jié)構(gòu)就是構(gòu)建在RPC之上(org.apache.hadoop.ipc)。
Hadoop RPC設(shè)計技術(shù)
1、動態(tài)代理
2、反射3、序列化4、非阻塞的異步IO(NIO)
動態(tài)代理
1、動態(tài)代理可以提供對另一個對象的訪問,同時隱藏實際對象的具體事實,代理對象對客戶隱藏了實際對象。
2、動態(tài)代理可以對請求進行其他的一些處理,在不允許直接訪問某些類,或需要對訪問做一些特殊處理等,這時候可以考慮使用代理。3)目前Java開發(fā)包中提供了對動態(tài)代理的支持,但現(xiàn)在只支持對接口的實現(xiàn)。相關(guān)的類與接口:java.lang.reflect.Proxy--類 java.lang.reflect.InvocationHandler--接口
動態(tài)代理創(chuàng)建對象過程:
InvocationHandler handler = new InvocationHandlerImpl(...) Proxy.newInstance(...)
具體實現(xiàn)可參考如下:
根據(jù)上圖查看hadoop2.6.0源碼
Client
Server
RPC
幾個重要的協(xié)議
ClientProtocol是客戶端(FileSystem)與NameNode通信的接口。
DatanodeProtocol是DataNode與NameNode通信的接口NamenodeProtocol是SecondaryNameNode與NameNode通信的接口。DFSClient是直接調(diào)用NameNode接口的對象。用戶代碼是通過DistributedFileSystem調(diào)用DFSClient對象,才能與NameNode打交道。
模擬Hadoop RPC通信
- package MyRPC;
- import org.apache.hadoop.io.Text;
- import org.apache.hadoop.ipc.VersionedProtocol;
- public interface MyRPCProtocal extends VersionedProtocol{
- public static long versionID = 23234l;//很重要很重要,搞了一下午才解決掉。
- public Text test(Text t);
- }
- package MyRPC;
- import java.io.IOException;
- import org.apache.hadoop.conf.Configuration;
- import org.apache.hadoop.io.Text;
- import org.apache.hadoop.ipc.ProtocolSignature;
- import org.apache.hadoop.ipc.RPC;
- import org.apache.hadoop.ipc.RPC.Server;
- public class RPCServer implements MyRPCProtocal{
- Server server = null;
- public RPCServer() throws IOException, InterruptedException{
- //server = RPC.getServer(this,"localhost",8888,new Configuration());
- //相對于以前的版本有略微的改動
- RPC.Builder ins = new RPC.Builder(new Configuration());
- ins.setInstance(this);
- ins.setBindAddress("localhost");
- ins.setPort(9999);
- ins.setProtocol(MyRPCProtocal.class);
- //RPC.setProtocolEngine(new Configuration(), MyRPCProtocal.class, RpcEngine.class);
- server = ins.build();//獲得一個server實例
- server.start();
- server.join();
- }
- public static void main(String[] args) throws IOException, InterruptedException {
- new RPCServer();
- }
- @Override
- public long getProtocolVersion(String protocol, long clientVersion)
- throws IOException {
- return MyRPCProtocal.versionID;
- }
- @Override
- public ProtocolSignature getProtocolSignature(String protocol,
- long clientVersion, int clientMethodsHash) throws IOException {
- return new ProtocolSignature();
- }
- @Override
- public Text test(Text t) {
- if(t.toString().equals("RPC")){
- return new Text("ok");
- }
- return new Text("false");
- }
- }
- package MyRPC;
- import java.net.InetSocketAddress;
- import org.apache.hadoop.conf.Configuration;
- import org.apache.hadoop.io.Text;
- import org.apache.hadoop.ipc.RPC;
- public class RPCClient {
- private MyRPCProtocal protocal;
- public RPCClient() throws Exception{
- InetSocketAddress address = new InetSocketAddress("localhost",9999);
- protocal = (MyRPCProtocal)RPC.waitForProxy
- (MyRPCProtocal.class,MyRPCProtocal.versionID, address, new Configuration());
- //RPC.setProtocolEngine(new Configuration(), MyRPCProtocal.class, RpcEngine.class);
- }
- public void call(String s){
- final Text string = protocal.test(new Text(s));
- System.out.println(string.toString());
- }
- public static void main(String[] args) throws Exception {
- RPCClient client = new RPCClient();
- client.call("RPC");
- }
- }