美文网首页
ZeroMQ通讯订阅模式--iOS

ZeroMQ通讯订阅模式--iOS

作者: 爬向天花板的蚂蚁 | 来源:发表于2017-04-23 20:51 被阅读0次

    ZeroMQ在使用模式上支持多种,有req-reply,publish-subscribe。订阅模式是zmq的重头戏,以鄙人的使用和理解来浅谈下订阅模式,本人也是边学边用,如有问题请大神指出。关于应答模式请阅读 这篇文章

    zmq的订阅模式实现示例代码如下

    ZMQContext *ctx = [[ZMQContext alloc] initWithIOThreads:1U];
    // Socket to talk to server
    ZMQSocket *subscriber = [ctx socketWithType:ZMQ_SUB];
    if (![subscriber connectToEndpoint:@"tcp://localhost:5556"]) {
        return EXIT_FAILURE;
    }
    
    const char *kNYCZipCode = "10001";
    const char *filter = (argc > 1)? argv[1] : kNYCZipCode;
    NSData *filterData = [NSData dataWithBytes:filter length:strlen(filter)];
    [subscriber setData:filterData forOption:ZMQ_SUBSCRIBE];
    
    (void)setvbuf(stdout, NULL, _IONBF, BUFSIZ);
    
    const int kMaxUpdate = 100;
    long total_temp = 0;
    for (int update_nbr = 0; update_nbr < kMaxUpdate; ++update_nbr) {
        NSData *msg = [subscriber receiveDataWithFlags:0];
        const char *string = [msg bytes];
    
        int zipcode = 0, temperature = 0, relhumidity = 0;
        (void)sscanf(string, "%d %d %d", &zipcode, &temperature, &relhumidity);
    
        printf("%d ", temperature);
        total_temp += temperature;
    }
    

    订阅模式的重点在于数据分发,数据由服务器主动推回来给客户端,订阅的数据统一接收,然后在分发到需要数据处理的对象中。示例代码只是简单的接收数据,它的做法是一个线程接收一个订阅消息,而且数据接收是阻塞当前线程的,这个在实际的项目应用也是不符合,所以封装和优化数据分发是必须的。

    NSData *msg = [subscriber receiveDataWithFlags:0]; // 阻塞线程直到接收到数据
    

    数据的处理分发采用代理来回调,当然也可以使用block,个人建议使用代理,毕竟数据分发到不同的处理对象中,使用代理可以很好的避免循环引用,避免内存泄漏。我这里是使用一个两层Dictionary来存储代理,dictionary的遍历速度要比数组好一点。第一层dictionary以订阅码来做key,第二层dictionary以代理对象的hash值为key,这样可以快速找到代理对象进行分发,也可以准确找到某个代理对象并实现移除操作。

    添加代理对象的方法

    - (void)setCode:(NSString *)code withDelegate:(id<ZMQSubscriptionDelegate>)delegate {
        NSAssert(delegate != nil, @"代理为空");
        NSAssert(code != nil, @"订阅码为空");
    
        // 取出当前订阅码的代理字典
        NSMutableDictionary *delegateDict = self.delegates[code];
    
        if (delegateDict == nil) { // 不存在当前的订阅码代理对象
            // 创建当前订阅码的代理对象的字典
            delegateDict = [NSMutableDictionary dictionary];
            self.delegates[code] = delegateDict;
        }
    
        // 添加代理对象
        NSString *key = [NSString stringWithFormat:@"%ld", (unsigned long)[delegate hash]];
        delegateDict[key] = delegate;
    }
    

    移除代理对象的方法

    - (void)removeDelegate:(NSString *)code withDelegate:(id<ZMQSubscriptionDelegate>)delegate {
        NSAssert(delegate != nil, @"代理为空");
        NSAssert(code != nil, @"订阅码为空");
    
        // 取出当前订阅码的代理字典
        NSMutableDictionary *delegateDict = self.delegates[code];
    
        if (delegateDict == nil) return; // 不存在当前的订阅码代理对象
    
        NSString *key = [NSString stringWithFormat:@"%ld", (unsigned long)[delegate hash]];
        [delegateDict removeObjectForKey:key];
    }
    

    注意,当对象不需要处理订阅消息时,一定要调用removeDelegate方法,因为代理对象是存储到字典中的,持有代理对象的强引用。

    zmq要订阅什么消息就发对应的订阅码,如果要全部订阅,就发一个空字符串,在子线程使用一个while循环来接收数据,定义一个标志位,让while循环可以控制,更改后的代码如下

    _context = [[ZMQContext alloc] initWithIOThreads:1];
    _socket = [_context socketWithType:ZMQ_SUB];
    _socket.loadingtime = 3000; // 超时时间,单位毫秒
    
    NSString *endpoint = @"tcp://:41204"; // 服务器IP地址
    if (![_socket connectToEndpoint:endpoint]) {
        NSLog(@"订阅失败");
        return;
    }
    
    NSData *filterData = [@"" dataUsingEncoding:NSUTF8StringEncoding];
    [_socket setData:filterData forOption:ZMQ_SUBSCRIBE];
    
    (void)setvbuf(stdout, NULL, _IONBF, BUFSIZ);
    
    closeSocket = NO;
    while (!closeSocket) {
        
        @autoreleasepool {
            NSData *recieveData = [_socket receiveDataWithFlags:0];
            if (recieveData == nil)  continue; // 订阅消息超时返回nil
            
            // 数据处理
            NSString *dataStr = [[NSString alloc] initWithData:recieveData encoding:NSUTF8StringEncoding];
            NSRange range = [dataStr rangeOfString:@"{" options:NSCaseInsensitiveSearch];
            NSString *subStr = [dataStr substringFromIndex:range.location];
            NSString *codeKey = [dataStr substringToIndex:range.location];
            
            /*** 数据分发 ***/
            // 取出当前订阅码的代理字典
            NSMutableDictionary *delegateDict = self.delegates[codeKey];
            
            if (delegateDict == nil) continue; // 没有对当前订阅码的代理对象
            
            [delegateDict enumerateKeysAndObjectsUsingBlock:^(id  _Nonnull key, id  _Nonnull obj, BOOL * _Nonnull stop) {
                
                [self handleData:subStr delegate:obj code:codeKey];
            }];
        }    
    }
    

    zmq订阅返回的数据格式是订阅码拼接上推送的数据,当接收的数据为nil,那就是接收超时了,zmq订阅只要接收到数据,肯定不为空,必然有订阅码。这里的数据处理方式是因为我项目服务器返回的是订阅码拼接json的数据,要根据服务器定义的数据格式来处理数据。注意,这里是有一个坑,要在循环内使用@ autoreleasepool {}。一个对象只要出了作用的使用区域,就会自动释放了,可是当使用区域是一个while循环时,系统会认为你的对象还要使用就不会释放对象。所以,在循环里加上autoreleasepool标记,不管数据传递到哪个对象中使用,只有没有对象牵引着数据对象,就会释放了。

    我在使用zmq订阅模式的时候发现两个问题。zmq订阅模式有自动重新连接的机制,比如断网重连,服务器断开重连,但是超过一分钟好像就重连不了,我测试了很多次,可是安卓组的同事使用zmq订阅不会遇到重连不了的问题。因为重连不了,我要关闭订阅socket,重新开启订阅socket,发现一关闭socket就马上开启socket,zmq底层库就报错,然后程序直接崩溃了,所以我的启动订阅的方法要判断是否socket的关闭。

    - (void)start {
        if (closeSocket) {
            [self performSelector:@selector(startConnect) withObject:nil afterDelay:1.0];
            return;
        }
    
        [self startConnect];
    }
    

    zmq的订阅模式使用起来还是很稳定的,相对来说zmq的应答模式会出现请求丢弃的问题。因为本人的项目使用zmq通讯库,后期还会继续研究。

    示例代码都放在 github

    参考

    zeroMQ使用指导 http://zguide.zeromq.org/page:all

    zeroMQ的示例程序 https://github.com/imatix/zguide.git

    相关文章

      网友评论

          本文标题:ZeroMQ通讯订阅模式--iOS

          本文链接:https://www.haomeiwen.com/subject/dloezttx.html