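Introduction to ActiveMQ
ActiveMQ is a popular open-source message broker (message-oriented middleware) written in Java. Because it supports several industry-standard protocols, it can be used across platforms and programming languages.
- Integrate multi-platform applications using the industry-standard AMQP protocol
- Web applications can talk to ActiveMQ directly using STOMP over WebSocket
- IoT devices can connect using MQTT
- Existing JMS-based infrastructure is supported as well
- And more…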
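To make the multi-protocol point a little more concrete, here is a minimal sketch of what a STOMP SEND frame looks like on the wire: a command line, header lines, a blank line, the body, and a terminating NUL byte. The class name StompFrames and the destination /queue/queue1 are illustrative assumptions. A real client would first exchange CONNECT/CONNECTED frames with the broker, which this sketch leaves out.

```java
import java.nio.charset.StandardCharsets;

public class StompFrames {
    /** Builds a STOMP SEND frame: command, headers, blank line, body, NUL terminator. */
    public static String sendFrame(String destination, String body) {
        int length = body.getBytes(StandardCharsets.UTF_8).length;
        return "SEND\n"
                + "destination:" + destination + "\n"
                + "content-type:text/plain\n"
                + "content-length:" + length + "\n"
                + "\n"
                + body
                + '\0';
    }

    public static void main(String[] args) {
        String frame = sendFrame("/queue/queue1", "Hello world!");
        // Replace the NUL terminator with a visible marker for printing
        System.out.println(frame.replace("\0", "^@"));
    }
}
```

Because the frame is plain text, any language that can open a socket or a WebSocket can produce it, which is what makes STOMP convenient for browser clients.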
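There are currently two flavors: ActiveMQ 5 "Classic" and ActiveMQ Artemis (the next-generation broker).
This article covers ActiveMQ 5 "Classic".
Readers who are interested can look into ActiveMQ Artemis separately.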
Linux installation guide
- Linux: CentOS 7 (CentOS-7-x86_64-Minimal-1810.iso)
- JDK 8 (jdk-8u221-linux-x64.rpm)
Downloading the package
wget -c http://mirror.bit.edu.cn/apache/activemq/5.15.9/apache-activemq-5.15.9-bin.tar.gz
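Installation
# Create the installation directory
mkdir /usr/activemq
# Extract the archive into the installation directory
tar -zxvf apache-activemq-5.15.9-bin.tar.gz -C /usr/activemq
# Create a symlink to keep paths short in later configuration
ln -s /usr/activemq/apache-activemq-5.15.9 /usr/activemq/latest
Starting and stopping
cd /usr/activemq/latest/bin
# Start in the foreground
./activemq console
# Start as a daemon
./activemq start
# Startup output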
INFO: Loading '/usr/activemq/apache-activemq-5.15.9//bin/env'
INFO: Using java '/usr/bin/java'
INFO: Starting - inspect logfiles specified in logging.properties and log4j.properties to get details
INFO: pidfile created : '/usr/activemq/apache-activemq-5.15.9//data/activemq.pid' (pid '14887')
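Verifying a successful start
Open the admin console at http://ip:8161/admin. If the firewall blocks the request, see the section on opening the firewall ports below.
The ActiveMQ admin console has authentication enabled by default:
Username: admin
Password: admin
Alternatively, look for the following line in the console output or in the log file (data/activemq.log):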
Apache ActiveMQ 5.15.9 (localhost, ID:ntbk11111-50816-1428933306116-0:1) started | org.apache.activemq.broker.BrokerService | main
Or check with the jps command:
[root@localhost latest]# jps
25778 activemq.jar
25805 Jps
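The same check can be scripted from a client machine. The sketch below simply tries to open a TCP connection to the two default ports mentioned in this article (8161 for the web console, 61616 for messaging). The class name BrokerPortCheck, the 2-second timeout, and the use of the first command-line argument as the host are assumptions made for illustration. A successful connection only shows that something is listening on the port, not that the broker is healthy.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

public class BrokerPortCheck {
    /** Returns true if a TCP connection to host:port succeeds within timeoutMillis. */
    public static boolean isOpen(String host, int port, int timeoutMillis) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMillis);
            return true;
        } catch (IOException e) {
            return false; // refused, unreachable, or timed out
        }
    }

    public static void main(String[] args) {
        // Assumption: the broker host is the first argument, defaulting to localhost
        String host = args.length > 0 ? args[0] : "localhost";
        System.out.println("web console (8161): " + isOpen(host, 8161, 2000));
        System.out.println("messaging (61616): " + isOpen(host, 61616, 2000));
    }
}
```

If both checks report false from a remote machine while jps shows the broker running on the server, the firewall is the most likely cause.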
Stopping
./activemq stop
Usage of the activemq command (a quick overview):
./activemq
Usage: ./activemq [--extdir <dir>] [task] [task-options] [task data]
Tasks:
browse - Display selected messages in a specified destination.
bstat - Performs a predefined query that displays useful statistics regarding the specified broker
consumer - Receives messages from the broker
create - Creates a runnable broker instance in the specified path.
decrypt - Decrypts given text
dstat - Performs a predefined query that displays useful tabular statistics regarding the specified destination type
encrypt - Encrypts given text
export - Exports a stopped brokers data files to an archive file
list - Lists all available brokers in the specified JMX context
producer - Sends messages to the broker
purge - Delete selected destination's messages that matches the message selector
query - Display selected broker component's attributes and statistics.
start - Creates and starts a broker using a configuration file, or a broker URI.
stop - Stops a running broker specified by the broker name.
Task Options (Options specific to each task):
--extdir <dir> - Add the jar files in the directory to the classpath.
--version - Display the version information.
-h,-?,--help - Display this help information. To display task specific help, use Main [task] -h,-?,--help
Task Data:
- Information needed by each specific task.
JMX system property options:
-Dactivemq.jmx.url=<jmx service uri> (default is: 'service:jmx:rmi:///jndi/rmi://localhost:1099/jmxrmi')
-Dactivemq.jmx.user=<user name>
-Dactivemq.jmx.password=<password>
Tasks provided by the sysv init script:
kill - terminate instance in a drastic way by sending SIGKILL
restart - stop running instance (if there is one), start new instance
console - start broker in foreground, useful for debugging purposes
status - check if activemq process is running
Configuration of this script:
The configuration of this script is read from the following files:
/etc/default/activemq /home/redis/.activemqrc /home/redis/apache-activemq-5.15.11//bin/env
This script searches for the files in the listed order and reads the first available file.
Modify /home/redis/apache-activemq-5.15.11//bin/env or create a copy of that file on a suitable location.
To use additional configurations for running multiple instances on the same operating system
rename or symlink script to a name matching to activemq-instance-<INSTANCENAME>.
This changes the configuration location to /etc/default/activemq-instance-<INSTANCENAME> and
$HOME/.activemqrc-instance-<INSTANCENAME>.
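Opening the MQ ports in the firewall
# The web admin console uses port 8161 by default; the messaging port defaults to 61616
firewall-cmd --zone=public --add-port=8161/tcp --permanent
firewall-cmd --zone=public --add-port=61616/tcp --permanent
Restart the firewall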
systemctl restart firewalld.service
Installing as a Linux service
Run the broker as an unprivileged user
useradd activemq
chown -R activemq:users /usr/activemq
Create the global default configuration file
cp /usr/activemq/latest/bin/env /etc/default/activemq
sed -i 's/^ACTIVEMQ_USER=""/ACTIVEMQ_USER="activemq"/' /etc/default/activemq
Edit the configuration file and adjust the following (worth considering for production):
- Configure the Java heap to a size suitable for your system environment and usage
- Consider moving the folders "data", "tmp" and "conf" out of the installation path
vim /etc/default/activemq
The relevant settings look like this:
# Active MQ installation dirs
# ACTIVEMQ_HOME="<Installationdir>/"
# ACTIVEMQ_BASE="$ACTIVEMQ_HOME"
# ACTIVEMQ_CONF="$ACTIVEMQ_BASE/conf"
# ACTIVEMQ_DATA="$ACTIVEMQ_BASE/data"
# ACTIVEMQ_TMP="$ACTIVEMQ_BASE/tmp"
# Set jvm memory configuration (minimal/maximum amount of memory)
ACTIVEMQ_OPTS_MEMORY="-Xms64M -Xmx1G"
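The -Xms and -Xmx values in ACTIVEMQ_OPTS_MEMORY use the JVM's size suffixes (K, M, G). As a rough aid when sizing the heap, the following sketch converts such a value to bytes and prints the heap ceiling of whichever JVM runs it, for comparison. The class and method names are illustrative only and are not part of ActiveMQ.

```java
public class HeapSize {
    /** Converts a JVM size string such as "64M" or "1G" to bytes. */
    public static long toBytes(String size) {
        String s = size.trim().toUpperCase();
        char unit = s.charAt(s.length() - 1);
        long factor;
        switch (unit) {
            case 'K': factor = 1024L; break;
            case 'M': factor = 1024L * 1024; break;
            case 'G': factor = 1024L * 1024 * 1024; break;
            default:  return Long.parseLong(s); // no suffix: plain byte count
        }
        return Long.parseLong(s.substring(0, s.length() - 1)) * factor;
    }

    public static void main(String[] args) {
        System.out.println("-Xms64M = " + toBytes("64M") + " bytes");
        System.out.println("-Xmx1G  = " + toBytes("1G") + " bytes");
        // Heap ceiling of the JVM running this snippet, for comparison
        System.out.println("this JVM max heap = " + Runtime.getRuntime().maxMemory() + " bytes");
    }
}
```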
Set the file permissions
chmod 644 /etc/default/activemq
Install the init script
ln -snf /usr/activemq/latest/bin/activemq /etc/init.d/activemq
Enable the service at boot
# RHEL
chkconfig --add activemq
chkconfig activemq on
or
systemctl enable activemq
Quick start
Add the dependency
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-all</artifactId>
<version>5.15.9</version>
</dependency>
Writing the producer
package com.study.activemq.le1.helloworld.queue;
import javax.jms.Connection;
import javax.jms.DeliveryMode;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnectionFactory;
/**
 * Simple producer
 */
public class Producer {
    public static void main(String[] args) {
        new ProducerThread("tcp://mq.study.com:61616", "queue1").start();
    }

    static class ProducerThread extends Thread {
        String brokerUrl;
        String destinationUrl;

        public ProducerThread(String brokerUrl, String destinationUrl) {
            this.brokerUrl = brokerUrl;
            this.destinationUrl = destinationUrl;
        }

        @Override
        public void run() {
            ActiveMQConnectionFactory connectionFactory;
            Connection conn = null;
            Session session;
            try {
                // 1. Create the connection factory
                connectionFactory = new ActiveMQConnectionFactory(brokerUrl);
                // 2. Create the connection
                conn = connectionFactory.createConnection();
                conn.start(); // mandatory before consuming; harmless for a pure producer
                // 3. Create a session (one connection can have one or more sessions)
                session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
                // 4. Create the destination (Topic or Queue)
                Destination destination = session.createQueue(destinationUrl);
                // 5. Create a message producer for the destination
                MessageProducer producer = session.createProducer(destination);
                // Set the delivery mode (persistent / non-persistent)
                producer.setDeliveryMode(DeliveryMode.PERSISTENT);
                // 6. Create a text message
                String text = "Hello world! From: " + Thread.currentThread().getName() + " : " + System.currentTimeMillis();
                TextMessage message = session.createTextMessage(text);
                // 7. Send the message through the producer
                System.out.println("Sent message: " + text);
                producer.send(message);
                session.close();
            } catch (JMSException e) {
                e.printStackTrace();
            } finally {
                // 8. Clean up: closing the connection also closes its sessions and producers
                if (conn != null) {
                    try {
                        conn.close();
                    } catch (JMSException e) {
                        e.printStackTrace();
                    }
                }
            }
        }
    }
}
Writing the consumer
package com.study.activemq.le1.helloworld.queue;
import javax.jms.Connection;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnectionFactory;
/**
 * Simple consumer
 */
// http://activemq.apache.org/consumer-features.html
public class Consumer {
    public static void main(String[] args) {
        // Two consumers compete for messages on the same queue
        new ConsumerThread("tcp://mq.study.com:61616", "queue1").start();
        new ConsumerThread("tcp://mq.study.com:61616", "queue1").start();
    }
}

class ConsumerThread extends Thread {
    String brokerUrl;
    String destinationUrl;

    public ConsumerThread(String brokerUrl, String destinationUrl) {
        this.brokerUrl = brokerUrl;
        this.destinationUrl = destinationUrl;
    }

    @Override
    public void run() {
        ActiveMQConnectionFactory connectionFactory;
        Connection conn = null;
        Session session;
        MessageConsumer consumer;
        try {
            connectionFactory = new ActiveMQConnectionFactory(brokerUrl);
            conn = connectionFactory.createConnection();
            // Print connection-level errors that are reported asynchronously
            conn.setExceptionListener(ex -> ex.printStackTrace());
            conn.start(); // must be started, otherwise no messages are delivered
            session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
            Destination destination = session.createQueue(destinationUrl);
            consumer = session.createConsumer(destination);
            // Block until a single message arrives
            Message message = consumer.receive();
            System.out.println("Message received");
            if (message instanceof TextMessage) {
                TextMessage textMessage = (TextMessage) message;
                System.out.println(textMessage.getText());
            } else {
                System.out.println(message);
            }
            consumer.close();
            session.close();
        } catch (JMSException e) {
            e.printStackTrace();
        } finally {
            // Closing the connection also closes any session or consumer still open
            if (conn != null) {
                try {
                    conn.close();
                } catch (JMSException e) {
                    e.printStackTrace();
                }
            }
        }
    }
}
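Consumer.main starts two ConsumerThread instances on the same queue. With a JMS queue (point-to-point messaging), each message is delivered to only one of the competing consumers, not to both. The sketch below illustrates that behaviour without a broker, using a java.util.concurrent.BlockingQueue as a stand-in for queue1. It is an analogy in plain Java rather than ActiveMQ code, and every name in it is made up for illustration.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

public class CompetingConsumers {
    /** Puts `count` messages on an in-memory queue and lets two consumers drain it. */
    public static List<String> run(int count) throws InterruptedException {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>(); // stand-in for "queue1"
        List<String> deliveries = Collections.synchronizedList(new ArrayList<>());

        for (int i = 0; i < count; i++) {
            queue.put("message-" + i);
        }

        Runnable consumer = () -> {
            try {
                String msg;
                // poll with a timeout so the thread ends once the queue stays empty
                while ((msg = queue.poll(200, TimeUnit.MILLISECONDS)) != null) {
                    deliveries.add(Thread.currentThread().getName() + " got " + msg);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };

        Thread c1 = new Thread(consumer, "consumer-1");
        Thread c2 = new Thread(consumer, "consumer-2");
        c1.start();
        c2.start();
        c1.join();
        c2.join();
        return deliveries;
    }

    public static void main(String[] args) throws InterruptedException {
        run(6).forEach(System.out::println);
    }
}
```

Each message shows up exactly once in the output, although which consumer receives which message varies from run to run. In the JMS listing above, each consumer calls receive() only once, so when the producer sends a single message one of the two threads keeps waiting.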
Integrating with Spring Boot
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>org.example</groupId>
<artifactId>avtiveMQ</artifactId>
<version>1.0-SNAPSHOT</version>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.2.5.RELEASE</version>
<relativePath/> <!-- lookup parent from repository -->
</parent>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-activemq</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.messaginghub</groupId>
<artifactId>pooled-jms</artifactId>
<version>1.0.6</version>
</dependency>
</dependencies>
</project>
Producer code
package com.arlley.active.example;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.jms.support.converter.MappingJackson2MessageConverter;
import org.springframework.jms.support.converter.MessageConverter;
import org.springframework.jms.support.converter.MessageType;
import javax.annotation.PostConstruct;
import javax.annotation.Resource;
@SpringBootApplication
public class Main {
@Bean
public MessageConverter jacksonJmsMessageConverter(){
MappingJackson2MessageConverter converter = new MappingJackson2MessageConverter();
converter.setTargetType(MessageType.TEXT);
converter.setTypeIdPropertyName("_type");
return converter;
}
@Resource
public JmsTemplate jmsTemplate;
@PostConstruct
public void sendMessage(){
jmsTemplate.convertAndSend("Message", new User("1", "lala"));
}
public static void main(String[] args) {
SpringApplication.run(Main.class, args);
}
}
Consumer code
package com.arlley.active.example;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
import org.springframework.jms.annotation.JmsListener;
import org.springframework.jms.support.converter.MappingJackson2MessageConverter;
import org.springframework.jms.support.converter.MessageConverter;
import org.springframework.jms.support.converter.MessageType;
@SpringBootApplication
public class Consumer {
@Bean
public MessageConverter jacksonJmsMessageConverter(){
MappingJackson2MessageConverter converter = new MappingJackson2MessageConverter();
converter.setTargetType(MessageType.TEXT);
converter.setTypeIdPropertyName("_type");
return converter;
}
@JmsListener(destination = "Message")
public void receive(User user){
System.out.println(user.getId()+":"+user.getName());
}
public static void main(String[] args) {
SpringApplication.run(Consumer.class, args);
}
}
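Both Spring Boot listings use a User class that is not shown. Here is a minimal sketch, assuming it has just the two String properties implied by new User("1", "lala"), getId() and getName(). Jackson needs a no-argument constructor (or equivalent annotations) plus accessors to rebuild the object from the JSON body, so both are included. The field names and the mutable setters are assumptions. In the project the class would sit in the com.arlley.active.example package next to Main and Consumer; the package line is omitted here so the snippet compiles on its own.

```java
public class User {
    private String id;
    private String name;

    /** No-argument constructor used by Jackson when deserializing. */
    public User() {
    }

    public User(String id, String name) {
        this.id = id;
        this.name = name;
    }

    public String getId() { return id; }

    public void setId(String id) { this.id = id; }

    public String getName() { return name; }

    public void setName(String name) { this.name = name; }

    @Override
    public String toString() {
        return "User{id='" + id + "', name='" + name + "'}";
    }
}
```

Since the converter records the payload's class name in the _type message property unless type-id mappings are configured, the producer and the consumer need to agree on that name, for example by sharing the same User class.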