by shihang.mai
1. 第一版
完整代码:
public class MyRPCTest {
@Test
public void statServer(){
NioEventLoopGroup boss = new NioEventLoopGroup(1);
NioEventLoopGroup worker = boss;
ServerBootstrap sbs = new ServerBootstrap();
ChannelFuture bind = sbs.group(boss, worker)
.channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<NioSocketChannel>() {
@Override
protected void initChannel(NioSocketChannel ch) throws Exception {
System.out.println("server accept client port:"+ch.remoteAddress().getPort());
ChannelPipeline p = ch.pipeline();
p.addLast(new ServerRequestHandler());
}
}).bind(new InetSocketAddress(9090));
try {
bind.sync().channel().closeFuture().sync();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
//模拟consumer
@Test
public void get(){
new Thread(()->{
statServer();
}).start();
System.out.println("server started...");
AtomicInteger num = new AtomicInteger(0);
int size = 20;
Thread[] threads = new Thread[size];
for (int i = 0; i < size; i++) {
threads[i]= new Thread(()->{
Car car = proxyGet(Car.class);
car.ooxx("hello"+num.incrementAndGet());
});
}
for (Thread thread:threads) {
thread.start();
}
try {
System.in.read();
} catch (IOException e) {
e.printStackTrace();
}
}
public static <T>T proxyGet(Class<T> interfaceInfo){
//实现各个版本的动态代理
ClassLoader loader = interfaceInfo.getClassLoader();
Class<?>[] methodInfo = {interfaceInfo};
return (T)Proxy.newProxyInstance(loader, methodInfo, new InvocationHandler() {
@Override
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
String name = interfaceInfo.getName();
String methodName = method.getName();
Class<?>[] parameterTypes = method.getParameterTypes();
MyContent content = new MyContent();
content.setArgs(args);
content.setName(name);
content.setMethodName(methodName);
content.setParameterTypes(parameterTypes);
ByteArrayOutputStream out = new ByteArrayOutputStream();
ObjectOutputStream oout = new ObjectOutputStream(out);
oout.writeObject(content);
byte[] msgBody = out.toByteArray();
Myheader header=createHeader(msgBody);
out.reset();
oout=new ObjectOutputStream(out);
oout.writeObject(header);
byte[] msgHeader = out.toByteArray();
ClientFactory factory = ClientFactory.getFactory();
NioSocketChannel clientChannel = factory.getClient(new InetSocketAddress(9090));
ByteBuf byteBuf = PooledByteBufAllocator.DEFAULT.directBuffer(msgHeader.length + msgBody.length);
long id = header.getRequestID();
CountDownLatch countDownLatch = new CountDownLatch(1);
ResponseHander.addCallBack(id, new Runnable() {
@Override
public void run() {
countDownLatch.countDown();
}
});
byteBuf.writeBytes(msgHeader);
byteBuf.writeBytes(msgBody);
ChannelFuture channelFuture = clientChannel.writeAndFlush(byteBuf);
channelFuture.sync();
countDownLatch.await();
return null;
}
});
}
public static Myheader createHeader(byte[] msg) {
Myheader header = new Myheader();
int size = msg.length;
int f = 0x14141414;
//0x14 0001 0100
long requestID = Math.abs(UUID.randomUUID().getLeastSignificantBits());
header.setFlag(f);
header.setDataLen(size);
header.setRequestID(requestID);
return header;
}
}
class ServerRequestHandler extends ChannelInboundHandlerAdapter{
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ByteBuf buf = (ByteBuf) msg;
ByteBuf sendBuf = buf.copy();
System.out.println("channel start:"+buf.readableBytes());
if(buf.readableBytes()>=107){
byte[] bytes = new byte[107];
buf.getBytes(buf.readerIndex(),bytes);
ByteArrayInputStream in = new ByteArrayInputStream(bytes);
ObjectInputStream oin = new ObjectInputStream(in);
Myheader header = (Myheader) oin.readObject();
System.out.println("server response id:"+header.getRequestID());
if(buf.readableBytes()>=header.getDataLen()){
byte [] data = new byte[(int)header.getDataLen()];
buf.readBytes(data);
ByteArrayInputStream din = new ByteArrayInputStream(data);
ObjectInputStream doin = new ObjectInputStream(din);
MyContent content= (MyContent)doin.readObject();
System.out.println(content.getName());
}else{
System.out.println("channel else:"+buf.readableBytes());
}
}
ChannelFuture channelFuture = ctx.writeAndFlush(sendBuf);
channelFuture.sync();
}
}
class ResponseHander{
static ConcurrentHashMap<Long,Runnable> mapping = new ConcurrentHashMap<>();
public static void addCallBack(long requestID,Runnable cb){
mapping.putIfAbsent(requestID,cb);
}
public static void runCallBack(long requestID){
Runnable runnable = mapping.get(requestID);
runnable.run();
removeCB(requestID);
}
public static void removeCB(long requestID){
mapping.remove(requestID);
}
}
interface Car{
public void ooxx(String msg);
}
interface Fly{
void xxoo(String msg);
}
class MyContent implements Serializable{
String name;
String methodName;
Class<?>[] parameterTypes ;
Object[] args;
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
public String getMethodName() {
return methodName;
}
public void setMethodName(String methodName) {
this.methodName = methodName;
}
public Class<?>[] getParameterTypes() {
return parameterTypes;
}
public void setParameterTypes(Class<?>[] parameterTypes) {
this.parameterTypes = parameterTypes;
}
public Object[] getArgs() {
return args;
}
public void setArgs(Object[] args) {
this.args = args;
}
}
class Myheader implements Serializable{
//32bit可以设置很多信息
int flag;
long requestID;
long dataLen;
public int getFlag() {
return flag;
}
public void setFlag(int flag) {
this.flag = flag;
}
public long getRequestID() {
return requestID;
}
public void setRequestID(long requestID) {
this.requestID = requestID;
}
public long getDataLen() {
return dataLen;
}
public void setDataLen(long dataLen) {
this.dataLen = dataLen;
}
}
class ClientFactory{
private ClientFactory(){}
int poolSize = 1;
NioEventLoopGroup clientWorker;
Random rand = new Random();
private static final ClientFactory factory;
static{
factory = new ClientFactory();
}
public static ClientFactory getFactory(){
return factory;
}
ConcurrentHashMap<InetSocketAddress,ClientPool> outboxs = new ConcurrentHashMap<>();
public synchronized NioSocketChannel getClient(InetSocketAddress address){
ClientPool clientPool = outboxs.get(address);
if(null==clientPool){
outboxs.putIfAbsent(address,new ClientPool(poolSize));
clientPool=outboxs.get(address);
}
int i = rand.nextInt(poolSize);
if(null!=clientPool.Clients[i]&&clientPool.Clients[i].isActive()){
return clientPool.Clients[I];
}
synchronized (clientPool.lock[I]){
return clientPool.Clients[i] = create(address);
}
}
private NioSocketChannel create(InetSocketAddress address){
clientWorker = new NioEventLoopGroup(1);
Bootstrap bs = new Bootstrap();
ChannelFuture connect = bs.group(clientWorker)
.channel(NioSocketChannel.class)
.handler(new ChannelInitializer<NioSocketChannel>() {
@Override
protected void initChannel(NioSocketChannel ch) throws Exception {
ChannelPipeline p = ch.pipeline();
p.addLast(new ClientResponses());
}
}).connect(address);
try {
NioSocketChannel client =(NioSocketChannel) connect.sync().channel();
return client;
} catch (InterruptedException e) {
e.printStackTrace();
}
return null;
}
}
class ClientResponses extends ChannelInboundHandlerAdapter{
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ByteBuf buf = (ByteBuf) msg;
if(buf.readableBytes()>=107){
byte[] bytes = new byte[107];
buf.readBytes(bytes);
ByteArrayInputStream in = new ByteArrayInputStream(bytes);
ObjectInputStream oin = new ObjectInputStream(in);
Myheader header = (Myheader) oin.readObject();
System.out.println("client response id:"+header.getRequestID());
ResponseHander.runCallBack(header.requestID);
}
}
}
class ClientPool{
NioSocketChannel [] Clients;
Object [] lock;
ClientPool(int size){
Clients = new NioSocketChannel[size];
lock = new Object[size];
for (int i = 0; i < size; i++) {
lock[i] = new Object();
}
}
}

-
主线程调用远程接口,走动态代理,invoke()。
-
在invoke()里,组装ByteBuf(header+content),利用Client(NioSocketChannel)发送组装好ByteBuf到Server端
-
而Client是通过ClientFactory调用getClient(address)从连接池中获得
-
每一个NioSocketChannel注册了事件ClientResponse,这个事件会调用
ResponseHander.runCallBack(header.requestID)
-
在Server端注册ServerRequestHandler,当接收到信息后,往Client返回ByteBuf
-
在主逻辑中,为了发出请求后,主逻辑等待返回结果后,再往下执行。创建CountDownLatch,并加入回调逻辑,主线程await阻塞
CountDownLatch countDownLatch = new CountDownLatch(1); ResponseHander.addCallBack(id, new Runnable() { @Override public void run() { countDownLatch.countDown(); } });
-
当Client发送消息到Server端,Server端收到消息,执行ServerRequestHandler逻辑,往客户端发送消息,客户端收到消息,那么调用countDownLatch.countDown();主线程恢复
第一版bug

- 服务端的一次事件channelRead,可能接收到多个消息,导致一次的ByteBuf有多个header+content组合
- 服务端的一次事件channelRead,接收到的ByteBuf,里面含多个header+content组合,但是很可能最后一个header+content不是完整的组合,content不完整,在下一次的事件channelRead接收到的ByteBuf开头有上一次content的信息
2. 第二版
具体看步骤1 2 3 4
-
bug1
解决方法:循环读取
ServerRequestHandler
//1. if(buf.readableBytes()>=107) //2. buf.readBytes(bytes);
->
//1. while(buf.readableBytes()>=107) //2. buf.getBytes(buf.readerIndex(),bytes); //2.1读取content前 buf.readBytes(107);
-
Bug2
解决方法:
- 将本次不能处理的整个header+content拼接下一次的ByteBuf(ServerDecode extends ByteToMessageDecoder自带)
- 增加解码ServerDecode类extends ByteToMessageDecoder,将ServerRequestHandler逻辑转移到该类,并在服务端的pipeline中增加该类
完整代码:
MyRPCTest
public class MyRPCTest {
@Test
public void statServer(){
NioEventLoopGroup boss = new NioEventLoopGroup(50);
NioEventLoopGroup worker = boss;
ServerBootstrap sbs = new ServerBootstrap();
ChannelFuture bind = sbs.group(boss, worker)
.channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<NioSocketChannel>() {
@Override
protected void initChannel(NioSocketChannel ch) throws Exception {
System.out.println("server accept client port:"+ch.remoteAddress().getPort());
ChannelPipeline p = ch.pipeline();
//4. 增加一个handler
p.addLast(new ServerDecode());
p.addLast(new ServerRequestHandler());
}
}).bind(new InetSocketAddress(9090));
try {
bind.sync().channel().closeFuture().sync();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
//模拟consumer
@Test
public void get(){
new Thread(()->{
statServer();
}).start();
System.out.println("server started...");
AtomicInteger num = new AtomicInteger(0);
int size = 50;
Thread[] threads = new Thread[size];
for (int i = 0; i < size; i++) {
threads[i]= new Thread(()->{
Car car = proxyGet(Car.class);
String arg = "hello" + num.incrementAndGet();
String res = car.ooxx(arg);
System.out.println("client over msg:"+res+" src arg:"+arg);
});
}
for (Thread thread:threads) {
thread.start();
}
try {
System.in.read();
} catch (IOException e) {
e.printStackTrace();
}
}
public static <T>T proxyGet(Class<T> interfaceInfo){
//实现各个版本的动态代理
ClassLoader loader = interfaceInfo.getClassLoader();
Class<?>[] methodInfo = {interfaceInfo};
return (T)Proxy.newProxyInstance(loader, methodInfo, new InvocationHandler() {
@Override
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
String name = interfaceInfo.getName();
String methodName = method.getName();
Class<?>[] parameterTypes = method.getParameterTypes();
MyContent content = new MyContent();
content.setArgs(args);
content.setName(name);
content.setMethodName(methodName);
content.setParameterTypes(parameterTypes);
ByteArrayOutputStream out = new ByteArrayOutputStream();
ObjectOutputStream oout = new ObjectOutputStream(out);
oout.writeObject(content);
byte[] msgBody = out.toByteArray();
Myheader header=createHeader(msgBody);
out.reset();
oout=new ObjectOutputStream(out);
oout.writeObject(header);
byte[] msgHeader = out.toByteArray();
ClientFactory factory = ClientFactory.getFactory();
NioSocketChannel clientChannel = factory.getClient(new InetSocketAddress(9090));
ByteBuf byteBuf = PooledByteBufAllocator.DEFAULT.directBuffer(msgHeader.length + msgBody.length);
long id = header.getRequestID();
CompletableFuture<String> res = new CompletableFuture();
ResponseMappingCallback.addCallBack(id, res);
byteBuf.writeBytes(msgHeader);
byteBuf.writeBytes(msgBody);
ChannelFuture channelFuture = clientChannel.writeAndFlush(byteBuf);
channelFuture.sync();
//阻塞
return res.get();
}
});
}
public static Myheader createHeader(byte[] msg) {
Myheader header = new Myheader();
int size = msg.length;
int f = 0x14141414;
//0x14 0001 0100
long requestID = Math.abs(UUID.randomUUID().getLeastSignificantBits());
header.setFlag(f);
header.setDataLen(size);
header.setRequestID(requestID);
return header;
}
}
//增加ServerDecode
class ServerDecode extends ByteToMessageDecoder{
@Override
protected void decode(ChannelHandlerContext channelHandlerContext, ByteBuf buf, List<Object> out) throws Exception {
//1. 从if->while 因为可能一次事件接收多个包
while(buf.readableBytes()>=108){
byte[] bytes = new byte[108];
//2. 从readBytes->getBytes 保证断开的包,把header+body放到下一次buf处理
buf.getBytes(buf.readerIndex(),bytes);
ByteArrayInputStream in = new ByteArrayInputStream(bytes);
ObjectInputStream oin = new ObjectInputStream(in);
Myheader header = (Myheader) oin.readObject();
if(buf.readableBytes()>=header.getDataLen()){
//3. 移动指针到body开始位置 上面只是get并没移动指针
buf.readBytes(108);
byte [] data = new byte[(int)header.getDataLen()];
buf.readBytes(data);
ByteArrayInputStream din = new ByteArrayInputStream(data);
ObjectInputStream doin = new ObjectInputStream(din);
//A. 因现在服务端和客户端都公用一个解码器
if(header.getFlag()==0x14141414){
MyContent content= (MyContent)doin.readObject();
out.add(new Packmsg(header,content));
}else if(header.getFlag()==0x14141424){
MyContent content= (MyContent)doin.readObject();
out.add(new Packmsg(header,content));
}
}else{
break;
}
}
}
}
class ServerRequestHandler extends ChannelInboundHandlerAdapter{
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
Packmsg requestPkg = (Packmsg) msg;
String ioThreadName = Thread.currentThread().getName();
//1. 当前的eventLoop处理 如果用这种写发io线程和executor的线程是同一个,因为一个连接就一个selector线性处理。
// ctx.executor().execute(new Runnable() {
//2. 分到的eventLoopGroup选择一个处理,那么io线程和executor的线程就是不同的线程
ctx.executor().parent().next().execute(new Runnable() {
@Override
public void run() {
String execThreadName = Thread.currentThread().getName();
MyContent content = new MyContent();
String s = "io threadName:" + ioThreadName + " exec threadName:" + execThreadName + " from args:" + requestPkg.content.getArgs()[0];
//System.out.println(s);
content.setRes(s);
byte[] contentByte = SerDerUtil.ser(content);
Myheader resHeader = new Myheader();
resHeader.setRequestID(requestPkg.header.getRequestID());
resHeader.setFlag(0x14141424);
resHeader.setDataLen(contentByte.length);
byte[] headerByte = SerDerUtil.ser(resHeader);
ByteBuf byteBuf = PooledByteBufAllocator.DEFAULT.directBuffer(headerByte.length + contentByte.length);
byteBuf.writeBytes(headerByte);
byteBuf.writeBytes(contentByte);
ctx.writeAndFlush(byteBuf);
}
});
}
}
class ResponseMappingCallback {
static ConcurrentHashMap<Long,CompletableFuture> mapping = new ConcurrentHashMap<>();
public static void addCallBack(long requestID,CompletableFuture cb){
mapping.putIfAbsent(requestID,cb);
}
public static void runCallBack(Packmsg msg){
CompletableFuture cf = mapping.get(msg.header.getRequestID());
cf.complete(msg.content.getRes());
removeCB(msg.header.getRequestID());
}
public static void removeCB(long requestID){
mapping.remove(requestID);
}
}
interface Car{
public String ooxx(String msg);
}
interface Fly{
void xxoo(String msg);
}
class MyContent implements Serializable{
String name;
String methodName;
Class<?>[] parameterTypes ;
Object[] args;
String res;
public String getRes() {
return res;
}
public void setRes(String res) {
this.res = res;
}
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
public String getMethodName() {
return methodName;
}
public void setMethodName(String methodName) {
this.methodName = methodName;
}
public Class<?>[] getParameterTypes() {
return parameterTypes;
}
public void setParameterTypes(Class<?>[] parameterTypes) {
this.parameterTypes = parameterTypes;
}
public Object[] getArgs() {
return args;
}
public void setArgs(Object[] args) {
this.args = args;
}
}
class Myheader implements Serializable{
//32bit可以设置很多信息
int flag;
long requestID;
long dataLen;
public int getFlag() {
return flag;
}
public void setFlag(int flag) {
this.flag = flag;
}
public long getRequestID() {
return requestID;
}
public void setRequestID(long requestID) {
this.requestID = requestID;
}
public long getDataLen() {
return dataLen;
}
public void setDataLen(long dataLen) {
this.dataLen = dataLen;
}
}
class ClientFactory{
private ClientFactory(){}
int poolSize = 10;
NioEventLoopGroup clientWorker;
Random rand = new Random();
private static final ClientFactory factory;
static{
factory = new ClientFactory();
}
public static ClientFactory getFactory(){
return factory;
}
ConcurrentHashMap<InetSocketAddress,ClientPool> outboxs = new ConcurrentHashMap<>();
public synchronized NioSocketChannel getClient(InetSocketAddress address){
ClientPool clientPool = outboxs.get(address);
if(null==clientPool){
outboxs.putIfAbsent(address,new ClientPool(poolSize));
clientPool=outboxs.get(address);
}
int i = rand.nextInt(poolSize);
if(null!=clientPool.Clients[i]&&clientPool.Clients[i].isActive()){
return clientPool.Clients[I];
}
synchronized (clientPool.lock[I]){
return clientPool.Clients[i] = create(address);
}
}
private NioSocketChannel create(InetSocketAddress address){
clientWorker = new NioEventLoopGroup(1);
Bootstrap bs = new Bootstrap();
ChannelFuture connect = bs.group(clientWorker)
.channel(NioSocketChannel.class)
.handler(new ChannelInitializer<NioSocketChannel>() {
@Override
protected void initChannel(NioSocketChannel ch) throws Exception {
ChannelPipeline p = ch.pipeline();
p.addLast(new ServerDecode());
p.addLast(new ClientResponses());
}
}).connect(address);
try {
NioSocketChannel client =(NioSocketChannel) connect.sync().channel();
return client;
} catch (InterruptedException e) {
e.printStackTrace();
}
return null;
}
}
class ClientResponses extends ChannelInboundHandlerAdapter{
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
Packmsg responsepkg = (Packmsg) msg;
ResponseMappingCallback.runCallBack(responsepkg);
}
}
class ClientPool{
NioSocketChannel [] Clients;
Object [] lock;
ClientPool(int size){
Clients = new NioSocketChannel[size];
lock = new Object[size];
for (int i = 0; i < size; i++) {
lock[i] = new Object();
}
}
}
Packmsg
public class Packmsg {
Myheader header;
MyContent content;
public Myheader getHeader() {
return header;
}
public void setHeader(Myheader header) {
this.header = header;
}
public MyContent getContent() {
return content;
}
public void setContent(MyContent content) {
this.content = content;
}
public Packmsg(Myheader header, MyContent content) {
this.header = header;
this.content = content;
}
}
SerDerUtil
public class SerDerUtil {
static ByteArrayOutputStream out = new ByteArrayOutputStream();
public synchronized static byte[] ser(Object msg){
out.reset();
ObjectOutputStream oout = null;
byte[] msgBody = null;
try {
oout = new ObjectOutputStream(out);
oout.writeObject(msg);
msgBody = out.toByteArray();
} catch (IOException e) {
e.printStackTrace();
}
return msgBody;
}
}

-
主线程调用远程接口,走动态代理,invoke()。
-
在invoke()里,组装ByteBuf(header+content),利用Client(NioSocketChannel)发送组装好ByteBuf到Server端
-
而Client是通过ClientFactory调用getClient(address)从连接池中获得
-
每一个NioSocketChannel注册了事件ServerDecode和ClientResponse,
- 引入ServerDecode是为了解决Bug2,并且把header+content-->Packmsg
- ClientResponse这个事件会调用
Packmsg responsepkg = (Packmsg) msg; ResponseMappingCallback.runCallBack(responsepkg);
-
在Server端注册ServerDecode和ServerRequestHandler,当接收到信息后,往Client返回ByteBuf
-
引入ServerDecode是为了解决Bug2,并且把header+content-->Packmsg
-
ServerRequestHandler(已截取过代码)
这里Server接收到数据后,有4种写法:
-
在当前IO线程直接做业务处理,接收完就走业务处理 --> 处理业务线程 = IO线程
-
在当前IO线程自行抛出线程new Thread()处理业务 --> 处理业务线程 != IO线程
-
在当前IO线程用当前EventLoop执行(因为EventLoop也是一个执行器),和上面不同的是,这里是将接收到的信息变为task,然后task由EventLoop后续执行,单线程异步的-->处理业务线程 = IO线程
class ServerRequestHandler extends ChannelInboundHandlerAdapter{ @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { String ioThreadName = Thread.currentThread().getName(); ctx.executor().execute(new Runnable() { @Override public void run() { String execThreadName = Thread.currentThread().getName(); String s = "io threadName:" + ioThreadName + " exec threadName:" + execThreadName + " from args:" + requestPkg.content.getArgs()[0]; } }); } }
-
-
- 在当前线程调用EventLoopGroup,选择一个去执行,这样充分将业务均匀分布在各个selector去执行-->处理业务线程 != IO线程
```java
class ServerRequestHandler extends ChannelInboundHandlerAdapter{
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
String ioThreadName = Thread.currentThread().getName();
ctx.executor().parent().next().execute(new Runnable() {
@Override
public void run() {
String execThreadName = Thread.currentThread().getName();
String s = "io threadName:" + ioThreadName + " exec threadName:" + execThreadName + " from args:" + requestPkg.content.getArgs()[0];
}
});
}
}
```
-
在主逻辑中,为了发出请求后,主逻辑等待返回结果后,再往下执行。创建CompletableFuture,并加入回调逻辑,主线程get阻塞
CompletableFuture<String> res = new CompletableFuture(); ResponseMappingCallback.addCallBack(id, res);
-
当Client发送消息到Server端,Server端收到消息,执行ServerRequestHandler逻辑,往客户端发送消息,客户端收到消息,那么调用CompletableFuture.complete;主线程get恢复
网友评论