1. 关于消息
ZMQ的传输单位是消息,即一个二进制块。你可以使用任意的序列化工具,如谷歌的Protocal Buffers、XDR、JSON等,将内容转化成ZMQ消息。不过这种转化工具最好是便捷和快速的,请注意衡量。
消息使用的几点注意
- 一旦一个消息对象被发送之后(长度会清0),就不能被重复发送;如果想发送同一个消息对象多次,可以在发送之前新建一个消息对象,通过函数拷贝即可
- ZMQ支持多帧消息(multipart message)发送
- 可以发送0长度的消息,作为一种信号
- 消息的收发都是整体的,不会只收到消息的一部分
- ZMQ不会立刻发送消息,会存在一定的延迟
- 消息必须能够在内存中保存,如果你想发送文件或超长的消息,就需要将他们切割成小块,在独立的消息中进行发送
- 消息发送完之后要记得close,但在一些会在变量超出作用域时自动释放消息对象的语言中除外
2. 多套接字处理
一般地,主程序的循环体内都会包括如下三个部分:
- 等待套接字消息
- 处理消息
- repeat
而针对多套接字处理,常见的有两种方式:nonblock+并行,zmq轮询。
下面以同时处理任务分发和天气消息订阅套接字为例展示以上两种方式的具体处理过程。
2.1 非阻塞并行处理
# -*- coding: utf-8 -*-
import zmq,time
context = zmq.Context()
receiver = context.socket(zmq.PULL)
receiver.connect('tcp://localhost:5557')
subscriber = context.socket(zmq.SUB)
subscriber.connect('tcp://localhost:5556')
subscriber.setsockopt(zmq.SUBSCRIBE, b'10001')
while True:
while True: # 任务分发优先级提前
try:
msg = receiver.recv(zmq.DONTWAIT)
except zmq.Again:
break
#处理任务
try:
msg = subscriber.recv(zmq.DONTWAIT)
except zmq.Again
break
#天气更新
time.sleep(0.001)
上面方法的缺点是没有收到任何消息之前会有1毫秒的延迟,这在高压力的程序中还是会有问题的。并且程序中有意把任务分发的优先级提升,可以用轮询的方式改进一下,类似于ZMQ中的公平队列机制。
2.2 ZMQ轮询
#-*- coding: utf-8 -*-
import zmq
context = zmq.Context()
receiver = context.socket(zmq.PULL)
receiver.connect('tcp://localhost:5557')
subscriber = context.socket(zmq.SUB)
subscriber.connect('tcp://localhost:5556')
subscriber.setsockopt(zmq.SUBSCRIBE, b'10001')
poller = zmq.Poller()
poller.register(receiver, zmq.POLLIN)
poller.register(subscriber, zmq.POLLIN)
while True:
try:
socks = dict(poller.poll())
except KeyboardInterrupt:
break
if receiver in socks:
msg = receiver.recv()
# 处理任务
if subscriber in socks:
msg = subscriber.recv()
# 更新天气信息
网友评论