上篇文章我们写了一个最简单的RPC,我们今天把这个RPC改造一下。
我们想着能不能通过注解直接发布服务呢?
我们首先定义一个注解
@Target(ElementType.TYPE) //类/接口
@Retention(RetentionPolicy.RUNTIME)
@Component //被spring进行扫描?
public @interface RpcService {
Class<?> value(); //拿到服务的接口
}
然后实现的效果就是在服务的实现上加一个注解就可以实现服务的发布。
@RpcService(value = IHelloService.class")
public class HelloServiceImpl implements IHelloService{
@Override
public String sayHello(String content) {
System.out.println("request in sayHello:"+content);
return "request in sayHello:"+content;
}
}
首先我们引入spring 的依赖,定义一个MyRpcServer类。实现ApplicationContextAware接口的setApplicationContext方法和实现InitializingBean接口的afterPropertiesSet。
InitializingBean接口为bean提供了属性初始化后的处理方法,它只包括afterPropertiesSet方法,凡是继承该接口的类,在bean的属性初始化后都会执行该方法。
ApplicationContextAware可以获取到applicationContext并做一些响应的处理。
我们希望bean加载后就直接执行发布操作。我们把原来的发布操作直接复制过来。
@Component
public class MyRpcServer implements ApplicationContextAware, InitializingBean {
private ExecutorService executorService = Executors.newCachedThreadPool();
private int port;
public MyRpcServer(int port) {
this.port = port;
}
@Override
public void afterPropertiesSet() {
try (ServerSocket serverSocket = new ServerSocket(port)) {
while (true) {// 不断接受请求
Socket socket = serverSocket.accept();// BIO
// 每一个socket 交给一个processorHandler来处理
executorService.execute(new ProcessorHandler(socket, service));
}
} catch (IOException e) {
e.printStackTrace();
}
}
@Override
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
}
}
这里executorService.execute(new ProcessorHandler(socket, service))
就报错了这里的service没有,我们这里也不能写死,现在和spring整合起来了,我们再看看我们的setApplicationContext方法,我们有了applicationContext是不是就可以拿到所有的加了注解的bean,那么我们再把这个bean发布出去不就好了。
@Component
public class MyRpcServer implements ApplicationContextAware, InitializingBean {
ExecutorService executorService = Executors.newCachedThreadPool();
private Map<String, Object> handlerMap = new HashMap<>();
private int port;
public MyRpcServer(int port) {
this.port = port;
}
@Override
public void afterPropertiesSet() {
try (ServerSocket serverSocket = new ServerSocket(port)) {
while (true) {//不断接受请求
Socket socket = serverSocket.accept();//BIO
//每一个socket 交给一个processorHandler来处理
executorService.execute(new ProcessorHandler(socket, handlerMap));
}
} catch (IOException e) {
e.printStackTrace();
}
}
@Override
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
Map<String, Object> serviceBeanMap = applicationContext.getBeansWithAnnotation(RpcService.class);
if (!serviceBeanMap.isEmpty()) {
for (Object servcieBean : serviceBeanMap.values()) {
//拿到注解
RpcService rpcService = servcieBean.getClass().getAnnotation((RpcService.class));
String serviceName = rpcService.value().getName();//拿到接口类定义
handlerMap.put(serviceName, servcieBean);
}
}
}
}
对应的我们的handle的代码就相应的改为下面的。
public class ProcessorHandler implements Runnable {
private Socket socket;
private Map<String, Object> handlerMap;
public ProcessorHandler(Socket socket, Map<String, Object> handlerMap) {
this.socket = socket;
this.handlerMap = handlerMap;
}
@Override
public void run() {
try (ObjectInputStream objectInputStream = new ObjectInputStream(socket.getInputStream());
ObjectOutputStream objectOutputStream = new ObjectOutputStream(socket.getOutputStream())) {
//输入流中应该有什么东西?
//请求哪个类,方法名称、参数
RpcRequest rpcRequest = (RpcRequest) objectInputStream.readObject();
Object result = invoke(rpcRequest); //反射调用本地服务
objectOutputStream.writeObject(result);
objectOutputStream.flush();
} catch (IOException |
ClassNotFoundException |
NoSuchMethodException |
IllegalAccessException |
InvocationTargetException e) {
e.printStackTrace();
}
}
private Object invoke(RpcRequest request) throws ClassNotFoundException, NoSuchMethodException, InvocationTargetException, IllegalAccessException {
//反射调用
String serviceName = request.getClassName();
Object service = handlerMap.get(serviceName);
if (service == null) {
throw new RuntimeException("service not found:" + serviceName);
}
Object[] args = request.getParameters(); //拿到客户端请求的参数
Method method = null;
if (args != null) {
Class<?>[] types = new Class[args.length]; //获得每个参数的类型
for (int i = 0; i < args.length; i++) {
types[i] = args[i].getClass();
}
Class clazz = Class.forName(request.getClassName()); //跟去请求的类进行加载
method = clazz.getMethod(request.getMethodName(), types); //sayHello, saveUser找到这个类中的方法
} else {
Class clazz = Class.forName(request.getClassName()); //跟去请求的类进行加载
method = clazz.getMethod(request.getMethodName()); //sayHello找到这个类中的方法
}
return method.invoke(service, args);
}
}
启动类就改为
public class App {
public static void main(String[] args) {
/* IHelloService helloService=new HelloServiceImpl();
RpcProxyServer proxyServer=new RpcProxyServer();
proxyServer.publisher(helloService,8080);*/
AnnotationConfigApplicationContext context = new AnnotationConfigApplicationContext(SpringConfig.class);
context.start();
}
}
还有一个配置类,配置服务发布端口。
@Configuration
@ComponentScan(basePackages = "com.yhx.vip")
public class SpringConfig {
@Bean(name = "myRpcServer")
public MyRpcServer myRpcServer() {
return new MyRpcServer(8080);
}
}
类似的我们再将客户端改造一下
public class App {
public static void main(String[] args) {
/*
* RpcProxyClient rpcProxyClient=new RpcProxyClient();
*
* IHelloService iHelloService=rpcProxyClient.clientProxy
* (IHelloService.class,"localhost",8080);
*
* String result=iHelloService.sayHello("Mic"); System.out.println(result);
*/
ApplicationContext context = new AnnotationConfigApplicationContext(SpringConfig.class);
RpcProxyClient rpcProxyClient = context.getBean(RpcProxyClient.class);
}
}
我们可以拓展一下给我们的Rpc服务增加一个版本号。
@Target(ElementType.TYPE) //类/接口
@Retention(RetentionPolicy.RUNTIME)
@Component //被spring进行扫描?
public @interface RpcService {
Class<?> value(); //拿到服务的接口
/**
* 版本号
*/
String version() default "";
}
我们在加注解的时候就除了有类信息另外在加上版本号信息。
@RpcService(value = IHelloService.class,version = "v1.0")
public class HelloServiceImpl implements IHelloService{
@Override
public String sayHello(String content) {
System.out.println("【V1.0】request in sayHello:"+content);
return "【V1.0】Say Hello:"+content;
}
}
同时我们的handle在做判断时加上一个一个版本号的校验。
public class ProcessorHandler implements Runnable {
private Socket socket;
private Map<String, Object> handlerMap;
public ProcessorHandler(Socket socket, Map<String, Object> handlerMap) {
this.socket = socket;
this.handlerMap = handlerMap;
}
@Override
public void run() {
try (ObjectInputStream objectInputStream = new ObjectInputStream(socket.getInputStream());
ObjectOutputStream objectOutputStream = new ObjectOutputStream(socket.getOutputStream())) {
//输入流中应该有什么东西?
//请求哪个类,方法名称、参数
RpcRequest rpcRequest = (RpcRequest) objectInputStream.readObject();
Object result = invoke(rpcRequest); //反射调用本地服务
objectOutputStream.writeObject(result);
objectOutputStream.flush();
} catch (IOException |
ClassNotFoundException |
NoSuchMethodException |
IllegalAccessException |
InvocationTargetException e) {
e.printStackTrace();
}
}
private Object invoke(RpcRequest request) throws ClassNotFoundException, NoSuchMethodException, InvocationTargetException, IllegalAccessException {
//反射调用
String serviceName = request.getClassName();
String version = request.getVersion();
//增加版本号的判断
if (!StringUtils.isEmpty(version)) {
serviceName += "-" + version;
}
Object service = handlerMap.get(serviceName);
if (service == null) {
throw new RuntimeException("service not found:" + serviceName);
}
Object[] args = request.getParameters(); //拿到客户端请求的参数
Method method = null;
if (args != null) {
Class<?>[] types = new Class[args.length]; //获得每个参数的类型
for (int i = 0; i < args.length; i++) {
types[i] = args[i].getClass();
}
Class clazz = Class.forName(request.getClassName()); //跟去请求的类进行加载
method = clazz.getMethod(request.getMethodName(), types); //sayHello, saveUser找到这个类中的方法
} else {
Class clazz = Class.forName(request.getClassName()); //跟去请求的类进行加载
method = clazz.getMethod(request.getMethodName());
}
return method.invoke(service, args);
}
}
下篇我们将用使用zookeeper作为注册中心来改造这个RPC。
网友评论