美文网首页
ZeroMQ快速入门

ZeroMQ快速入门

作者: 啊呀哟嘿 | 来源:发表于2020-02-05 16:29 被阅读0次

    本文主要参考guotianqing的CSDN博客:ZeroMQ基础入门官方文档

    ZeroMQ是一个轻量级消息通信库,扩展传统的标准socket接口。提供了异步消息队列的抽象,能够实现消息过滤,能够无缝对接多种传输协议。其中MQ是消息队列(Message Queue)的缩写。其官方介绍如下:

    ZeroMQ (also known as ØMQ, 0MQ, or zmq) looks like an embeddable networking library but acts like a concurrency framework. It gives you sockets that carry atomic messages across various transports like in-process, inter-process, TCP, and multicast. You can connect sockets N-to-N with patterns like fan-out, pub-sub, task distribution, and request-reply. It's fast enough to be the fabric for clustered products. Its asynchronous I/O model gives you scalable multicore applications, built as asynchronous message-processing tasks. It has a score of language APIs and runs on most operating systems.

    ZMQ支持常见的三种基本通信模式,分别是Pub/Sub、Req/Rep、Push/Pull。比较重要的类型有:zmq::context_tzmq::socket_tzmq::message_t。比较重要的方法则包括:socket_t::send(message_t)socket_t::recv(&message_t)socket_t::setsockopt()等。

    Req/Rep

    Request-Reply

    服务端(响应端ZMQ_REP):

    //
    //  Hello World server in C++
    //  Binds REP socket to tcp://*:5555
    //  Expects "Hello" from client, replies with "World"
    //
    #include <zmq.hpp>
    #include <string>
    #include <iostream>
    #ifndef _WIN32
    #include <unistd.h>
    #else
    #include <windows.h>
    
    #define sleep(n)    Sleep(n)
    #endif
    
    int main () {
        //  Prepare our context and socket
        zmq::context_t context (1);
        zmq::socket_t socket (context, ZMQ_REP);
        socket.bind ("tcp://*:5555");
    
        while (true) {
            zmq::message_t request;
    
            //  Wait for next request from client
            socket.recv (&request);
            std::cout << "Received Hello" << std::endl;
    
            //  Do some 'work'
            sleep(1);
    
            //  Send reply back to client
            zmq::message_t reply (5);
            memcpy (reply.data (), "World", 5);
            socket.send (reply);
        }
        return 0;
    }
    

    客户端(请求端ZMQ_REQ):

    //
    //  Hello World client in C++
    //  Connects REQ socket to tcp://localhost:5555
    //  Sends "Hello" to server, expects "World" back
    //
    #include <zmq.hpp>
    #include <string>
    #include <iostream>
    
    int main ()
    {
        //  Prepare our context and socket
        zmq::context_t context (1);
        zmq::socket_t socket (context, ZMQ_REQ);
    
        std::cout << "Connecting to hello world server…" << std::endl;
        socket.connect ("tcp://localhost:5555");
    
        //  Do 10 requests, waiting each time for a response
        for (int request_nbr = 0; request_nbr != 10; request_nbr++) {
            zmq::message_t request (5);
            memcpy (request.data (), "Hello", 5);
            std::cout << "Sending Hello " << request_nbr << "…" << std::endl;
            socket.send (request);
    
            //  Get the reply.
            zmq::message_t reply;
            socket.recv (&reply);
            std::cout << "Received World " << request_nbr << std::endl;
        }
        return 0;
    }
    

    对于字符串,需要注意的一点是,ZMQ不会关心发送消息的内容,只要知道它所包含的字节数。这意味着,ZMQ的字符串是有长度的,且传送时不加结束符。当使用c语言接收时,应注意申请比长度多一个字节的存储空间,并置位结束符’/0’,否则在打印字符串时可能得到奇怪的结果。

    Pub/Sub

    Publish-Subscribe
    服务端(发布端ZMQ_PUB):
    //  Weather update server in C++
    //  Binds PUB socket to tcp://*:5556
    //  Publishes random weather updates
    //
    //  Olivier Chamoux <olivier.chamoux@fr.thalesgroup.com>
    //
    #include <zmq.hpp>
    #include <stdio.h>
    #include <stdlib.h>
    #include <time.h>
    
    #if (defined (WIN32))
    #include <zhelpers.hpp>
    #endif
    
    #define within(num) (int) ((float) num * random () / (RAND_MAX + 1.0))
    
    int main () {
    
        //  Prepare our context and publisher
        zmq::context_t context (1);
        zmq::socket_t publisher (context, ZMQ_PUB);
        publisher.bind("tcp://*:5556");
        publisher.bind("ipc://weather.ipc");                // Not usable on Windows.
    
        //  Initialize random number generator
        srandom ((unsigned) time (NULL));
        while (1) {
    
            int zipcode, temperature, relhumidity;
    
            //  Get values that will fool the boss
            zipcode     = within (100000);
            temperature = within (215) - 80;
            relhumidity = within (50) + 10;
    
            //  Send message to all subscribers
            zmq::message_t message(20);
            snprintf ((char *) message.data(), 20 ,
                "%05d %d %d", zipcode, temperature, relhumidity);
            publisher.send(message);
    
        }
        return 0;
    }
    

    客户端(订阅端ZMQ_SUB):

    //  Weather update client in C++
    //  Connects SUB socket to tcp://localhost:5556
    //  Collects weather updates and finds avg temp in zipcode
    //
    //  Olivier Chamoux <olivier.chamoux@fr.thalesgroup.com>
    //
    #include <zmq.hpp>
    #include <iostream>
    #include <sstream>
    
    int main (int argc, char *argv[])
    {
        zmq::context_t context (1);
    
        //  Socket to talk to server
        std::cout << "Collecting updates from weather server…\n" << std::endl;
        zmq::socket_t subscriber (context, ZMQ_SUB);
        subscriber.connect("tcp://localhost:5556");
    
        //  Subscribe to zipcode, default is NYC, 10001
        const char *filter = (argc > 1)? argv [1]: "10001 ";
        subscriber.setsockopt(ZMQ_SUBSCRIBE, filter, strlen (filter));
    
        //  Process 100 updates
        int update_nbr;
        long total_temp = 0;
        for (update_nbr = 0; update_nbr < 100; update_nbr++) {
    
            zmq::message_t update;
            int zipcode, temperature, relhumidity;
    
            subscriber.recv(&update);
    
            std::istringstream iss(static_cast<char*>(update.data()));
            iss >> zipcode >> temperature >> relhumidity ;
    
            total_temp += temperature;
        }
        std::cout     << "Average temperature for zipcode '"<< filter
                    <<"' was "<<(int) (total_temp / update_nbr) <<"F"
                    << std::endl;
        return 0;
    }
    

    需要注意的是,在使用SUB套接字时,必须使用setsockopt()方法来设置订阅的内容。该方法可以用于设置消息过滤器,但如果你不设置订阅内容,那将什么消息都收不到。订阅信息可以是任何字符串,可以设置多次。只要消息满足其中一条订阅信息,SUB套接字就会收到。订阅者可以选择不接收某类消息,也是通过setsockopt()方法实现的。

    Push/Pull

    push/pull

    其他参考

    https://github.com/booksbyus/zguide/tree/master/examples/C%2B%2B

    相关文章

      网友评论

          本文标题:ZeroMQ快速入门

          本文链接:https://www.haomeiwen.com/subject/ygusxhtx.html