ActiveMQ

作者: 小蜗牛Aaron | 来源:发表于2020-03-06 01:13 被阅读0次

ActiveMQ简介

一款用Java开发的、开源的、支持多种协议的、非常流行的消息服务(消息中间件)。因为它的支持多 协议,支持工业标准的协议,所以我们可以跨平台、跨语言来使用它。

  • 用AMQP工业标准协议进行多平台应用集成;
  • Web应用可基于websocket用STOMP协议与ActiveMQ直接交互
  • 物联网设备用MQTT协议
  • 基于JMS的已有基础设施也支持
  • 还有更多…

当前有两个版本:ActiveMQ 5 "Classic" 和 ActiveMQ Artemis (下一代版本)。
我们讨论 ActiveMQ 5 "Classic" 经典版。
有兴趣可以了解 ActiveMQ Artemis。

linux安装指南

  • Linux: Centos 7 CentOS-7-x86_64-Minimal-1810.iso
  • Jdk 8 jdk-8u221-linux-x64.rpm
安装包下载
wget –c http://mirror.bit.edu.cn/apache/activemq/5.15.9/apache-activemq-5.15.9bin.tar.gz
安装
#创建安装目录
mkdir /usr/activemq
#解压安装包到安装目录
tar -zxvf apache-activemq-5.15.9-bin.tar.gz -C /usr/activemq
# 为方便配置时书写,创建软链接
ln -s /usr/activemq/apache-activemq-5.15.9 /usr/activemq/latest
启停
cd /usr/activemq/latest/bin
# 作为前台进程启动
./activemq console
#作为守护进程启动
./activemq start
#启动输出
INFO: Loading '/usr/activemq/apache-activemq-5.15.9//bin/env' 
INFO: Using java '/usr/bin/java' 
INFO: Starting - inspect logfiles specified in logging.properties and log4j.properties to get details 
INFO: pidfile created : '/usr/activemq/apache-activemq5.15.9//data/activemq.pid' (pid '14887')
启动成功检测

访问管理控制台:http://ip:8161/admin 如果防火墙阻止了,请看下面防火墙开发端口
ActiveMQ的管理页面默认开启了身份校验:
账号:admin
密码:admin
或在启动Console 或 日志文件(data/activemq.log)中看到日志输出:

Apache ActiveMQ 5.15.9 (localhost, ID:ntbk11111-50816-1428933306116-0:1) started | org.apache.activemq.broker.BrokerService | main

或用 jps命令查看

[root@localhost latest]# jps 25778 activemq.jar 25805 Jps
停止
./activemq stop
了解activemq 命令的用法(快速了解一下):
./activemq
Usage: ./activemq [--extdir <dir>] [task] [task-options] [task data]

Tasks:
    browse                   - Display selected messages in a specified destination.
    bstat                    - Performs a predefined query that displays useful statistics regarding the specified broker
    consumer                 - Receives messages from the broker
    create                   - Creates a runnable broker instance in the specified path.
    decrypt                  - Decrypts given text
    dstat                    - Performs a predefined query that displays useful tabular statistics regarding the specified destination type
    encrypt                  - Encrypts given text
    export                   - Exports a stopped brokers data files to an archive file
    list                     - Lists all available brokers in the specified JMX context
    producer                 - Sends messages to the broker
    purge                    - Delete selected destination's messages that matches the message selector
    query                    - Display selected broker component's attributes and statistics.
    start                    - Creates and starts a broker using a configuration file, or a broker URI.
    stop                     - Stops a running broker specified by the broker name.

Task Options (Options specific to each task):
    --extdir <dir>  - Add the jar files in the directory to the classpath.
    --version       - Display the version information.
    -h,-?,--help    - Display this help information. To display task specific help, use Main [task] -h,-?,--help

Task Data:
    - Information needed by each specific task.

JMX system property options:
    -Dactivemq.jmx.url=<jmx service uri> (default is: 'service:jmx:rmi:///jndi/rmi://localhost:1099/jmxrmi')
    -Dactivemq.jmx.user=<user name>
    -Dactivemq.jmx.password=<password>


Tasks provided by the sysv init script:
    kill            - terminate instance in a drastic way by sending SIGKILL
    restart         - stop running instance (if there is one), start new instance
    console         - start broker in foreground, useful for debugging purposes
    status          - check if activemq process is running

Configuration of this script:
    The configuration of this script is read from the following files:
    /etc/default/activemq /home/redis/.activemqrc /home/redis/apache-activemq-5.15.11//bin/env
    This script searches for the files in the listed order and reads the first available file.
    Modify /home/redis/apache-activemq-5.15.11//bin/env or create a copy of that file on a suitable location.
    To use additional configurations for running multiple instances on the same operating system
    rename or symlink script to a name matching to activemq-instance-<INSTANCENAME>.
    This changes the configuration location to /etc/default/activemq-instance-<INSTANCENAME> and
    $HOME/.activemqrc-instance-<INSTANCENAME>.

防火墙开放MQ端口
#Web管理端口默认为8161,通讯端口默认为61616 
firewall-cmd --zone=public --add-port=8161/tcp --permanent 
firewall-cmd --zone=public --add-port=61616/tcp --permanent
重启防火墙
systemctl restart firewalld.service

Linux 服务安装

以普通用户身份运行
useradd activemq 
chown -R activemq:users /usr/activemq
创建全局默认的配置文件
cp /usr/activemq/latest/bin/env /etc/default/activemq 
sed -i '~s/^ACTIVEMQ\_USER=""/ACTIVEMQ\_USER="activemq"/' 
/etc/default/activemq
编辑activemq配置文件,进行如下配置(生产环境需要考虑配置)
  • Configure the java heap to a size suitable to your system environment and usage
  • Consider to move the folders “data”, “tmp” and “conf” out of the installation path
vim /etc/default/activemq

配置内容如下所示

# Active MQ installation dirs # ACTIVEMQ_HOME="<Installationdir>/" 
# ACTIVEMQ_BASE="$ACTIVEMQ_HOME" 
# ACTIVEMQ_CONF="$ACTIVEMQ_BASE/conf" 
# ACTIVEMQ_DATA="$ACTIVEMQ_BASE/data" 
# ACTIVEMQ_TMP="$ACTIVEMQ_BASE/tmp"
# Set jvm memory configuration (minimal/maximum amount of memory) ACTIVEMQ_OPTS_MEMORY="-Xms64M -Xmx1G"

修改权限模式

chmod 644 /etc/default/activemq

安装启动脚本

ln -snf  /usr/activemq/latest/bin/activemq /etc/init.d/activemq

激活启动服务

# RHEL 
chkconfig --add activemq 
chkconfig activemq on

或者

systemctl enable activemq

快速上手使用

引入jar
<dependency>    
      <groupId>org.apache.activemq</groupId>    
      <artifactId>activemq-all</artifactId>    
      <version>5.15.9</version> 
</dependency>
编写生产者的Producer
package com.study.activemq.le1.helloworld.queue;
import javax.jms.Connection; 
import javax.jms.DeliveryMode;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageProducer; 
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnectionFactory;
/** * 简单生产者 */ 
public class Producer {    
  public static void main(String[] args) {        
        new ProducerThread("tcp://mq.study.com:61616", "queue1").start();    
  }
   static class ProducerThread extends Thread {        
        String brokerUrl;        
        String destinationUrl;
        public ProducerThread(String brokerUrl, String destinationUrl) {                    
              this.brokerUrl = brokerUrl;
              this.destinationUrl = destinationUrl;
        }
        @Override
        public void run() {
            ActiveMQConnectionFactory connectionFactory;
            Connection conn;
            Session session;
            try {
                // 1、创建连接工厂 
                connectionFactory = new ActiveMQConnectionFactory(brokerUrl);
                // 2、创建连接
                conn = connectionFactory.createConnection();
                conn.start(); // 一定要start
                // 3、创建会话(可以创建一个或者多个session)
                session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
                // 4、创建消息发送目标 (Topic or Queue)
                Destination destination = session.createQueue(destinationUrl);
                // 5、用目的地创建消息生产者
                MessageProducer producer = session.createProducer(destination);                // 设置递送模式(持久化 / 不持久化)                producer.setDeliveryMode(DeliveryMode.PERSISTENT);
                // 6、创建一条文本消息 
               String text = "Hello world! From: " + Thread.currentThread().getName() + " : "                        + System.currentTimeMillis();                TextMessage message = session.createTextMessage(text);
                // 7、通过producer 发送消息 
               System.out.println("Sent message: " + text);

               producer.send(message);
                // 8、 清理、关闭连接
                session.close();
                conn.close(); 
           } catch (JMSException e) {
                e.printStackTrace();
            } 
       }
    }
 }

编写生产者
package com.study.activemq.le1.helloworld.queue;
import javax.jms.Connection;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer; 
import javax.jms.Session; 
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnectionFactory;
/** * 简单消费者 */ // http://activemq.apache.org/consumer-features.html 
public class Consumer {
    public static void main(String[] args) {
        new ConsumerThread("tcp://mq.study.com:61616", "queue1").start();
        new ConsumerThread("tcp://mq.study.com:61616", "queue1").start();
  } }
class ConsumerThread extends Thread {
    String brokerUrl;
    String destinationUrl;
    public ConsumerThread(String brokerUrl, String destinationUrl) {        
      this.brokerUrl = brokerUrl;
      this.destinationUrl = destinationUrl; 
   }
    @Override
    public void run() {
        ActiveMQConnectionFactory connectionFactory; 
       Connection conn;
        Session session;
        MessageConsumer consumer;
   
 
        try {            ConnectionFactory factory = new ActiveMQConnectionFactory("tcp://192.168.64.10:61616");

        Connection connection = factory.createConnection();
        connection.setExceptionListener(new TestActiveMQConsumer());
        connection.start();
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

        Destination destination = session.createQueue("myQueue");

        MessageConsumer consumer = session.createConsumer(destination);


        Message message = consumer.receive();

        System.out.println("收到消息");
        if(message instanceof TextMessage){
            TextMessage textMessage = (TextMessage) message;
            System.out.println(textMessage.getText());
        }else{
            System.out.println(message);
        }
        consumer.close();
        session.close();
        connection.close();
}
}

使用spring-boot集成

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>org.example</groupId>
    <artifactId>avtiveMQ</artifactId>
    <version>1.0-SNAPSHOT</version>

    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.2.5.RELEASE</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>


    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-activemq</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.messaginghub</groupId>
            <artifactId>pooled-jms</artifactId>
            <version>1.0.6</version>
        </dependency>
    </dependencies>
</project>
生产者代码
package com.arlley.active.example;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.jms.support.converter.MappingJackson2MessageConverter;
import org.springframework.jms.support.converter.MessageConverter;
import org.springframework.jms.support.converter.MessageType;

import javax.annotation.PostConstruct;
import javax.annotation.Resource;

@SpringBootApplication
public class Main {

    @Bean
    public MessageConverter jacksonJmsMessageConverter(){
        MappingJackson2MessageConverter converter = new MappingJackson2MessageConverter();
        converter.setTargetType(MessageType.TEXT);
        converter.setTypeIdPropertyName("_type");
        return converter;
    }

    @Resource
    public JmsTemplate jmsTemplate;

    @PostConstruct
    public void sendMessage(){
        jmsTemplate.convertAndSend("Message", new User("1", "lala"));
    }

    public static void main(String[] args) {
        SpringApplication.run(Main.class, args);
    }
}
消费者代码
package com.arlley.active.example;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
import org.springframework.jms.annotation.JmsListener;
import org.springframework.jms.support.converter.MappingJackson2MessageConverter;
import org.springframework.jms.support.converter.MessageConverter;
import org.springframework.jms.support.converter.MessageType;

@SpringBootApplication
public class Consumer {

    @Bean
    public MessageConverter jacksonJmsMessageConverter(){
        MappingJackson2MessageConverter converter = new MappingJackson2MessageConverter();
        converter.setTargetType(MessageType.TEXT);
        converter.setTypeIdPropertyName("_type");
        return converter;
    }

    @JmsListener(destination = "Message")
    public void receive(User user){
        System.out.println(user.getId()+":"+user.getName());
    }

    public static void main(String[] args) {
        SpringApplication.run(Consumer.class, args);
    }
}

相关文章

网友评论

    本文标题:ActiveMQ

    本文链接:https://www.haomeiwen.com/subject/fersrhtx.html