Gradle工程的环境搭建依然和前文一致,参考:gRPC入门-Hello World
-
在
src\main\proto
下新建.proto
契约,如下:syntax = "proto3"; package com.mattie.netty.grpc; option java_package = "com.mattie.netty.grpc"; option java_outer_classname = "HelloWorldProtos"; service HelloService { //双向流 rpc biStream (stream StreamReq) returns (stream StreamResp) {}; } //双向流 message StreamReq { string req_info = 1; } message StreamResp { string resp_info = 1; }
-
执行
gradle clean build
,自动生成gRPC相关代码:HelloServiceGrpc
, 我们需要使用就是它和它的几个内部类。 -
自定义服务类
MyServiceImpl
继承HelloServiceGrpc
的内部类HelloServiceImplBase
并重写契约中定义的服务biStream
。注意到重写的方法接收的参数类型是StreamObserver
(针对response),返回值的类型也是StreamObserver
(针对request)。public class MyServiceImpl extends HelloServiceImplBase { @Override public StreamObserver<HelloWorldProtos.StreamReq> biStream(StreamObserver<HelloWorldProtos.StreamResp> responseObserver) { return new StreamObserver<HelloWorldProtos.StreamReq>() { //接收请求 @Override public void onNext(HelloWorldProtos.StreamReq streamReq) { System.out.println(streamReq.getReqInfo()); //接收请求后就返回一个响应 responseObserver.onNext(HelloWorldProtos.StreamResp.newBuilder().setRespInfo("from server").build()); } @Override public void onError(Throwable throwable) { } //客户端发送数据完毕 @Override public void onCompleted() { //服务端也完毕 responseObserver.onCompleted(); } }; }
RequestObserver在服务端实现,responseObserver在客户端实现。
因此,在服务代码MyServiceImpl
中需要返回一个requestObserver
,实现其中的回调方法:
1. onNext
表示接收到一个request,这里通过responseObserver
的onNext()
立刻返回了一条数据。
2. onCompleted
表示客户端已经发送数据完毕,这里调用responseObserver
的onCompleted()
也告诉客户端连接关闭。
-
客户端不仅需要发送数据,而且需要实现一个
responseObserver
:public void biCommute() { StreamObserver<HelloWorldProtos.StreamReq> reqStreamObserver = this.stub.biStream(new StreamObserver<HelloWorldProtos.StreamResp>() { //接收到服务返回 @Override public void onNext(HelloWorldProtos.StreamResp streamResp) { System.out.println(streamResp.getRespInfo()); } @Override public void onError(Throwable throwable) { } //服务发送完毕 @Override public void onCompleted() { } }); reqStreamObserver.onNext(HelloWorldProtos.StreamReq.newBuilder().setReqInfo("hello server1").build()); reqStreamObserver.onNext(HelloWorldProtos.StreamReq.newBuilder().setReqInfo("hello server2").build()); reqStreamObserver.onNext(HelloWorldProtos.StreamReq.newBuilder().setReqInfo("hello server3").build()); reqStreamObserver.onNext(HelloWorldProtos.StreamReq.newBuilder().setReqInfo("hello server4").build()); reqStreamObserver.onCompleted(); try { Thread.sleep(10000); } catch (InterruptedException e) { e.printStackTrace(); } }
- 这里调用服务方法
biStream
使用的是stub
(异步),而非之前的blockingStub
(同步)。 - 调用
biStream
需要传入responseObserver
作为参数,同时返回值是requestObserver
。 - 通过
requestObserver
的onNext()
不断发送数据。
- 执行程序,客户端发送四条数据,服务端每接收到一条数据就响应一条
from server
给客户端。
双向流为了能更好的看到执行效果,可以在程序最后增加
sleep(10000)
,观察服务的异步响应过程。
网友评论