ActiveMQ源码解析(四)Network Bridge

作者: MisterCH | 来源:发表于2017-10-02 19:56 被阅读58次
    ActiveMQ集群网络

    在多个ActiveMQ Broker组成的集群(Network of broker)中,是通过Network Bridge机制来实现的。在阅读源码前,我一直有误解,以为AMQ之间在duplex=false的场景下互相通讯是互相建立消费者客户端,如果生产者连接Broker A,而消费者连接Broker B,那么应该是Broker B先从Broker A上消费消息再发给自己的消费者。可是实际看起来并不是这种简单的机制。今天这篇源码解析,由于牵涉的代码比较多,我尽量用较少的源码来整理出每个方法的逻辑。

    AMQ的通讯的确是通过一个虚拟的客户端实现的,但是这个客户端在AMQ的源码里称为Network Bridge。在集群建立的时候,如果使用duplex=false(一般在mesh结构,也就是N个Broker完全互连的场景下)的配置,每个Broker上都会出现两个连接:

    • 一个连接是自己与自己的连接,负责从本地接收指令及消息并处理。
    • 另一个连接是自己与其他MQ的连接,负责从另一个Broker接收指令及消息并处理,也负责往其他MQ发送指令和消息。

    这里所谓的指令指的是例如消费者创建,Broker异常,连接异常等控制指令,Network Bridge接收到这些指令后需要进行相应的处理。而消息则是Message,收到后需要进行ack或者转发给自己的消费者。

    看看源码吧,在AMQ的源码中起到主要作用(default配置)的是DemandForwardingBridgeSupport类。这个类中有几个主要的方法:

    1. start(),stop(),启动和停止bridge
    2. serviceLocalCommand 处理本地指令和消息
    3. serviceRemoteCommand 处理其他MQ传过来的指令和消息

    先来看start()方法吧,主要功能:
    1. 判断是否是双向连接,如果是就多建一个监听器
    2. 建立local和remote的transportListener监听器
    3. 启动network bridge

        public void start() throws Exception {
            if (started.compareAndSet(false, true)) {
    
                if (brokerService == null) {
                    throw new IllegalArgumentException("BrokerService is null on " + this);
                }
    
                networkBridgeStatistics.setEnabled(brokerService.isEnableStatistics());
                //如果是双向连接,额外建立一个监听器
                if (isDuplex()) {
                    duplexInboundLocalBroker = NetworkBridgeFactory.createLocalAsyncTransport(brokerService.getBroker().getVmConnectorURI());
                    duplexInboundLocalBroker.setTransportListener(new DefaultTransportListener() {
    
                        @Override
                        public void onCommand(Object o) {
                            Command command = (Command) o;
                            serviceLocalCommand(command);
                        }
    
                        @Override
                        public void onException(IOException error) {
                            serviceLocalException(error);
                        }
                    });
                    duplexInboundLocalBroker.start();
                }
                // 设定Local监听器,处理本地指令和消息
                localBroker.setTransportListener(new DefaultTransportListener() {
    
                    @Override
                    public void onCommand(Object o) {
                        Command command = (Command) o;
                        serviceLocalCommand(command);
                    }
    
                    @Override
                    public void onException(IOException error) {
                        if (!futureLocalBrokerInfo.isDone()) {
                            LOG.info("error with pending local brokerInfo on: " + localBroker, error);
                            futureLocalBrokerInfo.cancel(true);
                            return;
                        }
                        serviceLocalException(error);
                    }
                });
                // 设定Remote监听器,处理别的MQ的指令和消息
                remoteBroker.setTransportListener(new DefaultTransportListener() {
    
                    @Override
                    public void onCommand(Object o) {
                        Command command = (Command) o;
                        serviceRemoteCommand(command);
                    }
    
                    @Override
                    public void onException(IOException error) {
                        if (!futureRemoteBrokerInfo.isDone()) {
                            LOG.info("error with pending remote brokerInfo on: " + remoteBroker, error);
                            futureRemoteBrokerInfo.cancel(true);
                            return;
                        }
                        serviceRemoteException(error);
                    }
                });
    
                // remoteBroker和localBroker的类其实是transport,也就是说这俩货
                // 其实是两个连接。
                remoteBroker.start();
                localBroker.start();
                 
                // 如果连接未被抛弃
                if (!disposed.get()) {
                    try {
                        // 启动
                        triggerStartAsyncNetworkBridgeCreation();
                    } catch (IOException e) {
                        LOG.warn("Caught exception from remote start", e);
                    }
                } else {
                    LOG.warn("Bridge was disposed before the start() method was fully executed.");
                    throw new TransportDisposedIOException();
                }
            }
        }
    

    启动bridge的方法是triggerStartAsyncNetworkBridgeCreation(),在该方法中主要是启动一个做了两件事:
    1. 调用collectBrokerInfos()方法收集本地和异地的brokerInfo信息,在该方法中有逻辑来处理remoteBroker和localBroker是相同Broker的情况,防止出现环。
    2. 调用doStartLocalAndRemoteBridges()方法启动本地和异地的连接(也就是localBroker和remoteBroker)。在该方法中主要是通过startLocalBridge()和startRemoteBridge()来实现连接的启动。

    startLocalBridge和startRemoteBridge()稍有点复杂,源码就不上了。我整理了一下,startLocalBridge主要实现的事情包括了:
    1. 往本地MQ建立了一个ClientID为 NetworkConnectorName_RemoteBrokerName_inbound_LocalBrokerNam的连接信息connectionInfo,同时建立会话信息sessionInfo,并告知本地broker,注意:这个连接信息会出现在要被转发的topic或者queue的消费者列表里。
    2. 如果是双向连接,还需要再建立一个连接
    3. 建立一个network bridge的连接监听器NetworkBridgeListener
    4. 告知本地Broker有其他MQ试图建立连接

    而startRemoteBridge主要实现的事情包括了:
    1. 处理dynamicallyIncludedDestinationsKey和staticallyIncludedDestinationsKey的配置的事宜
    2. 建立一个clientID为NetworkName_LocalBrokerName_outbound的连接信息connectionInfo,同时建立会话信息sessionInfo,生产者producerInfo,并告知连接另一端的Broker。
    3. 建立一个监听对端Broker(以下称为RB)的所有消费者建立和断开的Advisory Topic系统Topic:ActiveMQ.Advisory.Consumer.> 的消费者信息,并发送给RB。

    startRemoteBridge()中第2、3两部分的代码如下:

                  // 把之前没关闭连接的先告知RB关闭了。
                    if (remoteConnectionInfo != null) {
                        remoteBroker.oneway(remoteConnectionInfo.createRemoveCommand());
                    }
                   // 新建一个remoteConnectionInfo
                    remoteConnectionInfo = new ConnectionInfo();
                    remoteConnectionInfo.setConnectionId(new ConnectionId(idGenerator.generateId()));
                    remoteConnectionInfo.setClientId(configuration.getName() + "_" + configuration.getBrokerName() + "_outbound");
                    remoteConnectionInfo.setUserName(configuration.getUserName());
                    remoteConnectionInfo.setPassword(configuration.getPassword());
                    remoteBroker.oneway(remoteConnectionInfo);
                    // 新建一个sessionInfo,我理解是用于建立生产者
                    SessionInfo remoteSessionInfo = new SessionInfo(remoteConnectionInfo, 1);
                    remoteBroker.oneway(remoteSessionInfo);
                    // 基于建立的session建立生产者
                    producerInfo = new ProducerInfo(remoteSessionInfo, 1);
                    producerInfo.setResponseRequired(false);
                    remoteBroker.oneway(producerInfo);
                    // Listen to consumer advisory messages on the remote broker to determine demand.
                    // 如果在xml中没有配置staticBridge,就表示是demanForwardBridge,需要建立监听RB的消费者连接和断开的消费者。
                    if (!configuration.isStaticBridge()) {
                        // 发现仍然使用remoteSessionInfo来建立消费者。话说我记得好像同一个session里不能同时有生产者和消费者?
                        demandConsumerInfo = new ConsumerInfo(remoteSessionInfo, 1);
                        // always dispatch advisory message asynchronously so that
                        // we never block the producer broker if we are slow
                        demandConsumerInfo.setDispatchAsync(true);
                        String advisoryTopic = configuration.getDestinationFilter();
                        if (configuration.isBridgeTempDestinations()) {
                            advisoryTopic += "," + AdvisorySupport.TEMP_DESTINATION_COMPOSITE_ADVISORY_TOPIC;
                        }
                        demandConsumerInfo.setDestination(new ActiveMQTopic(advisoryTopic));
                        configureConsumerPrefetch(demandConsumerInfo);
                        remoteBroker.oneway(demandConsumerInfo);
                    }
    

    注意:代码里所有的remoteBroker.oneway(command)其实都可以视为一个向RB发送指令和消息的动作,oneway表示异步发送,当然还有同步发送,在startLocalBridge方法里有,往本地发送连接信息的时候用到的:
    Object resp = localBroker.request(localConnectionInfo);
    此外还有异步发送,但是需要接收并处理反馈的方法,在serviceRemoteCommand方法里,用于发送持久化消息的,代码是这样写的:

    remoteBroker.asyncRequest(message, new ResponseCallback() {
        @Override
        public void onCompletion(FutureResponse future) {
            try {
              Response response = future.getResult();
              if (response.isException()) {
                   ExceptionResponse er = (ExceptionResponse) response;
                   serviceLocalException(md, er.getException());
              } else {
                   localBroker.oneway(new MessageAck(md, MessageAck.INDIVIDUAL_ACK_TYPE, 1));
                   networkBridgeStatistics.getDequeues().increment();
              }
           } catch (IOException e) {
              serviceLocalException(md, e);
           } finally {
              sub.decrementOutstandingResponses();
           }
      }
    });
    

    言归正传,从上面的几个方法里其实可以看到,在bridge启动的时候,建立一个通向本地的连接信息localConnectionInfo和会话信息localSessionInfo,建立了一个通向RB的连接信息remoteConnectionInfo、会话remoteSessionInfo、生产者信息producerInfo、和对advisory topic的消费者信息demandConsumerInfo。这些对象都是Info结尾的,因为这些都是JMS中规范中的概念,netwokr bridge里建立这些是用来迎合JMS规范的。实际这些都是抽象的模型。我们只需要记住两个连接已经建立了,分别是localBroker和remoteBroker。

    下面是最主要的两个方法serviceLocalCommand和serviceRemoteCommand。由于方法比较长,这里介绍主要的逻辑和重要的代码。

    serviceLocalCommand,用于处理localBroker这个连接从本地收到的指令和消息,localBroker中传来的消息主要有几种:

    1. 消息分发,根据RB告知的其存在的消费者,判断是否需要转发,有几种情况:
      a. 双向连接且消息是advisory消息,直接使用remoteBroker.oneway(message);发送
      b. 持久化消息或在Broker端配置了始终使用同步发送,使用remoteBroker.asyncRequest()方式发送,也就是异步需反馈的方法发送。等到对端RB发送了收到消息的回执后,会往本地发送一个消息的ack消息localBroker.oneway(new MessageAck(md, MessageAck.INDIVIDUAL_ACK_TYPE, 1));
      c. 如果消息本身就是被异步发送过来的,那直接调用一个remoteBroker.oneway(message)方法把消息发送给对端,并且调用一个localBroker.oneway(new MessageAck(md, MessageAck.INDIVIDUAL_ACK_TYPE, 1));方法告知本地。
    2. BrokerInfo消息,相当于更新本地BrokerInfo,直接保存到一个对象里即可。
      futureLocalBrokerInfo.set((BrokerInfo) command);
    3. Broker Stop信息,相当于本地broker要关闭了,那就执行一个stop方法。
    LOG.info("{} Shutting down {}", configuration.getBrokerName(), configuration.getName());
    stop();
    
    1. ConnectionError连接异常的消息,执行异常处理操作:
    ConnectionError ce = (ConnectionError) command;
    serviceLocalException(ce.getException());
    
    1. 还有其他的一些消息,不负责处理:
    switch (command.getDataStructureType()) {
    case WireFormatInfo.DATA_STRUCTURE_TYPE:
        break;
    case BrokerSubscriptionInfo.DATA_STRUCTURE_TYPE:
        break;
    default:
        LOG.warn("Unexpected local command: {}", command);
    }
    

    serviceRemoteCommand方法,用于处理remoteBroker这个连接从RB收到的指令和消息,localBroker中传来的消息主要有几种:

    1. 消息分发。由于remoteBroker只负责advisory的监听,所以只会处理advisory message。serviceRemoteConsumerAdvisory这个方法会根据收到的consumer建立,销毁等动作,将RB的消费者信息加入或移除出本地的network consumer列表。
    safeWaitUntilStarted();
    MessageDispatch md = (MessageDispatch) command;
    serviceRemoteConsumerAdvisory(md.getMessage().getDataStructure());
    ackAdvisory(md.getMessage());
    
    1. BrokerInfo消息,相当于更新异地BrokerInfo,直接保存到一个对象里即可。
      futureRemoteBrokerInfo.set((BrokerInfo) command);
    2. BrokerSubscriptionInfo,处理dynamicallyIncludedDestinations等特殊性质的订阅。
    3. ConnectionError连接异常的消息,执行异常处理操作
    4. 双向连接发来的信息,需要单独处理一遍,不赘述
    5. 其他类型消息,均不作处理,包括KeepAliveInfo,WireFormatInfo,ShutdownInfo等等。

    在serviceRemoteCommand方法中,有个很重要的方法是serviceRemoteConsumerAdvisory,该方法用于处理收到的advisoryMessage,主要是消费者的建立和断开,主要是关于networkTTL的判断,避免出现环路。还有对非持久化订阅和持久化订阅的分别处理。代码奉上:

        private void serviceRemoteConsumerAdvisory(DataStructure data) throws IOException {
            final int networkTTL = configuration.getConsumerTTL();
            // 收到消费者信息,判断是否需要建一个新的本地network订阅。
            if (data.getClass() == ConsumerInfo.class) {
                // Create a new local subscription
                ConsumerInfo info = (ConsumerInfo) data;
                BrokerId[] path = info.getBrokerPath();
    
                if (info.isBrowser()) {
                    LOG.debug("{} Ignoring sub from {}, browsers explicitly suppressed", configuration.getBrokerName(), remoteBrokerName);
                    return;
                }
    
                if (path != null && networkTTL > -1 && path.length >= networkTTL) {
                    LOG.debug("{} Ignoring sub from {}, restricted to {} network hops only: {}", new Object[]{
                            configuration.getBrokerName(), remoteBrokerName, networkTTL, info
                    });
                    return;
                }
    
                if (contains(path, localBrokerPath[0])) {
                    // Ignore this consumer as it's a consumer we locally sent to the broker.
                    LOG.debug("{} Ignoring sub from {}, already routed through this broker once: {}", new Object[]{
                            configuration.getBrokerName(), remoteBrokerName, info
                    });
                    return;
                }
    
                if (!isPermissableDestination(info.getDestination())) {
                    // ignore if not in the permitted or in the excluded list
                    LOG.debug("{} Ignoring sub from {}, destination {} is not permitted: {}", new Object[]{
                            configuration.getBrokerName(), remoteBrokerName, info.getDestination(), info
                    });
                    return;
                }
    
                // in a cyclic network there can be multiple bridges per broker that can propagate
                // a network subscription so there is a need to synchronize on a shared entity
                // if duplicate suppression is required
                if (isDuplicateSuppressionOff(info)) {
                    addConsumerInfo(info);
                } else {
                    synchronized (brokerService.getVmConnectorURI()) {
                        addConsumerInfo(info);
                    }
                }
            } else if (data.getClass() == DestinationInfo.class) {
                // It's a destination info - we want to pass up information about temporary destinations  说实话没太明白这个是干嘛的
                final DestinationInfo destInfo = (DestinationInfo) data;
                BrokerId[] path = destInfo.getBrokerPath();
                if (path != null && networkTTL > -1 && path.length >= networkTTL) {
                    LOG.debug("{} Ignoring destination {} restricted to {} network hops only", new Object[]{
                            configuration.getBrokerName(), destInfo, networkTTL
                    });
                    return;
                }
                if (contains(destInfo.getBrokerPath(), localBrokerPath[0])) {
                    LOG.debug("{} Ignoring destination {} already routed through this broker once", configuration.getBrokerName(), destInfo);
                    return;
                }
                destInfo.setConnectionId(localConnectionInfo.getConnectionId());
                if (destInfo.getDestination() instanceof ActiveMQTempDestination) {
                    // re-set connection id so comes from here
                    ActiveMQTempDestination tempDest = (ActiveMQTempDestination) destInfo.getDestination();
                    tempDest.setConnectionId(localSessionInfo.getSessionId().getConnectionId());
                }
                destInfo.setBrokerPath(appendToBrokerPath(destInfo.getBrokerPath(), getRemoteBrokerPath()));
                LOG.trace("{} bridging {} destination on {} from {}, destination: {}", new Object[]{
                        configuration.getBrokerName(), (destInfo.isAddOperation() ? "add" : "remove"), localBroker, remoteBrokerName, destInfo
                });
                if (destInfo.isRemoveOperation()) {
                    // Serialize with removeSub operations such that all removeSub advisories
                    // are generated
                    serialExecutor.execute(new Runnable() {
                        @Override
                        public void run() {
                            try {
                                localBroker.oneway(destInfo);
                            } catch (IOException e) {
                                LOG.warn("failed to deliver remove command for destination: {}", destInfo.getDestination(), e);
                            }
                        }
                    });
                } else {
                    localBroker.oneway(destInfo);
                }
            } else if (data.getClass() == RemoveInfo.class) {
                // 如果是消费断开连接的信息,则判断是否从subscription中移除
                ConsumerId id = (ConsumerId) ((RemoveInfo) data).getObjectId();
                removeDemandSubscription(id);
    
                if (forcedDurableRemoteId.remove(id)) {
                    for (Iterator<DemandSubscription> i = subscriptionMapByLocalId.values().iterator(); i.hasNext(); ) {
                        DemandSubscription ds = i.next();
                        boolean removed = ds.removeForcedDurableConsumer(id);
                        if (removed) {
                            cleanupDurableSub(ds, i);
                        }
                    }
               }
    
            } else if (data.getClass() == RemoveSubscriptionInfo.class) {
                RemoveSubscriptionInfo info = ((RemoveSubscriptionInfo) data);
                SubscriptionInfo subscriptionInfo = new SubscriptionInfo(info.getClientId(), info.getSubscriptionName());
                for (Iterator<DemandSubscription> i = subscriptionMapByLocalId.values().iterator(); i.hasNext(); ) {
                    DemandSubscription ds = i.next();
                    boolean removed = ds.getDurableRemoteSubs().remove(subscriptionInfo);
                    if (removed) {
                        cleanupDurableSub(ds, i);
                    }
                }
            }
        }
    

    至此我们大致可以明白MQ的连接之间的套路了。两台MQ之间建立连接,Network Bridge会出现两个transport,一个是localBroker,一个是remoteBroker。一个负责从本地接收,另一个负责监听对端,并处理对端发来的执行。

    如果是一个mesh的配置,3台MQ组成的集群,每台MQ上会出现两个localBroker,两个remoteBroker。4台MQ则各是3个共6个……

    感觉mesh方式来搭建MQ的集群,对MQ的集群压力还是挺大的啊。。

    相关文章

      网友评论

        本文标题:ActiveMQ源码解析(四)Network Bridge

        本文链接:https://www.haomeiwen.com/subject/cgptyxtx.html