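ActiveMQ version used: 5.13.5
Download:
Link: https://pan.baidu.com/s/1jIvN0gu  Password: am2r
JDK version: 1.8
Mac download:
Link: https://pan.baidu.com/s/1gfnLWAZ  Password: tj6t
Steps to start the broker:
- Change into the bin folder of the ActiveMQ installation:
cd <path to ActiveMQ>/bin
- Start ActiveMQ:
./activemq start
- Startup may fail with an error, most likely because the script lacks execute permission.
Check the permissions with ls -l
If the execute bit is missing, add it with chmod +x activemq
- Verify that the broker started:
http://127.0.0.1:8161/admin/queues.jsp
The username and password are usually both admin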
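The two checks above (execute permission on the start script, and whether the web console answers) can also be done from Java. The following is a minimal sketch, not part of ActiveMQ: the class name `BrokerPreflight` and the default script path `bin/activemq` are assumptions made for illustration, and port 61616 is the default broker port that also appears in the Spring configuration later in this post.

```java
import java.io.File;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

/** Hypothetical helper, not part of ActiveMQ: pre-flight checks for a local broker. */
class BrokerPreflight {

    /** Returns true if the file exists and the current user may execute it. */
    static boolean isExecutable(String path) {
        File script = new File(path);
        return script.isFile() && script.canExecute();
    }

    /** Returns true if a TCP connection to host:port succeeds within timeoutMillis. */
    static boolean isPortOpen(String host, int port, int timeoutMillis) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMillis);
            return true;
        } catch (IOException e) {
            return false; // refused, unreachable, or timed out
        }
    }

    public static void main(String[] args) {
        // Assumed path: pass the real location of bin/activemq as the first argument.
        String script = args.length > 0 ? args[0] : "bin/activemq";
        System.out.println("start script executable: " + isExecutable(script));
        // 8161 is the web console port from the URL above; 61616 is the default broker port.
        System.out.println("web console (8161) open: " + isPortOpen("127.0.0.1", 8161, 1000));
        System.out.println("broker (61616) open: " + isPortOpen("127.0.0.1", 61616, 1000));
    }
}
```

An open port only shows that something is listening; logging in to the admin page is still the real confirmation that the broker is up.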
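Writing the MQ code
Create a Maven project
Add the dependencies to the pom file (the ${org.springframework.version} and ${junit.version} placeholders must be defined under <properties> in the same pom):
<dependencies>
    <!-- Spring core -->
    <dependency>
        <groupId>org.springframework</groupId>
        <artifactId>spring-context</artifactId>
        <version>${org.springframework.version}</version>
    </dependency>
    <!-- spring-jms provides JmsTemplate and the listener container used in the Spring section -->
    <dependency>
        <groupId>org.springframework</groupId>
        <artifactId>spring-jms</artifactId>
        <version>${org.springframework.version}</version>
    </dependency>
    <!-- spring-test, used together with JUnit for testing; should match ${org.springframework.version} -->
    <dependency>
        <groupId>org.springframework</groupId>
        <artifactId>spring-test</artifactId>
        <version>3.2.4.RELEASE</version>
    </dependency>
    <dependency>
        <groupId>junit</groupId>
        <artifactId>junit</artifactId>
        <version>${junit.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.activemq</groupId>
        <artifactId>activemq-all</artifactId>
        <version>5.15.2</version>
    </dependency>
    <dependency>
        <groupId>org.apache.activemq</groupId>
        <artifactId>activemq-pool</artifactId>
        <version>5.11.1</version>
    </dependency>
    <dependency>
        <groupId>org.apache.commons</groupId>
        <artifactId>commons-pool2</artifactId>
        <version>2.4.2</version>
    </dependency>
</dependencies>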
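Point-to-point messaging
A queue delivers each message to exactly one consumer. If two consumers listen on the same queue at the same time, each receives only part of the messages, and the two parts together make up one full set. To have a single consumer see every message, give the destination a unique name that no other consumer uses.
- Create the producer:
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;

public class QueueProducer {
    // Default username, password, and broker URL
    private static final String username = ActiveMQConnection.DEFAULT_USER;
    private static final String password = ActiveMQConnection.DEFAULT_PASSWORD;
    private static final String url = ActiveMQConnection.DEFAULT_BROKER_URL;

    public static void main(String[] args) {
        // Create the connection factory
        ConnectionFactory factory = new ActiveMQConnectionFactory(username, password, url);
        try {
            // Create the connection
            Connection connection = factory.createConnection();
            // Start the connection
            connection.start();
            // First argument: whether the session is transacted. If true, the second
            // argument is ignored and the session behaves as SESSION_TRANSACTED.
            // Session.AUTO_ACKNOWLEDGE: the session acknowledges each message automatically.
            // Session.CLIENT_ACKNOWLEDGE: the client acknowledges by calling acknowledge();
            // the broker removes the message only then.
            // Session.DUPS_OK_ACKNOWLEDGE: lazy acknowledgment; duplicate deliveries are possible.
            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            // Message destination; the name must match the queue the consumer reads from
            Destination destination = session.createQueue("test1");
            // Message producer
            MessageProducer producer = session.createProducer(destination);
            for (int i = 0; i < 5; i++) {
                // Create a text message
                TextMessage message = session.createTextMessage("message" + i);
                producer.send(message);
            }
            // If the session was created with transacted = true, uncomment the next line
            // session.commit();
            session.close();
            connection.close();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}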
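- Create the consumer:
// username, password, and url are declared exactly as in the producer
public static void main(String[] args) {
    ConnectionFactory factory = new ActiveMQConnectionFactory(username, password, url);
    try {
        Connection connection = factory.createConnection();
        connection.start();
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        // Point-to-point mode: the queue name must match the one the producer sends to
        Destination destination = session.createQueue("test1");
        // Create the consumer
        MessageConsumer consumer = session.createConsumer(destination);
        while (true) {
            // receive() blocks until a message arrives and returns null only after the
            // consumer is closed; use receive(timeoutMillis) to stop when the queue stays empty
            TextMessage message = (TextMessage) consumer.receive();
            if (message != null) {
                System.out.println("Received: " + message.getText());
            } else {
                break;
            }
        }
        session.close();
        connection.close();
    } catch (JMSException e) {
        e.printStackTrace();
    }
}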
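The consumer loop above only ends if `receive()` returns null, which the blocking variant does not do while the consumer stays open. JMS also offers `receive(long timeout)`, which returns null once the timeout expires. The sketch below shows the same loop shape without a broker: a `java.util.concurrent.BlockingQueue` stands in for the JMS queue and `poll(timeout)` stands in for `receive(timeout)`. The class name `DrainWithTimeout` is made up for this illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

/** Toy stand-in for a consumer loop; a BlockingQueue plays the role of the JMS queue. */
class DrainWithTimeout {

    /**
     * Takes messages until none arrives within idleMillis, then returns what was read.
     * poll(timeout) returns null when the wait expires, which is the loop's exit signal.
     */
    static List<String> drain(BlockingQueue<String> queue, long idleMillis) {
        List<String> received = new ArrayList<>();
        try {
            while (true) {
                String message = queue.poll(idleMillis, TimeUnit.MILLISECONDS);
                if (message == null) {
                    break; // idle for idleMillis: stop instead of blocking forever
                }
                received.add(message);
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt and stop
        }
        return received;
    }

    public static void main(String[] args) {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>();
        for (int i = 0; i < 5; i++) {
            queue.add("message" + i);
        }
        System.out.println(drain(queue, 200)); // prints [message0, message1, message2, message3, message4]
    }
}
```

In the real consumer the equivalent change would be to call `consumer.receive(10 * 1000)` or similar; the timeout value is a judgment call that depends on how long the producer may pause between messages.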
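Publish/subscribe mode
The code is nearly identical to the point-to-point example, so only the difference is shown.
The producer and the consumer differ only in how the message destination is created. A non-durable topic subscriber receives only the messages published while it is connected, so start the consumer before the producer.
// When creating the destination, replace createQueue("name") with createTopic("name")
Destination destination = session.createTopic("test1");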
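To make the contrast between the two modes concrete, here is a toy in-memory model that does not talk to a broker. It assumes round-robin distribution for the queue case; a real broker's split between competing consumers depends on prefetch settings and timing, so only the "each message goes to one consumer" property carries over. The class and method names (`DeliveryModel`, `viaQueue`, `viaTopic`) are invented for this sketch.

```java
import java.util.ArrayList;
import java.util.List;

/** Toy in-memory model of JMS delivery semantics; it does not talk to a broker. */
class DeliveryModel {

    /** Queue: each message goes to exactly one consumer (round-robin here). */
    static List<List<String>> viaQueue(List<String> messages, int consumers) {
        List<List<String>> inbox = emptyInboxes(consumers);
        for (int i = 0; i < messages.size(); i++) {
            inbox.get(i % consumers).add(messages.get(i));
        }
        return inbox;
    }

    /** Topic: every active subscriber gets its own copy of every message. */
    static List<List<String>> viaTopic(List<String> messages, int subscribers) {
        List<List<String>> inbox = emptyInboxes(subscribers);
        for (List<String> subscriber : inbox) {
            subscriber.addAll(messages);
        }
        return inbox;
    }

    private static List<List<String>> emptyInboxes(int count) {
        List<List<String>> inbox = new ArrayList<>();
        for (int i = 0; i < count; i++) {
            inbox.add(new ArrayList<>());
        }
        return inbox;
    }

    public static void main(String[] args) {
        List<String> messages = new ArrayList<>();
        for (int i = 0; i < 5; i++) {
            messages.add("message" + i);
        }
        System.out.println("queue: " + viaQueue(messages, 2));
        System.out.println("topic: " + viaTopic(messages, 2));
    }
}
```

With five messages and two receivers, the queue line shows the messages split into two disjoint parts, while the topic line shows both subscribers holding all five.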
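Durable subscription
The producer code stays the same; the consumer adds an identity so the broker can recognize it across reconnects.
The modified consumer code:
ConnectionFactory factory = new ActiveMQConnectionFactory(username, password, url);
try {
    Connection connection = factory.createConnection();
    // Added for durable subscription: the client ID must be set before the connection is started
    connection.setClientID("bbb");
    connection.start();
    Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
    // Added for durable subscription: subscribe under a subscription name
    Topic topic = session.createTopic("test1");
    MessageConsumer consumer = session.createDurableSubscriber(topic, "bbb");
    while (true) {
        TextMessage message = (TextMessage) consumer.receive();
        if (message != null) {
            System.out.println("Received: " + message.getText());
        } else {
            break;
        }
    }
    session.close();
    connection.close();
} catch (JMSException e) {
    e.printStackTrace();
}
Note: in this mode the consumer must be registered first, that is, run once so the broker records the subscription, before any messages are sent. Messages published before the subscription exists are not kept for it.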
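The ordering rule in the note can be illustrated with a small in-memory model. This is a sketch of the idea only, not how ActiveMQ stores messages: the class `DurableTopicModel` and its methods are invented here, and the model ignores persistence, expiry, and client IDs.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Toy model of a topic with durable subscriptions; not the ActiveMQ implementation. */
class DurableTopicModel {
    // One backlog per registered subscription name, kept while the subscriber is offline.
    private final Map<String, List<String>> backlog = new LinkedHashMap<>();

    /** First connect registers the subscription; later connects return the backlog. */
    List<String> connect(String subscriptionName) {
        List<String> pending = backlog.get(subscriptionName);
        if (pending == null) {
            backlog.put(subscriptionName, new ArrayList<>());
            return new ArrayList<>(); // nothing was retained before registration
        }
        List<String> delivered = new ArrayList<>(pending);
        pending.clear();
        return delivered;
    }

    /** Publishing stores a copy for every registered subscription. */
    void publish(String message) {
        for (List<String> pending : backlog.values()) {
            pending.add(message);
        }
    }

    public static void main(String[] args) {
        DurableTopicModel topic = new DurableTopicModel();
        topic.publish("sent before registration");   // no subscription yet: lost for "bbb"
        System.out.println(topic.connect("bbb"));     // prints []
        topic.publish("sent while bbb is offline");
        System.out.println(topic.connect("bbb"));     // prints [sent while bbb is offline]
    }
}
```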
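Message filtering
- Producer:
// Default time-to-live (10 s) for messages from this producer; the four-argument send() below overrides it
producer.setTimeToLive(10 * 1000);
MapMessage message1 = session.createMapMessage();
// setString() writes into the message body, which selectors cannot see
message1.setString("name", "laowei");
// setIntProperty() sets a message property; selectors are evaluated against properties and headers
message1.setIntProperty("age", 19);
MapMessage message2 = session.createMapMessage();
message2.setString("name", "xiaowang");
message2.setIntProperty("age", 10);
/*
 * message: the message to send
 * deliveryMode: whether the message is persisted
 * priority: message priority
 * timeToLive: how long the message lives before it expires, in milliseconds
 */
producer.send(message1, DeliveryMode.NON_PERSISTENT, 4, 1000 * 60 * 10);
producer.send(message2, DeliveryMode.NON_PERSISTENT, 4, 1000 * 60 * 10);
- Consumer:
// Only messages whose "age" property satisfies the selector are delivered.
// With the two messages above (age 19 and 10), "age>=20" matches neither;
// use e.g. "age>=18" to receive message1.
String condition = "age>=20";
Destination destination = session.createTopic("test1");
MessageConsumer consumer = session.createConsumer(destination, condition);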
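To see what the selector does with the two messages above, here is a toy model of selector filtering. It is only an illustration: a real broker parses the selector string itself, whereas this sketch replaces the string `age>=20` with a hand-written Java predicate. All names (`SelectorModel`, `ageAtLeast`, `withAge`) are invented for this example.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Predicate;

/** Toy model of selector filtering; a real broker parses the selector string itself. */
class SelectorModel {

    /** Keeps only the messages whose properties satisfy the selector predicate. */
    static List<Map<String, Object>> deliver(List<Map<String, Object>> messages,
                                             Predicate<Map<String, Object>> selector) {
        List<Map<String, Object>> delivered = new ArrayList<>();
        for (Map<String, Object> properties : messages) {
            if (selector.test(properties)) {
                delivered.add(properties);
            }
        }
        return delivered;
    }

    /** Stand-in for the selector "age >= minimum"; a missing property never matches. */
    static Predicate<Map<String, Object>> ageAtLeast(int minimum) {
        return properties -> properties.get("age") instanceof Integer
                && (Integer) properties.get("age") >= minimum;
    }

    /** Builds the property map of a message that carries only an "age" property. */
    static Map<String, Object> withAge(int age) {
        Map<String, Object> properties = new LinkedHashMap<>();
        properties.put("age", age);
        return properties;
    }

    public static void main(String[] args) {
        List<Map<String, Object>> sent = new ArrayList<>();
        sent.add(withAge(19)); // message1 above
        sent.add(withAge(10)); // message2 above
        System.out.println(deliver(sent, ageAtLeast(20))); // prints []
        System.out.println(deliver(sent, ageAtLeast(18))); // prints [{age=19}]
    }
}
```

The "name" values are deliberately absent from the model: they were written with `setString()` into the MapMessage body, so a selector could not filter on them.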
Integrating ActiveMQ with Spring
Configuration:
- activemq.properties
## ActiveMQ Config
activemq.brokerURL=tcp\://127.0.0.1\:61616
activemq.userName=admin
activemq.password=admin
activemq.pool.maxConnections=10
#queueName
activemq.queueName=myspringqueue
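The backslashes in `tcp\://127.0.0.1\:61616` are `.properties` escapes, not part of the URL. A quick way to confirm what value the application actually sees is to load the same text with `java.util.Properties`. The sketch below does that from a string; the class name `BrokerUrlProperties` is made up for this illustration.

```java
import java.io.IOException;
import java.io.StringReader;
import java.util.Properties;

/** Shows how java.util.Properties reads the escaped broker URL from activemq.properties. */
class BrokerUrlProperties {

    /** Parses .properties text and returns the value stored under key (null if absent). */
    static String read(String propertiesText, String key) {
        Properties properties = new Properties();
        try {
            properties.load(new StringReader(propertiesText));
        } catch (IOException e) {
            throw new IllegalStateException("reading from a String should not fail", e);
        }
        return properties.getProperty(key);
    }

    public static void main(String[] args) {
        // In Java source each backslash is doubled; the file itself contains tcp\://127.0.0.1\:61616
        String text = "activemq.brokerURL=tcp\\://127.0.0.1\\:61616\n"
                + "activemq.pool.maxConnections=10\n";
        System.out.println(read(text, "activemq.brokerURL")); // prints tcp://127.0.0.1:61616
        // Values are always strings; convert numbers explicitly when reading them by hand.
        System.out.println(Integer.parseInt(read(text, "activemq.pool.maxConnections")) + 1); // prints 11
    }
}
```

Because the colons sit in the value, after the `=` separator, writing the URL without backslashes yields the same result; the escapes are harmless but optional.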
- spring-context.xml
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:context="http://www.springframework.org/schema/context"
       xsi:schemaLocation="http://www.springframework.org/schema/beans
       http://www.springframework.org/schema/beans/spring-beans.xsd
       http://www.springframework.org/schema/context
       http://www.springframework.org/schema/context/spring-context.xsd"
       default-autowire="byName">
    <!-- Load the properties file -->
    <context:property-placeholder location="classpath:activemq.properties" />
    <!-- Enable annotation-based configuration -->
    <context:annotation-config />
    <!-- Base package for component scanning -->
    <context:component-scan base-package="com.laowei.springmq" />
    <import resource="classpath:spring-activemq.xml" />
</beans>
- spring-activemq.xml
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd">
    <!-- Vendor connection factory: the ActiveMQ ConnectionFactory -->
    <bean id="targetConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
        <!-- ActiveMQ Address -->
        <property name="brokerURL" value="${activemq.brokerURL}" />
        <property name="userName" value="${activemq.userName}"></property>
        <property name="password" value="${activemq.password}"></property>
    </bean>
    <!--
        ActiveMQ provides a PooledConnectionFactory. Injecting an ActiveMQConnectionFactory into it
        pools Connections, Sessions, and MessageProducers, which greatly reduces resource usage.
        It requires the activemq-pool package.
    -->
    <bean id="pooledConnectionFactory" class="org.apache.activemq.pool.PooledConnectionFactory">
        <property name="connectionFactory" ref="targetConnectionFactory" />
        <property name="maxConnections" value="${activemq.pool.maxConnections}" />
    </bean>
    <!-- Spring's ConnectionFactory, which manages the real ConnectionFactory -->
    <bean id="connectionFactory" class="org.springframework.jms.connection.SingleConnectionFactory">
        <!-- The target ConnectionFactory is the real one that creates JMS Connections -->
        <property name="targetConnectionFactory" ref="pooledConnectionFactory" />
    </bean>
    <!-- The destination -->
    <bean id="msgQueue" class="org.apache.activemq.command.ActiveMQQueue">
        <constructor-arg>
            <value>${activemq.queueName}</value>
        </constructor-arg>
    </bean>
    <!-- JMS helper class provided by Spring; it can send and receive messages -->
    <!-- Queue template -->
    <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
        <!-- This connectionFactory is the Spring-provided ConnectionFactory bean defined above -->
        <property name="connectionFactory" ref="connectionFactory"/>
        <property name="defaultDestinationName" value="${activemq.queueName}"></property>
    </bean>
    <!-- Custom listener: MessageListener -->
    <bean id="msgQueueMessageListener" class="com.laowei.springmq.Consumer"></bean>
    <!-- Inject the connection factory, the target queue, and the custom listener into the listener container -->
    <bean id="sessionAwareListenerContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
        <property name="connectionFactory" ref="connectionFactory" />
        <property name="destination" ref="msgQueue" />
        <property name="messageListener" ref="msgQueueMessageListener" />
    </bean>
</beans>
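- Producer code:
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.Session;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.jms.core.MessageCreator;
import org.springframework.stereotype.Service;

@Service("activeMQProducer")
public class Product {
    // Set by Spring through the setter below
    private JmsTemplate jmsTemplate;

    public JmsTemplate getJmsTemplate() {
        return jmsTemplate;
    }

    public void setJmsTemplate(JmsTemplate jmsTemplate) {
        this.jmsTemplate = jmsTemplate;
    }

    // Sends a text message to the template's default destination
    public void sendMessage(final String info) {
        jmsTemplate.send(new MessageCreator() {
            public Message createMessage(Session session) throws JMSException {
                return session.createTextMessage(info);
            }
        });
    }
}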
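- Consumer:
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.springframework.jms.listener.SessionAwareMessageListener;

public class Consumer implements SessionAwareMessageListener<Message> {
    // Called by the listener container for every message that arrives on the queue
    public void onMessage(Message message, Session session) throws JMSException {
        if (message instanceof TextMessage) {
            System.out.println("*******" + ((TextMessage) message).getText());
        }
    }
}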
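- Test code:
import org.junit.Test;
import org.springframework.beans.factory.annotation.Autowired;

// BaseJunit4Test is not shown in this post; it is expected to load the Spring test context
public class TestProduct extends BaseJunit4Test {
    @Autowired
    private Product product;

    @Test
    public void sendmessage() throws InterruptedException {
        // Sends one message every 3 seconds until the test is stopped manually
        while (true) {
            Thread.sleep(3000);
            product.sendMessage("hahaha");
        }
    }
}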
Code repository:
https://github.com/weijun8687/ActiveMQ.git
References:
http://blog.csdn.net/fulai0_0/article/details/52127320
http://www.mytju.com/classcode/news_readNews.asp?newsID=486