美文网首页
JW-Activemq

JW-Activemq

作者: Zephyr_07 | 来源:发表于2019-04-17 14:48 被阅读0次

    ActiveMQ是Apache研发的消息中间件,是一个完全支持JMS1.1和J2EE1.4规范的JMS Provider实现。

    消息的传递有两种形式:
    ①点对点 ,即生产者和消费者一一对应。
    ②发布/订阅形式(广播),即一生产者产生消息并发送后,可以由多个消费者接收。
    JMS提供五种不同的消息正文格式。
    StreamMessage - Java原始数据流
    MapMessage - 键值对
    TextMessage - 字符串
    ObjectMessage - 序列化的JAVA对象
    BytesMessage - 字节流

    解决后台访问error:405问题:
    机器名:etc/sysconfig/network文件中查看

    NETWORKING=yes
    HOSTNAME=localhost.localdomain
    

    修改hosts文件127.0.0.1 域名映射。

    127.0.0.1 localhost localhost.localdomain
    

    Queue方式,消息默认持久化

    Producer

        /**
         * 点到点形式发送消息
         * <p>Title: testQueueProducer</p>
         * <p>Description: </p>
         * @throws Exception
         */
        @Test
        public void testQueueProducer() throws Exception {
            //1、创建一个连接工厂对象,需要指定服务的ip及端口。
            ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://192.168.25.161:61616");
            //2、使用工厂对象创建一个Connection对象。
            Connection connection = connectionFactory.createConnection();
            //3、开启连接,调用Connection对象的start方法。
            connection.start();
            //4、创建一个Session对象。
            //第一个参数:是否开启事务。如果true开启事务,第二个参数无意义。一般不开启事务false。
            //第二个参数:应答模式。自动应答或者手动应答。一般自动应答。
            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            //5、使用Session对象创建一个Destination对象。两种形式queue、topic,现在应该使用queue
            Queue queue = session.createQueue("test-queue");
            //6、使用Session对象创建一个Producer对象。
            MessageProducer producer = session.createProducer(queue);
            //7、创建一个Message对象,可以使用TextMessage。
            /*TextMessage textMessage = new ActiveMQTextMessage();
            textMessage.setText("hello Activemq");*/
            TextMessage textMessage = session.createTextMessage("hello activemq");
            //8、发送消息
            producer.send(textMessage);
            //9、关闭资源
            producer.close();
            session.close();
            connection.close();
        }
    

    Consumer

        @Test
        public void testQueueConsumer() throws Exception {
            //创建一个ConnectionFactory对象连接MQ服务器
            ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://192.168.25.161:61616");
            //创建一个连接对象
            Connection connection = connectionFactory.createConnection();
            //开启连接
            connection.start();
            //使用Connection对象创建一个Session对象
            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            //创建一个Destination对象。queue对象
            Queue queue = session.createQueue("test-queue");
            //使用Session对象创建一个消费者对象。
            MessageConsumer consumer = session.createConsumer(queue);
            //接收消息
            consumer.setMessageListener(new MessageListener() {
                
                @Override
                public void onMessage(Message message) {
                    //打印结果
                    TextMessage textMessage = (TextMessage) message;
                    String text;
                    try {
                        text = textMessage.getText();
                        System.out.println(text);
                    } catch (JMSException e) {
                        e.printStackTrace();
                    }
                    
                }
            });
            //等待接收消息
            System.in.read();
            //关闭资源
            consumer.close();
            session.close();
            connection.close();
        }
    

    Topic方式,消息默认不持久化

    Producer

        @Test
        public void testTopicProducer() throws Exception {
            //1、创建一个连接工厂对象,需要指定服务的ip及端口。
            ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://192.168.25.161:61616");
            //2、使用工厂对象创建一个Connection对象。
            Connection connection = connectionFactory.createConnection();
            //3、开启连接,调用Connection对象的start方法。
            connection.start();
            //4、创建一个Session对象。
            //第一个参数:是否开启事务。如果true开启事务,第二个参数无意义。一般不开启事务false。
            //第二个参数:应答模式。自动应答或者手动应答。一般自动应答。
            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            //5、使用Session对象创建一个Destination对象。两种形式queue、topic,现在应该使用topic
            Topic topic = session.createTopic("test-topic");
            //6、使用Session对象创建一个Producer对象。
            MessageProducer producer = session.createProducer(topic);
            //7、创建一个Message对象,可以使用TextMessage。
            /*TextMessage textMessage = new ActiveMQTextMessage();
            textMessage.setText("hello Activemq");*/
            TextMessage textMessage = session.createTextMessage("topic message");
            //8、发送消息
            producer.send(textMessage);
            //9、关闭资源
            producer.close();
            session.close();
            connection.close();
        }
    

    Consumer

        @Test
        public void testTopicConsumer() throws Exception {
            //创建一个ConnectionFactory对象连接MQ服务器
            ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://192.168.25.161:61616");
            //创建一个连接对象
            Connection connection = connectionFactory.createConnection();
            //开启连接
            connection.start();
            //使用Connection对象创建一个Session对象
            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            //创建一个Destination对象。topic对象
            Topic topic = session.createTopic("test-topic");
            //使用Session对象创建一个消费者对象。
            MessageConsumer consumer = session.createConsumer(topic);
            //接收消息
            consumer.setMessageListener(new MessageListener() {
                
                @Override
                public void onMessage(Message message) {
                    //打印结果
                    TextMessage textMessage = (TextMessage) message;
                    String text;
                    try {
                        text = textMessage.getText();
                        System.out.println(text);
                    } catch (JMSException e) {
                        e.printStackTrace();
                    }
                    
                }
            });
            System.out.println("topic消费者3启动。。。。");
            //等待接收消息
            System.in.read();
            //关闭资源
            consumer.close();
            session.close();
            connection.close();
        }
    

    整合Spring

    引入相关jar包:

            <dependency>
                <groupId>org.springframework</groupId>
                <artifactId>spring-jms</artifactId>
            </dependency>
            <dependency>
                <groupId>org.springframework</groupId>
                <artifactId>spring-context-support</artifactId>
            </dependency>
    

    spring-activemq.xml配置文件

    <?xml version="1.0" encoding="UTF-8"?>
    <beans xmlns="http://www.springframework.org/schema/beans"
        xmlns:context="http://www.springframework.org/schema/context" xmlns:p="http://www.springframework.org/schema/p"
        xmlns:aop="http://www.springframework.org/schema/aop" xmlns:tx="http://www.springframework.org/schema/tx"
        xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
        xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-4.2.xsd
        http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-4.2.xsd
        http://www.springframework.org/schema/aop http://www.springframework.org/schema/aop/spring-aop-4.2.xsd http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx-4.2.xsd
        http://www.springframework.org/schema/util http://www.springframework.org/schema/util/spring-util-4.2.xsd">
        
        <!-- 真正可以产生Connection的ConnectionFactory,由对应的 JMS服务厂商提供 -->
        <bean id="targetConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
            <property name="brokerURL" value="tcp://192.168.25.161:61616" />
        </bean>
        <!-- Spring用于管理真正的ConnectionFactory的ConnectionFactory -->
        <bean id="connectionFactory"
            class="org.springframework.jms.connection.SingleConnectionFactory">
            <!-- 目标ConnectionFactory对应真实的可以产生JMS Connection的ConnectionFactory -->
            <property name="targetConnectionFactory" ref="targetConnectionFactory" />
        </bean>
        <!--这个是队列目的地,点对点的 -->
        <bean id="queueDestination" class="org.apache.activemq.command.ActiveMQQueue">
            <constructor-arg>
                <value>spring-queue</value>
            </constructor-arg>
        </bean>
        <!--这个是主题目的地,一对多的 -->
        <bean id="topicDestination" class="org.apache.activemq.command.ActiveMQTopic">
            <constructor-arg value="itemAddTopic" />
        </bean>
        <bean id="myMessageListener" class="cn.e3mall.search.message.MyMessageListener"/>
        <!-- 消息监听容器 -->
        <bean class="org.springframework.jms.listener.DefaultMessageListenerContainer">
            <property name="connectionFactory" ref="connectionFactory" />
            <property name="destination" ref="queueDestination" />
            <property name="messageListener" ref="myMessageListener" />
        </bean>
        <!-- 监听商品添加消息,同步索引库 -->
        <bean id="itemAddMessageListener" class="cn.e3mall.search.message.ItemAddMessageListener"/>
        <bean class="org.springframework.jms.listener.DefaultMessageListenerContainer">
            <property name="connectionFactory" ref="connectionFactory" />
            <property name="destination" ref="topicDestination" />
            <property name="messageListener" ref="itemAddMessageListener" />
        </bean>
    </beans>
    

    发送消息

    public class ActiveMqSpring {
    
        @Test
        public void sendMessage() throws Exception {
            //初始化spring容器
            ApplicationContext applicationContext = new ClassPathXmlApplicationContext("classpath:spring/applicationContext-activemq.xml");
            //从容器中获得JmsTemplate对象。
            JmsTemplate jmsTemplate = applicationContext.getBean(JmsTemplate.class);
            //从容器中获得一个Destination对象。
            Destination destination = (Destination) applicationContext.getBean("queueDestination");
            //发送消息
            jmsTemplate.send(destination, new MessageCreator() {
                @Override
                public Message createMessage(Session session) throws JMSException {
                    return session.createTextMessage("send activemq message");
                }
            });
        }
    }
    

    接收消息

    ItemAddMessageListener.java

    public class MyMessageListener implements MessageListener {
    
        @Override
        public void onMessage(Message message) {
            //取消息内容
            TextMessage textMessage = (TextMessage) message;
            try {
                String text = textMessage.getText();
                System.out.println(text);
            } catch (JMSException e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            }
            
            
        }
    
    }
    

    spring-activemq.xml配置接收器,随spring启动开启监听

        <!-- 监听商品添加消息,同步索引库 -->
        <bean id="itemAddMessageListener" class="cn.e3mall.search.message.ItemAddMessageListener"/>
        <bean class="org.springframework.jms.listener.DefaultMessageListenerContainer">
            <property name="connectionFactory" ref="connectionFactory" />
            <property name="destination" ref="topicDestination" />
            <property name="messageListener" ref="itemAddMessageListener" />
        </bean>
    

    相关文章

      网友评论

          本文标题:JW-Activemq

          本文链接:https://www.haomeiwen.com/subject/ecavwqtx.html