美文网首页
ActiveMQ学习笔记

ActiveMQ学习笔记

作者: 青年心路 | 来源:发表于2019-08-01 14:08 被阅读0次
一、ActiveMQ简介
1.什么是ActiveMQ

ActiveMQ是Apache出品,最流行的,能力强劲的开源消息总线。ActiveMQ是一个完全支持JMS1.1 和J2EE1.4规范的JMS Provider实现,尽管JMS规范出台已经是很久的事情了,但是JMS在当今的J2EE应用中间仍然扮演着特殊的地位。

2.什么是消息

“消息”是在两台计算机间传送的数据单位。消息可以非常简单,例如只包含文本字符串,也可以更复杂,可以包含嵌入对象。

3.什么是队列

是一种有序的,先进先出的数据结构,例如:生活中的排队

4.什么是消息队列

“消息队列”是在消息的传输过程中保存消息的容器

5.常见消息服务应用
  • ActiveMQ
  • RabbitMQ
  • RocketMQ
二、消息服务的应用场景

消息队列的特点主要是异步处理,主要作用是减少消息请求和响应的时间以及解耦。所以主要用于比较耗时并且不需要即时(同步)返回结果的操作。

image.png
2.1 异步处理
2.1.1 用户注册
用户注册流程:
  • 注册处理及写入数据库
  • 发送注册成功的手机短信
  • 发送注册成功的邮件信息

如果使用消息中间件,则可以创建两个线程来做这些事情,直接发送消息给消息中间件,然后让邮件服务和短信服务去消息中间件中取消息,取到消息后自己再做对应的操作。

2.2 应用的解耦
2.2.1 订单处理
生成订单流程:
  • 在购物车中点击结算
  • 完成支付
  • 创建订单
  • 调用库存系统

订单完成后,订单系统不用直接取调用库存系统,而是发送消息到消息中间件,写入一个订单信息。库存系统自己去消息中间件中获取,然后做发货处理,并更新库存。

2.3流量的削峰
2.3.1 秒杀功能
秒杀流程
  • 用户点击秒杀
  • 发送请求到秒杀应用
  • 在请求秒杀应用之前将请求放入到消息队列
  • 秒杀应用从消息队列中获取请求并处理

系统举行秒杀活动,流量蜂拥而至100件商品,10万人挤进来怎么办?
将10万秒杀的操作,放入消息队列。秒杀应用将10万个请求中的前100个进行处理,其它的驳回通知失败。这样将流量控制在了消息队列处。秒杀应用不会被怼死。

三、JMS
1.什么是JMS

JMS(Java Message Service)是Java平台上面向消息中间件的技术规范,它便于消息系统中的Java应用程序进行消息交换,并且提供标准的产生、发送、接收消息的接口,简化企业应用的开发。

2.JMS模型
2.1 点对点模型(Point To Point)

生产者发送一条消息到queue,只有一个消费者能收到。


image.png
2.2 发布订阅模型(Publish/Subscribe)

发布者发送到topic的消息,只有订阅了topic的订阅者才会收到消息。


image.png
四、ActiveMQ安装
1.下载资源

ActiveMQ官网:http://activemq.apache.org

1.1 版本说明

ActiveMQ5.10.x 以上版本必须使用 JDK1.8 才能正常使用。
ActiveMQ5.9.x 及以下版本使用 JDK1.7 即可正常使用。

2.上传至Linux服务器
3.解压安装文件
tar -zxf apache-activemq-5.9.0-bin.tar.gz
4.检查权限
ls -al apache-activemq-5.9.0/bin

如果权限不足,则无法执行,需要修改文件权限:

chmod 755 activemq
5.复制应用至本地目录
cp apache-activemq-5.9.0 /usr/local/activemq -r
6.启动ActiveMQ
/usr/local/activemq/bin/activemq start
7.测试ActiveMQ
7.1检查进程
ps aux|grep activemq
7.2管理界面

使用浏览器访问ActiveMQ管理应用,地址如下:
http://ip:8161/admin/
用户名:admin
密码:admin
AcitveMQ使用的是Jetty提供的HTTP服务。启动稍慢,建议短暂等待再访问测试。
见到如下界面代表服务启动成功

image.png
7.3 修改访问端口(管理应用监听的端口)

修改ActiveMQ配置文件:

/usr/local/activemq/conf/jetty.xml
image.png
7.4 修改用户名和密码

修改conf/users.properties配置文件,内容为:用户名=密码
保存并启动ActiveMQ服务即可。


image.png
8.重启ActiveMQ
/usr/local/activemq/bin/activemq restart
9.关闭ActiveMQ
/usr/local/activemq/bin/activemq stop
10.配置文件activemq.xml

配置文件中,配置的是ActiveMQ的核心配置信息,是提供服务时使用的配置,可以修改启动的访问端口,即Java编程中访问ActiveMQ的访问端口


image.png

默认端口:61616(编程时使用的端口)
使用协议:TCP协议
修改端口后,保存并重启ActiveMQ服务即可

11.ActiveMQ目录介绍

bin:可执行的脚本文件
conf:相关的配置文件
data:存放的是日志文件
docs:存放的是相关文档
examples:存放的是简单的实例
lib:相关的jar包
webapps:用于存放项目的目录

五、ActiveMQ术语
1.Destination

目的地,JMS Provider(消息中间件)负责维护,用于对Message进行管理的对象。MessageProducer需要指定Destination才能发送消息,MessageReceiver需要指定Destination才能接收消息

2.Producer

消息生成者,负责发送Message到目的地。

3.Consumer|Receiver

消息消费者,负责从目的地中消费(处理/监听/订阅)Message

4.Message

消息,用于封装一次通信的内容

六、ActiveMQ应用
1.ActiveMQ常用API简介

下述API都是接口类型,定义在javax.jms包中

1.1 ConnectionFactory

连接工厂:用于创建连接的工厂类型

1.2 Connection

连接:用于建立访问ActiveMQ连接的类型,由连接工厂创建

1.3 Session

会话:一次持久、有效、有状态的访问,由连接创建

1.4 Destination & Queue

目的地:用于描述本次访问ActiveMQ的消息访问目的地,即ActiveMQ服务中的具体队列,由会话创建
Interface Queue extends Destination

1.5 MessageProducer

消息生产者:在一次有效会话中,用于发送消息给ActiveMQ的服务工具,由会话创建

1.6 MessageConsumer

消息消费者:在一次有效会话中,用于从ActiveMQ中获取消息的工具,由会话创建

1.7 Message

消息:通过消息生产者向ActiveMQ服务发送消息时使用的数据载体对象或消息消费者从ActiveMQ服务中获取消息时使用的数据载体对象,是所有消息(文本消息、对象消息等)具体类型的顶级接口,可以通过会话创建或通过会话从ActiveMQ服务中获取

2.JMS-HelloWorld
2.1 处理文本消息
2.1.1 创建消息生产者
创建工程

mq-producer

添加坐标
    <!--activeMQ-->
    <dependency>
        <groupId>org.apache.activemq</groupId>
        <artifactId>activemq-all</artifactId>
        <version>${activemq-all.version}</version>
    </dependency>
编写消息生产者
public class HelloWorldProducer {

    public void sendHelloWorldActiveMQ(String msgText){
        //定义连接工厂
        ConnectionFactory connectionFactory = null;

        //定义连接对象
        Connection connection = null;

        //定义会话
        Session session = null;

        //目的地
        Destination destination = null;

        //消息生产者
        MessageProducer producer = null;

        //定义消息
        Message message = null;

        try {
            //传入的用户名和密码可以通过jetty-realm.properties文件修改
            //brokerURL:访问activeMQ的连接地址,路径结构为:协议://主机地址:端口号
            connectionFactory = new ActiveMQConnectionFactory("admin","admin","tcp://192.168.254.128:61616");

            //创建连接对象
            connection = connectionFactory.createConnection();

            //启动连接(此时才是真正创建连接)
            connection.start();

            /**
             * 创建会话
             * transacted:是否使用事务,可选值为true,false
             *              true:使用事务,设置此变量值,Session.SESSION.TRANSACTION
             *              false:不使用事务,设置此变量 则acknowledgeMode必须设置
             * acknowledgeMode:
             * Session.AUTO_ACKNOWLEDGE:自动确认机制
             * Session.CLIENT_ACKNOWLEDGE:客户端确认机制(需手动调用API)
             * Session.DUPS_OK_ACKNOWLEDGE:有副本的客户端确认机制(前两种一旦收到消息确认就会进行删除,这个则不会)
             */
            session = connection.createSession(false,Session.DUPS_OK_ACKNOWLEDGE);

            //创建目的地,即队列的名称,消息消费者需要通过此名称访问对应的队列
            destination = session.createQueue("helloworld-destination");

            //创建消息生产者
            producer = session.createProducer(destination);

            //创建消息对象
            message = session.createTextMessage(msgText);

            //发送消息
            producer.send(message);
        } catch (JMSException e) {
            e.printStackTrace();
        } finally {
            //回收消息发送者资源
            if (producer != null){
                try {
                    producer.close();
                } catch (JMSException e) {
                    e.printStackTrace();
                }
            }

            if (session != null){
                try {
                    session.close();
                } catch (JMSException e) {
                    e.printStackTrace();
                }
            }

            if (connection != null){
                try {
                    connection.close();
                } catch (JMSException e) {
                    e.printStackTrace();
                }
            }
        }
    }
}
2.1.2 创建消息消费者
创建工程

mq-consumer

添加坐标
<!--activeMQ-->
<dependency>
    <groupId>org.apache.activemq</groupId>
    <artifactId>activemq-all</artifactId>
</dependency>
创建消息生产者
public class HelloWorldConsumer {

    public void receiveHelloWorldActiveMQ() {
        //定义连接工厂
        ConnectionFactory connectionFactory = null;
        //定义连接
        Connection connection = null;
        //定义会话
        Session session = null;
        //定义目的地
        Destination destination = null;
        //定义消息消费者
        MessageConsumer consumer = null;
        //定义消息
        Message message = null;

        try {
            //创建连接工厂
            connectionFactory = new ActiveMQConnectionFactory("admin", "admin", "tcp://192.168.254.128:61616");
            //创建连接对象
            connection = connectionFactory.createConnection();
            //开启连接
            connection.start();
            //创建会话
            session = connection.createSession(false, Session.DUPS_OK_ACKNOWLEDGE);
            //创建目的地
            destination = session.createQueue("helloworld-destination");
            //创建消息消费者
            consumer = session.createConsumer(destination);
            //接收消息
            message = consumer.receive();

            //获取文本消息
            String msg = ((TextMessage) message).getText();
            System.out.println("从ActiveMQ中获取的文本信息:" + msg);
        } catch (JMSException e) {
            e.printStackTrace();
        } finally {
            if (consumer != null) {
                try {
                    consumer.close();
                } catch (JMSException e) {
                    e.printStackTrace();
                }
            }
            if (session != null) {
                try {
                    session.close();
                } catch (JMSException e) {
                    e.printStackTrace();
                }
            }
            if (connection != null) {
                try {
                    connection.close();
                } catch (JMSException e) {
                    e.printStackTrace();
                }
            }
        }
    }
}
2.1.3 测试
测试生产者
public class Test {
    public static void main(String[] args) {
        HelloWorldProducer producer = new HelloWorldProducer();
        producer.sendHelloWorldActiveMQ("HelloWorld");
    }
}
测试消费者
public class Test {
    public static void main(String[] args) {
        HelloWorldConsumer consumer = new HelloWorldConsumer();
        consumer.receiveHelloWorldActiveMQ();
    }
}
image.png
2.2 处理对象消息
2.2.1 创建对象
public class User implements Serializable {

    private Integer userId;
    private String userName;
    private Integer userAge;

    public Integer getUserId() {
        return userId;
    }

    public void setUserId(Integer userId) {
        this.userId = userId;
    }

    public String getUserName() {
        return userName;
    }

    public void setUserName(String userName) {
        this.userName = userName;
    }

    public Integer getUserAge() {
        return userAge;
    }

    public void setUserAge(Integer userAge) {
        this.userAge = userAge;
    }

    @Override
    public String toString() {
        return "User{" +
                "userId=" + userId +
                ", userName='" + userName + '\'' +
                ", userAge=" + userAge +
                '}';
    }
}
2.2.2 创建生产者
public class HelloWorldProducer2 {

    public void sendHelloWorldActiveMQ(User user){
        //定义连接工厂
        ConnectionFactory connectionFactory = null;

        //定义连接对象
        Connection connection = null;

        //定义会话
        Session session = null;

        //目的地
        Destination destination = null;

        //消息生产者
        MessageProducer producer = null;

        //定义消息
        Message message = null;

        try {
            //传入的用户名和密码可以通过jetty-realm.properties文件修改
            //brokerURL:访问activeMQ的连接地址,路径结构为:协议://主机地址:端口号
            connectionFactory = new ActiveMQConnectionFactory("admin","admin","tcp://192.168.254.128:61616");

            //创建连接对象
            connection = connectionFactory.createConnection();

            //启动连接(此时才是真正创建连接)
            connection.start();

            /**
             * 创建会话
             * transacted:是否使用事务,可选值为true,false
             *              true:使用事务,设置此变量值,Session.SESSION.TRANSACTION
             *              false:不使用事务,设置此变量 则acknowledgeMode必须设置
             * acknowledgeMode:
             * Session.AUTO_ACKNOWLEDGE:自动确认机制
             * Session.CLIENT_ACKNOWLEDGE:客户端确认机制(需手动调用API)
             * Session.DUPS_OK_ACKNOWLEDGE:有副本的客户端确认机制(前两种一旦收到消息确认就会进行删除,这个则不会)
             */
            session = connection.createSession(false,Session.DUPS_OK_ACKNOWLEDGE);

            //创建目的地,即队列的名称,消息消费者需要通过此名称访问对应的队列
            destination = session.createQueue("my-user");

            //创建消息生产者
            producer = session.createProducer(destination);

            //创建消息对象
            message = session.createObjectMessage(user);

            //发送消息
            producer.send(message);
        } catch (JMSException e) {
            e.printStackTrace();
        } finally {
            //回收消息发送者资源
            if (producer != null){
                try {
                    producer.close();
                } catch (JMSException e) {
                    e.printStackTrace();
                }
            }

            if (session != null){
                try {
                    session.close();
                } catch (JMSException e) {
                    e.printStackTrace();
                }
            }

            if (connection != null){
                try {
                    connection.close();
                } catch (JMSException e) {
                    e.printStackTrace();
                }
            }
        }
    }
}
2.2.3 创建消费者
public class HelloWorldConsumer2 {

    public void receiveHelloWorldActiveMQ() {
        //定义连接工厂
        ConnectionFactory connectionFactory = null;
        //定义连接
        Connection connection = null;
        //定义会话
        Session session = null;
        //定义目的地
        Destination destination = null;
        //定义消息消费者
        MessageConsumer consumer = null;
        //定义消息
        Message message = null;

        try {
            //创建连接工厂
            connectionFactory = new ActiveMQConnectionFactory("admin", "admin", "tcp://192.168.254.128:61616");
            //创建连接对象
            connection = connectionFactory.createConnection();
            //开启连接
            connection.start();
            //创建会话
            session = connection.createSession(false, Session.DUPS_OK_ACKNOWLEDGE);
            //创建目的地
            destination = session.createQueue("my-user");
            //创建消息消费者
            consumer = session.createConsumer(destination);
            //接收消息
            message = consumer.receive();

            Serializable obj = ((ObjectMessage) message).getObject();

            User user = (User) obj;
            System.out.println("从ActiveMQ中获取的对象信息:" + user);
        } catch (JMSException e) {
            e.printStackTrace();
        } finally {
            if (consumer != null) {
                try {
                    consumer.close();
                } catch (JMSException e) {
                    e.printStackTrace();
                }
            }
            if (session != null) {
                try {
                    session.close();
                } catch (JMSException e) {
                    e.printStackTrace();
                }
            }
            if (connection != null) {
                try {
                    connection.close();
                } catch (JMSException e) {
                    e.printStackTrace();
                }
            }
        }
    }
}
2.2.4 测试
public class Test {
    public static void main(String[] args) {
        /*HelloWorldProducer producer = new HelloWorldProducer();
        producer.sendHelloWorldActiveMQ("HelloWorld");*/

        HelloWorldProducer2 producer2 = new HelloWorldProducer2();
        producer2.sendHelloWorldActiveMQ(new User(1, "tom", 21));
    }
}
public class Test {
    public static void main(String[] args) {
        /*HelloWorldConsumer consumer = new HelloWorldConsumer();
        consumer.receiveHelloWorldActiveMQ();*/

        HelloWorldConsumer2 consumer2 = new HelloWorldConsumer2();
        consumer2.receiveHelloWorldActiveMQ();
    }
}
image.png
3.JMS-实现队列服务监听
队列监听使用了观察者模式
3.1 创建消息生产者
public class HelloWorldProducer3 {

    public void sendHelloWorldActiveMQ(User user){
        //定义连接工厂
        ConnectionFactory connectionFactory = null;

        //定义连接对象
        Connection connection = null;

        //定义会话
        Session session = null;

        //目的地
        Destination destination = null;

        //消息生产者
        MessageProducer producer = null;

        //定义消息
        Message message = null;

        try {
            //传入的用户名和密码可以通过jetty-realm.properties文件修改
            //brokerURL:访问activeMQ的连接地址,路径结构为:协议://主机地址:端口号
            connectionFactory = new ActiveMQConnectionFactory("admin","admin","tcp://192.168.254.128:61616");

            //创建连接对象
            connection = connectionFactory.createConnection();

            //启动连接(此时才是真正创建连接)
            connection.start();

            /**
             * 创建会话
             * transacted:是否使用事务,可选值为true,false
             *              true:使用事务,设置此变量值,Session.SESSION.TRANSACTION
             *              false:不使用事务,设置此变量 则acknowledgeMode必须设置
             * acknowledgeMode:
             * Session.AUTO_ACKNOWLEDGE:自动确认机制
             * Session.CLIENT_ACKNOWLEDGE:客户端确认机制(需手动调用API)
             * Session.DUPS_OK_ACKNOWLEDGE:有副本的客户端确认机制(前两种一旦收到消息确认就会进行删除,这个则不会)
             */
            session = connection.createSession(false,Session.DUPS_OK_ACKNOWLEDGE);

            //创建目的地,即队列的名称,消息消费者需要通过此名称访问对应的队列
            destination = session.createQueue("my-destination");

            //创建消息生产者
            producer = session.createProducer(destination);

            //创建消息对象
            message = session.createObjectMessage(user);

            //发送消息
            producer.send(message);
        } catch (JMSException e) {
            e.printStackTrace();
        } finally {
            //回收消息发送者资源
            if (producer != null){
                try {
                    producer.close();
                } catch (JMSException e) {
                    e.printStackTrace();
                }
            }

            if (session != null){
                try {
                    session.close();
                } catch (JMSException e) {
                    e.printStackTrace();
                }
            }

            if (connection != null){
                try {
                    connection.close();
                } catch (JMSException e) {
                    e.printStackTrace();
                }
            }
        }
    }
}
3.2 创建消息消费者
public class HelloWorldConsumer3 {

    public void receiveHelloWorldActiveMQ() {
        //定义连接工厂
        ConnectionFactory connectionFactory = null;
        //定义连接
        Connection connection = null;
        //定义会话
        Session session = null;
        //定义目的地
        Destination destination = null;
        //定义消息消费者
        MessageConsumer consumer = null;
        //定义消息
        Message message = null;

        try {
            //创建连接工厂
            connectionFactory = new ActiveMQConnectionFactory("admin", "admin", "tcp://192.168.254.128:61616");
            //创建连接对象
            connection = connectionFactory.createConnection();
            //开启连接
            connection.start();
            //创建会话
            session = connection.createSession(false, Session.DUPS_OK_ACKNOWLEDGE);
            //创建目的地
            destination = session.createQueue("my-destination");
            //创建消息消费者
            consumer = session.createConsumer(destination);
            consumer.setMessageListener(new MessageListener() {

                //ActiveMQ的回调方法,通过该方法将消息传递到consumer中
                @Override
                public void onMessage(Message message) {
                    Serializable obj = null;
                    try {
                        obj = ((ObjectMessage) message).getObject();
                    } catch (JMSException e) {
                        e.printStackTrace();
                    }

                    User user = (User) obj;
                    System.out.println("从ActiveMQ中获取的对象信息:" + user);
                }
            });
        } catch (JMSException e) {
            e.printStackTrace();
        }
    }
}
4.Topic模型
4.1 Publish/Subscribe 处理模式(Topic)

消息生产者(发布)将消息发布到topic中,同时有多个消息消费者(订阅)消费该消息。
和点对点方式不同,发布到topic的消息会被所有订阅者消费
当生产者发布消息,不管是否有消费者,都不会保存消息
一定要先有消息的消费者,后有消息生产者

image.png
4.2 创建消息生产者
public class HelloWorldProducerTopic {

    public void sendHelloWorldActiveMQ(String msgText){
        //定义连接工厂
        ConnectionFactory connectionFactory = null;

        //定义连接对象
        Connection connection = null;

        //定义会话
        Session session = null;

        //目的地
        Destination destination = null;

        //消息生产者
        MessageProducer producer = null;

        //定义消息
        Message message = null;

        try {
            //传入的用户名和密码可以通过jetty-realm.properties文件修改
            //brokerURL:访问activeMQ的连接地址,路径结构为:协议://主机地址:端口号
            connectionFactory = new ActiveMQConnectionFactory("admin","admin","tcp://192.168.254.128:61616");

            //创建连接对象
            connection = connectionFactory.createConnection();

            //启动连接(此时才是真正创建连接)
            connection.start();

            /**
             * 创建会话
             * transacted:是否使用事务,可选值为true,false
             *              true:使用事务,设置此变量值,Session.SESSION.TRANSACTION
             *              false:不使用事务,设置此变量 则acknowledgeMode必须设置
             * acknowledgeMode:
             * Session.AUTO_ACKNOWLEDGE:自动确认机制
             * Session.CLIENT_ACKNOWLEDGE:客户端确认机制(需手动调用API)
             * Session.DUPS_OK_ACKNOWLEDGE:有副本的客户端确认机制(前两种一旦收到消息确认就会进行删除,这个则不会)
             */
            session = connection.createSession(false,Session.DUPS_OK_ACKNOWLEDGE);

            //创建目的地,即队列的名称,消息消费者需要通过此名称访问对应的队列
            destination = session.createTopic("test-topic");

            //创建消息生产者
            producer = session.createProducer(destination);

            //创建消息对象
            message = session.createTextMessage(msgText);

            //发送消息
            producer.send(message);
        } catch (JMSException e) {
            e.printStackTrace();
        } finally {
            //回收消息发送者资源
            if (producer != null){
                try {
                    producer.close();
                } catch (JMSException e) {
                    e.printStackTrace();
                }
            }

            if (session != null){
                try {
                    session.close();
                } catch (JMSException e) {
                    e.printStackTrace();
                }
            }

            if (connection != null){
                try {
                    connection.close();
                } catch (JMSException e) {
                    e.printStackTrace();
                }
            }
        }
    }
}
4.3 创建消息消费者

创建三份

public class HelloWorldConsumerTopic1 implements Runnable{

    public void receiveHelloWorldActiveMQ() {
        //定义连接工厂
        ConnectionFactory connectionFactory = null;
        //定义连接
        Connection connection = null;
        //定义会话
        Session session = null;
        //定义目的地
        Destination destination = null;
        //定义消息消费者
        MessageConsumer consumer = null;
        //定义消息
        Message message = null;

        try {
            //创建连接工厂
            connectionFactory = new ActiveMQConnectionFactory("admin", "admin", "tcp://192.168.254.128:61616");
            //创建连接对象
            connection = connectionFactory.createConnection();
            //开启连接
            connection.start();
            //创建会话
            session = connection.createSession(false, Session.DUPS_OK_ACKNOWLEDGE);
            //创建目的地
            destination = session.createTopic("test-topic");
            //创建消息消费者
            consumer = session.createConsumer(destination);
            consumer.setMessageListener(new MessageListener() {
                @Override
                public void onMessage(Message message) {
                    //获取文本消息
                    String msg = null;
                    try {
                        msg = ((TextMessage) message).getText();
                    } catch (JMSException e) {
                        e.printStackTrace();
                    }
                    System.out.println("从ActiveMQ中获取的文本信息----topic1:" + msg);
                }
            });

        } catch (JMSException e) {
            e.printStackTrace();
        }
    }

    @Override
    public void run() {
        receiveHelloWorldActiveMQ();
    }
}
4.4 测试
public class Test {
    public static void main(String[] args) {
        /*HelloWorldProducer producer = new HelloWorldProducer();
        producer.sendHelloWorldActiveMQ("HelloWorld");*/

        /*HelloWorldProducer2 producer2 = new HelloWorldProducer2();
        producer2.sendHelloWorldActiveMQ(new User(1, "tom", 21));*/

        /*HelloWorldProducer3 producer3 = new HelloWorldProducer3();
        producer3.sendHelloWorldActiveMQ(new User(2,"alice",19));*/

        HelloWorldProducerTopic topic = new HelloWorldProducerTopic();
        topic.sendHelloWorldActiveMQ("Hello Topic");
    }
}
public class Test {
    public static void main(String[] args) {
        /*HelloWorldConsumer consumer = new HelloWorldConsumer();
        consumer.receiveHelloWorldActiveMQ();*/

        /*HelloWorldConsumer2 consumer2 = new HelloWorldConsumer2();
        consumer2.receiveHelloWorldActiveMQ();*/

        /*HelloWorldConsumer3 consumer3 = new HelloWorldConsumer3();
        consumer3.receiveHelloWorldActiveMQ();*/

        HelloWorldConsumerTopic1 topic1 = new HelloWorldConsumerTopic1();
        Thread thread1 = new Thread(topic1);
        thread1.start();

        HelloWorldConsumerTopic2 topic2 = new HelloWorldConsumerTopic2();
        Thread thread2 = new Thread(topic2);
        thread2.start();

        HelloWorldConsumerTopic3 topic3 = new HelloWorldConsumerTopic3();
        Thread thread3 = new Thread(topic3);
        thread3.start();
    }
}
image.png
七、Spring整合ActiveMQ
1.创建项目

创建spring-activemq-producer

1.1 添加坐标
    <dependencies>
        <!--activeMQ-->
        <!--ActiveMQ客户端完整jar包依赖-->
        <dependency>
            <groupId>org.apache.activemq</groupId>
            <artifactId>activemq-all</artifactId>
            <exclusions>
                <exclusion>
                    <artifactId>spring-context</artifactId>
                    <groupId>org.springframework</groupId>
                </exclusion>
                <exclusion>
                    <groupId>org.apache.geronimo.specs</groupId>
                    <artifactId>geronimo-jms_1.1_spec</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <!--ActiveMQ和Spring整合配置文件标签处理jar包依赖-->
        <dependency>
            <groupId>org.apache.xbean</groupId>
            <artifactId>xbean-spring</artifactId>
        </dependency>
        <!--Spring JMS插件相关的jar包依赖-->
        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-jms</artifactId>
        </dependency>

        <!--Active Pool-->
        <dependency>
            <groupId>org.apache.activemq</groupId>
            <artifactId>activemq-pool</artifactId>
        </dependency>
        <dependency>
            <groupId>org.apache.activemq</groupId>
            <artifactId>activemq-jms-pool</artifactId>
        </dependency>

        <!-- 单元测试 -->
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
        </dependency>
        <!-- 日志处理 -->
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-log4j12</artifactId>
        </dependency>
        <!-- spring -->
        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-context</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-beans</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-webmvc</artifactId>
        </dependency>

        <!--javaee-->
        <dependency>
            <groupId>javax.servlet</groupId>
            <artifactId>servlet-api</artifactId>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>javax.servlet</groupId>
            <artifactId>jsp-api</artifactId>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>javax.servlet</groupId>
            <artifactId>jstl</artifactId>
        </dependency>

        <dependency>
            <groupId>javax.jms</groupId>
            <artifactId>javax.jms-api</artifactId>
        </dependency>
    </dependencies>
1.2 整合ActiveMQ
  • web.xml
<?xml version="1.0" encoding="UTF-8"?>
<web-app xmlns="http://java.sun.com/xml/ns/javaee"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://java.sun.com/xml/ns/javaee
          http://java.sun.com/xml/ns/javaee/web-app_2_5.xsd"
         version="2.5">

    <servlet>
        <servlet-name>springmvc</servlet-name>
        <servlet-class>org.springframework.web.servlet.DispatcherServlet</servlet-class>
        <init-param>
            <param-name>contextConfigLocation</param-name>
            <param-value>classpath:spring-*.xml</param-value>
        </init-param>
        <load-on-startup>1</load-on-startup>
    </servlet>
    <servlet-mapping>
        <servlet-name>springmvc</servlet-name>
        <url-pattern>/</url-pattern>
    </servlet-mapping>
    
    <filter>
        <filter-name>encodingFilter</filter-name>
        <filter-class>org.springframework.web.filter.CharacterEncodingFilter</filter-class>
        <init-param>
            <param-name>encoding</param-name>
            <param-value>UTF-8</param-value>
        </init-param>
    </filter>
    <filter-mapping>
        <filter-name>encodingFilter</filter-name>
        <url-pattern>/*</url-pattern>
    </filter-mapping>
</web-app>
  • spring-mvc.xml
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:mvc="http://www.springframework.org/schema/mvc"
       xmlns:context="http://www.springframework.org/schema/context"
       xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd http://www.springframework.org/schema/mvc http://www.springframework.org/schema/mvc/spring-mvc.xsd http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd">

    <!--扫包-->
    <context:component-scan base-package="com.hxx.web.controller"/>
    
    <!--添加注解驱动-->
    <mvc:annotation-driven/>

    <!--配置视图解析器-->
    <bean class="org.springframework.web.servlet.view.InternalResourceViewResolver">
        <property name="prefix" value="/WEB-INF/jsp/"/>
        <property name="suffix" value=".jsp"/>
    </bean>

    <!--放行静态资源-->
</beans>
  • spring-service
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:context="http://www.springframework.org/schema/context"
       xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd">

    <!--加载资源文件-->
    <context:property-placeholder location="classpath:resource.properties"/>

    <!--扫描bean对象-->
    <context:component-scan base-package="com.hxx.service.impl"/>
</beans>
  • spring-jms.xml
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:amq="http://activemq.apache.org/schema/core"
       xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core.xsd">

    <!--创建一个连接工厂,连接ActiveMQ,ActiveMQConnectionFactory,需要依赖ActiveMQ提供的amq标签-->
    <!--amq:connectionFactory是bean的子标签,会在Spring容器中创建一个bean对象,
    可以为对象名,类似:<bean id="" class="ActiveMQConnectionFactory"/>-->
    <amq:connectionFactory brokerURL="tcp://192.168.254.128:61616"
                           userName="admin" password="admin" id="amqConnectionFactory"/>

    <!--spring管理JMS相关代码的时候,必须依赖jms标签库,Spring-jms提供标签库-->
    <!--
        定义Spring-jms中的连接工厂对象
        CachingConnectionFactory - spring框架提供的连接工厂对象,不能真正访问MOM容器,
        类似一个工厂的代理对象,需要提供一个真实工厂,实现MOM容器的连接访问
    -->
    <bean id="pooledConnectionFactoryBean" class="org.apache.activemq.pool.PooledConnectionFactoryBean">
        <property name="connectionFactory" ref="amqConnectionFactory"/>
        <property name="maxConnections" value="10"/>
    </bean>

    <!--配置有缓存的ConnectionFactory,Session的缓存大小可定制-->
    <bean id="cachingConnectionFactory" class="org.springframework.jms.connection.CachingConnectionFactory">
        <property name="targetConnectionFactory" ref="amqConnectionFactory"/>
        <property name="sessionCacheSize" value="3"/>
    </bean>

    <!--jmsTemplate配置-->
    <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
        <!--给定连接工厂-->
        <property name="connectionFactory" ref="cachingConnectionFactory"/>
        <!--默认目的地命名-->
        <property name="defaultDestinationName" value="test-spring"/>
    </bean>
</beans>
2.创建项目

spring-activemq-consumer

2.1 添加依赖
    <dependencies>
        <!--activeMQ-->
        <!--ActiveMQ客户端完整jar包依赖-->
        <dependency>
            <groupId>org.apache.activemq</groupId>
            <artifactId>activemq-all</artifactId>
            <exclusions>
                <exclusion>
                    <artifactId>spring-context</artifactId>
                    <groupId>org.springframework</groupId>
                </exclusion>
                <exclusion>
                    <groupId>org.apache.geronimo.specs</groupId>
                    <artifactId>geronimo-jms_1.1_spec</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <!--ActiveMQ和Spring整合配置文件标签处理jar包依赖-->
        <dependency>
            <groupId>org.apache.xbean</groupId>
            <artifactId>xbean-spring</artifactId>
        </dependency>
        <!--Spring JMS插件相关的jar包依赖-->
        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-jms</artifactId>
        </dependency>

        <!--Active Pool-->
        <dependency>
            <groupId>org.apache.activemq</groupId>
            <artifactId>activemq-pool</artifactId>
        </dependency>
        <dependency>
            <groupId>org.apache.activemq</groupId>
            <artifactId>activemq-jms-pool</artifactId>
        </dependency>

        <!-- 单元测试 -->
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
        </dependency>
        <!-- 日志处理 -->
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-log4j12</artifactId>
        </dependency>
        <!-- spring -->
        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-context</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-beans</artifactId>
        </dependency>

        <dependency>
            <groupId>javax.jms</groupId>
            <artifactId>javax.jms-api</artifactId>
        </dependency>
    </dependencies>
2.2 整合ActiveMQ
  • spring-service.xml
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:context="http://www.springframework.org/schema/context"
       xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd">

    <!--扫描bean对象-->
    <context:component-scan base-package="com.hxx.service,com.hxx.listener"/>
</beans>
  • spring-jms.xml
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:amq="http://activemq.apache.org/schema/core"
       xmlns:jms="http://www.springframework.org/schema/jms"
       xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core.xsd http://www.springframework.org/schema/jms http://www.springframework.org/schema/jms/spring-jms.xsd">

    <!--创建一个连接工厂,连接ActiveMQ,ActiveMQConnectionFactory,需要依赖ActiveMQ提供的amq标签-->
    <!--amq:connectionFactory是bean的子标签,会在Spring容器中创建一个bean对象,
    可以为对象名,类似:<bean id="" class="ActiveMQConnectionFactory"/>-->
    <amq:connectionFactory brokerURL="tcp://192.168.254.128:61616"
                           userName="admin" password="admin" id="amqConnectionFactory"/>

    <!--spring管理JMS相关代码的时候,必须依赖jms标签库,Spring-jms提供标签库-->
    <!--
        定义Spring-jms中的连接工厂对象
        CachingConnectionFactory - spring框架提供的连接工厂对象,不能真正访问MOM容器,
        类似一个工厂的代理对象,需要提供一个真实工厂,实现MOM容器的连接访问
    -->
    <!--配置有缓存的ConnectionFactory,Session的缓存大小可定制-->
    <bean id="cachingConnectionFactory" class="org.springframework.jms.connection.CachingConnectionFactory">
        <property name="targetConnectionFactory" ref="amqConnectionFactory"/>
        <property name="sessionCacheSize" value="3"/>
    </bean>

    <!--注册监听器-->
    <!--
        开始注册监听
        需要的参数有:
            acknowledge:消息确认机制
            container-type:simple|default
            simple:SimpleMessageListenerContainer最简单的消息监听器容器,只能处理固定数量的JMS会话
            default:DefaultMessageListenerContainer是一种用于异步消息监听的管理类,且支持事务
            destination-type:目的地类型,使用队列作为目的地,
            connection-factory:连接工厂,spring-jms使用的工厂,必须是spring自主创建的
                                不能使用第三方工具创建工程,如:ActiveMQConnectionFactory
    -->
    <jms:listener-container acknowledge="auto" container-type="default"
                            destination-type="queue" connection-factory="cachingConnectionFactory">
        <!--
            在监听器容器中注册某监听对象,
            destination - 设置目的地命名
            ref - 指定监听器对象
        -->
        <jms:listener destination="test-spring" ref="myListener"/>
    </jms:listener-container>
</beans>
  • 创建MyMessageListener
@Component(value = "myListener")
public class MyMessageListener implements MessageListener {

    @Override
    public void onMessage(Message message) {

    }
}
  • 测试
public class TestActiveMQ {

    public static void main(String[] args) throws IOException {
        ClassPathXmlApplicationContext ac = new ClassPathXmlApplicationContext(new String[]{"classpath:spring-jms.xml"
                ,"classpath:spring-service.xml"});
        ac.start();
        System.out.println("spring容器启动");

        System.in.read();
    }
}
image.png
3.测试整合
需求:

1.在Producer中创建User类
2.将User对象传递到ActiveMQ中
3.在Consumer中获取User对象并在控制台打印

3.1 Producer发送消息
@Service
public class UserServiceImpl implements UserService {

    @Autowired
    private JmsTemplate jmsTemplate;

    @Override
    public void addUser(final User user) {
        jmsTemplate.send(new MessageCreator() {
            @Override
            public Message createMessage(Session session) throws JMSException {
                //发送消息
                return session.createObjectMessage(user);
            }
        });
    }
}

发送成功


image.png
3.2 Consumer接收消息
  • userServiceImpl.java
@Service
public class UserServiceImpl implements UserService {
    @Override
    public void showUser(User user) {
        System.out.println(user);
    }
}
  • MyMessageListener.java
@Component(value = "myListener")
public class MyMessageListener implements MessageListener {

    @Autowired
    private UserService userService;

    @Override
    public void onMessage(Message message) {
        Serializable obj = null;
        try {
            obj = ((ObjectMessage) message).getObject();
        } catch (JMSException e) {
            e.printStackTrace();
        }
        User user = (User) obj;
        userService.showUser(user);
    }
}
image.png
image.png

相关文章

  • ActiveMq 学习笔记

    1、ActiveMq安装启动 1、从官网http://activemq.apache.org/download.h...

  • ActiveMQ学习笔记

    WHAT 是Apache出品,对spring支持很好,可以很容易内嵌到spring系统里。 ActiveMQ消息形...

  • ActiveMQ学习笔记

    一、ActiveMQ简介 1.什么是ActiveMQ ActiveMQ是Apache出品,最流行的,能力强劲的开源...

  • ActiveMQ学习笔记

    1. JMS基本概念 JMS Java Message Service,Java消息服务,是Java EE中的一个...

  • java消息队列ActiveMQ的简单使用

    activeMQ 是学习java消息队列的实现项目,使用jfinal +jfinal-ext + activeMQ...

  • ActiveMQ Topic消息重发

    MQ学习系列: 消息队列概念与认知 ActiveMQ Topic消息重发 一、ActiveMQ Topic 消息重...

  • ActiveMQ学习(三)Topic订阅模式

    说明 基于ActiveMQ学习(二),学习ActiveMQ的另一种消息模式Topic订阅模式,同时将普通的文本消息...

  • ActiveMQ学习

    JMS JMS(Java Message Service):java消息服务,它是一套API,定义了Java程序访...

  • ActiveMQ学习

    MQ入门总结(三)ActiveMQ的用法和实现

  • 互联网架构师笔记

    互联网架构师学习笔记整理-完善中 一、并发编程 + ActiveMQ + 实战案例 并发编程基础篇 第一天 1、课...

网友评论

      本文标题:ActiveMQ学习笔记

      本文链接:https://www.haomeiwen.com/subject/rckorctx.html