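Only the most basic functionality is implemented.
How RPC works

Implementation steps
1. Local proxy stub
Use cglib to obtain a proxy for the Service
- Obtain the Service on the client side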
OrderService orderService = ServiceFactory.create(OrderService.class, "http://localhost:8080/");
Order order = orderService.findByOrderNo("110");
log.info("rpc result:{}", order);
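- How the Service proxy is created
import com.alibaba.fastjson.parser.ParserConfig;
import net.sf.cglib.proxy.Enhancer;

public class ServiceFactory {
    static {
        // Whitelist the package so that fastjson autoType may instantiate the returned classes.
        ParserConfig.getGlobalInstance().addAccept("org.example");
    }

    /** Creates a cglib proxy that forwards every call on serviceClass to the given host. */
    @SuppressWarnings("unchecked")
    public static <T> T create(Class<T> serviceClass, String host) {
        ServiceInterceptor interceptor = new ServiceInterceptor(serviceClass, host);
        Enhancer enhancer = new Enhancer();
        enhancer.setSuperclass(serviceClass);
        enhancer.setCallback(interceptor);
        return (T) enhancer.create();
    }
}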
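import com.alibaba.fastjson.JSON;
import net.sf.cglib.proxy.MethodInterceptor;
import net.sf.cglib.proxy.MethodProxy;
import okhttp3.MediaType;
import okhttp3.OkHttpClient;
import okhttp3.Request;
import okhttp3.RequestBody;

import java.lang.reflect.Method;
import java.util.Objects;

public class ServiceInterceptor implements MethodInterceptor {
    public static final MediaType JSON_TYPE = MediaType.get("application/json; charset=utf-8");
    private final Class<?> serviceClass;
    private final String host;

    public ServiceInterceptor(Class<?> serviceClass, String host) {
        this.serviceClass = serviceClass;
        this.host = host;
    }

    @Override
    public Object intercept(Object o, Method method, Object[] objects, MethodProxy methodProxy) throws Throwable {
        // Describe the call: which service, which method, which arguments.
        String serviceClassName = serviceClass.getName();
        String methodName = method.getName();
        RpcRequest rpcRequest = RpcRequest.builder().serviceClass(serviceClassName).methodName(methodName)
                .params(objects).build();
        String reqJson = JSON.toJSONString(rpcRequest);
        // A new client per call keeps the example simple; sharing one instance would reuse connections.
        OkHttpClient client = new OkHttpClient();
        final Request request = new Request.Builder()
                .url(host)
                .post(RequestBody.create(JSON_TYPE, reqJson))
                .build();
        String respJson = Objects.requireNonNull(client.newCall(request).execute().body()).string();
        RpcResponse rpcResponse = JSON.parseObject(respJson, RpcResponse.class);
        if (!rpcResponse.isSuccess()) {
            // Rethrow the remote failure on the caller's side.
            throw rpcResponse.getException();
        }
        // The payload carries class information, so fastjson can rebuild the concrete type.
        return JSON.parse(rpcResponse.getData().toString());
    }
}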
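For comparison, the same client-side stub idea can be sketched with the JDK's built-in java.lang.reflect.Proxy instead of cglib. This is an illustration under assumptions, not the code used in this post: the Transport interface is a hypothetical stand-in for the HTTP call, and the OrderService shape here is simplified to return a String.

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Proxy;
import java.util.Arrays;

public class JdkProxySketch {

    /** Hypothetical, simplified stand-in for the real service interface. */
    public interface OrderService {
        String findByOrderNo(String orderNo);
    }

    /** Hypothetical transport; in the post this role is played by the HTTP POST. */
    public interface Transport {
        Object send(String serviceClass, String methodName, Object[] params) throws Exception;
    }

    /** Creates a stub that forwards every interface method call to the transport. */
    @SuppressWarnings("unchecked")
    public static <T> T create(Class<T> serviceInterface, Transport transport) {
        InvocationHandler handler = (proxy, method, args) -> {
            // toString, hashCode and equals are answered locally instead of going remote.
            if (method.getDeclaringClass() == Object.class) {
                switch (method.getName()) {
                    case "toString": return "RpcStub(" + serviceInterface.getName() + ")";
                    case "hashCode": return System.identityHashCode(proxy);
                    default:         return proxy == args[0]; // equals
                }
            }
            return transport.send(serviceInterface.getName(), method.getName(), args);
        };
        return (T) Proxy.newProxyInstance(
                serviceInterface.getClassLoader(), new Class<?>[] {serviceInterface}, handler);
    }

    public static void main(String[] args) {
        // A fake transport that just echoes what it was asked to send.
        Transport echo = (service, method, params) -> method + Arrays.toString(params);
        OrderService stub = create(OrderService.class, echo);
        System.out.println(stub.findByOrderNo("110"));
    }
}
```

The JDK proxy only works for interfaces, which is enough for a service contract like this one; cglib is needed only when the stub must extend a concrete class.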
2. Local serialization and deserialization
JSON is used as the serialization format
- fastjson does the conversion
RpcRequest rpcRequest = RpcRequest.builder().serviceClass(serviceClassName).methodName(methodName)
        .params(objects).build();
String reqJson = JSON.toJSONString(rpcRequest);
...
RpcResponse rpcResponse = JSON.parseObject(respJson, RpcResponse.class);
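The RpcRequest and RpcResponse classes themselves are not shown in this post. Judging only from how they are used (serviceClass, methodName, params on the request; success, data, exception on the response), they might look roughly like the following sketch. The field types, the wrapper class RpcMessages, and the ok/fail helpers are assumptions; the original evidently uses a Lombok-style builder instead.

```java
public class RpcMessages {

    /** Request envelope: which service, which method, which arguments. */
    public static class RpcRequest {
        private String serviceClass; // fully qualified name of the service interface
        private String methodName;
        private Object[] params;

        public RpcRequest() { }      // no-arg constructor for JSON libraries

        public RpcRequest(String serviceClass, String methodName, Object[] params) {
            this.serviceClass = serviceClass;
            this.methodName = methodName;
            this.params = params;
        }

        public String getServiceClass() { return serviceClass; }
        public void setServiceClass(String serviceClass) { this.serviceClass = serviceClass; }
        public String getMethodName() { return methodName; }
        public void setMethodName(String methodName) { this.methodName = methodName; }
        public Object[] getParams() { return params; }
        public void setParams(Object[] params) { this.params = params; }
    }

    /** Response envelope: either a serialized result or the failure that occurred. */
    public static class RpcResponse {
        private boolean success;
        private Object data;         // assumed type; the post only calls getData().toString()
        private Exception exception; // assumed type

        public static RpcResponse ok(Object data) {
            RpcResponse response = new RpcResponse();
            response.success = true;
            response.data = data;
            return response;
        }

        public static RpcResponse fail(Exception exception) {
            RpcResponse response = new RpcResponse();
            response.success = false;
            response.exception = exception;
            return response;
        }

        public boolean isSuccess() { return success; }
        public void setSuccess(boolean success) { this.success = success; }
        public Object getData() { return data; }
        public void setData(Object data) { this.data = data; }
        public Exception getException() { return exception; }
        public void setException(Exception exception) { this.exception = exception; }
    }
}
```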
3. Network communication
HTTP is used as the transport protocol
- okhttp is the HTTP client
OkHttpClient client = new OkHttpClient();
final Request request = new Request.Builder()
        .url(host)
        .post(RequestBody.create(JSON_TYPE, reqJson))
        .build();
String respJson = Objects.requireNonNull(client.newCall(request).execute().body()).string();
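If adding okhttp as a dependency is not desired, the same POST can be sketched with java.net.http.HttpClient, which ships with the JDK from version 11. This swaps in a different client from the one used in the post; the class name JsonPostSketch, the timeout values, and the status check are assumptions made for illustration.

```java
import java.io.IOException;
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.nio.charset.StandardCharsets;
import java.time.Duration;

public class JsonPostSketch {

    // One shared client, so connections can be reused across calls.
    private static final HttpClient CLIENT = HttpClient.newBuilder()
            .connectTimeout(Duration.ofSeconds(3))
            .build();

    /** Builds a JSON POST request; kept separate so it can be inspected without a server. */
    public static HttpRequest buildRequest(String url, String json) {
        return HttpRequest.newBuilder(URI.create(url))
                .timeout(Duration.ofSeconds(5))
                .header("Content-Type", "application/json; charset=utf-8")
                .POST(HttpRequest.BodyPublishers.ofString(json, StandardCharsets.UTF_8))
                .build();
    }

    /** Sends the request and returns the response body, failing on non-200 statuses. */
    public static String post(String url, String json) throws IOException, InterruptedException {
        HttpResponse<String> response = CLIENT.send(
                buildRequest(url, json),
                HttpResponse.BodyHandlers.ofString(StandardCharsets.UTF_8));
        if (response.statusCode() != 200) {
            throw new IOException("unexpected HTTP status " + response.statusCode());
        }
        return response.body();
    }
}
```

Setting explicit timeouts matters for RPC in particular, because a stub call otherwise blocks the caller for as long as the remote side stays silent.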
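4. Remote serialization and deserialization
JSON is used here as well
- Spring converts the request and response bodies, so the handler works with objects directly
@PostMapping("/")
public RpcResponse invoke(@RequestBody RpcRequest request) {
    return invoker.invoke(request);
}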
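5. Remote service stub
Use Spring's bean factory to look up the Service implementation
- Implement the ApplicationContextAware interface
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.serializer.SerializerFeature;
import lombok.Setter;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import java.lang.reflect.Method;
import java.util.Arrays;

@Setter // Lombok generates setApplicationContext(...), which satisfies ApplicationContextAware
public class ServiceInvoker implements ApplicationContextAware {
    private ApplicationContext applicationContext;

    public RpcResponse invoke(RpcRequest rpcRequest) {
        try {
            Class<?> serviceClass = Class.forName(rpcRequest.getServiceClass());
            // Matches by name only, so overloaded methods are not distinguished.
            Method method = Arrays.stream(serviceClass.getMethods()).filter(m -> m.getName()
                    .equals(rpcRequest.getMethodName())).findFirst().get();
            final Object service = applicationContext.getBean(serviceClass);
            Object data = method.invoke(service, rpcRequest.getParams());
            return RpcResponse.builder().data(JSON.toJSONString(data, SerializerFeature.WriteClassName)).success(true).build();
        } catch (Exception e) {
            // Mark the response as failed so that the caller rethrows the exception.
            return RpcResponse.builder().exception(e).success(false).build();
        }
    }
}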
6. Invoke the actual business service
The invocation goes through reflection
- Use the serviceClass and methodName from the request to obtain the corresponding Class and Method objects via reflection
Class<?> serviceClass = Class.forName(rpcRequest.getServiceClass());
Method method = Arrays.stream(serviceClass.getMethods()).filter(m -> m.getName()
        .equals(rpcRequest.getMethodName())).findFirst().get();
final Object service = applicationContext.getBean(serviceClass);
Object data = method.invoke(service, rpcRequest.getParams());
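The lookup above matches on the method name alone, and Method.invoke wraps any exception thrown by the business method in an InvocationTargetException. A slightly more careful variant, sketched below under assumptions, also compares the argument count and unwraps the original exception. ReflectiveCallSketch and its Calc demo class are hypothetical names, and matching on argument count is still only a partial answer to overloading.

```java
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;

public class ReflectiveCallSketch {

    /** Demo target with an overloaded method and a method that fails. */
    public static class Calc {
        public int add(int a, int b) { return a + b; }
        public int add(int a) { return a + 1; }
        public int fail() { throw new IllegalStateException("boom"); }
    }

    /** Finds a public method by name and argument count; rejects missing or ambiguous matches. */
    public static Method findMethod(Class<?> type, String name, Object[] args)
            throws NoSuchMethodException {
        int argCount = args == null ? 0 : args.length;
        Method match = null;
        for (Method m : type.getMethods()) {
            if (m.isBridge() || !m.getName().equals(name) || m.getParameterCount() != argCount) {
                continue;
            }
            if (match != null) {
                throw new NoSuchMethodException("ambiguous: " + name + "/" + argCount);
            }
            match = m;
        }
        if (match == null) {
            throw new NoSuchMethodException(name + "/" + argCount);
        }
        return match;
    }

    /** Invokes the method and rethrows the business exception instead of the reflection wrapper. */
    public static Object call(Object target, String name, Object... args) throws Exception {
        Method method = findMethod(target.getClass(), name, args);
        try {
            return method.invoke(target, args);
        } catch (InvocationTargetException e) {
            Throwable cause = e.getCause();
            if (cause instanceof Exception) {
                throw (Exception) cause;
            }
            throw e;
        }
    }

    public static void main(String[] args) throws Exception {
        Calc calc = new Calc();
        System.out.println(call(calc, "add", 1, 2)); // resolves the two-argument overload
        System.out.println(call(calc, "add", 5));    // resolves the one-argument overload
    }
}
```

Unwrapping matters for the caller: without it, the client would rethrow an InvocationTargetException rather than the exception the service actually raised.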
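7. Return the result along the same path
- Important: the serialized result must include class information so that the caller can deserialize it into the correct type
RpcResponse.builder().data(JSON.toJSONString(data, SerializerFeature.WriteClassName)).success(true).build();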
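An alternative to embedding class names in the payload, not used in this post, is for the caller to take the deserialization target from the declared return type of the method being proxied. The sketch below only shows how to obtain that type with the JDK; a JSON library would then be given the resulting Type as its target. ReturnTypeSketch and the simplified OrderService interface are hypothetical.

```java
import java.lang.reflect.Method;
import java.lang.reflect.Type;
import java.util.List;

public class ReturnTypeSketch {

    /** Hypothetical, simplified service interface. */
    public interface OrderService {
        String findByOrderNo(String orderNo);
        List<String> listOrderNos();
    }

    /** Returns the declared (generic) return type of the first public method with this name. */
    public static Type declaredReturnType(Class<?> serviceInterface, String methodName) {
        for (Method m : serviceInterface.getMethods()) {
            if (m.getName().equals(methodName)) {
                // Keeps generic information such as List<String>, unlike getReturnType().
                return m.getGenericReturnType();
            }
        }
        throw new IllegalArgumentException("no such method: " + methodName);
    }

    public static void main(String[] args) {
        System.out.println(declaredReturnType(OrderService.class, "findByOrderNo").getTypeName());
        System.out.println(declaredReturnType(OrderService.class, "listOrderNos").getTypeName());
    }
}
```

The trade-off is that a declared type cannot express which subclass the service actually returned, which is exactly the case that writing the class name into the payload covers.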
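8. Return to the local caller
The interceptor from step 1 deserializes the response and hands it back as the return value of the proxied method call.
Reflections
The steps above implement only the basics of RPC. On top of them one could add extensions such as load balancing, routing, and fault tolerance. The basics also leave plenty of room for improvement, for example service registration and discovery, and more efficient network communication.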
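As a small taste of one such extension, client-side load balancing can start as simply as rotating over a fixed list of hosts. The sketch below is a hypothetical addition, not part of the implementation described here; RoundRobinHosts and the idea of asking it for a host on every call are assumptions.

```java
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

public class RoundRobinHosts {
    private final List<String> hosts;
    private final AtomicInteger next = new AtomicInteger();

    public RoundRobinHosts(List<String> hosts) {
        if (hosts.isEmpty()) {
            throw new IllegalArgumentException("at least one host is required");
        }
        this.hosts = List.copyOf(hosts); // defensive, immutable copy
    }

    /** Returns the next host in rotation; safe to call from several threads. */
    public String pick() {
        // floorMod keeps the index valid even after the counter overflows to negative values.
        int index = Math.floorMod(next.getAndIncrement(), hosts.size());
        return hosts.get(index);
    }

    public static void main(String[] args) {
        RoundRobinHosts balancer = new RoundRobinHosts(
                List.of("http://localhost:8080/", "http://localhost:8081/"));
        for (int i = 0; i < 3; i++) {
            System.out.println(balancer.pick());
        }
    }
}
```

An interceptor could call pick() for each request instead of holding a single host string; fault tolerance would then add retrying on the next host when a call fails.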