RPC框架

作者: NazgulSun | 来源:发表于2020-09-30 17:30 被阅读0次
  1. 实现一个 rpc 框架:
    可参考(https://github.com/Snailclimb/guide-rpc-framework
  • 目标 定义 HelloService, client 调用 helloService.sayHello()的时候,可以完全屏蔽 远端服务的调用的细节。

实现服务的远程动态代理:
helloService 有一个代理实现,这个代理可以是 http,tcp, mq 怎么样都可以,这些细节对服务调研都是透明的。
代理的实现与springboot结合,可以自己定义一套 annotation,然后在BeanPostProcessor 的方法里面,对 bean做一个动态增强。
比如,如果一个bean里面引用RpcReference,就可以给他做一个动态代理,然后再把属性set回去。

    @Override
    public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException {
        Class<?> targetClass = bean.getClass();
        Field[] declaredFields = targetClass.getDeclaredFields();
        for (Field declaredField : declaredFields) {
            RpcReference rpcReference = declaredField.getAnnotation(RpcReference.class);
            if (rpcReference != null) {
                RpcServiceProperties rpcServiceProperties = RpcServiceProperties.builder()
                        .group(rpcReference.group()).version(rpcReference.version()).build();
                RpcClientProxy rpcClientProxy = new RpcClientProxy(rpcClient, rpcServiceProperties);
                Object clientProxy = rpcClientProxy.getProxy(declaredField.getType());
                declaredField.setAccessible(true);
                try {
                    declaredField.set(bean, clientProxy);
                } catch (IllegalAccessException e) {
                    e.printStackTrace();
                }
            }

        }
        return bean;
    }

对于helloService 他的动态代理,就是RpcClientProxy, 当调用hello方法的时候,实际调用的是proxy的invoke方法

public Object invoke(Object proxy, Method method, Object[] args) {
        log.info("invoked method: [{}]", method.getName());
        RpcRequest rpcRequest = RpcRequest.builder().methodName(method.getName())
                .parameters(args)
                .interfaceName(method.getDeclaringClass().getName())
                .paramTypes(method.getParameterTypes())
                .requestId(UUID.randomUUID().toString())
                .group(rpcServiceProperties.getGroup())
                .version(rpcServiceProperties.getVersion())
                .build();
        RpcResponse<Object> rpcResponse = null;
        if (clientTransport instanceof NettyClientTransport) {
            CompletableFuture<RpcResponse<Object>> completableFuture = (CompletableFuture<RpcResponse<Object>>) clientTransport.sendRpcRequest(rpcRequest);
            rpcResponse = completableFuture.get();
        }
        if (clientTransport instanceof SocketRpcClient) {
            rpcResponse = (RpcResponse<Object>) clientTransport.sendRpcRequest(rpcRequest);
        }
        RpcMessageChecker.check(rpcResponse, rpcRequest);
        return rpcResponse.getData();
    }

真正的实现细节是在:clientTransport.sendRpcRequest
clientTransport是一个接口,可以用不同的协议来实现,这里以netty作为说明

    @Override
    public CompletableFuture<RpcResponse<Object>> sendRpcRequest(RpcRequest rpcRequest) {
        // build return value
        CompletableFuture<RpcResponse<Object>> resultFuture = new CompletableFuture<>();
        // build rpc service name by rpcRequest
        String rpcServiceName = rpcRequest.toRpcProperties().toRpcServiceName();
        // get server address
        InetSocketAddress inetSocketAddress = serviceDiscovery.lookupService(rpcServiceName);
        // get  server address related channel
        Channel channel = channelProvider.get(inetSocketAddress);
        if (channel != null && channel.isActive()) {
            // put unprocessed request
            unprocessedRequests.put(rpcRequest.getRequestId(), resultFuture);
            channel.writeAndFlush(rpcRequest).addListener((ChannelFutureListener) future -> {
                if (future.isSuccess()) {
                    log.info("client send message: [{}]", rpcRequest);
                } else {
                    future.channel().close();
                    resultFuture.completeExceptionally(future.cause());
                    log.error("Send failed:", future.cause());
                }
            });
        } else {
            throw new IllegalStateException();
        }

        return resultFuture;
    }

clientTransport 包含了 channelprovider,serviceProvider,并且是一个异步的调用,返回Future。
serviceDiscovery 负责重zk中查询,相应的服务,然后重可用服务列表里面选择一个可用,选择策略可以使自定义的。
channelProvider.get(inetSocketAddress) 则是从 nettyClient里面获取一个连接.也是使用回调的异步通知方式

    public Channel doConnect(InetSocketAddress inetSocketAddress) {
        CompletableFuture<Channel> completableFuture = new CompletableFuture<>();
        bootstrap.connect(inetSocketAddress).addListener((ChannelFutureListener) future -> {
            if (future.isSuccess()) {
                log.info("The client has connected [{}] successful!", inetSocketAddress.toString());
                completableFuture.complete(future.channel());
            } else {
                throw new IllegalStateException();
            }
        });
        return completableFuture.get();
    }
    
    主要的client 配置如下:
       // RpcResponse -> ByteBuf
    ch.pipeline().addLast(new DefaultDecoder(RpcResponse.class));
    // ByteBuf -> RpcRequest
    ch.pipeline().addLast(new NettyKryoEncoder(kryoSerializer, RpcRequest.class));
    ch.pipeline().addLast(new NettyClientHandler());

一个是 KryoSeriablizer,一个是NettyClientHandler。
这两个比较简单,一个是java独有的序列化,一个是读取 服务端返回的 resp。

到这里基于java动态代理的 client sendRpcRequest 模式就处理完毕。

再看下Server端,Server端,其实就相对来说比较简单,主要的工作就是每启动一个服务,就注册起来,以便服务发现可以获得对应的服务。

   public void registerService(Object service, RpcServiceProperties rpcServiceProperties) {
        serviceProvider.publishService(service, rpcServiceProperties);
    }

    @SneakyThrows
    public void start() {
        CustomShutdownHook.getCustomShutdownHook().clearAll();
        String host = InetAddress.getLocalHost().getHostAddress();
        EventLoopGroup bossGroup = new NioEventLoopGroup();
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
            ServerBootstrap b = new ServerBootstrap();
            b.group(bossGroup, workerGroup)
                    .channel(NioServerSocketChannel.class)
                    // TCP默认开启了 Nagle 算法,该算法的作用是尽可能的发送大数据快,减少网络传输。TCP_NODELAY 参数的作用就是控制是否启用 Nagle 算法。
                    .childOption(ChannelOption.TCP_NODELAY, true)
                    // 是否开启 TCP 底层心跳机制
                    .childOption(ChannelOption.SO_KEEPALIVE, true)
                    //表示系统用于临时存放已完成三次握手的请求的队列的最大长度,如果连接建立频繁,服务器处理创建新连接较慢,可以适当调大这个参数
                    .option(ChannelOption.SO_BACKLOG, 128)
                    .handler(new LoggingHandler(LogLevel.INFO))
                    // 当客户端第一次进行请求的时候才会进行初始化
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel ch) {
                            // 30 秒之内没有收到客户端请求的话就关闭连接
                            ch.pipeline().addLast(new IdleStateHandler(30, 0, 0, TimeUnit.SECONDS));
                            ch.pipeline().addLast(new DefaultDecoder(RpcRequest.class));
                            ch.pipeline().addLast(new NettyKryoEncoder(kryoSerializer, RpcResponse.class));
                            ch.pipeline().addLast(new NettyServerHandler());
                        }
                    });

主要的工作就是定义RequestHandler,去处理不同客户端的请求。

    public Object handle(RpcRequest rpcRequest) {
        Object service = serviceProvider.getService(rpcRequest.toRpcProperties());
        return invokeTargetMethod(rpcRequest, service);
    }

    /**
     * get method execution results
     *
     * @param rpcRequest client request
     * @param service    service object
     * @return the result of the target method execution
     */
    private Object invokeTargetMethod(RpcRequest rpcRequest, Object service) {
        Object result;
        try {
            Method method = service.getClass().getMethod(rpcRequest.getMethodName(), rpcRequest.getParamTypes());
            result = method.invoke(service, rpcRequest.getParameters());
            log.info("service:[{}] successful invoke method:[{}]", rpcRequest.getInterfaceName(), rpcRequest.getMethodName());
        } catch (NoSuchMethodException | IllegalArgumentException | InvocationTargetException | IllegalAccessException e) {
            throw new RpcException(e.getMessage(), e);
        }
        return result;
    }

request handler 就是根据request 从服务列表里面获得服务的对象,然后反射调用对应的方法。

高并发和性能
进一步处理高并发的问题,比如 有1000 qps 调用 hellowService,那么就要做scala的扩容。
服务端可以启动新的实例动态扩容,加入到zookeeper列表里面。 client端,需要订阅服务上线下线的事件,这样可以动态更新服务列表
而不用每次请求的时候,都去查询zookeeper。
其他优化层面,就是在Netty 上做优化,比如配置 boss work 的数目等。 序列化kyro,protobuf 等的选择。

fallback,熔断降级
熔断,比如网络问题,可能一次要等待一个timeout,如果大量的timeout可能会有问题,框架要识别这种情况,使得快速失败。
降级,可以认为的定义一些场景,比如 物理资源,网络资源,异常的时候优先给哪些服务提供资源,而另外一些服务则调用一些mock的服务,
这个可能要看一个完整的rpc框架,比如dubbo。

本文先理清楚整个 rpc 框架要处理的细节。

相关文章

网友评论

      本文标题:RPC框架

      本文链接:https://www.haomeiwen.com/subject/kuqpuktx.html