在RPC框架中,服务暴露的api接口,并不会包含真实的业务逻辑,业务逻辑都在服务提供方应用里,但我们通过调用接口方法,确实拿到了想要的结果,是不是感觉有点神奇呢?
这里面用到的核心技术就是前面说的动态代理。RPC 会自动给接口生成一个代理类,当我们在项目中注入接口的时候,运行过程中实际绑定的是这个接口生成的代理类。这样在接口方法被调用的时候,它实际上是被生成代理类拦截到了,这样我们就可以在生成的代理类里面,加入远程调用逻辑。

既然动态代理是一种具体的技术框架,那就会涉及到选型。我们可以从这样三个角度去考虑:
因为代理类是在运行中生成的,那么代理框架生成代理类的速度、生成代理类的字节码大小等等,都会影响到其性能——生成的字节码越小,运行所占资源就越小。
还有就是我们生成的代理类,是用于接口方法请求拦截的,所以每次调用接口方法的时候,都会执行生成的代理类,这时生成的代理类的执行效率就需要很高效。
最后一个是从我们的使用角度出发的,我们肯定希望选择一个使用起来很方便的代理类框架,比如我们可以考虑:API 设计是否好理解、社区活跃度、还有就是依赖复杂度等等。
我们可以用Socket简单模拟连接请求
服务端:ServerSocket server = new ServerSocket(port)创建连接端口,server.accept();阻塞等待请求new ObjectInputStream(socket.getInputStream());new ObjectOutputStream(socket.getOutputStream())建立对象通讯;然后将ObjectInputStream::readObject读取到的数据通过反射到真正的方法进行执行,得到结构后ObjectOutputStream::writeObject输出回客户端
public void export(Object service, int port) throws Exception {
if (service == null) {
throw new IllegalArgumentException("service instance == null");
}
if (port <= 0 || port > 65535) {
throw new IllegalArgumentException("Invalid port " + port);
}
ServerSocket server = new ServerSocket(port);
while (true) {
final Socket socket = server.accept();
new Thread(() -> {
try {
ObjectInputStream input = new ObjectInputStream(socket.getInputStream());
ObjectOutputStream output = new ObjectOutputStream(socket.getOutputStream());
try {
String methodName = input.readUTF();
Class<?>[] parameterTypes = (Class<?>[]) input.readObject();
Object[] arguments = (Object[]) input.readObject();
Method method = service.getClass().getMethod(methodName, parameterTypes);
Object result = method.invoke(service, arguments);
output.writeObject(result);
} catch (Throwable t) {
output.writeObject(t);
} finally {
output.close();
input.close();
socket.close();
}
} catch (Exception e) {
e.printStackTrace();
}
}).start();
}
}
客户端:返回一个代理类实例,利用回调的方法,让方法执行的时候走通讯逻辑,同理有着socket ,input,output。
public <T> T refer(final Class<T> interfaceClass, final String host, final int port) throws Exception {
if (interfaceClass == null)
throw new IllegalArgumentException("Interface class == null");
if (! interfaceClass.isInterface())
throw new IllegalArgumentException("The " + interfaceClass.getName() + " must be interface class!");
if (host == null || host.length() == 0)
throw new IllegalArgumentException("Host == null!");
if (port <= 0 || port > 65535)
throw new IllegalArgumentException("Invalid port " + port);
return (T) Proxy.newProxyInstance(interfaceClass.getClassLoader(),
new Class<?>[] {interfaceClass},
(proxy, method, arguments) -> {
Socket socket = new Socket(host, port);
try {
ObjectOutputStream output = new ObjectOutputStream(socket.getOutputStream());
try {
output.writeUTF(method.getName());
output.writeObject(method.getParameterTypes());
output.writeObject(arguments);
ObjectInputStream input = new ObjectInputStream(socket.getInputStream());
try {
Object result = input.readObject();
if (result instanceof Throwable) {
throw (Throwable) result;
}
return result;
} finally {
input.close();
}
} finally {
output.close();
}
} finally {
socket.close();
}
});
}
Proxy.newProxyInstance不理解的话可以看看文章:对Proxy.newProxyInstance的一些理解
回调函数简单理解就是返回一个对象,在接下来的逻辑中才真正执行对象逻辑;在上面的例子就是丰富原先方法的功能(加上连接处理与数据返回)。
客户端和服务端的分别调用
public static void main(String[] args) throws Exception {
RpcClient client = new RpcClient();
CalculatorService service = client.refer(CalculatorService.class, "127.0.0.1", 1234);
int result = service.add(2, 4);
System.out.println("result:" + result);
}
public static void main(String[] args) throws Exception {
CalculatorService service = new CalculatorServiceImpl();
RpcServer server = new RpcServer();
server.export(service, 1234);
}
之前使用过回调函数用户数据修复,由于生产环境和测试环境数据不同,修复逻辑也不同。为了复用代码和增强可读性,也用了回调函数。简单记录下用法。
创建函数式接口(用于回调)
@FunctionalInterface
interface CollectData{
void collectUserData(A xxx, B xxx, C xxx);
}
@Override
public void run(ApplicationArguments args) throws Exception {
if(redisTemplate.hasKey(key)) return;
ExecutorService executorService = Executors.newSingleThreadExecutor();
executorService.execute(this::repairProMediaData);
}
编写测试和生产环境各种的修复数据准备
public void repairProMediaData(){
CollectData collectData = (xxx, xxx, xxx) -> {
//生产环境逻辑 xxx
};
this.repairMediaData(collectData, false);
}
public void repairFatMediaData(){
CollectData collectData = (xxx, xxx, xxx) -> {
//测试环境逻辑 xxx
this.repairMediaData(collectData, true);
}
数据修复流程,并保留方法方便写单元测试
/**
* @Description: 修复cos, mysql以及cos中数据
* @Param: collectData 对用户当前文件夹中的数据进行收集
* @Param: flag 如果当前文件夹属于用户,是否对文件夹中的数据进行判断;生产环境无需判断(视频功能还没上线,文件夹中的数据不会错乱);
测试环境需要判断
* @Author: xxx
* @Date: 2021/1/26
*/
@VisibleForTesting
protected void repairMediaData(CollectData collectData, Boolean flag){
log.info("数据修复开始");
//xxx
this.deleteErrorFolder(mediaStroe, userData, collectData, flag);
//xxx
log.info("数据修复结束");
redisTemplate.opsForValue().set(key, 1);
}
修复数据逻辑
/**
* @Description: cos删除多余文件夹并返回当前用户的空间与文件数目
* @Param: [mediaStroe, userId]
* @return: java.lang.Long
* @Author: xxx
* @Date: 2021/1/25
*/
@VisibleForTesting
@Transactional
protected UserData deleteErrorFolder(MediaStroe mediaStroe, UserData userData, CollectData collectData, Boolean flag){
//xxx
collectData.collectUserData(userData, files, allRemoveFiles);
//xxx
}
网友评论