美文网首页
ActiveMQ入门

ActiveMQ入门

作者: 長得太帥忚四種檌 | 来源:发表于2017-12-20 10:59 被阅读10次
    • ActiveMQ安装
    1.先从官网下载ActiveMQ工程
    http://activemq.apache.org/
    下载地址:
    http://www.apache.org/dyn/closer.cgi?filename=/activemq/5.15.2/apache-activemq-5.15.2-bin.zip&action=download
    
    2.解压缩下载到的压缩包
    
    3.进入解压后的文件夹中, 进入bin目录中, 根据自己的系统版本, 找到对应的文件启动服务
     例如, 我的是windows 64 位系统, 所以, 我进入的是bin\win64, 双击activemq.bat即可启动服务了!
    

    点对点模式

    • 发送消息
            //1.创建一个连接工厂对象,指定服务的IP和端口号
            ConnectionFactory factory = new ActiveMQConnectionFactory("tcp://127.0.0.1:61616");
            //2.使用工厂对象创建一个连接对象
            Connection connection = factory.createConnection();
            //3.开启连接
            connection.start();
            //4.使用连接对象创建一个Session对象
            //第一个参数表示是否开启事务,一般不开启:false
            //当第一个参数为false时,第二个参数才会有意义
            //第二个参数表示应答模式:可以是手动应答或者自动应答, 一般是自动应答:Session.AUTO_ACKNOWLEDGE
            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            //5.使用Session对象创建一个queue对象
            Queue queue = session.createQueue("text-queue");
            //6.使用Session对象创建一个生产者对象
            MessageProducer producer = session.createProducer(queue);
            //7.创建一条消息
            TextMessage message = new ActiveMQTextMessage();
            message.setText("text message");
            //8.使用生产者对象发送消息
            producer.send(message);
            //9.关闭资源
            producer.close();
            session.close();
            connection.close();
            System.out.println("消息发送完成!");
    
    • 接收消息

    第一种方式:

            //创建连接工厂
            ConnectionFactory factory = new ActiveMQConnectionFactory("tcp://127.0.0.1:61616");
            //使用工厂产生连接对象
            Connection connection = factory.createConnection();
            //开启连接
            connection.start();
            //通过连接对象创建一个session对象
            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            //创建一个queue对象
            Queue queue = session.createQueue("text-queue");
            //通过session创建一个消费者
            MessageConsumer consumer = session.createConsumer(queue);
            //使用消费者接收消息
            while(true) {
                Message message = consumer.receive(2000);//2秒后没接收到消息返回null
                if(message==null) {
                    break;
                }
                TextMessage textMessage = (TextMessage) message;
                //打印接收到的消息
                String text = textMessage.getText();
                
                System.out.println(text);
            }
            
            //关闭资源
            consumer.close();
            session.close();
            connection.close();
    

    第二种方式:

            //创建连接工厂
            ConnectionFactory factory = new ActiveMQConnectionFactory("tcp://127.0.0.1:61616");
            //使用工厂产生连接对象
            Connection connection = factory.createConnection();
            //开启连接
            connection.start();
            //通过连接对象创建一个session对象
            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            //创建一个queue对象
            Queue queue = session.createQueue("text-queue");
            //通过session创建一个消费者
            MessageConsumer consumer = session.createConsumer(queue);
            //使用消费者接收消息
            /*while(true) {
                Message message = consumer.receive(2000);//2秒后没接收到消息返回null
                if(message==null) {
                    break;
                }
                TextMessage textMessage = (TextMessage) message;
                //打印接收到的消息
                String text = textMessage.getText();
                
                System.out.println(text);
            }*/
            
            
            consumer.setMessageListener(new MessageListener() {
                
                @Override
                public void onMessage(Message message) {
                    TextMessage textMessage = (TextMessage) message;
                    //打印接收到的消息
                    String text="";
                    try {
                        text = textMessage.getText();
                    } catch (JMSException e) {
                        e.printStackTrace();
                    }
                    System.out.println(text);
                }
            });
            
            System.in.read();//让程序等待, 等待消息被读取出来, 测试时用的
            //关闭资源
            consumer.close();
            session.close();
            connection.close();
    

    订阅模式

    • 发送消息
            //创建连接工厂
            ConnectionFactory factory = new ActiveMQConnectionFactory("tcp://127.0.0.1:61616");
            //使用工厂创建连接对象
            Connection connection = factory.createConnection();
            //使用开启连接
            connection.start();
            //使用连接对象创建Session对象
            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            
            //使用Session对象创建Topic对象
            Topic topic = session.createTopic("text-topic");
            
            //使用Session创建生产者对象
            MessageProducer producer = session.createProducer(topic);
            
            //创建一个消息对象
            TextMessage textMessage = session.createTextMessage("这是一个topic消息");
            
            //使用生产者发送消息
            producer.send(textMessage);
            //关闭资源
            producer.close();
            session.close();
            connection.close();
            System.out.println("消息发送完成!");
    
    
    • 消息接收
            //创建连接工厂
            ConnectionFactory factory = new ActiveMQConnectionFactory("tcp://127.0.0.1:61616");
            //创建连接对象
            Connection connection = factory.createConnection();
            //开启连接
            connection.start();
            //创建session对象
            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            //创建topic对象
            Topic topic = session.createTopic("text-topic");
            //创建消费者对象
            MessageConsumer consumer = session.createConsumer(topic);
            System.out.println("客户端3启动了...");
            //接收消息
            consumer.setMessageListener(new MessageListener() {
                
                @Override
                public void onMessage(Message message) {
                    TextMessage textMessage = (TextMessage) message;
                    try {
                        String text = textMessage.getText();
                        //打印消息
                        System.out.println(text);
                    } catch (JMSException e) {
                        e.printStackTrace();
                    }
                }
            });
            
            //程序等待
            System.in.read();
            //关闭资源
            consumer.close();
            session.close();
            connection.close();
    

    Spring 整合 ActiveMQ

    • 加入jar包:

      • spring-jms.jar
      • spring-context-support.jar
    • 发送消息
      在spring配置文件中加入以下内容:

        <!-- 真正可以产生Connection的ConnectionFactory,由对应的 JMS服务厂商提供 -->
        <bean id="targetConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
            <property name="brokerURL" value="tcp://192.168.25.168:61616" />
        </bean>
        <!-- Spring用于管理真正的ConnectionFactory的ConnectionFactory -->
        <bean id="connectionFactory"
            class="org.springframework.jms.connection.SingleConnectionFactory">
            <!-- 目标ConnectionFactory对应真实的可以产生JMS Connection的ConnectionFactory -->
            <property name="targetConnectionFactory" ref="targetConnectionFactory" />
        </bean>
        <!-- 配置生产者 -->
        <!-- Spring提供的JMS工具类,它可以进行消息发送、接收等 -->
        <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
            <!-- 这个connectionFactory对应的是我们定义的Spring提供的那个ConnectionFactory对象 -->
            <property name="connectionFactory" ref="connectionFactory" />
        </bean>
        <!--这个是队列目的地,点对点的 -->
        <bean id="queueDestination" class="org.apache.activemq.command.ActiveMQQueue">
            <constructor-arg>
                <value>spring-queue</value>
            </constructor-arg>
        </bean>
        <!--这个是主题目的地,一对多的 -->
        <bean id="topicDestination" class="org.apache.activemq.command.ActiveMQTopic">
            <constructor-arg value="item-add-topic" />
        </bean>
    
    
    • 发送消息代码示例:
            //初始化spring容器: 测试用
            ApplicationContext applicationContext = new ClassPathXmlApplicationContext("classpath:/spring/applicationContext-activemq.xml");
            //从容器中获得JMSTemplate对象
            JmsTemplate jmsTemplate = applicationContext.getBean(JmsTemplate.class);
            //从容器在获得Destination对象
            Queue queue = applicationContext.getBean(Queue.class);
            //第一个参数:指定发送的目的地
            //第二个参数:消息的构造器对象
            jmsTemplate.send(queue, new MessageCreator() {
                
                @Override
                public Message createMessage(Session session) throws JMSException {
                    TextMessage textMessage = session.createTextMessage("使用spring和active整合发送queue消息aaaaaa");
                    return textMessage;
                }
            });
    
    
    • 接收消息
      在spring配置文件中配置:
    
        <!-- 真正可以产生Connection的ConnectionFactory,由对应的 JMS服务厂商提供 -->
        <bean id="targetConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
            <property name="brokerURL" value="tcp://192.168.25.168:61616" />
        </bean>
        <!-- Spring用于管理真正的ConnectionFactory的ConnectionFactory -->
        <bean id="connectionFactory"
            class="org.springframework.jms.connection.SingleConnectionFactory">
            <!-- 目标ConnectionFactory对应真实的可以产生JMS Connection的ConnectionFactory -->
            <property name="targetConnectionFactory" ref="targetConnectionFactory" />
        </bean>
        <!--这个是队列目的地,点对点的 -->
        <bean id="queueDestination" class="org.apache.activemq.command.ActiveMQQueue">
            <constructor-arg>
                <value>spring-queue</value>
            </constructor-arg>
        </bean>
        <!--这个是主题目的地,一对多的 -->
        <bean id="topicDestination" class="org.apache.activemq.command.ActiveMQTopic">
            <constructor-arg value="item-add-topic" />
        </bean>
        
        <!-- 配置消息监听器1 -->
        <bean id="myMessageListener" class="com.taotao.search.listener.MyMessageListener"/>
    
        <!-- 消息监听容器1 -->
        <bean class="org.springframework.jms.listener.DefaultMessageListenerContainer">
            <property name="connectionFactory" ref="connectionFactory" />
            <property name="destination" ref="queueDestination" />
            <property name="messageListener" ref="myMessageListener" />
        </bean>
        <!-- 配置消息监听器2 -->
        <bean id="itemAddListener" class="com.taotao.search.listener.ItemAddListener"/>
    
        <!-- 消息监听容器2 -->
        <bean class="org.springframework.jms.listener.DefaultMessageListenerContainer">
            <property name="connectionFactory" ref="connectionFactory" />
            <property name="destination" ref="topicDestination" />
            <property name="messageListener" ref="itemAddListener" />
        </bean>
    
    

    接收消息示例代码

    /**
     * 接收activemq队列消息的监听器
     * <p>Title: MyMessageListener</p>
     * <p>Description: </p>
     * <p>Company: www.itcast.cn</p> 
     * @version 1.0
     */
    public class MyMessageListener implements MessageListener {
        @Override
        public void onMessage(Message message) {
            //取消息的内容
            try {
                TextMessage textMessage = (TextMessage) message;
                //取内容
                String text = textMessage.getText();
                System.out.println(text);
                //其他业务逻辑
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }
    

    相关文章

      网友评论

          本文标题:ActiveMQ入门

          本文链接:https://www.haomeiwen.com/subject/pmtiwxtx.html