美文网首页
gRPC客户端详解

gRPC客户端详解

作者: 寒歌属熊总冬眠 | 来源:发表于2019-10-09 21:20 被阅读0次

    RPC框架的选择

    常见的RPC框架主要分为轻重两种。较轻的框架一般只负责通信,如rmi、webservice、restful、Thrift、gRPC等。较重的框架一般包括完整的服务发现、负载均衡策略等等如BAT三家的Dubbo、brpc、Tars之类。

    框架选择时个人认为首先要考虑的是框架的历史和项目的活跃程度。一个历史悠久的活跃项目(大概至少可以保证每两到三个月有一次小版本的更新)可以保证各种bug早已暴露并修复,让我们可以更专注于我们自己的项目本身,而不是要担心究竟是我们自己的代码有问题还是框架本身就有问题。

    重量级RPC框架有一个主要问题就是结构复杂,另外主语言之外的代码质量也不太容易保证。个人认为活跃的社区以及一个活跃的开源管理团队是这些重型RPC框架项目成功的必要前提条件。比如我们项目组试用过腾讯的Tars,C++同学表示没有任何问题,然后JAVA同学表示java版本有许多bug,修复bug的pull request需要两个多月才能得到merge,而官方jar包也将近两年没有更新过了。

    轻量级rpc框架中,restful可以被视作标杆。由于restful基于http协议,天然被各种框架支持,而且非常灵活。restful的缺点有两方面,一是过于灵活,缺少根据协议生成服务端和客户端代码的工具,联调往往要花更多的时间;二是大部分序列化基于json或者xml,相对来讲效率不理想。和restful相比,其它很多轻量级框架都有这样或者那样的缺点,有的缺少跨语言支持(rmi),有的既繁琐又缺乏效率优势(webservice)。个人认为其中相对理想的是gRPC和Thrift。

    gRPC简介

    Protobuf是一种google推出的非常流行的跨语言序列化/反序列化框架。在Protobuf2中就已经出现了用rpc定义服务的概念,但是一直缺少一种流行的rpc框架支持。当Http2推出之后,google将Http2和protobuf3结合,推出了gRPC。gRPC继承了Protobuf和Http2的优点,包括:

    • 序列化反序列化性能好
    • 强类型支持
    • 向前/向后兼容
    • 有代码生成机制,而且可以支持多语言
    • 长连接、多路复用

    同时gRPC还提供了简单地服务发现和负载均衡功能。虽然这并不是gRPC框架的重点,但是开发者可以非常容易的自己扩展gRPC这些功能,实现自己的策略或应用最新的相关方面技术,而不用像重型RPC框架一样受制于框架本身是否支持。

    gRPC与Thrift对比

    Thrift是Facebook推出的一种RPC框架,从性能上来讲远优于gRPC。但是在实际调研时发现有一个很麻烦的问题:Thrift的客户端是线程不安全的——这意味着在Spring中无法以单例形式注入到Bean中。解决方案有三种:

    1. 每次调用创建一个Thrift客户端。这不仅意味着额外的对象创建和垃圾回收开销,而且实际上相当于只使用了短链接,这是一个开发复杂度最低但是从性能上来讲最差的解决方案。
    2. 利用Pool,稍微复杂一点的解决方案,但是也非常成熟。但是问题在于一来缺少服务发现和负载均衡恐实现,需要很多额外开发;二来需要创建Pool数量*服务端数量个客户端,内存开销会比较大。
    3. 使用异步框架如Netty,可以成功避免创建过多的客户端,但是仍要自己实现服务发现和负载均衡,相对复杂。实际上Facebook有一个基于Netty的Thrift客户端,叫Nifty,但是快四年没更新了。。。

    相比较而言gRPC就友好多了,本身有简单而且可扩展的服务发现和负载均衡功能,底层基于Netty所以线程安全,在不需要极限压榨性能的情况下是非常好的选择。当然如果需要极限压榨性能Thrift也未必够看。

    gRPC入门

    gRPC服务定义

    gRPC中有一个特殊的关键字stream,表示可以以流式输入或输出多个protobuf对象。注意只有异步非阻塞的客户端支持以stream形式输入,同步阻塞客户端不支持以stream形式输入。

    syntax = "proto3";  //gRPC必须使用proto3
    
    option java_multiple_files = true;
    option java_package = "cn.lmh.examples.grpc.proto";
    
    service RouteGuide {
        // 输入一个坐标,返回坐标和时间(1:1)
        rpc getPoint(Point) returns (LocationNote) {}
        // 输入一个矩形,以stream形式返回一系列点(1:n)
        rpc listPoints(Rectangle) returns (stream Point) {}
        // 以stream形式输入一系列点,返回点的数量和总共花费的时间(m:1)
        rpc recordRoute(stream Point) returns (RouteSummary) {}
        // 以stream形式输入一系列点,以stream形式返回已输入点的数量和总共花费的时间(m:n)
        rpc getPointStream(stream Point) returns (stream RouteSummary) {}
    }
    
    message Point {
        int32 latitude = 1;
        int32 longitude = 2;
    }
    message Rectangle {
        Point lo = 1;
        Point hi = 2;
    }
    message LocationNote {
        Point location = 1;
        int64 timestamp = 2;
    }
    message RouteSummary {
        int32 point_count = 1;
        int64 elapsed_time = 2;
    }
    

    依赖和代码生成

    由于protoc的gRPC插件需要自己编译,而且存在环境问题。推荐使用gradle或者maven的protobuf插件。入门示例项目使用了gradle,根目录build.gradle配置如下:

    plugins {
        id 'java'
        id 'idea'
        id 'wrapper'
    }
    
    ext {
        groupId = 'cn.lmh.leviathan'
        proto = [
            version : "3.9.0",
            "grpc" :[
                version : "1.23.0"
            ]
        ]
    }
    
    allprojects{
        apply plugin: 'java'
        apply plugin: 'idea'
    
        sourceCompatibility=JavaVersion.VERSION_1_8
        targetCompatibility=JavaVersion.VERSION_1_8
    
        project.group = 'cn.lmh.examples'
    
        compileJava.options.encoding = 'UTF-8'
    }
    
    subprojects{
        repositories {
            mavenCentral()
            mavenLocal();
        };
        configurations {
            compile
        }
    
        dependencies {
            compile "io.grpc:grpc-netty-shaded:${proto.grpc.version}"
            compile "io.grpc:grpc-protobuf:${proto.grpc.version}"
            compile "io.grpc:grpc-stub:${proto.grpc.version}"
    
            testCompile group: 'junit', name: 'junit', version: '4.12'
        }
    }
    

    子项目build.gradle如下:

    plugins{
        id 'com.google.protobuf' version '0.8.10'   //引入protobuf插件
    }
    
    sourceSets{
        main{
            proto {
                srcDir 'src/main/proto' //指定.proto文件所在的位置
            }
        }
    }
    
    protobuf {
        generatedFilesBaseDir = "$projectDir/src"   //生成文件的根目录
    
        protoc {
            artifact = "com.google.protobuf:protoc:${proto.version}"    //protoc的版本
        }
    
        plugins {
            grpc {
                artifact = "io.grpc:protoc-gen-grpc-java:${proto.grpc.version}" //gRPC的版本
            }
        }
    
        generateProtoTasks {
            all()*.plugins {
                grpc {
                    outputSubDir = "java"   //grpc生成文件的子目录
                }
            }
        }
    }
    

    我们的入门子项目名称叫做starter,配置好build.gradle之后,执行gradlew :starter:generateProto就可以在src/main/java下生成对应的文件:

    gRPC生成的目录结构

    服务端

    无论客户端以异步非阻塞还是同步阻塞形式调用,gRPC服务端的Response都是异步形式。对于异步的Request或者Response,都需要实现gRPC的io.grpc.stub.StreamObserver接口。io.grpc.stub.StreamObserver接口有三个方法:

    • onNext:表示接收/发送一个对象
    • onError:处理异常
    • onCompleted:表示Request或Response结束

    当Request发送到服务端端时,会异步调用requestObserver的onNext方法,直到结束时调用requestObserver的onCompleted方法;服务端调用responseObserver的onNext把Response返回给客户端,直到调用responseObserver的onCompleted方法通知客户端Response结束。服务端代码如下:

    public class RouteGuideServer {
        private final int port;
        private final Server server;
    
        public RouteGuideServer(int port) throws IOException {
            this.port = port;
            server = ServerBuilder.forPort(port).addService(new RouteGuideService())
                    .build();
        }
    
        /**
         * Start server.
         */
        public void start() throws IOException {
            server.start();
            System.out.println("Server started, listening on " + port);
            Runtime.getRuntime().addShutdownHook(new Thread() {
                @Override
                public void run() {
                    RouteGuideServer.this.stop();
                }
            });
        }
    
        /**
         * Stop server
         */
        public void stop() {
            if (server != null) {
                server.shutdown();
            }
        }
    
        /**
         * Await termination on the main thread since the grpc library uses daemon threads.
         */
        private void blockUntilShutdown() throws InterruptedException {
            if (server != null) {
                server.awaitTermination();
            }
        }
    
        public static void main(String[] args) throws Exception {
            RouteGuideServer server = new RouteGuideServer(8980);
            server.start();
            server.blockUntilShutdown();
        }
    
        private static class RouteGuideService extends RouteGuideGrpc.RouteGuideImplBase {
            @Override
            public void getPoint(Point request, StreamObserver<LocationNote> responseObserver) {
                LocationNote value = LocationNote
                    .newBuilder()
                    .setLocation(request)
                    .setTimestamp(System.nanoTime())
                    .build();
                responseObserver.onNext(value);
                responseObserver.onCompleted();
            }
    
            @Override
            public void listPoints(Rectangle request, StreamObserver<Point> responseObserver) {
                int left = Math.min(request.getLo().getLongitude(), request.getHi().getLongitude());
                int right = Math.max(request.getLo().getLongitude(), request.getHi().getLongitude());
                int top = Math.max(request.getLo().getLatitude(), request.getHi().getLatitude());
                int bottom = Math.max(request.getLo().getLatitude(), request.getHi().getLatitude());
                for (int x = left; x <= right; x++) {
                    for (int y = top; y >= bottom; y--) {
                        Point point = Point.newBuilder().setLongitude(x).setLatitude(y).build();
                        responseObserver.onNext(point);
                    }
                }
                responseObserver.onCompleted();
            }
    
            @Override
            public StreamObserver<Point> recordRoute(StreamObserver<RouteSummary> responseObserver) {
                return new StreamObserver<Point>() { //返回的是requestObserver
                    AtomicInteger pointCount = new AtomicInteger(0);
                    final long startTime = System.nanoTime();
    
                    @Override
                    public void onNext(Point value) {
                        int count = pointCount.incrementAndGet();
                    }
    
                    @Override
                    public void onError(Throwable t) {
                    }
    
                    @Override
                    public void onCompleted() {
                        RouteSummary result = RouteSummary.newBuilder()
                          .setElapsedTime(System.nanoTime() - startTime).setPointCount(pointCount.get()).build();
                        responseObserver.onNext(result);
                        responseObserver.onCompleted();
                    }
                };
            }
    
            @Override
            public StreamObserver<Point> getPointStream(StreamObserver<RouteSummary> responseObserver) {
                return new StreamObserver<Point>() { //返回的是requestObserver
                    AtomicInteger pointCount = new AtomicInteger(0);
                    final long startTime = System.nanoTime();
    
                    @Override
                    public void onNext(Point value) {
                        int count = pointCount.incrementAndGet();
                        RouteSummary result = RouteSummary.newBuilder()
                          .setElapsedTime(System.nanoTime() - startTime).setPointCount(count).build();
                        responseObserver.onNext(result);
                    }
    
                    @Override
                    public void onError(Throwable t) {
                    }
    
                    @Override
                    public void onCompleted() {
                        responseObserver.onCompleted();
                    }
                };
            }
        }
    }
    

    客户端

    gRPC的客户端有同步阻塞客户端(blockingStub)和异步非阻塞客户端(Stub)两种。同步客户端使用比较方便,但是性能较低,而且不支持stream形式的Request;异步客户端性能较高,支持stream形式的Request,但是如果想要以同步方式调用需要额外封装。本文将主要以异步为例。

    异步转同步

    由于gRPC的异步客户端性能较高且功能更完整,所以一般都会采用异步客户端。异步客户端接收到的Response也是以io.grpc.stub.StreamObserver形式。由于客户端的调用可能是在异步进程中但更可能是在同步进程中,所以就存在一个如何把gRPC异步Response转为同步Response的问题。

    一个比较常见的思路是写一个io.grpc.stub.StreamObserver实现,里面有一个内置变量保存异步Response的结果,再添加一个阻塞式的get()方法,直到Response结束才把所有结果返回。要知道Response是否结束,需要添加一个Boolean或者AtomicBoolean变量,初始化为false,调用responseObserver.onCompleted()方法时设置为true,这样就可以通过这个变量判断Response是否结束。

    阻塞get()方法最常见的思路是get()写一个while循环,直到变量值改为true才退出循环并返回结果。这种方式的优点是简单直接,任何语言都可以简单实现,缺点是由于使用循环可能CPU占用较高。而对于java这种多线程比较完善的语言,另一个比较好思路是Response结束前将线程挂起,当调用responseObserver.onCompleted()方法再唤醒线程。代码如下:

    public class CallableStreamObserver<T> implements StreamObserver<T> {
        List<T> values = new ArrayList<T>();
        boolean isCompleted = false;
        Throwable t = null;
    
        @Override
        public void onNext(T value) {
            this.values.add(value);
        }
    
        @Override
        public void onError(Throwable t) {
            this.isCompleted = true;
            notifyAll();
        }
    
        @Override
        public synchronized void onCompleted() {
            this.isCompleted = true;
            notifyAll();
        }
    
        public List<T> get() throws Throwable {
            if (!this.isCompleted) {
                synchronized (this) {
                    this.wait(60 * 1000);
                }
            }
            if (null != t) {
                throw this.t;
            } else {
                return this.values;
            }
        }
    }
    

    客户端代码

    public class RouteGuideClient {
    
        private final ManagedChannel channel;
        private final RouteGuideGrpc.RouteGuideBlockingStub blockingStub;
        private final RouteGuideGrpc.RouteGuideStub asyncStub;
    
        public RouteGuideClient(String host, int port) {
            String target = "dns:///" + host + ":" + port;
            ManagedChannelBuilder<?> channelBuilder = ManagedChannelBuilder
                    .forTarget(target)
                    .usePlaintext();
            channel = channelBuilder.build();
            blockingStub = RouteGuideGrpc.newBlockingStub(channel);
            asyncStub = RouteGuideGrpc.newStub(channel);
        }
    
        public void shutdown() throws InterruptedException {
            channel.shutdown().awaitTermination(5, TimeUnit.SECONDS);
        }
    
        public LocationNote getPoint(int lo, int lt, boolean blocking) throws Throwable {
            Point point = Point.newBuilder().setLongitude(lo).setLatitude(lt).build();
            if(blocking) {
                return blockingStub.getPoint(point);
            }else{
                CallableStreamObserver<LocationNote> responseObserver = new CallableStreamObserver<LocationNote>();
                asyncStub.getPoint(point, responseObserver);
                return responseObserver.get().get(0);
            }
        }
    
        public Iterator<Point> listPoints(int left, int top, int right, int bottom, boolean blocking) throws Throwable {
            Point hi = Point.newBuilder().setLongitude(left).setLatitude(top).build();
            Point lo = Point.newBuilder().setLongitude(right).setLatitude(bottom).build();
            Rectangle rec = Rectangle.newBuilder().setHi(hi).setLo(lo).build();
            if(blocking){
                return blockingStub.listPoints(rec);
            }else{
                CallableStreamObserver<Point> responseObserver = new CallableStreamObserver<Point>();
                asyncStub.listPoints(rec, responseObserver);
                return responseObserver.get().iterator();
            }
        }
    
        public RouteSummary recordRoute(Collection<Point> points) throws Throwable {
            CallableStreamObserver<RouteSummary> responseObserver = new CallableStreamObserver<RouteSummary>();
            StreamObserver<Point> requestObserver = asyncStub.recordRoute(responseObserver);
            points.stream().parallel().forEach(p -> requestObserver.onNext(p));
            requestObserver.onCompleted();
            return responseObserver.get().get(0);
    
        }
    
        public List<RouteSummary> getPointStream(Collection<Point> points) throws Throwable {
            CallableStreamObserver<RouteSummary> responseObserver = new CallableStreamObserver<RouteSummary>();
            StreamObserver<Point> requestObserver = asyncStub.getPointStream(responseObserver);
            points.stream().parallel().forEach(p -> requestObserver.onNext(p));
            requestObserver.onCompleted();
            return responseObserver.get();
        }
    }
    

    gRPC客户端代码详解

    gRPC官方将自己分为三层组件:Stub、Channel和Transport。

    • Stub层是最上层的代码,gRPC附带的插件可以从.proto文件直接生成Stub层代码,开发人员通过直接调用Stub层的代码调用RPC服务
    • Channel层是对Transport层功能的抽象,同时提供了很多有用的功能,比如服务发现和负载均衡。。
    • Transport层承担了将字节从网络中取出和放入数据的工作,有三种实现Netty、okHttp、inProgress。Transport层是最底层的代码。

    整个grpc-java项目的代码比较多。从风格上来讲,封装比较多,相对于interface更喜欢使用abstract class,相对于反射更喜欢使用硬编码,而且大量使用了单线程异步调用造成调用栈断裂,与常见的java项目的编码风格有很大差别,阅读起来可能容易不习惯。

    在源码层面本文将关注下面这些方面:

    • Channel的初始化过程;
    • gRPC中的服务发现;
    • gRPC中的负载均衡
    • Client与Server之间的数据传输

    Channel的初始化过程

    通过入门示例可以看到,Channel的初始化过程分三步:

    1. 调用forTarget方法创建io.grpc.ManagedChannelBuilder;
    2. 配置各种选项,不论如何配置,返回的总是io.grpc.ManagedChannelBuilder对象;
    3. 调用build方法创建io.grpc.ManagedChannel

    forTarget方法

    gRPC这里设计比较繁琐,过程比较绕。forTarget方法的实际功能就是把参数target赋值给io.grpc.ManagedChannelBuilder的内部变量target

    public static ManagedChannelBuilder<?> forTarget(String target) {
        return ManagedChannelProvider.provider().builderForTarget(target);
    }
    

    io.grpc.ManagedChannelProvider.provider()会返回一个io.grpc.ManagedChannelProvider实现。有哪些io.grpc.ManagedChannelProvider实现是在io.grpc.ManagedChannelProvider中以硬编码形式确定的,这里其实存在利用反射改进的空间。

    private static final class HardcodedClasses implements Iterable<Class<?>> {
        @Override
        public Iterator<Class<?>> iterator() {
            List<Class<?>> list = new ArrayList<>();
            try {
                list.add(Class.forName("io.grpc.okhttp.OkHttpChannelProvider"));
            } catch (ClassNotFoundException ex) {
                // ignore
            }
            try {
                list.add(Class.forName("io.grpc.netty.NettyChannelProvider"));
            } catch (ClassNotFoundException ex) {
            // ignore
            }
            return list.iterator();
        }
    }
    

    实际上就根据依赖的jar包不同就只有两个实现,一个netty的,一个okhttp的。如果像入门示例项目一样只配置了netty实现,那就只有netty的。io.grpc.netty.NettyChannelProvider的buildForTarget方法调用的是io.grpc.netty.NettyChannelBuilderforTarget方法。

    public NettyChannelBuilder builderForTarget(String target) {
        return NettyChannelBuilder.forTarget(target);
    }
    

    io.grpc.netty.NettyChannelBuilder继承自io.grpc.internal.AbstractManagedChannelImplBuilderforTarget方法实际上调用了父类的构造函数。

    NettyChannelBuilder(String target) {
        super(target);
    }
    
    public static NettyChannelBuilder forTarget(String target) {
        return new NettyChannelBuilder(target);
    }
    

    io.grpc.internal.AbstractManagedChannelImplBuilder的构造函数最终会是把参数赋值给target变量。

    protected AbstractManagedChannelImplBuilder(String target) {
        this.target = Preconditions.checkNotNull(target, "target");
        this.directServerAddress = null;
    }
    

    build方法

    从前文可以看到,实际初始化的io.grpc.ManagedChannelBuilder实际上是io.grpc.netty.NettyChannelBuilder,其build方法实现在其父类io.grpc.internal.AbstractManagedChannelImplBuilder中。

    public ManagedChannel build() {
        return new ManagedChannelOrphanWrapper(new ManagedChannelImpl(
            this,
            buildTransportFactory(),
            // TODO(carl-mastrangelo): Allow clients to pass this in
            new ExponentialBackoffPolicy.Provider(),
            SharedResourcePool.forResource(GrpcUtil.SHARED_CHANNEL_EXECUTOR),
            GrpcUtil.STOPWATCH_SUPPLIER,
            getEffectiveInterceptors(),
            TimeProvider.SYSTEM_TIME_PROVIDER));
    }
    

    这里的io.grpc.internal.ManagedChannelOrphanWrapperio.grpc.internal.ManagedChannelImpl其实都是io.grpc.ManagedChannel的实现。io.grpc.internal.ManagedChannelOrphanWrapper从功能上分析没有任何作用,io.grpc.internal.ManagedChannelOrphanWrapper会为io.grpc.ManagedChannel创建弱引用,并被放置到ReferenceQueue中。如果Channel是单例的,那么意义不大;如果客户端被重复创建却没有被关闭,那么ReferenceQueue中会留下相应的引用记录,可能有助于排查问题。

    io.grpc.internal.ManagedChannelImpl构造方法的几个参数中,除了第一个参数是builder本身,第二个参数是用来创建Transport的Factory,第三个参数是后台连接重试策略,第四个参数是gRPC的全局线程池,第五个和第七个都是和时间相关的对象,主要用于日志中,第六个是客户端调用时的interceptor。在io.grpc.netty.NettyChannelBuilder中,buildTransportFactory方法会创建一个io.grpc.netty.NettyChannelBuilder.NettyTransportFactory

    服务发现

    前文的入门示例中直接写了target,只能连接单个Server。如果有多个可以提供服务的Server,那么就需要有一种方式通过单个target发现这些Server。在io.grpc.ManagedChannelBuilder中有一个nameResolverFactory方法,可以用来指定如何解析target地址,发现多个服务端。

    nameResolverFactory方法

    这个方法的实现也在io.grpc.internal.AbstractManagedChannelImplBuilder中,如果用户有自己的io.grpc.NameResolver.Factory实现的话可以通过nameResolverFactory方法指定,gRPC就会使用用户自己的io.grpc.NameResolver.Factroy实现代替gRPC自己的默认实现,否则会使用io.grpc.NameResolverRegistry中的默认实现。

    io.grpc.NameResolverRegistry会通过硬编码加载io.grpc.NameResolverProvider实现,并创建一个与之有关的io.grpc.NameResolver.Factory的实现。目前硬编码加载的io.grpc.NameResolverProvider实现只有io.grpc.internal.DnsNameResolverProvider一种。

    private final NameResolver.Factory factory = new NameResolverFactory();
    @GuardedBy("this")
    private final LinkedHashSet<NameResolverProvider> allProviders = new LinkedHashSet<>();
    
    private synchronized void addProvider(NameResolverProvider provider) {
        checkArgument(provider.isAvailable(), "isAvailable() returned false");
        allProviders.add(provider);
    }
    
    public static synchronized NameResolverRegistry getDefaultRegistry() {
        if (instance == null) {
            List<NameResolverProvider> providerList = ServiceProviders.loadAll(
                NameResolverProvider.class,
                getHardCodedClasses(),
                NameResolverProvider.class.getClassLoader(),
                new NameResolverPriorityAccessor());
            if (providerList.isEmpty()) {
                logger.warning("No NameResolverProviders found via ServiceLoader, including for DNS. This "
                + "is probably due to a broken build. If using ProGuard, check your configuration");
            }
            instance = new NameResolverRegistry();
            for (NameResolverProvider provider : providerList) {
                logger.fine("Service loader found " + provider);
                if (provider.isAvailable()) {
                    instance.addProvider(provider);
                }
            }
            instance.refreshProviders();
        }
        return instance;
    }  
    
    public NameResolver.Factory asFactory() {
        return factory;
    }
    
    @VisibleForTesting
    static List<Class<?>> getHardCodedClasses() {
        ArrayList<Class<?>> list = new ArrayList<>();
        try {
            list.add(Class.forName("io.grpc.internal.DnsNameResolverProvider"));
        } catch (ClassNotFoundException e) {
            logger.log(Level.FINE, "Unable to find DNS NameResolver", e);
        }
        return Collections.unmodifiableList(list);
    }
    
    private final class NameResolverFactory extends NameResolver.Factory {
        @Override
        @Nullable
        public NameResolver newNameResolver(URI targetUri, NameResolver.Args args) {
            List<NameResolverProvider> providers = providers();
            for (NameResolverProvider provider : providers) {
                NameResolver resolver = provider.newNameResolver(targetUri, args);
                if (resolver != null) {
                    return resolver;
                }
            }
            return null;
        }
    
        @Override
        public String getDefaultScheme() {
            List<NameResolverProvider> providers = providers();
            if (providers.isEmpty()) {
                return "unknown";
            }
            return providers.get(0).getDefaultScheme();
        }
    }
    

    getDefaultSchema会匹配target中的schema(如dns),如果匹配的上,就使用相应的NameResolver.Factory,返回NameResolver决定真正的服务访问地址。

    io.grpc.NameResolver

    我们来看io.grpc.NameResolver

    public abstract class NameResolver {
    
        public abstract String getServiceAuthority();
        
        public void start(final Listener listener) {
        if (listener instanceof Listener2) {
                start((Listener2) listener);
            } else {
                start(new Listener2() {
                    @Override
                    public void onError(Status error) {
                    listener.onError(error);
                    }
    
                    @Override
                    public void onResult(ResolutionResult resolutionResult) {
                        listener.onAddresses(resolutionResult.getAddresses(), resolutionResult.getAttributes());
                    }
                });
            }
        }
    
        public void start(Listener2 listener) {
            start((Listener) listener);
        }
        
        public abstract void shutdown();
        
        public void refresh() {}
        
        @ThreadSafe
        public interface Listener {
    
            void onAddresses(List<EquivalentAddressGroup> servers, @ResolutionResultAttr Attributes attributes);
    
            void onError(Status error);
        }
    
        public abstract static class Listener2 implements Listener {
    
            @Override
            public final void onAddresses(
                List<EquivalentAddressGroup> servers, @ResolutionResultAttr Attributes attributes) {
                onResult(
                ResolutionResult.newBuilder().setAddresses(servers).setAttributes(attributes).build());
            }
    
            public abstract void onResult(ResolutionResult resolutionResult);
    
            @Override
            public abstract void onError(Status error);
        }
    
        public static final class ResolutionResult {
            private final List<EquivalentAddressGroup> addresses;
            @ResolutionResultAttr
            private final Attributes attributes;
            @Nullable
            private final ConfigOrError serviceConfig;
    
            ResolutionResult(
                List<EquivalentAddressGroup> addresses,
                @ResolutionResultAttr Attributes attributes,
                ConfigOrError serviceConfig) {
                this.addresses = Collections.unmodifiableList(new ArrayList<>(addresses));
                this.attributes = checkNotNull(attributes, "attributes");
                this.serviceConfig = serviceConfig;
            }
    
            public static Builder newBuilder() {
              return new Builder();
            }
    
            public Builder toBuilder() {
              return newBuilder()
                  .setAddresses(addresses)
                  .setAttributes(attributes)
                  .setServiceConfig(serviceConfig);
            }
    
            public List<EquivalentAddressGroup> getAddresses() {
              return addresses;
            }
    
            @ResolutionResultAttr
            public Attributes getAttributes() {
              return attributes;
            }
    
            @Nullable
            public ConfigOrError getServiceConfig() {
              return serviceConfig;
            }
    
            @ExperimentalApi("https://github.com/grpc/grpc-java/issues/1770")
            public static final class Builder {
                private List<EquivalentAddressGroup> addresses = Collections.emptyList();
                private Attributes attributes = Attributes.EMPTY;
                @Nullable
                private ConfigOrError serviceConfig;
                Builder() {}
    
                public Builder setAddresses(List<EquivalentAddressGroup> addresses) {
                    this.addresses = addresses;
                    return this;
                }
    
                public Builder setAttributes(Attributes attributes) {
                    this.attributes = attributes;
                    return this;
                }
    
                public Builder setServiceConfig(@Nullable ConfigOrError serviceConfig) {
                    this.serviceConfig = serviceConfig;
                    return this;
                }
    
                public ResolutionResult build() {
                    return new ResolutionResult(addresses, attributes, serviceConfig);
                }
            }
        }
    }
    

    在客户端首次连接服务端的时候会调用Listener2start方法,需要更新的时候会调用refresh方法。当Listener2接收到服务端地址时,会调用onResult方法。

    io.grpc.internal.DnsNameResolver

    由于gRPC支持长连接,所以如果直连的话只会访问一个域名下的一台服务器,即首次连接时通过DNS返回IP地址。io.grpc.internal.DnsNameResolverProvider是对io.grpc.internal.DnsNameResolver的简单封装,只支持以dns:///开头的地址。io.grpc.internal.DnsNameResolver会根据target获取该host下所有关联的IP,即通过DNS解析出所有的服务端IP地址。

    public final class DnsNameResolverProvider extends NameResolverProvider {
    
      private static final String SCHEME = "dns";
    
      @Override
      public DnsNameResolver newNameResolver(URI targetUri, NameResolver.Args args) {
        if (SCHEME.equals(targetUri.getScheme())) {
          String targetPath = Preconditions.checkNotNull(targetUri.getPath(), "targetPath");
          Preconditions.checkArgument(targetPath.startsWith("/"),
              "the path component (%s) of the target (%s) must start with '/'", targetPath, targetUri);
          String name = targetPath.substring(1);
          return new DnsNameResolver(
              targetUri.getAuthority(),
              name,
              args,
              GrpcUtil.SHARED_CHANNEL_EXECUTOR,
              Stopwatch.createUnstarted(),
              InternalServiceProviders.isAndroid(getClass().getClassLoader()));
        } else {
          return null;
        }
      }
    
      @Override
      public String getDefaultScheme() {
        return SCHEME;
      }
    
      @Override
      protected boolean isAvailable() {
        return true;
      }
    
      @Override
      protected int priority() {
        return 5;
      }
    }
    

    可以看到io.grpc.internal.DnsNameResolver中的startrefresh方法都调用的是resolve方法,而resolve方法是执行了一个继承自RunnableResolve接口。

    DnsNameResolver

    在有代理的情况下,ResolveresolveInternal会根据代理返回的ProxiedSocketAddress创建EquivalentAddressGroup作为服务端列表返回,并设置空config;否则会调用resolveAll方法获取服务端列表,并调用parseServiceConfig方法设置config。resolveAll方法返回的ResolutionResults有三个变量addressestxtRecordsbalancerAddresses

    @VisibleForTesting
    static ResolutionResults resolveAll(
        AddressResolver addressResolver,
        @Nullable ResourceResolver resourceResolver,
        boolean requestSrvRecords,
        boolean requestTxtRecords,
        String name) {
        List<? extends InetAddress> addresses = Collections.emptyList();
        Exception addressesException = null;
        List<EquivalentAddressGroup> balancerAddresses = Collections.emptyList();
        Exception balancerAddressesException = null;
        List<String> txtRecords = Collections.emptyList();
        Exception txtRecordsException = null;
    
        try {
            addresses = addressResolver.resolveAddress(name);
        } catch (Exception e) {
        addressesException = e;
        }
        if (resourceResolver != null) {
            if (requestSrvRecords) {
                try {
                    balancerAddresses =
                        resourceResolver.resolveSrv(addressResolver, GRPCLB_NAME_PREFIX + name);
                } catch (Exception e) {
                    balancerAddressesException = e;
                }
            }
            if (requestTxtRecords) {
                boolean balancerLookupFailedOrNotAttempted =
                    !requestSrvRecords || balancerAddressesException != null;
                boolean dontResolveTxt =
                    (addressesException != null) && balancerLookupFailedOrNotAttempted;
                if (!dontResolveTxt) {
                    try {
                        txtRecords = resourceResolver.resolveTxt(SERVICE_CONFIG_NAME_PREFIX + name);
                    } catch (Exception e) {
                    txtRecordsException = e;
                    }
                }   
            }
        }
        try {
            if (addressesException != null
                && (balancerAddressesException != null || balancerAddresses.isEmpty())) {
                Throwables.throwIfUnchecked(addressesException);
                throw new RuntimeException(addressesException);
            }
        } finally {
            if (addressesException != null) {
                logger.log(Level.FINE, "Address resolution failure", addressesException);
            }
            if (balancerAddressesException != null) {
                logger.log(Level.FINE, "Balancer resolution failure", balancerAddressesException);
            }
            if (txtRecordsException != null) {
                logger.log(Level.FINE, "ServiceConfig resolution failure", txtRecordsException);
            }
        }
        return new ResolutionResults(addresses, txtRecords, balancerAddresses);
    }
    

    addressResolverresolveAddress方法实际是调用JDK的java.net.InetAddressgetAllByName方法,即根据host通过DNS返回一系列服务端列表。resourceResolver根据LDAP协议获取指定命名空间下的服务端列表地址。txtRecordsbalancerAddresses是和LDAP相关的参数,方法入参requestSrvRecordsrequestTxtRecords的默认值都是false。由于LDAP不是特别常用,这里就不深入展开了。

    NameResolverListeneronResult

    NameResolverListener获取解析结果后会调用onResult方法,进而会调用io.grpc.LoadBalancerhandleResolvedAddresses方法。

    获取解析结果后调用handleResolvedAddresses方法

    负载均衡

    io.grpc.ManagedChannel初始化的时候可以通过defaultLoadBalancingPolicy方法指定负载均衡策略,实际是根据defaultLoadBalancingPolicy创建了一个io.grpc.internal.AutoConfiguredLoadBalancerFactory对象。io.grpc.internal.AutoConfiguredLoadBalancerFactory则通过io.grpc.LoadBalancerRegistry获取对应名称的负载均衡策略。io.grpc.LoadBalancerProvidergetPolicyName方法指定负载均衡策略名称,newLoadBalancer返回负载均衡io.grpc.LoadBalancer的具体实现。如果想要添加自定义负载均衡策略,需要调用io.grpc.LoadBalancerRegistryregistry方法,并自己实现io.grpc.LoadBalancerProviderio.grpc.LoadBalancer,并指定负载均衡策略名称即可。

    defaultLoadBalancingPolicy方法

    io.grpc.LoadBalancer.SubchannelPicker

    io.grpc.LoadBalancer的核心逻辑实际在SubchannelPicker中。pickSubchannel方法会返回的PickResult中包含真正可用的subchannel,用来进行后续的数据传输。

    public abstract static class SubchannelPicker {
        /**
        * Make a balancing decision for a new RPC.
        *
        * @param args the pick arguments
        * @since 1.3.0
        */
        public abstract PickResult pickSubchannel(PickSubchannelArgs args);
    }
    

    gRPC默认提供了两种负载均衡实现策略:prick_firstround_robin。前者总会使用第一个可用的服务端,后者则是简单轮询。

    handleResolvedAddresses

    当服务端列表更新时,会调用io.grpc.LoadBalancerhandleResolvedAddresses方法更新可用的subchannel。

    @Override
    public void handleResolvedAddresses(ResolvedAddresses resolvedAddresses) {
        List<EquivalentAddressGroup> servers = resolvedAddresses.getAddresses();
        if (subchannel == null) {
            final Subchannel subchannel = helper.createSubchannel(
            CreateSubchannelArgs.newBuilder()
                .setAddresses(servers)
                .build());
            subchannel.start(new SubchannelStateListener() {
                @Override
                public void onSubchannelState(ConnectivityStateInfo stateInfo) {
                    processSubchannelState(subchannel, stateInfo);
                }
            });
            this.subchannel = subchannel;
            helper.updateBalancingState(CONNECTING, new Picker(PickResult.withSubchannel(subchannel)));
            subchannel.requestConnection();
        } else {
            subchannel.updateAddresses(servers);
        }
    }
    

    如果是首次调用(subchannel == null) 会创建subchannel,其实现是io.grpc.internal.ManagedChannelImpl.SubchannelImpl,创建的过程中会创建io.grpc.internal.InternalSubchannel。然后调用io.grpc.internal.ManagedChannelImplupdateBalancingState方法,把subchannelPicker更新为实现Picker,然后开启subchannel的连接。

    开启subchannel连接

    在开启subchannel的连接过程中,会调用io.grpc.internal.InternalSubchannelobtainActiveTransport方法。

    这里的transportFactory就是上面提到io.grpc.ManagedChannelBuilder调用build初始化时调用buildTransportFactory方法返回的,依赖于Transport层的具体实现。在netty实现中,返回的是io.grpc.netty.NettyClientTransport

    传输

    gRPC客户端发起Request时,stub会调用ClientCallsstartCall方法,最终会调用io.grpc.internal.ManagedChannelImpl.ChannelTransportProviderget方法获取io.grc.internal.ClientTransport

    gRPC客户端发起Request时调用ChannelTransportProvider的get方法
    public ClientTransport get(PickSubchannelArgs args) {
    SubchannelPicker pickerCopy = subchannelPicker;
        if (shutdown.get()) {
            return delayedTransport;
        }
        if (pickerCopy == null) {
            final class ExitIdleModeForTransport implements Runnable {
                @Override
                public void run() {
                    exitIdleMode();
                }
            }
            syncContext.execute(new ExitIdleModeForTransport());
            return delayedTransport;
        }
        PickResult pickResult = pickerCopy.pickSubchannel(args);
        ClientTransport transport = GrpcUtil.getTransportFromPickResult(
            pickResult, args.getCallOptions().isWaitForReady());
        if (transport != null) {
            return transport;
        }
        return delayedTransport;
    }
    

    如果subchannelPicker存在,会使用subchannelPicker进行选择;如果是首次访问服务端时subchannel肯定不存在,会使用syncContext异步执行exitIdleMode方法初始化。syncContext是一个单线程执行队列,可以保证先提交的任务先执行。delayedTransport的执行也依赖于syncContext,这就保证了delayedTransport中的方法执行一定会在exitIdleMode方法之后。

    首次访问服务端时执行exidIdleMode方法

    exitIdleMode方法会初始化NameResolverLoadBalancer,并会启动NameResolverListener。当解析完成后会调用NameResolverListeneronResult方法,进而调用LoadBalancerhandleResolvedAddresses方法创建subchannelPicker、创建并连接subchannel。

    @VisibleForTesting
    void exitIdleMode() {
        syncContext.throwIfNotInThisSynchronizationContext();
        if (shutdown.get() || panicMode) {
            return;
        }
        if (inUseStateAggregator.isInUse()) {
            cancelIdleTimer(false);
        } else {
            rescheduleIdleTimer();
        }
        if (lbHelper != null) {
            return;
        }
        channelLogger.log(ChannelLogLevel.INFO, "Exiting idle mode");
        LbHelperImpl lbHelper = new LbHelperImpl();
        lbHelper.lb = loadBalancerFactory.newLoadBalancer(lbHelper);
        this.lbHelper = lbHelper;
    
        NameResolverListener listener = new NameResolverListener(lbHelper, nameResolver);
        nameResolver.start(listener);
        nameResolverStarted = true;
    }
    

    Request

    发送Request时会调用ConnectionClientTransportnewStream方法返回一个io.grpc.internal.ClientStream对象,而首次调用会通过delayedTransport延迟调用newStream方法。

    调用newStream的调用栈

    netty实现会返回一个io.grpc.netty.shaded.io.grpc.netty.NettyClientStream对象。io.grpc.internal.ClientStream下有两个子类,TransportState负责处理传输状态,Sink负责写入数据。

    在进行一系列http2相关设置后,会调用io.grpc.internal.ClientStreamstart方法,为TransportState设置监听并通过Sink写入Header。

    @Override
    public final void start(ClientStreamListener listener) {
        transportState().setListener(listener);
        if (!useGet) {
            abstractClientStreamSink().writeHeaders(headers, null);
            headers = null;
        }
    }
    

    初始化结束后,调用requestObserver的onNext方法会调用io.grpc.internal.ClientCallImplsendMessage方法,将protobuf对象转换成InputStream,并作为参数调用io.grpc.internal.ClientStreamwriteMessage方法,进而调用io.grpc.internal.MessageFramerwritePayload方法,最终调用writeToOutputStream方法将内容写入Http的OutputStream。如果是参数是stream形式会继续调用flush。

    onNext

    调用requestObserver的onCompleted方法会调用io.grpc.internal.ClientCallImplhalfClose方法,进而会调用io.grpc.internal.MessageFramerendOfMessages,flush并结束发送消息。

    onComplete

    Response

    onNext

    客户端接受到Response会调用ClientStreamListener的messagesAvailable方法,并通过同步线程池最终调用StreamObserver的onNext方法接收数据。

    response-on-complete.png

    当返回结束时会调用TransportState的transportReportStatus方法关闭请求,进而调用ClientStreamListener的closed方法关闭监听,进而调用StreamObserver的onClose方法。

    gRPC通信格式

    gRPC发送的请求发送方法是POST,路径是/{serviceName}/{methodName},content-type为content-type = application/grpc+proto。

    Request

    HEADERS (flags = END_HEADERS)
    :method = POST
    :scheme = http
    :path = /RouteGuide/getPoint
    grpc-timeout = 1S
    content-type = application/grpc+proto
    grpc-encoding = gzip
    
    DATA (flags = END_STREAM)
    <Length-Prefixed Message>
    

    Response

    HEADERS (flags = END_HEADERS)
    :status = 200
    grpc-encoding = gzip
    content-type = application/grpc+proto
    
    DATA
    <Length-Prefixed Message>
    
    HEADERS (flags = END_STREAM, END_HEADERS)
    grpc-status = 0 # OK
    trace-proto-bin = jher831yy13JHy3hc
    

    扩展gRPC

    自定义基于zookeeper的NameResolver.Factory实现

    public class CuratorNameResolver extends NameResolver {
        CuratorFramework curatorFramework;
        String basePath;
        String serviceAuthority;
        private Listener2 listener;
    
        public CuratorNameResolver(CuratorFramework curatorFramework, String basePath, String serviceAuthority) {
            this.curatorFramework = curatorFramework;
            this.basePath = basePath;
            this.serviceAuthority = serviceAuthority;
        }
    
        @Override
        public void start(Listener2 listener) {
            this.curatorFramework.start();
            this.listener = listener;
            refresh();
        }
    
        @Override
        public void refresh() {
            List<EquivalentAddressGroup> servers = new ArrayList<>();
            try {
                List<EquivalentAddressGroup> addresses = curatorFramework.getChildren()
                        .forPath(basePath)
                        .stream().map(address ->{
                            try {
                                URI uri = new URI("http://" + address);
                                return new EquivalentAddressGroup(
                                    new InetSocketAddress(uri.getHost(), uri.getPort()));
                            }catch (Exception e){
                                listener.onError(Status.INTERNAL);
                                return null;
                            }
                        }).collect(Collectors.toList());
                listener.onResult(ResolutionResult.newBuilder().setAddresses(addresses).build());
    
            } catch (Exception e) {
                listener.onError(Status.INTERNAL);
            }
        }
    
        @Override
        public String getServiceAuthority() {
            return this.serviceAuthority;
        }
    
        @Override
        public void shutdown() {
            this.curatorFramework.close();
        }
    
        public static class Factory extends NameResolver.Factory{
            @Override
            public NameResolver newNameResolver(URI targetUri, Args args) {
                String address = targetUri.getHost() + ":" + targetUri.getPort();
                String authority = null == targetUri.getAuthority() ? address : targetUri.getAuthority();
                CuratorFramework curator = CuratorFrameworkFactory.builder()
                        .connectString(address)
                        .retryPolicy(new ExponentialBackoffRetry(1000, 5))
                        .connectionTimeoutMs(1000)
                        .sessionTimeoutMs(60000)
                        .build();
                return new CuratorNameResolver(curator, targetUri.getPath(), authority);
            }
    
            @Override
            public String getDefaultScheme() {
                return "zookeeper";
            }
        }
    }
    

    自定义随机负载均衡实现

    public class RandomLoadBalancer extends LoadBalancer{
        LoadBalancer.Helper helper;
    
        private final Map<EquivalentAddressGroup, Subchannel> subchannels =
                new HashMap<>();
        static final Attributes.Key<Ref<ConnectivityStateInfo>> STATE_INFO =
                Attributes.Key.create("state-info");
    
        public RandomLoadBalancer(LoadBalancer.Helper helper) {
            this.helper = helper;
        }
        @Override
        public void handleResolvedAddresses(ResolvedAddresses resolvedAddresses) {
            List<EquivalentAddressGroup> servers = resolvedAddresses.getAddresses();
            for(EquivalentAddressGroup server : servers){
                List<EquivalentAddressGroup> serverSingletonListt = Collections.singletonList(server);
                Subchannel exists = subchannels.getOrDefault(server, null);
                if(null != exists){
                    exists.updateAddresses(serverSingletonListt);
                    continue;
                }
                Attributes.Builder subchannelAttrs = Attributes.newBuilder()
                        .set(STATE_INFO,
                                new Ref<>(ConnectivityStateInfo.forNonError(IDLE)));
                final Subchannel subchannel = helper.createSubchannel(CreateSubchannelArgs.newBuilder()
                                .setAddresses(serverSingletonListt)
                                .setAttributes(subchannelAttrs.build())
                                .build());
                subchannels.put(server, subchannel);
                subchannel.start(new SubchannelStateListener() {
                    @Override
                    public void onSubchannelState(ConnectivityStateInfo state) {
                        for(Map.Entry<EquivalentAddressGroup, Subchannel> entry : subchannels.entrySet()){
                            if(subchannel == entry.getValue()){
                                if (state.getState() == SHUTDOWN) {
                                    subchannels.remove(entry.getKey());
                                }
                                if (state.getState() == IDLE) {
                                    subchannel.requestConnection();
                                }
                                subchannel.getAttributes().get(STATE_INFO).value = state;
                                updateBalancingState();
                                return;
                            }
                        }
                    }
                });
                subchannel.requestConnection();
            }
            updateBalancingState();
        }
        @Override
        public void handleNameResolutionError(Status error) {
            shutdown();
            helper.updateBalancingState(TRANSIENT_FAILURE, new SubchannelPicker() {
                @Override
                public PickResult pickSubchannel(PickSubchannelArgs args) {
                    return PickResult.withError(error);
                }
            });
        }
    
        private  void updateBalancingState(){
            boolean ready = true;
            for(Subchannel subchannel : this.subchannels.values()){
                if(subchannel.getAttributes().get(STATE_INFO).value.getState() != READY){
                    helper.updateBalancingState(CONNECTING, new RandomSubchannelPick(subchannels.values()));
                    return;
                }
            }
            helper.updateBalancingState(ConnectivityState.READY, new RandomSubchannelPick(subchannels.values()));
        }
    
        @Override
        public void shutdown() {
            for(Iterator<Map.Entry<EquivalentAddressGroup, Subchannel>> itr = subchannels.entrySet().iterator(); itr.hasNext();){
                Map.Entry<EquivalentAddressGroup, Subchannel> e = itr.next();
                e.getValue().shutdown();
                itr.remove();
            }
    
        }
    
        class RandomSubchannelPick extends SubchannelPicker{
            Subchannel[] subchannels;
            Random random = new Random(System.currentTimeMillis());
    
            public RandomSubchannelPick(Collection<Subchannel> subchannels) {
                this.subchannels = subchannels.stream().toArray(Subchannel[]::new);
            }
    
            @Override
            public PickResult pickSubchannel(PickSubchannelArgs args) {
                int idx = random.nextInt(subchannels.length);
                return PickResult.withSubchannel(subchannels[idx]);
            }
        }
    
        public static class Provider extends LoadBalancerProvider{
    
            @Override
            public boolean isAvailable() {
                return true;
            }
    
            @Override
            public int getPriority() {
                return 100;
            }
    
            @Override
            public String getPolicyName() {
                return "random";
            }
    
            @Override
            public LoadBalancer newLoadBalancer(LoadBalancer.Helper helper) {
                return new RandomLoadBalancer(helper);
            }
        }
    
        static final class Ref<T> {
            T value;
    
            Ref(T value) {
                this.value = value;
            }
        }
    }
    

    服务端初始化

    服务端需要把自己的服务地址注册到zookeeper。

    private final int port;
    private final Server server;
    private String registryPath;
    private String address;
    CuratorFramework curator = CuratorFrameworkFactory.builder()
            .connectString("127.0.0.1:2181")
            .retryPolicy(new ExponentialBackoffRetry(1000, 5))
            .connectionTimeoutMs(1000)
            .sessionTimeoutMs(60000)
            .build();;
    
    public GreetingServer(int port, String registryPath) throws IOException {
        this.port = port;
        server = ServerBuilder.forPort(port).addService(new GreetingService())
                .build();
        this.registryPath = registryPath;
        this.address =  "localhost:" + port;    //本机网卡不能正确显示地址,直接写死localhost
    }
    
    /**
     * Start server.
     */
    public void start() throws Exception {
        this.curator.start();
        server.start();;
        this.curator.create()
                .creatingParentContainersIfNeeded()
                .withMode(CreateMode.EPHEMERAL)
                .forPath(registryPath + "/" + address, ("http://" + address).getBytes());
    
        System.out.println("Server started, listening on " + address);
        Runtime.getRuntime().addShutdownHook(new Thread() {
            @Override
            public void run() {
                GreetingServer.this.stop();
            }
        });
    }
    

    客户端初始化

    客户端需要注册自定义的NameResolverFactory和LoadBalancer。

    public GreetingClient(String host, int port, String path) {
        String target = "zookeeper://" + host + ":" + port + path;
        CuratorNameResolver.Factory factory = new CuratorNameResolver.Factory();
    
        LoadBalancerRegistry.getDefaultRegistry().register(new RandomLoadBalancer.Provider());
        ManagedChannelBuilder<?> channelBuilder = ManagedChannelBuilder
                .forTarget(target)
                .nameResolverFactory(factory)
                .defaultLoadBalancingPolicy("random")
                .usePlaintext();
        channel = channelBuilder.build();
        blockingStub = GreetingGrpc.newBlockingStub(channel);
    }
    

    参考资料

    相关文章

      网友评论

          本文标题:gRPC客户端详解

          本文链接:https://www.haomeiwen.com/subject/egknpctx.html