ActiveMQ

作者: 赵铁柱啊 | 来源:发表于2018-07-23 23:08 被阅读22次

    什么是activeMQ

    activeMQ是一种开源的,实现了JMS1.1规范的,面向消息(MOM)的中间件,为应用程序提供高效的、可扩展的、稳定的和安全的企业级消息通信。

    ActiveMQ的作用以及原理

    Activemq 的作用就是系统之间进行通信。 当然可以使用其他方式进行系统间通信, 如果使用 Activemq 的话可以对系统之间的调用进行解耦, 实现系统间的异步通信。 原理就是生产者生产消息, 把消息发送给activemq。 Activemq 接收到消息, 然后查看有多少个消费者, 然后把消息转发给消费者, 此过程中生产者无需参与。 消费者接收到消息后做相应的处理和生产者没有任何关系

    activemq的几种通信方式

    (1)publish(发布)-subscribe(订阅)(发布-订阅方式)

    发布/订阅方式用于多接收客户端的方式.作为发布订阅的方式,可能存在多个接收客户端,并且接收端客户端与发送客户端存在时间上的依赖。一个接收端只能接收他创建以后发送客户端发送的信息。作为subscriber ,在接收消息时有两种方法,destination的receive方法,和实现message listener 接口的onMessage 方法。

    (2)p2p(point-to-point)(点对点)

    p2p的过程则理解起来比较简单。它好比是两个人打电话,这两个人是独享这一条通信链路的。一方发送消息,另外一方接收,就这么简单。

    ActiveMQ服务器宕机怎么办?

    这得从ActiveMQ的储存机制说起。在通常的情况下,非持久化消息是存储在内存中的,持久化消息是存储在文件中的,它们的最大限制在配置文件的<systemUsage>节点中配置。但是,在非持久化消息堆积到一定程度,内存告急的时候,ActiveMQ会将内存中的非持久化消息写入临时文件中,以腾出内存。虽然都保存到了文件里,但它和持久化消息的区别是,重启后持久化消息会从文件中恢复,非持久化的临时文件会直接删除。

    解决方案:尽量不要用非持久化消息,非要用的话,将临时文件限制尽可能的调大。

    丢消息怎么办?

    这得从java的java.net.SocketException异常说起。简单点说就是当网络发送方发送一堆数据,然后调用close关闭连接之后。这些发送的数据都在接收者的缓存里,接收者如果调用read方法仍旧能从缓存中读取这些数据,尽管对方已经关闭了连接。但是当接收者尝试发送数据时,由于此时连接已关闭,所以会发生异常,这个很好理解。不过需要注意的是,当发生SocketException后,原本缓存区中数据也作废了,此时接收者再次调用read方法去读取缓存中的数据,就会报Software caused connection abort: recv failed错误。

    通过抓包得知,ActiveMQ会每隔10秒发送一个心跳包,这个心跳包是服务器发送给客户端的,用来判断客户端死没死。如果你看过上面第一条,就会知道非持久化消息堆积到一定程度会写到文件里,这个写的过程会阻塞所有动作,而且会持续20到30秒,并且随着内存的增大而增大。当客户端发完消息调用connection.close()时,会期待服务器对于关闭连接的回答,如果超过15秒没回答就直接调用socket层的close关闭tcp连接了。这时客户端发出的消息其实还在服务器的缓存里等待处理,不过由于服务器心跳包的设置,导致发生了java.net.SocketException异常,把缓存里的数据作废了,没处理的消息全部丢失。

    解决方案:用持久化消息,或者非持久化消息及时处理不要堆积,或者启动事务,启动事务后,commit()方法会负责任的等待服务器的返回,也就不会关闭连接导致消息丢失了。

    持久化消息非常慢

    默认的情况下,非持久化的消息是异步发送的,持久化的消息是同步发送的,遇到慢一点的硬盘,发送消息的速度是无法忍受的。但是在开启事务的情况下,消息都是异步发送的,效率会有2个数量级的提升。所以在发送持久化消息时,请务必开启事务模式。其实发送非持久化消息时也建议开启事务,因为根本不会影响性能。

    消息的不均匀消费

    有时在发送一些消息之后,开启2个消费者去处理消息。会发现一个消费者处理了所有的消息,另一个消费者根本没收到消息。原因在于ActiveMQ的prefetch机制。当消费者去获取消息时,不会一条一条去获取,而是一次性获取一批,默认是1000条。这些预获取的消息,在还没确认消费之前,在管理控制台还是可以看见这些消息的,但是不会再分配给其他消费者,此时这些消息的状态应该算作“已分配未消费”,如果消息最后被消费,则会在服务器端被删除,如果消费者崩溃,则这些消息会被重新分配给新的消费者。但是如果消费者既不消费确认,又不崩溃,那这些消息就永远躺在消费者的缓存区里无法处理。更通常的情况是,消费这些消息非常耗时,你开了10个消费者去处理,结果发现只有一台机器吭哧吭哧处理,另外9台啥事不干。

    解决方案:将prefetch设为1,每次处理1条消息,处理完再去取,这样也慢不了多少。

    死信队列

    如果你想在消息处理失败后,不被服务器删除,还能被其他消费者处理或重试,可以关闭AUTO_ACKNOWLEDGE,将ack交由程序自己处理。那如果使用了AUTO_ACKNOWLEDGE,消息是什么时候被确认的,还有没有阻止消息确认的方法?有!

    消费消息有2种方法,一种是调用consumer.receive()方法,该方法将阻塞直到获得并返回一条消息。这种情况下,消息返回给方法调用者之后就自动被确认了。另一种方法是采用listener回调函数,在有消息到达时,会调用listener接口的onMessage方法。在这种情况下,在onMessage方法执行完毕后,消息才会被确认,此时只要在方法中抛出异常,该消息就不会被确认。那么问题来了,如果一条消息不能被处理,会被退回服务器重新分配,如果只有一个消费者,该消息又会重新被获取,重新抛异常。就算有多个消费者,往往在一个服务器上不能处理的消息,在另外的服务器上依然不能被处理。难道就这么退回--获取--报错死循环了吗?

    在重试6次后,ActiveMQ认为这条消息是“有毒”的,将会把消息丢到死信队列里。如果你的消息不见了,去ActiveMQ.DLQ里找找,说不定就躺在那里。

    java消息服务

    不同系统之间的信息交换,是我们开发中比较常见的场景,比如系统A要把数据发送给系统B,这个问题我们应该如何去处理? 1999年,原来的SUN公司领衔提出了一种面向消息的中间件服务--JMS规范(标准);常用的几种信息交互技术(httpClient、hessian、dubbo、jms、webservice 五种).

    JMS概述

    JMS即Java消息服务(Java Message Service的简称),是Java EE 的标准/规范之一。这种规范(标准)指出:消息的发送应该是异步的、非阻塞的。也就是说消息的发送者发送完消息后就直接返回了,不需要等待接收者返回后才能返回,发送者和接收者可以说是互不影响。所以这种规范(标准)能够减轻或消除系统瓶颈,实现系统之间去除耦合,提高系统的整体可伸缩性和灵活性。JMS只是Java EE中定义的一组标准API,它自身并不是一个消息服务系统,它是消息传送服务的一个抽象,也就是说它定义了消息传送的接口而并没有具体实现。

    ActiveMQ概述:

    我们知道JMS只是消息服务的一组规范和接口,并没有具体的实现,而ActiveMQ就是JMS规范的具体实现;它是Apache下的一个项目,采用Java语言开发;是一款非常流行的开源消息服务器.

    ActiveMQ与JMS关系

    我们知道,JMS只是定义了一组有关消息传送的规范和标准,并没有真正实现,也就说JMS只是定义了一组接口而已;就像JDBC抽象了关系数据库访问、JPA抽象了对象与关系数据库映射、JNDI抽象了命名目录服务访问一样,JMS具体的实现由不同的消息中间件厂商提供,比如Apache ActiveMQ就是JMS规范的具体实现,Apache ActiveMQ才是一个消息服务系统,而JMS不是。

    基本要素

    1、生产者producer ; 2、消费者consumer ; 3、消息服务broker

    JMS两种消息传送模式

    点对点( Point-to-Point):专门用于使用队列Queue传送消息;基于队列Queue的点对点消息只能被一个消费者消费,如多个消费者都注册到同一个消息队列上,当生产者发送一条消息后,而只有其中一个消费者会接收到该消息,而不是所有消费者都能接收到该消息。

    发布/订阅(Publish/Subscribe):专门用于使用主题Topic传送消息。基于主题的发布与订阅消息能被多个消费者消费,生产者发送的消息,所有订阅了该topic的消费者都能接收到。

    JMS API 概览

    JMS API可以分为3个主要部分:

    1、公共API:可用于向一个队列或主题发送消息或从其中接收消息;

    2、点对点API:专门用于使用队列Queue传送消息;

    3、发布/订阅API:专门用于使用主题Topic传送消息。

    JMS公共API

    在JMS公共API内部,和发送与接收消息有关的JMS API接口主要是:ConnectionFactory / Connection / Session / Message / Destination / MessageProducer / MessageConsumer . 它们的关系是:一旦有了ConnectionFactory,就可以创建Connection,一旦有了Connection,就可以创建Session,而一旦有了Session,就可以创建 Message 、MessageProducer 和 MessageConsumer 。

    JMS点对点API

    点对点(p2p)消息传送模型API是指JMS API之内基于队列(Queue)的接口:QueueConnectionFactory / QueueConnection / QueueSession / Message / Queue / QueueSender / QueueReceiver .

    从接口的命名可以看出,大多数接口名称仅仅是在公共API接口

    名称之前添加Queue一词。一般来说,使用点对点消息传送模型的应用程序将使用基于队列的API,而不使用公共API 。

    JMS发布/订阅API发布/订阅消息传送模型API是指JMS

    API之内基于主题(Topic)的接口:TopicConnectionFactory / TopicConnection /

    TopicSession / Message / Topic / TopicPublisher / TopicSubscriber . 由于基于主题(Topic)的JMS API类似于基于队列(Queue)

    的API,因此在大多数情况下,Queue这个词会由Topic取代。

    ActiveMQ点对点发送与接收消息示例

    简单示例

    发送

    package com.kinglong.activemq.queue; 
    
    import org.apache.activemq.ActiveMQConnectionFactory; 
    
    import javax.jms.*; 
    
    /** 
    * 消息发送者 
    * 
    */ 
    public class Sender { 
    
    /**消息服务器的连接地址**/ 
    public static final String BROKER_URL = "tcp://192.168.174.129:61616"; 
    
    public static void main(String[] args) { 
    Sender sender = new Sender(); 
    sender.sendMessage("Hello ActiveMQ."); 
    } 
    
    /** 
    * 发送消息 
    * 
    * @param msg 
    */ 
    public void sendMessage (String msg) { 
    
    Connection connection = null; 
    Session session = null; 
    MessageProducer messageProducer = null; 
    
    try { 
    //1.创建一个连接工厂 
    ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(BROKER_URL); 
    
    //2.创建一个连接 
    connection = connectionFactory.createConnection(); 
    
    //3.创建一个Session 
    session = connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE); 
    
    //4.创建消息,此处创建了一个文本消息 
    Message message = session.createTextMessage(msg); 
    
    //5.创建一个目的地 
    Destination destination = session.createQueue("myQueue"); 
    
    //6.创建一个消息的生产者(发送者) 
    messageProducer = session.createProducer(destination); 
    
    //7.发送消息 
    messageProducer.send(message); 
    
    } catch (JMSException e) { 
    e.printStackTrace(); 
    } finally { 
    try { 
    //关闭连接释放资源 
    if (null != messageProducer) { 
    messageProducer.close(); 
    } 
    if (null != session) { 
    session.close(); 
    } 
    if (null != connection) { 
    connection.close(); 
    } 
    } catch (JMSException e) { 
    e.printStackTrace(); 
    } 
    } 
    } 
    }
    

    接收

    package com.kinglong.activemq.queue; 
    
    import org.apache.activemq.ActiveMQConnectionFactory; 
    
    import javax.jms.*; 
    
    public class Receiver { 
    
    /**消息服务器的连接地址**/ 
    public static final String BROKER_URL = "tcp://192.168.174.129:61616"; 
    
    public static void main(String[] args) { 
    Receiver receiver = new Receiver(); 
    receiver.receiveMessage(); 
    } 
    
    /** 
    * 接收消息 
    * 
    */ 
    public void receiveMessage () { 
    
    Connection connection = null; 
    Session session = null; 
    MessageConsumer messageConsumer = null; 
    
    try { 
    //1.创建一个连接工厂 
    ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(BROKER_URL); 
    
    //2.创建一个连接 
    connection = connectionFactory.createConnection(); 
    
    //3.创建一个Session 
    session = connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE); 
    
    //4.创建一个目的地 
    Destination destination = session.createQueue("myQueue"); 
    
    //5.创建一个消息的消费者(接收者) 
    messageConsumer = session.createConsumer(destination); 
    
    //接收消息之前,需要把连接启动一下 
    connection.start(); 
    
    //6.接收消息 
    Message message = messageConsumer.receive(); 
    
    //判断消息的类型 
    if (message instanceof TextMessage) { //判断是否是文本消息 
    String text = ((TextMessage) message).getText(); 
    System.out.println("接收到的消息内容是:" + text); 
    } 
    } catch (JMSException e) { 
    e.printStackTrace(); 
    } finally { 
    try { 
    //关闭连接释放资源 
    if (null != messageConsumer) { 
    messageConsumer.close(); 
    } 
    if (null != session) { 
    session.close(); 
    } 
    if (null != connection) { 
    connection.close(); 
    } 
    } catch (JMSException e) { 
    e.printStackTrace(); 
    } 
    } 
    } 
    }
    

    总结

    1. session = connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE); 
    // 其中:Boolean.FALSE表示本次会话不开启事务管理,假如需要开启事务管理,将其改为Boolean.TRUE即可 
    //同时需要在发送消息后添加session.commit(),否则,消息是不会被提交的. 
    //Session.AUTO_ACKNOWLEDGE表示消息确认机制 
    AUTO_ACKNOWLEDGE:自动确认 
    CLIENT_ACKNOWLEDGE:客户端确认 
    SESSION_TRANSACTED:事务确认,如果使用事务推荐使用该确认机制 
    AUTO_ACKNOWLEDGE:懒散式确认,消息偶尔不会被确认,也就是消息可能会被重复发送.但发生的概率很小 
    2. connection.start(); 
    //在消息接收端,接受消息前需要加入这段代码,开启连接,否则一样无法获取消息. 
    3. Destination destination = session.createQueue("myQueue"); 
    //创建目的地时,如果做测试收不到信息,可以将目的地名称修改一下,我用的是IDEA,不清楚为何, 
    //有时候收不到信息,修改一下就好了,猜测可能是缓存的原因吧
    

    发布与订阅的topic方式实际与点对点的queue方式,代码通用很多,只是在创建目的地Destination时候创建为

    Destination destination = session.createTopic("myTopic
    

    Queue与Topic比较

    比较项 队列 主题
    概要说明 point-to-ponit点对点小欻 publish/subscribe发布订阅消息
    消息完整性保障 队列保证每条数据都能被receiver接收 不保证publisher发布的每条数据,subscriber都能接收到
    消息是否会丢失 sender发送消息到目标队列,receiver可以同步或者异步接收到这个队列上的消息,队列上的消息如果暂时没有被receiver接收到,也不会丢失 publisher发布消息到目标主题时,只有正在监听该主题的subscriber能够接收到消息,如果目标topic没有subscriber在监听,该主题上的消息就会丢失。
    消息传送策略 一对一的消息发布接受策略,一个sender发送的消息,只能有一个receiver接收,receiver接收到后,通知mq服务器已接收,mq服务器对队列里的消息采取删除或其他操作。 一对多的消息发布接收策略,监听同一个主题地址的多个subscriber都能接收到publisher发送的消息,subscriber接收完通知mq服务器。

    拉模式与推模式

    a.点对点消息,如果没有消费者在监听队列,消息将保留在队列中,直至消息消费者连接到队列为止。这种消息传递模型是

    传统意义上的懒模型或轮询模型。在此模型中,消息不是自动推动给消息消费者的,而是要由消息消费者从队列中请求获得(拉模式)。

    b.pub/sub消息传递模型基本上是一个推模型。在该模型中,消息会自动广播,消息消费者无须通过主动请求或轮询主题的方法来获得新的消息。

    ActiveMQ消息类型

    1、TextMessage 文本消息:携带一个java.lang.String作为有效数据(负载)的消息,可用于字符串类型的信息交换;

    2、ObjectMessage 对象消息:携带一个可以序列化的Java对象作为有效负载的消息,可用于Java对象类型的信息交换;

    3、MapMessage 映射消息:携带一组键值对的数据作为有效负载的消息,有效数据值必须是Java原始数据类型(或者它们的包装类)及String。即:byte , short , int , long , float , double , char , boolean , String

    4、BytesMessage 字节消息 :携带一组原始数据类型的字节流作为有效负载的消息;

    5、StreamMessage 流消息:携带一个原始数据类型流作为有效负载的消息,它保持了写入流时的数据类型,写入什么类型,则读取也需要是相同的类型;

    ActiveMQ事务消息和非事务消息

    消息分为事务消息和非事务消息

    1、事务消息:创建会话Session使用transacted=true

    connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE);
    

    2、非事务消息:创建会话Session使用transacted=false

    connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE);
    

    事务消息必须在发送和接收完消息后显式地调用session.commit();

    事务性消息,不管设置何种消息确认模式,都会自动被确认;与设置的确认机制无关,但官方推荐事务性消息使用事务确认机制.

    ActiveMQ消息确认机制

    消息只有在被确认之后,才认为已经被成功消费,然后消息才会从队列或主题中删除。消息的成功消费通常包含三个阶段:


    (1)、客户接收消息;(2)、客户处理消息;(3)、消息被确认;

    确认机制前面提过一下,共有四种:

    (1)、Session.AUTO_ACKNOWLEDGE;客户(消费者)成功从receive方法返回时,或者从MessageListener.onMessage方法成功返回时,会话自动确认消息,然后自动删除消息.

    (2)、Session.CLIENT_ACKNOWLEDGE;客户通过显式调用消息的acknowledge方法确认消息,。 即在接收端调用message.acknowledge();方法,否则,消息是不会被删除的.

    (3)、Session. DUPS_OK_ACKNOWLEDGE ;不是必须确认,是一种“懒散的”消息确认,消息可能会重复发送,在第二次重新传送消息时,消息头的JMSRedelivered会被置为true标识当前消息已经传送过一次,客户端需要进行消息的重复处理控制。

    (4)、 Session.SESSION_TRANSACTED;事务提交并确认。

    ActiveMQ持久化消息与非持久化消息

    messageProducer.setDeliveryMode(DeliveryMode. NON_PERSISTENT);//不持久化
    messageProducer.setDeliveryMode(DeliveryMode.
    PERSISTENT);//持久化的,当然activemq发送消息默认都是持久化的
    

    说明:

    设置完后,如果为持久化,那么消息在没有被消费前都会被写入本地磁盘kahadb文件中保存起来,即使服务器宕机,也不会影响

    消息.如果是非持久化的,那么,服务一旦宕机之类的情况发生,消息即会被删除.

    ActiveMQ默认是持久化的.

    ActiveMQ消息过滤

    ActiveMQ提供了一种机制,可根据消息选择器中的标准来执行消息过滤,只接收符合过滤标准的消息;

    生产者可在消息中放入特有的标志,而消费者使用基于这些特定的标志来接收消息;

    1、发送消息放入特殊标志:message . setString Property ( name , value ) ;

    2、接收消息使用基于特殊标志的消息选择器:

    MessageConsumer createConsumer(Destination destination, String messageSelector);
    

    注:消息选择器是一个字符串,语法与数据库的SQL相似,相当于SQL语句where条件后面的内容;

    具体代码如下:

    发送端代码:

    package com.bjpowernode.activemq.selector; 
    
    import org.apache.activemq.Active
    
    MQConnectionFactory; 
    
    import javax.jms.*; 
    
    /** 
    * 消息发送者 
    * 
    */ 
    public class Sender { 
    
    /**消息服务器的连接地址**/ 
    public static final String BROKER_URL = "tcp://192.168.174.129:61616"; 
    
    public static void main(String[] args) { 
    Sender sender = new Sender(); 
    sender.sendMessage("Hello ActiveMQ."); 
    } 
    
    /** 
    * 发送消息 
    * 
    * @param msg 
    */ 
    public void sendMessage (String msg) { 
    
    Connection connection = null; 
    Session session = null; 
    MessageProducer messageProducer = null; 
    
    try { 
    //1.创建一个连接工厂 
    ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(BROKER_URL); 
    
    //2.创建一个连接 
    connection = connectionFactory.
    
    createConnection(); 
    
    //3.创建一个Session 
    session = connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE); 
    
    
    
    //5.创建一个目的地 
    Destination destination = session.create
    
    Queue("myQueue"); 
    
    //6.创建一个消息的生产者(发送者) 
    messageProducer = session.createProducer
    
    (destination); 
    
    //设置发送的消息是否需要持久化 
    messageProducer.setDeliveryMode(Delivery
    
    Mode.NON_PERSISTENT);//这里使用不持久化 
    
    //创建一个循环,测试消息标识的使用 
    for (int i=0; i<20; i++) { 
    //4.创建消息,此处创建了一个文本消息 
    Message message = session.createText
    
    Message(msg+i); 
    
    //将消息设置一个特有的标识 
    message.setIntProperty("id", i); 
    
    //7.发送消息 
    messageProducer.send(message); 
    } 
    } catch (JMSException e) { 
    e.printStackTrace(); 
    } finally { 
    try { 
    //关闭连接释放资源 
    if (null != messageProducer) { 
    messageProducer.close(); 
    } 
    if (null != session) { 
    session.close(); 
    } 
    if (null != connection) { 
    connection.close(); 
    } 
    } catch (JMSException e) { 
    e.printStackTrace(); 
    } 
    } 
    } 
    }
    

    接收端代码:

    package com.bjpowernode.activemq.selector; 
    
    import org.apache.activemq.ActiveMQ
    
    ConnectionFactory; 
    
    import javax.jms.*; 
    
    public class Receiver { 
    
    /**消息服务器的连接地址**/ 
    public static final String BROKER_URL = "tcp://192.168.174.129:61616"; 
    
    public static void main(String[] args) { 
    Receiver receiver = new Receiver(); 
    receiver.receiveMessage(); 
    } 
    
    /** 
    * 接收消息 
    * 
    */ 
    public void receiveMessage () { 
    
    Connection connection = null; 
    Session session = null; 
    MessageConsumer messageConsumer = null; 
    
    try { 
    //1.创建一个连接工厂 
    ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(BROKER_URL); 
    
    //2.创建一个连接 
    connection = connectionFactory.
    
    createConnection(); 
    
    //3.创建一个Session 
    session = connection.createSession
    
    (Boolean.FALSE, Session.AUTO_ACKNOWLEDGE); 
    
    //4.创建一个目的地 
    Destination destination = session.createQueue("myQueue"); 
    
    //5.创建一个消息的消费者(接收者),
    
    selector即为消息选择器,通过选择需要的标识,过滤消息接受id为10-15之 //间的消息 
    String selector = "id >=10 and id<=15"; 
    messageConsumer = session.createConsumer
    
    (destination, selector); 
    
    //接收消息之前,需要把连接启动一下 
    connection.start(); 
    
    while (true) { 
    //6.接收消息 同步接收 
    Message message = messageConsumer.
    
    receive(); 
    
    //判断消息的类型 
    if (message instanceof TextMessage)
    
    { //判断是否是文本消息 
    String text = ((TextMessage) message).getText(); 
    System.out.println("接收到的消息内容是:" + text); 
    } 
    } 
    } catch (JMSException e) { 
    e.printStackTrace(); 
    } finally { 
    try { 
    //关闭连接释放资源 
    if (null != messageConsumer) { 
    messageConsumer.close(); 
    } 
    if (null != session) { 
    session.close(); 
    } 
    if (null != connection) { 
    connection.close(); 
    } 
    } catch (JMSException e) { 
    e.printStackTrace(); 
    } 
    } 
    } 
    }
    

    ActiveMQ消息接收方式

    同步接收:receive()方法接收消息叫同步接收,就是之前的Demo代码使用的接收方式.在不使用循环方法时接收端代码执行

    一次即结束.

    异步接收:使用监听器接收消息,这种接收方式叫异步接收,接收端会一直处于监听状态,只要有消息产生,即会接收消息.

    下面是异步接收代码:

    package com.bjpowernode.activemq.listener; 
    
    import org.apache.activemq.ActiveMQConnectionFactory; 
    
    import javax.jms.*; 
    
    public class Receiver { 
    
    /**消息服务器的连接地址**/ 
    public static final String BROKER_URL = "tcp://192.168.174.129:61616"; 
    
    public static void main(String[] args) { 
    Receiver receiver = new Receiver(); 
    receiver.receiveMessage(); 
    } 
    
    /** 
    * 接收消息 
    * 
    */ 
    public void receiveMessage () { 
    
    Connection connection = null; 
    Session session = null; 
    MessageConsumer messageConsumer = null; 
    
    try { 
    //1.创建一个连接工厂 
    ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(BROKER_URL); 
    
    //2.创建一个连接 
    connection = connectionFactory.createConnection(); 
    
    //3.创建一个Session 
    session = connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE); 
    
    //4.创建一个目的地 
    Destination destination = session.createQueue("myQueue"); 
    
    //5.创建一个消息的消费者(接收者) 
    messageConsumer = session.createConsumer(destination); 
    
    //接收消息之前,需要把连接启动一下 
    connection.start(); 
    
    //6.接收消息 同步接收 
    //Message message = messageConsumer.receive(); 
    
    //异步接收,使用监听器接收消息 
    messageConsumer.setMessageListener(new MessageListener(){ 
    public void onMessage(Message message) { 
    //判断消息的类型 
    if (message instanceof TextMessage) { //判断是否是文本消息 
    String text = null; 
    try { 
    text = ((TextMessage) message).getText(); 
    } catch (JMSException e) { 
    e.printStackTrace(); 
    } 
    System.out.println("接收到的消息内容是:" + text); 
    } 
    } 
    }); 
    } catch (JMSException e) { 
    e.printStackTrace(); 
    } finally { 
    /*try { 
    //关闭连接释放资源 
    if (null != messageConsumer) { 
    messageConsumer.close(); 
    } 
    if (null != session) { 
    session.close(); 
    } 
    if (null != connection) { 
    connection.close(); 
    } 
    } catch (JMSException e) { 
    e.printStackTrace(); 
    }*/ 
    } 
    } 
    }
    

    相关文章

      网友评论

      本文标题:ActiveMQ

      本文链接:https://www.haomeiwen.com/subject/rgusmftx.html