ZeroMQ在使用模式上支持多种,有req-reply,publish-subscribe。订阅模式是zmq的重头戏,以鄙人的使用和理解来浅谈下订阅模式,本人也是边学边用,如有问题请大神指出。关于应答模式请阅读 这篇文章 。
zmq的订阅模式实现示例代码如下
ZMQContext *ctx = [[ZMQContext alloc] initWithIOThreads:1U];
// Socket to talk to server
ZMQSocket *subscriber = [ctx socketWithType:ZMQ_SUB];
if (![subscriber connectToEndpoint:@"tcp://localhost:5556"]) {
return EXIT_FAILURE;
}
const char *kNYCZipCode = "10001";
const char *filter = (argc > 1)? argv[1] : kNYCZipCode;
NSData *filterData = [NSData dataWithBytes:filter length:strlen(filter)];
[subscriber setData:filterData forOption:ZMQ_SUBSCRIBE];
(void)setvbuf(stdout, NULL, _IONBF, BUFSIZ);
const int kMaxUpdate = 100;
long total_temp = 0;
for (int update_nbr = 0; update_nbr < kMaxUpdate; ++update_nbr) {
NSData *msg = [subscriber receiveDataWithFlags:0];
const char *string = [msg bytes];
int zipcode = 0, temperature = 0, relhumidity = 0;
(void)sscanf(string, "%d %d %d", &zipcode, &temperature, &relhumidity);
printf("%d ", temperature);
total_temp += temperature;
}
订阅模式的重点在于数据分发,数据由服务器主动推回来给客户端,订阅的数据统一接收,然后在分发到需要数据处理的对象中。示例代码只是简单的接收数据,它的做法是一个线程接收一个订阅消息,而且数据接收是阻塞当前线程的,这个在实际的项目应用也是不符合,所以封装和优化数据分发是必须的。
NSData *msg = [subscriber receiveDataWithFlags:0]; // 阻塞线程直到接收到数据
数据的处理分发采用代理来回调,当然也可以使用block,个人建议使用代理,毕竟数据分发到不同的处理对象中,使用代理可以很好的避免循环引用,避免内存泄漏。我这里是使用一个两层Dictionary来存储代理,dictionary的遍历速度要比数组好一点。第一层dictionary以订阅码来做key,第二层dictionary以代理对象的hash值为key,这样可以快速找到代理对象进行分发,也可以准确找到某个代理对象并实现移除操作。
添加代理对象的方法
- (void)setCode:(NSString *)code withDelegate:(id<ZMQSubscriptionDelegate>)delegate {
NSAssert(delegate != nil, @"代理为空");
NSAssert(code != nil, @"订阅码为空");
// 取出当前订阅码的代理字典
NSMutableDictionary *delegateDict = self.delegates[code];
if (delegateDict == nil) { // 不存在当前的订阅码代理对象
// 创建当前订阅码的代理对象的字典
delegateDict = [NSMutableDictionary dictionary];
self.delegates[code] = delegateDict;
}
// 添加代理对象
NSString *key = [NSString stringWithFormat:@"%ld", (unsigned long)[delegate hash]];
delegateDict[key] = delegate;
}
移除代理对象的方法
- (void)removeDelegate:(NSString *)code withDelegate:(id<ZMQSubscriptionDelegate>)delegate {
NSAssert(delegate != nil, @"代理为空");
NSAssert(code != nil, @"订阅码为空");
// 取出当前订阅码的代理字典
NSMutableDictionary *delegateDict = self.delegates[code];
if (delegateDict == nil) return; // 不存在当前的订阅码代理对象
NSString *key = [NSString stringWithFormat:@"%ld", (unsigned long)[delegate hash]];
[delegateDict removeObjectForKey:key];
}
注意,当对象不需要处理订阅消息时,一定要调用removeDelegate方法,因为代理对象是存储到字典中的,持有代理对象的强引用。
zmq要订阅什么消息就发对应的订阅码,如果要全部订阅,就发一个空字符串,在子线程使用一个while循环来接收数据,定义一个标志位,让while循环可以控制,更改后的代码如下
_context = [[ZMQContext alloc] initWithIOThreads:1];
_socket = [_context socketWithType:ZMQ_SUB];
_socket.loadingtime = 3000; // 超时时间,单位毫秒
NSString *endpoint = @"tcp://:41204"; // 服务器IP地址
if (![_socket connectToEndpoint:endpoint]) {
NSLog(@"订阅失败");
return;
}
NSData *filterData = [@"" dataUsingEncoding:NSUTF8StringEncoding];
[_socket setData:filterData forOption:ZMQ_SUBSCRIBE];
(void)setvbuf(stdout, NULL, _IONBF, BUFSIZ);
closeSocket = NO;
while (!closeSocket) {
@autoreleasepool {
NSData *recieveData = [_socket receiveDataWithFlags:0];
if (recieveData == nil) continue; // 订阅消息超时返回nil
// 数据处理
NSString *dataStr = [[NSString alloc] initWithData:recieveData encoding:NSUTF8StringEncoding];
NSRange range = [dataStr rangeOfString:@"{" options:NSCaseInsensitiveSearch];
NSString *subStr = [dataStr substringFromIndex:range.location];
NSString *codeKey = [dataStr substringToIndex:range.location];
/*** 数据分发 ***/
// 取出当前订阅码的代理字典
NSMutableDictionary *delegateDict = self.delegates[codeKey];
if (delegateDict == nil) continue; // 没有对当前订阅码的代理对象
[delegateDict enumerateKeysAndObjectsUsingBlock:^(id _Nonnull key, id _Nonnull obj, BOOL * _Nonnull stop) {
[self handleData:subStr delegate:obj code:codeKey];
}];
}
}
zmq订阅返回的数据格式是订阅码拼接上推送的数据,当接收的数据为nil,那就是接收超时了,zmq订阅只要接收到数据,肯定不为空,必然有订阅码。这里的数据处理方式是因为我项目服务器返回的是订阅码拼接json的数据,要根据服务器定义的数据格式来处理数据。注意,这里是有一个坑,要在循环内使用@ autoreleasepool {}。一个对象只要出了作用的使用区域,就会自动释放了,可是当使用区域是一个while循环时,系统会认为你的对象还要使用就不会释放对象。所以,在循环里加上autoreleasepool标记,不管数据传递到哪个对象中使用,只有没有对象牵引着数据对象,就会释放了。
我在使用zmq订阅模式的时候发现两个问题。zmq订阅模式有自动重新连接的机制,比如断网重连,服务器断开重连,但是超过一分钟好像就重连不了,我测试了很多次,可是安卓组的同事使用zmq订阅不会遇到重连不了的问题。因为重连不了,我要关闭订阅socket,重新开启订阅socket,发现一关闭socket就马上开启socket,zmq底层库就报错,然后程序直接崩溃了,所以我的启动订阅的方法要判断是否socket的关闭。
- (void)start {
if (closeSocket) {
[self performSelector:@selector(startConnect) withObject:nil afterDelay:1.0];
return;
}
[self startConnect];
}
zmq的订阅模式使用起来还是很稳定的,相对来说zmq的应答模式会出现请求丢弃的问题。因为本人的项目使用zmq通讯库,后期还会继续研究。
示例代码都放在 github 。
参考
zeroMQ使用指导 http://zguide.zeromq.org/page:all
zeroMQ的示例程序 https://github.com/imatix/zguide.git
网友评论