在多个ActiveMQ Broker组成的集群(Network of broker)中,是通过Network Bridge机制来实现的。在阅读源码前,我一直有误解,以为AMQ之间在duplex=false的场景下互相通讯是互相建立消费者客户端,如果生产者连接Broker A,而消费者连接Broker B,那么应该是Broker B先从Broker A上消费消息再发给自己的消费者。可是实际看起来并不是这种简单的机制。今天这篇源码解析,由于牵涉的代码比较多,我尽量用较少的源码来整理出每个方法的逻辑。
AMQ的通讯的确是通过一个虚拟的客户端实现的,但是这个客户端在AMQ的源码里称为Network Bridge。在集群建立的时候,如果使用duplex=false(一般在mesh结构,也就是N个Broker完全互连的场景下)的配置,每个Broker上都会出现两个连接:
- 一个连接是自己与自己的连接,负责从本地接收指令及消息并处理。
- 另一个连接是自己与其他MQ的连接,负责从另一个Broker接收指令及消息并处理,也负责往其他MQ发送指令和消息。
这里所谓的指令指的是例如消费者创建,Broker异常,连接异常等控制指令,Network Bridge接收到这些指令后需要进行相应的处理。而消息则是Message,收到后需要进行ack或者转发给自己的消费者。
看看源码吧,在AMQ的源码中起到主要作用(default配置)的是DemandForwardingBridgeSupport类。这个类中有几个主要的方法:
- start(),stop(),启动和停止bridge
- serviceLocalCommand 处理本地指令和消息
- serviceRemoteCommand 处理其他MQ传过来的指令和消息
先来看start()方法吧,主要功能:
1. 判断是否是双向连接,如果是就多建一个监听器
2. 建立local和remote的transportListener监听器
3. 启动network bridge
public void start() throws Exception {
if (started.compareAndSet(false, true)) {
if (brokerService == null) {
throw new IllegalArgumentException("BrokerService is null on " + this);
}
networkBridgeStatistics.setEnabled(brokerService.isEnableStatistics());
//如果是双向连接,额外建立一个监听器
if (isDuplex()) {
duplexInboundLocalBroker = NetworkBridgeFactory.createLocalAsyncTransport(brokerService.getBroker().getVmConnectorURI());
duplexInboundLocalBroker.setTransportListener(new DefaultTransportListener() {
@Override
public void onCommand(Object o) {
Command command = (Command) o;
serviceLocalCommand(command);
}
@Override
public void onException(IOException error) {
serviceLocalException(error);
}
});
duplexInboundLocalBroker.start();
}
// 设定Local监听器,处理本地指令和消息
localBroker.setTransportListener(new DefaultTransportListener() {
@Override
public void onCommand(Object o) {
Command command = (Command) o;
serviceLocalCommand(command);
}
@Override
public void onException(IOException error) {
if (!futureLocalBrokerInfo.isDone()) {
LOG.info("error with pending local brokerInfo on: " + localBroker, error);
futureLocalBrokerInfo.cancel(true);
return;
}
serviceLocalException(error);
}
});
// 设定Remote监听器,处理别的MQ的指令和消息
remoteBroker.setTransportListener(new DefaultTransportListener() {
@Override
public void onCommand(Object o) {
Command command = (Command) o;
serviceRemoteCommand(command);
}
@Override
public void onException(IOException error) {
if (!futureRemoteBrokerInfo.isDone()) {
LOG.info("error with pending remote brokerInfo on: " + remoteBroker, error);
futureRemoteBrokerInfo.cancel(true);
return;
}
serviceRemoteException(error);
}
});
// remoteBroker和localBroker的类其实是transport,也就是说这俩货
// 其实是两个连接。
remoteBroker.start();
localBroker.start();
// 如果连接未被抛弃
if (!disposed.get()) {
try {
// 启动
triggerStartAsyncNetworkBridgeCreation();
} catch (IOException e) {
LOG.warn("Caught exception from remote start", e);
}
} else {
LOG.warn("Bridge was disposed before the start() method was fully executed.");
throw new TransportDisposedIOException();
}
}
}
启动bridge的方法是triggerStartAsyncNetworkBridgeCreation(),在该方法中主要是启动一个做了两件事:
1. 调用collectBrokerInfos()方法收集本地和异地的brokerInfo信息,在该方法中有逻辑来处理remoteBroker和localBroker是相同Broker的情况,防止出现环。
2. 调用doStartLocalAndRemoteBridges()方法启动本地和异地的连接(也就是localBroker和remoteBroker)。在该方法中主要是通过startLocalBridge()和startRemoteBridge()来实现连接的启动。
startLocalBridge和startRemoteBridge()稍有点复杂,源码就不上了。我整理了一下,startLocalBridge主要实现的事情包括了:
1. 往本地MQ建立了一个ClientID为 NetworkConnectorName_RemoteBrokerName_inbound_LocalBrokerNam的连接信息connectionInfo,同时建立会话信息sessionInfo,并告知本地broker,注意:这个连接信息会出现在要被转发的topic或者queue的消费者列表里。
2. 如果是双向连接,还需要再建立一个连接
3. 建立一个network bridge的连接监听器NetworkBridgeListener
4. 告知本地Broker有其他MQ试图建立连接
而startRemoteBridge主要实现的事情包括了:
1. 处理dynamicallyIncludedDestinationsKey和staticallyIncludedDestinationsKey的配置的事宜
2. 建立一个clientID为NetworkName_LocalBrokerName_outbound的连接信息connectionInfo,同时建立会话信息sessionInfo,生产者producerInfo,并告知连接另一端的Broker。
3. 建立一个监听对端Broker(以下称为RB)的所有消费者建立和断开的Advisory Topic系统Topic:ActiveMQ.Advisory.Consumer.> 的消费者信息,并发送给RB。
startRemoteBridge()中第2、3两部分的代码如下:
// 把之前没关闭连接的先告知RB关闭了。
if (remoteConnectionInfo != null) {
remoteBroker.oneway(remoteConnectionInfo.createRemoveCommand());
}
// 新建一个remoteConnectionInfo
remoteConnectionInfo = new ConnectionInfo();
remoteConnectionInfo.setConnectionId(new ConnectionId(idGenerator.generateId()));
remoteConnectionInfo.setClientId(configuration.getName() + "_" + configuration.getBrokerName() + "_outbound");
remoteConnectionInfo.setUserName(configuration.getUserName());
remoteConnectionInfo.setPassword(configuration.getPassword());
remoteBroker.oneway(remoteConnectionInfo);
// 新建一个sessionInfo,我理解是用于建立生产者
SessionInfo remoteSessionInfo = new SessionInfo(remoteConnectionInfo, 1);
remoteBroker.oneway(remoteSessionInfo);
// 基于建立的session建立生产者
producerInfo = new ProducerInfo(remoteSessionInfo, 1);
producerInfo.setResponseRequired(false);
remoteBroker.oneway(producerInfo);
// Listen to consumer advisory messages on the remote broker to determine demand.
// 如果在xml中没有配置staticBridge,就表示是demanForwardBridge,需要建立监听RB的消费者连接和断开的消费者。
if (!configuration.isStaticBridge()) {
// 发现仍然使用remoteSessionInfo来建立消费者。话说我记得好像同一个session里不能同时有生产者和消费者?
demandConsumerInfo = new ConsumerInfo(remoteSessionInfo, 1);
// always dispatch advisory message asynchronously so that
// we never block the producer broker if we are slow
demandConsumerInfo.setDispatchAsync(true);
String advisoryTopic = configuration.getDestinationFilter();
if (configuration.isBridgeTempDestinations()) {
advisoryTopic += "," + AdvisorySupport.TEMP_DESTINATION_COMPOSITE_ADVISORY_TOPIC;
}
demandConsumerInfo.setDestination(new ActiveMQTopic(advisoryTopic));
configureConsumerPrefetch(demandConsumerInfo);
remoteBroker.oneway(demandConsumerInfo);
}
注意:代码里所有的remoteBroker.oneway(command)其实都可以视为一个向RB发送指令和消息的动作,oneway表示异步发送,当然还有同步发送,在startLocalBridge方法里有,往本地发送连接信息的时候用到的:
Object resp = localBroker.request(localConnectionInfo);
此外还有异步发送,但是需要接收并处理反馈的方法,在serviceRemoteCommand方法里,用于发送持久化消息的,代码是这样写的:
remoteBroker.asyncRequest(message, new ResponseCallback() {
@Override
public void onCompletion(FutureResponse future) {
try {
Response response = future.getResult();
if (response.isException()) {
ExceptionResponse er = (ExceptionResponse) response;
serviceLocalException(md, er.getException());
} else {
localBroker.oneway(new MessageAck(md, MessageAck.INDIVIDUAL_ACK_TYPE, 1));
networkBridgeStatistics.getDequeues().increment();
}
} catch (IOException e) {
serviceLocalException(md, e);
} finally {
sub.decrementOutstandingResponses();
}
}
});
言归正传,从上面的几个方法里其实可以看到,在bridge启动的时候,建立一个通向本地的连接信息localConnectionInfo和会话信息localSessionInfo,建立了一个通向RB的连接信息remoteConnectionInfo、会话remoteSessionInfo、生产者信息producerInfo、和对advisory topic的消费者信息demandConsumerInfo。这些对象都是Info结尾的,因为这些都是JMS中规范中的概念,netwokr bridge里建立这些是用来迎合JMS规范的。实际这些都是抽象的模型。我们只需要记住两个连接已经建立了,分别是localBroker和remoteBroker。
下面是最主要的两个方法serviceLocalCommand和serviceRemoteCommand。由于方法比较长,这里介绍主要的逻辑和重要的代码。
serviceLocalCommand,用于处理localBroker这个连接从本地收到的指令和消息,localBroker中传来的消息主要有几种:
- 消息分发,根据RB告知的其存在的消费者,判断是否需要转发,有几种情况:
a. 双向连接且消息是advisory消息,直接使用remoteBroker.oneway(message);发送
b. 持久化消息或在Broker端配置了始终使用同步发送,使用remoteBroker.asyncRequest()方式发送,也就是异步需反馈的方法发送。等到对端RB发送了收到消息的回执后,会往本地发送一个消息的ack消息localBroker.oneway(new MessageAck(md, MessageAck.INDIVIDUAL_ACK_TYPE, 1));
c. 如果消息本身就是被异步发送过来的,那直接调用一个remoteBroker.oneway(message)方法把消息发送给对端,并且调用一个localBroker.oneway(new MessageAck(md, MessageAck.INDIVIDUAL_ACK_TYPE, 1));
方法告知本地。 - BrokerInfo消息,相当于更新本地BrokerInfo,直接保存到一个对象里即可。
futureLocalBrokerInfo.set((BrokerInfo) command);
- Broker Stop信息,相当于本地broker要关闭了,那就执行一个stop方法。
LOG.info("{} Shutting down {}", configuration.getBrokerName(), configuration.getName());
stop();
- ConnectionError连接异常的消息,执行异常处理操作:
ConnectionError ce = (ConnectionError) command;
serviceLocalException(ce.getException());
- 还有其他的一些消息,不负责处理:
switch (command.getDataStructureType()) {
case WireFormatInfo.DATA_STRUCTURE_TYPE:
break;
case BrokerSubscriptionInfo.DATA_STRUCTURE_TYPE:
break;
default:
LOG.warn("Unexpected local command: {}", command);
}
serviceRemoteCommand方法,用于处理remoteBroker这个连接从RB收到的指令和消息,localBroker中传来的消息主要有几种:
- 消息分发。由于remoteBroker只负责advisory的监听,所以只会处理advisory message。serviceRemoteConsumerAdvisory这个方法会根据收到的consumer建立,销毁等动作,将RB的消费者信息加入或移除出本地的network consumer列表。
safeWaitUntilStarted();
MessageDispatch md = (MessageDispatch) command;
serviceRemoteConsumerAdvisory(md.getMessage().getDataStructure());
ackAdvisory(md.getMessage());
- BrokerInfo消息,相当于更新异地BrokerInfo,直接保存到一个对象里即可。
futureRemoteBrokerInfo.set((BrokerInfo) command);
- BrokerSubscriptionInfo,处理dynamicallyIncludedDestinations等特殊性质的订阅。
- ConnectionError连接异常的消息,执行异常处理操作
- 双向连接发来的信息,需要单独处理一遍,不赘述
- 其他类型消息,均不作处理,包括KeepAliveInfo,WireFormatInfo,ShutdownInfo等等。
在serviceRemoteCommand方法中,有个很重要的方法是serviceRemoteConsumerAdvisory,该方法用于处理收到的advisoryMessage,主要是消费者的建立和断开,主要是关于networkTTL的判断,避免出现环路。还有对非持久化订阅和持久化订阅的分别处理。代码奉上:
private void serviceRemoteConsumerAdvisory(DataStructure data) throws IOException {
final int networkTTL = configuration.getConsumerTTL();
// 收到消费者信息,判断是否需要建一个新的本地network订阅。
if (data.getClass() == ConsumerInfo.class) {
// Create a new local subscription
ConsumerInfo info = (ConsumerInfo) data;
BrokerId[] path = info.getBrokerPath();
if (info.isBrowser()) {
LOG.debug("{} Ignoring sub from {}, browsers explicitly suppressed", configuration.getBrokerName(), remoteBrokerName);
return;
}
if (path != null && networkTTL > -1 && path.length >= networkTTL) {
LOG.debug("{} Ignoring sub from {}, restricted to {} network hops only: {}", new Object[]{
configuration.getBrokerName(), remoteBrokerName, networkTTL, info
});
return;
}
if (contains(path, localBrokerPath[0])) {
// Ignore this consumer as it's a consumer we locally sent to the broker.
LOG.debug("{} Ignoring sub from {}, already routed through this broker once: {}", new Object[]{
configuration.getBrokerName(), remoteBrokerName, info
});
return;
}
if (!isPermissableDestination(info.getDestination())) {
// ignore if not in the permitted or in the excluded list
LOG.debug("{} Ignoring sub from {}, destination {} is not permitted: {}", new Object[]{
configuration.getBrokerName(), remoteBrokerName, info.getDestination(), info
});
return;
}
// in a cyclic network there can be multiple bridges per broker that can propagate
// a network subscription so there is a need to synchronize on a shared entity
// if duplicate suppression is required
if (isDuplicateSuppressionOff(info)) {
addConsumerInfo(info);
} else {
synchronized (brokerService.getVmConnectorURI()) {
addConsumerInfo(info);
}
}
} else if (data.getClass() == DestinationInfo.class) {
// It's a destination info - we want to pass up information about temporary destinations 说实话没太明白这个是干嘛的
final DestinationInfo destInfo = (DestinationInfo) data;
BrokerId[] path = destInfo.getBrokerPath();
if (path != null && networkTTL > -1 && path.length >= networkTTL) {
LOG.debug("{} Ignoring destination {} restricted to {} network hops only", new Object[]{
configuration.getBrokerName(), destInfo, networkTTL
});
return;
}
if (contains(destInfo.getBrokerPath(), localBrokerPath[0])) {
LOG.debug("{} Ignoring destination {} already routed through this broker once", configuration.getBrokerName(), destInfo);
return;
}
destInfo.setConnectionId(localConnectionInfo.getConnectionId());
if (destInfo.getDestination() instanceof ActiveMQTempDestination) {
// re-set connection id so comes from here
ActiveMQTempDestination tempDest = (ActiveMQTempDestination) destInfo.getDestination();
tempDest.setConnectionId(localSessionInfo.getSessionId().getConnectionId());
}
destInfo.setBrokerPath(appendToBrokerPath(destInfo.getBrokerPath(), getRemoteBrokerPath()));
LOG.trace("{} bridging {} destination on {} from {}, destination: {}", new Object[]{
configuration.getBrokerName(), (destInfo.isAddOperation() ? "add" : "remove"), localBroker, remoteBrokerName, destInfo
});
if (destInfo.isRemoveOperation()) {
// Serialize with removeSub operations such that all removeSub advisories
// are generated
serialExecutor.execute(new Runnable() {
@Override
public void run() {
try {
localBroker.oneway(destInfo);
} catch (IOException e) {
LOG.warn("failed to deliver remove command for destination: {}", destInfo.getDestination(), e);
}
}
});
} else {
localBroker.oneway(destInfo);
}
} else if (data.getClass() == RemoveInfo.class) {
// 如果是消费断开连接的信息,则判断是否从subscription中移除
ConsumerId id = (ConsumerId) ((RemoveInfo) data).getObjectId();
removeDemandSubscription(id);
if (forcedDurableRemoteId.remove(id)) {
for (Iterator<DemandSubscription> i = subscriptionMapByLocalId.values().iterator(); i.hasNext(); ) {
DemandSubscription ds = i.next();
boolean removed = ds.removeForcedDurableConsumer(id);
if (removed) {
cleanupDurableSub(ds, i);
}
}
}
} else if (data.getClass() == RemoveSubscriptionInfo.class) {
RemoveSubscriptionInfo info = ((RemoveSubscriptionInfo) data);
SubscriptionInfo subscriptionInfo = new SubscriptionInfo(info.getClientId(), info.getSubscriptionName());
for (Iterator<DemandSubscription> i = subscriptionMapByLocalId.values().iterator(); i.hasNext(); ) {
DemandSubscription ds = i.next();
boolean removed = ds.getDurableRemoteSubs().remove(subscriptionInfo);
if (removed) {
cleanupDurableSub(ds, i);
}
}
}
}
至此我们大致可以明白MQ的连接之间的套路了。两台MQ之间建立连接,Network Bridge会出现两个transport,一个是localBroker,一个是remoteBroker。一个负责从本地接收,另一个负责监听对端,并处理对端发来的执行。
如果是一个mesh的配置,3台MQ组成的集群,每台MQ上会出现两个localBroker,两个remoteBroker。4台MQ则各是3个共6个……
感觉mesh方式来搭建MQ的集群,对MQ的集群压力还是挺大的啊。。
网友评论