美文网首页
使用netty手写Rpc框架

使用netty手写Rpc框架

作者: 剑道_7ffc | 来源:发表于2020-03-25 12:24 被阅读0次

概述

架构的演进过程

image.png

单一应用架构-->垂直应用架构-->分布式架构-->流动计算架构

Rpc服务治理框架

主要有dubbo和spring Cloud

代码设计

功能描述

调用远程方法向调用本地方法一样方便

主要模块

api:对外开放的接口
provider:服务的具体实现
protocol:自定义协议
registry:有效的服务
monitor:监控
cosumer:客户端调用

jar

        <dependency>
          <groupId>io.netty</groupId>
          <artifactId>netty-all</artifactId>
          <version>4.0.24.Final</version>
        </dependency>

代码

api

public interface IOperationService {
    /**
     * 加
     */
    int add(int a, int b);

    /**
     * 减
     */
    int minus(int a, int b);


    /**
     * 乘
     */
    int mul(int a, int b);


    /**
     * 除
     */
    int div(int a, int b);
}
public interface IStudentSevice {
    /**
     * 获取学生名字
     */
    String getStudentName(int id);
}

provider

public class OperationService implements IOperationService {
    @Override
    public int add(int a, int b) {
        return a + b;
    }

    @Override
    public int minus(int a, int b) {
        return a - b;
    }

    @Override
    public int mul(int a, int b) {
        return a * b;
    }

    @Override
    public int div(int a, int b) {
        return a / b;
    }
}
public class StudentSevice implements IStudentSevice {
    @Override
    public String getStudentName(int id) {
        return "Edward" + id;
    }
}

protocol

public class RpcInvocationProtocol implements Serializable {
    private String className;
    private String methodName;
    private Class[] types;
    private Object[] values;
}

registry

public class RpcRegistry {
    private int port;
    public RpcRegistry(int port) {
        this.port = port;
    }
    /**
     * 启动
     */
    public void start(){
        EventLoopGroup mainGroup = new NioEventLoopGroup();
        EventLoopGroup workGroup = new NioEventLoopGroup();

        try{
            ServerBootstrap server = new ServerBootstrap()
                    .group(mainGroup, workGroup)
                    .channel(NioServerSocketChannel.class)
                    .childHandler(new ChannelInitializer<NioSocketChannel>(){
                        @Override
                        protected void initChannel(NioSocketChannel ch) throws Exception {
                            ChannelPipeline pipeline = ch.pipeline();
                            pipeline.addLast(new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0,
                                    4, 0, 4))
                                    .addLast(new LengthFieldPrepender(4))
                                    .addLast("encoder",new ObjectEncoder())
                                    .addLast("decoder",new ObjectDecoder(Integer.MAX_VALUE, ClassResolvers.cacheDisabled(null)))
                                    .addLast(new RpcRegistryHandler());
                        }
                    })
                    .option(ChannelOption.SO_BACKLOG,128)//128等到队列
                    .childOption(ChannelOption.SO_KEEPALIVE, true);//长连接
            ChannelFuture future = server.bind(this.port).sync();//异步io操作
            System.out.println("NIO Server Channel,绑定的端口:" + this.port);
            future.channel().closeFuture().sync();
        }catch (Exception e){
            mainGroup.shutdownGracefully();
            workGroup.shutdownGracefully();
        }
    }

    public static void main(String[] args) {
        new RpcRegistry(8080).start();
    }
}
/**
 * 服务处理
 */
public class RpcRegistryHandler extends ChannelInboundHandlerAdapter {

    //类名
    private List<String> classNames = new ArrayList<String>();
    //类名-->对象
    private Map<String, Object> nameInstanceMap = new ConcurrentHashMap<>();

    public RpcRegistryHandler() {
        //扫描文件
        scannerFile("netty.rpc.provider");

        //注册
        doRegister();
    }

    /**
     * 扫描文件
     */
    private void scannerFile(String packageName){
        if(packageName.isEmpty()){
            return;
        }

        URL fileUrl = this.getClass().getClassLoader().getResource(packageName.replaceAll("\\.", "/"));
        for (File childFile : new File(fileUrl.getFile()).listFiles()) {
            if(childFile.isFile()){
                classNames.add(packageName + "." + childFile.getName().replace(".class",""));
            }else{
                scannerFile(packageName + "." + childFile.getName());
            }
        }
    }

    /**
     * 注册
     */
    private void doRegister(){
        if(classNames.isEmpty()){
            return;
        }

        try{
            for (String className : classNames) {
                Class<?> clazz = Class.forName(className);
                String name = clazz.getInterfaces()[0].getName();
                nameInstanceMap.put(name, clazz.newInstance());
            }
        }catch (Exception e){
            e.printStackTrace();
        }
    }


    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        RpcInvocationProtocol protocol = (RpcInvocationProtocol) msg;

        Object result = null;
        if(nameInstanceMap.containsKey(protocol.getClassName())){
            Object service = nameInstanceMap.get(protocol.getClassName());
            Method method = service.getClass().getMethod(protocol.getMethodName(), protocol.getTypes());
            result = method.invoke(service, protocol.getValues());
        }

        ctx.writeAndFlush(result);
        ctx.close();
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();

        System.out.println("异常错误");
    }
}

cosumer

/**
 * 消费者代理类
 */
public class RpcConsumerProxy implements InvocationHandler {
    private int port;
    private Class clazz;

    public RpcConsumerProxy(int port) {
        this.port = port;
    }

    public <T> T createInstance(Class<T> clazz){
        this.clazz = clazz;

        return (T) Proxy.newProxyInstance(clazz.getClassLoader(),new Class[]{clazz}, this);
    }

    @Override
    public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
        //组装对象
        RpcInvocationProtocol protocol = new RpcInvocationProtocol();
        protocol.setClassName(clazz.getName());
        protocol.setMethodName(method.getName());
        protocol.setTypes(method.getParameterTypes());
        protocol.setValues(args);

        final RpcConsumerHandler consumerHandler = new RpcConsumerHandler();
        //远程传输
        EventLoopGroup workGroup = new NioEventLoopGroup();
        try {
            Bootstrap server = new Bootstrap()
                    .group(workGroup)
                    .channel(NioSocketChannel.class)
                    .option(ChannelOption.TCP_NODELAY, true)
                    .handler(new ChannelInitializer<NioSocketChannel>() {
                        @Override
                        protected void initChannel(NioSocketChannel ch) throws Exception {
                            ChannelPipeline pipeline = ch.pipeline();
                            pipeline.addLast(new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0,
                                    4, 0, 4))
                                    .addLast(new LengthFieldPrepender(4))
                                    .addLast("encoder", new ObjectEncoder())
                                    .addLast("decoder", new ObjectDecoder(Integer.MAX_VALUE, ClassResolvers.cacheDisabled(null)))
                                    .addLast(consumerHandler);
                        }
                    })
                    .option(ChannelOption.SO_KEEPALIVE, true);//长连接
            ChannelFuture future = server.connect("localhost", 8080).sync();
            future.channel().writeAndFlush(protocol).sync();
            future.channel().closeFuture().sync();
        }catch (Exception e){
            e.printStackTrace();
        }finally {
            workGroup.shutdownGracefully();
        }
        return consumerHandler.getContent();
    }
}
/**
 * 消费信息
 */
public class RpcConsumerHandler extends ChannelInboundHandlerAdapter {
    private Object content;

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        this.content = msg;
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        System.out.println("客户端异常");
    }

    public Object getContent() {
        return content;
    }
}
/**
 * 消费者
 */
public class RpcConsumer {
    public static void main(String[] args) {
        RpcConsumerProxy proxy = new RpcConsumerProxy(8080);
        IStudentSevice studentSevice = proxy.createInstance(IStudentSevice.class);
        System.out.println("获取学生名字: " + studentSevice.getStudentName(1));

        IOperationService operationService = proxy.createInstance(IOperationService.class);
        System.out.println("a + b = " + operationService.add(8,2));
        System.out.println("a - b = " + operationService.minus(8,2));
        System.out.println("a * b = " + operationService.mul(8,2));
        System.out.println("a / b = " + operationService.div(8,2));
    }
}

运行结果

服务端


image.png

客户端


image.png

相关文章

  • 使用netty手写Rpc框架

    概述 架构的演进过程 单一应用架构-->垂直应用架构-->分布式架构-->流动计算架构 Rpc服务治理框架 主要有...

  • 基于netty手写RPC框架

    代码目录结构 rpc-common存放公共类 rpc-interface为rpc调用方需要调用的接口 rpc-re...

  • 【手撸RPC框架】SpringBoot+Netty4实现RPC框

    【手撸RPC框架】SpringBoot+Netty4实现RPC框架 线程模型 Netty高性能架构设计[https...

  • Spark-通信架构

    Spark 2.x版本使用Netty通讯框架作为内部通讯组间。Spark基于Netty新的RPC框架借鉴了Akka...

  • 分布式事务框架 seata-golang 通信模型详解

    简介:Java 的世界里,大家广泛使用的一个高性能网络通信框架 netty,很多 RPC 框架都是基于 netty...

  • Netty之HelloWorld

    现在Java网络编程框架这块基本已经被Netty垄断了,几乎所有框架底层的RPC通信都是使用Netty来实现,Ne...

  • netty 基本使用- 作为http服务器

    netty 概念 netty 使用场景 1,可以作为rpc的通信的框架远程过程的调用。 2,netty可以作为长连...

  • Netty学习笔记

    公司使用thrift作为RPC框架,其中通信框架使用netty取代thrift自带的通信,所以看了官网文档,然后翻...

  • netty的高性能之道

    背景介绍 Netty 惊人的性能数据 使用Netty(NIO框架)相比于BIO性能提升8倍 传统RPC调用性能差的...

  • 自定义Netty编码器/解码器

    这是《手写RPC框架,我学会了什么?》系列的第06篇 埋坑篇 提到要用netty重写socket通信的部分。net...

网友评论

      本文标题:使用netty手写Rpc框架

      本文链接:https://www.haomeiwen.com/subject/pbhtuhtx.html