美文网首页
Grpc升级1.3.0后,服务路由该怎么做

Grpc升级1.3.0后,服务路由该怎么做

作者: linking12 | 来源:发表于2017-05-17 15:45 被阅读1408次
    摘要

    Gprc Java最近1.3.0 Release了

    1. 首先是Keepalived机制
      1)客户端的Keepalives设置可以work了
      2)设置Keepalives后,在整个网络连接建立完成之后,会不断的发送ping消息给服务端
      3)服务端根据Keppavlied的ping消息来自动识别哪些连接是断了的
    2. 服务端可以设置连接的时效了
      1)当设置连接的最大时间到了,该连接将会中断掉
    3. 增加了trace的一些传递
    4. 对LoadBalancers进行了一些变化
    重点说说LoadBalancer的一些变化

    在1.1.0版本,整个Lb的工厂类是这样的

     private static class RoundRobinLoadBalancer<T> extends LoadBalancer<T> {
            private RoundRobinLoadBalancer(TransportManager<T> tm){
                this.tm = tm;
            }
    

    而TransportManager是建立trsnsport的管理通道,而真正做负载均衡是这样的

    @Override
        public T pickTransport(Attributes affinity) {
          final RoundRobinServerList<T> addressesCopy;
          synchronized (lock) {
            if (closed) {
              return tm.createFailingTransport(SHUTDOWN_STATUS);
            }
            if (addresses == null) {
              if (nameResolutionError != null) {
                return tm.createFailingTransport(nameResolutionError);
              }
              if (interimTransport == null) {
                interimTransport = tm.createInterimTransport();
              }
              return interimTransport.transport();
            }
            addressesCopy = addresses;
          }
          return addressesCopy.getTransportForNextServer();
        }
    

    而现在这些都没了,变成了一个叫做Subchannel来做这件事情了
    针对Subchannel在他的注释里写了

     /**
       * A logical connection to a server, or a group of equivalent servers represented by an {@link 
       * EquivalentAddressGroup}.
       *
       * <p>It maintains at most one physical connection (aka transport) for sending new RPCs, while
       * also keeps track of previous transports that has been shut down but not terminated yet.
       *
       * <p>If there isn't an active transport yet, and an RPC is assigned to the Subchannel, it will
       * create a new transport.  It won't actively create transports otherwise.  {@link
       * #requestConnection requestConnection()} can be used to ask Subchannel to create a transport if
       * there isn't any.
       */
    

    大概意思是一个逻辑连接而非真正的物理连接,里面有可能有一个或多个地址来
    他维护了一个真正的物理连接去真正的建立Rpc的连接,而当连接状态还没有变成活动的时候,他将建立一个transport

    所以这里比较有意思:
    首先,grpc针对transport再进行了一次封装,把transport的建立提前了,在从nameresove拿到ip地址后,马上就会进行requestConnection

    @Override
        public void requestConnection() {
          subchannel.obtainActiveTransport();
        }
    
     @Nullable
      ClientTransport obtainActiveTransport() {
        ClientTransport savedTransport = activeTransport;
        if (savedTransport != null) {
          return savedTransport;
        }
        try {
          synchronized (lock) {
            savedTransport = activeTransport;
            // Check again, since it could have changed before acquiring the lock
            if (savedTransport != null) {
              return savedTransport;
            }
            if (state.getState() == IDLE) {
              gotoNonErrorState(CONNECTING);
              startNewTransport();
            }
          }
        } finally {
          channelExecutor.drain();
        }
        return null;
    

    而真正实现负载均衡的是SubchannelPicker

    /**
       * The main balancing logic.  It <strong>must be thread-safe</strong>. Typically it should only
       * synchronize on its own state, and avoid synchronizing with the LoadBalancer's state.
       *
       * <p>Note: Implementations should override exactly one {@code pickSubchannel}.
       */
      @ThreadSafe
      public abstract static class SubchannelPicker {
        /**
         * Make a balancing decision for a new RPC.
         *
         * @param args the pick arguments
         */
        public abstract PickResult pickSubchannel(PickSubchannelArgs args);
      }
    

    所以,在这里的整个负载均衡机制和原来1.1.0版本已经完全不同,如果想在负载均衡前面再加一次路由规则的限制的话,需要重新修改
    我是这样修改的:
    对SubchannelPicker进行扩展,当选择做负载均衡的时候,把某一些建立的Subchannel给踢掉

     @Override
        public PickResult pickSubchannel(PickSubchannelArgs args) {
            Map<String, Object> affinity = args.getCallOptions().getOption(GrpcClientCall.CALLOPTIONS_CUSTOME_KEY);
            GrpcURL refUrl = (GrpcURL) affinity.get(GrpcClientCall.GRPC_REF_URL);
            if (size > 0) {
                Subchannel subchannel = nextSubchannel(refUrl);
                affinity.put(GrpcClientCall.GRPC_NAMERESOVER_ATTRIBUTES, nameResovleCache);
                return PickResult.withSubchannel(subchannel);
            }
            if (status != null) {
                return PickResult.withError(status);
            }
    
            return PickResult.withNoResult();
        }
    
        private Subchannel nextSubchannel(GrpcURL refUrl) {
            if (size == 0) {
                throw new NoSuchElementException();
            }
            synchronized (this) {
                Subchannel val = list.get(index);
                index++;
                if (index >= size) {
                    index = 0;
                }
               //剔除不合规的Subchannel
                boolean discard = discard(refUrl, val);
                if (discard && index != 0) {
                    nextSubchannel(refUrl);
                }
                return val;
            }
        }
    

    但是这样有一个问题,剔除掉Subchannel,但是其实其Transport其实还在,还是浪费了整个客户端的资源,这个问题暂时也没有好的办法,如果把该Subchannel给shutdown掉,由于在接到了nameresovle的地址列表了,已经建立起了transpot了,如果shutdown了,将会触发nameresovle的refresh,重新获取地址,这样又是一个重复的循环,并不合算,所以只能是暂时浪费了一点内存来做服务路由了

    扩展后代码如下:
    https://github.com/linking12/saluki/blob/master/saluki-core/src/main/java/com/quancheng/saluki/core/grpc/GrpcRoutePicker.java

    文章写的比较乱,主要是在升级Grpc后碰到的一些问题,然后想出的一些解决办法,希望看得懂
    另外,吐槽一下,在选择很多人说自己写个rpc,他搞一下,他也弄一下,其实rpc不是那么简单的传输一个数据就拉倒这么简单,维护连接的状态,传输的数据这些都是要方方面面需要考虑的事情,现在业界有许多rpc的基础组件在了,如果用在生产上就不要自己造个轮子重新实现一套rpc
    基于http2的有grpc、armeria都是一个比较靠谱的选型,而不是一来就我要自己造一个轮子实现以下rpc

    相关文章

      网友评论

          本文标题:Grpc升级1.3.0后,服务路由该怎么做

          本文链接:https://www.haomeiwen.com/subject/etydxxtx.html