概述
架构的演进过程

单一应用架构-->垂直应用架构-->分布式架构-->流动计算架构
Rpc服务治理框架
主要有dubbo和spring Cloud
代码设计
功能描述
调用远程方法向调用本地方法一样方便
主要模块
api:对外开放的接口
provider:服务的具体实现
protocol:自定义协议
registry:有效的服务
monitor:监控
cosumer:客户端调用
jar
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
<version>4.0.24.Final</version>
</dependency>
代码
api
public interface IOperationService {
/**
* 加
*/
int add(int a, int b);
/**
* 减
*/
int minus(int a, int b);
/**
* 乘
*/
int mul(int a, int b);
/**
* 除
*/
int div(int a, int b);
}
public interface IStudentSevice {
/**
* 获取学生名字
*/
String getStudentName(int id);
}
provider
public class OperationService implements IOperationService {
@Override
public int add(int a, int b) {
return a + b;
}
@Override
public int minus(int a, int b) {
return a - b;
}
@Override
public int mul(int a, int b) {
return a * b;
}
@Override
public int div(int a, int b) {
return a / b;
}
}
public class StudentSevice implements IStudentSevice {
@Override
public String getStudentName(int id) {
return "Edward" + id;
}
}
protocol
public class RpcInvocationProtocol implements Serializable {
private String className;
private String methodName;
private Class[] types;
private Object[] values;
}
registry
public class RpcRegistry {
private int port;
public RpcRegistry(int port) {
this.port = port;
}
/**
* 启动
*/
public void start(){
EventLoopGroup mainGroup = new NioEventLoopGroup();
EventLoopGroup workGroup = new NioEventLoopGroup();
try{
ServerBootstrap server = new ServerBootstrap()
.group(mainGroup, workGroup)
.channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<NioSocketChannel>(){
@Override
protected void initChannel(NioSocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
pipeline.addLast(new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0,
4, 0, 4))
.addLast(new LengthFieldPrepender(4))
.addLast("encoder",new ObjectEncoder())
.addLast("decoder",new ObjectDecoder(Integer.MAX_VALUE, ClassResolvers.cacheDisabled(null)))
.addLast(new RpcRegistryHandler());
}
})
.option(ChannelOption.SO_BACKLOG,128)//128等到队列
.childOption(ChannelOption.SO_KEEPALIVE, true);//长连接
ChannelFuture future = server.bind(this.port).sync();//异步io操作
System.out.println("NIO Server Channel,绑定的端口:" + this.port);
future.channel().closeFuture().sync();
}catch (Exception e){
mainGroup.shutdownGracefully();
workGroup.shutdownGracefully();
}
}
public static void main(String[] args) {
new RpcRegistry(8080).start();
}
}
/**
* 服务处理
*/
public class RpcRegistryHandler extends ChannelInboundHandlerAdapter {
//类名
private List<String> classNames = new ArrayList<String>();
//类名-->对象
private Map<String, Object> nameInstanceMap = new ConcurrentHashMap<>();
public RpcRegistryHandler() {
//扫描文件
scannerFile("netty.rpc.provider");
//注册
doRegister();
}
/**
* 扫描文件
*/
private void scannerFile(String packageName){
if(packageName.isEmpty()){
return;
}
URL fileUrl = this.getClass().getClassLoader().getResource(packageName.replaceAll("\\.", "/"));
for (File childFile : new File(fileUrl.getFile()).listFiles()) {
if(childFile.isFile()){
classNames.add(packageName + "." + childFile.getName().replace(".class",""));
}else{
scannerFile(packageName + "." + childFile.getName());
}
}
}
/**
* 注册
*/
private void doRegister(){
if(classNames.isEmpty()){
return;
}
try{
for (String className : classNames) {
Class<?> clazz = Class.forName(className);
String name = clazz.getInterfaces()[0].getName();
nameInstanceMap.put(name, clazz.newInstance());
}
}catch (Exception e){
e.printStackTrace();
}
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
RpcInvocationProtocol protocol = (RpcInvocationProtocol) msg;
Object result = null;
if(nameInstanceMap.containsKey(protocol.getClassName())){
Object service = nameInstanceMap.get(protocol.getClassName());
Method method = service.getClass().getMethod(protocol.getMethodName(), protocol.getTypes());
result = method.invoke(service, protocol.getValues());
}
ctx.writeAndFlush(result);
ctx.close();
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
cause.printStackTrace();
System.out.println("异常错误");
}
}
cosumer
/**
* 消费者代理类
*/
public class RpcConsumerProxy implements InvocationHandler {
private int port;
private Class clazz;
public RpcConsumerProxy(int port) {
this.port = port;
}
public <T> T createInstance(Class<T> clazz){
this.clazz = clazz;
return (T) Proxy.newProxyInstance(clazz.getClassLoader(),new Class[]{clazz}, this);
}
@Override
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
//组装对象
RpcInvocationProtocol protocol = new RpcInvocationProtocol();
protocol.setClassName(clazz.getName());
protocol.setMethodName(method.getName());
protocol.setTypes(method.getParameterTypes());
protocol.setValues(args);
final RpcConsumerHandler consumerHandler = new RpcConsumerHandler();
//远程传输
EventLoopGroup workGroup = new NioEventLoopGroup();
try {
Bootstrap server = new Bootstrap()
.group(workGroup)
.channel(NioSocketChannel.class)
.option(ChannelOption.TCP_NODELAY, true)
.handler(new ChannelInitializer<NioSocketChannel>() {
@Override
protected void initChannel(NioSocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
pipeline.addLast(new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0,
4, 0, 4))
.addLast(new LengthFieldPrepender(4))
.addLast("encoder", new ObjectEncoder())
.addLast("decoder", new ObjectDecoder(Integer.MAX_VALUE, ClassResolvers.cacheDisabled(null)))
.addLast(consumerHandler);
}
})
.option(ChannelOption.SO_KEEPALIVE, true);//长连接
ChannelFuture future = server.connect("localhost", 8080).sync();
future.channel().writeAndFlush(protocol).sync();
future.channel().closeFuture().sync();
}catch (Exception e){
e.printStackTrace();
}finally {
workGroup.shutdownGracefully();
}
return consumerHandler.getContent();
}
}
/**
* 消费信息
*/
public class RpcConsumerHandler extends ChannelInboundHandlerAdapter {
private Object content;
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
this.content = msg;
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
cause.printStackTrace();
System.out.println("客户端异常");
}
public Object getContent() {
return content;
}
}
/**
* 消费者
*/
public class RpcConsumer {
public static void main(String[] args) {
RpcConsumerProxy proxy = new RpcConsumerProxy(8080);
IStudentSevice studentSevice = proxy.createInstance(IStudentSevice.class);
System.out.println("获取学生名字: " + studentSevice.getStudentName(1));
IOperationService operationService = proxy.createInstance(IOperationService.class);
System.out.println("a + b = " + operationService.add(8,2));
System.out.println("a - b = " + operationService.minus(8,2));
System.out.println("a * b = " + operationService.mul(8,2));
System.out.println("a / b = " + operationService.div(8,2));
}
}
运行结果
服务端

客户端

网友评论