事情起因:
1、rabbitmq原生的好像不支持多线程消费,而spring boot封装的starter-amqp是支持的。然后,我使用的时候,配置了多线程,但是碰到几个坑:
1、服务启动的时候,消费者sleep无限秒,但是,还是只接收到了一条消息,还让我以为自己配置错了。
2、后来,发现,服务启动后,再发送消息,是起到多线程效果了。然后就感到很奇怪了,来后,就打断点分析一下相关源码。
spring:
rabbitmq:
listener:
simple:
concurrency: 5 #最小消息监听线程数
max-concurrency: 5 #最大消息监听线程数
public void processMessage1(String msg) {
System.out.println("thread name:" + Thread.currentThread().getName());
System.out.println(Thread.currentThread().getName() + " 接收到来自hello.queue1队列的消息:" + msg);
try {
Thread.sleep(10000000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return;
}
这是spring ioc初始化的refresh方法,是在最后一个行,finishRefresh调用的。
@Override
public void refresh() throws BeansException, IllegalStateException {
synchronized (this.startupShutdownMonitor) {
...
// Last step: publish corresponding event.
finishRefresh();
...
}
protected void finishRefresh() {
// Clear context-level resource caches (such as ASM metadata from scanning).
clearResourceCaches();
// Initialize lifecycle processor for this context.
// 初始化声明周期processor
initLifecycleProcessor();
// 就是这里调用的
getLifecycleProcessor().onRefresh();
// Publish the final event.
publishEvent(new ContextRefreshedEvent(this));
// Participate in LiveBeansView MBean, if active.
LiveBeansView.registerApplicationContext(this);
}
protected void initLifecycleProcessor() {
ConfigurableListableBeanFactory beanFactory = getBeanFactory();
if (beanFactory.containsLocalBean(LIFECYCLE_PROCESSOR_BEAN_NAME)) {
this.lifecycleProcessor =
beanFactory.getBean(LIFECYCLE_PROCESSOR_BEAN_NAME, LifecycleProcessor.class);
if (logger.isDebugEnabled()) {
logger.debug("Using LifecycleProcessor [" + this.lifecycleProcessor + "]");
}
}
else {
// 这里提供了一个默认的生命周期processor
DefaultLifecycleProcessor defaultProcessor = new DefaultLifecycleProcessor();
defaultProcessor.setBeanFactory(beanFactory);
// 把这个默认生命周期保存在到applicationcontext中
this.lifecycleProcessor = defaultProcessor;
beanFactory.registerSingleton(LIFECYCLE_PROCESSOR_BEAN_NAME, this.lifecycleProcessor);
if (logger.isDebugEnabled()) {
logger.debug("Unable to locate LifecycleProcessor with name '" +
LIFECYCLE_PROCESSOR_BEAN_NAME +
"': using default [" + this.lifecycleProcessor + "]");
}
}
}
LifecycleProcessor接口就两个方法
public interface LifecycleProcessor extends Lifecycle {
/**
* Notification of context refresh, e.g. for auto-starting components.
*/
// spring refresh的时候,自动调用这个方法,启动组件
void onRefresh();
/**
* Notification of context close phase, e.g. for auto-stopping components.
*/
// 同理
void onClose();
}
默认的生命周期process,实现了lifecycleProcess的这个方法,前面的spring的finish调用了这里。
@Override
public void onRefresh() {
startBeans(true);
this.running = true;
}
private void startBeans(boolean autoStartupOnly) {
// 获取实现了lifecycle的所有的类,注意是lifecycle,不是lifecycleProcess
// 这里有两个,rabbitTemplate和internalRabbitListenerEndpointRegistry这个
// rabbit监听器注册器,这个监听器注册器很重要
Map<String, Lifecycle> lifecycleBeans = getLifecycleBeans();
Map<Integer, LifecycleGroup> phases = new HashMap<>();
lifecycleBeans.forEach((beanName, bean) -> {
if (!autoStartupOnly || (bean instanceof SmartLifecycle && ((SmartLifecycle) bean).isAutoStartup())) {
int phase = getPhase(bean);
LifecycleGroup group = phases.get(phase);
if (group == null) {
// defaultLifecycleProcess维护了一个group组,然后这里把这个每个lifecycleProcess封装一下,放进去
// 这里只有rabbitmq监听器注册器,会放进来
group = new LifecycleGroup(phase, this.timeoutPerShutdownPhase, lifecycleBeans, autoStartupOnly);
phases.put(phase, group);
}
group.add(beanName, bean);
}
});
if (!phases.isEmpty()) {
List<Integer> keys = new ArrayList<>(phases.keySet());
Collections.sort(keys);
for (Integer key : keys) {
// 关键在这里,调用LifecycleGroup的start方法,这个LifecycleGroup封装了rabbitmq的监听器,注册器
phases.get(key).start();
}
}
}
public void start() {
if (this.members.isEmpty()) {
return;
}
if (logger.isInfoEnabled()) {
logger.info("Starting beans in phase " + this.phase);
}
Collections.sort(this.members);
for (LifecycleGroupMember member : this.members) {
if (this.lifecycleBeans.containsKey(member.name)) {
// 这里调用defaultLifeCycleProcess的dostart方法
doStart(this.lifecycleBeans, member.name, this.autoStartupOnly);
}
}
}
private void doStart(Map<String, ? extends Lifecycle> lifecycleBeans, String beanName, boolean autoStartupOnly) {
// 这里获得rabbit监听器注册器
Lifecycle bean = lifecycleBeans.remove(beanName);
if (bean != null && bean != this) {
// 获取rabbitmq监听器依赖的bean,这里没有
String[] dependenciesForBean = getBeanFactory().getDependenciesForBean(beanName);
for (String dependency : dependenciesForBean) {
doStart(lifecycleBeans, dependency, autoStartupOnly);
}
if (!bean.isRunning() &&
(!autoStartupOnly || !(bean instanceof SmartLifecycle) || ((SmartLifecycle) bean).isAutoStartup())) {
if (logger.isDebugEnabled()) {
logger.debug("Starting bean '" + beanName + "' of type [" + bean.getClass() + "]");
}
try {
// 这里调用rabbitmq监听器的start方法,
// 这里终于进入重点了
bean.start();
}
catch (Throwable ex) {
throw new ApplicationContextException("Failed to start bean '" + beanName + "'", ex);
}
if (logger.isDebugEnabled()) {
logger.debug("Successfully started bean '" + beanName + "'");
}
}
}
}
再进入重点之前,总结一下lifecycle:
spring的finishRefresh方法,会生成一个默认的lifecycleProcess,并注册到applicationContext中变量中去,然后,调用这个lifecycleProcess的onRefresh方法,然后lifecycleProcess会寻找所有的实现了lifecycle的类,然后调用他们的start方法。
所以,lifecycle就是提供给spring组件,初始化的。
这里进入了重点:
调用rabbit监听器注册器的start方法
@Override
public void start() {
// getListenerContainers获取所有的消费者监听器
// 然后遍历
for (MessageListenerContainer listenerContainer : getListenerContainers()) {
// 启动监听器,重点
startIfNecessary(listenerContainer);
}
}
@Override
protected void doStart() throws Exception {
if (getMessageListener() instanceof ListenerContainerAware) {
Collection<String> expectedQueueNames = ((ListenerContainerAware) getMessageListener()).expectedQueueNames();
if (expectedQueueNames != null) {
String[] queueNames = getQueueNames();
Assert.state(expectedQueueNames.size() == queueNames.length,
"Listener expects us to be listening on '" + expectedQueueNames + "'; our queues: "
+ Arrays.asList(queueNames));
boolean found = false;
for (String queueName : queueNames) {
if (expectedQueueNames.contains(queueName)) {
found = true;
}
else {
found = false;
break;
}
}
Assert.state(found, () -> "Listener expects us to be listening on '" + expectedQueueNames + "'; our queues: "
+ Arrays.asList(queueNames));
}
}
super.doStart();
synchronized (this.consumersMonitor) {
if (this.consumers != null) {
throw new IllegalStateException("A stopped container should not have consumers");
}
// 这里,会初始化线程池,重点,往下看
int newConsumers = initializeConsumers();
if (this.consumers == null) {
logger.info("Consumers were initialized and then cleared " +
"(presumably the container was stopped concurrently)");
return;
}
if (newConsumers <= 0) {
if (logger.isInfoEnabled()) {
logger.info("Consumers are already running");
}
return;
}
Set<AsyncMessageProcessingConsumer> processors = new HashSet<AsyncMessageProcessingConsumer>();
for (BlockingQueueConsumer consumer : this.consumers) {
AsyncMessageProcessingConsumer processor = new AsyncMessageProcessingConsumer(consumer);
processors.add(processor);
getTaskExecutor().execute(processor);
if (getApplicationEventPublisher() != null) {
getApplicationEventPublisher().publishEvent(new AsyncConsumerStartedEvent(this, consumer));
}
}
for (AsyncMessageProcessingConsumer processor : processors) {
FatalListenerStartupException startupException = processor.getStartupException();
if (startupException != null) {
throw new AmqpIllegalStateException("Fatal exception on listener startup", startupException);
}
}
}
}
protected int initializeConsumers() {
int count = 0;
synchronized (this.consumersMonitor) {
if (this.consumers == null) {
this.cancellationLock.reset();
// this.concurrentConsumers是我们配置的concurrency线程数
// 可以看到,有多个线程数就有多少个,堵塞队列消费者BlockingQueueConsumer的set
// BlockingQueueConsumer就是真正消费消息的
this.consumers = new HashSet<BlockingQueueConsumer>(this.concurrentConsumers);
// 有多少个线程,就创建多少个堵塞队列消费者BlockingQueueConsumer
for (int i = 0; i < this.concurrentConsumers; i++) {
BlockingQueueConsumer consumer = createBlockingQueueConsumer();
this.consumers.add(consumer);
count++;
}
}
}
return count;
}
这是堵塞队列消费者的构造方法
public BlockingQueueConsumer(ConnectionFactory connectionFactory,
MessagePropertiesConverter messagePropertiesConverter,
ActiveObjectCounter<BlockingQueueConsumer> activeObjectCounter, AcknowledgeMode acknowledgeMode,
boolean transactional, int prefetchCount, boolean defaultRequeueRejected,
Map<String, Object> consumerArgs, boolean noLocal, boolean exclusive, String... queues) {
this.connectionFactory = connectionFactory;
this.messagePropertiesConverter = messagePropertiesConverter;
this.activeObjectCounter = activeObjectCounter;
this.acknowledgeMode = acknowledgeMode;
this.transactional = transactional;
this.prefetchCount = prefetchCount;
this.defaultRequeueRejected = defaultRequeueRejected;
if (consumerArgs != null && consumerArgs.size() > 0) {
this.consumerArgs.putAll(consumerArgs);
}
this.noLocal = noLocal;
this.exclusive = exclusive;
this.queues = Arrays.copyOf(queues, queues.length);
// 这里看到,堵塞队列的大小就是prefetchCount设定的,
// prefetchCount默认是250,也就是这个消费者,会堵塞250个消息
this.queue = new LinkedBlockingQueue<Delivery>(prefetchCount);
}
其实,这里就已经发现了我前面的问题。
解答是:服务一启动的时候,消费者的一个线程,就会一下拉取最大250个消息,所以一下把rabbitmq服务器的消息拉取完了。(其实不是拉取,是有一个线程往这个队列里循环放最大250个消息)
所以,这个消费者线程堵塞了,其他消息者线程也拿不到消息,所以也就没有并发执行。
而,服务启动后,生产者再发送一条消息,则会随机取一个线程执行,也就达到并发的效果。
回到这个dostart方法
protected void doStart() throws Exception {
if (getMessageListener() instanceof ListenerContainerAware) {
Collection<String> expectedQueueNames = ((ListenerContainerAware) getMessageListener()).expectedQueueNames();
if (expectedQueueNames != null) {
String[] queueNames = getQueueNames();
Assert.state(expectedQueueNames.size() == queueNames.length,
"Listener expects us to be listening on '" + expectedQueueNames + "'; our queues: "
+ Arrays.asList(queueNames));
boolean found = false;
for (String queueName : queueNames) {
if (expectedQueueNames.contains(queueName)) {
found = true;
}
else {
found = false;
break;
}
}
Assert.state(found, () -> "Listener expects us to be listening on '" + expectedQueueNames + "'; our queues: "
+ Arrays.asList(queueNames));
}
}
super.doStart();
synchronized (this.consumersMonitor) {
if (this.consumers != null) {
throw new IllegalStateException("A stopped container should not have consumers");
}
// 这里初始化了所有的堵塞队列消费者
int newConsumers = initializeConsumers();
if (this.consumers == null) {
logger.info("Consumers were initialized and then cleared " +
"(presumably the container was stopped concurrently)");
return;
}
if (newConsumers <= 0) {
if (logger.isInfoEnabled()) {
logger.info("Consumers are already running");
}
return;
}
Set<AsyncMessageProcessingConsumer> processors = new HashSet<AsyncMessageProcessingConsumer>();
// 这里启动所有的堵塞队列消费者
for (BlockingQueueConsumer consumer : this.consumers) {
AsyncMessageProcessingConsumer processor = new AsyncMessageProcessingConsumer(consumer);
processors.add(processor);
// getTaskExecutor()获取单线程线程池,然后启动线程,执行process的run方法
getTaskExecutor().execute(processor);
if (getApplicationEventPublisher() != null) {
getApplicationEventPublisher().publishEvent(new AsyncConsumerStartedEvent(this, consumer));
}
}
for (AsyncMessageProcessingConsumer processor : processors) {
FatalListenerStartupException startupException = processor.getStartupException();
if (startupException != null) {
throw new AmqpIllegalStateException("Fatal exception on listener startup", startupException);
}
}
}
}
总结,设置了多少个concurrency最大监听器线程数,就会有多少个堵塞队列消费者,然后每个堵塞队列消费者,启动一个线程。
@Override
public void run() {
...
// 轮询,查看是否来了消息
while (isActive(this.consumer) || this.consumer.hasDelivery() || !this.consumer.cancelled()) {
try {
// 这里重点,有消息则true,没消息则false
boolean receivedOk = receiveAndExecute(this.consumer); // At least one message received
if (SimpleMessageListenerContainer.this.maxConcurrentConsumers != null) {
if (receivedOk) {
if (isActive(this.consumer)) {
consecutiveIdles = 0;
if (consecutiveMessages++ > SimpleMessageListenerContainer.this.consecutiveActiveTrigger) {
considerAddingAConsumer();
consecutiveMessages = 0;
}
}
}
else {
consecutiveMessages = 0;
if (consecutiveIdles++ > SimpleMessageListenerContainer.this.consecutiveIdleTrigger) {
considerStoppingAConsumer(this.consumer);
consecutiveIdles = 0;
}
}
}
long idleEventInterval = getIdleEventInterval();
if (idleEventInterval > 0) {
if (receivedOk) {
updateLastReceive();
}
else {
long now = System.currentTimeMillis();
long lastAlertAt = SimpleMessageListenerContainer.this.lastNoMessageAlert.get();
long lastReceive = getLastReceive();
if (now > lastReceive + idleEventInterval
&& now > lastAlertAt + idleEventInterval
&& SimpleMessageListenerContainer.this.lastNoMessageAlert
.compareAndSet(lastAlertAt, now)) {
publishIdleContainerEvent(now - lastReceive);
}
}
}
}
}
最后,到这里,
private boolean doReceiveAndExecute(BlockingQueueConsumer consumer) throws Throwable { //NOSONAR
Channel channel = consumer.getChannel();
for (int i = 0; i < this.txSize; i++) {
logger.trace("Waiting for message from consumer.");
Message message = consumer.nextMessage(this.receiveTimeout);
if (message == null) {
break;
}
try {
// 有消息,则反射调用我们定义的消费者
executeListener(channel, message);
}
...
}
然后到这里
public Message nextMessage(long timeout) throws InterruptedException, ShutdownSignalException {
if (logger.isTraceEnabled()) {
logger.trace("Retrieving delivery for " + this);
}
checkShutdown();
if (this.missingQueues.size() > 0) {
checkMissingQueues();
}
// 从堵塞队列中poll出元素,没有元素,则堵塞,一秒,这个时间receiveTimeout是可以设置的
Message message = handle(this.queue.poll(timeout, TimeUnit.MILLISECONDS));
if (message == null && this.cancelled.get()) {
throw new ConsumerCancelledException();
}
return message;
}
至此:整个消费者的过程,分析完了。
总结:每个消费者线程,轮询查看是否有消息,默认每1秒轮询一次。有消息,则调用我们定义的消费者。
关于消费者线程:
假设,有两个消费者,然后消费者线程定义为,则总共有10个消费者线程。
消费者发送消息,会来到这里,然后往我们的堵塞队列消费者,添加消息
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body)
throws IOException {
if (logger.isDebugEnabled()) {
logger.debug("Storing delivery for consumerTag: '"
+ consumerTag + "' with deliveryTag: '" + envelope.getDeliveryTag() + "' in "
+ BlockingQueueConsumer.this);
}
try {
if (BlockingQueueConsumer.this.abortStarted > 0) {
if (!BlockingQueueConsumer.this.queue.offer(new Delivery(consumerTag, envelope, properties, body),
BlockingQueueConsumer.this.shutdownTimeout, TimeUnit.MILLISECONDS)) {
RabbitUtils.setPhysicalCloseRequired(getChannel(), true);
// Defensive - should never happen
BlockingQueueConsumer.this.queue.clear();
getChannel().basicNack(envelope.getDeliveryTag(), true, true);
getChannel().basicCancel(consumerTag);
try {
getChannel().close();
}
catch (TimeoutException e) {
// no-op
}
}
}
else {
// 消费者发送消息,会来到这里,然后往我们的堵塞队列消费者,添加消息
BlockingQueueConsumer.this.queue.put(new Delivery(consumerTag, envelope, properties, body));
}
}
catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
调试过程遇到的问题:
BlockingQueueConsumer的queue,看到发送消息的时候,offer进去了,但是while遍历的时候,queue的size都是0。然后queue的任何地方,打断点都没有停。
结果:这是多线程导致的问题,blockQueueConsumer有多少线程拥有了这个对象,然后看到的是不同对象的BlockingQueueConsumer的queue。
网友评论