ActiveMQ

作者: 老魏313 | 来源:发表于2017-12-12 17:09 被阅读0次

    ActiveMQ 使用版本: 5.13.5
    下载地址:
    链接:https://pan.baidu.com/s/1jIvN0gu 密码:am2r

    jdk 版本1.8
    Mac版下载地址:
    链接:https://pan.baidu.com/s/1gfnLWAZ 密码:tj6t

    启动mq步骤:

    1. 首先进到mq 的bin 文件夹下
      cd + 文件路径/bin
    2. 启动 activemq
      ./activemq start
    启动mq.png
    1. 在启动时可能会报错, 应该是没有执行权限
      使用 ls -l 命令查看权限
      修改权限命令自行百度
    查看权限.png
    1. 验证是否启动成功:
      http://127.0.0.1:8161/admin/queues.jsp
      账号和密码一般都是admin
      image.png

    编写mq代码

    创建maven项目

    pom文件 添加依赖:

        <dependencies>
            <!-- spring核心配置-->
            <dependency>
                <groupId>org.springframework</groupId>
                <artifactId>spring-context</artifactId>
                <version>${org.springframework.version}</version>
            </dependency>
            <!--spring test 结合 junit 进行测试-->
            <dependency>
                <groupId>org.springframework</groupId>
                <artifactId>spring-test</artifactId>
                <version>3.2.4.RELEASE</version>
            </dependency>
            <dependency>
                <groupId>junit</groupId>
                <artifactId>junit</artifactId>
                <version>${junit.version}</version>
            </dependency>
            <dependency>
                <groupId>org.apache.activemq</groupId>
                <artifactId>activemq-all</artifactId>
                <version>5.15.2</version>
            </dependency>
            <dependency>
                <groupId>org.apache.activemq</groupId>
                <artifactId>activemq-pool</artifactId>
                <version>5.11.1</version>
            </dependency>
            <dependency>
                <groupId>org.apache.commons</groupId>
                <artifactId>commons-pool2</artifactId>
                <version>2.4.2</version>
            </dependency>
        </dependencies>
    

    点对点消息模式

    如果有两个消费者同时开启, 两个消费者都会收到部分数据, 加起来的数据为一份. 因此要保证消息目的地唯一.

    1. 创建生产者:
        // 获取默认的用户名, 密码, 地址
        private static final String UserName = ActiveMQConnection.DEFAULT_USER;
        private static final String password = ActiveMQConnection.DEFAULT_PASSWORD;
        private static final String url = ActiveMQConnection.DEFAULT_BROKER_URL;
    
            // 创建连接工厂
            ConnectionFactory factory = new ActiveMQConnectionFactory(UserName, password, url);
            try {
                // 创建连接
                Connection connection = factory.createConnection();
                // 启动连接
                connection.start();
    
                // 是否支持事务,如果为true,则第二个参数被设置为SESSION_TRANSACTED
                //Session.AUTO_ACKNOWLEDGE为自动确认,不管成功失败
                //Session.CLIENT_ACKNOWLEDGE为客户端确认。客户端调用acknowledge方法时服务器删除消息
                //DUPS_OK_ACKNOWLEDGE允许重复确认模式。
                Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
    
                // 消息的目的地
                Destination destination = session.createQueue("FirstQueue");
    
                // 消费生产者
                MessageProducer producer = session.createProducer(destination);
    
                for (int i = 0; i < 5; i++) {
                    // 创建文本消息
                    TextMessage message = session.createTextMessage("message" + i);
                    producer.send(message);
                }
                // 如果创建session时为true, 则放开下面语句
    //            session.commit();
                session.close();
                connection.close();
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    
    1. 创建消费者
    ConnectionFactory factory = new ActiveMQConnectionFactory(username, password, url);
            try {
                Connection connection = factory.createConnection();
                connection.start();
                Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
                // 点对点模式
                Destination destination = session.createQueue("test1");
                // 创建消费者
                MessageConsumer consumer = session.createConsumer(destination);
                while (true){
                    TextMessage message = (TextMessage) consumer.receive();
                    if (message!=null){
                        System.out.println("收到的信息为: "+ message.getText());
                    }else {
                        break;
                    }
                }
                session.close();
                connection.close();
            } catch (JMSException e) {
                e.printStackTrace();
            }
        }
    

    订阅模式

    与点对点模式代码雷同, 只写不同部分
    只是生产者和消费者中的消息目的地创建的方式不同

        // 创建消息目的地时将createQueue("name") 换成createTopic("name")
        Destination destination = session.createTopic("test1");
    

    持久化订阅

    生产者模块代码不变, 消费者添加身份识别
    消费者代码修改如下:

     ConnectionFactory factory = new ActiveMQConnectionFactory(username, password, url);
            try {
                Connection connection = factory.createConnection();
    
                // 持久化订阅时添加
                connection.setClientID("bbb");
                connection.start();
                Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
    
                // 持久化订阅时添加
                Topic topic = session.createTopic("test1");
                MessageConsumer consumer = session.createDurableSubscriber(topic, "bbb");
    
                while (true){
                    TextMessage message = (TextMessage) consumer.receive();
                    if (message!=null){
                        System.out.println("收到的信息为: "+ message.getText());
                    }else {
                        break;
                    }
                }
                session.close();
                connection.close();
            } catch (JMSException e) {
                e.printStackTrace();
            }
    

    注:该模式下一定要先注册消费者, 然后再发送消息

    消息过滤

    1. 生产者
    // 设置超时时间
    producer.setTimeToLive(10 * 1000);
    MapMessage message1 = session.createMapMessage();
    message1.setString("name", "laowei");
    message1.setIntProperty("age", 19);
    
    MapMessage message2 = session.createMapMessage();
    message2.setString("name", "xiaowang");
    message2.setIntProperty("age", 10);
    
    /**
     *message : 发送的消息
     * DeliveryMode: 是否持久化
     * priority优先级
     * timeToLive 消息过期时间
     */
    producer.send(message1, DeliveryMode.NON_PERSISTENT, 4,1000*60*10);
    producer.send(message2, DeliveryMode.NON_PERSISTENT, 4,1000*60*10);
    
    1. 消费者:
    String condition = "age>=20";
    Destination destination = session.createTopic("test1");
    MessageConsumer consumer = session.createConsumer(destination, condition);
    

    activeMQ 与spring 相结合

    配置:

    1. activemq.properties
    ## ActiveMQ Config
    activemq.brokerURL=tcp\://127.0.0.1\:61616
    activemq.userName=admin
    activemq.password=admin
    activemq.pool.maxConnections=10
    #queueName
    activemq.queueName=myspringqueue
    
    1. spring-context.xml
    <?xml version="1.0" encoding="UTF-8"?>
    <beans xmlns="http://www.springframework.org/schema/beans"
           xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
           xmlns:context="http://www.springframework.org/schema/context"
           xsi:schemaLocation="http://www.springframework.org/schema/beans
           http://www.springframework.org/schema/beans/spring-beans.xsd
           http://www.springframework.org/schema/context
           http://www.springframework.org/schema/context/spring-context.xsd"
           default-autowire="byName">
           <!-- 读入配置属性文件 -->
           <context:property-placeholder location="classpath:activemq.properties" />
           <!-- 注释配置 -->
           <context:annotation-config />
           <!-- 扫描包起始位置 -->
           <context:component-scan base-package="com.laowei.springmq" />
           <import resource="classpath:spring-activemq.xml" />
    </beans>
    
    1. sping-activemq.xml
    <?xml version="1.0" encoding="UTF-8"?>
    <beans xmlns="http://www.springframework.org/schema/beans"
           xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
           xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd">
           <!-- 第三方MQ工厂: ConnectionFactory -->
           <bean id="targetConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
                  <!-- ActiveMQ Address -->
                  <property name="brokerURL" value="${activemq.brokerURL}" />
                  <property name="userName" value="${activemq.userName}"></property>
                  <property name="password" value="${activemq.password}"></property>
           </bean>
    
           <!--
               ActiveMQ为我们提供了一个PooledConnectionFactory,通过往里面注入一个ActiveMQConnectionFactory
               可以用来将Connection、Session和MessageProducer池化,这样可以大大的减少我们的资源消耗,要依赖于 activemq-pool包
            -->
           <bean id="pooledConnectionFactory" class="org.apache.activemq.pool.PooledConnectionFactory">
                  <property name="connectionFactory" ref="targetConnectionFactory" />
                  <property name="maxConnections" value="${activemq.pool.maxConnections}" />
           </bean>
           <!-- Spring用于管理真正的ConnectionFactory的ConnectionFactory -->
           <bean id="connectionFactory" class="org.springframework.jms.connection.SingleConnectionFactory">
                  <!-- 目标ConnectionFactory对应真实的可以产生JMS Connection的ConnectionFactory -->
                  <property name="targetConnectionFactory" ref="pooledConnectionFactory" />
           </bean>
    
           <!--这个是目的地-->
           <bean id="msgQueue" class="org.apache.activemq.command.ActiveMQQueue">
                  <constructor-arg>
                         <value>${activemq.queueName}</value>
                  </constructor-arg>
           </bean>
           <!-- Spring提供的JMS工具类,它可以进行消息发送、接收等 -->
           <!-- 队列模板 -->
           <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
                  <!-- 这个connectionFactory对应的是我们定义的Spring提供的那个ConnectionFactory对象 -->
                  <property name="connectionFactory" ref="connectionFactory"/>
                  <property name="defaultDestinationName" value="${activemq.queueName}"></property>
           </bean>
           <!-- 配置自定义监听:MessageListener -->
           <bean id="msgQueueMessageListener" class="com.laowei.springmq.Consumer"></bean>
    
           <!-- 将连接工厂、目标对了、自定义监听注入jms模板 -->
           <bean id="sessionAwareListenerContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
                  <property name="connectionFactory" ref="connectionFactory" />
                  <property name="destination" ref="msgQueue" />
                  <property name="messageListener" ref="msgQueueMessageListener" />
           </bean>
    </beans>
    
    1. 生产者代码:
    @Service("activeMQProducer")
    public class Product {
        private JmsTemplate jmsTemplate;
    
        public JmsTemplate getJmsTemplate() {
            return jmsTemplate;
        }
    
        public void setJmsTemplate(JmsTemplate jmsTemplate) {
            this.jmsTemplate = jmsTemplate;
        }
    
        public void sendMessage(final String info){
            jmsTemplate.send(new MessageCreator() {
                public Message createMessage(Session session) throws JMSException {
                    return session.createTextMessage(info);
                }
            });
        }
    }
    
    1. 消费者
    public class Consumer implements SessionAwareMessageListener<Message> {
        public void onMessage(Message message, Session session) throws JMSException {
            if (message instanceof TextMessage){
                System.out.println("*******" +((TextMessage) message).getText());
            }
        }
    }
    
    1. 测试代码:
    public class TestProduct extends BaseJunit4Test{
        @Autowired
        private Product product;
        @Test
        public void sendmessage() throws InterruptedException {
            while (true){
                Thread.sleep(3000);
                product.sendMessage("hahaha");
            }
        }
    }
    

    代码地址:
    https://github.com/weijun8687/ActiveMQ.git

    参考文章:
    http://blog.csdn.net/fulai0_0/article/details/52127320
    http://www.mytju.com/classcode/news_readNews.asp?newsID=486

    相关文章

      网友评论

          本文标题:ActiveMQ

          本文链接:https://www.haomeiwen.com/subject/ajfnixtx.html