美文网首页Java系统架构师
这样基于Netty重构RPC框架你不可能知道

这样基于Netty重构RPC框架你不可能知道

作者: 猿灯塔 | 来源:发表于2020-04-01 09:02 被阅读0次

    今天呢!灯塔君跟大家讲:     

    基于Netty重构RPC框架

    一.CyclicBarrier方法说明

    1.单一应用架构

    当网站流量很小时,只需一个应用,将所有功能都部署在一起,以减少部署节点和成本。。(最开始58 同城的站点架构用一个词概括就是“ALL IN ONE”。就像一个单机系统,所有的东西都部署在一台机器 上,包括站点、数据库、文件等等。而工程师每天的核心工作就是CURD,前端传过来一些数据,然后 业务逻辑层拼装成一些CURD访问数据库,数据库返回数据,数据拼装成页面,最终返回到浏览器。此 时,用于简化增删改查工作量的数据访问框架(ORM)是关键)

    2、垂直应用架构

    当访问量逐渐增大,单一应用增加机器带来的加速度越来越小,将应用拆成互不相干的几个应用,以提 升效率。应用拆分为不相干的几个应用,前后端分离,此时用于加速前端页面开发的Web MVC框架是 关键

    3、分布式服务服务架构

    当垂直应用越来越多,应用之间交互不可避免,将核心业务抽取出来,作为独立的服务,逐渐形成稳定 的服务中心,使前端应用能更快速的响应多变的市场需求。同时将公共能力API抽取出来,作为独立的 公共服务供其他调用者消费,以实现服务的共享和重用,降低开发和运维成本。应用拆分之后会按照模 块独立部署,接口调用由本地API演进成跨进程的远程方法调用,此时RPC框架应运而生。此时,用于 提高业务复用及整合的分布式服务框架(RPC)是关键。

    4、流动计算架构

    当服务越来越多,容量的评估,小服务资源的浪费等问题逐渐显现,此时需增加一个调度中心基于访问 压力实时管理集群容量,提高集群利用率。此时,用于提高机器利用率的资源调度和治理中心(SOA)是 关键。面向服务的架构(SOA)是一个组件模型,它将应用程序的不同功能单元(称为服务)进行拆 分,并通过这些服务之间定义良好的接口和协议联系起来。

    没有RPC框架之前,我们的服务调用是这样的:

    看出接口的调用完全没有规律可循,想怎么调,就怎么调。这导致业务发展到一定阶段之后,对接口的 维护变得非常困难。于是有人提出了服务治理的概念。所有服务间不允许直接调用,而是先到注册中心 进行登记,再由注册中心统一协调和管理所有服务的状态并对外发布,调用者只需要记住服务名称,去 找注册中心获取服务即可。这样,极大地规范了服务的管理,可以提高了所有服务端可控性。整个设计思想其实在我们生活中也能 找到活生生的案例。例如:我们平时工作交流,大多都是用IM 工具,而不是面对面吼。大家只需要相 互记住运营商(也就是注册中心)提供的号码(如:腾讯QQ)即可。再比如:我们打电话,所有电话 号码有运营商分配。我们需要和某一个人通话时,只需要拨通对方的号码,运营商(注册中心,如中国 移动、中国联通、中国电信)就会帮我们将信号转接过去。

    二.RPC介绍

    1、RPC简介:

    Remote Procedure Call远程过程调用RPC就是从一台机器(客户端)上通过参数传递的        方式调用另一台机器(服务器)上的一个函数或 方法(可以统称为服务)并得到返回的结果。

    RPC 会隐藏底层的通讯细节(不需要直接处理Socket通讯或Http通讯) RPC 是一个请求响应模 型。

    客户端发起请求,服务器返回响应(类似于Http的工作方式) RPC 在使用形式上像调用本地函数 (或方法)一样去调用远程的函数(或方法)。

     2、RPC通信原理

    RPC的主要作用有三方面:

    1、进程间通讯

    2、提供和本地方法调用一样的机制

    3、屏蔽用户对远程调用的细节实现

    RPC框架的好处首先就是长链接,不必每次通信都要像http一样去3次握手,减少了网络开销;其次就 是RPC框架一般都有注册中心,有丰富的监控管理;发布、下线接口、动态扩展等,对调用方来说是无 感知,统一化的操作。

    3、RPC通过过程

    1)服务消费方(client)调用以本地调用方式调用服务; 

    2)client stub接收到调用后负责将方法、参数等组装成能够进行网络传输的消息体; 

    3)client stub找到服务地址,并将消息发送到服务端; 

    4)server stub收到消息后进行解码; 

    5)server stub根据解码结果调用本地的服务;6)本地服务执行并将结果返回给server stub;7)server stub将返回结果打包成消息并发送至消费方; 

    8)client stub接收到消息,并进行解码; 

    9)服务消费方得到最终结果。

    4、常用的分布式RPC框架

    dubbo:国内最早开源的RPC框架,由阿里巴巴公司开发并于2011年末对外开源,仅支持java语言

    4、常用的分布式RPC框架

    dubbo:国内最早开源的RPC框架,由阿里巴巴公司开发并于2011年末对外开源,仅支持java语言

    motan:微博内部使用的RPC框架,于2016年对外开源,仅支持java语言

    Thrift:轻量级的跨语言RPC通信方案,支持多大25种变成语言

    gRPC:Google于2015年对外开源的跨语言PRC框架,支持常用的C++、java、Python、Go、 Ruby、PHP等多种语言。

    目前流行的RPC 服务治理框架主要有Dubbo 和Spring Cloud,

    下面我们以比较经典的Dubbo 为例。

    Dubbo 核心模块主要有:

    Registry:注册中心(主要负责保存所有可用的服务名称和服务地址。) 

    Provider:服务中心(实现对外提供的所有服务的具体功能) 

    Consumer:消费端(调用远程服务的服务消费方) 

    Monitor:监控中心(统计服务的调用次数和调用时间的监控中心) 

    Container:服务运行容器

    api:主要用来定义对外开放的功能与服务接口。protocol:主要定义自定义传输协议的内容

    蓝色方框代表业务有交互,绿色方框代表只对Dubbo内部交互。蓝色虚线为初始化时调用,红色虚线为 运行时异步调用,红色实线为运行时同步调用

    0、服务在容器中启动,加载,运行Provider

    1、Provider在启动时,向Registry注册自己提供的服务 

    2、Consumer在启动时,向Registry订阅自己所需的服务 

    3、Registry给Consumer返回Provider的地址列表,如果Provider地址有变更(上线/下线机器), Registry将基于长连接推动变更数据给Consumer 

    4、Consumer从Provider地址列表中,基于软负载均衡算法,选一台进行调用,如果失败,重试另一 台调用 

    5、Consumer和Provider,在内存中累计调用次数和时间,定时每分钟一次将统计数据发送到Monitor

    4.具体实现

    1、api

    package com.rpc.api; 

    /**

    * API模块,provider和Consumer都遵循API模块的

    规范 * 用来定义对外开放的功能与服务接口 

    */ 

    public interface IRpcHelloService { 

    String hello(String name); 

    }

    package com.rpc.api; 

    public interface IRpcService { 

    /*增加用户*/ 

    public String addUser(); 

    /*删除用户*/ 

    public String deleteUser(Integer id); 

    /*修改用户*/ 

    public String updateUser(Integer id); 

    /*查询用户*/ 

    public String queryUser(Integer id); 

    }

    2、Provider:服务中心

    package com.rpc.provider; 

    import com.rpc.api.IRpcService; 

    public class RpcServiceImpl implements 

    IRpcService { 

    @Override 

    public String addUser() { 

    return "增加用户"; 

    }

    @Override 

    public String deleteUser(Integer id) { 

    return "删除了编号为"+id+"的用户"; 

    }

    @Override 

    public String updateUser(Integer id) { 

    return "修改了编号为"+id+"的用户"; 

    }

    @Override 

    public String queryUser(Integer id) { 

    return "查询到了编号为"+id+"这个用户的信息"; 

    }

    package com.rpc.provider; 

    import com.rpc.api.IRpcHelloService; 

    /**

    * 服务中心,实现对外提供的所有服务的具体功能 

    */ 

    public class RpcHelloServiceImpl implements IRpcHelloService { 

    public String hello(String name) { 

    return "Hello " + name + "!"; 

    }

    3、protocol传输协议内容

    ackage com.rpc.protocol; 

    import java.io.Serializable; 

    /**

    * 自定义传输协议内容 

    */ 

    public class InvokerProtocol implements 

    Serializable { 

    private String className;//类名 

    private String methodName;//函数名称 

    private Class<?>[] parames;//形参列表

    private Object[] values;//实参列表 

    public String getClassName() { 

    return className; 

    }

    public void setClassName(String className)

     { 

    this.className = className; 

    }

    public String getMethodName() { 

    return methodName; 

    }

    public void setMethodName

    (String methodName)

     { 

    this.methodName = methodName; 

    }

    public Class<?>[] getParames() { 

    return parames; 

    }

    public void setParames(Class<?>[] parames) 

    this.parames = parames; 

    }

    public Object[] getValues() { 

    return values; 

    }

    public void setValues(Object[] values) { 

    this.values = values; 

    }

    4、Registry注册中心

    package com.rpc.registry; 

    import io.netty.bootstrap.ServerBootstrap; 

    import io.netty.channel.*; 

    import io.netty.channel.nio.NioEventLoopGroup; 

    import io.netty.channel.socket.SocketChannel; 

    importio.netty.channel.socket.nio.

    NioServerSocketChannel; 

    import io.netty.handler.codec.

    LengthFieldBasedFrameDecoder; 

    import io.netty.handler.codec.LengthFieldPrepender; 

    import io.netty.handler.codec.serialization.

    ClassResolvers; 

    import io.netty.handler.codec.serialization.

    ObjectDecoder; 

    import io.netty.handler.codec.serialization.

    ObjectEncoder; 

    /**

    * 注册中心主要功能就是负责将所有Provider的

    服务名称和服务引用地址注册到一个容器中, 

    * 并对外发布。启动一个对外的服务,

    并对外提供一个可访问的端口 

    * 主要负责保存所有可用的服务名称和服务地址 

    */ 

    public class RpcRegistry { 

    private int port;

    public RpcRegistry(int port){ 

    this.port = port; 

    }

    public void start(){ 

    EventLoopGroup bossGroup = new NioEventLoopGroup(); 

    EventLoopGroup workerGroup = new NioEventLoopGroup(); 

    try {

    ServerBootstrap b = new ServerBootstrap(); 

    b.group(bossGroup, workerGroup) 

    .channel(NioServerSocketChannel.class) 

    .childHandler(new ChannelInitializer<SocketChannel>() { 

    @Override 

    protected void initChannel(SocketChannel ch) throws 

    Exception { 

    ChannelPipeline pipeline = ch.pipeline(); 

    //自定义协议解码器 

    /** 入参有5个,分别解释如下 

    (1) maxFrameLength - 发送的数据包最大长度; 

    (2) lengthFieldOffset - 长度域偏移量,指的是长度域位于 

    整个数据包字节数组中的下标; 

    (3) lengthFieldLength - 长度域的自己的字节数长度。 

    (4) lengthAdjustment – 长度域的偏移量矫正。如果长度域 

    的值,除了包含有效数据域的长度外,

    还包含了其他域(如长度域自身)长度,那么,

    就需要进行矫正。矫 正的值为:包长 - 长度域的值 – 

    长度域偏移 – 长度域长。 

    (5) initialBytesToStrip – 丢弃的起始字节数。

    丢弃处于有 效数据前面的字节数量。

    比如前面有4个节点的长度域,则它的值为4。 

    */ 

    pipeline.addLast(new 

    LengthFieldBasedFrameDecoder(Integer.

    MAX_VALUE, 0, 4, 0, 4)); 

    //自定义协议编码器 

    pipeline.addLast(new LengthFieldPrepender(4)); 

    //对象参数类型编码器 

    pipeline.addLast("encoder",new ObjectEncoder()); 

    //对象参数类型解码器 

    pipeline.addLast("decoder",new 

    ObjectDecoder(Integer.MAX_VALUE, 

    ClassResolvers.cacheDisabled(null))); 

    pipeline.addLast(new RegistryHandler()); 

    })

    .option(ChannelOption.SO_BACKLOG, 128) 

    .childOption(ChannelOption.SO_KEEPALIVE, true); 

    ChannelFuture future = b.bind(port).sync(); 

    System.out.println("jiangym RPC Registry 

    start listen at " + port ); 

    future.channel().closeFuture().sync(); 

    } catch (Exception e) { 

    bossGroup.shutdownGracefully(); 

    workerGroup.shutdownGracefully(); 

    }

    //主启动类 

    public static void main(String[] args) throws Exception { 

    new RpcRegistry(8888).start(); 

    }

    package com.rpc.registry; 

    import com.rpc.protocol.InvokerProtocol; 

    import io.netty.channel.ChannelHandlerContext; 

    import io.netty.channel.ChannelInboundHandlerAdapter; 

    import java.io.File; 

    import java.lang.reflect.Method; 

    import java.net.URL; 

    import java.util.ArrayList; 

    import java.util.List; 

    import java.util.concurrent.ConcurrentHashMap; 

    public class RegistryHandler extends ChannelInboundHandlerAdapter { 

    //保存所有可用的服务(ConcurrentHashMap

    是线程安全且高效的HashMap实现) 

    public static ConcurrentHashMap

    <String, Object> 

    registryMap = new ConcurrentHashMap<String, Object>(); 

    //保存所有相关的服务类 

    private List<String> classNames = new ArrayList<String>(); 

    public RegistryHandler() { 

    //完成递归扫描 

    scannerClass("com.rpc.provider"); 

    doRegister(); 

    }

    @Override 

    public void channelRead(ChannelHandlerContext ctx, Object msg) throws 

    Exception { 

    Object result = new Object(); 

    InvokerProtocol request = (InvokerProtocol) msg; 

    //当客户端建立连接时,需要从自定义协议中获取信息,

    拿到具体的服务和实参 

    //使用反射调用 

    if (registryMap.containsKey(request.getClassName())) { 

    Object clazz = registryMap.get(request.getClassName()); 

    Method method = clazz.getClass().getMethod(request.

    getMethodName(), 

    request.getParames()); 

    result = method.invoke(clazz, 

    request.getValues()); 

    }

    ctx.write(result); 

    ctx.flush(); 

    ctx.close(); 

    }

    @Override 

    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) 

    throws Exception { 

    cause.printStackTrace(); 

    ctx.close(); 

    }

    /*

    * 递归扫描 

    */ 

    private void scannerClass(String packageName)

     { 

    URL url = 

    this.getClass().getClassLoader().getResource

    (packageName.replaceAll("\\.", 

    "/")); 

    File dir = new File(url.getFile());

    for (File file : dir.listFiles()) { 

    //如果是一个文件夹,继续递归 

    if (file.isDirectory()) { 

    scannerClass(packageName + "." +

     file.getName()); 

    } else { 

    classNames.add(packageName + "." + 

    file.getName().replace(".class", "").trim()); 

    }

    /**

    * 完成注册 

    */ 

    private void doRegister() { 

    if (classNames.size() == 0) { 

    return; 

    }

    for (String className : classNames) { 

    try {

    Class<?> clazz = Class.forName(className); 

    Class<?> i = clazz.getInterfaces()[0]; 

    registryMap.put(i.getName(),

    clazz.newInstance()); 

    } catch (Exception e) { 

    e.printStackTrace(); 

    }

    4、消费端

    package com.rpc.consumer; 

    import com.rpc.api.IRpcHelloService; 

    import com.rpc.api.IRpcService; 

    public class RpcConsumer { 

    public static void main(String [] args){ 

    IRpcHelloService rpcHello = RpcProxy.create(IRpcHelloService.class); 

    System.out.println(rpcHello.hello("Netty")); 

    IRpcService service = RpcProxy.create(IRpcService.class); 

    System.out.println(service.deleteUser(4)); 

    System.out.println(service.updateUser(3)); 

    System.out.println(service.queryUser(2)); 

    System.out.println(service.addUser()); 

    }

    package com.rpc.consumer; 

    import com.rpc.protocol.InvokerProtocol; 

    import io.netty.bootstrap.Bootstrap; 

    import io.netty.channel.*; 

    import io.netty.channel.nio.NioEventLoopGroup; 

    import io.netty.channel.socket.SocketChannel;

    import io.netty.channel.socket.nio.NioSocketChannel; 

    import io.netty.handler.codec.

    LengthFieldBasedFrameDecoder; 

    import io.netty.handler.codec.LengthFieldPrepender; 

    import io.netty.handler.codec.serialization.ClassResolvers; 

    import io.netty.handler.codec.serialization.ObjectDecoder; 

    import io.netty.handler.codec.serialization.ObjectEncoder; 

    import java.lang.reflect.InvocationHandler; 

    import java.lang.reflect.Method; 

    import java.lang.reflect.Proxy; 

    public class RpcProxy { 

    public static <T> T create(Class<?> clazz) { 

    //clazz传进来本身就是interface 

    MethodProxy proxy = new MethodProxy(clazz); 

    Class<?>[] interfaces = clazz.isInterface() ? 

    new Class[]{clazz} : 

    clazz.getInterfaces(); 

    T result = (T) Proxy.newProxyInstance(clazz.getClassLoader(), 

    interfaces, proxy); 

    return result; 

    }

    private static class MethodProxy implements InvocationHandler { 

    private Class<?> clazz; 

    public MethodProxy(Class<?> clazz) { 

    this.clazz = clazz; 

    }

    public Object invoke(Object proxy, Method 

    method, Object[] args) throws 

    Throwable { 

    //如果传进来是一个已实现的具体类 

    if (Object.class.equals(method.

    getDeclaringClass())) { 

    try {

    return method.invoke(this, args); 

    } catch (Throwable t) { 

    t.printStackTrace(); 

    }

    //如果传进来的是一个接口(核心) 

    } else { 

    return rpcInvoke(proxy, method, args); 

    }

    return null; 

    }

    /**

    * 实现接口的核心方法 

    *

    * @param method 

    * @param args 

    * @return 

    */ 

    public Object rpcInvoke(Object proxy, Method method, Object[] args) { 

    //传输协议封装 

    InvokerProtocol msg = new InvokerProtocol(); 

    msg.setClassName(this.clazz.getName()); 

    msg.setMethodName(method.getName()); 

    msg.setValues(args); 

    msg.setParames(method.getParameterTypes());

    final RpcProxyHandler consumerHandler = new RpcProxyHandler(); 

    EventLoopGroup group = new NioEventLoopGroup(); 

    try {

    Bootstrap b = new Bootstrap(); 

    b.group(group) 

    .channel(NioSocketChannel.class) 

    .option(ChannelOption.TCP_NODELAY, true) 

    .handler(new ChannelInitializer<

    SocketChannel>() { 

    @Override 

    public void initChannel(SocketChannel ch)

     throws 

    Exception { 

    ChannelPipeline pipeline = ch.pipeline(); 

    //自定义协议解码器 

    /** 入参有5个,分别解释如下 

    maxFrameLength:框架的最大长度。

    如果帧的长度大于此 值,则将抛出TooLongFrameException。

    lengthFieldOffset:长度字段的偏移量:

    即对应的长度字 段在整个消息数据中得位置 

    lengthFieldLength:长度字段的长度:

    如:长度字段是int 型表示,那么这个值就是

    4(long型就是8)

    lengthAdjustment:要添加到长度字段值的补偿值 

    initialBytesToStrip:从解码帧中去除的第一个字节数 

    */ 

    pipeline.addLast("frameDecoder", new 

    LengthFieldBasedFrameDecoder

    (Integer.MAX_VALUE, 0, 4, 0, 4)); 

    //自定义协议编码器 

    pipeline.addLast("frameEncoder", new 

    LengthFieldPrepender(4)); 

    //对象参数类型编码器 

    pipeline.addLast("encoder", new 

    ObjectEncoder()); 

    //对象参数类型解码器 

    pipeline.addLast("decoder", new 

    ObjectDecoder( 

    Integer.MAX_VALUE, 

    ClassResolvers.cacheDisabled(null))); 

    pipeline.addLast("handler", consumerHandler); 

    }); 

    ChannelFuture future = b.connect("localhost", 8888).sync(); 

    future.channel().writeAndFlush(msg).sync(); 

    future.channel().closeFuture().sync(); 

    } catch (Exception e) { 

    e.printStackTrace(); 

    } finally { 

    group.shutdownGracefully(); 

    }

    return consumerHandler.getResponse(); 

    }

    package com.rpc.consumer; 

    import io.netty.channel.ChannelHandlerContext; 

    import io.netty.channe

    l.

    ChannelInboundHandlerAdapter;public class RpcProxyHandler extends ChannelInboundHandlerAdapter { 

    private Object response; 

    public Object getResponse() { 

    return response; 

    }

    @Override 

    public void channelRead

    (ChannelHandlerContext ctx, Object msg) throws 

    Exception { 

    response = msg; 

    }

    @Override 

    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) 

    throws Exception { 

    System.out.println("client exception is general"); 

    }

    今天的干货就到这啦!

    ​原创申明:本文由公众号【名师猿灯塔】原创,转载请说明出处标注

    今天是猿灯塔“365天原创计划”第6天。

    今天呢!灯塔君跟大家讲:     

           基于Netty重构RPC框架

    一.CyclicBarrier方法说明

    1.单一应用架构

    当网站流量很小时,只需一个应用,将所有功能都部署在一起,以减少部署节点和成本。。(最开始58 同城的站点架构用一个词概括就是“ALL IN ONE”。就像一个单机系统,所有的东西都部署在一台机器 上,包括站点、数据库、文件等等。而工程师每天的核心工作就是CURD,前端传过来一些数据,然后 业务逻辑层拼装成一些CURD访问数据库,数据库返回数据,数据拼装成页面,最终返回到浏览器。此 时,用于简化增删改查工作量的数据访问框架(ORM)是关键)

    2、垂直应用架构

    当访问量逐渐增大,单一应用增加机器带来的加速度越来越小,将应用拆成互不相干的几个应用,以提 升效率。应用拆分为不相干的几个应用,前后端分离,此时用于加速前端页面开发的Web MVC框架是 关键

    3、分布式服务服务架构

    当垂直应用越来越多,应用之间交互不可避免,将核心业务抽取出来,作为独立的服务,逐渐形成稳定 的服务中心,使前端应用能更快速的响应多变的市场需求。同时将公共能力API抽取出来,作为独立的 公共服务供其他调用者消费,以实现服务的共享和重用,降低开发和运维成本。应用拆分之后会按照模 块独立部署,接口调用由本地API演进成跨进程的远程方法调用,此时RPC框架应运而生。此时,用于 提高业务复用及整合的分布式服务框架(RPC)是关键。

    4、流动计算架构

    当服务越来越多,容量的评估,小服务资源的浪费等问题逐渐显现,此时需增加一个调度中心基于访问 压力实时管理集群容量,提高集群利用率。此时,用于提高机器利用率的资源调度和治理中心(SOA)是 关键。面向服务的架构(SOA)是一个组件模型,它将应用程序的不同功能单元(称为服务)进行拆 分,并通过这些服务之间定义良好的接口和协议联系起来。

    没有RPC框架之前,我们的服务调用是这样的:

    看出接口的调用完全没有规律可循,想怎么调,就怎么调。这导致业务发展到一定阶段之后,对接口的 维护变得非常困难。于是有人提出了服务治理的概念。所有服务间不允许直接调用,而是先到注册中心 进行登记,再由注册中心统一协调和管理所有服务的状态并对外发布,调用者只需要记住服务名称,去 找注册中心获取服务即可。这样,极大地规范了服务的管理,可以提高了所有服务端可控性。整个设计思想其实在我们生活中也能 找到活生生的案例。例如:我们平时工作交流,大多都是用IM 工具,而不是面对面吼。大家只需要相 互记住运营商(也就是注册中心)提供的号码(如:腾讯QQ)即可。再比如:我们打电话,所有电话 号码有运营商分配。我们需要和某一个人通话时,只需要拨通对方的号码,运营商(注册中心,如中国 移动、中国联通、中国电信)就会帮我们将信号转接过去。

    二.RPC介绍

    1、RPC简介:

    Remote Procedure Call远程过程调用RPC就是从一台机器(客户端)上通过参数传递的        方式调用另一台机器(服务器)上的一个函数或 方法(可以统称为服务)并得到返回的结果。

    RPC 会隐藏底层的通讯细节(不需要直接处理Socket通讯或Http通讯) RPC 是一个请求响应模 型。

    客户端发起请求,服务器返回响应(类似于Http的工作方式) RPC 在使用形式上像调用本地函数 (或方法)一样去调用远程的函数(或方法)。

     2、RPC通信原理

    RPC的主要作用有三方面:

    1、进程间通讯

    2、提供和本地方法调用一样的机制

    3、屏蔽用户对远程调用的细节实现

    RPC框架的好处首先就是长链接,不必每次通信都要像http一样去3次握手,减少了网络开销;其次就 是RPC框架一般都有注册中心,有丰富的监控管理;发布、下线接口、动态扩展等,对调用方来说是无 感知,统一化的操作。

    3、RPC通过过程

    1)服务消费方(client)调用以本地调用方式调用服务; 

    2)client stub接收到调用后负责将方法、参数等组装成能够进行网络传输的消息体; 

    3)client stub找到服务地址,并将消息发送到服务端; 

    4)server stub收到消息后进行解码; 

    5)server stub根据解码结果调用本地的服务;6)本地服务执行并将结果返回给server stub;7)server stub将返回结果打包成消息并发送至消费方; 

    8)client stub接收到消息,并进行解码; 

    9)服务消费方得到最终结果。

    4、常用的分布式RPC框架

    dubbo:国内最早开源的RPC框架,由阿里巴巴公司开发并于2011年末对外开源,仅支持java语言

    4、常用的分布式RPC框架

    dubbo:国内最早开源的RPC框架,由阿里巴巴公司开发并于2011年末对外开源,仅支持java语言

    motan:微博内部使用的RPC框架,于2016年对外开源,仅支持java语言

    Thrift:轻量级的跨语言RPC通信方案,支持多大25种变成语言

    gRPC:Google于2015年对外开源的跨语言PRC框架,支持常用的C++、java、Python、Go、 Ruby、PHP等多种语言。

    目前流行的RPC 服务治理框架主要有Dubbo 和Spring Cloud,

    下面我们以比较经典的Dubbo 为例。

    Dubbo 核心模块主要有:

    Registry:注册中心(主要负责保存所有可用的服务名称和服务地址。) 

    Provider:服务中心(实现对外提供的所有服务的具体功能) 

    Consumer:消费端(调用远程服务的服务消费方) 

    Monitor:监控中心(统计服务的调用次数和调用时间的监控中心) 

    Container:服务运行容器

    api:主要用来定义对外开放的功能与服务接口。protocol:主要定义自定义传输协议的内容

    蓝色方框代表业务有交互,绿色方框代表只对Dubbo内部交互。蓝色虚线为初始化时调用,红色虚线为 运行时异步调用,红色实线为运行时同步调用

    0、服务在容器中启动,加载,运行Provider

    1、Provider在启动时,向Registry注册自己提供的服务 

    2、Consumer在启动时,向Registry订阅自己所需的服务 

    3、Registry给Consumer返回Provider的地址列表,如果Provider地址有变更(上线/下线机器), Registry将基于长连接推动变更数据给Consumer 

    4、Consumer从Provider地址列表中,基于软负载均衡算法,选一台进行调用,如果失败,重试另一 台调用 

    5、Consumer和Provider,在内存中累计调用次数和时间,定时每分钟一次将统计数据发送到Monitor

    4.具体实现

    1、api

    package com.rpc.api; 

    /**

    * API模块,provider和Consumer都遵循API模块的

    规范 * 用来定义对外开放的功能与服务接口 

    */ 

    public interface IRpcHelloService { 

    String hello(String name); 

    }

    package com.rpc.api; 

    public interface IRpcService { 

    /*增加用户*/ 

    public String addUser(); 

    /*删除用户*/ 

    public String deleteUser(Integer id); 

    /*修改用户*/ 

    public String updateUser(Integer id); 

    /*查询用户*/ 

    public String queryUser(Integer id); 

    }

    2、Provider:服务中心

    package com.rpc.provider; 

    import com.rpc.api.IRpcService; 

    public class RpcServiceImpl implements 

    IRpcService { 

    @Override 

    public String addUser() { 

    return "增加用户"; 

    }

    @Override 

    public String deleteUser(Integer id) { 

    return "删除了编号为"+id+"的用户"; 

    }

    @Override 

    public String updateUser(Integer id) { 

    return "修改了编号为"+id+"的用户"; 

    }

    @Override 

    public String queryUser(Integer id) { 

    return "查询到了编号为"+id+"这个用户的信息"; 

    }

    package com.rpc.provider; 

    import com.rpc.api.IRpcHelloService; 

    /**

    * 服务中心,实现对外提供的所有服务的具体功能 

    */ 

    public class RpcHelloServiceImpl implements IRpcHelloService { 

    public String hello(String name) { 

    return "Hello " + name + "!"; 

    }

    3、protocol传输协议内容

    ackage com.rpc.protocol; 

    import java.io.Serializable; 

    /**

    * 自定义传输协议内容 

    */ 

    public class InvokerProtocol implements 

    Serializable { 

    private String className;//类名 

    private String methodName;//函数名称 

    private Class<?>[] parames;//形参列表

    private Object[] values;//实参列表 

    public String getClassName() { 

    return className; 

    }

    public void setClassName(String className)

     { 

    this.className = className; 

    }

    public String getMethodName() { 

    return methodName; 

    }

    public void setMethodName

    (String methodName)

     { 

    this.methodName = methodName; 

    }

    public Class<?>[] getParames() { 

    return parames; 

    }

    public void setParames(Class<?>[] parames) 

    this.parames = parames; 

    }

    public Object[] getValues() { 

    return values; 

    }

    public void setValues(Object[] values) { 

    this.values = values; 

    }

    4、Registry注册中心

    package com.rpc.registry; 

    import io.netty.bootstrap.ServerBootstrap; 

    import io.netty.channel.*; 

    import io.netty.channel.nio.NioEventLoopGroup; 

    import io.netty.channel.socket.SocketChannel; 

    importio.netty.channel.socket.nio.

    NioServerSocketChannel; 

    import io.netty.handler.codec.

    LengthFieldBasedFrameDecoder; 

    import io.netty.handler.codec.LengthFieldPrepender; 

    import io.netty.handler.codec.serialization.

    ClassResolvers; 

    import io.netty.handler.codec.serialization.

    ObjectDecoder; 

    import io.netty.handler.codec.serialization.

    ObjectEncoder; 

    /**

    * 注册中心主要功能就是负责将所有Provider的

    服务名称和服务引用地址注册到一个容器中, 

    * 并对外发布。启动一个对外的服务,

    并对外提供一个可访问的端口 

    * 主要负责保存所有可用的服务名称和服务地址 

    */ 

    public class RpcRegistry { 

    private int port;

    public RpcRegistry(int port){ 

    this.port = port; 

    }

    public void start(){ 

    EventLoopGroup bossGroup = new NioEventLoopGroup(); 

    EventLoopGroup workerGroup = new NioEventLoopGroup(); 

    try {

    ServerBootstrap b = new ServerBootstrap(); 

    b.group(bossGroup, workerGroup) 

    .channel(NioServerSocketChannel.class) 

    .childHandler(new ChannelInitializer<SocketChannel>() { 

    @Override 

    protected void initChannel(SocketChannel ch) throws 

    Exception { 

    ChannelPipeline pipeline = ch.pipeline(); 

    //自定义协议解码器 

    /** 入参有5个,分别解释如下 

    (1) maxFrameLength - 发送的数据包最大长度; 

    (2) lengthFieldOffset - 长度域偏移量,指的是长度域位于 

    整个数据包字节数组中的下标; 

    (3) lengthFieldLength - 长度域的自己的字节数长度。 

    (4) lengthAdjustment – 长度域的偏移量矫正。如果长度域 

    的值,除了包含有效数据域的长度外,

    还包含了其他域(如长度域自身)长度,那么,

    就需要进行矫正。矫 正的值为:包长 - 长度域的值 – 

    长度域偏移 – 长度域长。 

    (5) initialBytesToStrip – 丢弃的起始字节数。

    丢弃处于有 效数据前面的字节数量。

    比如前面有4个节点的长度域,则它的值为4。 

    */ 

    pipeline.addLast(new 

    LengthFieldBasedFrameDecoder(Integer.

    MAX_VALUE, 0, 4, 0, 4)); 

    //自定义协议编码器 

    pipeline.addLast(new LengthFieldPrepender(4)); 

    //对象参数类型编码器 

    pipeline.addLast("encoder",new ObjectEncoder()); 

    //对象参数类型解码器 

    pipeline.addLast("decoder",new 

    ObjectDecoder(Integer.MAX_VALUE, 

    ClassResolvers.cacheDisabled(null))); 

    pipeline.addLast(new RegistryHandler()); 

    })

    .option(ChannelOption.SO_BACKLOG, 128) 

    .childOption(ChannelOption.SO_KEEPALIVE, true); 

    ChannelFuture future = b.bind(port).sync(); 

    System.out.println("jiangym RPC Registry 

    start listen at " + port ); 

    future.channel().closeFuture().sync(); 

    } catch (Exception e) { 

    bossGroup.shutdownGracefully(); 

    workerGroup.shutdownGracefully(); 

    }

    //主启动类 

    public static void main(String[] args) throws Exception { 

    new RpcRegistry(8888).start(); 

    }

    package com.rpc.registry; 

    import com.rpc.protocol.InvokerProtocol; 

    import io.netty.channel.ChannelHandlerContext; 

    import io.netty.channel.ChannelInboundHandlerAdapter; 

    import java.io.File; 

    import java.lang.reflect.Method; 

    import java.net.URL; 

    import java.util.ArrayList; 

    import java.util.List; 

    import java.util.concurrent.ConcurrentHashMap; 

    public class RegistryHandler extends ChannelInboundHandlerAdapter { 

    //保存所有可用的服务(ConcurrentHashMap

    是线程安全且高效的HashMap实现) 

    public static ConcurrentHashMap

    <String, Object> 

    registryMap = new ConcurrentHashMap<String, Object>(); 

    //保存所有相关的服务类 

    private List<String> classNames = new ArrayList<String>(); 

    public RegistryHandler() { 

    //完成递归扫描 

    scannerClass("com.rpc.provider"); 

    doRegister(); 

    }

    @Override 

    public void channelRead(ChannelHandlerContext ctx, Object msg) throws 

    Exception { 

    Object result = new Object(); 

    InvokerProtocol request = (InvokerProtocol) msg; 

    //当客户端建立连接时,需要从自定义协议中获取信息,

    拿到具体的服务和实参 

    //使用反射调用 

    if (registryMap.containsKey(request.getClassName())) { 

    Object clazz = registryMap.get(request.getClassName()); 

    Method method = clazz.getClass().getMethod(request.

    getMethodName(), 

    request.getParames()); 

    result = method.invoke(clazz, 

    request.getValues()); 

    }

    ctx.write(result); 

    ctx.flush(); 

    ctx.close(); 

    }

    @Override 

    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) 

    throws Exception { 

    cause.printStackTrace(); 

    ctx.close(); 

    }

    /*

    * 递归扫描 

    */ 

    private void scannerClass(String packageName)

     { 

    URL url = 

    this.getClass().getClassLoader().getResource

    (packageName.replaceAll("\\.", 

    "/")); 

    File dir = new File(url.getFile());

    for (File file : dir.listFiles()) { 

    //如果是一个文件夹,继续递归 

    if (file.isDirectory()) { 

    scannerClass(packageName + "." +

     file.getName()); 

    } else { 

    classNames.add(packageName + "." + 

    file.getName().replace(".class", "").trim()); 

    }

    /**

    * 完成注册 

    */ 

    private void doRegister() { 

    if (classNames.size() == 0) { 

    return; 

    }

    for (String className : classNames) { 

    try {

    Class<?> clazz = Class.forName(className); 

    Class<?> i = clazz.getInterfaces()[0]; 

    registryMap.put(i.getName(),

    clazz.newInstance()); 

    } catch (Exception e) { 

    e.printStackTrace(); 

    }

    4、消费端

    package com.rpc.consumer; 

    import com.rpc.api.IRpcHelloService; 

    import com.rpc.api.IRpcService; 

    public class RpcConsumer { 

    public static void main(String [] args){ 

    IRpcHelloService rpcHello = RpcProxy.create(IRpcHelloService.class); 

    System.out.println(rpcHello.hello("Netty")); 

    IRpcService service = RpcProxy.create(IRpcService.class); 

    System.out.println(service.deleteUser(4)); 

    System.out.println(service.updateUser(3)); 

    System.out.println(service.queryUser(2)); 

    System.out.println(service.addUser()); 

    }

    package com.rpc.consumer; 

    import com.rpc.protocol.InvokerProtocol; 

    import io.netty.bootstrap.Bootstrap; 

    import io.netty.channel.*; 

    import io.netty.channel.nio.NioEventLoopGroup; 

    import io.netty.channel.socket.SocketChannel;

    import io.netty.channel.socket.nio.NioSocketChannel; 

    import io.netty.handler.codec.

    LengthFieldBasedFrameDecoder; 

    import io.netty.handler.codec.LengthFieldPrepender; 

    import io.netty.handler.codec.serialization.ClassResolvers; 

    import io.netty.handler.codec.serialization.ObjectDecoder; 

    import io.netty.handler.codec.serialization.ObjectEncoder; 

    import java.lang.reflect.InvocationHandler; 

    import java.lang.reflect.Method; 

    import java.lang.reflect.Proxy; 

    public class RpcProxy { 

    public static <T> T create(Class<?> clazz) { 

    //clazz传进来本身就是interface 

    MethodProxy proxy = new MethodProxy(clazz); 

    Class<?>[] interfaces = clazz.isInterface() ? 

    new Class[]{clazz} : 

    clazz.getInterfaces(); 

    T result = (T) Proxy.newProxyInstance(clazz.getClassLoader(), 

    interfaces, proxy); 

    return result; 

    }

    private static class MethodProxy implements InvocationHandler { 

    private Class<?> clazz; 

    public MethodProxy(Class<?> clazz) { 

    this.clazz = clazz; 

    }

    public Object invoke(Object proxy, Method 

    method, Object[] args) throws 

    Throwable { 

    //如果传进来是一个已实现的具体类 

    if (Object.class.equals(method.

    getDeclaringClass())) { 

    try {

    return method.invoke(this, args); 

    } catch (Throwable t) { 

    t.printStackTrace(); 

    }

    //如果传进来的是一个接口(核心) 

    } else { 

    return rpcInvoke(proxy, method, args); 

    }

    return null; 

    }

    /**

    * 实现接口的核心方法 

    *

    * @param method 

    * @param args 

    * @return 

    */ 

    public Object rpcInvoke(Object proxy, Method method, Object[] args) { 

    //传输协议封装 

    InvokerProtocol msg = new InvokerProtocol(); 

    msg.setClassName(this.clazz.getName()); 

    msg.setMethodName(method.getName()); 

    msg.setValues(args); 

    msg.setParames(method.getParameterTypes());

    final RpcProxyHandler consumerHandler = new RpcProxyHandler(); 

    EventLoopGroup group = new NioEventLoopGroup(); 

    try {

    Bootstrap b = new Bootstrap(); 

    b.group(group) 

    .channel(NioSocketChannel.class) 

    .option(ChannelOption.TCP_NODELAY, true) 

    .handler(new ChannelInitializer<

    SocketChannel>() { 

    @Override 

    public void initChannel(SocketChannel ch)

     throws 

    Exception { 

    ChannelPipeline pipeline = ch.pipeline(); 

    //自定义协议解码器 

    /** 入参有5个,分别解释如下 

    maxFrameLength:框架的最大长度。

    如果帧的长度大于此 值,则将抛出TooLongFrameException。

    lengthFieldOffset:长度字段的偏移量:

    即对应的长度字 段在整个消息数据中得位置 

    lengthFieldLength:长度字段的长度:

    如:长度字段是int 型表示,那么这个值就是

    4(long型就是8)

    lengthAdjustment:要添加到长度字段值的补偿值 

    initialBytesToStrip:从解码帧中去除的第一个字节数 

    */ 

    pipeline.addLast("frameDecoder", new 

    LengthFieldBasedFrameDecoder

    (Integer.MAX_VALUE, 0, 4, 0, 4)); 

    //自定义协议编码器 

    pipeline.addLast("frameEncoder", new 

    LengthFieldPrepender(4)); 

    //对象参数类型编码器 

    pipeline.addLast("encoder", new 

    ObjectEncoder()); 

    //对象参数类型解码器 

    pipeline.addLast("decoder", new 

    ObjectDecoder( 

    Integer.MAX_VALUE, 

    ClassResolvers.cacheDisabled(null))); 

    pipeline.addLast("handler", consumerHandler); 

    }); 

    ChannelFuture future = b.connect("localhost", 8888).sync(); 

    future.channel().writeAndFlush(msg).sync(); 

    future.channel().closeFuture().sync(); 

    } catch (Exception e) { 

    e.printStackTrace(); 

    } finally { 

    group.shutdownGracefully(); 

    }

    return consumerHandler.getResponse(); 

    }

    package com.rpc.consumer; 

    import io.netty.channel.ChannelHandlerContext; 

    import io.netty.channe

    l.

    ChannelInboundHandlerAdapter;public class RpcProxyHandler extends ChannelInboundHandlerAdapter { 

    private Object response; 

    public Object getResponse() { 

    return response; 

    }

    @Override 

    public void channelRead

    (ChannelHandlerContext ctx, Object msg) throws 

    Exception { 

    response = msg; 

    }

    @Override 

    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) 

    throws Exception { 

    System.out.println("client exception is general"); 

    }

    今天的干货就到这啦!

    相关文章

      网友评论

        本文标题:这样基于Netty重构RPC框架你不可能知道

        本文链接:https://www.haomeiwen.com/subject/jymiuhtx.html