3 grpc流

作者: 轻舞凋零 | 来源:发表于2022-08-30 21:45 被阅读0次

    1 grpc的四种通信模式

    四种通信模式

    • 一元模式
    • 客户端流模式
    • 服务端流模式
    • 双向流模式

    2 一元模式

    前面已经实现了,这里不再赘述

    3 流模式

    在grpc里面引入了流的概念,其实和rxjava很像哦

    3.1 proto 定义

    // Schema for the ad streaming demo: the messages exchanged and the
    // AdStream service with one client-streaming and one server-streaming RPC.
    syntax = "proto3";
    
    // Generated Java code goes into package cn.beckbi.pb, with all messages
    // nested inside the outer class AdInfo (e.g. AdInfo.Ad, AdInfo.AdList).
    option java_package = "cn.beckbi.pb";
    option java_outer_classname = "AdInfo";
    
    
    
    // A single advertisement.
    message Ad {
      int32 id = 1;            // numeric identifier of the ad
      string name = 2;         // ad name
      string description = 3;  // free-text description
      float price = 4;         // price as a 32-bit float
    }
    // A batch of ads together with request metadata.
    message AdList {
      string createAt = 1;     // creation time, carried as a plain string
      string traceId = 2;      // trace identifier of the batch
      repeated Ad ads = 3;     // the ads in this batch
    }
    
    // A list of ad identifiers.
    message AdIdList{
      repeated int32 id =1;
    }
    
    service AdStream {
      // Client streaming: the client sends a stream of AdList batches and
      // receives a single AdIdList in response.
      rpc addAdList(stream AdList) returns(AdIdList);
      // Server streaming: the client sends one AdIdList and receives a
      // stream of AdList messages.
      rpc getAdList(AdIdList) returns(stream AdList);
    
    }
    

    3.2 服务端处理

    // Schema for the ad streaming demo: the messages exchanged and the
    // AdStream service with one client-streaming and one server-streaming RPC.
    syntax = "proto3";
    
    // Generated Java code goes into package cn.beckbi.pb, with all messages
    // nested inside the outer class AdInfo (e.g. AdInfo.Ad, AdInfo.AdList).
    option java_package = "cn.beckbi.pb";
    option java_outer_classname = "AdInfo";
    
    
    
    // A single advertisement.
    message Ad {
      int32 id = 1;            // numeric identifier of the ad
      string name = 2;         // ad name
      string description = 3;  // free-text description
      float price = 4;         // price as a 32-bit float
    }
    // A batch of ads together with request metadata.
    message AdList {
      string createAt = 1;     // creation time, carried as a plain string
      string traceId = 2;      // trace identifier of the batch
      repeated Ad ads = 3;     // the ads in this batch
    }
    
    // A list of ad identifiers.
    message AdIdList{
      repeated int32 id =1;
    }
    
    service AdStream {
      // Client streaming: the client sends a stream of AdList batches and
      // receives a single AdIdList in response.
      rpc addAdList(stream AdList) returns(AdIdList);
      // Server streaming: the client sends one AdIdList and receives a
      // stream of AdList messages.
      rpc getAdList(AdIdList) returns(stream AdList);
    
    }
    
    package cn.beckbi.server;
    
    import cn.beckbi.pb.AdInfo;
    import cn.beckbi.pb.AdStreamGrpc;
    
    import java.util.*;
    import java.util.logging.Logger;
    
    import io.grpc.Status;
    import io.grpc.StatusException;
    import io.grpc.stub.StreamObserver;
    
    
    /**
     * @program: kgrpc
     * @description:
     * @author: bikang
     * @create: 2022-08-28 08:02
     */
    public class AdStreamGrpcImpl extends AdStreamGrpc.AdStreamImplBase {
    
        private static final Logger logger = Logger.getLogger(AdStreamGrpcImpl.class.getName());
    
        private final Random random = new Random();
    
        private final Map<Integer, AdInfo.Ad> adInfoMap = new HashMap<>(50);
    
        @Override
        public io.grpc.stub.StreamObserver<AdInfo.AdList> addAdList(
                io.grpc.stub.StreamObserver<AdInfo.AdIdList> responseObserver) {
    
            return new StreamObserver<AdInfo.AdList>() {
    
                private AdInfo.AdIdList adIdList = AdInfo.AdIdList.newBuilder().build();
    
                @Override
                public void onNext(AdInfo.AdList value) {
                    if (value != null) {
                        logger.info("time:"+value.getCreateAt());
                        logger.info("traceId:"+value.getTraceId());
                        List<AdInfo.Ad> adList = value.getAdsList();
                        adList.stream().forEach( adInfo -> {
                            int id = random.nextInt(100000000);
                            AdInfo.Ad ad = AdInfo.Ad.newBuilder()
                                    .setId(id)
                                    .setName(adInfo.getName())
                                    .setPrice(adInfo.getPrice())
                                    .setDescription(adInfo.getDescription())
                                    .build();
                            adInfoMap.put(id, ad);
                            adIdList = adIdList.toBuilder().addId(id).build();
                            logger.info("create ad success : " + id );
                        });
                    }
                }
    
                @Override
                public void onError(Throwable t) {
                    logger.info("ad create error " + t.getMessage());
                }
                @Override
                public void onCompleted() {
                    logger.info("create  ad  success");
                    responseObserver.onNext(adIdList);
                    responseObserver.onCompleted();
                }
            };
        }
    
    
        @Override
        public void getAdList(AdInfo.AdIdList request,
                              io.grpc.stub.StreamObserver<AdInfo.AdList> responseObserver) {
    
            List<AdInfo.Ad> adList = new ArrayList<>(30);
    
            logger.info("request:"+request.getIdList().toString());
            request.getIdList().forEach( id ->
                {
                    if (adInfoMap.containsKey(id)) {
                        adList.add(adInfoMap.get(id));
                        logger.info( "result:"+adInfoMap.get(id).toString());
                    }
                }
            );
            logger.info("ad list"+adList.toString());
            responseObserver.onNext(AdInfo.AdList.newBuilder().addAllAds(adList).build());
            responseObserver.onCompleted();
        }
    
    }
    

    3.3 客户端代码

    package cn.beckbi.client;
    
    import cn.beckbi.pb.AdInfo;
    import cn.beckbi.pb.AdStreamGrpc;
    import com.alibaba.fastjson2.JSON;
    import io.grpc.ManagedChannel;
    import io.grpc.ManagedChannelBuilder;
    import io.grpc.stub.StreamObserver;
    
    import java.util.ArrayList;
    import java.util.Iterator;
    import java.util.List;
    import java.util.concurrent.CountDownLatch;
    import java.util.concurrent.TimeUnit;
    import java.util.logging.Logger;
    
    
    /**
     * Demo client for the {@code AdStream} service.
     *
     * <p>It first uploads three batches of sample ads through the client-streaming
     * {@code addAdList} call (asynchronous stub), then reads the created ads back
     * through the server-streaming {@code getAdList} call (blocking stub).
     *
     * @program: kgrpc
     * @author: bikang
     * @create: 2022-08-27 23:56
     */
    public class AdClient {

        private static final Logger logger = Logger.getLogger(AdClient.class.getName());

        /**
         * Connects to {@code localhost:11112} without TLS, runs both demo calls and
         * shuts the channel down even when one of the calls fails.
         *
         * @param args unused
         * @throws InterruptedException if interrupted while waiting for the channel to terminate
         */
        public static void main(String[] args) throws InterruptedException {
            ManagedChannel channel = ManagedChannelBuilder.forAddress("localhost", 11112)
                    .usePlaintext()
                    .build();
            try {
                AdStreamGrpc.AdStreamBlockingStub stub = AdStreamGrpc.newBlockingStub(channel);
                AdStreamGrpc.AdStreamStub asyncStub = AdStreamGrpc.newStub(channel);

                //create ad in list
                AdInfo.AdIdList adIdList = createAdList(asyncStub);
                logger.info("result:" + adIdList.toString());

                List<AdInfo.AdList> adListList = getAdList(stub, adIdList);
                logger.info("adListList:" + adListList.toString());
            } finally {
                // Release the channel's resources even if an RPC above threw.
                channel.shutdown().awaitTermination(5, TimeUnit.SECONDS);
            }
        }

        /**
         * Calls the server-streaming {@code getAdList} RPC and drains the whole
         * response stream into a list.
         *
         * @param stub     blocking stub used for the call
         * @param adIdList ids of the ads to fetch
         * @return every {@code AdList} message the server sent, in arrival order
         */
        public static List<AdInfo.AdList> getAdList(AdStreamGrpc.AdStreamBlockingStub stub, AdInfo.AdIdList adIdList) {

            List<AdInfo.AdList> adListList = new ArrayList<>();

            Iterator<AdInfo.AdList> iterator = stub.getAdList(adIdList);
            while (iterator.hasNext()) {
                adListList.add(iterator.next());
            }
            return adListList;
        }

        /**
         * Sends three batches of sample ads through the client-streaming
         * {@code addAdList} RPC and waits up to 10 seconds for the server's answer.
         *
         * @param asyncStub asynchronous stub used for the call
         * @return the ids reported by the server; an empty list (never {@code null})
         *         when the call ended early, failed, timed out or the wait was interrupted
         */
        public static AdInfo.AdIdList createAdList(AdStreamGrpc.AdStreamStub asyncStub ) {

            List<Integer> allIdList = new ArrayList<>(30);

            // Released when the call terminates, successfully or not.
            final CountDownLatch countDownLatch = new CountDownLatch(1);

            StreamObserver<AdInfo.AdIdList> adIdListStreamObserver = new StreamObserver<AdInfo.AdIdList>() {
                @Override
                public void onNext(AdInfo.AdIdList adIdList) {
                    for (Integer id : adIdList.getIdList()) {
                        logger.info("create ad success : " + id);
                        allIdList.add(id);
                    }
                }

                @Override
                public void onError(Throwable t) {
                    logger.warning("error:" + t.getMessage());
                    // A failed call is finished too: release the waiter right away
                    // instead of letting it run into the 10 second timeout.
                    countDownLatch.countDown();
                }

                @Override
                public void onCompleted() {
                    logger.info("create success");
                    countDownLatch.countDown();
                }
            };

            StreamObserver<AdInfo.AdList> createAdObserver = asyncStub.addAdList(adIdListStreamObserver);

            List<AdInfo.Ad> ads = sampleAds();
            // {traceId, createAt} of each batch, in sending order.
            String[][] batches = {
                    {"a", "2022-01-01 01:01:01"},
                    {"b", "2022-02-02 01:01:01"},
                    {"c", "2022-03-03 01:01:01"},
            };

            try {
                for (String[] batch : batches) {
                    createAdObserver.onNext(AdInfo.AdList.newBuilder()
                            .addAllAds(ads)
                            .setTraceId(batch[0])
                            .setCreateAt(batch[1])
                            .build());
                    // The call already terminated (completed or failed) while we were
                    // still sending: further messages would be discarded, so stop here.
                    if (countDownLatch.getCount() == 0) {
                        logger.warning("rpc error");
                        return AdInfo.AdIdList.getDefaultInstance();
                    }
                }
            } catch (RuntimeException e) {
                // Cancel the call so the server is not left waiting for more messages.
                createAdObserver.onError(e);
                throw e;
            }
            createAdObserver.onCompleted();

            try {
                if (!countDownLatch.await(10, TimeUnit.SECONDS)) {
                    logger.warning("failed after 10 seconds");
                    // The callback thread may still be writing to allIdList; without the
                    // latch there is no safe way to read it, so report nothing.
                    return AdInfo.AdIdList.getDefaultInstance();
                }
            } catch (InterruptedException e) {
                // Keep the interrupt visible to the caller instead of swallowing it.
                Thread.currentThread().interrupt();
                logger.warning("interrupted while waiting for addAdList to finish");
                return AdInfo.AdIdList.getDefaultInstance();
            }

            // The latch guarantees the callback's writes to allIdList are visible here.
            return AdInfo.AdIdList.newBuilder().addAllId(allIdList).build();
        }

        /** Builds the five sample ads that every batch carries. */
        private static List<AdInfo.Ad> sampleAds() {
            List<AdInfo.Ad> ads = new ArrayList<>(5);
            ads.add(newAd("cpl1", 11.21f));
            ads.add(newAd("cp21", 21.21f));
            ads.add(newAd("cpl3", 31.21f));
            ads.add(newAd("cpl1", 41.21f));
            ads.add(newAd("cpl1", 51.21f));
            return ads;
        }

        /** Creates one sample ad with the given name and price and the fixed description. */
        private static AdInfo.Ad newAd(String name, float price) {
            return AdInfo.Ad.newBuilder()
                    .setName(name)
                    .setDescription("cpl ad")
                    .setPrice(price)
                    .build();
        }
    }
    

    4 附录

    项目源码: https://github.com/beckbikang/kgrpc

    本地试过可以运行

    相关文章

      网友评论

          本文标题:3 grpc流

          本文链接:https://www.haomeiwen.com/subject/xvtcnrtx.html