NettyRPCServer
package com.ctgu.netty.rpc.serverStub;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.serialization.ClassResolvers;
import io.netty.handler.codec.serialization.ObjectDecoder;
import io.netty.handler.codec.serialization.ObjectEncoder;
/**
 * Minimal Netty-based RPC server.
 *
 * <p>Accepts TCP connections on a fixed port, decodes every inbound message with Java object
 * serialization and passes it to an {@link InvokeHandler}.
 */
public class NettyRPCServer {

    /** TCP port the server binds to. */
    private final int port;

    /**
     * Creates a server for the given port. Nothing is bound until {@link #start()} is called.
     *
     * @param port TCP port to listen on
     */
    public NettyRPCServer(int port) {
        this.port = port;
    }

    /**
     * Binds the configured port and blocks the calling thread until the server channel closes.
     *
     * <p>Failures are printed to standard error instead of being rethrown. If the calling thread
     * is interrupted while waiting, its interrupt flag is restored before this method returns.
     * Both event loop groups are always asked to shut down on the way out.
     */
    public void start() {
        EventLoopGroup bossGroup = new NioEventLoopGroup();
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
            ServerBootstrap serverBootstrap = new ServerBootstrap();
            serverBootstrap.group(bossGroup, workerGroup)
                    .channel(NioServerSocketChannel.class)
                    .option(ChannelOption.SO_BACKLOG, 128)
                    .childOption(ChannelOption.SO_KEEPALIVE, true)
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel socketChannel) throws Exception {
                            ChannelPipeline pipeline = socketChannel.pipeline();
                            // Encoder: serializes outbound results.
                            pipeline.addLast("encoder", new ObjectEncoder());
                            // Decoder: deserializes inbound requests.
                            // NOTE(review): this is native Java deserialization with an effectively
                            // unbounded object size (Integer.MAX_VALUE); expose the port to trusted
                            // peers only, or lower the limit.
                            pipeline.addLast("decoder", new ObjectDecoder(Integer.MAX_VALUE,
                                    ClassResolvers.cacheDisabled(null)));
                            pipeline.addLast(new InvokeHandler());
                        }
                    });
            // bind(port) supplies the local address, so no separate localAddress(port) is needed.
            ChannelFuture channelFuture = serverBootstrap.bind(port).sync();
            System.out.println("-------server is ready------");
            // Park here until the listening channel is closed.
            channelFuture.channel().closeFuture().sync();
        } catch (InterruptedException e) {
            // Keep the interruption visible to whoever owns this thread.
            Thread.currentThread().interrupt();
            e.printStackTrace();
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            workerGroup.shutdownGracefully();
            bossGroup.shutdownGracefully();
        }
    }

    /**
     * Starts a server on port 9999.
     *
     * @param args ignored
     */
    public static void main(String[] args) {
        NettyRPCServer nettyRPCServer = new NettyRPCServer(9999);
        nettyRPCServer.start();
    }
}
InvokeHandler
package com.ctgu.netty.rpc.serverStub;
import com.ctgu.netty.rpc.entity.ClassInfo;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import org.reflections.Reflections;
import java.lang.reflect.Method;
import java.util.Set;
/**
 * Server-side inbound handler that executes the call described by a {@link ClassInfo} and
 * writes the result back on the same channel.
 *
 * <p>NOTE(review): the class, method and arguments all come from the remote peer and are
 * invoked reflectively; only classes under {@link #SERVICE_PACKAGE} are ever loaded, but the
 * endpoint should still be reachable by trusted clients only.
 */
public class InvokeHandler extends ChannelInboundHandlerAdapter {

    /** Package that contains the service interfaces and their implementations. */
    private static final String SERVICE_PACKAGE = "com.ctgu.netty.rpc.server";

    /**
     * Resolves the name of the single implementation class of the requested interface.
     *
     * <p>Only the simple name of {@code classInfo.getClassName()} is used; the interface is
     * always looked up inside {@link #SERVICE_PACKAGE}. A class name without any package
     * prefix is accepted as well.
     *
     * @param classInfo request describing the interface being called
     * @return fully qualified name of the implementation, or {@code null} when there is no
     *         implementation or more than one
     * @throws Exception if the interface cannot be loaded
     */
    private String getImplClassName(ClassInfo classInfo) throws Exception {
        String requestedName = classInfo.getClassName();
        // lastIndexOf yields -1 when there is no dot; +1 then keeps the whole name.
        String simpleName = requestedName.substring(requestedName.lastIndexOf('.') + 1);
        Class<?> serviceInterface = Class.forName(SERVICE_PACKAGE + "." + simpleName);

        // Collect every subtype of the interface found in the service package.
        Reflections reflections = new Reflections(SERVICE_PACKAGE);
        Set<? extends Class<?>> implClassSet = reflections.getSubTypesOf(serviceInterface);
        if (implClassSet.isEmpty()) {
            System.out.println("未找到实现类");
            return null;
        }
        if (implClassSet.size() > 1) {
            System.out.println("找到多个实现类,未明确使用哪一个");
            return null;
        }
        return implClassSet.iterator().next().getName();
    }

    /**
     * Instantiates the implementation class, invokes the requested method reflectively and
     * sends the return value to the peer.
     *
     * <p>When the method returns {@code null} (including {@code void} methods) nothing can be
     * written, so the channel is closed instead; a peer that waits for the connection to close
     * then observes "no response".
     *
     * @param ctx channel context used to reply
     * @param msg decoded request, expected to be a {@link ClassInfo}
     * @throws Exception if the implementation cannot be resolved, instantiated or invoked
     */
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        ClassInfo classInfo = (ClassInfo) msg;
        String implClassName = getImplClassName(classInfo);
        if (implClassName == null) {
            // Fail with a clear message rather than an NPE from Class.forName(null).
            throw new IllegalStateException(
                    "No unique implementation found for " + classInfo.getClassName());
        }
        Object target = Class.forName(implClassName).getDeclaredConstructor().newInstance();
        Method method = target.getClass().getMethod(classInfo.getMethodName(), classInfo.getTypes());
        // Invoke the implementation method through reflection.
        Object result = method.invoke(target, classInfo.getObjects());
        if (result == null) {
            // Netty refuses to write a null message; closing ends the exchange cleanly.
            ctx.close();
            return;
        }
        ctx.writeAndFlush(result);
    }

    /**
     * Reports the failure and closes the connection so the peer is not left waiting for a
     * reply that will never be sent.
     *
     * @param ctx   channel context of the failed connection
     * @param cause error raised while handling a request
     */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        cause.printStackTrace();
        ctx.close();
    }
}
NettyRPCProxy
package com.ctgu.netty.rpc.clientStub;
import com.ctgu.netty.rpc.entity.ClassInfo;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.serialization.ClassResolvers;
import io.netty.handler.codec.serialization.ObjectDecoder;
import io.netty.handler.codec.serialization.ObjectEncoder;
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
/**
 * Client-side factory for dynamic proxies that forward interface calls to a remote RPC server.
 *
 * <p>Every method call on a proxy opens a new connection, sends one {@link ClassInfo} request,
 * waits until the connection is closed and returns whatever {@link ResultHandler} captured.
 */
public class NettyRPCProxy {

    /** Server host used by {@link #create(Class)}. */
    private static final String DEFAULT_HOST = "127.0.0.1";

    /** Server port used by {@link #create(Class)}. */
    private static final int DEFAULT_PORT = 9999;

    /**
     * Creates a proxy for the given interface that talks to the default server
     * ({@code 127.0.0.1:9999}).
     *
     * @param target interface to proxy
     * @return proxy instance implementing {@code target}
     */
    public static Object create(Class<?> target) {
        return create(target, DEFAULT_HOST, DEFAULT_PORT);
    }

    /**
     * Creates a proxy for the given interface that talks to the server at {@code host:port}.
     *
     * <p>Calls made through the proxy return {@code null} when no response was received, which
     * includes connection or transport failures; those are printed to standard error rather
     * than thrown. If the calling thread is interrupted while waiting, its interrupt flag is
     * restored.
     *
     * @param target interface to proxy
     * @param host   server host name or address
     * @param port   server TCP port
     * @return proxy instance implementing {@code target}
     */
    public static Object create(Class<?> target, String host, int port) {
        return Proxy.newProxyInstance(target.getClassLoader(), new Class<?>[]{target}, new InvocationHandler() {
            @Override
            public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
                // Describe the call: interface, method, arguments and parameter types.
                ClassInfo classInfo = new ClassInfo();
                classInfo.setClassName(target.getName());
                classInfo.setMethodName(method.getName());
                classInfo.setObjects(args);
                classInfo.setTypes(method.getParameterTypes());

                // Send the request over a fresh Netty connection.
                EventLoopGroup group = new NioEventLoopGroup();
                ResultHandler resultHandler = new ResultHandler();
                try {
                    Bootstrap bootstrap = new Bootstrap();
                    bootstrap.group(group)
                            .channel(NioSocketChannel.class)
                            .handler(new ChannelInitializer<SocketChannel>() {
                                @Override
                                protected void initChannel(SocketChannel socketChannel) throws Exception {
                                    ChannelPipeline pipeline = socketChannel.pipeline();
                                    // Encoder: serializes the outbound request.
                                    pipeline.addLast("encoder", new ObjectEncoder());
                                    // Decoder: deserializes the inbound reply.
                                    pipeline.addLast("decoder", new ObjectDecoder(Integer.MAX_VALUE,
                                            ClassResolvers.cacheDisabled(null)));
                                    pipeline.addLast(resultHandler);
                                }
                            });
                    ChannelFuture channelFuture = bootstrap.connect(host, port).sync();
                    channelFuture.channel().writeAndFlush(classInfo).sync();
                    // Wait for the connection to close; the result handler closes it after reading.
                    channelFuture.channel().closeFuture().sync();
                } catch (InterruptedException e) {
                    // Keep the interruption visible to the caller's thread.
                    Thread.currentThread().interrupt();
                    e.printStackTrace();
                } catch (Exception e) {
                    e.printStackTrace();
                } finally {
                    group.shutdownGracefully();
                }
                return resultHandler.getResponse();
            }
        });
    }
}
ResultHandler
package com.ctgu.netty.rpc.clientStub;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
/**
 * Client-side inbound handler that stores the first message read from the channel and then
 * closes the connection.
 */
public class ResultHandler extends ChannelInboundHandlerAdapter {

    /** Message received from the remote side; {@code null} until something has been read. */
    private Object response;

    /**
     * Returns the captured message.
     *
     * @return the message stored by {@link #channelRead}, or {@code null} if none was read
     */
    public Object getResponse() {
        return response;
    }

    /**
     * Stores the data returned by the server (the result of the remote call) and closes the
     * channel.
     *
     * @param ctx channel context, closed once the message is stored
     * @param msg decoded reply
     */
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        response = msg;
        ctx.close();
    }

    /**
     * Reports a pipeline error and closes the channel. Without closing, a caller blocked on the
     * channel's close future would keep waiting after a failed read.
     *
     * @param ctx   channel context of the failed connection
     * @param cause error raised in the pipeline
     */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        cause.printStackTrace();
        ctx.close();
    }
}
网友评论