前言碎语
上次发文到现在已经三个月了, 本来计划每个月2篇的美梦就这样成了泡影, 个中缘由不想详述, 或者说不想找借口. 如此看来, 时光匆匆, 需珍惜, 切记切记.

RabbitMQ是啥?
最早接触RabbitMQ还是在3年前, 当时接手一个系统迁移到中国的落地任务. 系统很大, 架构很复杂, 功能也很强大. 过程中填了无数的坑, 但是也学到很多.
现在回想起来, 令我印象最深的就是RabbitMQ. RabbitMQ是什么? 简单的一句话总结 "RabbitMQ是基于AMQP协议的队列服务". 由于它是Erlang这个天生分布式的语言所开发, 所以分布式, 集群, 高可用这些武功它统统都会.

有童鞋忍不住要问了, 这么高大上的东西, 我用它作甚? 截取一段CSDN中大神anzhsoft的描述:
对于一个大型的软件系统来说,它会有很多的组件或者说模块或者说子系统或者(subsystem or Component or submodule)。那么这些模块的如何通信?这和传统的IPC有很大的区别。传统的IPC很多都是在单一系统上的,模块耦合性很大,不适合扩展(Scalability);如果使用socket那么不同的模块的确可以部署到不同的机器上,但是还是有很多问题需要解决。比如:
1)信息的发送者和接收者如何维持这个连接,如果一方的连接中断,这期间的数据如何方式丢失?
2)如何降低发送者和接收者的耦合度?
3)如何让Priority高的接收者先接到数据?
4)如何做到load balance?有效均衡接收者的负载?
5)如何有效的将数据发送到相关的接收者?也就是说将接收者subscribe 不同的数据,如何做有效的filter。
6)如何做到可扩展,甚至将这个通信模块发到cluster上?
7)如何保证接收者接收到了完整,正确的数据?
是的, 对于这些问题RabbitMQ都能给你一个答案.

动手实践
闲话不多说, 让我们操练起来.
首先安装RabbitMQ的服务, 下载地址在这里
提示一下, 安装后把服务启动起来通过执行下面的命令把管理插件打开你会感觉格外的舒爽.
rabbitmq-plugins enable rabbitmq_management
打开管理插件后在浏览器输入http://localhost:15672 即可打开管理页面, 视图如下

然后需要用Nuget导入RabbitMQ客户端. 我使用的开发工具是vs code, 安装包相对来说会比较复杂, 如果你用Visual Studio会简单很多.
好了, 我说下vs code 中具体的操作步骤. 首先在扩展中搜索nuget, 可以看到一个叫.net core project manager的扩展, 安装后快捷键Ctrl+Shift+P 打开命令工具输入nuget即可看到一个nuget: add new package的提示, 选中后搜索rabbitmq, 会看到列出的几个版本号, 我选的4.1.1
简单码一段代码看看效果.
-
配置, 创建Client
ConnectionFactory factory = new ConnectionFactory();
factory.UserName = "test";
factory.Password = "test";
factory.VirtualHost = "test";
factory.HostName = "127.0.0.1";IConnection conn = factory.CreateConnection(); IModel channel = conn.CreateModel(); channel.ExchangeDeclare("test", "topic"); channel.QueueDeclare("test",true,true,false,null); channel.QueueBind("test", "test", "test", null);
-
编写生产者的代码
Timer t =new Timer((a)=>{
var i=0;
while (i++<100)
{
try
{
channel.BasicPublish("test","test",true,null, messageBodyBytes);
}
catch (System.Exception ex)
{
System.Console.WriteLine(ex.Message);
}
}
},null,0,1000);
用了一个Timer, 可以定时发送数据. -
编写消费者代码
EventingBasicConsumer c = new EventingBasicConsumer(channel);
c.Received += (ch, ea) =>
{
System.Console.WriteLine(System.Text.Encoding.UTF8.GetString(ea.Body));
Thread.Sleep(1000);
channel.BasicAck(ea.DeliveryTag, false);
};
string consumerTag = channel.BasicConsume("test", false, c);
RabbitMQ的消费者有几种, 我这里用的是EventingBasicConsumer 它是被事件触发的, 不需要主动轮询去消费, 而且也是目前RabbitMQ官方推荐的.
需要注意这一句
string consumerTag = channel.BasicConsume("test", false, c);
第二个参数我设置为false, 意思是我要主动向server确认信息的接收. 这样做的好处有两点. 第一, 消费者的入口容易控制, 不会把消费者压死. 第二, 可以通过RabbitMQ的控制台直观的了解到消费者的处理能力. 看了后面的内容我相信你会理解我的Point.

代码码好了, 来run一下瞧瞧.

由于我是单线程消费, 并且每次消费过程中Sleep一秒, 这样我每秒的处理速度高达 1

进阶一点点
看到这有人会说了, 你写个这么烂的代码也好意思出来晒? 好吧, 我是要循序渐进的!

废话不多说, 上代码!
Task.Factory.StartNew(()=>{
System.Console.WriteLine(System.Text.Encoding.UTF8.GetString(ea.Body));
Thread.Sleep(1000);
channel.BasicAck(ea.DeliveryTag, false);
});


贴了两张图, 看官们可以看到, 处理速度有一个爬坡的过程, 而且运行到一段时间以后基本平稳在了100个/秒上下. 川酷不满意的是, 此时的cpu才仅仅占用了10%不到. 对于我这种抓到蛤蟆攥出尿来的人, 不压榨压榨实在不甘心. 对消费者进行了一下改造, 且看.
public static ConcurrentQueue<BasicDeliverEventArgs> Queue1 = new ConcurrentQueue<BasicDeliverEventArgs>();
EventingBasicConsumer c = new EventingBasicConsumer(channel);
c.Received += (ch, ea) =>
{
Queue1.Enqueue(ea);
};
string consumerTag = channel.BasicConsume("test", false, c);
var j=0;
while (j++<150)
{
Task.Factory.StartNew(()=>{
BasicDeliverEventArgs bdea=null;
while (true)
{
if(Queue1.TryDequeue(out bdea)){
System.Console.WriteLine(System.Text.Encoding.UTF8.GetString(bdea.Body));
Thread.Sleep(1000);
channel.BasicAck(bdea.DeliveryTag, false);
}
}
});
}
哈哈, 没错, 起了150个线程去同步处理, 速度就达到了150, 看证据

其实这时候的cpu占用率也不高. 只想做一个抛砖引玉, 当我们直接调用.net为我们提供的傻瓜式接口时, 有时候会遇到结果并不那么如意的情况, 逼着我们开动脑筋. 看官当中肯定会有人问, 如果起10000个线程, 岂不是速度能达到10000个/秒? 自己动手试试呗.

如果您觉得这篇文章对您有那么一丁点益处, 或者从某个角度触动到了您, 请给川酷一些鼓励, 打赏, 点赞, 关注, 哪怕评论区骂我两句, 鄙人都感激涕零.

网友评论