美文网首页
ActiveMQ 使用小结

ActiveMQ 使用小结

作者: CC_06fa | 来源:发表于2020-02-19 15:47 被阅读0次

Active MQ

Active MQ 支持两类消息处理方式,分别为

  • Topic 方式:
  • Queue 方式:

Topic

Topic 消息生产者(发布)将消息发布到topic中,同时有多个消息消费者(订阅)消费该消息。
和点对点方式不同,发布到topic的消息会被所有订阅者消费。
当生产者发布消息,不管是否有消费者。都不会保存消息

若存离线的持久化订阅者,会为该持久订阅者保存消息,当该持久订阅者上线后,会收到消息。

Queue

消息生产者生产消息发送到queue中,然后消息消费者从queue中取出并且消费消息。消息被消费以后,queue中不再有存储,所以消息消费者不可能消费到已经被消费的消息。Queue支持存在多个消费者,但是对一个消息而言,只会有一个消费者可以消费、其它的则不能消费此消息了。当消费者不存在时,消息会一直保存,直到有消费消费

ACK

ack的值只有三种:auto(默认)、client和client-individual

当客户端使用SUBSCRIBE命令进行订阅时,如果在SUBSCRIBE命令中制定ack属性为client或client-individual.那么这个客户端在收到某条消息(id为XXXX)后,必须向Stomp代理端发送ACK命令,这样代理端才会认为消息处理成功了;如果Stomp客户端在断开连接之前都没有发送ACK命令,那么Stomp代理端将在这个客户端断开连接后,将这条消息发送给其它客户端;

  • client 累计确认。ack 确认之前所有消息
  • client-individual独立确认。每条消息单独确认
>>> conn.subscribe('/queue/test', id=4, ack='client-individual')
on_before_message {'message-id': 'mybroker-14aa2', 'destination': '/queue/test', 'subscription': '4', 'content-length': '14'} test message 1
on_message {'message-id': 'mybroker-14aa2', 'destination': '/queue/test', 'subscription': '4', 'content-length': '14'} test message 1
>>> conn.ack('mybroker-14aa2', 4)

on_before_message {'message-id': 'mybroker-14ab2', 'destination': '/queue/test', 'subscription': '4', 'content-length': '14'} test message 2
on_message {'message-id': 'mybroker-14ab2', 'destination': '/queue/test', 'subscription': '4', 'content-length': '14'} test message 2
>>> conn.nack('mybroker-14ab2', 4)

不同的stomp 版本,ack参数有所差异

代码示例

# -*- coding: utf-8 -*-
import time
import sys
import stomp


class MyListener(stomp.ConnectionListener):

    def __init__(self, conn, print_to_log=False,):
        self.print_to_log = print_to_log
        self._conn = conn

    def __print(self, msg, *args):
        print(msg % args)

    def on_connecting(self, host_and_port):
        """
        :param (str,int) host_and_port:
        """
        self.__print('on_connecting %s %s', *host_and_port)

    def on_connected(self, headers, body):
        """
        :param dict headers:
        :param body:
        """
        self.__print('on_connected %s %s', headers, body)

    def on_disconnected(self):
        self.__print('on_disconnected')

    def on_message(self, headers, body):
        """
        Called by the STOMP connection when a MESSAGE frame is received.
        :param dict headers:
        :param body:
        """
        ack_id = headers.setdefault('message-id', None)
        subscription = headers.setdefault('subscription', None)
        # deal with something
        print("deal with  %s" % body)
        time.sleep(1)
        # deal over
        if ack_id and subscription:
            # send ACK frame
            self._conn.ack(ack_id, subscription)


def Producer():
    conn = stomp.Connection12([('192.168.83.132', 61613)])
    conn.connect('admin', 'password', wait=True)
    for i in range(1000):
        mes = str(i)
        # '/topic/q1' 表示名为test 的topic
        # '/queue/q1' 表示名为test 的queue
        conn.send(body=mes, destination='/queue/test')
        print("send %s" % mes)
        time.sleep(1)
    conn.disconnect()


def Consumer():
    conn = stomp.Connection([('192.168.83.132', 61613)])
    conn.set_listener('1', MyListener(conn=conn))
    conn.connect('admin', 'password', wait=True)
    try:
        conn.subscribe(destination='/queue/test', id="c1", ack='client-individual')
        while True:
            pass
    except KeyboardInterrupt:
        conn.disconnect()


if __name__ == '__main__':
    if len(sys.argv) != 2:
        print("input argument."
              " 'c' is Consumer"
              " 'p' is Producer")
    else:
        argument_dict = {'c': Consumer, 'p': Producer}
        fun = argument_dict[sys.argv[1]]
        fun()

相关文章

  • ActiveMQ 使用小结

    Active MQ Active MQ 支持两类消息处理方式,分别为 Topic 方式: Queue 方式: To...

  • activemq小结

    activemq简单的实现运用并不难。 发送端,消费者端,按步骤创建所需的属性。 发送端: 1.先创建连接工厂Co...

  • 【JAVA】ActiveMQ

    消息队列ActiveMQ的使用详解 ActiveMQ安装 下载地址:http://activemq.apache....

  • SpringBoot集成ActiveMQ

    Spring Boot 集成ActiveMQ 使用ActiveMQ版本5.14.0,spring Boot版本1....

  • java消息队列ActiveMQ的简单使用

    activeMQ 是学习java消息队列的实现项目,使用jfinal +jfinal-ext + activeMQ...

  • ActiveMQ快速入门

    环境搭建 1.根据自己使用的操作系统,去ActiveMQ官网下载ActiveMQ。本文使用的是Windows,下载...

  • ActiveMQ集群的搭建(高可用)

    ActiveMQ集群的搭建(高可用) 使用ZooKeeper+ActiveMQ搭建高可用集群。 1 前提准备 Zo...

  • ActiveMq伪集群设置

    自己学习ActiveMq集群化的时候可以使用多台虚拟机模拟真实环境。也可以在单机上使用多个ActiveMq节点,也...

  • 消息队列ActiveMQ的初次使用

    #消息队列ActiveMQ的初次使用一个简单的整合案例, 初步实现 activemq 的 queue 队列模式 和...

  • springboot整合activemq

    1.linux安装activemq 本例使用docker pull的activemq的镜像,并没有安装,安装完成之...

网友评论

      本文标题:ActiveMQ 使用小结

      本文链接:https://www.haomeiwen.com/subject/uwpafhtx.html