美文网首页
RocketMQ 客户端源码分析(一)

RocketMQ 客户端源码分析(一)

作者: coding400 | 来源:发表于2019-11-27 18:51 被阅读0次

MQClientManager

1. 属性:

    a. private ConcurrentHashMap<String,MQClientInstance> factoryTable = xxxx;
        key  为 ClientConfig 的 clientId (10.179.66.194@producer-instance)
        value 为使用 ClientConfig 配置生成的实例 MQClientInstance 
    b. private MQClientManager instance = xxx; 饱汉单例
    c. 使用工厂模式加载 MQClientInstance

2. 方法
    a. getInstance() 获取单例对象 instance 
    b. getAndCreateMQClientInstance(XXX) 工厂方法,用于通过 ClientConfig生成MQClientInstance,(为什么这里要以 ClientConfig 为参数,而不是 DefaultMQProducer 这个类为参数?因为为了通用性,DefaultMQProducer、DefaultMQPushConsumer、都继承于 ClientConfig,方便各自实例的配置)
    c. removeClientFactory()  从factoryTable 中删除 MQClientInstance 实例

MQClientInstance

1. 属性
    a. clientId (ClientConfig 中 buildMQClientId() 生成的客户端ID,IP + @ + instanceName)
    b. ClientConfig clientConfig ,DefaultMQProducer 或者 DefaultMQPushConsumer 等实现类
    c. instanceIndex,通过 MQClientManger 生成的 MQClientInstance 所处的序号,每次实例化一个 MQClientInstance 便将序号加1
    d. ConcurrentMap:producerTable<group,MQProducerInner>、consumerTable<group,MQConsumerInner> 、adminExtTable<group,MQAdminExtInner> 。因为一个 clientId 对应一个MQClientInstance ,所以以上3个 Map 存的是各组对应的 生产者、消费者、管理用户对应的内部包装实现类
    e. NettyClientConfig nettyClientConfig; netty 相关的一些参数
    f. MQClientAPIImpl mQClientAPIImpl; MQ 普通本地客户端,封装了和MQ 业务相关参数及方法
    g. MQAdminImpl mQAdminImpl; MQ 管理员端,封装了 Admin 相关的 MQ业务相关参数及方法。同上
    h.  ConcurrentMap<String,TopicRouteData> topicRouteTable; 缓存当前客户端使用到的 Topic 相关信息
    i. Lock lockNamesrv、lockHeartbeat;显式锁,对远程服务进行“写”时,需保证同一个客户端同一时刻只有一个请求,保证线程安全
    j. ConcurrentMap brokerAddrTable、brokerVersionTable;缓存当前客户端使用到的 Broker 相关信息
    k. ScheduledExecutorService scheduledExecutorService; 只有一个线程的线程池,用来执行定时任务(动态更新 nameSrv 地址(通过一个静态 服务器返回的 nameSrv 的地址与 MQClientAPIImpl 中保存的 nameSrv 地址进行判断)、动态更新 Topic 信息、清除不可用的 Broker)
    l. ClientRemotingProcessor xx; 指令处理器,该类用于向服务器发送请求时,同一进行处理的入口,将请求内容封装成指令(RemotingCommand),根据请求指令码的不同,进入不同的 switch 分支,进入执行不同请求调用的方法。
    m. PullMessageService pullMessageService;  消费者拉取消息
    n. RebalanceService rebalanceService; 平衡消费者
    o. ServiceState serviceState; 服务状态,默认【创建】,其他:【运行】、
    【关闭】、【启动失败】
    
    疑问:
    
    DatagramSocket datagramSocket;用来干嘛的?
        …

MQClientAPIImpl

MQ 业务操作客户端,提供给当前客户端 MQInstanceClient 使用。该类封装了 MQ 相关业务场景,通过持有 RemotingClient ,实现与 Broker 和 Namesrv 服务器之间的调用

NettyRemotingClient

继承于 RemotingClient 接口,使用 高性能的网络框架 Netty 进行封装了 MQ 使用的通讯协议

相关文章

网友评论

      本文标题:RocketMQ 客户端源码分析(一)

      本文链接:https://www.haomeiwen.com/subject/ppuywctx.html