grpc实战——服务端流式调用

作者: 程序员Sunny | 来源:发表于2018-06-10 18:31 被阅读230次
    本文地址:https://www.jianshu.com/p/cf2c58817568
    传送门:

    还记得很久之前Sunny有和大家聊过如何用grpc实现一个简单的名称解析服务,当时用的grpc简单调用。这次我们本着从易到难的原则,对上次的更进一步,实现服务端流式调用。之后还会继续出客户端流式调用和双向流式调用的文章,喜欢的朋友可以继续关注。

    这次我们的背景还是构建一个名称解析服务,但是有所不同的是,我们这次一个名称可能对应多个ip(这在实际生活中也有应用,比如DNS负载均衡)。

    服务端

    首先还是来看pom.xml文件,这次我们和上次有所不同,我直接使用grpc-all依赖一次性导入所需的依赖,具体依赖部分代码如下:

            <dependency>
                <groupId>io.grpc</groupId>
                <artifactId>grpc-all</artifactId>
                <version>${grpc.version}</version>
            </dependency>
    

    这里我们版本还是和之前选择的一样,为1.12.0。其他部分和之前的一样,想了解的童鞋可以看看源码也可以看传送门中的上一篇文章。
    然后就是比较重要的proto文件了,定义了我们的服务,这里和grpc简单服务略有不同:

    syntax = "proto3";
    
    option java_multiple_files = true;
    option java_package = "io.grpc.examples.nameservers";
    option java_outer_classname = "NameProto";
    option objc_class_prefix = "NSS";
    
    package nameservers;
    
    // 定义服务
    service NameServers {
        // 服务中的方法,用于根据Name类型的参数获得一系列ip,以流的方式返回
        rpc getIpsByName (Name) returns (stream Ip) {}
    }
    //定义Name消息类型,其中name为其序列为1的字段
    message Name {
        string name = 1;
    }
    //定义Ip消息类型,其中ip为其序列为1的字段
    message Ip {
        string ip = 1;
    }
    

    细心的童鞋可能发现了,这个基本上和上次一模一样,除了服务的名称还有配置项可能有所不同,其他就是在returns中,Ip前面多了一个stream。这个就是我们所说的服务端流式的服务定义方式了。接着还是利用maven插件来进行编译得到相应的java代码。


    图片.png

    这里我们还是从服务端开始写,也是分为两个部分,一个用于开启远程调用服务,接收客户端发来的调用请求,类名为NameServer;另一个则是实现真正的服务,类名为NameServersImplBaseImpl。这里NameServer代码变化不大,直接贴上代码:

    public class NameServer {
    
        private Logger logger = Logger.getLogger(NameServer.class.getName());
    
        private static final int DEFAULT_PORT = 8088;
    
        private int port;//服务端口号
    
        private Server server;
    
    
        public NameServer(int port) {
            this(port,ServerBuilder.forPort(port));
        }
    
        public NameServer(int port, ServerBuilder<?> serverBuilder){
    
            this.port = port;
    
            //构造服务器,添加我们实际的服务
            server = serverBuilder.addService(new NameServersImplBaseImpl()).build();
    
        }
    
        private void start() throws IOException {
            server.start();
            logger.info("Server has started, listening on " + port);
            Runtime.getRuntime().addShutdownHook(new Thread() {
                @Override
                public void run() {
    
                    NameServer.this.stop();
    
                }
            });
        }
    
        private void stop() {
    
            if(server != null)
                server.shutdown();
    
        }
    
        //阻塞到应用停止
        private void blockUntilShutdown() throws InterruptedException {
            if (server != null) {
                server.awaitTermination();
            }
        }
    
        public static void main(String[] args) throws IOException, InterruptedException {
    
            NameServer nameServer;
    
            if(args.length > 0){
                nameServer = new NameServer(Integer.parseInt(args[0]));
            }else{
                nameServer = new NameServer(DEFAULT_PORT);
            }
    
            nameServer.start();
    
            nameServer.blockUntilShutdown();
    
        }
    }
    

    实际提供服务的代码有所改变,首先,我们之前用map来存储Name和Ip的映射关系,现在因为存在同Name多Ip的情况,我们改用list来存储,如果数据实际存储于数据库就看具体字段的约束。然后增加了一个类DataType用于表示Name和Ip数据对。在实际的服务中对每次请求都需要遍历一遍list,将Name符合的Ip返回给客户端,这里的返回就是以流的方式返回的。

    public class NameServersImplBaseImpl extends NameServersGrpc.NameServersImplBase {
    
        //记录名称内容的list,实际项目中应该放置在数据库
        private List<DataType> list = new ArrayList<DataType>();
    
        //构造方法中加入一些条目
        public NameServersImplBaseImpl() {
    
            list.add(new DataType(Name.newBuilder().setName("Sunny").build(),Ip.newBuilder().setIp("125.216.242.51").build()));
            list.add(new DataType(Name.newBuilder().setName("Sunny").build(),Ip.newBuilder().setIp("126.216.242.51").build()));
            list.add(new DataType(Name.newBuilder().setName("David").build(),Ip.newBuilder().setIp("117.226.178.139").build()));
            list.add(new DataType(Name.newBuilder().setName("David").build(),Ip.newBuilder().setIp("117.227.178.139").build()));
            list.add(new DataType(Name.newBuilder().setName("Tom").build(),Ip.newBuilder().setIp("111.222.336.11").build()));
            list.add(new DataType(Name.newBuilder().setName("Tom").build(),Ip.newBuilder().setIp("111.333.336.11").build()));
            list.add(new DataType(Name.newBuilder().setName("Tom").build(),Ip.newBuilder().setIp("111.222.335.11").build()));
    
        }
    
    
        @Override
        public void getIpsByName(Name requestName, StreamObserver<Ip> responseObserver) {
    
            Iterator<DataType> iter = list.iterator();
    
            while (iter.hasNext()){
    
                DataType data = iter.next();
    
                if(requestName.equals(data.getName())){
    
                    System.out.println("get " + data.getIp() + " from " + requestName);
    
                    responseObserver.onNext(data.getIp());
                }
            }
            responseObserver.onCompleted();
        }
    }
    

    DataType类如下:

    class DataType{
        private Name name;
        private Ip ip;
    
        public DataType(Name name, Ip ip) {
            this.name = name;
            this.ip = ip;
        }
    
        public Name getName() {
            return name;
        }
    
        public void setName(Name name) {
            this.name = name;
        }
    
        public Ip getIp() {
            return ip;
        }
    
        public void setIp(Ip ip) {
            this.ip = ip;
        }
    }
    

    上面代码中,list使用迭代器的方式遍历,这个大家也可以使用下标的方式,大家需要关注的是,这里使用了Name的equals方法,Sunny怎么就敢用equals方法,确定Name类中有重写equals方法吗?确实重写了,我们来看它的equals方法长什么样。

    public boolean equals(final java.lang.Object obj) {
        if (obj == this) {
         return true;
        }
        if (!(obj instanceof io.grpc.examples.nameservers.Name)) {
          return super.equals(obj);
        }
        io.grpc.examples.nameservers.Name other = (io.grpc.examples.nameservers.Name) obj;
    
        boolean result = true;
        result = result && getName()
            .equals(other.getName());
        result = result && unknownFields.equals(other.unknownFields);
        return result;
      }
    

    不出意料,首先比较是不是同一个对象,如果是那还说啥,直接true。如果传入的这个对象不是Name类,那么就调用父类的equals方法。紧接着进行了一次类型强转。然后定义了一个boolean类型的变量result,实现起来也比较巧妙,利用&&的方法来确保符合所有两个条件,第一个就是两个消息中name字段是否equals——这里是String类型的,实际调用了String的equals。然后还要比较unknownFields是否也equals,这里unknownFields是Name父类中的一个UnknownFieldSet类型的成员变量。根据名字大概可以猜到这就是未知的字段,我们这里的unknownFields中维护的fields对象应该就是null了。因此实际上判断两个Name类型是否equals,只需要判断它们维护的name这个String类型的值是否equals。
    到这里,服务端基本完成了。


    客户端

    首先是pom.xml,这里我们和服务端一样用grpc-all依赖。然后是proto生成java类,这里我们采取和上一篇不同的方法,上一篇我们是用proto文件重新编译,然后生成的。这里,我们直接将服务端生成的代码拷贝到客户端的目录中。


    图片.png

    客户端NameClient中的代码也有所不同,它需要接收一系列的服务端发过来的流消息。话不多说,我们直接上代码:

    public class NameClient {
    
        private static final String DEFAULT_HOST = "localhost";
    
        private static final int DEFAULT_PORT = 8088;
    
        private ManagedChannel managedChannel;
    
        //服务存根,用于客户端本地调用
        private NameServersGrpc.NameServersBlockingStub nameServiceBlockingStub;
    
        public NameClient(String host, int port) {
    
            this(ManagedChannelBuilder.forAddress(host,port).usePlaintext(true).build());
    
        }
    
        public NameClient(ManagedChannel managedChannel) {
            this.managedChannel = managedChannel;
            this.nameServiceBlockingStub = NameServersGrpc.newBlockingStub(managedChannel);
        }
    
        public void shutdown() throws InterruptedException {
            managedChannel.shutdown().awaitTermination(5, TimeUnit.SECONDS);
        }
    
        public List<Ip> getIpsByName(String n){
    
            List<Ip> result = new ArrayList<Ip>();
    
            Name name = Name.newBuilder().setName(n).build();
    
            Iterator<Ip> iterator = nameServiceBlockingStub.getIpsByName(name);
    
            while (iterator.hasNext()){
    
                result.add(iterator.next());
    
            }
            return result;
        }
    
        public static void main(String[] args) {
    
            NameClient nameClient = new NameClient(DEFAULT_HOST,DEFAULT_PORT);
    
            for(String arg : args){
    
                List<Ip> result = nameClient.getIpsByName(arg);
    
                for(int i=0;i<result.size();i++){
                    System.out.println("get result from server: " + result.get(i) + " as param is " + arg);
                }
            }
        }
    }
    

    还是一样,上一篇说过的我们就不再老生常谈了,想了解的童鞋请移步传送门。我们来看不一样的地方,getIpsByName方法有所变化:

    public List<Ip> getIpsByName(String n){
    
            List<Ip> result = new ArrayList<Ip>();
    
            Name name = Name.newBuilder().setName(n).build();
    
            Iterator<Ip> iterator = nameServiceBlockingStub.getIpsByName(name);
    
            while (iterator.hasNext()){
    
                result.add(iterator.next());
    
            }
            return result;
        }
    
    

    这里我们的返回值不是简单的Ip了,而是一个Ip类型的迭代器,这里我们还是选择再得到迭代器后把结果包装进一个list中,然后返回这个list。相应地,这样在main函数中调用getIpsByName的方法的时候因为返回值有所不同了,所以也需要有相应的改变。


    运行验证结果

    我们首先运行服务端NameServer,得到结果:

    Jun 10, 2018 6:24:19 PM com.sunny.NameServer start
    信息: Server has started, listening on 8088
    

    服务在8088端口上启动了,只要有客户端连接这个端口并发送请求即可。
    然后启动客户端,注意在启动前加入程序参数

    Sunny David Tom
    
    图片.png

    启动客户端NameClient,得到了我们请求的结果:

    get result from server: ip: "125.216.242.51"
     as param is Sunny
    get result from server: ip: "126.216.242.51"
     as param is Sunny
    get result from server: ip: "117.226.178.139"
     as param is David
    get result from server: ip: "117.227.178.139"
     as param is David
    get result from server: ip: "111.222.336.11"
     as param is Tom
    get result from server: ip: "111.333.336.11"
     as param is Tom
    get result from server: ip: "111.222.335.11"
     as param is Tom
    

    同样在服务端的控制台,我们也可以看到我们打印的服务调用信息:

    get ip: "125.216.242.51"
     from name: "Sunny"
    
    get ip: "126.216.242.51"
     from name: "Sunny"
    
    get ip: "117.226.178.139"
     from name: "David"
    
    get ip: "117.227.178.139"
     from name: "David"
    
    get ip: "111.222.336.11"
     from name: "Tom"
    
    get ip: "111.333.336.11"
     from name: "Tom"
    
    get ip: "111.222.335.11"
     from name: "Tom"
    

    至此,一个grpc服务端流式调用就做完了。
    再次说明一下,点击grpc源码可以看到本项目的源码,里面也包括了上一篇的源码。欢迎大家fork实践体验。
    另外,欢迎大家转载,转载时请注明出处,谢谢!


    童鞋们如果有疑问或者想和我交流的话有两种方式:

    第一种

    评论留言

    第二种

    邮箱联系:zsunny@yeah.net

    相关文章

      网友评论

        本文标题:grpc实战——服务端流式调用

        本文链接:https://www.haomeiwen.com/subject/pmtqeftx.html