1. 什么是rpc?
rpc,即 Remote Procedure Call,中文:远程过程调用。简单点说,就是跨进程、跨机器、基于网络来实现的方法调用,下面以代码来解释:
import org.junit.Test;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.Socket;
import java.nio.charset.StandardCharsets;
/**
* @Author juwm
* rpc调用方,可采用任意支持socket编程的语言实现
*/
public class RpcClient {
@Test
public void client() {
try (Socket socket = new Socket("127.0.0.1", 6639);
OutputStream out = socket.getOutputStream();
InputStream in = socket.getInputStream()) {
// 目标对象 、目标方法
String protocol = "{\"method\":\"targetMethod\",\"classFullName\":\"a.b.TargetObject\"}]";
out.write(protocol.getBytes(StandardCharsets.UTF_8));
int len = 100;
byte[] c = new byte[len];
StringBuffer buffer = new StringBuffer(100);
for (; ; ) {
int read = in.read(c);
buffer.append(new String(c));
if (read != len) // 如果数据长度是len的倍数,这里不会跳出循环
break;
}
String result = buffer.substring(0, buffer.lastIndexOf("]"));
System.out.println(result);
} catch (Exception e) {
e.printStackTrace();
}
}
}
import com.alibaba.fastjson.JSON;
import org.junit.Test;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.lang.reflect.Method;
import java.net.ServerSocket;
import java.net.Socket;
import java.nio.charset.StandardCharsets;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
/**
* @Author juwm
* rpc被调方,可采用任意支持socket编程的语言实现
*/
public class RpcServer {
private static Queue<Socket> socketList = new ArrayBlockingQueue(10);
private static ThreadPoolExecutor executor =
new ThreadPoolExecutor(5,
5,
1000,
TimeUnit.SECONDS,
new ArrayBlockingQueue<>(10),
Executors.defaultThreadFactory(),
new ThreadPoolExecutor.DiscardPolicy());
@Test
public void server() throws IOException {
// new a ServerSocket
ServerSocket server = new ServerSocket(6639);
while (true) {
// listen connect
Socket socket = server.accept();
// connect line up
socketList.offer(socket);
// create a task to Thread Pool
dealWithConnect();
}
}
public static void dealWithConnect() {
executor.execute(() -> {
try (Socket socket = socketList.poll();
InputStream in = socket.getInputStream();
OutputStream out = socket.getOutputStream()) {
int len = 10;
byte[] c = new byte[len];
StringBuffer buffer = new StringBuffer(100);
for (; ; ) {
/*
* 当read()执行时如果没有数据让它读,它就会阻塞直到有数据到来,
* 这是Bio的特点。read()阻塞后它后边的代码在数据到来前都将不得执行。
* 所以如果有代码需要再read()阻塞前执行,就需要知道客户端发送的数据的长度。
* 计算出当次读取的最后一次read(),执行完最后一次read()后先执行想要执行的代码,
* 然后再read(),这样就可以在read()阻塞前把想执行的代码执行。
*/
int read = in.read(c);
buffer.append(new String(c));
if(read != len) {
// 这里判断是否为最后一次读取的方案是:
// 读取出来的数据长度是否等于数组的长度。
// 如果读出来的数据不能填满数组,
// 说明读到头了,下一次读取read()将阻塞,
// 如果有代码需要在阻塞前执行,则此时执行,
// 这种设计的bug是如果数据的长度正好是10
// 或者10的倍数则不行
break;
}
}
String context = buffer.substring(0, buffer.lastIndexOf("]"));
Map map = JSON.parseObject(context, Map.class);
// client想调用的目标对象(全类名)
Class<?> objClass = Class.forName((String) map.get("classFullName"));
// client想调用的目标方法
Method me = objClass.getMethod((String) map.get("method"), null);
// 利用反射调用目标对象的方法,从而实现跨进程的方法调用:Remote Procedure Call
String result = me.invoke(objClass.newInstance(), null) + "]";
// 返回结果给client
out.write(result.getBytes(StandardCharsets.UTF_8));
} catch (Exception e) {
e.printStackTrace();
}
});
}
}
/**
* @Author juwm
* 目标对象
*/
public class TargetObject {
// 目标方法
public String targetMethod() {
return "hello Remote Procedure Call!";
}
}
image.png
image.png
网友评论