美文网首页
gRPC的四种rpc服务接口定义方式

gRPC的四种rpc服务接口定义方式

作者: 木木与呆呆 | 来源:发表于2023-06-29 10:05 被阅读0次

1.说明

在gRPC的服务中定义接口的时候,
请求参数和响应参数可以设置为非Stream或者是Stream方式。
Stream流方式,即操作者可以任意的读写流数据,
直到关闭流,
而非Stream方式在写入或者读取后,
数据是不能改变的。

这样一个rpc接口的定义就有如下四种定义方式:

序号 名称 请求(客户端) 响应(服务端)
1 简单RPC 非Stream 非Stream
2 服务器端流式RPC 非Stream Stream
3 客户端流式 RPC Stream 非Stream
4 双向流式 RPC Stream Stream

2.proto定义

新建hello_stream.proto文件如下:

syntax = "proto3";

option java_multiple_files = false;
option java_package = "io.grpc.examples.hello.stream";
option java_outer_classname = "HelloStream";
option objc_class_prefix = "HLWS";

package hellostream;

// The greeting service definition.
service StreamingGreeter {
  // 1.Simple RPC
  rpc SayHello (HelloRequest) returns (HelloReply) {}

  // 2.Server-to-Client streaming RPC
  rpc SayHelloServerStream (HelloRequest) returns (stream HelloReply) {}
  
  // 3.Client-to-Server streaming RPC
  rpc SayHelloClientStream (stream HelloRequest) returns (HelloReply) {}

  // 4.Bidirectional streaming RPC
  rpc SayHelloBidirStream (stream HelloRequest) returns (stream HelloReply) {}
}

// The request message containing the user's name.
message HelloRequest {
  string name = 1;
}

// The response message containing the greetings
message HelloReply {
  string message = 1;
}

3.同步和异步stub

在上面的proto文件中定义了4种rpc接口,
而GRPC实现rpc接口的时候,
提供了同步和异步调用的stub方法。
同步的stub只能实现1,2这三种接口。
异步的stub能实现全部4种接口。
即1和2两种接口的客户端有同步和异步两种写法,
而3和4两种接口的客户端只有异步一种写法。

4.简单RPC

4.1.客户端-同步

StreamingGreeterClient.sayHelloBlock(String)

4.2.客户端-异步

StreamingGreeterClient.sayHelloAsync(String)

4.3.服务端

StreamingGreeterImpl.sayHello(HelloRequest, StreamObserver<HelloReply>)

5. 服务器端流式RPC

5.1.客户端-同步

StreamingGreeterClient.sayHelloServerStreamBlock(String)

5.2.客户端-异步

StreamingGreeterClient.sayHelloServerStreamAsync(String)

5.3.服务端

StreamingGreeterImpl.sayHelloServerStream(HelloRequest, StreamObserver<HelloReply>)

6.客户端流式 RPC

6.1.客户端-异步

StreamingGreeterClient.sayHelloClientStreamAsync(String...)

6.2.服务端

StreamingGreeterImpl.sayHelloClientStream(StreamObserver<HelloReply>)

7.双向流式 RPC

7.1.客户端-异步

StreamingGreeterClient.sayHelloBidirStreamAsync(String...)

7.2.服务端

StreamingGreeterImpl.sayHelloBidirStream(StreamObserver<HelloReply>)

8.客户端和服务端完整代码

8.1.客户端代码

StreamingGreeterClient.java

package com.ai.grpc.service;

import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
import io.grpc.examples.hello.stream.HelloStream;
import io.grpc.examples.hello.stream.HelloStream.HelloReply;
import io.grpc.examples.hello.stream.HelloStream.HelloRequest;
import io.grpc.examples.hello.stream.StreamingGreeterGrpc;
import io.grpc.examples.hello.stream.StreamingGreeterGrpc.StreamingGreeterBlockingStub;
import io.grpc.examples.hello.stream.StreamingGreeterGrpc.StreamingGreeterStub;
import io.grpc.stub.StreamObserver;

public class StreamingGreeterClient {
    private static Logger LOG = LoggerFactory.getLogger(StreamingGreeterClient.class);

    // 同步的stub
    private StreamingGreeterBlockingStub blockStub;

    // 异步的stub
    private StreamingGreeterStub asyncStub;

    // 初始化GPRC客户端连接
    public void initGrpcStub(String ip, int port) {

        // 获取服务器连接
        ManagedChannel managedChannel = ManagedChannelBuilder.forAddress(ip, port).usePlaintext().build();

        // 获取同步的stub
        blockStub = StreamingGreeterGrpc.newBlockingStub(managedChannel);

        // 获取异步的stub
        asyncStub = StreamingGreeterGrpc.newStub(managedChannel);

    }

    /**
     * 1.Simple RPC 同步调用
     */
    public String sayHelloBlock(String name) {
        HelloRequest request = HelloRequest.newBuilder().setName(name).build();
        HelloReply reply = blockStub.sayHello(request);
        return reply.getMessage();
    }

    /**
     * 1.Simple RPC 异步调用
     */
    public String sayHelloAsync(String name) {
        HelloRequest request = HelloRequest.newBuilder().setName(name).build();
        BlockingQueue<HelloReply> replys = new ArrayBlockingQueue<HelloReply>(1);
        StreamObserver<HelloReply> responseObserver = new StreamObserver<HelloStream.HelloReply>() {

            @Override
            public void onNext(HelloReply reply) {
                LOG.info("onNext=" + reply);
                replys.offer(reply);
            }

            @Override
            public void onCompleted() {
                LOG.info("onCompleted");
            }

            @Override
            public void onError(Throwable t) {
                LOG.info("onError," + t);
                String error = t.getMessage();
                HelloReply reply = HelloReply.newBuilder().setMessage(error).build();
                replys.offer(reply);
            }
        };
        asyncStub.sayHello(request, responseObserver);

        try {
            // 获取到结果才返回
            return replys.take().getMessage();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        return null;
    }

    /**
     * 2.Server-to-Client streaming RPC 同步调用
     */
    public List<String> sayHelloServerStreamBlock(String name) {
        HelloRequest request = HelloRequest.newBuilder().setName(name).build();
        Iterator<HelloReply> replys = blockStub.sayHelloServerStream(request);

        List<String> messages = new ArrayList<>();
        while (replys.hasNext()) {
            HelloReply reply = replys.next();
            messages.add(reply.getMessage());
        }

        return messages;
    }

    /**
     * 2.Server-to-Client streaming RPC 异步调用
     */
    public List<String> sayHelloServerStreamAsync(String name) {
        HelloRequest request = HelloRequest.newBuilder().setName(name).build();

        // 用于保存返回值
        List<String> messages = new ArrayList<>();
        // 用于判断服务端返回的流是否结束
        BlockingQueue<Object> queue = new ArrayBlockingQueue<>(1);
        StreamObserver<HelloReply> responseObserver = new StreamObserver<HelloStream.HelloReply>() {

            @Override
            public void onNext(HelloReply reply) {
                LOG.info("onNext=" + reply);
                messages.add(reply.getMessage());
            }

            @Override
            public void onCompleted() {
                LOG.info("onCompleted");
                closeStream(queue);
            }

            @Override
            public void onError(Throwable t) {
                LOG.info("onError," + t);
                closeStream(queue);
            }
        };
        asyncStub.sayHelloServerStream(request, responseObserver);

        waiteStreamClose(queue);

        return messages;
    }

    /**
     * 3.Client-to-Server streaming RPC 异步调用<br/>
     * 对于客户端的流式调用,只能是异步调用
     */
    public String sayHelloClientStreamAsync(String... names) {

        // 用于保存返回值,即使返回值只有一个值,也需要通过容器返回
        List<String> messages = new ArrayList<>();
        // 用于判断服务端返回的流是否结束
        BlockingQueue<Object> queue = new ArrayBlockingQueue<>(1);
        StreamObserver<HelloReply> responseObserver = new StreamObserver<HelloReply>() {

            @Override
            public void onNext(HelloReply reply) {
                LOG.info("onNext=" + reply);
                messages.add(reply.getMessage());
            }

            @Override
            public void onCompleted() {
                LOG.info("onCompleted");
                closeStream(queue);
            }

            @Override
            public void onError(Throwable t) {
                LOG.info("onError," + t);
                closeStream(queue);
            }
        };
        // 异步调用接口
        StreamObserver<HelloRequest> requestObserver = asyncStub.sayHelloClientStream(responseObserver);

        // 客户端发送消息流
        for (String name : names) {
            HelloRequest request = HelloRequest.newBuilder().setName(name).build();
            requestObserver.onNext(request);
        }
        // 关闭客户端流
        requestObserver.onCompleted();

        waiteStreamClose(queue);

        return messages.get(0);
    }

    /**
     * 4.Bidirectional streaming RPC 异步调用<br/>
     * 对于客户端的流式调用,只能是异步调用
     */
    public String sayHelloBidirStreamAsync(String... names) {

        // 用于保存返回值,也需要通过容器返回
        List<String> messages = new ArrayList<>();
        // 用于判断服务端返回的流是否结束
        CountDownLatch downLatch = new CountDownLatch(1);
        StreamObserver<HelloReply> responseObserver = new StreamObserver<HelloReply>() {

            @Override
            public void onNext(HelloReply reply) {
                LOG.info("onNext=" + reply);
                messages.add(reply.getMessage());
            }

            @Override
            public void onCompleted() {
                LOG.info("onCompleted");
                downLatch.countDown();
            }

            @Override
            public void onError(Throwable t) {
                LOG.info("onError," + t);
                downLatch.countDown();
            }
        };
        // 异步调用接口
        StreamObserver<HelloRequest> requestObserver = asyncStub.sayHelloBidirStream(responseObserver);

        // 客户端发送消息流
        for (String name : names) {
            HelloRequest request = HelloRequest.newBuilder().setName(name).build();
            requestObserver.onNext(request);
        }
        // 关闭客户端流
        requestObserver.onCompleted();

        try {
            // 等待服务端响应结束
            downLatch.await();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        return messages.toString();
    }

    private void closeStream(BlockingQueue<Object> queue) {
        // 表示服务端返回的流结束
        queue.offer(new Object());
    }

    private void waiteStreamClose(BlockingQueue<Object> queue) {
        // 阻塞直到获取到对象,则可以返回结果
        try {
            queue.take();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

}

StreamingGreeterClientTest.java

package com.ai.grpc.service;

import java.util.List;

import org.junit.Test;

public class StreamingGreeterClientTest {

    private StreamingGreeterClient client = new StreamingGreeterClient();

    @Test
    public void testInitGrpcStub() {
        initGrpcStub();
    }

    private void initGrpcStub() {
        String ip = "localhost";
        int port = 50051;
        client.initGrpcStub(ip, port);
    }

    /**
     * 1.Simple RPC 同步调用
     */
    @Test
    public void testSayHelloBlock() {
        initGrpcStub();

        String name = "yuwen";
        String message = client.sayHelloBlock(name);
        System.out.println(message);
    }

    /**
     * 1.Simple RPC 异步调用
     */
    @Test
    public void testSayHelloAsync() {
        initGrpcStub();

        String name = "tom";
        String message = client.sayHelloAsync(name);
        System.out.println(message);
    }

    /**
     * 2.Server-to-Client streaming RPC 同步调用
     */
    @Test
    public void testSayHelloServerStreamBlock() {
        initGrpcStub();

        String name = "yuwen";
        List<String> messages = client.sayHelloServerStreamBlock(name);
        System.out.println(messages);
    }

    /**
     * 2.Server-to-Client streaming RPC 异步调用
     */
    @Test
    public void testSayHelloServerStreamAsync() {
        initGrpcStub();

        String name = "yuwen";
        List<String> messages = client.sayHelloServerStreamAsync(name);
        System.out.println(messages);
    }

    /**
     * 3.Client-to-Server streaming RPC 异步调用
     */
    @Test
    public void testSayHelloClientStreamAsync() {
        initGrpcStub();

        String name = "yuwen";
        String messages = client.sayHelloClientStreamAsync(name, "Tom", "cat", "Jerry", "mouse");
        System.out.println(messages);
    }

    /**
     * 4.Bidirectional streaming RPC 异步调用
     */
    @Test
    public void testSayHelloBidirStreamAsync() {
        initGrpcStub();

        String name = "yuwen";
        String messages = client.sayHelloBidirStreamAsync(name, "Tom", "cat", "Jerry", "mouse");
        System.out.println(messages);
    }
}

8.2.服务端代码

StreamingGreeterImpl.java:

package com.ai.grpc.service.impl;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import io.grpc.examples.hello.stream.HelloStream;
import io.grpc.examples.hello.stream.HelloStream.HelloReply;
import io.grpc.examples.hello.stream.HelloStream.HelloRequest;
import io.grpc.examples.hello.stream.StreamingGreeterGrpc;
import io.grpc.stub.StreamObserver;

public class StreamingGreeterImpl extends StreamingGreeterGrpc.StreamingGreeterImplBase {

    private static Logger LOG = LoggerFactory.getLogger(StreamingGreeterImpl.class);

    /**
     * 1.Simple RPC
     */
    @Override
    public void sayHello(HelloRequest request, StreamObserver<HelloReply> responseObserver) {
        // 处理消息
        LOG.info("request=" + request);
        String message = "hello-" + request.getName();

        // 返回消息
        HelloReply reply = HelloReply.newBuilder().setMessage(message).build();
        // 只能调用一次onNext,第二次调用会报错
        responseObserver.onNext(reply);

        // 结束处理
        responseObserver.onCompleted();
    }

    /**
     * 2.Server-to-Client streaming RPC 服务端实现<br/>
     * 虽然入参定义和Simple RPC基本相同,但是这里的responseObserver能够返回多个响应消息
     */
    @Override
    public void sayHelloServerStream(HelloRequest request, StreamObserver<HelloReply> responseObserver) {
        // 处理消息
        LOG.info("request=" + request);

        // 返回消息,总共返回三个响应消息
        for (int i = 1; i <= 3; i++) {
            String message = i + "-hello-" + request.getName();
            HelloReply reply = HelloReply.newBuilder().setMessage(message).build();
            responseObserver.onNext(reply);
            try {
                // 等待3s后返回下一个消息
                TimeUnit.SECONDS.sleep(3);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }

        // 结束处理
        responseObserver.onCompleted();
    }

    /**
     * 3.Client-to-Server streaming RPC
     */
    @Override
    public StreamObserver<HelloRequest> sayHelloClientStream(StreamObserver<HelloReply> responseObserver) {
        StreamObserver<HelloRequest> requestObserver = new StreamObserver<HelloStream.HelloRequest>() {

            // 缓存客户端发送的消息
            List<String> messages = new ArrayList<>();

            @Override
            public void onNext(HelloRequest request) {
                LOG.info("onNext=" + request);
                messages.add(request.getName());
            }

            @Override
            public void onCompleted() {
                LOG.info("onCompleted");
                // 处理并且返回消息
                String message = "hello-" + messages;
                HelloReply reply = HelloReply.newBuilder().setMessage(message).build();
                // 由于服务端不是流,这里只能返回一次响应消息
                responseObserver.onNext(reply);
                // 结束处理
                responseObserver.onCompleted();
            }

            @Override
            public void onError(Throwable t) {
                LOG.info("onError," + t);
                // 结束处理
                responseObserver.onCompleted();
            }
        };

        return requestObserver;
    }

    /**
     * 4.Bidirectional streaming RPC
     */
    @Override
    public StreamObserver<HelloRequest> sayHelloBidirStream(StreamObserver<HelloReply> responseObserver) {
        StreamObserver<HelloRequest> requestObserver = new StreamObserver<HelloStream.HelloRequest>() {
            @Override
            public void onNext(HelloRequest request) {
                LOG.info("onNext=" + request);
                // 处理并且返回消息
                String message = "hello-" + request.getName();
                HelloReply reply = HelloReply.newBuilder().setMessage(message).build();
                // 服务端返回流处理后的消息
                responseObserver.onNext(reply);
            }

            @Override
            public void onCompleted() {
                LOG.info("onCompleted");
                // 结束处理
                responseObserver.onCompleted();
            }

            @Override
            public void onError(Throwable t) {
                LOG.info("onError," + t);
                // 结束处理
                responseObserver.onCompleted();
            }
        };

        return requestObserver;
    }

}

GRPCServer.java:

package com.ai.grpc;

import java.io.IOException;
import java.util.concurrent.TimeUnit;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.ai.grpc.service.impl.StreamingGreeterImpl;

import io.grpc.Server;
import io.grpc.ServerBuilder;

/**
 * 
 * 静态订阅模式,采集器作为服务端
 * 
 * gRPC服务启动类,启动时注册添加需要对外提供的服务类
 */
public class GRPCServer {
    private static Logger LOG = LoggerFactory.getLogger(GRPCServer.class);

    private Server server;

    private void start() throws IOException {
        // 服务运行端口
        int port = 50051;
        // 注册暴露对外提供的服务
        server = ServerBuilder.forPort(port).addService(new StreamingGreeterImpl()).build().start();
        LOG.info("Server started, listening on port={} ", port);

        Runtime.getRuntime().addShutdownHook(new Thread() {
            @Override
            public void run() {
                // 使用标准错误输出,因为日志记录器有可能在JVM关闭时被重置
                System.err.println("*** shutting down gRPC server since JVM is shutting down");
                LOG.info("*** shutting down gRPC server since JVM is shutting down");
                try {
                    GRPCServer.this.stop();
                } catch (InterruptedException e) {
                    e.printStackTrace(System.err);
                }
                System.err.println("*** server shut down complete");
                LOG.info("*** server shut down complete");

            }
        });
    }

    private void stop() throws InterruptedException {
        if (server != null) {
            server.shutdown().awaitTermination(30, TimeUnit.SECONDS);
        }
    }

    /**
     * 在主线程上等待终止,因为grpc库使用守护进程。
     */
    private void blockUntilShutdown() throws InterruptedException {
        if (server != null) {
            server.awaitTermination();
        }
    }

    /**
     * 启动服务Main方法
     */
    public static void main(String[] args) throws IOException, InterruptedException {
        final GRPCServer server = new GRPCServer();
        server.start();
        server.blockUntilShutdown();
    }
}

9.参考文档

GRPC Java官方手册

相关文章

  • google grpc 其他三种调用方式

    google grpc中定义了四种调用方式,分别是 一元RPC:rpc客户端发送一个请求,服务端返回一个响应。 服...

  • golang grpc快速启动demo(example)

    grpc是后台服务通信最常用的rpc框架。使用grpc需要预先定义协议文件(proto),然后生成相应的接口代码才...

  • grpc初探

    1 grpc的定义 grpc good rpc grpc使用protobuf文件声明服务,服务端和客户端都通使用...

  • websocket与grpc结合

    grpc 服务端: 定义一个业务逻辑服务,定义服务内的接口函数,参数,响应。 结构体服务,实现接口函数,逻辑 ne...

  • gRPC理念

    本文通过gRPC的结构概述和生命周期介绍一些gRPC理念的关键点。 概述 服务定义 就像很多RPC系统一样,gRP...

  • .NET Core gRPC 流式调用

    gRPC 使用 Protocol buffers 作为接口定义语言(IDL)来描述服务接口和输入输出消息的结构,目...

  • gRPC(3):拦截器

    在 gRPC 调用过程中,我们可以拦截 RPC 的执行,在 RPC 服务执行前或执行后运行一些自定义逻辑,这在某些...

  • gRPC 学习笔记

     gRPC 学习笔记,记录gprc一些基本概念.  gRPC正如其他 RPC 系统,gRPC 基于如下思想:定义一...

  • grpc实现文件传输的客户端和服务端例子

    grpc是一个跨语言的rpc框架,通过protobuf定义接口和传输的格式,具有高压缩,高性能(基于http/2的...

  • gRPC之基本概念

    基于google的官方文档,附带一些自己的思考 服务(service) gRPC可以定义四种服务 简单的reque...

网友评论

      本文标题:gRPC的四种rpc服务接口定义方式

      本文链接:https://www.haomeiwen.com/subject/mxvaydtx.html