美文网首页
grpc系列 (1) 入门篇

grpc系列 (1) 入门篇

作者: suxin1932 | 来源:发表于2020-07-19 15:49 被阅读0次

    1.windows下基本环境安装

    1.1 安装 protoc 编译器

    https://www.jianshu.com/p/051e48410a9d (参考此文)

    1.2 安装 protoc-gen-grpc-java 插件

    1.2.1 搜索并下载 protoc-gen-grpc-java 插件

    https://search.maven.org

    搜索并下载 protoc-gen-grpc-java 插件.png

    1.2.2 环境变量配置

    #假设下载到的位置是
    E:/rpc/protoc-gen-grpc-java-1.30.2-windows-x86_64.exe
    
    #配置环境变量, 添加 path 即可
    

    2.使用示例

    工程结构.png

    2.1 pom.xml

    <!-- gRPC runtime: transport (shaded Netty), protobuf marshalling and stub base classes. -->
    <dependencies>
        <dependency>
            <groupId>io.grpc</groupId>
            <artifactId>grpc-netty-shaded</artifactId>
            <version>1.30.2</version>
        </dependency>
        <dependency>
            <groupId>io.grpc</groupId>
            <artifactId>grpc-protobuf</artifactId>
            <version>1.30.2</version>
        </dependency>
        <dependency>
            <groupId>io.grpc</groupId>
            <artifactId>grpc-stub</artifactId>
            <version>1.30.2</version>
        </dependency>
    </dependencies>

    <build>
        <extensions>
            <!-- Supplies the ${os.detected.classifier} property used by the protoc artifacts below. -->
            <extension>
                <groupId>kr.motd.maven</groupId>
                <artifactId>os-maven-plugin</artifactId>
                <version>1.6.2</version>
            </extension>
        </extensions>
        <plugins>
            <plugin>
                <groupId>org.xolstice.maven.plugins</groupId>
                <artifactId>protobuf-maven-plugin</artifactId>
                <version>0.6.1</version>
                <configuration>
                    <protocArtifact>com.google.protobuf:protoc:3.12.0:exe:${os.detected.classifier}</protocArtifact>
                    <pluginId>grpc-java</pluginId>
                    <!-- Keep this version equal to the io.grpc dependency versions above. -->
                    <pluginArtifact>io.grpc:protoc-gen-grpc-java:1.30.2:exe:${os.detected.classifier}</pluginArtifact>
                </configuration>
                <executions>
                    <execution>
                        <goals>
                            <!-- compile: message classes; compile-custom: gRPC service stubs. -->
                            <goal>compile</goal>
                            <goal>compile-custom</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <!-- Pinned explicitly: an unversioned plugin triggers a Maven warning and makes the build non-reproducible. -->
                <version>3.8.1</version>
                <configuration>
                    <source>8</source>
                    <target>8</target>
                </configuration>
            </plugin>
        </plugins>
    </build>
    

    2.2 编写 proto 文件 ==> Stu.proto

    // Reference for writing .proto files --> https://developers.google.cn/protocol-buffers/docs/proto3

    // Generating gRPC code --> see https://github.com/grpc/grpc-java
    // For protobuf-based codegen, you can put your proto files in the
    // src/main/proto and src/test/proto directories along with an appropriate plugin.

    syntax = "proto3";

    // Java code generation options: target package, outer class name, and
    // one generated .java file per message/service.
    option java_package = "com.zy.grpc.proto.server";
    option java_outer_classname = "Student";
    option java_multiple_files = true;

    // Request and response types must be messages; scalar types such as
    // int32 or string cannot be used on their own.
    service StudentService {
        // 1. Unary: a single request message in, a single response message out.
        rpc QueryStuById(SearchRequest) returns (SearchResponse);

        // 2. Server streaming: a single request in, a stream of responses out.
        rpc QueryStuByIdStream2Resp(SearchRequest) returns (stream StuResponse);

        // 3. Client streaming: a stream of requests in, a single response out (implemented asynchronously).
        rpc QueryStuByIdReqByStream(stream SearchRequest) returns (StuResponseList);

        // 4. Bidirectional streaming: a stream in, a stream out (implemented asynchronously).
        rpc QueryStuByIdReqAndResp2Stream(stream StuRequestStream) returns (stream StuResponseStream);
    }

    // Lookup request carrying the student id.
    message SearchRequest {
        string id = 1;
    // Pagination fields, currently disabled:
    //    int32 page_number = 2;
    //    int32 result_per_page = 3;
    }

    // Response of the unary lookup.
    message SearchResponse {
        string name = 1;
    }

    // One student record; all fields, including age, are strings.
    message StuResponse {
        string id = 1;
        string name = 2;
        string age = 3;
    }

    // Batch of student records returned by the client-streaming call.
    message StuResponseList {
        repeated StuResponse stuResponse = 1;
    }

    // Request element of the bidirectional stream.
    message StuRequestStream {
        string req = 1;
    }

    // Response element of the bidirectional stream.
    message StuResponseStream {
        string resp = 1;
    }
    

    2.3 执行命令, 生成代码

    执行命令, 生成代码.png
    # Generate the message classes (the paths are relative, so mind the directory the command is run from)
    protoc --java_out=src/main/java src/main/resources/proto/Stu.proto
    
    # Generate the gRPC service classes (the paths are relative, so mind the directory the command is run from)
    protoc --plugin=protoc-gen-grpc-java=E:/rpc/protoc-gen-grpc-java-1.30.2-windows-x86_64.exe --grpc-java_out=src/main/java src/main/resources/proto/Stu.proto
    

    2.4 编写 grpc-server, grpc-client

    2.4.1 grpc-server

    package com.zy.grpc.proto.server.peer;
    
    import com.zy.grpc.proto.server.message.*;
    import com.zy.grpc.proto.server.service.StudentServiceGrpc;
    import io.grpc.Server;
    import io.grpc.ServerBuilder;
    import io.grpc.stub.StreamObserver;
    
    import java.io.IOException;
    
    /**
     * Minimal gRPC server that publishes {@code StudentService} on port 9090.
     *
     * <p>Server and client follow the HelloWorldServer / HelloWorldClient pair from
     * https://www.grpc.io/docs/quickstart/java/
     */
    public class GrpcServer {

        /** TCP port the server listens on. */
        private static final int PORT = 9090;

        /** Maximum time, in seconds, granted to in-flight calls when the server stops. */
        private static final long SHUTDOWN_TIMEOUT_SECONDS = 30L;

        /** Name prefixes emitted, in order, by the server-streaming call. */
        private static final String[] STREAMED_NAMES = {"jerry", "john", "trump"};

        private Server server;

        /**
         * Starts the server and blocks the main thread until the server terminates.
         *
         * @param args ignored
         * @throws InterruptedException if the main thread is interrupted while waiting
         * @throws IOException if the server cannot be started
         */
        public static void main(String[] args) throws InterruptedException, IOException {
            GrpcServer grpcServer = new GrpcServer();
            grpcServer.start();
            grpcServer.blockUntilShutdown();
        }

        /**
         * Builds and starts the server, then registers a JVM shutdown hook that stops it.
         *
         * @throws IOException if the server cannot be started
         */
        private void start() throws IOException {
            this.server = ServerBuilder.forPort(PORT).addService(new StudentServiceImpl()).build().start();
            System.out.println("server started");
            Runtime.getRuntime().addShutdownHook(new Thread(() -> {
                System.err.println("*** shutting down gRPC server since JVM is shutting down");
                GrpcServer.this.stop();
                System.err.println("*** server shut down");
            }));
        }

        /**
         * Stops accepting new calls and waits for running calls to finish.
         *
         * <p>Waiting matters because this runs inside the shutdown hook: once the hook
         * returns the JVM may exit, cutting off calls that are still in flight. Calls
         * that outlive the timeout, or an interrupt while waiting, lead to a forced stop.
         */
        private void stop() {
            if (this.server == null) {
                return;
            }
            try {
                // TimeUnit is fully qualified because this file's imports do not include it.
                boolean terminated = this.server.shutdown()
                        .awaitTermination(SHUTDOWN_TIMEOUT_SECONDS, java.util.concurrent.TimeUnit.SECONDS);
                if (!terminated) {
                    this.server.shutdownNow();
                }
            } catch (InterruptedException e) {
                this.server.shutdownNow();
                // Preserve the interrupt status for whoever owns this thread.
                Thread.currentThread().interrupt();
            }
        }

        /**
         * Blocks the calling thread until the server has terminated.
         *
         * @throws InterruptedException if the calling thread is interrupted while waiting
         */
        private void blockUntilShutdown() throws InterruptedException {
            if (this.server != null) {
                this.server.awaitTermination();
            }
        }

        /** Implementation of the four {@code StudentService} RPCs. */
        static class StudentServiceImpl extends StudentServiceGrpc.StudentServiceImplBase {
            /**
             * 1. Unary call: one request message in, one response message out.
             *
             * @param request lookup request carrying the student id
             * @param responseObserver receives the single response and the completion signal
             */
            @Override
            public void queryStuById(SearchRequest request, StreamObserver<SearchResponse> responseObserver) {
                System.out.println("客户端请求参数id为: " + request.getId());
                // To report a failure instead, call:
                // responseObserver.onError(new RuntimeException("there is an error to request by id"));
                // Success path: emit the result ...
                responseObserver.onNext(SearchResponse.newBuilder().setName("tom" + request.getId()).build());
                // ... and then signal that the call is finished.
                responseObserver.onCompleted();
            }

            /**
             * 2. Server streaming: one request message in, a stream of responses out.
             *
             * @param request lookup request carrying the student id
             * @param responseObserver receives one response per entry of {@code STREAMED_NAMES}
             */
            @Override
            public void queryStuByIdStream2Resp(SearchRequest request, StreamObserver<StuResponse> responseObserver) {
                String id = request.getId();
                System.out.println("客户端请求参数id为: " + id);
                for (String name : STREAMED_NAMES) {
                    responseObserver.onNext(StuResponse.newBuilder()
                            .setName("stream2Resp --> " + name + id)
                            .setAge(id)
                            .setId(id)
                            .build());
                }
                responseObserver.onCompleted();
            }

            /**
             * 3. Client streaming: a stream of requests in, one response out.
             *
             * @param responseObserver receives the single list response once the client has finished sending
             * @return observer that consumes the client's request stream
             */
            @Override
            public StreamObserver<SearchRequest> queryStuByIdReqByStream(StreamObserver<StuResponseList> responseObserver) {
                return new StreamObserver<SearchRequest>() {
                    @Override
                    public void onNext(SearchRequest value) {
                        System.out.println("onNext --> " + value.getId());
                    }

                    @Override
                    public void onError(Throwable t) {
                        System.out.println(t.getMessage());
                    }

                    @Override
                    public void onCompleted() {
                        // The reply is a fixed two-element list; it does not depend on the received ids.
                        StuResponse response1 = StuResponse.newBuilder().setId("1").setName("tommy1").setAge("10").build();
                        StuResponse response2 = StuResponse.newBuilder().setId("2").setName("tommy2").setAge("20").build();
                        StuResponseList list = StuResponseList.newBuilder().addStuResponse(response1).addStuResponse(response2).build();
                        responseObserver.onNext(list);
                        responseObserver.onCompleted();
                    }
                };
            }

            /**
             * 4. Bidirectional streaming: a stream of requests in, a stream of responses out.
             *
             * @param responseObserver receives one response for every request element
             * @return observer that consumes the client's request stream
             */
            @Override
            public StreamObserver<StuRequestStream> queryStuByIdReqAndResp2Stream(StreamObserver<StuResponseStream> responseObserver) {
                return new StreamObserver<StuRequestStream>() {
                    @Override
                    public void onNext(StuRequestStream value) {
                        System.out.println("receive request: " + value.getReq());
                        // Answer each request element immediately.
                        responseObserver.onNext(StuResponseStream.newBuilder().setResp("hello" + value.getReq()).build());
                    }

                    @Override
                    public void onError(Throwable t) {
                        System.out.println(t.getMessage());
                    }

                    @Override
                    public void onCompleted() {
                        // The client is done sending, so close the response stream as well.
                        responseObserver.onCompleted();
                    }
                };
            }
        }
    }
    

    2.4.2 grpc-client

    package com.zy.grpc.proto.server.peer;
    
    import com.zy.grpc.proto.server.message.*;
    import com.zy.grpc.proto.server.service.StudentServiceGrpc;
    import io.grpc.ManagedChannel;
    import io.grpc.ManagedChannelBuilder;
    import io.grpc.stub.StreamObserver;
    
    import java.util.ArrayList;
    import java.util.Iterator;
    import java.util.List;
    import java.util.concurrent.CountDownLatch;
    import java.util.concurrent.TimeUnit;
    
    /**
     * Demo client that calls each of the four {@code StudentService} RPCs once.
     *
     * <p>The unary and server-streaming calls go through the blocking stub; the
     * client-streaming and bidirectional calls go through the asynchronous stub
     * and wait for the server to finish before returning.
     */
    public class GrpcClient {

        /** Maximum time, in seconds, to wait for an asynchronous streaming call to finish. */
        private static final long STREAM_TIMEOUT_SECONDS = 30L;

        private final ManagedChannel channel;
        private final StudentServiceGrpc.StudentServiceBlockingStub blockingStub;
        private final StudentServiceGrpc.StudentServiceStub stub;

        /**
         * Creates a client connected to the given address over a plaintext channel.
         *
         * @param host server host name or address
         * @param port server port
         */
        public GrpcClient(String host, int port) {
            this(ManagedChannelBuilder.forAddress(host, port).usePlaintext().build());
        }

        /**
         * Creates a client on top of an existing channel.
         *
         * @param channel channel shared by the blocking and the asynchronous stub
         */
        GrpcClient(ManagedChannel channel) {
            this.channel = channel;
            this.blockingStub = StudentServiceGrpc.newBlockingStub(channel);
            this.stub = StudentServiceGrpc.newStub(channel);
        }

        /**
         * 1. Unary call: one request message in, one response message out.
         *
         * @param id student id to look up
         */
        private void queryStuById(String id) {
            try {
                SearchResponse response = this.blockingStub.queryStuById(SearchRequest.newBuilder().setId(id).build());
                System.out.println("response --> " + response.getName());
            } catch (RuntimeException e) {
                System.out.println("failed to request queryStuById");
                e.printStackTrace();
            }
        }

        /**
         * 2. Server streaming: one request message in, a stream of responses out.
         *
         * @param id student id to look up
         */
        private void queryStuByIdStream2Resp(String id) {
            try {
                Iterator<StuResponse> resp = this.blockingStub.queryStuByIdStream2Resp(SearchRequest.newBuilder().setId(id).build());
                while (resp.hasNext()) {
                    System.out.println(resp.next());
                }
            } catch (RuntimeException e) {
                System.out.println("failed to request queryStuByIdStream2Resp");
                e.printStackTrace();
            }
        }

        /**
         * 3. Client streaming: a stream of requests in, one response out.
         *
         * <p>Returns only after the server has completed or failed the call, or after
         * the wait timed out. Does nothing when {@code list} is null or empty.
         *
         * @param list ids to send, one request per id
         * @throws InterruptedException if interrupted while waiting for the call to finish
         */
        private void queryStuByIdReqByStream(List<String> list) throws InterruptedException {
            if (list == null || list.isEmpty()) {
                return;
            }
            // Released once the call reaches a terminal state (completed or failed).
            final CountDownLatch finished = new CountDownLatch(1);
            StreamObserver<StuResponseList> responseObserver = new StreamObserver<StuResponseList>() {
                @Override
                public void onNext(StuResponseList value) {
                    value.getStuResponseList().forEach(System.out::println);
                }

                @Override
                public void onError(Throwable t) {
                    System.out.println(t.getMessage());
                    finished.countDown();
                }

                @Override
                public void onCompleted() {
                    System.out.println("completed");
                    finished.countDown();
                }
            };

            StreamObserver<SearchRequest> requestObserver = this.stub.queryStuByIdReqByStream(responseObserver);
            try {
                for (String id : list) {
                    requestObserver.onNext(SearchRequest.newBuilder().setId(id).build());
                }
            } catch (RuntimeException e) {
                // Cancel the call so the stream is not left half-open, then propagate.
                requestObserver.onError(e);
                throw e;
            }
            requestObserver.onCompleted();
            awaitStream(finished, "queryStuByIdReqByStream");
        }

        /**
         * 4. Bidirectional streaming: a stream of requests in, a stream of responses out.
         *
         * <p>Returns only after the server has completed or failed the call, or after
         * the wait timed out. Does nothing when {@code list} is null or empty.
         *
         * @param list payloads to send, one request per element
         * @throws InterruptedException if interrupted while waiting for the call to finish
         */
        private void queryStuByIdReqAndResp2Stream(List<String> list) throws InterruptedException {
            if (list == null || list.isEmpty()) {
                return;
            }
            // Released once the call reaches a terminal state (completed or failed).
            final CountDownLatch finished = new CountDownLatch(1);
            StreamObserver<StuResponseStream> responseStreamStreamObserver = new StreamObserver<StuResponseStream>() {
                @Override
                public void onNext(StuResponseStream value) {
                    System.out.println(value.getResp());
                }

                @Override
                public void onError(Throwable t) {
                    System.out.println(t.getMessage());
                    finished.countDown();
                }

                @Override
                public void onCompleted() {
                    System.out.println("completed");
                    finished.countDown();
                }
            };
            StreamObserver<StuRequestStream> request = this.stub.queryStuByIdReqAndResp2Stream(responseStreamStreamObserver);
            try {
                for (String req : list) {
                    request.onNext(StuRequestStream.newBuilder().setReq(req).build());
                }
            } catch (RuntimeException e) {
                // Cancel the call so the stream is not left half-open, then propagate.
                request.onError(e);
                throw e;
            }
            request.onCompleted();
            awaitStream(finished, "queryStuByIdReqAndResp2Stream");
        }

        /**
         * Waits for an asynchronous call to reach a terminal state and reports a timeout.
         *
         * @param finished latch released by the call's response observer
         * @param rpcName name printed when the wait times out
         * @throws InterruptedException if interrupted while waiting
         */
        private static void awaitStream(CountDownLatch finished, String rpcName) throws InterruptedException {
            if (!finished.await(STREAM_TIMEOUT_SECONDS, TimeUnit.SECONDS)) {
                System.out.println(rpcName + " did not finish within " + STREAM_TIMEOUT_SECONDS + " seconds");
            }
        }

        /**
         * Shuts the channel down, forcing termination if it is still open after 5 seconds.
         *
         * @throws InterruptedException if interrupted while waiting for termination
         */
        private void shutdown() throws InterruptedException {
            if (!this.channel.shutdown().awaitTermination(5L, TimeUnit.SECONDS)) {
                this.channel.shutdownNow();
            }
        }

        /**
         * Runs the four demo calls in order against 127.0.0.1:9090 and then closes the channel.
         *
         * @param args ignored
         * @throws InterruptedException if interrupted while waiting for a call or for channel shutdown
         */
        public static void main(String[] args) throws InterruptedException {
            GrpcClient grpcClient = new GrpcClient("127.0.0.1", 9090);
            try {
                System.out.println("-----------------------1.入参是 对象, 出参也是对象----------------------");
                grpcClient.queryStuById("1");
                System.out.println("-----------------------2.入参是 对象, 出参是流----------------------");
                grpcClient.queryStuByIdStream2Resp("2");
                System.out.println("-----------------------3.入参是 流, 出参是对象: 异步实现-----------------------");
                List<String> list = new ArrayList<>();
                list.add("1");
                list.add("2");
                list.add("3");
                grpcClient.queryStuByIdReqByStream(list);
                System.out.println("-----------------------4.入参是 流, 出参也是流: 异步实现-----------------------");
                grpcClient.queryStuByIdReqAndResp2Stream(list);
            } finally {
                grpcClient.shutdown();
            }
        }
    }
    

    相关文章

      网友评论

          本文标题:grpc系列 (1) 入门篇

          本文链接:https://www.haomeiwen.com/subject/bqxrkktx.html