Exploring how to return streamed data in Spring Boot + gRPC


Author: 朽木亦自雕 | Published: 2019-03-11 17:54

Background:

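This continues the previous post, https://www.jianshu.com/p/c7d390efba29, which used bidirectional streaming to work around gRPC's maximum message size and avoid the errors that limit causes. That solution, however, never passed the gRPC server's return value back to the page. This post solves that problem.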
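As a reminder of why streaming helps with the size limit: grpc-java rejects inbound messages above a configurable maximum (4 MiB by default), so a large payload has to be sent as several smaller messages. The sketch below only illustrates that splitting step. The class `ChunkSketch`, the method `split`, and the chunk size are hypothetical names chosen for this example; they are not taken from the previous post.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class ChunkSketch {

    // Splits data into consecutive chunks of at most chunkSize bytes.
    // Each chunk would then be sent as one message on the request stream.
    static List<byte[]> split(byte[] data, int chunkSize) {
        if (chunkSize <= 0) {
            throw new IllegalArgumentException("chunkSize must be positive");
        }
        List<byte[]> chunks = new ArrayList<>();
        for (int offset = 0; offset < data.length; offset += chunkSize) {
            int end = Math.min(data.length, offset + chunkSize);
            chunks.add(Arrays.copyOfRange(data, offset, end));
        }
        return chunks;
    }

    public static void main(String[] args) {
        byte[] payload = new byte[10];           // stand-in for a large payload
        List<byte[]> chunks = split(payload, 4); // tiny chunk size, for illustration only
        System.out.println(chunks.size() + " chunks, last has "
                + chunks.get(chunks.size() - 1).length + " bytes");
    }
}
```

In a real client each chunk would be wrapped in a request message and passed to the request observer's `onNext`, followed by a single `onCompleted`.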

At the end of the official demo, http://doc.oschina.net/grpc?t=60134, we find:

 public void routeChat() throws Exception {
    info("*** RoutChat");
    final SettableFuture<Void> finishFuture = SettableFuture.create();
    StreamObserver<RouteNote> requestObserver =
        asyncStub.routeChat(new StreamObserver<RouteNote>() {
          @Override
          public void onNext(RouteNote note) {
            info("Got message \"{0}\" at {1}, {2}", note.getMessage(), note.getLocation()
                .getLatitude(), note.getLocation().getLongitude());
          }

          @Override
          public void onError(Throwable t) {
            finishFuture.setException(t);
          }

          @Override
          public void onCompleted() {
            finishFuture.set(null);
          }
        });

    try {
      RouteNote[] requests =
          {newNote("First message", 0, 0), newNote("Second message", 0, 1),
              newNote("Third message", 1, 0), newNote("Fourth message", 1, 1)};

      for (RouteNote request : requests) {
        info("Sending message \"{0}\" at {1}, {2}", request.getMessage(), request.getLocation()
            .getLatitude(), request.getLocation().getLongitude());
        requestObserver.onNext(request);
      }
      requestObserver.onCompleted();

      finishFuture.get();
      info("Finished RouteChat");
    } catch (Exception t) {
      requestObserver.onError(t);
      logger.log(Level.WARNING, "RouteChat Failed", t);
      throw t;
    }
  }

The official example only shows how to tell when the stream has finished, so let's adapt it.

Code

GrpcServer

@GrpcService(HelloServiceGrpc.class)
public class HelloService extends HelloServiceGrpc.HelloServiceImplBase {


    private static final Logger logger = LoggerFactory.getLogger(HelloService.class);

    @Override
    public StreamObserver<HelloRequest> sayHello(StreamObserver<HelloResponse> responseObserver) {
        return new StreamObserver<HelloRequest>() {
            @Override
            public void onNext(HelloRequest value) {
                String name = value.getName().toStringUtf8();
                logger.info("received name :" + name);
            }

            @Override
            public void onError(Throwable t) {
                logger.warn("Encountered error in sayHello", t);
            }

            @Override
            public void onCompleted() {
                responseObserver.onNext(HelloResponse.newBuilder().setMessage("welcome to gRPC").build());
                // Send two responses here to demonstrate the point
                responseObserver.onNext(HelloResponse.newBuilder().setMessage(" second message ").build());
                responseObserver.onCompleted();
            }
        };
    }
}

Client Controller

@RestController
public class HelloController {

    @Autowired
    private HelloService service;

    @GetMapping("/hello")
    public String sayHello(String name) {
        return service.sendMessage(name);
    }
}

Client GrpcConfig

@Component
public class GrpcConfig {

    @GrpcClient(value = "local-grpc-server")
    private Channel channel;

    @Bean("helloServiceStub")
    public HelloServiceGrpc.HelloServiceStub getHelloServiceStub() {
        return HelloServiceGrpc.newStub(channel);
    }
}

Client HelloService

@Service(value = "helloService")
public class HelloService{


    @Autowired
    private  HelloServiceGrpc.HelloServiceStub helloServiceStub;
    public String sendMessage(String name) {
        StringBuffer stringBuffer = new StringBuffer();
        final SettableFuture<Void> finalFuture =SettableFuture.create();
        StreamObserver<HelloRequest> helloRequestStreamObservers = helloServiceStub.sayHello(
                new StreamObserver<HelloResponse>() {
            @Override
            public void onNext(HelloResponse value) {
                System.out.println("onNext : " + value.getMessage());
                // Append every returned message to a single string
                stringBuffer.append(value.getMessage());
            }
            @Override
            public void onError(Throwable t) {
                finalFuture.setException(t);
            }
            @Override
            public void onCompleted() {
                finalFuture.set(null);
            }
        });
        for (int i = 0; i < 10; i++) {
            helloRequestStreamObservers.onNext(
                    HelloRequest.newBuilder()
                            .setName(ByteString.copyFrom(name,Charset.forName("UTF-8")))
                            .build()
            );
        }
        helloRequestStreamObservers.onCompleted();
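        try {
            // Block until the server completes the response stream (or it fails).
            finalFuture.get();
            System.out.println("is done");
            return stringBuffer.toString();
        } catch (InterruptedException e) {
            // Restore the interrupt flag so callers can react to it.
            Thread.currentThread().interrupt();
            e.printStackTrace();
        } catch (ExecutionException e) {
            e.printStackTrace();
        }
        return null;
    }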
}

For details on the source code of com.google.common.util.concurrent.SettableFuture, see this post: https://blog.csdn.net/lb7758zx/article/details/74322074
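The part the future plays in sendMessage can also be seen in isolation. The following is a minimal sketch, not the gRPC or Guava API: it assumes the JDK's `CompletableFuture` as a stand-in for `SettableFuture`, and the `ResponseObserver` interface and `fakeServer` method are hypothetical replacements for `StreamObserver` and the generated stub, so that the example runs without dependencies.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

class FutureBridgeSketch {

    // Hypothetical stand-in for io.grpc.stub.StreamObserver.
    interface ResponseObserver<T> {
        void onNext(T value);
        void onError(Throwable t);
        void onCompleted();
    }

    // Hypothetical fake server: delivers two messages on another thread, then completes.
    static void fakeServer(ResponseObserver<String> observer) {
        new Thread(() -> {
            observer.onNext("welcome to gRPC");
            observer.onNext(" second message ");
            observer.onCompleted();
        }).start();
    }

    // Collects every streamed message and blocks the caller until the stream ends.
    static String collect() {
        StringBuffer buffer = new StringBuffer();
        // CompletableFuture plays the role that SettableFuture plays in the article.
        CompletableFuture<Void> done = new CompletableFuture<>();
        fakeServer(new ResponseObserver<String>() {
            @Override public void onNext(String value) { buffer.append(value); }
            @Override public void onError(Throwable t) { done.completeExceptionally(t); }
            @Override public void onCompleted() { done.complete(null); }
        });
        try {
            // A bounded wait keeps the calling thread from hanging if the stream never ends.
            done.get(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for the stream", e);
        } catch (ExecutionException | TimeoutException e) {
            throw new IllegalStateException("stream failed or timed out", e);
        }
        return buffer.toString();
    }

    public static void main(String[] args) {
        System.out.println(collect());
    }
}
```

The callbacks run on a different thread from the caller, so the future is the only thing that tells the caller when it is safe to read the buffer. Guava's `SettableFuture` also offers a `get` overload with a timeout, which may be worth using in the real service for the same reason.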

(Screenshot: API request result)
(Screenshot: console execution order)

Second approach: write the output directly through HttpServletResponse

Code:

@RestController
public class HelloController {

    @Autowired
    private HelloService service;

    @GetMapping("/hello")
    public void sayHello(String name, HttpServletResponse response) {
        service.sendMessage(name, response);
    }

}

@Service(value = "helloService")
public class HelloService{


    @Autowired
    private  HelloServiceGrpc.HelloServiceStub helloServiceStub;
    public void sendMessage(String name, HttpServletResponse response) {
        try {
            final PrintWriter printWriter = response.getWriter();
            final SettableFuture<Void> finalFuture =SettableFuture.create();
            StreamObserver<HelloRequest> helloRequestStreamObservers = helloServiceStub.sayHello(
                    new StreamObserver<HelloResponse>() {
                        @Override
                        public void onNext(HelloResponse value) {
                            System.out.println("onNext : " + value.getMessage());
                            if(null != printWriter){
                                printWriter.write(value.getMessage());
                            }
                        }

                        @Override
                        public void onError(Throwable t) {
                            finalFuture.setException(t);
                        }

                        @Override
                        public void onCompleted() {
                            finalFuture.set(null);
                        }
                    });
            for (int i = 0; i < 10; i++) {
                helloRequestStreamObservers.onNext(
                        HelloRequest.newBuilder()
                                .setName(ByteString.copyFrom(name,Charset.forName("UTF-8")))
                                .build()
                );
            }
            helloRequestStreamObservers.onCompleted();
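            // Block until the server completes the response stream (or it fails).
            finalFuture.get();
            // get() has returned, so the stream is finished and the writer can be closed.
            printWriter.close();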
        } catch (InterruptedException | ExecutionException | IOException e) {
            e.printStackTrace();
        }
    }
}
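The write-through idea can be tried without a servlet container. The sketch below is an illustration under assumptions, not the service above: a `PrintWriter` over a `StringWriter` stands in for `response.getWriter()`, `CompletableFuture` stands in for `SettableFuture`, and `ResponseObserver` and `fakeServer` are hypothetical replacements for the gRPC types. It differs from the listing in two ways: it flushes after each message and closes the writer in a `finally` block.

```java
import java.io.PrintWriter;
import java.io.StringWriter;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

class StreamToWriterSketch {

    // Hypothetical stand-in for io.grpc.stub.StreamObserver.
    interface ResponseObserver<T> {
        void onNext(T value);
        void onError(Throwable t);
        void onCompleted();
    }

    // Hypothetical fake server: delivers two messages on another thread, then completes.
    static void fakeServer(ResponseObserver<String> observer) {
        new Thread(() -> {
            observer.onNext("welcome to gRPC");
            observer.onNext(" second message ");
            observer.onCompleted();
        }).start();
    }

    // Forwards each streamed message to the writer as soon as it arrives.
    static void forward(PrintWriter out) {
        CompletableFuture<Void> done = new CompletableFuture<>();
        fakeServer(new ResponseObserver<String>() {
            @Override public void onNext(String value) {
                out.write(value);
                out.flush(); // push each message out instead of waiting for close()
            }
            @Override public void onError(Throwable t) { done.completeExceptionally(t); }
            @Override public void onCompleted() { done.complete(null); }
        });
        try {
            // Block until the stream ends, but not indefinitely.
            done.get(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } catch (ExecutionException | TimeoutException e) {
            e.printStackTrace();
        } finally {
            out.close(); // the writer is closed on success and on failure
        }
    }

    public static void main(String[] args) {
        StringWriter sink = new StringWriter(); // stand-in for the HTTP response body
        forward(new PrintWriter(sink));
        System.out.println(sink);
    }
}
```

Flushing per message matters only if the client should see partial output before the stream ends. If that is not needed, a single close at the end, as in the listing above, is enough.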

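The result is essentially the same as with the first approach. Pick whichever fits your needs; feedback and discussion are welcome.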
This is an original post; please credit the source when reposting.
