美文网首页后端开发者Python消息队列
ZeroMQ指南:第一章——基础

ZeroMQ指南:第一章——基础

作者: lakerszhy | 来源:发表于2016-11-21 17:22 被阅读2712次
    翻译自“http://zguide.zeromq.org/py:all”
    

    拯救世界

    开始的假设

    我们假设你使用ZeroMQ 3.2以上的版本。我们假设你使用Linux或者类似的操作系统。我们假设你或多或少能看懂C语言,因为这是示例的默认语言。我们假设当看到类似PUSH或SUBSCRIBE这样的常亮时,你能知道它们的真名是ZMQ_PUSH或ZMQ_SUBSCRIBE。

    获得示例

    提问和回答

    让我们开始写代码。我们从Hello World示例开始。我们会开发一个客户端和一个服务端。客户端发送“Hello”到服务端,服务端回复“World”。下面是Python的服务端,在5555端口启动一个ZeroMQ socket,从中读取请求,并回复“World”给每一个请求:

    #   Hello World server in Python
    #   Binds REP socket to tcp://*:5555
    #   Expects b"Hello" from client, replies with b"World"
    
    import time
    import zmq
    
    context = zmq.Context()
    socket = context.socket(zmq.REP)
    socket.bind("tcp://*:5555")
    
    while True:
        #  Wait for next request from client
        message = socket.recv()
        print("Received request: %s" % message)
    
        #  Do some 'work'
        time.sleep(1)
    
        #  Send reply back to client
        socket.send(b"World")
    

    图2:请求——回复
    <div align=center>


    </div>

    REQ-REP socket对是同步的。客户端在循环中(或者只发起一次)发起zmq_send(),然后发起zmq_recv()。其它任何顺序(比如,一次发送两个消息)都会导致发送或接收返回-1。同样的,服务端按顺序发起zmq_recv(),然后发起zmq_send()。

    下面是客户端的代码:

    #   Hello World client in Python
    #   Connects REQ socket to tcp://localhost:5555
    #   Sends "Hello" to server, expects "World" back
    
    import zmq
    
    context = zmq.Context()
    
    #  Socket to talk to server
    print("Connecting to hello world server…")
    socket = context.socket(zmq.REQ)
    socket.connect("tcp://localhost:5555")
    
    #  Do 10 requests, waiting each time for a response
    for request in range(10):
        print("Sending request %s …" % request)
        socket.send(b"Hello")
    
        #  Get the reply.
        message = socket.recv()
        print("Received reply %s [ %s ]" % (request, message))
    

    在实际应用看起来太简单了,但是向我们已经学过的那样,ZeroMQ有超能力。你可以向这个服务端一次发起几千个客户端,它也会继续快速的工作。为了好玩,先启动客户端,然后启动服务端,它仍然可以正常工作,然后想想这意味着什么。

    让我们简单的解释一下这两个程序做了什么。它们创建了一个ZeroMQ的context和一个socket。不用担心这些名词的意思,稍后会解释。服务端绑定它的REP(回复)socket到5555端口。服务端在循环中等待请求,然后每次响应一个回复。客户端发送一个请求,并从服务端读取回复。

    如果你杀死服务端(Ctrl-C),并重新启动,客户端不会正确的恢复。从崩溃进程中恢复很不容易。开发一个可靠的请求——回复流很复杂,我们会在第四章讨论。

    这个场景背后发生了什么事情,但对于程序员来说代码很简洁,甚至在大量负载下也不会经常崩溃。这就是请求——回复(request-reply)模式,可能是使用ZeroMQ最简单的方式。它对应RPC和经典的客户端/服务端模型。

    字符串小提示

    除了发送的字节数,ZeroMQ不知道你发送的任何数据。这意味着你需要负责格式化数据,让应用程序可以取回数据。格式化对象和负责数据类型是专业库的工作,比如Protocol Buffers。但是对于字符串,你需要小心。

    在C和其它一些语言中,字符串以null结尾。我们发送一个字符串,比如“HELLO”,会额外附加一个null字节:

    zmq_send (requester, "Hello", 6, 0);
    

    如果从其它语言发送字符串,可能不会包括null字节。例如,当用Python发送同样的字符串时:

    socket.send ("Hello")
    

    那么发送到网络上的是一个长度(较短的字符串只需要一个字节),以及字符串的内容作为单独的字符。

    图3:一个ZeroMQ字符串
    <div align=center>


    </div>

    如果你用C语言读取这个字符串,你会得到一个看起来像字符串的东西,可能偶尔行为像字符串(如果幸运的话,这5个字节发现它们后面跟着一个不知不觉潜伏的null),但它不是一个严格意义上的字符串。当你的客户端和服务端的字符串格式不同时,你会得到怪异的结果。

    当你用C语言从ZeroMQ中接收字符串数据时,你不能简单的相信它安全的结束了。每次读取字符串,你应该分配一个新的buffer,包括一个额外的字节,拷贝字符串,并用null正确的结束。

    因此,我们确定了规则,ZeroMQ的字符串是指定长度的,在网络上发送时不带结尾的null。最简单的情况下(我们会在示例中这么做),一个ZeroMQ字符串完整的对应一个ZeroMQ消息帧(message frame),就像图3所示——一个长度加一些字节。

    在C语言中接收一个ZeroMQ字符串,并传递给应用程序一个有效的C语言字符串,我们需要这么做:

    //  Receive ZeroMQ string from socket and convert into C string
    //  Chops string at 255 chars, if it's longer
    static char *
    s_recv (void *socket) {
        char buffer [256];
        int size = zmq_recv (socket, buffer, 255, 0);
        if (size == -1)
            return NULL;
        if (size > 255)
            size = 255;
        buffer [size] = 0;
        return strdup (buffer);
    }
    

    这是一个便捷函数,本着复用的原则,我们写了一个类似的s_send函数,以正确的ZeroMQ格式发送字符串,并打包到头文件中。

    在zhelpers.h中,可以用C语言写一些简短实用的ZeroMQ程序。它的源码很长,只提供给C语言开发者。

    获得版本号

    ZeroMQ does come in several versions and quite often, if you hit a problem, it'll be something that's been fixed in a later version. So it's a useful trick to know exactly what version of ZeroMQ you're actually linking with.
    ZeroMQ有很多版本,并且经常更新。如果你遇到了问题,可能在新版本中已经修复了。因此,了解你正在使用的ZeroMQ版本是一个很有用的技巧。

    以下是获得ZeroMQ版本的代码:

    # Report 0MQ version
    #
    # Author: Lev Givon <lev(at)columbia(dot)edu>
    
    import zmq
    
    print("Current libzmq version is %s" % zmq.zmq_version())
    print("Current  pyzmq version is %s" % zmq.__version__)
    

    对外发送消息

    第二个经典模式是单向数据分发,其中服务端推送更新到一组客户端。让我们看一个推送天气更新的示例,其中包括邮编,温度和相对湿度。我们会随机生成这些值。

    以下是服务端,我们使用5556端口:

    #   Weather update server
    #   Binds PUB socket to tcp://*:5556
    #   Publishes random weather updates
    
    import zmq
    from random import randrange
    
    context = zmq.Context()
    socket = context.socket(zmq.PUB)
    socket.bind("tcp://*:5556")
    
    while True:
        zipcode = randrange(1, 100000)
        temperature = randrange(-80, 135)
        relhumidity = randrange(10, 60)
    
        socket.send_string("%i %i %i" % (zipcode, temperature, relhumidity))
    

    There's no start and no end to this stream of updates, it's like a never ending broadcast.
    这个更新流没有开始和结束,像一个永远不会终止的广播。

    以下是客户端程序,监听更新流,并抓取指定邮编的数据,默认值是纽约:

    #   Weather update client
    #   Connects SUB socket to tcp://localhost:5556
    #   Collects weather updates and finds avg temp in zipcode
    
    import sys
    import zmq
    
    #  Socket to talk to server
    context = zmq.Context()
    socket = context.socket(zmq.SUB)
    
    print("Collecting updates from weather server…")
    socket.connect("tcp://localhost:5556")
    
    # Subscribe to zipcode, default is NYC, 10001
    zip_filter = sys.argv[1] if len(sys.argv) > 1 else "10001"
    
    # Python 2 - ascii bytes to unicode str
    if isinstance(zip_filter, bytes):
        zip_filter = zip_filter.decode('ascii')
    socket.setsockopt_string(zmq.SUBSCRIBE, zip_filter)
    
    # Process 5 updates
    total_temp = 0
    for update_nbr in range(5):
        string = socket.recv_string()
        zipcode, temperature, relhumidity = string.split()
        total_temp += int(temperature)
    
    print("Average temperature for zipcode '%s' was %dF" % (
          zip_filter, total_temp / (update_nbr+1))
    )
    

    图4:发布——订阅
    <div align=center>


    </div>

    注意,当你使用SUB socket时,你需要设置使用zmq_setsockopt()和SUBSCRIBE设置一个订阅。如果没有设置任何订阅,你不会收到任何消息。对于初学者,这是一个常见的错误。订阅者可以设置多个订阅。如果一个更新匹配任何一个订阅,订阅者就会收到更新。订阅者也可以取消指定的订阅。一个订阅通常,但不是必须的,是一个可打印的字符串。

    PUB-SUB socket对是异步的。客户端在循环中调用zmq_recv()(或者只调用一次)。发送消息给SUB socket会导致错误。类似的,服务端调用zmq_send(),但是不能在PUB socket上调用zmq_recv()。

    在ZeroMQ的理论中,它不关心哪个终端连接,哪个终端绑定。但是实际中存在不成文的区别,以后会讨论。现在,绑定PUB,连接SUB,除非你的网络设计不支持。

    关于PUB-SUB sockets,还有一件更重要的事情需要知道:你不知道订阅者什么时候开始获取消息。甚至你启动一个订阅者,等待一会儿,然后启动发布者,订阅者总会错过发布者发送的第一条消息。这是因为订阅者连接到发布者(需要花费一点时间),发布者可能已经发送了消息出去。

    很多开发者都遇到了这个“缓慢加入者”(slow joiner)的症状,之后我们会详细解释。记住,ZeroMQ是异步 I/O,比如在后台。你有两个节点做这件事,以如下顺序:

    • 订阅者连接到一个终端,接收并计数消息。
    • 发布者绑定到一个终端,并立即发送1000条消息。

    然后订阅者很可能收不到任何消息。你会眨眼,检查你设置了正确的过滤器,然后再次尝试,但是订阅者仍然收不到任何消息。

    一个TCP连接涉及到握手,根据网络和节点之间的跳转数量,需要花费几毫秒时间。这个时间中,ZeroMQ可以发送很多消息。为了讨论,假设它需要5毫秒建立连接,同一个链接每秒可以处理1M消息。在这5毫秒中,订阅者正在连接到发布者,它让发布者只有1毫秒时间发送1K消息。

    在第2章,我们会解释如何同步发布者和订阅者,这样你不会开始发布数据,知道订阅者真正完成连接。有一个简单愚蠢的方式延迟发布者,也就是sleep。在实际应用程序中不要这么做。因为它极其脆弱,不雅和缓慢。使用sleep验证发生了什么,然后等到第2章看如何正确的解决。

    同步的另一种选择是简单的假设发布的数据流是无限的,没有开始,没有结束。还假设订阅者不关心它启动之前丢失的数据。我们的天气客户端示例基于这种假设。

    客户端订阅它选择的邮编,收集该邮编的100次更新。如果邮编是随机分布的,这意味着服务端大约发布一千万次更新。你可以启动客户端,然后启动服务端,客户端会继续工作。你可以停止和重启服务端,客户端会继续工作。当客户端收到100次更新,它会计算并打印平均值,然后退出。

    发布——订阅(pub-sub)模式的一些要点:

    • 每次使用一个连接调用,一个订阅者可以连接到多个发布者。然后数据会交叉到达(“fair-queued”),因此每个发布者都不会被吞没。
    • 如果发布者没有连接的订阅者,它会简单的丢弃所有消息。
    • 如果你使用TCP,有一个订阅者很慢,消息会在发布者排队等候。之后我们会看如何使用高水位线(high-water mark),让发布者避免这种情况。
    • 从ZeroMQ v3.x开始,当使用已连接的协议(tcp://或ipc://),过滤在发布者端完成。使用epgm://协议,过滤在订阅者端完成。在ZeroMQ v2.x中,所有过滤都在发布者端完成。

    这是在我笔记本电脑上(2011年 Inter i5处理器)接收和过滤10M消息花费的时间,不错,但没什么特别的:

    $ time wuclient
    Collecting updates from weather server...
    Average temperature for zipcode '10001 ' was 28F
    
    real    0m4.470s
    user    0m0.000s
    sys     0m0.008s
    

    分而治之

    图5:并行管道
    <div align=center>


    </div>

    作为最后一个示例,让我们做一点超级运算,然后喝一杯咖啡。我们的超级计算程序是一个相当典型的并行处理模型。我们有:

    • 一个ventilator产生可以并行处理的任务
    • 一组worker处理任务
    • 一个sink从worker进程中收集结果

    实际中,workers运行在超快的盒子中,可能使用GPUs完成大量的运算。以下是ventilator。它产生100个任务,每个消息告诉worker休眠几毫米:

    # Task ventilator
    # Binds PUSH socket to tcp://localhost:5557
    # Sends batch of tasks to workers via that socket
    #
    # Author: Lev Givon <lev(at)columbia(dot)edu>
    
    import zmq
    import random
    import time
    
    try:
        raw_input
    except NameError:
        # Python 3
        raw_input = input
    
    context = zmq.Context()
    
    # Socket to send messages on
    sender = context.socket(zmq.PUSH)
    sender.bind("tcp://*:5557")
    
    # Socket with direct access to the sink: used to syncronize start of batch
    sink = context.socket(zmq.PUSH)
    sink.connect("tcp://localhost:5558")
    
    print("Press Enter when the workers are ready: ")
    _ = raw_input()
    print("Sending tasks to workers…")
    
    # The first message is "0" and signals start of batch
    sink.send(b'0')
    
    # Initialize random number generator
    random.seed()
    
    # Send 100 tasks
    total_msec = 0
    for task_nbr in range(100):
    
        # Random workload from 1 to 100 msecs
        workload = random.randint(1, 100)
        total_msec += workload
    
        sender.send_string(u'%i' % workload)
    
    print("Total expected cost: %s msec" % total_msec)
    
    # Give 0MQ time to deliver
    time.sleep(1)
    

    以下是worker程序。它接收一个消息,休眠接收到的秒数,然后发出信号,它完成了:

    # Task worker
    # Connects PULL socket to tcp://localhost:5557
    # Collects workloads from ventilator via that socket
    # Connects PUSH socket to tcp://localhost:5558
    # Sends results to sink via that socket
    #
    # Author: Lev Givon <lev(at)columbia(dot)edu>
    
    import sys
    import time
    import zmq
    
    context = zmq.Context()
    
    # Socket to receive messages on
    receiver = context.socket(zmq.PULL)
    receiver.connect("tcp://localhost:5557")
    
    # Socket to send messages to
    sender = context.socket(zmq.PUSH)
    sender.connect("tcp://localhost:5558")
    
    # Process tasks forever
    while True:
        s = receiver.recv()
    
        # Simple progress indicator for the viewer
        sys.stdout.write('.')
        sys.stdout.flush()
    
        # Do the work
        time.sleep(int(s)*0.001)
    
        # Send results to sink
        sender.send(b'')
    

    以下是sink程序。它收集100个任务,然后计算整个处理时间,因此我们可以确定workers是并行运行:

    # Task sink
    # Binds PULL socket to tcp://localhost:5558
    # Collects results from workers via that socket
    #
    # Author: Lev Givon <lev(at)columbia(dot)edu>
    
    import sys
    import time
    import zmq
    
    context = zmq.Context()
    
    # Socket to receive messages on
    receiver = context.socket(zmq.PULL)
    receiver.bind("tcp://*:5558")
    
    # Wait for start of batch
    s = receiver.recv()
    
    # Start our clock now
    tstart = time.time()
    
    # Process 100 confirmations
    for task_nbr in range(100):
        s = receiver.recv()
        if task_nbr % 10 == 0:
            sys.stdout.write(':')
        else:
            sys.stdout.write('.')
        sys.stdout.flush()
    
    # Calculate and report duration of batch
    tend = time.time()
    print("Total elapsed time: %d msec" % ((tend-tstart)*1000))
    

    一次的平均耗时是5秒。当启动1,2或4个works时,从sink获得的结果如下:

    • 1 worker: total elapsed time: 5034 msecs.
    • 2 workers: total elapsed time: 2421 msecs.
    • 4 workers: total elapsed time: 1018 msecs.

    Let's look at some aspects of this code in more detail:
    让我们看下这些代码的细节:

    • worker向上连接到ventilator,向下连接到sink。这意味着你可以随意增加workers。如果workers绑定到它们的终端,你需要(a)更多终端,(b)每次增加一个worker,修改ventilator和/或sink。我们说ventilator和sink是架构的稳定部分,workers是架构的动态部分。
    • 我们需要同步启动所有workers。这是ZeroMQ中相对常见的问题,没有简单的解决方案。zmq_connect方法需要花费一定时间。因此,当一组workers连接到ventilator,第一个worker成功的连接会在短时间内获得所有消息,而其它的也在连接。如果你不同步启动workers,系统根本不会并行运行。移除ventilator中的wait试试,看会发生什么。
    • ventilator的PUSH socket平均的发布任务到workers(假设批处理开始之前它们都已经连接上了)。这叫做负载均衡,之后我们会详细讨论。
    • sink的PULL socket平均的从workers收集结果。这叫做fair-queuing。

    图6:Fair Queuing
    <div align=center>


    </div>

    管道模式也有“缓慢加入者”(slow joiner)症状,导致PUSH sockets不能正确的负载均衡。如果你正在使用PUSH和PULL,如果其中一个worker比其它的接收到更多的消息,这是因为该PULL socket比其它的更快加入,在其它workers连上之前获得更多消息。如果你想完全的负载均衡,你可能想要阅读第3章的负载均衡模式。

    使用ZeroMQ编程

    看完一些示例后,你肯定急于使用ZeroMQ。在你开始之前,深呼吸,放轻松,并仔细考虑一些基础的建议,这会让减轻你的压力和困惑。

    • 按部就班的学习ZeroMQ。它只是一个简单的API,但其中隐藏各种可能性。慢慢掌握每一个可能性。
    • 书写整洁的代码。丑陋的代码隐藏了问题,让其他人很难帮助你。你可能习惯了没有意义的变量名,但阅读你代码的人没有。使用有意义的名称,而不是“我太粗心了,没有告诉你这个变量的实际意思”。使用一致的缩进和整洁的布局。书写整洁的代码,你的世界会更美好。
    • 测试结果就是你需要的。当你的程序不工作时,你应该知道哪五行代码出错了。使用ZeroMQ魔法时尤其如此,头几次尝试时总是不工作。
    • 当你发现程序不按预期工作时,把你的程序分为小片段,测试每一个,看看是否工作。ZeroMQ让你书写模块化代码;让它们为你所用。
    • 需要时创建抽象(类,方法)。如果你复制/粘贴大量代码,同时也复制/粘贴了错误。

    正确获得Context

    ZeroMQ程序总是从创建一个context开始,然后使用它创建sockets。在C语言中,调用zmq_ctx_new()。你应该在线程中正确创建和使用一个context。从技术上讲,context是单个线程中所有sockets的容器,并且是inproc sockets的传送带。如果运行时,一个进程中有两个context,它们就像单独的ZeroMQ实例。如果这确实是你想要的,可以,否则请记住:

    在一个进程启动时调用zmq_ctx_new(),并在结束时调用一次zmq_ctx_destroy()。

    如果你正在使用fork()系统调用,在fork之后,其它子进程代码之前调用zmq_ctx_new()。通常,你想要在子进程中做有趣的事情,在父进程中做无聊的进程管理。

    干净的退出

    优秀的程序员和优秀的杀手有同样的座右铭:完成工作后总是清理干净。当你使用类似Python语言时,资源会自动释放。使用C语言时,当你使用完对象后,你需要小心的释放它们,否则你会获得内存泄漏,不稳定的程序,通常会有不好的报应。

    内存泄漏是一件事,但是ZeroMQ对于如何退出程序十分讲究。原因是技术上的,并且很痛苦,结果是,如果你让任何sockets打开,zmq_ctx_destroy()函数会永远挂起。默认情况下,即使你关闭所有sockets,如果有挂起的连接或者发送,zmq_ctx_destroy()会永远等待,除非在关闭sockets之前设置LINGER为零。

    我们需要关心ZeroMQ的messages,sockets和contexts对象。幸运的是,起码在简单的程序中,它很简单:

    • 尽可能使用zmq_send()和zmq_recv(),因为它们避免了使用zmq_msg_t对象。
    • 如果你确定使用zmq_msg_recv(),当你不再使用时,立即调用zmq_msg_close()释放接收到的消息。
    • 如果你正在打开和关闭很多sockets,很可能你需要重新设计你的程序。某些情况下,socket句柄直到销毁context才会释放。
    • 当你退出程序,关闭你的sockets,然后调用zmq_ctx_destroy()销毁context。

    最起码用C语言开发时需要小心。对于自动释放对象的语言,sockets和contexts会在离开对象域时销毁。如果你没有使用这类语言,你需要在类似“final”块中完成清理工作。

    如果你正在使用多线程,情况会更复杂。我们会在下一章介绍多线程,但是有些人会在完全理解之前试图使用多线程,以下是退出ZeroMQ多线程程序的快速和不讨好的指南。

    首先,不要再多个线程中使用同一个socket。请不要解释你认为这会很有趣,请不要这么做。接着,你需要关闭每个继续请求的socket。正确的方式是设置一个低的LINGER值(1秒),并且关闭socket。当你销毁一个context时,如果你使用的语言绑定没有自动这么做,请发送一个patch。

    最后,销毁context。这会导致附属线程中(比如,共享同一个context)所有阻塞的接收,轮询或者发送返回一个错误。捕获该错误,然后设置一个持续时间,并在该线程中关闭socket,最后退出。不要销毁同一个context两次。主线程中的zmq_ctx_destroy会阻塞,直到所有sockets安全关闭。

    瞧!这足够复杂和痛苦。所有语言绑定作者都会自动完成这项工作,不用再做额外的工作。

    为什么需要ZeroMQ

    你已经使用过ZeroMQ,让我们回头看看为什么使用它。

    现在,很多程序由横跨多种网络(局域网或者互联网)的组件构成。很多开发者倒在消息通讯上。有些开发者使用消息队列产品,但是大部分情况他们自己使用TCP或UDP。这些协议不难使用,但是从A发送一些字节到B,以及通过各种可靠方式传递消息之间,有很大的差异。

    让我们看看使用原始TCP时,会遇到的典型问题。所有可复用的消息层都需要解决所有或大部分这些问题:

    • 我们如何处理I/O?我们的程序会阻塞,还是我们在后台处理I/O?这是一个关键的设计决策。阻塞I/O创建的架构不能很好的扩展。但是后台I/O很难正确使用。
    • 我们如何处理动态组件,比如临时离开的组件?我们把组件在形式上分为“客户端”和“服务端”,并且要求服务端不能消失?如果我想要从服务端连接到服务端呢?我们每隔几秒尝试重新连接吗?
    • 我们如何在网络上表示一个消息?我们如何设计数据,让它可以很容易的读写,安全的从缓存区溢出,对短消息高效,还能胜任很大的视频?
    • 我们如何处理不能立即投递的消息?更具体地说,如果我们正在等待一个组件重新在线,我们会丢弃消息,把它们放到数据库,还是放到内存队列?
    • 我们在哪里存储消息队列?如果组件读取消息的队列很慢会发生什么?如何增强我们的队列?然后我们的策略是什么?
    • 我们如何处理丢失的消息?我们等待新的数据,请求一个重新发送,还是创建一些可靠的层,确保消息不会丢失?如果该层自身崩溃了呢?
    • 如果我们需要使用一个不同的网络传输协议呢?比如多路广播代替TCP单一广播,或者IPv6?我们需要重写应用程序,还是某个层中的抽象传输协议?
    • 我们如何路由消息?能发送同样的消息到多个端吗?能发送回复到原始的请求者吗?
    • 如何为另一种语言编写API?是重新实现网络层协议,还是重新打包一个库?如果是前者,如何保证效率和稳定?如果是后者,如何保证互操作性?
    • 我们如何表现数据,可以在不同架构中读取?我们为数据类型强制指定特定的编码?这是消息系统还是更高层的工作呢?
    • 我们如何处理网络错误?等待然后重试,忽略它们,还是终止?

    打开一个典型的开源项目,比如Hadoop Zookeeper,阅读src/c/src/zookeeper.c中的C API。当我2013年1月阅读代码时,里面有4200行没有注释的,客户端/服务端网络通信协议代码。它很高效,因为使用poll代替了select。但是,Zookeeper应该使用通用消息层和注释清楚的网络层协议。重复造轮子很浪费团队的时间。

    但是如何开发一个可复用的消息层?这么多项目需要这项技术,为什么人们仍然使用TCP sockets这种困难的方式,并且一次次解决列表中这些问题?

    事实证明,开发一个可复用的消息系统真的很难,这也是为什么几乎没有FOSS项目尝试,以及为什么商业消息产品很复杂,很贵,很顽固和很脆弱。在2006年,iMatix设计了AMQP,让FOSS开发者第一次用上了可复用的消息系统。AMQP比其它设计更好,但仍然很复杂,昂贵和脆弱。它需要几周时间学习如何使用,几个月时间创建稳定的架构。

    图7:最初的消息传输
    <div align=center>


    </div>

    绝大部分消息项目,比如AMQP,试图通过创造一个可复用的新概念——“broker”,完成寻址,路由和队列功能来解决列表中的问题。结果是,在一些未经文档化的协议顶部的客户端/服务端协议,或者一组APIs中,允许程序与broker交互。Brokers在降低网络通讯的复杂性上是卓越的。但是在产品(比如Zookeeper)中增加基于broker的消息通信会更糟糕。这意味着增加了一个额外的大盒子,和一个新的故障点。Broker迅速成为瓶颈和风险。如果软件支持它,我们可以增加第二个,第三个和第四个broker,以及故障转移计划。需要开发者完成这项工作。它创建了更多可移动组件,更复杂,更容易发生故障。

    一个broker-centric设置需要自己的运行团队。你需要不加夸张的日夜监控brokers,当它们不正常工作时,你需要解决问题。你需要盒子,需要备份盒子,还需要人员管理这些盒子。有很多可移动组件的,几个团队,几年时间开发的大型程序才值得这么做。

    图8:之后的消息传输
    <div align=center>


    </div>

    所以中小型应用程序开发者被困住了。不管是避免网络编程,或者开发一个不能扩展得应用程序。还是一头扎进网络编程,开发脆弱的,复杂的,难以维护的应用程序。或者他们投注在一个消息产品上,依赖昂贵的,容易出问题的技术上。没有真正好的选择,这也是为什么消息传输还停留在上个世纪,并激起了强烈的情绪:用户的负面情绪,销售许可证和技术支持人员却幸灾乐祸。

    我们需要的是完成消息传输工作,但在任何程序中都可以简单,方便完成工作的方式。它应该是一个库,不需要其他依赖。没有额外的组件,也就没有额外的风险。它应该可以在所有操作系统和编程语言工作。

    这就是ZeroMQ:一个高效的,可嵌入的库,不需要多少成本,就漂亮的解决了网络的大部分问题。

    尤其是:

    • 它在后台线程异步处理I/O。这些与应用程序交互的线程使用无锁的数据结构,因此并发的ZeroMQ应用程序不需要锁,信号,或者其他等待状态。
    • 组件可以动态来去,ZeroMQ会自动重连。这意味着你可以以任意顺序启动组件。你可以创建面向服务的架构(SOAs),服务端可以随意加入和离开。
    • 需要时它会自动排队消息。它会聪明的完成这项工作,尽可能在排列消息之前推送到接受者。
    • 它有多种方式处理多度的队列(称为“高水位”)。当队列满了,ZeroMQ根据你的消息传输类型(称为模式)自动阻塞发送者,或者丢弃消息。
    • 它让你的应用程序可以通过任意传输协议交互:TCP,多路广播,进程内部,进程之间。你不需要修改代码就能使用不同的传输协议。
    • 它安全的处理缓慢/阻塞的读者,根据消息传输模式使用不同的策略。
    • 它让你可以使用各种模式路由消息,比如请求-响应和发布-订阅。这些模式决定了如何创建网络拓扑结构,也就是网络的结构。
    • 它允许你创建代理队列,通过一个调用捕获或转发消息。代理可以降低网络交互的复杂性。
    • 它在网络上使用一个简单的框架,投递整个消息。如果你发送一个10k的消息,你会收到一个10k的消息。
    • 它没有强加任何格式在消息上。它们是从零到千兆字节的二进制数据块。当你想要表示数据,你可以在顶部选择其它产品,比如msgpack,Google的协议缓存,或者其它的。
    • 在某些有意义的情况下,通过自动重试,它可以聪明的处理网络错误。
    • 它减少了你的碳排放量。用更少的CPU完成更多工作,意味着你的电脑消耗更少的电量,可以让你的旧电脑用更长时间。

    实际上,ZeroMQ比这做得更多。它在开发网络应用程序上有颠覆性的影响。表面上看,它是一个受socket启发的API,你在socket上调用zmq_recv()和zmq_send()。但是消息处理迅速成为中心点,你的应用程序很快分解为一组消息处理任务。它是优雅和自然的。它可扩展:这些每个任务对应一个节点,节点之间通过任意传输协议通讯。两个节点在一个进程(节点是线程),两个节点在同一个机器(节点是进程),或者两个在同一个网络(节点是机器),都是一样的,程序代码不用改变。

    Socket的可扩展性

    让我们看看实际中ZeroMQ的扩展性。这是一个启动了一个天气服务端和几个并行客户端的脚本:

    wuserver &
    wuclient 12345 &
    wuclient 23456 &
    wuclient 34567 &
    wuclient 45678 &
    wuclient 56789 &
    

    当客户端运行,我们使用top命令查看激活的进程,看起来像这样(在四核机器上):

    PID  USER  PR  NI  VIRT  RES  SHR S %CPU %MEM   TIME+  COMMAND
    7136  ph   20   0 1040m 959m 1156 R  157 12.0 16:25.47 wuserver
    7966  ph   20   0 98608 1804 1372 S   33  0.0  0:03.94 wuclient
    7963  ph   20   0 33116 1748 1372 S   14  0.0  0:00.76 wuclient
    7965  ph   20   0 33116 1784 1372 S    6  0.0  0:00.47 wuclient
    7964  ph   20   0 33116 1788 1372 S    5  0.0  0:00.25 wuclient
    7967  ph   20   0 33072 1740 1372 S    5  0.0  0:00.35 wuclient
    

    让我们想想这里发生了什么。天气服务端只有一个socket,这里我们让它并行给5个客户端发送数据。我们可以有几千个并行客户端。服务端应用程序看不见它们,不会直接与它们通讯。所以ZeroMQ socket像一个小型服务端,悄悄地接收客户端请求,只要网络可以处理,就会尽快推送数据到客户端。它是一个多线程服务端,充分利用你的CPU。

    从ZeroMQ v2.2升级到ZeroMQ v3.2

    警告:不稳定的示范

    典型的网络编程建立在一个普遍的假设基础上:一个socket与一个连接,一个终端通讯。有多路广播协议,但它们是独特的。当我们假设一个socket等于一个连接时,我们以特定方式扩展架构。我们创建逻辑线程,每个线程处理一个socket,一个终端。我们在这些线程中处理逻辑和状态。

    在ZeroMQ世界中,sockets是进入小而快的后台通讯引擎的入口,该引擎自动管理所有连接。你不能查看,操作,打开,关闭,或者附加状态到这些连接。不管你使用阻塞发送或接收,或者poll,你只能与socket交互,而不是socket为你管理的连接。连接是私有的,不可见的,这是ZeroMQ可扩展性的关键。

    因为你的代码与socket交互,所以可以处理任何网络协议的任意连接数量。ZeroMQ中的消息传输模式比你的应用程序代码中的消息传输模式更容易扩展。

    所以,普遍的建设不再成立。当你阅读代码示例时,你的代码试图映射你已经知道的知识。当你看到“socket”,会想“哦,这表示到另一个节点的连接”。这是错误的。当你看到“线程”,会再次想”哦,线程表示到另一个节点的连接“,你的大脑再次错了。

    如果你第一次阅读这篇指南,当你真正编写一段时间ZeroMQ代码后,才会认识到这一点。你可能会感到困惑,尤其是ZeroMQ让事情变得这么简单,你可能试图强加这些假设在ZeroMQ上,但这不对。

    相关文章

      网友评论

      • 英武:翻译的非常赞!其实jupyter就是基于zmq的

      本文标题:ZeroMQ指南:第一章——基础

      本文链接:https://www.haomeiwen.com/subject/uivupttx.html