美文网首页
ActiveMQ入门使用

ActiveMQ入门使用

作者: 依弗布德甘 | 来源:发表于2020-02-23 12:26 被阅读0次

演示环境: Centos7、jdk8、activemq5.15.8
下载地址: http://activemq.apache.org/activemq-5158-release.html
解压: tar -zxvf apache-activemq-5.15.8-bin.tar.gz -C /var
修改目录名称 mv /var/apache-activemq-5.15.8/ /var/activemq/
启动: ./bin/activemq start
停止:./bin/activemq stop

操作练习

1、创建一个systemd服务文件:vi /usr/lib/systemd/system/activemq.service

2、 放入内容

[Unit]
Description=ActiveMQ service
After=network.target

[Service]
Type=forking
ExecStart=/var/activemq/bin/activemq start
ExecStop=/var/activemq/bin/activemq stop
User=root
Group=root
Restart=always
RestartSec=9
StandardOutput=syslog
StandardError=syslog
SyslogIdentifier=activemq

[Install]
WantedBy=multi-user.target

3、 找到java命令所在的目录 whereis java

4、设置activemq配置文件/var/activemq/bin/env中的JAVA_HOME

# Location of the java installation
# Specify the location of your java installation using JAVA_HOME, or specify the
# path to the "java" binary using JAVACMD
# (set JAVACMD to "auto" for automatic detection)
JAVA_HOME="/usr/local/java/jdk1.8.0_181"
JAVACMD="auto"

5、 通过systemctl管理activemq启停

  • 启动activemq服务: systemctl start activemq
  • 查看服务状态: systemctl status activemq
  • 创建软件链接:ln -s /usr/lib/systemd/system/activemq.service /etc/systemd/system/multi-user.target.wants/activemq.service
  • 开机自启: systemctl enable activemq
  • 检测是否开启成功(enable): systemctl list-unit-files |grep activemq

6、 防火墙配置,Web管理端口默认为8161(admin/admin),通讯端口默认为61616

  • 添加并重启防火墙
firewall-cmd --zone=public --add-port=8161/tcp --permanent
firewall-cmd --zone=public --add-port=61616/tcp --permanent
systemctl restart firewalld.service
  • 或者直接关闭防火墙: systemctl stop firewalld.service

7、 修改web管理系统的部分配置,配置文件在/var/activemq/conf

  • 端口修改
<bean id="jettyPort" class="org.apache.activemq.web.WebConsolePort" init-method="start">
  <!-- the default port number for the web console -->
  <property name="host" value="0.0.0.0"/>
  <!--此处即为管理平台的端口-->
  <property name="port" value="8161"/>
</bean>
  • 关闭登录
<bean id="securityConstraint" class="org.eclipse.jetty.util.security.Constraint">
  <property name="name" value="BASIC" />
  <property name="roles" value="user,admin" />
  <!-- 改为false即可关闭登陆 -->
  <property name="authenticate" value="true" />
</bean>
  • 其他配置: /var/activemq/conf/jetty-realm.properties
## ---------------------------------------------------------------------------
# 在此即可维护账号密码,格式:
# 用户名:密码,角色
# Defines users that can access the web (console, demo, etc.)
# username: password [,rolename ...]
admin: admin, admin
user: 123, user

8、 JAVA客户端的使用

  • 标准客户端使用
<dependency>
  <groupId>org.apache.activemq</groupId>
  <artifactId>activemq-all</artifactId>
  <version>5.15.8</version>
</dependency>
  • Spring中使用: http://spring.io/guides/gs/messaging-jms/
<dependency>
    <groupId>org.springframework</groupId>
    <artifactId>spring-jms</artifactId>
    <version>5.1.3.RELEASE</version>
</dependency>
<dependency>
    <groupId>org.apache.activemq</groupId>
    <artifactId>activemq-broker</artifactId>
    <version>5.15.8</version>
    <exclusions>
    <exclusion>
        <artifactId>geronimo-jms_1.1_spec</artifactId>
        <groupId>org.apache.geronimo.specs</groupId>
    </exclusion>
    </exclusions>
</dependency>

程序示例代码

  • 生产者

import org.apache.activemq.ActiveMQConnectionFactory;

import javax.jms.*;

/**
 * 简单生产者
 */
public class Producer {
    public static void main(String[] args) {
        new ProducerThread("tcp://activemq.tony.com:61616", "queue1").start();
    }

    static class ProducerThread extends Thread {
        String brokerUrl;
        String destinationUrl;

        public ProducerThread(String brokerUrl, String destinationUrl) {
            this.brokerUrl = brokerUrl;
            this.destinationUrl = destinationUrl;
        }

        @Override
        public void run() {
            ActiveMQConnectionFactory connectionFactory;
            Connection conn;
            Session session;

            try {
                // 1、创建连接工厂
                connectionFactory = new ActiveMQConnectionFactory(brokerUrl);
                // 2、创建连接对象md
                conn = connectionFactory.createConnection();
                conn.start();
                // 3、创建会话
                session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
                // 4、创建点对点发送的目标
                 Destination destination = session.createQueue(destinationUrl);
                // 5、创建生产者消息
                MessageProducer producer = session.createProducer(destination);
                // 设置生产者的模式,有两种可选 持久化 / 不持久化
                producer.setDeliveryMode(DeliveryMode.PERSISTENT);
                // 6、创建一条文本消息
                String text = "Hello world!";
                TextMessage message = session.createTextMessage(text);
                for (int i = 0; i < 1; i++) {
                    // 7、发送消息
                    producer.send(message);
                }
                // 8、 关闭连接
                session.close();
                conn.close();
            } catch (JMSException e) {
                e.printStackTrace();
            }
        }
    }
}

  • 消费者

import org.apache.activemq.ActiveMQConnectionFactory;

import javax.jms.*;

/**
 * 简单消费者
 */
// http://activemq.apache.org/consumer-features.html
public class Consumer {
    public static void main(String[] args) {
        new ConsumerThread("tcp://activemq.tony.com:61616", "queue1").start();
        new ConsumerThread("tcp://activemq.tony.com:61616", "queue1").start();
    }
}

class ConsumerThread extends Thread {

    String brokerUrl;
    String destinationUrl;

    public ConsumerThread(String brokerUrl, String destinationUrl) {
        this.brokerUrl = brokerUrl;
        this.destinationUrl = destinationUrl;
    }

    @Override
    public void run() {
        ActiveMQConnectionFactory connectionFactory;
        Connection conn;
        Session session;
        MessageConsumer consumer;

        try {
            // brokerURL http://activemq.apache.org/connection-configuration-uri.html
            // 1、创建连接工厂
            connectionFactory = new ActiveMQConnectionFactory(this.brokerUrl);
            // 2、创建连接对象
            conn = connectionFactory.createConnection();
            conn.start(); // 一定要启动
            // 3、创建会话(可以创建一个或者多个session)
            session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
            // 4、创建点对点接收的目标,queue - 点对点
            Destination destination = session.createQueue(destinationUrl);

            // 5、创建消费者消息 http://activemq.apache.org/destination-options.html
            consumer = session.createConsumer(destination);

            // 6、接收消息(没有消息就持续等待)
            Message message = consumer.receive();
            if (message instanceof TextMessage) {
                System.out.println("收到文本消息:" + ((TextMessage) message).getText());
            } else {
                System.out.println(message);
            }

            consumer.close();
            session.close();
            conn.close();
        } catch (JMSException e) {
            e.printStackTrace();
        }
    }
}

Spring Boot 实现

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.jms.core.JmsTemplate;

import javax.annotation.PostConstruct;

@SpringBootApplication
public class Producer {

    @Autowired
    private JmsTemplate jmsTemplate;

    @PostConstruct
    public void init() {
        jmsTemplate.convertAndSend("queue1", "Hello Spring 4");
    }

    public static void main(String[] args) {
        SpringApplication.run(Producer.class, args);
    }
}

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.jms.core.JmsTemplate;

import javax.annotation.PostConstruct;

@SpringBootApplication
public class Producer {

    @Autowired
    private JmsTemplate jmsTemplate;

    @PostConstruct
    public void init() {
        jmsTemplate.convertAndSend("queue1", "Hello Spring 4");
    }

    public static void main(String[] args) {
        SpringApplication.run(Producer.class, args);
    }
}

相关文章

  • ActiveMQ入门使用

    演示环境: Centos7、jdk8、activemq5.15.8下载地址: http://activemq.ap...

  • ActiveMQ从入门到精通(一)

    这是关于消息中间件ActiveMQ的一个系列专题文章,将涵盖JMS、ActiveMQ的初步入门及API详细使用、两...

  • java消息服务之ActiveMQ入门(0x01)

    java消息服务之ActiveMQ入门(0x02) ActiveMQ能做什么 大多数情况下ActiveMQ被用于做...

  • java消息服务之ActiveMQ入门(0x02)

    java消息服务之ActiveMQ入门(0x01) 概要 ActiveMQ 的 request-response通...

  • ActiveMQ学习

    MQ入门总结(三)ActiveMQ的用法和实现

  • 【JAVA】ActiveMQ

    消息队列ActiveMQ的使用详解 ActiveMQ安装 下载地址:http://activemq.apache....

  • ActiveMQ入门

    ActiveMQ安装 点对点模式 发送消息 接收消息 第一种方式: 第二种方式: 订阅模式 发送消息 消息接收 S...

  • 【ActiveMQ】入门

    1. ActiveMQ 是什么 ActiveMQ 是 Apache 出品,能力强劲的开源消息总线,虽然现在没那么流...

  • ActiveMQ入门

    JMS是什么 JMS是SUN提出是为了统一MOM系统接口的规范,他包含点对点,以及发布订阅两种消息模型,提供可靠消...

  • SpringBoot集成ActiveMQ

    Spring Boot 集成ActiveMQ 使用ActiveMQ版本5.14.0,spring Boot版本1....

网友评论

      本文标题:ActiveMQ入门使用

      本文链接:https://www.haomeiwen.com/subject/xatjqhtx.html