RabbitMQ是一个开源的消息中间件,自带管理界面友好、开发语言支持广泛、没有对其它中间件的依赖,而且社区非常活跃,特别适合中小型企业拿来就用。这篇文章主要探讨提升RabbitMQ消费速度的一些方法和实践,比如增加消费者、提高Prefetch count、多线程处理、批量Ack等。
增加消费者
这个道理比较容易理解,多个人搬砖的速度肯定比一个人要快很多。
不过实际情况中还需要面对一些技术挑战,比如后端处理能力、并发冲突,以及处理顺序。
后端处理能力:比如多个消费者都要操作数据库,那么数据库连接的并发数和读写吞吐量就是后端处理能力,如果达到了数据库的最大处理能力,增加再多的消费者也没有用,甚至会因为数据库拥塞导致整体消费速度的下降。这个问题还存在另一种情况,就是消费者是否真正的发挥了后端服务的处理能力,比如使用Redis时是否采用了多线程、IO复用等方式来进一步提升吞吐量。
并发冲突:比如两个消费者都要去修改用户的积分,单个消费者的做法可能就是取出来、改下字段的值、最后再update到数据库,多个消费者时如果同时取出了相同的数据,还这样处理的话就会出问题了。这时候可能需要修改下SQL语句,直接在SQL语句中修改积分,由数据库写入事务来处理并发冲突;或者搞一个分布式锁,对于具体的某个用户同时只能有一个消费者来处理其积分。
处理顺序:如果消息需要被顺序处理,那么各个消费者之间还需要增加一个同步机制。比如基于GPS定位的电子围栏,在出围栏的某个时段,先产生了围栏内定位消息、然后产生了围栏外定位消息;如果围栏外定位消息先被一个消费者处理,则判定为出围栏,这没有问题;然后围栏内定位消息被另一个消费者处理,则会被判定为入围栏,这个就属于误判了。这时候可能要同步一个已处理定位时间,早于这个时间的定位就抛弃掉;或者同一个设备的定位消息通过某种算法控制只能由某个消费者进行处理。
解决后边两个问题的方法不可避免的要引入多个消费者之间的协商机制,如果这些协商机制设计不好会对处理速度带来很大影响。因此多人搬砖速度快的前提是多个人搬砖时不需要大家频繁的坐下来协商谁搬哪块砖,否则就会浪费很多时间在相互协调上,反而不能提升搬砖的速度。
所以通过增加消费者提升消费速度得以成立的前提是消费者业务并发处理能力要足够,消费者依赖的后端服务处理能力也要足够。这是此种方式的关键点。
提高Prefetch count
消息消费速度主要受到发送消息时间、消费者处理时间、消息Ack时间这几个时间的影响,如果一个消息走完这个流程再发送另一个的话,效率将会非常低。可以让消息在这几个时间内恰当的分配,让消息总是连续不断的被消费者接收处理,就可以提升消费者的消费速度。
根据如上描述,有些消息可能正在被消费者处理,有些可能在等待消费者处理,有的消息可能还在网络传输中,而如果不限制传输的数量,消费者端可能因处理能力补足会堆积大量的消息,首先内存使用将不可控制,其次此时也无法将这些消息再分配给别的消费者。因此才有了Prefetch count,用于控制消息发送给消费者的速度;这个方案需要配合Ack使用,消费者回复消息Ack后,RabbitMQ才会继续发送同等数量的消息到消费者。提高Prefetch count到一个合适的值可以提升消息的消费速度。这个值的设定可能还要实时参考上边提到的三个时间,这有点类似TCP的流控措施。这个值的计算方法请看下文:
RabbitMQ关于吞吐量,延迟和带宽的一些理论
参考文档:https://blog.csdn.net/gbbqrglvir3dyi82/article/details/78663828
多线程处理
多线程处理和增加消费者有异曲同工之妙。多线程处理不需要建立多个到RabbitMQ的连接,它在收到队列消息后将其放入不同的线程中进行处理,这样进程中就会有多个消息同时处理,增加了消费吞吐量,从而提升了消费速度。
来看一个例子:
consumer.Received += (o, e) =>
{
ThreadPool.QueueUserWorkItem(newWaitCallback(ProcessSingleContextMessage), e);
};
在这个例子中波斯码将收到的消息放入线程池队列进行处理,注意这里需要配合上一节提到的Prefetch count,设置一个合适的值,消费者就可以同时处理多条消息了。
多线程处理也存在多消费者处理时的问题,只不过在一个进程中处理并发冲突和消息顺序的成本可能更低一些。下边的代码片段展示了一个解决消息顺序处理问题的方案:
// 接收消息存入列表,当接收数量达到prefetchCount/2时就加入处理队列;
// 1/2是考虑了消息从RabbitMQ到消费者的传输时间,不需要等所有的消息都到达了才开始处理。
consumer.Received += (o, e) =>
{
lock(receiveLocker){
basicDeliverEventArgsList.Add(e);
if(basicDeliverEventArgsList.Count >= prefetchCount/2)
{
vardeliverEventArgs = basicDeliverEventArgsList.ToArray();
basicDeliverEventArgsList.Clear();
EnProcessQueue(deliverEventArgs);
}
}
};
// 此处省略数据出队列的代码,请自行脑补
....
// 然后这个方法是用来处理消息的,将消息根据数据Key分成若干组,放到多个任务中并行处理;
// 相同数据Key的消息将分配到一个组中,在这个组中数据被顺序处理
privatevoidProcess(BasicDeliverEventArgs[] args)
{
if(args.Length <=0)
{
return;
}
try
{
vartasks = CreateParallelProcessTasksByDataKey(args);
Task.WaitAll(tasks);
}
catch(Exception ex)
{
ToLog("处理任务发生异常", ex);
}
}
// 创建并行处理多条消息的任务
privateTask[] CreateParallelProcessTasksByDataKey(BasicDeliverEventArgs[] args)
{
// 根据dataKey进行分组,dataKey可以放到消息的header中进行传输,这里就不给出具体的分组方法了
Dictionary> eDic = GetMessgeGroupByDataKey(args);
// 任务数量
varparalleTaskNum =this.parallelNum;
if(paralleTaskNum > eDic.Count)
{
paralleTaskNum = eDic.Count;
}
// 每个任务处理的消息数量
varperTaskNum = (int)Math.Ceiling(args.Length / (double)paralleTaskNum);
// 任务数组
List tasks =newList();
vartaskArgs =newList();
for(int j = eDic.Count -1; j >=0; j--)
{
varcurrentElement = eDic.ElementAt(j);
taskArgs.AddRange(currentElement.Value);
eDic.Remove(currentElement.Key);
if(taskArgs.Count >= perTaskNum || j ==0)
{
// 创建任务,并处理分配的消息
vartaskList = taskArgs.Select(d => d).ToList();
taskArgs.Clear();
vartask = Task.Factory.StartNew(() =>
{
// 这这里处理分组中的消息
...
});
tasks.Add(task);
}
}
returntasks.ToArray();
}
上边这段代码中解决问题的关键就是将消息进行分组,同组内的消息顺序处理,分组间并行处理,既通过多线程提升了消息整体的处理速度,又能支持消息的顺序处理。
批量Ack
这种方式有效的原理是:每条消息分别Ack的情况下,RabbitMQ收到一个Ack才发送一条消息,这中间就会有很多的时间在等待Ack回来,通过批量Ack的方式,减少了很多Ack传输的时间。注意这里隐含的方式是RabbitMQ通过设置的Prefetch count连续向消费者发送多条消息,否则这个批量就没意义了。
下边的代码片段给出其使用方式:
channel.BasicAck(e.DeliveryTag,true);
第2个参数为true就是指示采用批量Ack的方式,凡是delivery-tag比第1个参数小的消息都会被Ack。
这里需要注意:如果消费者在处理某条消息时失败了,业务上又要求不能丢失任何消息,这时就不能对所有的消息进行批量Ack,否则RabbitMQ就不会再次投递这条消息了,这需要根据自己的实际情况进行取舍。解决此问题的一个简单方法是,跟踪所有消息的处理结果,如果全部成功则使用批量Ack,如果部分成功则有两个选择:如果不关注顺序则退化为每个消息发送Ack或Reject的方式;如果关注顺序则本次接收到Prefetch count数量的消息全部nack,否则reject的消息再次投递时顺序就不对了,这时候业务还要做好处理重复数据的逻辑。
总结
通过分析上边的这些方法,在使用RabbitMQ消费时可以遵循这样一个路径:
启用Prefetch count设置;
先1个消费者,1次只接收1条,处理完毕后再传输下一条,这样可以避免并发冲突和消息顺序问题;
如果消费速度不满足要求,则1次接收多条,按接收顺序处理;
如果消费速度还是不满足要求,则1次接收多条,并行处理;
如果消费速度还是不满足要求,则启动多个消费者,并行处理。
如果消费速度还是不满足要求,改需求,或者换别的中间件。
在这个过程中需要始终关注优化消费者及后端程序处理能力,比如优化SQL语句、使用缓存、使用负载均衡等等,加快处理速度就能提升消费速度,而且很多时候就是程序处理太耗时了。
关于重复数据、并发冲突、顺序处理问题的处理:
随时做好处理重复数据的准备,因为不只消费者端可能会触发消息的重复投递,发送端也可能重复发送消息,这个很难避免。
对于并发冲突问题,消费者进程内可以使用锁,跨消费者引入第三方机制来处理,比如使用Redis原子操作、数据库原子操作或者分布式锁。
对于顺序处理问题,最好没有这个需求;在同一个消费者内可以分组处理;在多个消费者时使用队列分组,每个队列绑定不同的Route key,不同Route key代表的消息之间没有顺序关联。波斯码再次提醒还要注意处理失败时的逻辑,避免重新投递消息的顺序问题。
网友评论