一、目录分析

从上面看出一般的项目目录类似,简单分析下其作用:
- bin:为启动目录。
- conf:为配置文件,核心文件有activemq.xml(核心配置,加载jetty.xml等文件和管理数据持久化)、jetty.xml(配置启动端口)、jetty-realm.properties(配置用户名密码)几个。在登录web控制台的时候,需要用户名和密码。
- data:是消息持久化的地方,默认使用kahadb,当然我们可以采用leveldb,或者采用JDBC存储到MySQL,或者干脆不使用持久化机制。 -
- webapps:使用jetty提供的web控制台
二、启动ActiveMQ

在win下直接点击如下脚本即可启动,访问目录:http://localhost:8161

用户名密码可在jetty-realm.properties文件中查看。
# 用户名: ,密码, [,角色 ...]
admin: admin, admin
user: user, user
三、 HelloWorld
基于P2P的简单例子
//参数1:是否启用事务;参数2:签收模式:
session = conn.createSession(true, Session.AUTO_ACKNOWLEDGE);
简单说,就是消费者接受到消息后,需要告诉消息服务器,
我收到消息了。当消息服务器收到回执后,本条消息将失效。
因此签收将对PTP模式产生很大影响。如果消费者收到消息后,
并不签收,那么本条消息继续有效,很可能会被其他消费者消费掉!
- AUTO_ACKNOWLEDGE:表示在消费者receive消息的时候自动的签收
- CLIENT_ACKNOWLEDGE:表示消费者receive消息后必须手动的调用acknowledge()方法进行签收。更好用,我们可以手动控制,如果消息处理失败,那么消息还有效,任然会继续处理,直到成功。
- DUPS_OK_ACKNOWLEDGE:签不签收无所谓了,只要消费者能够容忍重复的消息接受,当然这样会降低Session的开销
package com.lw.activemq.p2p;
import javax.jms.Connection;
import javax.jms.DeliveryMode;
import javax.jms.JMSException;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.management.j2ee.statistics.JMSProducerStats;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
/**
* @author liwen
* @date:2017年12月13日 下午2:27:15
* @Function: 生产者
* @version 1.0
*/
public class JMSProducter {
private static ActiveMQConnectionFactory connectionFactory;
private static Connection conn;
private static Queue destination;
private static Session session;
private static MessageProducer producer;
public static void main(String[] args) {
try {
// 创建连接工厂
connectionFactory = new ActiveMQConnectionFactory(ActiveMQConnection.DEFAULT_USER,
ActiveMQConnection.DEFAULT_PASSWORD, ActiveMQConnection.DEFAULT_BROKER_URL);
// 获取连接
conn = connectionFactory.createConnection();
conn.start();
// 创建session
//参数1:是否启用事务;参数2:签收模式:
session = conn.createSession(true, Session.AUTO_ACKNOWLEDGE);
// 创建消息队列
destination = session.createQueue("helloworld");
// 创建生产者
producer = session.createProducer(destination);
// 设置(非)持久化特性,如果非持久化,则意味MQ消息重启后会导致消息丢失
producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
// 发送消息
sendMessage(session, producer);
// 提交
session.commit();
} catch (Exception e) {
e.printStackTrace();
} finally {
if (conn != null) {
try {
conn.close();
} catch (JMSException e) {
e.printStackTrace();
}
}
}
}
private static void sendMessage(Session session, MessageProducer producer) throws Exception {
for (int i = 0; i < 10; i++) {
// 创建文本消息
TextMessage msg = session.createTextMessage("activemq 消息:" + i);
System.out.println("Producter 消息:" + i );
producer.send(msg);
}
}
}
package com.lw.activemq.p2p;
import javax.jms.Connection;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
/**
* @author liwen
* @date:2017年12月13日 下午2:27:24
* @Function: 消费者
* @version 1.0
*/
public class JMSConsumer {
private static org.apache.activemq.ActiveMQConnectionFactory connectionFactory;
private static Connection conn;
private static Session session;
private static Queue destination;
private static MessageConsumer consumer;
public static void main(String[] args) {
try {
connectionFactory = new ActiveMQConnectionFactory(ActiveMQConnection.DEFAULT_USER,
ActiveMQConnection.DEFAULT_PASSWORD, ActiveMQConnection.DEFAULT_BROKER_URL);
conn = connectionFactory.createConnection();
conn.start();
session = conn.createSession(true, Session.AUTO_ACKNOWLEDGE);
// 创建队列,要么是Queue,要么是Topic
destination = session.createQueue("helloworld");
consumer = session.createConsumer(destination);
// 接收消息
/*
* while (true) { TextMessage text = (TextMessage) consumer.receive(); if (text
* != null) { System.out.println("consumer收到的消息:" + text.getText()); } else {
* break; } }
*/
consumer.setMessageListener(new MessageListener() {
public void onMessage(Message message) {
TextMessage text = (TextMessage) message;
try {
String msg = text.getText();
System.out.println("consumer收到消息:" + msg);
} catch (JMSException e) {
e.printStackTrace();
}
}
});
} catch (Exception e) {
e.printStackTrace();
}
}
}
上面只是演示消息为字符串的例子,实际项目中可能为java类的比较多。ActiveMQ当然也支持。

从上图可以看出,ActiveMQ可支持流、Map、java类(需要序列化)等消息格式。
四、 保证消息的成功处理
使用CLIENT_ACKNOWLEDGE模式解决,写在接收端。如果接收端不确认消息,那么activemq将会把这条消息一直保留,直到有一个接收端确认了消息的接收。即调用acknowledge方法。

网友评论