-
官网文档
流程图 - 自己实现的代码地址:https://gitee.com/neimenggudaxue/grpc3
0.准备工作
0.1 安装protoc插件
请参考 https://www.jianshu.com/p/1a9fa148d94f
1.定义服务
1.1 proto文件
syntax = "proto3";
option java_multiple_files = true;
option java_package = "io.grpc.examples.helloworld"; #为我们生成 Java 类使用的
option java_outer_classname = "HelloWorldProto";
option objc_class_prefix = "HLW";
package helloworld;
// The greeting service definition. //服务
service Greeter {
// Sends a greeting
rpc SayHello (HelloRequest) returns (HelloReply) {} //方法
}
// The request message containing the user's name.
message HelloRequest {
string name = 1;
}
// The response message containing the greetings
message HelloReply {
string message = 1;
}
1.2 四种service方法
1.2.1 简单 RPC , 客户端使用存根发送请求到服务器并等待响应返回,就像平常的函数调用一样。
// Obtains the feature at a given position.
rpc GetFeature(Point) returns (Feature) {}
1.2.2 服务器端流式 RPC , 客户端发送请求到服务器,拿到一个流去读取返回的消息序列。 客户端读取返回的流,直到里面没有任何消息。从例子中可以看出,通过在 响应 类型前插入 stream 关键字,可以指定一个服务器端的流方法。
// Obtains the Features available within the given Rectangle. Results are
// streamed rather than returned at once (e.g. in a response message with a
// repeated field), as the rectangle may cover a large area and contain a
// huge number of features.
rpc ListFeatures(Rectangle) returns (stream Feature) {}
1.2.3 客户端流式 RPC , 客户端写入一个消息序列并将其发送到服务器,同样也是使用流。一旦 客户端完成写入消息,它等待服务器完成读取返回它的响应。通过在 请求 类型前指定 stream 关键字来指定一个客户端的流方法。
// Accepts a stream of Points on a route being traversed, returning a
// RouteSummary when traversal is completed.
rpc RecordRoute(stream Point) returns (RouteSummary) {}
1.2.4 双向流式 RPC 是双方使用读写流去发送一个消息序列。两个流独立操作,因此客户端和服务器 可以以任意喜欢的顺序读写:比如, 服务器可以在写入响应前等待接收所有的客户端消息,或者可以交替 的读取和写入消息,或者其他读写的组合。 每个流中的消息顺序被预留。你可以通过在请求和响应前加 stream 关键字去制定方法的类型。
// Accepts a stream of RouteNotes sent while a route is being traversed,
// while receiving other RouteNotes (e.g. from other users).
rpc RouteChat(stream RouteNote) returns (stream RouteNote) {}
2.生成客户端和服务器端代码
2.1 用命令mvn install生成代码,在target目录下
2.2 生成的类包含
2.2.1 包含了所有填充,序列化以及获取请求和应答的消息类型的Feature.java,Point.java, Rectangle.java以及其它类文件。
2.2.2 RouteGuideGrpc.java 文件包含(以及其它一些有用的代码):
a. RouteGuide 服务器要实现的一个接口 RouteGuideGrpc.RouteGuide,其中所有的方法都定 义在RouteGuide服务中。
b. 客户端可以用来和RouteGuide服务器交互的 存根 类。 异步的存根也实现了 RouteGuide 接口。
3. 创建服务器
让 RouteGuide 服务工作有两个部分:
a. 实现我们服务定义的生成的服务接口:做我们的服务的实际的“工作”。
b. 运行一个 gRPC 服务器,监听来自客户端的请求并返回服务的响应。
3.1 实现RouteGuide
3.1.1 我们的服务器有一个实现了生成的 RouteGuideGrpc.Service 接口的RouteGuideService类:
private static class RouteGuideService implements RouteGuideGrpc.RouteGuide {
...
}
3.1.2 简单rpc方法
最简单的类型 GetFeature,它从客户端拿到一个 Point 对象,然后从返回包含从数据库拿到的feature信息的 Feature。
@Override
public void getFeature(Point request, StreamObserver<Feature> responseObserver) {
responseObserver.onNext(checkFeature(request));
responseObserver.onCompleted();
}
...
private Feature checkFeature(Point location) {
for (Feature feature : features) {
if (feature.getLocation().getLatitude() == location.getLatitude()
&& feature.getLocation().getLongitude() == location.getLongitude()) {
return feature;
}
}
// No feature was found, return an unnamed feature.
return Feature.newBuilder().setName("").setLocation(location).build();
}
getFeature() 接收两个参数:(1) Point:请求 ; (2)StreamObserver<Feature>: 一个应答的观察者,实际上是服务器调用它应答的一个特殊接口。
要将应答返回给客户端,并完成调用:
如在我们的服务定义中指定的那样,我们组织并填充一个 Feature 应答对象返回给客户端。在这个 例子中,我们通过一个单独的私有方法checkFeature()来实现。
我们使用应答观察者的 onNext() 方法返回 Feature。
我们使用应答观察者的 onCompleted() 方法来指出我们已经完成了和 RPC的交互。
3.1.3 服务端流式rpc方法
ListFeatures 是一个服务器端的流式 RPC,所以我们需要将多个 Feature 发回给客户端。
private final Collection<Feature> features;
...
@Override
public void listFeatures(Rectangle request, StreamObserver<Feature> responseObserver) {
int left = min(request.getLo().getLongitude(), request.getHi().getLongitude());
int right = max(request.getLo().getLongitude(), request.getHi().getLongitude());
int top = max(request.getLo().getLatitude(), request.getHi().getLatitude());
int bottom = min(request.getLo().getLatitude(), request.getHi().getLatitude());
for (Feature feature : features) {
if (!RouteGuideUtil.exists(feature)) {
continue;
}
int lat = feature.getLocation().getLatitude();
int lon = feature.getLocation().getLongitude();
if (lon >= left && lon <= right && lat >= bottom && lat <= top) {
responseObserver.onNext(feature);
}
}
responseObserver.onCompleted();
}
和简单 RPC 类似,这个方法拿到了一个请求对象(客户端期望从 Rectangle 找到 Feature)和一个应答观察者 StreamObserver。
这次我们得到了需要返回给客户端的足够多的 Feature 对象(在这个场景下,我们根据他们是否在我们的 Rectangle 请求中,从服务的特性集合中选择他们),并且使用 onNext() 方法轮流往响应观察者写入。最后,和简单 RPC 的例子一样,我们使用响应观察者的 onCompleted() 方法去告诉 gRPC 写入应答已完成。
3.1.4 客户端流式rpc方法
客户端流方法 RecordRoute,我们通过它可以从客户端拿到一个 Point 的流,并且返回一个包括它们路径的信息 RouteSummary。
@Override
public StreamObserver<Point> recordRoute(final StreamObserver<RouteSummary> responseObserver) {
return new StreamObserver<Point>() {
int pointCount;
int featureCount;
int distance;
Point previous;
long startTime = System.nanoTime();
@Override
public void onNext(Point point) {
pointCount++;
if (RouteGuideUtil.exists(checkFeature(point))) {
featureCount++;
}
// For each point after the first, add the incremental distance from the previous point
// to the total distance value.
if (previous != null) {
distance += calcDistance(previous, point);
}
previous = point;
}
@Override
public void onError(Throwable t) {
logger.log(Level.WARNING, "Encountered error in recordRoute", t);
}
@Override
public void onCompleted() {
long seconds = NANOSECONDS.toSeconds(System.nanoTime() - startTime);
responseObserver.onNext(RouteSummary.newBuilder().setPointCount(pointCount)
.setFeatureCount(featureCount).setDistance(distance)
.setElapsedTime((int) seconds).build());
responseObserver.onCompleted();
}
};
}
如你所见,这次这个方法没有请求参数。相反的,它拿到了一个 RouteGuide_RecordRouteServer 流,服务器可以用它来同时读 和 写消息——它可以用自己的 Recv() 方法接收客户端消息并且用 SendAndClose() 方法返回它的单个响应。
如你所见,我们的方法和前面的方法类型相似,拿到一个 StreamObserver 应答观察者参数,但是这次它返回一个 StreamObserver 以便客户端写入它的 Point。
在这个方法体中,我们返回了一个匿名 StreamObserver 实例,其中我们:
覆写了 onNext() 方法,每次客户端写入一个 Point 到消息流时,拿到特性和其它信息。
覆写了 onCompleted() 方法(在 客户端 结束写入消息时调用),用来填充和构建我们的 RouteSummary。然后我们用 RouteSummary 调用方法自己的的响应观察者的 onNext(),之后调用它的 onCompleted() 方法,结束服务器端的调用。
3.1.5 双流式rpc方法
@Override
public StreamObserver<RouteNote> routeChat(final StreamObserver<RouteNote> responseObserver) {
return new StreamObserver<RouteNote>() {
@Override
public void onNext(RouteNote note) {
List<RouteNote> notes = getOrCreateNotes(note.getLocation());
// Respond with all previous notes at this location.
for (RouteNote prevNote : notes.toArray(new RouteNote[0])) {
responseObserver.onNext(prevNote);
}
// Now add the new note to the list
notes.add(note);
}
@Override
public void onError(Throwable t) {
logger.log(Level.WARNING, "Encountered error in routeChat", t);
}
@Override
public void onCompleted() {
responseObserver.onCompleted();
}
};
}
和我们的客户端流的例子一样,我们拿到和返回一个 StreamObserver 应答观察者,除了这次我们在客户端仍然写入消息到 它们的 消息流时通过我们方法的应答观察者返回值。这里读写的语法和客户端流以及服务器流方法一样。虽然每一端都会按照它们写入的顺序拿到另一端的消息,客户端和服务器都可以任意顺序读写——流的操作是互不依赖的。
4.启动服务器
public void start() {
gRpcServer = NettyServerBuilder.forPort(port)
.addService(RouteGuideGrpc.bindService(new RouteGuideService(features)))
.build().start();
logger.info("Server started, listening on " + port);
...
}
我们用一个 NettyServerBuilder
构建和启动服务器。这个服务器的生成器基于 Netty 传输框架。
为了做到这个,我们需要:
- 创建我们服务实现类
RouteGuideService
的一个实例并且将其传给生成的RouteGuideGrpc
类的静态方法bindService()
去获得服务定义。 - 使用生成器的
forPort()
方法指定地址以及期望客户端请求监听的端口。 - 通过传入将
bindService()
返回的服务定义,用生成器注册我们的服务实现到生成器的addService()
方法。 - 调用生成器上的
build()
和start()
方法为我们的服务创建和启动一个 RPC 服务器。
5. 创建客户端
5.1创建存根
为了调用服务方法,我们需要首先创建一个 存根,或者两个存根:
- 一个 阻塞/同步 存根:这意味着 RPC 调用等待服务器响应,并且要么返回应答,要么造成异常。
- 一个 非阻塞/异步 存根可以向服务器发起非阻塞调用,应答会异步返回。你可以使用异步存根去发起特定类型的流式调用。
我们首先为存根创建一个 gRPC channel,指明服务器地址和我们想连接的端口号:
channel = NettyChannelBuilder.forAddress(host, port)
.negotiationType(NegotiationType.PLAINTEXT)
.build();
如你所见,我们用一个 NettyServerBuilder
构建和启动服务器。这个服务器的生成器基于 Netty 传输框架。
我们使用 Netty 传输框架,所以我们用一个 NettyServerBuilder
启动服务器。
现在我们可以通过从 .proto 中生成的 RouteGuideGrpc
类的 newStub
和 newBlockingStub
方法,使用频道去创建我们的存根。
blockingStub = RouteGuideGrpc.newBlockingStub(channel);
asyncStub = RouteGuideGrpc.newStub(channel);
5.2 调用服务方法
简单 RPC
在阻塞存根上调用简单 RPC GetFeature 几乎是和调用一个本地方法一样直观。
Point request = Point.newBuilder().setLatitude(lat).setLongitude(lon).build();
Feature feature = blockingStub.getFeature(request);
我们创建和填充了一个请求 protocol buffer 对象(在这个场景下是 Point),在我们的阻塞存根上将其传给 getFeature() 方法,拿回一个 Feature。
网友评论