Define the RPC protocol
The IProxyProtocol interface
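import org.apache.hadoop.ipc.VersionedProtocol;

public interface IProxyProtocol extends VersionedProtocol {
    // Protocol version; the client passes this value when it requests a proxy.
    long versionID = 1L;
    // Remote method: returns the sum of the two arguments.
    int Add(int number1, int number2);
}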
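- Every custom RPC interface in Hadoop must extend the VersionedProtocol interface, which describes the protocol's version information.
- By default, an RPC client and an RPC server with different version numbers cannot communicate with each other, so both sides identify the protocol by its version number.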
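The version rule above can be modeled without Hadoop on the classpath. The following is only a minimal sketch of the idea, not Hadoop's actual handshake: DemoProtocol, readVersion, and isCompatible are hypothetical names made up for this illustration. It reads a versionID constant from an interface by reflection and accepts a caller only when the versions match.

```java
import java.lang.reflect.Field;

/** Simplified stand-in for a version check; this is not Hadoop's actual code. */
class VersionCheckSketch {

    /** Hypothetical protocol that declares its version the same way IProxyProtocol does. */
    interface DemoProtocol {
        long versionID = 1L; // interface fields are implicitly public static final
    }

    /** Reads the versionID constant from a protocol interface via reflection. */
    static long readVersion(Class<?> protocol) {
        try {
            Field field = protocol.getField("versionID");
            return field.getLong(null); // static field, so no instance is needed
        } catch (ReflectiveOperationException e) {
            throw new IllegalArgumentException(
                    protocol.getName() + " must declare a long versionID", e);
        }
    }

    /** Accepts the caller only when its version equals the protocol's version. */
    static boolean isCompatible(Class<?> protocol, long clientVersion) {
        return readVersion(protocol) == clientVersion;
    }

    public static void main(String[] args) {
        System.out.println(isCompatible(DemoProtocol.class, 1L)); // matching version
        System.out.println(isCompatible(DemoProtocol.class, 2L)); // mismatched version
    }
}
```

In this model, bumping versionID on one side only makes isCompatible return false, which mirrors why the client and the server should be compiled against the same interface.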
Implement the RPC protocol
The MyProxy class
import java.io.IOException;
import org.apache.hadoop.ipc.ProtocolSignature;

public class MyProxy implements IProxyProtocol {
    // Server-side implementation of the remote method.
    @Override
    public int Add(int number1, int number2) {
        System.out.println("Add was called");
        return number1 + number2;
    }

    // Reports the protocol version implemented by this server.
    @Override
    public long getProtocolVersion(String protocol, long clientVersion) throws IOException {
        System.out.println("MyProxy.ProtocolVersion=" + IProxyProtocol.versionID);
        return IProxyProtocol.versionID;
    }

    // This demo does not build a method signature, so it returns null.
    @Override
    public ProtocolSignature getProtocolSignature(String protocol, long clientVersion, int clientMethodsHash) throws IOException {
        return null;
    }
}
Build the RPC server and start the service
The MyServer class
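import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.ipc.RPC;

public class MyServer {
    public static final int PORT = 5433;
    public static final String IPAddress = "127.0.0.1";

    public static void main(String[] args) throws IOException {
        MyProxy proxy = new MyProxy();
        Configuration conf = new Configuration();
        // Register the protocol interface and the instance that serves it.
        RPC.Server server = new RPC.Builder(conf).setProtocol(IProxyProtocol.class)
                .setInstance(proxy).setBindAddress(IPAddress).setPort(PORT)
                .build();
        // Start the server so that it begins accepting RPC requests.
        server.start();
    }
}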
Build the RPC client and run it
The MyClient class
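import java.io.IOException;
import java.net.InetSocketAddress;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.ipc.RPC;

public class MyClient {
    public static void main(String[] args) {
        InetSocketAddress inetSocketAddress = new InetSocketAddress(MyServer.IPAddress, MyServer.PORT);
        IProxyProtocol proxy = null;
        try {
            // Wait until the server is reachable, then obtain a client-side proxy.
            proxy = RPC.waitForProxy(IProxyProtocol.class, IProxyProtocol.versionID, inetSocketAddress,
                    new Configuration());
            // The call is sent to the server and runs in MyProxy.Add.
            int result = proxy.Add(10, 25);
            System.out.println("10+25=" + result);
        } catch (IOException e) {
            e.printStackTrace();
        } finally {
            // Release the connection even if the call failed.
            if (proxy != null) {
                RPC.stopProxy(proxy);
            }
        }
    }
}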
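The client above calls proxy.Add as if it were a local method. A rough way to see how such a call can be intercepted is Java's standard dynamic-proxy mechanism, java.lang.reflect.Proxy. The sketch below runs entirely in one process, with no network and no Hadoop classes; Calculator, CalculatorImpl, and proxyFor are hypothetical stand-ins invented for this illustration, and the handler simply forwards the call where a real RPC client would serialize it and send it to the server.

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Proxy;

/** In-process illustration of the dynamic-proxy pattern; no network is involved. */
class LocalProxySketch {

    /** Hypothetical stand-in for IProxyProtocol, without the Hadoop base interface. */
    interface Calculator {
        int Add(int number1, int number2);
    }

    /** Hypothetical stand-in for the server-side MyProxy implementation. */
    static class CalculatorImpl implements Calculator {
        @Override
        public int Add(int number1, int number2) {
            return number1 + number2;
        }
    }

    /** Builds a proxy whose handler forwards every call to the target object. */
    static Calculator proxyFor(Calculator target) {
        InvocationHandler handler = (proxy, method, args) -> {
            // A real RPC client would serialize the method name and arguments here,
            // send them to the server, and deserialize the reply.
            System.out.println("forwarding " + method.getName());
            return method.invoke(target, args);
        };
        return (Calculator) Proxy.newProxyInstance(
                Calculator.class.getClassLoader(),
                new Class<?>[] {Calculator.class},
                handler);
    }

    public static void main(String[] args) {
        Calculator calculator = proxyFor(new CalculatorImpl());
        System.out.println("10+25=" + calculator.Add(10, 25));
    }
}
```

The caller only ever sees the Calculator interface, which is the same reason MyClient needs IProxyProtocol on its classpath but never references MyProxy directly.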