美文网首页技术知识专栏
ActiveMQ 消息队列 简单了解

ActiveMQ 消息队列 简单了解

作者: 铁皮农夫 | 来源:发表于2020-05-23 18:18 被阅读0次

ActiveMQ 消息队列

1.消息队列的产生原因:

    对于用户体验有很大的提升。如用户注册时,要提示注册成功还要发手机信息、发邮件通知,让用户知道自己操作成功的良好体验。但是这些发手机消息等操作需要程序运作,需要时间,导致程序变慢。所以需要消息队列来管理发手机消息等操作,而不会影响提示注册成功的运行效率。

2.ActiveMQ运行原理:

    当有一个消息发过来时,将其保存在ActiveMQ中,然后通过JAVA有的JMS消息服务来监听ActiveMQ,有消息过来就执行这个消息的业务程序。是通过异步来实现的,来保证正常的业务正常执行。

3.ActiveMQ的应用场景:

    不影响主业务的一些优化,提高用户的体验的业务。如用户注册时,如果要输入验证码的业务就不能加入消息队列中。因为这个已经是会影响主业务的流程。

4.ActiveMQ的两种模式:

    p2p(Queue):单对单的操作。对时间依赖性高(MQ队列中的消息不管是什么时候加入进去的,JMS的监听都会消费执行掉)   如MQ中有一个消息队列,而JMS中有个监听队列一直看MQ中的消息队列有没有消息,有消息就执行掉这个消息的业务,成功后返回给MQ结果,让MQ将其消息删掉。   

    PB 发布订阅(Topic):多对多的操作,对时间依赖性低。如多个用户订阅一个书籍的话题时,MQ中书籍话题的多个消息是要推送给多个用户的。但是只会推送订阅后的消息,而订阅前的消息不会推送。

5.ActiveMQ类似的主流技术:

    Kafka:国外应用的技术,用zookeeper进行服务注册。

    RocketMQ:阿里巴巴仿照kafka制作出来的,主要是使用nameSrv简化了服务注册。

6.ActiveMQ的安装下载和使用:

    文件是解压版不需要安装,下载路径:http://activemq.apache.org 

    使用是执行运行解压文件bin目录下的win64下的activemq.bat文件。其中端口8161是用来查看消息的。账号/密码默认为admin/admin

7.JAVA代码实现ActiveMQ:

    生产消息的类如下:

package com.fenggf.MQ;

import javax.jms.Connection;

import javax.jms.ConnectionFactory;

import javax.jms.DeliveryMode;

import javax.jms.Destination;

import javax.jms.JMSException;

import javax.jms.MessageProducer;

import javax.jms.Session;

import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnection;

import org.apache.activemq.ActiveMQConnectionFactory;

public class MQMain {

public static void main(String[] args) throws JMSException {

//创建连接工厂 JMS 用它创建连接

ConnectionFactory connectFac =  new ActiveMQConnectionFactory(ActiveMQConnection.DEFAULT_USER,

ActiveMQConnection.DEFAULT_PASSWORD, "tcp://127.0.0.1:61616");

//JMS 客户端到JMS Provider 的连接

Connection conn = connectFac.createConnection();

conn.start();

//参数 是否开启事务模式,签收模式,默认是自动模式

Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);

//获取session注册参数值my-queue的名字

//Destination 消息的目的地,消息发送给谁

Destination destina = session.createQueue("fgf-queue");

//MessageProducer :消息生产者

MessageProducer producer = session.createProducer(destina);

//设置不持久

producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);

//发送一条消息

sendMSG(session,producer,"你好 消息队列 3333");

//关闭连接

conn.close();

}

private static void sendMSG(Session session, MessageProducer producer,

String msg) throws JMSException {

//创建一条文本消息

TextMessage message = session.createTextMessage(msg);

//通过消息生产者发出消息

producer.send(message);

}

}

监听消息的消费类代码如下:

package com.fenggf.MQ;

import javax.jms.Connection;

import javax.jms.Destination;

import javax.jms.JMSException;

import javax.jms.Message;

import javax.jms.MessageConsumer;

import javax.jms.Queue;

import javax.jms.Session;

import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnection;

import org.apache.activemq.ActiveMQConnectionFactory;

public class JMSMain {

public static void main(String[] args) throws JMSException {

//创建工厂

ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(ActiveMQConnection.DEFAULT_USER

, ActiveMQConnection.DEFAULT_PASSWORD, "tcp://127.0.0.1:61616");

//JMS 客户端到JMS provider 的连接

Connection connection = connectionFactory.createConnection();

connection.start();

//Session : 发送或者接收消息的线程

Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

//消息的目的地,消息发送给谁

//获取服务器上同名的消息队列

Destination destina = session.createQueue("fgf-queue");

//消息接收者

MessageConsumer consumer = session.createConsumer(destina);

while(true){

TextMessage message = (TextMessage)consumer.receive();

if(null != message){

System.out.println("收到消息为: "+message.getText());

//session.commit();

}else

break;

}

session.close();

connection.close();

}

}

相关文章

网友评论

    本文标题:ActiveMQ 消息队列 简单了解

    本文链接:https://www.haomeiwen.com/subject/owykahtx.html