美文网首页
SOFARPC 源码分析3 - 各种 Config 配置类

SOFARPC 源码分析3 - 各种 Config 配置类

作者: 原水寒 | 来源:发表于2018-11-24 14:41 被阅读35次

    本节大致记录 SOFARPC 中各个 Config 类的配置属性及方法,浏览一遍,基本上就对 SOFARPC 的核心功能有了印象。同时,对各个配置类的整体设计也会比较清楚。后续在分析到各种配置的时候,可以回来看一下。


    image.png

    关于配置优先级等,可以参考:http://www.sofastack.tech/sofa-rpc/docs/Publish-And-Reference

    一、 注册中心配置类 RegistryConfig

    public class RegistryConfig extends AbstractIdConfig implements Serializable {
        // 注册中心协议:默认是 zookeeper
        private String                protocol         = getStringValue(DEFAULT_REGISTRY);
        // 指定注册中心的地址, 和index必须填一个,address优先
        private String                address;
        // 指定注册中心寻址服务的地址, 和address必须填一个
        private String                index            = getStringValue(REGISTRY_INDEX_ADDRESS);
        // 是否注册,如果是false只订阅不注册,默认true
        private boolean               register         = getBooleanValue(SERVICE_REGISTER);
        // 是否订阅服务,默认true
        private boolean               subscribe        = getBooleanValue(SERVICE_SUBSCRIBE);
        // 调用注册中心超时时间,默认10000
        private int                   timeout          = getIntValue(REGISTRY_INVOKE_TIMEOUT);
        // 连接注册中心超时时间,默认20000
        private int                   connectTimeout   = getIntValue(REGISTRY_CONNECT_TIMEOUT);
        // 保存到本地文件的位置,默认$HOME下
        private String                file;
        // 是否批量操作,默认为true
        private boolean               batch            = getBooleanValue(REGISTRY_BATCH);
        // 定时批量检查时的条目数,默认为10
        private int                   batchSize        = getIntValue(REGISTRY_BATCH_SIZE);
        // Consumer给Provider发心跳的间隔,默认为30000
        protected int                 heartbeatPeriod  = getIntValue(REGISTRY_HEARTBEAT_PERIOD);
        // Consumer给Provider重连的间隔,默认为30000
        protected int                 reconnectPeriod  = getIntValue(REGISTRY_RECONNECT_PERIOD);
        // The Parameters. 自定义参数
        protected Map<String, String> parameters;
    }
    

    二、 通信服务配置类 ServerConfig

    public class ServerConfig extends AbstractIdConfig implements Serializable {
        /*------------- 参数配置项开始-----------------*/
        // 通信协议,默认为bolt
        protected String                          protocol         = getStringValue(DEFAULT_PROTOCOL);
    
        // 实际监听IP,与网卡对应,默认为0.0.0.0
        protected String                          host             = getStringValue(SERVER_HOST);
    
        // 监听端口,默认为12200
        protected int                             port             = getIntValue(SERVER_PORT_START);
    
        // 基本路径,默认为""
        protected String                          contextPath      = getStringValue(SERVER_CONTEXT_PATH);
    
        // io线程池大小,默认为0
        protected int                             ioThreads        = getIntValue(SERVER_IOTHREADS);
    
        // 线程池类型,默认是 cached
        protected String                          threadPoolType   = getStringValue(SERVER_POOL_TYPE);
    
        // 业务线程池大小,默认是20
        protected int                             coreThreads      = getIntValue(SERVER_POOL_CORE);
    
        // 业务线程池大小,默认是200
        protected int                             maxThreads       = getIntValue(SERVER_POOL_MAX);
    
        // 是否允许telnet,针对自定义协议,默认是true
        protected boolean                         telnet           = getBooleanValue(SERVER_TELNET);
    
        // 线程池类型,默认普通线程池 normal
        protected String                          queueType        = getStringValue(SERVER_POOL_QUEUE_TYPE);
    
        // 业务线程池队列大小,默认为0
        protected int                             queues           = getIntValue(SERVER_POOL_QUEUE);
    
        // 默认业务线程池回收时间,默认60000
        protected int                             aliveTime        = getIntValue(SERVER_POOL_ALIVETIME);
    
        // 线程池是否初始化核心线程,默认false
        protected boolean                         preStartCore     = getBooleanValue(SERVER_POOL_PRE_START);
    
        // 服务端允许客户端建立的连接数,默认100000
        protected int                             accepts          = getIntValue(SERVER_ACCEPTS);
    
        // 序列化方式,默认hessian2
        protected String                          serialization    = getStringValue(DEFAULT_SERIALIZATION);
    
        // The Parameters. 自定义参数
        protected Map<String, String>             parameters;
    
        // 镜像ip,例如监听地址是1.2.3.4,告诉注册中心的确是3.4.5.6
        protected String                          virtualHost;
    
        // 镜像端口
        protected Integer                         virtualPort;
    
        // 连接事件监听器实例,连接或者断开时触发
        protected transient List<ChannelListener> onConnect;
    
        // 是否启动epoll,默认为false
        protected boolean                         epoll            = getBooleanValue(SERVER_EPOLL);
    
        // 是否hold住端口,true的话随主线程退出而退出,false的话则要主动退出,默认为true
        protected boolean                         daemon           = getBooleanValue(SERVER_DAEMON);
    
        // 端口是否自适应,如果端口被占用,自动+1;默认false
        protected boolean                         adaptivePort     = getBooleanValue(SEVER_ADAPTIVE_PORT);
    
        // 网络层协议,默认为netty4
        protected String                          transport        = getStringValue(DEFAULT_TRANSPORT);
    
        // 服务端是否自动启动,默认为true
        protected boolean                         autoStart        = getBooleanValue(SEVER_AUTO_START);
    
        // 服务端关闭超时时间,默认20000
        protected int                             stopTimeout      = getIntValue(SERVER_STOP_TIMEOUT);
    
        // 是否维持长连接,默认为true
        protected boolean                         keepAlive        = getBooleanValue(TRANSPORT_SERVER_KEEPALIVE);
    
        /*------------- 参数配置项结束-----------------*/
        // 服务端对象
        private transient volatile Server         server;
    
        // 绑定的地址。是某个网卡,还是全部地址
        private transient String                  boundHost;
    
        // 启动服务
        public synchronized Server buildIfAbsent() {
            if (server != null) {
                return server;
            }
            // 创建并初始化 Server 实例
            server = ServerFactory.getServer(this);
            return server;
        }
    
        // 关闭服务
        public synchronized void destroy() {
            if (server != null) {
                server.destroy();
            }
        }
    }
    

    三、接口公共配置类 AbstractInterfaceConfig

    /**
     * 接口级的公共配置
     * @param <T> the interface
     * @param <S> the sub class of AbstractInterfaceConfig
     */
    public abstract class AbstractInterfaceConfig<T, S extends AbstractInterfaceConfig> extends AbstractIdConfig<S> implements Serializable {
        /*-------------配置项开始----------------*/
        // 应用信息
        protected ApplicationConfig application = new ApplicationConfig();
        // 服务接口:做为服务唯一标识的组成部分, 不管普通调用和泛化调用,都是设置实际的接口类名称,
        protected String interfaceId;
        // 服务标签:做为服务唯一标识的组成部分, 默认为""
        protected String uniqueId = getStringValue(DEFAULT_UNIQUEID);
        // 过滤器配置实例
        protected transient List<Filter> filterRef;
        // 过滤器配置别名,多个用逗号隔开
        protected List<String> filter;
        // 注册中心配置,可配置多个
        protected List<RegistryConfig> registry;
        // 方法配置,可配置多个
        protected Map<String, MethodConfig> methods;
        // 默认序列化, 默认hessian2
        protected String serialization = getStringValue(DEFAULT_SERIALIZATION);
        // 是否注册,如果是false只订阅不注册,默认true
        protected boolean register = getBooleanValue(SERVICE_REGISTER);
        // 是否订阅服务,默认true
        protected boolean subscribe = getBooleanValue(SERVICE_SUBSCRIBE);
        // 代理类型,默认javassist
        protected String proxy = getStringValue(DEFAULT_PROXY);
        // 服务分组:不做为服务唯一标识的一部分,请直接使用 {@link #uniqueId} 代替
        @Deprecated
        protected String group = getStringValue(DEFAULT_GROUP);
        // 服务版本:不做为服务唯一标识的一部分,请直接使用 {@link #uniqueId} 代替
        protected String version = getStringValue(DEFAULT_VERSION);
        // 结果缓存实现类
        protected transient Cache cacheRef;
        // Mock实现类
        protected transient T mockRef;
        // 自定义参数
        protected Map<String, String> parameters;
    
        /*-------- 下面是方法级配置 --------*/
        // 是否启动结果缓存
        protected boolean cache;
        // 是否开启mock
        protected boolean mock;
        // 是否开启参数验证(jsr303)
        protected boolean validation;
        // 压缩算法,为空则不压缩
        protected String compress;
        /*-------------配置项结束----------------*/
    
        // 方法名称和方法参数配置的map,不需要遍历list
        protected transient volatile Map<String, Object> configValueCache = null;
        // 代理接口类,和T对应,主要针对泛化调用
        protected transient volatile Class proxyClass;
        // 服务配置的listener
        protected transient volatile ConfigListener configListener;
        
        // Gets proxy class.
        protected abstract Class<?> getProxyClass();
        // 构造关键字方法 @return 唯一标识 string
        protected abstract String buildKey();
        // 是否有超时配置
        public abstract boolean hasTimeout();
        // 是否有并发限制配置
        public abstract boolean hasConcurrents();
    
        // 除了判断自己,还有判断下面方法的自定义判断
        public boolean hasValidation() {
            if (validation) {
                return true;
            }
            if (CommonUtils.isNotEmpty(methods)) {
                for (MethodConfig methodConfig : methods.values()) {
                    if (CommonUtils.isTrue(methodConfig.getValidation())) {
                        return true;
                    }
                }
            }
            return false;
        }
    
        // 是否有缓存
        public boolean hasCache() {
            if (isCache()) {
                return true;
            }
            if (CommonUtils.isNotEmpty(methods)) {
                for (MethodConfig methodConfig : methods.values()) {
                    if (CommonUtils.isTrue(methodConfig.getCache())) {
                        return true;
                    }
                }
            }
            return false;
        }
    
        // 是否有token配置
        public boolean hasToken() {
            // .token 配置
            if (getParameter(RpcConstants.HIDDEN_KEY_TOKEN) != null) {
                return true;
            }
            if (CommonUtils.isNotEmpty(methods)) {
                for (MethodConfig methodConfig : methods.values()) {
                    if (methodConfig.getParameter(RpcConstants.HIDDEN_KEY_TOKEN) != null) {
                        return true;
                    }
                }
            }
            return false;
        }
    }
    

    四、提供端配置类 ProviderConfig

    public class ProviderConfig<T> extends AbstractInterfaceConfig<T, ProviderConfig<T>> implements Serializable {
        /*---------- 参数配置项开始 ------------*/
        // 接口实现类引用
        protected transient T ref;
        // 配置的 Server 列表
        protected List<ServerConfig> server;
        // 服务发布延迟,单位毫秒,默认为-1
        protected int delay = getIntValue(PROVIDER_DELAY);
        // 服务端权重,默认为100
        protected int weight = getIntValue(PROVIDER_WEIGHT);
        // 发布方法:默认全部 *
        protected String include = getStringValue(PROVIDER_INCLUDE);
        // 不发布的方法列表,逗号分隔,默认为""
        protected String exclude = getStringValue(PROVIDER_EXCLUDE);
        // 是否动态注册,默认为true,配置为false代表不主动发布,需要到管理端进行上线操作
        protected boolean dynamic = getBooleanValue(PROVIDER_DYNAMIC);
        // 服务优先级,越大越高
        protected int priority = getIntValue(PROVIDER_PRIORITY);
        // 启动辅助类 key
        protected String bootstrap;
        // 自定义线程池
        protected transient ThreadPoolExecutor executor;
    
        /*-------- 下面是方法级可覆盖配置 --------*/
        // 服务端执行超时时间(毫秒),不会打断执行线程,只是打印警告,默认为0,表示不判断
        protected int timeout = getIntValue(PROVIDER_INVOKE_TIMEOUT);
        // 接口下每方法的最大可并行执行请求数,配置-1关闭并发过滤器,等于0表示开启过滤但是不限制,默认为0
        protected int concurrents = getIntValue(PROVIDER_CONCURRENTS);
        /*---------- 参数配置项结束 ------------*/
    
        // 方法名称:是否可调用
        protected transient volatile ConcurrentMap<String, Boolean> methodsLimit;
        // 服务提供者辅助类,加载 key=bootstrap 的 ProviderBootstrap 实现
        protected transient ProviderBootstrap providerBootstrap;
    
        // Gets proxy class.
        @Override
        public Class<?> getProxyClass() {
            if (proxyClass != null) {
                return proxyClass;
            }
            // 加载 interfaceId 的接口
            this.proxyClass = ClassUtils.forName(interfaceId);
            return proxyClass;
        }
    
        /**
         * Build key.
         *
         * @return the string
         */
        @Override
        public String buildKey() {
            return interfaceId + ":" + uniqueId;
        }
        
        @Override
        public boolean hasTimeout() {
            if (timeout > 0) {
                return true;
            }
            if (CommonUtils.isNotEmpty(methods)) {
                for (MethodConfig methodConfig : methods.values()) {
                    if (methodConfig.getTimeout() > 0) {
                        return true;
                    }
                }
            }
            return false;
        }
    
        /**
         * 是否有并发控制需求,有就打开过滤器
         * 配置-1关闭并发过滤器,等于0表示开启过滤但是不限制
         */
        @Override
        public boolean hasConcurrents() {
            if (concurrents > 0) {
                return true;
            }
            if (CommonUtils.isNotEmpty(methods)) {
                for (MethodConfig methodConfig : methods.values()) {
                    if (methodConfig.getConcurrents() != null
                            && methodConfig.getConcurrents() > 0) {
                        return true;
                    }
                }
            }
            return false;
        }
        
        /**
         * 发布服务
         */
        public synchronized void export() {
            if (providerBootstrap == null) {
                providerBootstrap = Bootstraps.from(this);
            }
            providerBootstrap.export();
        }
    
        /**
         * 取消发布服务
         */
        public synchronized void unExport() {
            if (providerBootstrap != null) {
                providerBootstrap.unExport();
            }
        }
    }
    

    可以看到服务发布是 ProviderConfig 唤起的

    五、消费端配置类 ConsumerConfig

    public class ConsumerConfig<T> extends AbstractInterfaceConfig<T, ConsumerConfig<T>> implements Serializable {
        // 通信协议,默认为bolt
        protected String protocol = getStringValue(DEFAULT_PROTOCOL);
        // 直连调用地址
        protected String directUrl;
        // 是否泛化调用
        protected boolean generic;
        // 调用方式,默认为sync
        protected String invokeType = getStringValue(CONSUMER_INVOKE_TYPE);
        // consumer连provider超时时间,默认5000
        protected int connectTimeout = getIntValue(CONSUMER_CONNECT_TIMEOUT);
        // consumer断开时等待结果的超时时间,默认10000
        protected int disconnectTimeout = getIntValue(CONSUMER_DISCONNECT_TIMEOUT);
        // 集群处理,默认是failover
        protected String cluster = getStringValue(CONSUMER_CLUSTER);
        // 连接管理器, 默认all,表示连接全部建立长连接
        protected String connectionHolder = getStringValue(CONSUMER_CONNECTION_HOLDER);
        // 地址管理器,默认singleGroup,单分组
        protected String addressHolder = getStringValue(CONSUMER_ADDRESS_HOLDER);
        // 负载均衡,默认random
        protected String loadBalancer = getStringValue(CONSUMER_LOAD_BALANCER);
        // 是否延迟建立长连接(第一次调用时新建,注意此参数可能和check冲突,开启check后lazy自动失效)
        // 默为false
        protected boolean lazy = getBooleanValue(CONSUMER_LAZY);
        // 粘滞连接,一个断开才选下一个: change transport when current is disconnected
        // 默认false
        protected boolean sticky = getBooleanValue(CONSUMER_STICKY);
        // 是否jvm内部调用(provider和consumer配置在同一个jvm内,则走本地jvm内部,不走远程)
        // 默认false
        protected boolean inJVM = getBooleanValue(CONSUMER_INJVM);
        // 是否强依赖(即没有服务节点就启动失败,注意此参数可能和lazy冲突,开启check后lazy自动失效)
        // 默认false
        protected boolean check = getBooleanValue(CONSUMER_CHECK);
        // 长连接个数,不是所有的框架都支持一个地址多个长连接
        // 默认1
        protected int connectionNum = getIntValue(CONSUMER_CONNECTION_NUM);
        // Consumer给Provider发心跳的间隔
        // 默认30000
        protected int heartbeatPeriod = getIntValue(CONSUMER_HEARTBEAT_PERIOD);
        // Consumer给Provider重连的间隔
        // 默认10000
        protected int reconnectPeriod = getIntValue(CONSUMER_RECONNECT_PERIOD);
        // 路由配置别名
        protected List<String> router;
        // 路由规则引用,多个用英文逗号隔开。List<Router>
        protected transient List<Router> routerRef;
        // 返回时的回调函数
        protected transient SofaResponseCallback onReturn;
        // 连接事件监听器实例,连接或者断开时触发
        @Unstable
        protected transient List<ChannelListener> onConnect;
        // 客户端状态变化监听器实例,状态可用和不可以时触发
        @Unstable
        protected transient List<ConsumerStateListener> onAvailable;
        // 启动器
        protected String bootstrap;
        // 客户端获取地址等待时间(毫秒),-1表示一直等
        protected int addressWait = getIntValue(CONSUMER_ADDRESS_WAIT);
    
        /*-------- 下面是方法级可覆盖配置 --------*/
        // consumer调用provider超时时间(毫秒), 默认5000
        protected int timeout = getIntValue(CONSUMER_INVOKE_TIMEOUT);
        // 失败后重试次数,默认是0
        protected int retries = getIntValue(CONSUMER_RETRIES);
        // 接口下每方法的最大可并行执行请求数,配置-1关闭并发过滤器,等于0表示开启过滤但是不限制,默认是0
        protected int concurrents = getIntValue(CONSUMER_CONCURRENTS);
        /*---------- 参数配置项结束 ------------*/
    
        // 服务消费者启动类
        private transient ConsumerBootstrap<T> consumerBootstrap;
        // 服务列表的listener
        private transient volatile ProviderInfoListener providerInfoListener;
    
        // Build key.
        @Override
        public String buildKey() {
            return protocol + "://" + interfaceId + ":" + uniqueId;
        }
    
        // Gets proxy class.
        @Override
        public Class<?> getProxyClass() {
            if (proxyClass != null) {
                return proxyClass;
            }
            // 泛化
            if (generic) {
                return GenericService.class;
            }
            // 正常接口
            this.proxyClass = ClassUtils.forName(interfaceId);
            return proxyClass;
        }
    
    
        /**
         * 引用服务
         * @return 服务代理类 t
         */
        public T refer() {
            if (consumerBootstrap == null) {
                consumerBootstrap = Bootstraps.from(this);
            }
            return consumerBootstrap.refer();
        }
    
        /**
         * 取消引用服务
         */
        public void unRefer() {
            if (consumerBootstrap != null) {
                consumerBootstrap.unRefer();
            }
        }
    }
    

    可以看到服务引用是 ConsumerConfig 唤起的

    六、应用配置类 ApplicationConfig

    public class ApplicationConfig implements Serializable {
        // The App name.
        protected String appName;
        // The App id.
        protected String appId;
        // The Ins id. 实例ID
        protected String insId;
    }
    

    七、方法配置类 MethodConfig

    /**
     * 方法级配置
     */
    public class MethodConfig implements Serializable {
        /*-------------配置项开始----------------*/
        // 方法名称,无法做到重载方法的配置
        private String                 name;
        // 自定义参数
        protected Map<String, String>  parameters;
        // 远程调用超时时间(毫秒)
        protected Integer              timeout;
        // 失败后重试次数
        protected Integer              retries;
        // 调用方式
        protected String               invokeType;
        // 是否jsr303验证
        protected Boolean              validation;
        // 返回时的回调函数
        protected SofaResponseCallback onReturn;
        // 最大并发执行(不管服务端还是客户端)
        protected Integer              concurrents;
        // 是否启用客户端缓存
        protected Boolean              cache;
        // 是否启动压缩
        protected String               compress;
        /*-------------配置项结束----------------*/
    }
    

    相关文章

      网友评论

          本文标题:SOFARPC 源码分析3 - 各种 Config 配置类

          本文链接:https://www.haomeiwen.com/subject/bzvsqqtx.html