美文网首页dotNET
netmq VS redis 订阅发布性能研究

netmq VS redis 订阅发布性能研究

作者: 年少_年少 | 来源:发表于2018-09-05 22:11 被阅读9次

    简单介绍下自己:

    .NET开发,从事手游开发有8年多,真爱呀 有木有

    代码前往这里 https://github.com/GuojieChen/netmq-patterns-sample

    目前项目在确定业务需要的技术过程中遇到了一个选择
    消息队列使用哪种方式好,性能又是如何

    目前网上找到几篇文章
    国内:https://www.cnblogs.com/pasoraku/p/4673039.html
    国外:https://gist.github.com/hmartiro/85b89858d2c12ae1a0f9

    国外这篇文章中提到

    ZeroMQ, Pub/Sub: 481,000 msg/s, latency <1 ms
    Redis Pub/Sub (async via libevent): 59,000 msg/s, latency <1 ms

    这个结论是基于 C++ 得出的,本文暂时没有去模拟验证

    用例介绍

    生产者:10个线程,不断产生数据,数据内容为当前时间的计时周期数 DateTime.Now.Ticks
    消费者:接收数据,计算消耗的时间
    每10s统计一次每秒的平均值

    netmq

    PubSub.Server 订阅方,消费者

    using System;
    using System.Collections;
    using System.Collections.Generic;
    using System.Linq;
    using System.Text;
    using System.Threading;
    using System.Threading.Tasks;
    using NetMQ;
    using NetMQ.Sockets;
    
    namespace PubSub.Server
    {
        /// <summary>
        /// Benchmark subscriber (consumer): connects to the proxy's publisher endpoint,
        /// receives tick timestamps on topic "aaa" and periodically prints the message
        /// rate together with the average end-to-end latency in milliseconds.
        /// </summary>
        class Program
        {
            /// <summary>Number of seconds between two statistics reports.</summary>
            private const int ReportIntervalSeconds = 1;

            /// <summary>
            /// Guards <see cref="d"/>: the list is appended to on the poller thread and
            /// swapped out on the reporting thread, and <see cref="List{T}"/> is not
            /// safe for that kind of concurrent access.
            /// </summary>
            private static readonly object Gate = new object();

            /// <summary>Latency samples collected since the last report.</summary>
            private static List<TimeSpan> d = new List<TimeSpan>();

            /// <summary>
            /// Sets up the subscription, starts the poller and the reporting loop, then
            /// blocks until a key is read from the console.
            /// </summary>
            /// <param name="args">Command-line arguments (unused).</param>
            static void Main(string[] args)
            {
                // Dispose the poller before the socket so that polling has stopped
                // by the time the socket is torn down.
                using (var socket = new SubscriberSocket(">tcp://localhost:1012"))
                using (var poller = new NetMQ.NetMQPoller())
                {
                    socket.Subscribe("aaa");
                    socket.ReceiveReady += Socket_ReceiveReady;
                    poller.Add(socket);
                    poller.RunAsync();

                    // The loop never returns, so ask for a dedicated thread instead of
                    // permanently occupying a thread-pool worker.
                    Task.Factory.StartNew(ReportLoop, TaskCreationOptions.LongRunning);

                    Console.Read();
                }
            }

            /// <summary>
            /// Every <see cref="ReportIntervalSeconds"/> seconds takes ownership of the
            /// collected samples and prints "timestamp, messages per second, average
            /// latency (ms)", or "-" when nothing was received.
            /// </summary>
            private static void ReportLoop()
            {
                while (true)
                {
                    Thread.Sleep(ReportIntervalSeconds * 1000);

                    // Swap the list under the lock; after this point the batch is only
                    // touched by this thread and can be enumerated safely.
                    List<TimeSpan> batch;
                    lock (Gate)
                    {
                        batch = d;
                        d = new List<TimeSpan>();
                    }

                    if (batch.Any())
                    {
                        Console.WriteLine($"{DateTime.Now}\t{batch.Count / ReportIntervalSeconds}/s\t{batch.Average(x => x.TotalMilliseconds)}");
                    }
                    else
                    {
                        Console.WriteLine("-");
                    }
                }
            }

            /// <summary>
            /// Reads one two-frame message (topic, payload) and records how long ago the
            /// payload's tick value was produced.
            /// </summary>
            /// <param name="sender">Event source (unused).</param>
            /// <param name="e">Event data exposing the socket that is ready to receive.</param>
            private static void Socket_ReceiveReady(object sender, NetMQ.NetMQSocketEventArgs e)
            {
                // The topic frame has to be consumed even though its value is not used.
                e.Socket.ReceiveFrameString();
                var payload = e.Socket.ReceiveFrameString();

                // The publisher sends DateTime.Now.Ticks, so local time is used here as
                // well to keep both ends of the measurement on the same clock.
                var elapsed = DateTime.Now.Subtract(new DateTime(Convert.ToInt64(payload)));

                lock (Gate)
                {
                    d.Add(elapsed);
                }
            }
        }
    }
    

    PubSub.Proxy 中间代理 模拟正式环境中的Proxy层,也等同redis server

    using System;
    using System.Collections.Generic;
    using System.Linq;
    using System.Text;
    using System.Threading.Tasks;
    using NetMQ;
    using NetMQ.Sockets;
    
    namespace PubSub.Proxy
    {
        /// <summary>
        /// Intermediary between publishers and subscribers: an XSUB socket bound on
        /// port 1011 is paired with an XPUB socket bound on port 1012 through a
        /// NetMQ proxy.
        /// </summary>
        class Program
        {
            /// <summary>
            /// Creates both sockets, announces start-up on the console and starts the proxy.
            /// </summary>
            /// <param name="args">Command-line arguments (unused).</param>
            static void Main(string[] args)
            {
                var frontend = new XSubscriberSocket("@tcp://*:1011");
                var backend = new XPublisherSocket("@tcp://*:1012");

                // Fully qualified on purpose: inside this namespace the simple name
                // "Proxy" would resolve to the namespace PubSub.Proxy, not the NetMQ type.
                var proxy = new NetMQ.Proxy(frontend, backend);

                Console.WriteLine("running...");
                proxy.Start();
            }
        }
    }
    
    

    PubSub.Client 发布方,生产者

    using System;
    using System.Collections.Generic;
    using System.Linq;
    using System.Text;
    using System.Threading;
    using System.Threading.Tasks;
    using NetMQ;
    using NetMQ.Sockets;
    
    namespace PubSub.Client
    {
        /// <summary>
        /// Benchmark publisher (producer): many producer loops enqueue the current tick
        /// count, and a single poller thread drains the queue and publishes each value
        /// on topic "aaa" through one publisher socket.
        /// </summary>
        class Program
        {
            /// <summary>Number of concurrent producer loops.</summary>
            private const int ProducerCount = 100;

            /// <summary>Pause between two messages of one producer, in milliseconds.</summary>
            private const int ProducerDelayMilliseconds = 1;

            private static readonly PublisherSocket socket = new PublisherSocket(">tcp://localhost:1011");
            private static readonly NetMQQueue<string> queue = new NetMQQueue<string>();

            /// <summary>
            /// Wires the queue to the socket, starts the poller and the producers, then
            /// blocks until a key is read from the console.
            /// </summary>
            /// <param name="args">Command-line arguments (unused).</param>
            static void Main(string[] args)
            {
                // Producers only touch the queue; the socket itself is used solely from
                // this handler, which the poller invokes.
                queue.ReceiveReady += (s, e) =>
                {
                    var msg = e.Queue.Dequeue();
                    socket.SendMoreFrame("aaa").SendFrame(msg);
                };

                NetMQPoller poller = new NetMQPoller();
                poller.Add(queue);
                poller.RunAsync();

                for (var i = 0; i < ProducerCount; i++)
                {
                    // Each producer blocks forever. LongRunning requests a dedicated
                    // thread, so all producers start immediately instead of waiting for
                    // the thread pool to grow, and the pool is not starved.
                    Task.Factory.StartNew(Run, i, TaskCreationOptions.LongRunning);
                }

                Console.WriteLine("running...");
                Console.Read();
            }

            /// <summary>
            /// Producer loop: enqueues the current local time in ticks as a string, then
            /// sleeps briefly, forever.
            /// </summary>
            /// <param name="obj">Producer index passed by the caller (unused).</param>
            private static void Run(object obj)
            {
                while (true)
                {
                    queue.Enqueue(Convert.ToString(DateTime.Now.Ticks));

                    Thread.Sleep(ProducerDelayMilliseconds);
                }
            }
        }
    }
    
    

    redis测试模型

    RedisPubSub.Server

    using StackExchange.Redis;
    using System;
    using System.Collections.Generic;
    using System.Linq;
    using System.Text;
    using System.Threading;
    using System.Threading.Tasks;
    
    namespace RedisPubSub.Server
    {
        /// <summary>
        /// Benchmark subscriber (consumer) for Redis pub/sub: subscribes to channel
        /// "aaa", treats every message as a tick timestamp and periodically prints the
        /// message rate together with the average latency in milliseconds.
        /// </summary>
        class Program
        {
            /// <summary>Number of seconds between two statistics reports.</summary>
            private const int ReportIntervalSeconds = 1;

            // NOTE(review): endpoints and password are hard-coded for the local benchmark;
            // move them to configuration before reusing this code elsewhere.
            private static readonly ConnectionMultiplexer ConnectionMultiplexer = ConnectionMultiplexer.Connect("127.0.0.1:5379,127.0.0.1:5380,password=123456");

            /// <summary>
            /// Guards <see cref="d"/>: the subscription callback appends to the list while
            /// the reporting thread swaps it out, and <see cref="List{T}"/> is not safe
            /// for that kind of concurrent access.
            /// </summary>
            private static readonly object Gate = new object();

            /// <summary>Latency samples collected since the last report.</summary>
            private static List<TimeSpan> d = new List<TimeSpan>();

            /// <summary>
            /// Registers the subscription, starts the reporting loop, then blocks until a
            /// key is read from the console.
            /// </summary>
            /// <param name="args">Command-line arguments (unused).</param>
            static void Main(string[] args)
            {
                ConnectionMultiplexer.GetSubscriber().Subscribe("aaa", (c, m) =>
                {
                    // The publisher sends DateTime.Now.Ticks, so local time is used here
                    // as well to keep both ends of the measurement on the same clock.
                    var elapsed = DateTime.Now.Subtract(new DateTime(Convert.ToInt64(m)));

                    lock (Gate)
                    {
                        d.Add(elapsed);
                    }
                });

                // The loop never returns, so ask for a dedicated thread instead of
                // permanently occupying a thread-pool worker.
                Task.Factory.StartNew(ReportLoop, TaskCreationOptions.LongRunning);

                Console.Read();
            }

            /// <summary>
            /// Every <see cref="ReportIntervalSeconds"/> seconds takes ownership of the
            /// collected samples and prints "timestamp, messages per second, average
            /// latency (ms)", or "-" when nothing was received.
            /// </summary>
            private static void ReportLoop()
            {
                while (true)
                {
                    Thread.Sleep(ReportIntervalSeconds * 1000);

                    // Swap the list under the lock; after this point the batch is only
                    // touched by this thread and can be enumerated safely.
                    List<TimeSpan> batch;
                    lock (Gate)
                    {
                        batch = d;
                        d = new List<TimeSpan>();
                    }

                    if (batch.Any())
                    {
                        Console.WriteLine($"{DateTime.Now}\t{batch.Count / ReportIntervalSeconds}/s\t{batch.Average(x => x.TotalMilliseconds)}");
                    }
                    else
                    {
                        Console.WriteLine("-");
                    }
                }
            }
        }
    }
    
    

    RedisPubSub.Client

    using System;
    using System.Collections.Generic;
    using System.Linq;
    using System.Text;
    using System.Threading;
    using System.Threading.Tasks;
    using StackExchange.Redis;
    
    namespace RedisPubSub.Client
    {
        /// <summary>
        /// Benchmark publisher (producer) for Redis pub/sub: many producer loops publish
        /// the current tick count to channel "aaa" over one shared connection.
        /// </summary>
        class Program
        {
            /// <summary>Number of concurrent producer loops.</summary>
            private const int ProducerCount = 100;

            /// <summary>Pause between two messages of one producer, in milliseconds.</summary>
            private const int ProducerDelayMilliseconds = 1;

            // NOTE(review): endpoints and password are hard-coded for the local benchmark;
            // move them to configuration before reusing this code elsewhere.
            private static readonly ConnectionMultiplexer ConnectionMultiplexer = ConnectionMultiplexer.Connect("127.0.0.1:5379,127.0.0.1:5380,password=123456");

            /// <summary>
            /// Starts the producers, then blocks until a key is read from the console.
            /// </summary>
            /// <param name="args">Command-line arguments (unused).</param>
            static void Main(string[] args)
            {
                for (var i = 0; i < ProducerCount; i++)
                {
                    // Each producer blocks forever. LongRunning requests a dedicated
                    // thread, so all producers start immediately instead of waiting for
                    // the thread pool to grow, and the pool is not starved.
                    Task.Factory.StartNew(Run, i, TaskCreationOptions.LongRunning);
                }

                Console.WriteLine("running...");
                Console.Read();
            }

            /// <summary>
            /// Producer loop: publishes the current local time in ticks, then sleeps
            /// briefly, forever.
            /// </summary>
            /// <param name="obj">Producer index passed by the caller (unused).</param>
            private static void Run(object obj)
            {
                // Look the subscriber up once instead of on every iteration.
                var publisher = ConnectionMultiplexer.GetSubscriber();

                while (true)
                {
                    publisher.Publish("aaa", DateTime.Now.Ticks);
                    Thread.Sleep(ProducerDelayMilliseconds);
                }
            }
        }
    }
    
    

    netmq 运行结果

    2018/9/5 21:38:07       4810/s  0.0773472247630134
    2018/9/5 21:38:17       4953/s  0.0345690630488765
    2018/9/5 21:38:27       4956/s  0.0315224333756985
    2018/9/5 21:38:37       4838/s  0.0788584731751667
    2018/9/5 21:38:47       4968/s  0.0267414818244033
    2018/9/5 21:38:57       4927/s  0.0456701723053194
    2018/9/5 21:39:07       4921/s  0.0432689302042063
    2018/9/5 21:39:17       4927/s  0.0381800129878038
    2018/9/5 21:39:27       4816/s  0.0886798555027297
    2018/9/5 21:39:37       4666/s  0.118044937753114
    2018/9/5 21:39:47       4938/s  0.0384998096665115
    2018/9/5 21:39:57       4956/s  0.029773836282561
    2018/9/5 21:40:07       4947/s  0.0333202635781134
    2018/9/5 21:40:17       4846/s  0.0800275512730572
    

    redis 测试结果

    2018/9/5 21:41:18       4948/s  0.0181999898967447
    2018/9/5 21:41:28       4987/s  0.0271869203472404
    2018/9/5 21:41:38       4988/s  0.0162716218925421
    2018/9/5 21:41:48       4995/s  0.0106067283246252
    2018/9/5 21:41:58       4991/s  0.00916983653191234
    2018/9/5 21:42:08       4983/s  0.0864149871578777
    2018/9/5 21:42:18       4989/s  0.0139901451076282
    2018/9/5 21:42:28       4993/s  0.0140108803620779
    2018/9/5 21:42:38       4988/s  0.0111812829507868
    2018/9/5 21:42:48       4994/s  0.00812532134705482
    2018/9/5 21:42:58       4991/s  0.0198666673345088
    2018/9/5 21:43:08       4987/s  0.025591579622687
    2018/9/5 21:43:18       4988/s  0.0872425344264263
    2018/9/5 21:43:28       4989/s  0.0124017437665357
    2018/9/5 21:43:38       4992/s  0.0293738252613868
    2018/9/5 21:43:48       4972/s  0.0869289167286693
    

    结论就请大家自己总结啦,而且目前只是小数据量的测试
    以上有不妥的地方还希望各位大佬指出,感谢

    相关文章

      网友评论

        本文标题:netmq VS redis 订阅发布性能研究

        本文链接:https://www.haomeiwen.com/subject/nqezwftx.html