java gRPC详解

作者: 迷糊银儿 | 来源:发表于2020-07-09 16:04 被阅读0次

0.准备工作

0.1 安装protoc插件

请参考 https://www.jianshu.com/p/1a9fa148d94f

1.定义服务

1.1 proto文件
syntax = "proto3";
option java_multiple_files = true;
option java_package = "io.grpc.examples.helloworld"; #为我们生成 Java 类使用的
option java_outer_classname = "HelloWorldProto";
option objc_class_prefix = "HLW";
package helloworld;

// The greeting service definition.   //服务
service Greeter {
  // Sends a greeting
  rpc SayHello (HelloRequest) returns (HelloReply) {}   //方法
}

// The request message containing the user's name.
message HelloRequest {
  string name = 1;
}

// The response message containing the greetings
message HelloReply {
  string message = 1;
}
1.2 四种service方法

      1.2.1 简单 RPC , 客户端使用存根发送请求到服务器并等待响应返回,就像平常的函数调用一样。

// Obtains the feature at a given position.
rpc GetFeature(Point) returns (Feature) {}

      1.2.2 服务器端流式 RPC , 客户端发送请求到服务器,拿到一个流去读取返回的消息序列。 客户端读取返回的流,直到里面没有任何消息。从例子中可以看出,通过在 响应 类型前插入 stream 关键字,可以指定一个服务器端的流方法。

// Obtains the Features available within the given Rectangle.  Results are
// streamed rather than returned at once (e.g. in a response message with a
// repeated field), as the rectangle may cover a large area and contain a
// huge number of features.
rpc ListFeatures(Rectangle) returns (stream Feature) {}

      1.2.3 客户端流式 RPC , 客户端写入一个消息序列并将其发送到服务器,同样也是使用流。一旦 客户端完成写入消息,它等待服务器完成读取返回它的响应。通过在 请求 类型前指定 stream 关键字来指定一个客户端的流方法。

// Accepts a stream of Points on a route being traversed, returning a
 // RouteSummary when traversal is completed.
 rpc RecordRoute(stream Point) returns (RouteSummary) {}

      1.2.4 双向流式 RPC 是双方使用读写流去发送一个消息序列。两个流独立操作,因此客户端和服务器 可以以任意喜欢的顺序读写:比如, 服务器可以在写入响应前等待接收所有的客户端消息,或者可以交替 的读取和写入消息,或者其他读写的组合。 每个流中的消息顺序被预留。你可以通过在请求和响应前加 stream 关键字去制定方法的类型。

// Accepts a stream of RouteNotes sent while a route is being traversed,
// while receiving other RouteNotes (e.g. from other users).
rpc RouteChat(stream RouteNote) returns (stream RouteNote) {}

2.生成客户端和服务器端代码

2.1 用命令mvn install生成代码,在target目录下
2.2 生成的类包含

     2.2.1 包含了所有填充,序列化以及获取请求和应答的消息类型的Feature.java,Point.java, Rectangle.java以及其它类文件。
     2.2.2 RouteGuideGrpc.java 文件包含(以及其它一些有用的代码):
         a. RouteGuide 服务器要实现的一个接口 RouteGuideGrpc.RouteGuide,其中所有的方法都定 义在RouteGuide服务中。
         b. 客户端可以用来和RouteGuide服务器交互的 存根 类。 异步的存根也实现了 RouteGuide 接口。

3. 创建服务器

让 RouteGuide 服务工作有两个部分:
      a. 实现我们服务定义的生成的服务接口:做我们的服务的实际的“工作”。
      b. 运行一个 gRPC 服务器,监听来自客户端的请求并返回服务的响应。

3.1 实现RouteGuide

     3.1.1 我们的服务器有一个实现了生成的 RouteGuideGrpc.Service 接口的RouteGuideService类:

private static class RouteGuideService implements RouteGuideGrpc.RouteGuide {
...
}

     3.1.2 简单rpc方法
最简单的类型 GetFeature,它从客户端拿到一个 Point 对象,然后从返回包含从数据库拿到的feature信息的 Feature。

@Override
    public void getFeature(Point request, StreamObserver<Feature> responseObserver) {
      responseObserver.onNext(checkFeature(request));
      responseObserver.onCompleted();
    }

...

    private Feature checkFeature(Point location) {
      for (Feature feature : features) {
        if (feature.getLocation().getLatitude() == location.getLatitude()
            && feature.getLocation().getLongitude() == location.getLongitude()) {
          return feature;
        }
      }

      // No feature was found, return an unnamed feature.
      return Feature.newBuilder().setName("").setLocation(location).build();
    }

getFeature() 接收两个参数:(1) Point:请求 ; (2)StreamObserver<Feature>: 一个应答的观察者,实际上是服务器调用它应答的一个特殊接口。
要将应答返回给客户端,并完成调用:
如在我们的服务定义中指定的那样,我们组织并填充一个 Feature 应答对象返回给客户端。在这个 例子中,我们通过一个单独的私有方法checkFeature()来实现。
我们使用应答观察者的 onNext() 方法返回 Feature。
我们使用应答观察者的 onCompleted() 方法来指出我们已经完成了和 RPC的交互。

     3.1.3 服务端流式rpc方法
     ListFeatures 是一个服务器端的流式 RPC,所以我们需要将多个 Feature 发回给客户端。

private final Collection<Feature> features;

...

    @Override
    public void listFeatures(Rectangle request, StreamObserver<Feature> responseObserver) {
      int left = min(request.getLo().getLongitude(), request.getHi().getLongitude());
      int right = max(request.getLo().getLongitude(), request.getHi().getLongitude());
      int top = max(request.getLo().getLatitude(), request.getHi().getLatitude());
      int bottom = min(request.getLo().getLatitude(), request.getHi().getLatitude());

      for (Feature feature : features) {
        if (!RouteGuideUtil.exists(feature)) {
          continue;
        }

        int lat = feature.getLocation().getLatitude();
        int lon = feature.getLocation().getLongitude();
        if (lon >= left && lon <= right && lat >= bottom && lat <= top) {
          responseObserver.onNext(feature);
        }
      }
      responseObserver.onCompleted();
    }

和简单 RPC 类似,这个方法拿到了一个请求对象(客户端期望从 Rectangle 找到 Feature)和一个应答观察者 StreamObserver。

这次我们得到了需要返回给客户端的足够多的 Feature 对象(在这个场景下,我们根据他们是否在我们的 Rectangle 请求中,从服务的特性集合中选择他们),并且使用 onNext() 方法轮流往响应观察者写入。最后,和简单 RPC 的例子一样,我们使用响应观察者的 onCompleted() 方法去告诉 gRPC 写入应答已完成。
     3.1.4 客户端流式rpc方法
客户端流方法 RecordRoute,我们通过它可以从客户端拿到一个 Point 的流,并且返回一个包括它们路径的信息 RouteSummary。


 @Override
    public StreamObserver<Point> recordRoute(final StreamObserver<RouteSummary> responseObserver) {
      return new StreamObserver<Point>() {
        int pointCount;
        int featureCount;
        int distance;
        Point previous;
        long startTime = System.nanoTime();

        @Override
        public void onNext(Point point) {
          pointCount++;
          if (RouteGuideUtil.exists(checkFeature(point))) {
            featureCount++;
          }
          // For each point after the first, add the incremental distance from the previous point
          // to the total distance value.
          if (previous != null) {
            distance += calcDistance(previous, point);
          }
          previous = point;
        }

        @Override
        public void onError(Throwable t) {
          logger.log(Level.WARNING, "Encountered error in recordRoute", t);
        }

        @Override
        public void onCompleted() {
          long seconds = NANOSECONDS.toSeconds(System.nanoTime() - startTime);
          responseObserver.onNext(RouteSummary.newBuilder().setPointCount(pointCount)
              .setFeatureCount(featureCount).setDistance(distance)
              .setElapsedTime((int) seconds).build());
          responseObserver.onCompleted();
        }
      };
    }

如你所见,这次这个方法没有请求参数。相反的,它拿到了一个 RouteGuide_RecordRouteServer 流,服务器可以用它来同时读 和 写消息——它可以用自己的 Recv() 方法接收客户端消息并且用 SendAndClose() 方法返回它的单个响应。
如你所见,我们的方法和前面的方法类型相似,拿到一个 StreamObserver 应答观察者参数,但是这次它返回一个 StreamObserver 以便客户端写入它的 Point。
在这个方法体中,我们返回了一个匿名 StreamObserver 实例,其中我们:
覆写了 onNext() 方法,每次客户端写入一个 Point 到消息流时,拿到特性和其它信息。
覆写了 onCompleted() 方法(在 客户端 结束写入消息时调用),用来填充和构建我们的 RouteSummary。然后我们用 RouteSummary 调用方法自己的的响应观察者的 onNext(),之后调用它的 onCompleted() 方法,结束服务器端的调用。
     3.1.5 双流式rpc方法

@Override
    public StreamObserver<RouteNote> routeChat(final StreamObserver<RouteNote> responseObserver) {
      return new StreamObserver<RouteNote>() {
        @Override
        public void onNext(RouteNote note) {
          List<RouteNote> notes = getOrCreateNotes(note.getLocation());

          // Respond with all previous notes at this location.
          for (RouteNote prevNote : notes.toArray(new RouteNote[0])) {
            responseObserver.onNext(prevNote);
          }

          // Now add the new note to the list
          notes.add(note);
        }

        @Override
        public void onError(Throwable t) {
          logger.log(Level.WARNING, "Encountered error in routeChat", t);
        }

        @Override
        public void onCompleted() {
          responseObserver.onCompleted();
        }
      };
    }

和我们的客户端流的例子一样,我们拿到和返回一个 StreamObserver 应答观察者,除了这次我们在客户端仍然写入消息到 它们的 消息流时通过我们方法的应答观察者返回值。这里读写的语法和客户端流以及服务器流方法一样。虽然每一端都会按照它们写入的顺序拿到另一端的消息,客户端和服务器都可以任意顺序读写——流的操作是互不依赖的。

4.启动服务器

public void start() {
    gRpcServer = NettyServerBuilder.forPort(port)
        .addService(RouteGuideGrpc.bindService(new RouteGuideService(features)))
        .build().start();
    logger.info("Server started, listening on " + port);
    ...
  }

我们用一个 NettyServerBuilder 构建和启动服务器。这个服务器的生成器基于 Netty 传输框架。

为了做到这个,我们需要:

  1. 创建我们服务实现类 RouteGuideService 的一个实例并且将其传给生成的 RouteGuideGrpc 类的静态方法 bindService() 去获得服务定义。
  2. 使用生成器的 forPort() 方法指定地址以及期望客户端请求监听的端口。
  3. 通过传入将 bindService() 返回的服务定义,用生成器注册我们的服务实现到生成器的 addService() 方法。
  4. 调用生成器上的 build()start() 方法为我们的服务创建和启动一个 RPC 服务器。

5. 创建客户端

5.1创建存根

为了调用服务方法,我们需要首先创建一个 存根,或者两个存根:

  • 一个 阻塞/同步 存根:这意味着 RPC 调用等待服务器响应,并且要么返回应答,要么造成异常。
  • 一个 非阻塞/异步 存根可以向服务器发起非阻塞调用,应答会异步返回。你可以使用异步存根去发起特定类型的流式调用。

我们首先为存根创建一个 gRPC channel,指明服务器地址和我们想连接的端口号:

 channel = NettyChannelBuilder.forAddress(host, port)
        .negotiationType(NegotiationType.PLAINTEXT)
        .build();

如你所见,我们用一个 NettyServerBuilder 构建和启动服务器。这个服务器的生成器基于 Netty 传输框架。
我们使用 Netty 传输框架,所以我们用一个 NettyServerBuilder 启动服务器。
现在我们可以通过从 .proto 中生成的 RouteGuideGrpc 类的 newStubnewBlockingStub 方法,使用频道去创建我们的存根。

    blockingStub = RouteGuideGrpc.newBlockingStub(channel);
    asyncStub = RouteGuideGrpc.newStub(channel);
5.2 调用服务方法

简单 RPC
在阻塞存根上调用简单 RPC GetFeature 几乎是和调用一个本地方法一样直观。

  Point request = Point.newBuilder().setLatitude(lat).setLongitude(lon).build();
  Feature feature = blockingStub.getFeature(request);

我们创建和填充了一个请求 protocol buffer 对象(在这个场景下是 Point),在我们的阻塞存根上将其传给 getFeature() 方法,拿回一个 Feature。

相关文章

  • java gRPC详解

    官网文档流程图 自己实现的代码地址:https://gitee.com/neimenggudaxue/grpc3 ...

  • gRpc

    gRPC(google+remote process call) 详解 grpc 简介 grpc是一个高性能、开...

  • gRPC详解

    gRPC是什么? gRPC是什么可以用官网的一句话来概括 A high-performance, open-sou...

  • java实现gRpc服务端-客户端框架代码

    参考官方github代码例子:https://github.com/grpc/grpc-java/tree/mas...

  • gRPC java的编译

    gRPC-java代码生成器 gRPC-java的代码生成器编译需要先编译protobuf,否则会报c++源文件无...

  • FreeMarker | 笔记篇

    java中Freemarker list指令详解 java中Freemarker if else指令详解 java...

  • Java 反射机制

    [1]. java反射详解[2]. Java Reflection(反射机制)详解[3]. 深入理解Java类型...

  • 微服务下跨语言 RPC 实现

    Java 与 Java 之间互调 目前主流的 Java 开发框架 Spring Boot,为了更方便集成 gRPC...

  • gRPC请求中对header进行处理

    gRPC请求设置header(Android、iOS、JavaServer) 1.[Android (Java)]...

  • Java注解

    Java注解(Annotation)详解(一)——概述及JDK自带注解 Java注解(Annotation)详解(...

网友评论

    本文标题:java gRPC详解

    本文链接:https://www.haomeiwen.com/subject/ovzjcktx.html