8. The four gRPC data transfer modes

Author: 未知的证明 | Published 2019-02-28 16:34

1. Writing the proto file (the gRPC example here uses proto3 syntax)

The four data transfer modes:

  • Unary RPCs where the client sends a single request to the server and gets a single response back, just like a normal function call.
  • Server streaming RPCs where the client sends a request to the server and gets a stream to read a sequence of messages back. The client reads from the returned stream until there are no more messages. gRPC guarantees message ordering within an individual RPC call.
  • Client streaming RPCs where the client writes a sequence of messages and sends them to the server, again using a provided stream. Once the client has finished writing the messages, it waits for the server to read them and return its response. Again gRPC guarantees message ordering within an individual RPC call.
  • Bidirectional streaming RPCs where both sides send a sequence of messages using a read-write stream. The two streams operate independently, so clients and servers can read and write in whatever order they like: for example, the server could wait to receive all the client messages before writing its responses, or it could alternately read a message then write a message, or some other combination of reads and writes. The order of messages in each stream is preserved.
    The proto definitions below show each of the four forms:
syntax = "proto3";

package com.liyuanfeng.proto;

option java_package = "com.liyuanfeng.proto";
option java_outer_classname = "StudentProto";
option java_multiple_files = true;


service StudentService {
    rpc getRealNameByUsername (MyRequest) returns (MyResponse) {
    }
    rpc GetStudentByAge (StudentRequest) returns (stream StudentResponse) {
    }
    rpc GetStudentsWrapperByAges (stream StudentRequest) returns (StudentResponseList) {
    }
    rpc BiTalk (stream StreamRequest) returns (stream StreamResponse) {
    }
}


message MyRequest {
    string username = 1;
}

message MyResponse {
    string realname = 2;
}

message StudentRequest {
    int32 age = 1;
}
message StudentResponse {
    string name = 1;
    int32 age = 2;
    string city = 3;
}

message StudentResponseList {
    repeated StudentResponse studentResponse = 1;
}


message StreamRequest{
    string request_info = 1;
}
message StreamResponse{
    string response_info = 1;
}
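
The four call shapes can also be sketched in plain Java, without generated code. This is only a model under stated assumptions: the class CallShapes, the nested Observer interface (a hypothetical stand-in for io.grpc.stub.StreamObserver), and all method names and return values are invented for illustration, and nothing here touches the network.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

// Plain-Java model of the four call shapes. Observer is a hypothetical
// stand-in for io.grpc.stub.StreamObserver, not the real gRPC type.
class CallShapes {

    interface Observer<T> {
        void onNext(T value);
        void onCompleted();
    }

    // Unary: one request in, one response out.
    static String unary(String username) {
        return "real-name-of-" + username;
    }

    // Server streaming: one request in, a sequence of responses out.
    static Iterator<String> serverStreaming(int age) {
        return Arrays.asList("student-a:" + age, "student-b:" + age).iterator();
    }

    // Client streaming: the caller pushes requests into the returned observer;
    // the single response is delivered only after the caller completes.
    static Observer<Integer> clientStreaming(Observer<List<Integer>> responses) {
        List<Integer> seen = new ArrayList<>();
        return new Observer<Integer>() {
            @Override public void onNext(Integer age) { seen.add(age); }
            @Override public void onCompleted() {
                responses.onNext(seen);
                responses.onCompleted();
            }
        };
    }

    // Bidirectional streaming: same shape, but a response may be emitted
    // for every request instead of once at the end.
    static Observer<String> bidirectional(Observer<String> responses) {
        return new Observer<String>() {
            @Override public void onNext(String info) { responses.onNext("echo:" + info); }
            @Override public void onCompleted() { responses.onCompleted(); }
        };
    }

    // Helper that records everything an observer receives.
    static <T> Observer<T> recorder(List<String> log) {
        return new Observer<T>() {
            @Override public void onNext(T value) { log.add(String.valueOf(value)); }
            @Override public void onCompleted() { log.add("completed"); }
        };
    }

    public static void main(String[] args) {
        System.out.println(unary("alice"));
        serverStreaming(20).forEachRemaining(System.out::println);

        List<String> log = new ArrayList<>();
        Observer<Integer> ages = clientStreaming(recorder(log));
        ages.onNext(10);
        ages.onNext(20);
        ages.onCompleted();

        Observer<String> talk = bidirectional(recorder(log));
        talk.onNext("hi");
        talk.onCompleted();
        System.out.println(log);
    }
}
```

The point of the model is the difference in signatures: the two client-sending-a-stream shapes hand the caller an observer to write into, while the response side is either a single value, an iterator, or another observer.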

2. Configuring the Gradle file so that the stub and server code is generated in the right place

apply plugin: 'java'
apply plugin: 'com.google.protobuf'
group 'com.liyuanfeng'
version '1.0'
sourceCompatibility = 1.8
targetCompatibility = 1.8

repositories {
    maven { url 'https://maven.aliyun.com/repository/central' }
    maven { url 'https://maven.aliyun.com/repository/jcenter' }
    maven { // Maven repository URL
        url "http://repo.springsource.org/libs-milestone-local"
    }
}
dependencies {
    testCompile group: 'junit', name: 'junit', version: '4.12'
    // https://mvnrepository.com/artifact/io.netty/netty-all
    compile group: 'io.netty', name: 'netty-all', version: '4.1.6.Final'
    // https://mvnrepository.com/artifact/com.google.protobuf/protobuf-java
    compile group: 'com.google.protobuf', name: 'protobuf-java', version: '3.3.1'
    // https://mvnrepository.com/artifact/com.google.protobuf/protobuf-java-util
    compile group: 'com.google.protobuf', name: 'protobuf-java-util', version: '3.3.1'
    // https://mvnrepository.com/artifact/org.apache.thrift/libthrift
    compile group: 'org.apache.thrift', name: 'libthrift', version: '0.12.0'

    compile 'io.grpc:grpc-netty-shaded:1.18.0'
    compile 'io.grpc:grpc-protobuf:1.18.0'
    compile 'io.grpc:grpc-stub:1.18.0'

}


buildscript {
    repositories {
        mavenCentral()
    }
    dependencies {
        classpath 'com.google.protobuf:protobuf-gradle-plugin:0.8.5'
    }
}

protobuf {
    protoc {
        artifact = "com.google.protobuf:protoc:3.5.1-1"
    }
    plugins {
        grpc {
            artifact = 'io.grpc:protoc-gen-grpc-java:1.18.0'
        }
    }
    generateProtoTasks {
        all()*.plugins {
            grpc {
                setOutputSubDir  "java"
            }
        }
    }

    generateProtoTasks.generatedFilesBaseDir = "src"
}


tasks.withType(JavaCompile){
    options.encoding = "UTF-8"
}

sourceSets{
    main{
        proto{
            srcDir 'src/main/proto'
            srcDir 'src/main'
        }
    }
}

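Note that the proto file has to live in a specific location. The sourceSets block above lists src/main/proto and src/main as proto source directories; the original post showed the location in a screenshot:

[Screenshot: proto file location]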

3. Generating the Java code for the proto file. The command line is:

D:\Java\netty_lecture>gradle clean generateProto
[Screenshot: command-line output]
[Screenshot: generated code]

4. Writing the server

package com.liyuanfeng.grpc;

import io.grpc.Server;
import io.grpc.ServerBuilder;

import java.io.IOException;

public class GrpcServer {

    private Server server;

    private void start() throws IOException {
        this.server = ServerBuilder.forPort(8899).addService(new StudentServiceImpl()).build().start();
        System.out.println("server started!");
        Runtime.getRuntime().addShutdownHook(new Thread(() -> {

            System.out.println("JVM shutting down");
            GrpcServer.this.stop();
        }));
        System.out.println("reached the end of start()");
    }

    private void stop() {
        if (null != server) {
            this.server.shutdown();
        }
    }

    private void awaitTermination() throws InterruptedException {
        if (null != server) {
            this.server.awaitTermination();
        }
    }

    public static void main(String[] args) throws IOException, InterruptedException {

        GrpcServer grpcServer = new GrpcServer();
        grpcServer.start();
        grpcServer.awaitTermination();

    }
}
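
The stop() method above calls shutdown() and returns immediately. io.grpc.Server also has awaitTermination(long, TimeUnit) and shutdownNow(), the same trio that java.util.concurrent.ExecutorService exposes, so a bounded-wait shutdown can be sketched with an executor as a stand-in. The class name GracefulStop, the helper stop(...), and the grace periods are assumptions made for this sketch, not part of the original server.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Stand-in sketch: an ExecutorService plays the role of the gRPC Server.
class GracefulStop {

    // Returns true if everything finished within the grace period,
    // false if the forced shutdownNow() path was taken.
    static boolean stop(ExecutorService service, long graceMillis) {
        service.shutdown(); // stop accepting new work, let running work finish
        try {
            if (service.awaitTermination(graceMillis, TimeUnit.MILLISECONDS)) {
                return true;
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt flag
        }
        service.shutdownNow(); // interrupt whatever is still running
        return false;
    }

    public static void main(String[] args) {
        ExecutorService service = Executors.newSingleThreadExecutor();
        service.submit(() -> System.out.println("handling one last call"));
        System.out.println("clean stop: " + stop(service, 5000));
    }
}
```

In a shutdown hook this keeps JVM exit from hanging on a call that never finishes, while still giving in-flight calls a chance to complete.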

5. Writing the service implementation

package com.liyuanfeng.grpc;

import com.liyuanfeng.proto.*;
import io.grpc.stub.StreamObserver;

import java.util.UUID;

public class StudentServiceImpl extends StudentServiceGrpc.StudentServiceImplBase {
    @Override
    public void getRealNameByUsername(MyRequest request, StreamObserver<MyResponse> responseObserver) {
        System.out.println("Received from client: " + request.getUsername());
        responseObserver.onNext(MyResponse.newBuilder().setRealname("李远锋").build());
        responseObserver.onCompleted();
    }

    @Override
    public void getStudentByAge(StudentRequest request, StreamObserver<StudentResponse> responseObserver) {
        System.out.println("Received from client: " + request.getAge());

        responseObserver.onNext(StudentResponse.newBuilder().setName("周杰伦").setAge(35).setCity("中国").build());
        responseObserver.onNext(StudentResponse.newBuilder().setName("林俊杰").setAge(35).setCity("台湾").build());
        responseObserver.onNext(StudentResponse.newBuilder().setName("李连杰").setAge(35).setCity("香港").build());
        responseObserver.onNext(StudentResponse.newBuilder().setName("林志颖").setAge(35).setCity("中国").build());
        responseObserver.onCompleted();

    }

    @Override
    public StreamObserver<StudentRequest> getStudentsWrapperByAges(StreamObserver<StudentResponseList> responseObserver) {
        return new StreamObserver<StudentRequest>() {
            @Override
            public void onNext(StudentRequest value) {
                System.out.println("onNext" + value.getAge());
            }

            @Override
            public void onError(Throwable t) {
                System.out.println(t.getMessage());
            }

            @Override
            public void onCompleted() {
                StudentResponse studentResponse1 = StudentResponse.newBuilder().setName("zhangsan").setAge(20).setCity("Beijing").build();
                StudentResponse studentResponse2 = StudentResponse.newBuilder().setName("lisi").setAge(20).setCity("Beijing").build();
                StudentResponseList studentResponseList = StudentResponseList.newBuilder().addStudentResponse(studentResponse1).addStudentResponse(studentResponse2).build();
                responseObserver.onNext(studentResponseList);
                responseObserver.onCompleted();
            }
        };
    }

    @Override
    public StreamObserver<StreamRequest> biTalk(StreamObserver<StreamResponse> responseObserver) {
        return new StreamObserver<StreamRequest>() {
            @Override
            public void onNext(StreamRequest value) {
                System.out.println(value.getRequestInfo());

                responseObserver.onNext(StreamResponse.newBuilder().setResponseInfo(UUID.randomUUID().toString()).build());
            }

            @Override
            public void onError(Throwable t) {
                System.out.println(t.getMessage());
            }

            @Override
            public void onCompleted() {
                System.out.println("onCompleted!");
                responseObserver.onCompleted();
            }
        };
    }
}

6. Writing the client

package com.liyuanfeng.grpc;

import com.liyuanfeng.proto.*;
import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
import io.grpc.stub.StreamObserver;

import java.time.LocalDateTime;
import java.util.Iterator;

public class GrpcClient {
    public static void main(String[] args) throws InterruptedException {

        ManagedChannel managedChannel = ManagedChannelBuilder.forAddress("localhost", 8899)
                .usePlaintext().build(); // usePlaintext: no TLS encryption
        StudentServiceGrpc.StudentServiceBlockingStub blockingStub = StudentServiceGrpc.newBlockingStub(managedChannel);


        StudentServiceGrpc.StudentServiceStub studentServiceStub = StudentServiceGrpc.newStub(managedChannel);

        MyResponse myResponse = blockingStub.getRealNameByUsername(MyRequest.newBuilder().setUsername("秦子豪").build());
        System.out.println(myResponse.getRealname());

        System.out.println("--------------------------------------");

        Iterator<StudentResponse> studentByAge = blockingStub.getStudentByAge(StudentRequest.newBuilder().setAge(20).build());

        while (studentByAge.hasNext()) {
            StudentResponse next = studentByAge.next();
            System.out.println(next.getName() + "," + next.getCity() + "," + next.getAge());
        }


        System.out.println("------------------------------------------------------------------------------");

        StreamObserver<StudentResponseList> studentResponseListStreamObserver = new StreamObserver<StudentResponseList>() {
            @Override
            public void onNext(StudentResponseList value) {

                value.getStudentResponseList().forEach(studentResponse -> {
                    System.out.println(studentResponse.getName());
                    System.out.println(studentResponse.getAge());
                    System.out.println(studentResponse.getCity());
                    System.out.println("********************");
                });

            }

            @Override
            public void onError(Throwable t) {
                System.out.println(t.getMessage());
            }

            @Override
            public void onCompleted() {
                System.out.println("onCompleted!");
            }
        };


        StreamObserver<StudentRequest> studentsWrapperByAges = studentServiceStub.getStudentsWrapperByAges(studentResponseListStreamObserver);
        studentsWrapperByAges.onNext(StudentRequest.newBuilder().setAge(10).build());
        studentsWrapperByAges.onNext(StudentRequest.newBuilder().setAge(20).build());
        studentsWrapperByAges.onNext(StudentRequest.newBuilder().setAge(30).build());
        studentsWrapperByAges.onNext(StudentRequest.newBuilder().setAge(40).build());
        studentsWrapperByAges.onCompleted();




        StreamObserver<StreamRequest> streamRequestStreamObserver = studentServiceStub.biTalk(new StreamObserver<StreamResponse>() {
            @Override
            public void onNext(StreamResponse value) {
                System.out.println(value.getResponseInfo());
            }

            @Override
            public void onError(Throwable t) {
                System.out.println("lalala");
                System.out.println(t.getMessage());
            }

            @Override
            public void onCompleted() {
                System.out.println("onCompleted!");
            }
        });

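        for (int i = 0; i < 10; i++) {
            streamRequestStreamObserver.onNext(StreamRequest.newBuilder().setRequestInfo(LocalDateTime.now().toString()).build());
        }
        // Tell the server the client has finished sending, so its onCompleted runs.
        streamRequestStreamObserver.onCompleted();
        // Crude wait so the asynchronous responses can arrive before main exits.
        Thread.sleep(100000);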
    }
}
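
The fixed Thread.sleep at the end of the client always waits the full time, even when every response has already arrived. A common alternative is to count down a java.util.concurrent.CountDownLatch in the completion callback and await it with a timeout. The sketch below shows the pattern in plain Java: the class LatchWait, the Callback interface (a hypothetical stand-in for StreamObserver), and fakeAsyncCall (a background thread playing the server) are all assumptions made for illustration.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

class LatchWait {

    // Hypothetical stand-in for a response observer.
    interface Callback {
        void onNext(String value);
        void onCompleted();
    }

    // Pretend server: delivers `count` responses from another thread, then completes.
    static void fakeAsyncCall(int count, Callback callback) {
        new Thread(() -> {
            for (int i = 0; i < count; i++) {
                callback.onNext("response-" + i);
            }
            callback.onCompleted();
        }).start();
    }

    // Blocks until the call completes or the timeout expires; returns what arrived.
    static List<String> callAndWait(int count, long timeoutMillis) {
        List<String> received = new CopyOnWriteArrayList<>();
        CountDownLatch finished = new CountDownLatch(1);
        fakeAsyncCall(count, new Callback() {
            @Override public void onNext(String value) { received.add(value); }
            @Override public void onCompleted() { finished.countDown(); }
        });
        try {
            if (!finished.await(timeoutMillis, TimeUnit.MILLISECONDS)) {
                System.out.println("timed out waiting for completion");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt flag
        }
        return received;
    }

    public static void main(String[] args) {
        System.out.println(callAndWait(3, 1000));
    }
}
```

With a real stub the same latch would typically also be counted down in onError, so that a failed call does not leave the client waiting for the whole timeout.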

Article link: https://www.haomeiwen.com/subject/euckuqtx.html