Active MQ
Active MQ 支持两类消息处理方式,分别为
- Topic 方式:
- Queue 方式:
Topic
Topic 消息生产者(发布)将消息发布到topic中,同时有多个消息消费者(订阅)消费该消息。
和点对点方式不同,发布到topic的消息会被所有订阅者消费。
当生产者发布消息,不管是否有消费者。都不会保存消息
若存离线的持久化订阅者,会为该持久订阅者保存消息,当该持久订阅者上线后,会收到消息。
Queue
消息生产者生产消息发送到queue中,然后消息消费者从queue中取出并且消费消息。消息被消费以后,queue中不再有存储,所以消息消费者不可能消费到已经被消费的消息。Queue支持存在多个消费者,但是对一个消息而言,只会有一个消费者可以消费、其它的则不能消费此消息了。当消费者不存在时,消息会一直保存,直到有消费消费
ACK
ack的值只有三种:auto(默认)、client和client-individual
当客户端使用SUBSCRIBE命令进行订阅时,如果在SUBSCRIBE命令中制定ack属性为client或client-individual.那么这个客户端在收到某条消息(id为XXXX)后,必须向Stomp代理端发送ACK命令,这样代理端才会认为消息处理成功了;如果Stomp客户端在断开连接之前都没有发送ACK命令,那么Stomp代理端将在这个客户端断开连接后,将这条消息发送给其它客户端;
- client 累计确认。ack 确认之前所有消息
- client-individual独立确认。每条消息单独确认
>>> conn.subscribe('/queue/test', id=4, ack='client-individual')
on_before_message {'message-id': 'mybroker-14aa2', 'destination': '/queue/test', 'subscription': '4', 'content-length': '14'} test message 1
on_message {'message-id': 'mybroker-14aa2', 'destination': '/queue/test', 'subscription': '4', 'content-length': '14'} test message 1
>>> conn.ack('mybroker-14aa2', 4)
on_before_message {'message-id': 'mybroker-14ab2', 'destination': '/queue/test', 'subscription': '4', 'content-length': '14'} test message 2
on_message {'message-id': 'mybroker-14ab2', 'destination': '/queue/test', 'subscription': '4', 'content-length': '14'} test message 2
>>> conn.nack('mybroker-14ab2', 4)
不同的stomp 版本,ack参数有所差异
代码示例
# -*- coding: utf-8 -*-
import time
import sys
import stomp
class MyListener(stomp.ConnectionListener):
def __init__(self, conn, print_to_log=False,):
self.print_to_log = print_to_log
self._conn = conn
def __print(self, msg, *args):
print(msg % args)
def on_connecting(self, host_and_port):
"""
:param (str,int) host_and_port:
"""
self.__print('on_connecting %s %s', *host_and_port)
def on_connected(self, headers, body):
"""
:param dict headers:
:param body:
"""
self.__print('on_connected %s %s', headers, body)
def on_disconnected(self):
self.__print('on_disconnected')
def on_message(self, headers, body):
"""
Called by the STOMP connection when a MESSAGE frame is received.
:param dict headers:
:param body:
"""
ack_id = headers.setdefault('message-id', None)
subscription = headers.setdefault('subscription', None)
# deal with something
print("deal with %s" % body)
time.sleep(1)
# deal over
if ack_id and subscription:
# send ACK frame
self._conn.ack(ack_id, subscription)
def Producer():
conn = stomp.Connection12([('192.168.83.132', 61613)])
conn.connect('admin', 'password', wait=True)
for i in range(1000):
mes = str(i)
# '/topic/q1' 表示名为test 的topic
# '/queue/q1' 表示名为test 的queue
conn.send(body=mes, destination='/queue/test')
print("send %s" % mes)
time.sleep(1)
conn.disconnect()
def Consumer():
conn = stomp.Connection([('192.168.83.132', 61613)])
conn.set_listener('1', MyListener(conn=conn))
conn.connect('admin', 'password', wait=True)
try:
conn.subscribe(destination='/queue/test', id="c1", ack='client-individual')
while True:
pass
except KeyboardInterrupt:
conn.disconnect()
if __name__ == '__main__':
if len(sys.argv) != 2:
print("input argument."
" 'c' is Consumer"
" 'p' is Producer")
else:
argument_dict = {'c': Consumer, 'p': Producer}
fun = argument_dict[sys.argv[1]]
fun()
网友评论