ActiveMQ 队列的 Spring 实现,案例如下
(1)pom.xml引入相关jar
<!-- ActiveMQ-related dependencies: begin -->
<dependency>
    <groupId>org.apache.activemq</groupId>
    <artifactId>activemq-all</artifactId>
    <version>5.11.1</version>
</dependency>
<dependency>
    <groupId>org.springframework</groupId>
    <artifactId>spring-jms</artifactId>
    <version>4.1.4.RELEASE</version>
</dependency>
(2)添加生产者配置activemq-sender.xml
<description>JMS发布者应用配置</description>

<!-- Connection factory with session caching, wrapping the ActiveMQ factory -->
<bean id="cachingConnectionFactory"
      class="org.springframework.jms.connection.CachingConnectionFactory">
    <!-- Number of sessions to cache -->
    <property name="sessionCacheSize" value="20" />
    <property name="targetConnectionFactory">
        <bean class="org.apache.activemq.ActiveMQConnectionFactory">
            <!-- Broker address, user name and password -->
            <property name="brokerURL" value="tcp://192.168.56.129:61616" />
            <property name="userName" value="parry" />
            <property name="password" value="parry123" />
            <!-- Whether messages are sent asynchronously -->
            <property name="useAsyncSend" value="true" />
        </bean>
    </property>
</bean>

<!-- Default destination of the template below: a point-to-point queue -->
<bean id="destination" class="org.apache.activemq.command.ActiveMQQueue">
    <!-- Name of the queue -->
    <constructor-arg index="0" value="messages" />
</bean>

<!-- JmsTemplate wired to the caching factory and the queue above -->
<bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
    <property name="connectionFactory" ref="cachingConnectionFactory" />
    <property name="defaultDestination" ref="destination" />
    <!-- true = publish/subscribe mode; false = point-to-point mode -->
    <property name="pubSubDomain" value="false" />
</bean>
(3)添加消费者配置activemq-consumer.xml
<description>JMS订阅者应用配置</description>

<!-- Connection factory with session caching, wrapping the ActiveMQ factory. -->
<!-- NOTE(review): the DefaultMessageListenerContainer Javadoc advises against
     pairing CachingConnectionFactory with dynamic consumer scaling; consider
     pointing the listener container at a plain ActiveMQConnectionFactory.
     Wiring is left unchanged here; confirm before switching. -->
<bean id="cachingConnectionFactory"
      class="org.springframework.jms.connection.CachingConnectionFactory">
    <!-- Number of sessions to cache -->
    <property name="sessionCacheSize" value="20" />
    <property name="targetConnectionFactory">
        <bean class="org.apache.activemq.ActiveMQConnectionFactory">
            <!-- Broker address, user name and password -->
            <property name="brokerURL" value="tcp://192.168.56.129:61616" />
            <property name="userName" value="parry" />
            <property name="password" value="parry123" />
            <!-- Whether messages are sent asynchronously -->
            <property name="useAsyncSend" value="true" />
        </bean>
    </property>
</bean>

<!-- Destination the container listens on: a point-to-point queue -->
<bean id="destination" class="org.apache.activemq.command.ActiveMQQueue">
    <!-- Name of the queue -->
    <constructor-arg index="0" value="messages" />
</bean>

<!-- Application-defined message listener -->
<bean id="consumer" class="com.parry.MQ.funcion.Listener" />

<!-- Message listener container -->
<bean id="myListenerContainer"
      class="org.springframework.jms.listener.DefaultMessageListenerContainer">
    <property name="connectionFactory" ref="cachingConnectionFactory" />
    <property name="destination" ref="destination" />
    <property name="messageListener" ref="consumer" />
    <!-- Run the consumers on a thread pool so that processing can keep up
         when messages arrive faster than a single consumer handles them -->
    <property name="taskExecutor" ref="queueMessageExecutor" />
    <!-- Consumer range "min-max". The original set concurrentConsumers=30,
         concurrency=20-50 and maxConcurrentConsumers=80 together; those
         setters overwrite one another, and applied in declaration order they
         resolve to 20 to 80. Stated once here so the effective bounds are
         explicit. -->
    <property name="concurrency" value="20-80" />
</bean>

<!-- Thread pool backing the listener container. No queueCapacity is set, so
     the executor's default unbounded queue applies and the pool never grows
     past corePoolSize. The core size therefore has to cover the container's
     upper bound of 80 (it was 30, which capped the container at 30 running
     consumers regardless of maxPoolSize). -->
<bean id="queueMessageExecutor"
      class="org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor">
    <property name="corePoolSize" value="80" />
    <property name="maxPoolSize" value="80" />
    <!-- Let idle core threads expire after keepAliveSeconds as well -->
    <property name="allowCoreThreadTimeOut" value="true" />
    <property name="daemon" value="true" />
    <property name="keepAliveSeconds" value="120" />
</bean>
(4)新建一个发送消息的类(放在 com.parry.MQ.funcion 包下)
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.Session;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.jms.core.MessageCreator;
import org.springframework.stereotype.Component;
/**
* 发送消息
* @author Administrator
*
*/
@Component
public class QueueSender {
@Autowired
private JmsTemplate myJmsTemplate;
/**
* 发送一条消息到指定的队列(目标)
*
* @param queueName
* 队列名称
* @param message
* 消息内容
*/
public void send(String queueName, final String message) {
myJmsTemplate.send(queueName, new MessageCreator() {
public Message createMessage(Session session) throws JMSException {
return session.createTextMessage(message);
}
});
}
}
(5)添加监听器
package com.parry.MQ.funcion;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.TextMessage;
/**
* 接收者监听类
* @author Administrator
*
*/
public class Listener implements MessageListener {
public void onMessage(Message message) {
// 业务处理
try {
TextMessage message2 = (TextMessage) message;
System.out.println("接收到信息:" + message2.getText());
} catch (JMSException e) {
e.printStackTrace();
}
}
}
(6)写一个请求测试一下
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.ResponseBody;
import com.parry.MQ.funcion.QueueSender;
/**
 * Demo controller: the handler mapped to {@code "test"} sends one fixed text
 * message through {@link QueueSender} and returns a fixed response body.
 */
@Controller
public class App {

    /** Destination name passed to the sender. */
    private static final String QUEUE_NAME = "messages";

    /** Text of the message that is sent. */
    private static final String GREETING = "你好,这是我的第一条消息!";

    /** Value returned as the response body. */
    private static final String RESPONSE_TEXT = "Hello world";

    @Autowired
    private QueueSender sender;

    /**
     * Sends the greeting to the queue and answers with a fixed string.
     *
     * @return the literal response text
     */
    @RequestMapping("test")
    @ResponseBody
    public String Test() {
        sender.send(QUEUE_NAME, GREETING);
        return RESPONSE_TEXT;
    }
}
(7)开启服务,访问路径测试
网友评论