在RocketMq中消费消息有两种方式,push和pull方式:
- push方式会向Consumer注册一个MessageListener,在成功拉取到消息之后会立即进行回调MessageListener的方法。
- pull方式由应用自己主动去调用Consumer拉取消息方法去broker拉取消息,并进行消费处理,主动权都是在broker手中。
但是两种方式都是从broker pull拉取消息到本地来进行消费。consumerGroup是一类consumer的集合名称,他是用来消费同一类消息,并且消费逻辑是一样的。consumerGroup中consumer的数量最好不要超过订阅的topic中的queue数量,不然会有consumer的闲置。
接下来看看client端consumer消费消息的源码实现:
public DefaultMQPushConsumer(final String consumerGroup, RPCHook rpcHook,
AllocateMessageQueueStrategy allocateMessageQueueStrategy) {
this.consumerGroup = consumerGroup;
//在实例化的时候就设置了consumer的负载均衡策略,这里默认采用的均摊实现
this.allocateMessageQueueStrategy = allocateMessageQueueStrategy;
defaultMQPushConsumerImpl = new DefaultMQPushConsumerImpl(this, rpcHook);
}
public synchronized void start() throws MQClientException {
switch (this.serviceState) {
case CREATE_JUST:
log.info("the consumer [{}] start beginning. messageModel={}, isUnitMode={}", this.defaultMQPushConsumer.getConsumerGroup(),
this.defaultMQPushConsumer.getMessageModel(), this.defaultMQPushConsumer.isUnitMode());
this.serviceState = ServiceState.START_FAILED;
this.checkConfig();
this.copySubscription();
if (this.defaultMQPushConsumer.getMessageModel() == MessageModel.CLUSTERING) {
this.defaultMQPushConsumer.changeInstanceNameToPID();
}
this.mQClientFactory = MQClientManager.getInstance().getAndCreateMQClientInstance(this.defaultMQPushConsumer, this.rpcHook);
this.rebalanceImpl.setConsumerGroup(this.defaultMQPushConsumer.getConsumerGroup());
this.rebalanceImpl.setMessageModel(this.defaultMQPushConsumer.getMessageModel());
this.rebalanceImpl.setAllocateMessageQueueStrategy(this.defaultMQPushConsumer.getAllocateMessageQueueStrategy());
this.rebalanceImpl.setmQClientFactory(this.mQClientFactory);
this.pullAPIWrapper = new PullAPIWrapper(
mQClientFactory,
this.defaultMQPushConsumer.getConsumerGroup(), isUnitMode());
this.pullAPIWrapper.registerFilterMessageHook(filterMessageHookList);
if (this.defaultMQPushConsumer.getOffsetStore() != null) {
this.offsetStore = this.defaultMQPushConsumer.getOffsetStore();
} else {
switch (this.defaultMQPushConsumer.getMessageModel()) {
case BROADCASTING:
this.offsetStore = new LocalFileOffsetStore(this.mQClientFactory, this.defaultMQPushConsumer.getConsumerGroup());
break;
case CLUSTERING:
this.offsetStore = new RemoteBrokerOffsetStore(this.mQClientFactory, this.defaultMQPushConsumer.getConsumerGroup());
break;
default:
break;
}
this.defaultMQPushConsumer.setOffsetStore(this.offsetStore);
}
this.offsetStore.load();
if (this.getMessageListenerInner() instanceof MessageListenerOrderly) {
this.consumeOrderly = true;
this.consumeMessageService =
new ConsumeMessageOrderlyService(this, (MessageListenerOrderly) this.getMessageListenerInner());
} else if (this.getMessageListenerInner() instanceof MessageListenerConcurrently) {
this.consumeOrderly = false;
this.consumeMessageService =
new ConsumeMessageConcurrentlyService(this, (MessageListenerConcurrently) this.getMessageListenerInner());
}
this.consumeMessageService.start();
boolean registerOK = mQClientFactory.registerConsumer(this.defaultMQPushConsumer.getConsumerGroup(), this);
if (!registerOK) {
this.serviceState = ServiceState.CREATE_JUST;
this.consumeMessageService.shutdown();
throw new MQClientException("The consumer group[" + this.defaultMQPushConsumer.getConsumerGroup()
+ "] has been created before, specify another name please." + FAQUrl.suggestTodo(FAQUrl.GROUP_NAME_DUPLICATE_URL),
null);
}
mQClientFactory.start();
log.info("the consumer [{}] start OK.", this.defaultMQPushConsumer.getConsumerGroup());
this.serviceState = ServiceState.RUNNING;
break;
case RUNNING:
case START_FAILED:
case SHUTDOWN_ALREADY:
throw new MQClientException("The PushConsumer service state not OK, maybe started once, "
+ this.serviceState
+ FAQUrl.suggestTodo(FAQUrl.CLIENT_SERVICE_NOT_OK),
null);
default:
break;
}
this.updateTopicSubscribeInfoWhenSubscriptionChanged();
this.mQClientFactory.checkClientInBroker();
this.mQClientFactory.sendHeartbeatToAllBrokerWithLock();
this.mQClientFactory.rebalanceImmediately();
}
public MQClientInstance(ClientConfig clientConfig, int instanceIndex, String clientId, RPCHook rpcHook) {
this.clientConfig = clientConfig;
this.instanceIndex = instanceIndex;
this.nettyClientConfig = new NettyClientConfig();
this.nettyClientConfig.setClientCallbackExecutorThreads(clientConfig.getClientCallbackExecutorThreads());
this.nettyClientConfig.setUseTLS(clientConfig.isUseTLS());
this.clientRemotingProcessor = new ClientRemotingProcessor(this);
this.mQClientAPIImpl = new MQClientAPIImpl(this.nettyClientConfig, this.clientRemotingProcessor, rpcHook, clientConfig);
if (this.clientConfig.getNamesrvAddr() != null) {
this.mQClientAPIImpl.updateNameServerAddressList(this.clientConfig.getNamesrvAddr());
log.info("user specified name server address: {}", this.clientConfig.getNamesrvAddr());
}
this.clientId = clientId;
this.mQAdminImpl = new MQAdminImpl(this);
this.pullMessageService = new PullMessageService(this);
this.rebalanceService = new RebalanceService(this);
this.defaultMQProducer = new DefaultMQProducer(MixAll.CLIENT_INNER_PRODUCER_GROUP);
this.defaultMQProducer.resetClientConfig(clientConfig);
this.consumerStatsManager = new ConsumerStatsManager(this.scheduledExecutorService);
log.info("Created a new client Instance, InstanceIndex:{}, ClientID:{}, ClientConfig:{}, ClientVersion:{}, SerializerType:{}",
this.instanceIndex,
this.clientId,
this.clientConfig,
MQVersion.getVersionDesc(MQVersion.CURRENT_VERSION), RemotingCommand.getSerializeTypeConfigInThisServer());
}
调用consumer.start()启动了消费者,主要是做了一下几件事情:
- 创建了MQClientInstance实例,主要靠他来进行和broker进行通信发送请求等。
- 实例化OffsetStore,对于不同的消息消费类型生成对应的OffsetStore实例。broadCasting广播类型,消息的offset是存放在本地的,所以实例化LocalFileOffsetStore从本地文件中加载保存的offset信息。clustering集群消费的话,消息的offset是存放在remote broker的,所以实例化的RemoteBrokerOffsetStore从远端broker获取。
- 根据不同的消费类型(并行消费或顺序消费),实例化不同的消息消费类ConsumeMessageOrderlyService(顺序消费)和ConsumeMessageConcurrentlyService(并行消费)。在实例化消费类型的时候,是开启了一个线程池用来处理消费消息的,这里消费消息的线程池线程数量是可以配置的,然后就是开启了定时任务线程池用来清理过期消息等。
- 在实例化MQClientService的时候实例化了PullMessageService,这里是拉取消息的类,在后台进行不停地拉取消息。
public class PullMessageService extends ServiceThread {
@Override
public void run() {
log.info(this.getServiceName() + " service started");
while (!this.isStopped()) {
try {
PullRequest pullRequest = this.pullRequestQueue.take();
if (pullRequest != null) {
this.pullMessage(pullRequest);
}
} catch (InterruptedException e) {
} catch (Exception e) {
log.error("Pull Message Service Run Method exception", e);
}
}
log.info(this.getServiceName() + " service end");
}
}
PullMessageService线程不停滴从pullRequestQueue阻塞队列中获取拉取消息请求进行拉取消息。如果拉取消息成功就继续包装PullRequest请求put到pullRequestQueue中拉取下一条消息,拉取失败就用后台定时任务线程,延迟一段时间(3秒)又把它放到pullRequestQueue中再次尝试获取消息。
boolean dispathToConsume = processQueue.putMessage(pullResult.getMsgFoundList());
DefaultMQPushConsumerImpl.this.consumeMessageService.submitConsumeRequest(
pullResult.getMsgFoundList(),
processQueue,
pullRequest.getMessageQueue(),
dispathToConsume);
在拉取消息成功的时候,会把获取的消息存放到processQueue队列中,这个队列是存放获取成功但是还未被消费的消息,然后是把获取到的消息封装成ConsumeRequest提交到相应的ConsumeMessageService中进行消费(并行消费或顺序消费)。
顺序消费消息:顺序消费消息是在ConsumeMessageOrderlyService中处理的,他里面也封装了一个ConsumeRequest,把它丢到阻塞队列里面,在他的线程池里面取任务进行顺序消息的消费。
//这里是控制顺序消费的关键
final Object objLock = messageQueueLock.fetchLockObject(this.messageQueue);
public Object fetchLockObject(final MessageQueue mq) {
Object objLock = this.mqLockTable.get(mq);
if (null == objLock) {
objLock = new Object();
Object prevLock = this.mqLockTable.putIfAbsent(mq, objLock);
if (prevLock != null) {
objLock = prevLock;
}
}
return objLock;
}
在ConsumeRequest中是先去获取MessageQueue的锁,这个很关键,这个是确保顺序消息被顺序消费的关键,只有获取到了锁的线程才能去更进一步去消费消息。这里也可以看出顺序消息也只能有一个consumer才能确保全局的顺序消费,否则的话有多个consumer的话,每一个jvm进程中只能有一个MQClientInstance,这样在两个独立的jvm进程中是无法控制一个messageQueue只会同时被一个consumer消费消息。
并行消息消费:并发消费消息是在ConsumeMessageConcurrentlyService中处理的,他里面也封装了一个ConsumeRequest,也是把它丢到阻塞队列中,后台线程池不停滴从队列中取任务进行并行消费。
status = listener.consumeMessage(Collections.unmodifiableList(msgs), context);
在ConsumeMessageConcurrentlyService中的ConsumeMessage会进行回调实现MessageListener的方法,即处理了应用程序消费消息的业务逻辑。
consumer端消息过滤:在consumer端拉取到消息之后的回调中是会进行消息的过滤
pullResult = DefaultMQPushConsumerImpl.this.pullAPIWrapper.processPullResult(pullRequest.getMessageQueue(), pullResult,
subscriptionData);
//处理消息,主要是过滤消息
public PullResult processPullResult(final MessageQueue mq, final PullResult pullResult,
final SubscriptionData subscriptionData) {
PullResultExt pullResultExt = (PullResultExt) pullResult;
this.updatePullFromWhichNode(mq, pullResultExt.getSuggestWhichBrokerId());
//获取到消息后的处理
if (PullStatus.FOUND == pullResult.getPullStatus()) {
ByteBuffer byteBuffer = ByteBuffer.wrap(pullResultExt.getMessageBinary());
List<MessageExt> msgList = MessageDecoder.decodes(byteBuffer);
List<MessageExt> msgListFilterAgain = msgList;
if (!subscriptionData.getTagsSet().isEmpty() && !subscriptionData.isClassFilterMode()) {
msgListFilterAgain = new ArrayList<MessageExt>(msgList.size());
//先根据tag来进行过滤
for (MessageExt msg : msgList) {
if (msg.getTags() != null) {
if (subscriptionData.getTagsSet().contains(msg.getTags())) {
msgListFilterAgain.add(msg);
}
}
}
}
//根据注册的FilterMessage来进行过滤
if (this.hasHook()) {
FilterMessageContext filterMessageContext = new FilterMessageContext();
filterMessageContext.setUnitMode(unitMode);
filterMessageContext.setMsgList(msgListFilterAgain);
this.executeHook(filterMessageContext);
}
for (MessageExt msg : msgListFilterAgain) {
MessageAccessor.putProperty(msg, MessageConst.PROPERTY_MIN_OFFSET,
Long.toString(pullResult.getMinOffset()));
MessageAccessor.putProperty(msg, MessageConst.PROPERTY_MAX_OFFSET,
Long.toString(pullResult.getMaxOffset()));
}
pullResultExt.setMsgFoundList(msgListFilterAgain);
}
pullResultExt.setMessageBinary(null);
return pullResult;
}
网友评论