美文网首页
2020-12-21

2020-12-21

作者: 风雨无阻zql | 来源:发表于2020-12-21 20:40 被阅读0次

    ActiveMq队列spring实现,案列如下

    (1)pom.xml引入相关jar

    <!-- activeMQ相关 begin-->

    <dependency>

    <groupId>org.apache.activemq</groupId>

    <artifactId>activemq-all</artifactId>

    <version>5.11.1</version></dependency>

    <dependency>

    <groupId>org.springframework</groupId>

    <artifactId>spring-jms</artifactId>

    <version>4.1.4.RELEASE</version>

    </dependency>

    (2)添加生产者配置activemq-sender.xml

    <description>JMS发布者应用配置</description>

        <!-- CachingConnectionFactory 连接工厂 (有缓存功能)-->

        <bean id="cachingConnectionFactory"

            class="org.springframework.jms.connection.CachingConnectionFactory">

            <!-- Session缓存数量 -->

            <property name="sessionCacheSize" value="20" />

            <property name="targetConnectionFactory"> 

                <bean class="org.apache.activemq.ActiveMQConnectionFactory"> 

                    <!-- MQ地址 账户名 密码--> 

                    <property name="brokerURL" value="tcp://192.168.56.129:61616" />

                    <property name="userName" value="parry" />

                    <property name="password" value="parry123" />

                    <!-- 是否异步发送 -->

                    <property name="useAsyncSend" value="true"/>

                </bean> 

            </property> 

        </bean>

        <!-- 接收消息的目的地(一个主题)点对点队列 -->

        <bean id="destination" class="org.apache.activemq.command.ActiveMQQueue">

            <!-- 设置消息主题的名字 -->

            <constructor-arg index="0" value="messages" />

        </bean>

        <!-- 接收配置JMS模版 -->

        <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">

            <property name="connectionFactory" ref="cachingConnectionFactory" />

            <property name="defaultDestination" ref="destination" />

            <!-- value为true为发布/订阅模式; value为false为点对点模式-->

            <property name="pubSubDomain" value="false"/>

        </bean>

    (3)添加消费者配置activemq-consumer.xml

    <description>JMS订阅者应用配置</description>

        <!-- CachingConnectionFactory 连接工厂 (有缓存功能)-->

        <bean id="cachingConnectionFactory"

            class="org.springframework.jms.connection.CachingConnectionFactory">

            <!-- Session缓存数量 -->

            <property name="sessionCacheSize" value="20" />

            <property name="targetConnectionFactory"> 

                <bean class="org.apache.activemq.ActiveMQConnectionFactory"> 

                    <!-- MQ地址 账户名 密码--> 

                    <property name="brokerURL" value="tcp://192.168.56.129:61616" />

                    <property name="userName" value="parry" />

                    <property name="password" value="parry123" />

                    <!-- 是否异步发送 -->

                    <property name="useAsyncSend" value="true"/>

                </bean> 

            </property> 

        </bean>

        <!-- 接收消息的目的地(一个主题)点对点队列 -->

        <bean id="destination" class="org.apache.activemq.command.ActiveMQQueue">

            <!-- 设置消息主题的名字 -->

            <constructor-arg index="0" value="messages" />

        </bean>

        <!-- 消费者配置 (自己定义) -->

        <bean id="consumer" class="com.parry.MQ.funcion.Listener" />

        <!-- 消息监听容器 -->

        <bean id="myListenerContainer"

            class="org.springframework.jms.listener.DefaultMessageListenerContainer">

            <property name="connectionFactory" ref="cachingConnectionFactory" />

            <property name="destination" ref="destination" />

            <property name="messageListener" ref="consumer" />

            <!-- 如果消息的接收速率,大于消息处理的速率时,可以采取线程池方式 -->

            <property name="taskExecutor" ref="queueMessageExecutor"/>

            <!-- 设置固定的线程数 -->

            <property name="concurrentConsumers" value="30"/>

            <!-- 设置动态的线程数 -->

            <property name="concurrency" value="20-50"/>

            <!-- 设置最大的线程数 -->

            <property name="maxConcurrentConsumers" value="80"/>

        </bean>

        <bean id="queueMessageExecutor"

            class="org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor">

            <property name="corePoolSize" value="30" />

            <property name="maxPoolSize" value="80" />

            <property name="daemon" value="true" />

            <property name="keepAliveSeconds" value="120" />

        </bean>

     (4)新建一个发送消息的方法

    import javax.jms.JMSException;

    import javax.jms.Message;

    import javax.jms.Session;

    import org.springframework.beans.factory.annotation.Autowired;

    import org.springframework.jms.core.JmsTemplate;

    import org.springframework.jms.core.MessageCreator;

    import org.springframework.stereotype.Component;

    /**

    * 发送消息

    * @author Administrator

    *

    */

    @Component

    public class QueueSender {

        @Autowired

        private JmsTemplate myJmsTemplate;

        /**

        * 发送一条消息到指定的队列(目标)

        *

        * @param queueName

        *            队列名称

        * @param message

        *            消息内容

        */

        public void send(String queueName, final String message) {

            myJmsTemplate.send(queueName, new MessageCreator() {

                public Message createMessage(Session session) throws JMSException {

                    return session.createTextMessage(message);

                }

            });

        }

    }

     (5)添加监听器

    package com.parry.MQ.funcion;

    import javax.jms.JMSException;

    import javax.jms.Message;

    import javax.jms.MessageListener;

    import javax.jms.TextMessage;

    /**

    * 接收者监听类

    * @author Administrator

    *

    */

    public class Listener implements MessageListener {

        public void onMessage(Message message) {

            // 业务处理

            try {

                TextMessage message2 = (TextMessage) message;

                System.out.println("接收到信息:" + message2.getText());

            } catch (JMSException e) {

                e.printStackTrace();

            }

        }

    }

    (6)写个一请求测试一下

    import org.springframework.beans.factory.annotation.Autowired;

    import org.springframework.stereotype.Controller;

    import org.springframework.web.bind.annotation.RequestMapping;

    import org.springframework.web.bind.annotation.ResponseBody;

    import com.parry.MQ.funcion.QueueSender;

    @Controller

    public class App {

        @Autowired

        private QueueSender sender;

        @RequestMapping("test")

        @ResponseBody

        public String Test() {

            sender.send("messages", "你好,这是我的第一条消息!");

            return "Hello world";

        }

    }

    (7)开启服务,访问路径测试


    相关文章

      网友评论

          本文标题:2020-12-21

          本文链接:https://www.haomeiwen.com/subject/dqednktx.html