美文网首页
电商项目(五)

电商项目(五)

作者: kelaody | 来源:发表于2019-07-29 12:17 被阅读0次

    1 ActiveMQ简介

    1.1 ActiveMQ是什么

    ActiveMQ是一个消息队列应用服务器。支持JMS****规范

    1.1.1 JMS概述

    全称:Java Message Service ,即为Java消息服务,是一套java消息服务的API标准。(标准即接口)

    实现了JMS标准的系统,称之为JMS Provider。

    1.1.2 消息队列

    1.1.2.1 概念

    消息队列是在消息的传输过程中保存消息的容器,提供一种不同进程或者同一进程不同线程直接通讯的方式。

    [图片上传失败...(image-9b371a-1564373794322)]

    Producer:消息生产者,负责产生和发送消息到 Broker;

    Broker:消息处理中心。负责消息存储、确认、重试等,一般其中会包含多个 queue;

    Consumer:消息消费者,负责从 Broker 中获取消息,并进行相应处理;

    1.1.2.2 常见消息队列应用

    (1)、ActiveMQ

    ActiveMQ 是Apache出品,最流行的,能力强劲的开源消息总线。ActiveMQ 是一个完全支持JMS1.1和J2EE 1.4规范的 JMS Provider实现。

    (2)、RabbitMQ

    RabbitMQ是一个在AMQP基础上完成的,可复用的企业消息系统。他遵循Mozilla Public License开源协议。开发语言为Erlang。

    (3)、RocketMQ

    由阿里巴巴定义开发的一套消息队列应用服务。

    1.2 ActiveMQ能做什么

    (1)实现两个不同应用(程序)之间的消息通讯。

    (2)实现同一个应用,不同模块之间的消息通讯。

    1.3 ActiveMQ下载

    ActiveMQ官网地址: http://activemq.apache.org

    ActiveMQ下载地址:http://activemq.apache.org/download-archives.html

    --可供下载的历史版本

    --说明:

    ActiveMQ 5.10.x以上版本必须使用JDK1.8才能正常使用。

    ActiveMQ 5.9.x及以下版本使用JDK1.7即可正常使用。

    |

    [图片上传失败...(image-8a6a26-1564373794300)]

    |

    --根据操作系统,选择下载版本。(本教程下载Linux版本)

    |

    [图片上传失败...(image-2cabe4-1564373794300)]

    |

    1.4 ActiveMQ主要特点

    (1)支持多语言、多协议客户端。语言: Java,C,C++,C#,Ruby,Perl,Python,PHP。应用协议: OpenWire,Stomp REST,WS Notification,XMPP,AMQP

    (2)对Spring的支持,ActiveMQ可以很容易整合到Spring的系统里面去。

    (3)支持高可用、高性能的集群模式。

    2 入门示例

    2.1 需求

    使用ActiveMQ实现消息队列模型。

    2.2 配置步骤说明

    (1)搭建ActiveMQ消息服务器。

    (2)创建一个java项目。

    (3)创建消息生产者,发送消息。

    (4)创建消息消费者,接收消息。

    2.3 第一部分:搭建ActiveMQ消息服务器

    2.3.1 第一步:下载、上传至Linux

    --说明:确保已经安装了jdk

    |

    [图片上传失败...(image-c63245-1564373794300)]

    |

    2.3.2 第二步:安装到/usr/local/activemq目录

    (1)解压到/usr/local目录下

    |

    [root@node07192 ~]# tar -zxvf apache-activemq-5.9.0-bin.tar.gz -C /usr/local

    |

    (2)修改名称为activemq

    |

    [root@node07192 ~]# cd /usr/local/

    [root@node07192 local]# mv apache-activemq-5.9.0/ activemq

    |

    2.3.3 第三步:启动ActiveMQ服务器

    --说明:ActiveMQ是免安装软件,解压即可启动服务。

    |

    [root@node07192 local]# cd activemq/bin

    [root@node07192 bin]# ./activemq start

    |

    --查看ActiveMQ启动状态

    |

    [root@node07192 bin]# ./activemq status

    [图片上传失败...(image-f26a6a-1564373794300)]

    |

    2.3.4 第四步:浏览器访问ActiveMQ管理界面

    2.3.4.1 Step1:查看ActiveMQ管理界面的服务端口。在/conf/jetty.xml中

    --访问管理控制台的服务端口,默认为:8161

    |

    [root@node07192 bin]# cd ../conf

    [root@node07192 conf]# vim jetty.xml

    [图片上传失败...(image-9e080e-1564373794300)]

    |

    2.3.4.2 Step2:查看ActiveMQ用户、密码。在/conf/users.properties中:

    --默认的用户名、密码均为amdin

    |

    [root@node07192 conf]# vim users.properties

    [图片上传失败...(image-633981-1564373794300)]

    |

    2.3.4.3 Step3:访问ActiveMQ管理控制台。地址:http://ip:8161/

    --注意:防火墙是没有配置该服务的端口的。

    因此,要访问该服务,必须在防火墙中配置。

    (1****)修改防火墙,开放8161****端口

    |

    [root@node07192 conf]# vim /etc/sysconfig/iptables

    [图片上传失败...(image-8f9245-1564373794300)]

    |

    (2****)重启防火墙

    |

    [root@node07192 conf]# service iptables restart

    |

    (3)登录管理控制台

    --登陆,用户名、密码均为admin

    |

    [图片上传失败...(image-9c205-1564373794300)]

    |

    --控制台主界面

    |

    [图片上传失败...(image-48d268-1564373794300)]

    |

    --搭建ActiveMQ服务器成功!!!

    2.4 第二部分:创建java项目,导入jar包

    --导包说明:

    ActiveMQ的解压包中,提供了运行ActiveMQ的所有jar。

    |

    [图片上传失败...(image-a464ce-1564373794300)]

    |

    --创建项目

    |

    [图片上传失败...(image-f2670f-1564373794300)]

    |

    2.5 第三部分:创建消息生成者,发送消息

    --说明:ActiveMQ是实现了JMS规范的。在实现消息服务的时候,必须基于API接口规范。

    2.5.1 JMS常用的API说明

    下述API都是接口类型,定义在javax.jms包中,是JMS标准接口定义。ActiveMQ完全实现这一套api标准。

    2.5.1.1 ConnectionFactory

    链接工厂, 用于创建链接的工厂类型。

    2.5.1.2 Connection

    链接,用于建立访问ActiveMQ连接的类型, 由链接工厂创建。

    2.5.1.3 Session

    会话, 一次持久有效、有状态的访问,由链接创建。

    2.5.1.4 Destination & Queue & Topic

    目的地, 即本次访问ActiveMQ消息队列的地址,由Session会话创建。

    (1)interface Queue extends Destination

    (2)Queue:队列模型,只有一个消费者。消息一旦被消费,默认删除。

    (3)Topic:主题订阅中的消息,会发送给所有的消费者同时处理。

    2.5.1.5 Message

    消息,在消息传递过程中数据载体对象,是所有消息【文本消息TextMessage,对象消息ObjectMessage等】具体类型的顶级接口,可以通过会话创建或通过会话从ActiveMQ服务中获取。

    2.5.1.6 MessageProducer

    消息生成者, 在一次有效会话中, 用于发送消息给ActiveMQ服务的工具,由Session会话创建。

    2.5.1.7 MessageCustomer

    消息消费者【消息订阅者,消息处理者】, 在一次有效会话中, 用于ActiveMQ服务中获取消息的工具,由Session会话创建。

    我们定义的消息生产者和消费者,都是基于上面API实现的。

    2.5.2 第一步:创建MyProducer类,定义sendMessage方法

    |

    package cn.gzsxt.mq.producer;

    import javax.jms.Connection;

    import javax.jms.ConnectionFactory;

    import javax.jms.Destination;

    import javax.jms.JMSException;

    import javax.jms.Message;

    import javax.jms.MessageProducer;

    import javax.jms.Session;

    import org.apache.activemq.ActiveMQConnectionFactory;

    public class MyProducer {

    // 定义链接工厂

    ConnectionFactory connectionFactory = null;

    // 定义链接

    Connection connection = null;

    // 定义会话

    Session session = null;

    // 定义目的地

    Destination destination = null;

    // 定义消息生成者

    MessageProducer producer = null;

    // 定义消息

    Message message = null;

    public void sendToMQ(){

    try{

    /*

    • 创建链接工厂

    • ActiveMQConnectionFactory - 由ActiveMQ实现的ConnectionFactory接口实现类.

    • 构造方法: public ActiveMQConnectionFactory(String userName, String password, String brokerURL)

    • userName - 访问ActiveMQ服务的用户名, 用户名可以通过<u>jetty</u>-realm.properties配置文件配置.

    • password - 访问ActiveMQ服务的密码, 密码可以通过<u>jetty</u>-realm.properties配置文件配置.

    • brokerURL - 访问ActiveMQ服务的路径地址. 路径结构为 - 协议名://主机地址:端口号

    • 此链接基于TCP/IP协议.

    */

    connectionFactory = new ActiveMQConnectionFactory("admin", "admin", "tcp://192.168.23.13:61616");

    // 创建链接对象

    connection = connectionFactory.createConnection();

    // 启动链接

    connection.start();

    /*

    • 创建会话对象

    • 方法 - connection.createSession(boolean transacted, <u>int</u> acknowledgeMode);

    • transacted - 是否使用事务, 可选值为true|false

    • true - 使用事务, 当设置此变量值, 则acknowledgeMode参数无效, 建议传递的acknowledgeMode参数值为

    • Session.SESSION_TRANSACTED

    • false - 不使用事务, 设置此变量值, 则acknowledgeMode参数必须设置.

    • acknowledgeMode - 消息确认机制, 可选值为:

    • Session.AUTO_ACKNOWLEDGE - 自动确认消息机制

    • Session.CLIENT_ACKNOWLEDGE - 客户端确认消息机制

    • Session.DUPS_OK_ACKNOWLEDGE - 有副本的客户端确认消息机制

    */

    session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

    // 创建目的地, 目的地命名即队列命名, 消息消费者需要通过此命名访问对应的队列

    destination = session.createQueue("test-mq");

    // 创建消息生成者, 创建的消息生成者与某目的地对应, 即方法参数目的地.

    producer = session.createProducer(destination);

    // 创建消息对象, 创建一个文本消息, 此消息对象中保存要传递的文本数据.

    message = session.createTextMessage("hello,activeme");

    // 发送消息

    producer.send(message);

    System.out.println("消息发送成功!");

    }catch(Exception e){

    e.printStackTrace();

    System.out.println("访问ActiveMQ服务发生错误!!");

    }finally{

    try {

    // 回收消息发送者资源

    if(null != producer)

    producer.close();

    } catch (JMSException e) {

    e.printStackTrace();

    }

    try {

    // 回收会话资源

    if(null != session)

    session.close();

    } catch (JMSException e) {

    e.printStackTrace();

    }

    try {

    // 回收链接资源

    if(null != connection)

    connection.close();

    } catch (JMSException e) {

    e.printStackTrace();

    }

    }

    }

    }

    |

    2.5.3 第二步:创建一个测试类MessageTest

    --添加junit类库,快捷键ctrl+1

    |

    package cn.gzsxt.mq.test;

    import org.junit.Test;

    import cn.gzsxt.mq.producer.MyProducer;

    public class MessageTest {

    @Test

    public void sendToMQ(){

    MyProducer producer = new MyProducer();

    producer.sendToMQ();

    }

    }

    |

    2.5.4 第三步:测试

    (1)设置防火墙,配置61616端口。注意修改之后重启防火墙

    (2)测试结果:

    --查看控制台

    |

    [图片上传失败...(image-6aa9d4-1564373794299)]

    |

    --查看ActiveMQ管理控制界面

    |

    [图片上传失败...(image-4252b4-1564373794299)]

    |

    --消息发送成功!!!

    2.6 第四部分:创建消息消费者,消费消息

    2.6.1 第一步:创建MyConsumer类

    |

    package cn.gzsxt.mq.consumer;

    import javax.jms.Connection;

    import javax.jms.ConnectionFactory;

    import javax.jms.Destination;

    import javax.jms.JMSException;

    import javax.jms.Message;

    import javax.jms.MessageConsumer;

    import javax.jms.Session;

    import javax.jms.TextMessage;

    import org.apache.activemq.ActiveMQConnectionFactory;

    /**

    • @ClassName:MyConsumer

    • @Description: 消息消费者代码

    */

    public class MyConsumer {

    // 定义链接工厂

    ConnectionFactory connectionFactory = null;

    // 定义链接

    Connection connection = null;

    // 定义会话

    Session session = null;

    // 定义目的地

    Destination destination = null;

    // 定义消息消费者

    MessageConsumer consumer = null;

    // 定义消息

    Message message = null;

    public void recieveFromMQ(){

    try{

    /*

    • 创建链接工厂

    • ActiveMQConnectionFactory - 由ActiveMQ实现的ConnectionFactory接口实现类.

    • 构造方法: public ActiveMQConnectionFactory(String userName, String password, String brokerURL)

    • userName - 访问ActiveMQ服务的用户名, 用户名可以通过<u>jetty</u>-realm.properties配置文件配置.

    • password - 访问ActiveMQ服务的密码, 密码可以通过<u>jetty</u>-realm.properties配置文件配置.

    • brokerURL - 访问ActiveMQ服务的路径地址. 路径结构为 - 协议名://主机地址:端口号

    • 此链接基于TCP/IP协议.

    */

    connectionFactory = new ActiveMQConnectionFactory("admin", "admin", "tcp://192.168.23.13:61616");

    // 创建链接对象

    connection = connectionFactory.createConnection();

    // 启动链接

    connection.start();

    /*

    • 创建会话对象

    • 方法 - connection.createSession(boolean transacted, <u>int</u> acknowledgeMode);

    • transacted - 是否使用事务, 可选值为true|false

    • true - 使用事务, 当设置此变量值, 则acknowledgeMode参数无效, 建议传递的acknowledgeMode参数值为

    • Session.SESSION_TRANSACTED

    • false - 不使用事务, 设置此变量值, 则acknowledgeMode参数必须设置.

    • acknowledgeMode - 消息确认机制, 可选值为:

    • Session.AUTO_ACKNOWLEDGE - 自动确认消息机制

    • Session.CLIENT_ACKNOWLEDGE - 客户端确认消息机制

    • Session.DUPS_OK_ACKNOWLEDGE - 有副本的客户端确认消息机制

    */

    session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

    // 创建目的地, 目的地命名即队列命名, 消息消费者需要通过此命名访问对应的队列

    destination = session.createQueue("test-mq");

    // 创建消息消费者, 创建的消息消费者与某目的地对应, 即方法参数目的地.

    consumer = session.createConsumer(destination);

    // 从ActiveMQ服务中获取消息

    message = consumer.receive();

    TextMessage tMsg = (TextMessage) message;

    System.out.println("从MQ中获取的消息是:"+tMsg.getText());

    }catch(Exception e){

    e.printStackTrace();

    System.out.println("访问ActiveMQ服务发生错误!!");

    }finally{

    try {

    // 回收消息消费者资源

    if(null != consumer)

    consumer.close();

    } catch (JMSException e) {

    e.printStackTrace();

    }

    try {

    // 回收会话资源

    if(null != session)

    session.close();

    } catch (JMSException e) {

    e.printStackTrace();

    }

    try {

    // 回收链接资源

    if(null != connection)

    connection.close();

    } catch (JMSException e) {

    e.printStackTrace();

    }

    }

    }

    }

    |

    2.6.2 第二步:修改测试类MessageTest,新增测试方法

    |

    @Test

    public void recieveFromMQ(){

    MyConsumer consumer = new MyConsumer();

    consumer.recieveFromMQ();

    }

    |

    2.6.3 第三步:测试

    --查看Eclipse控制台

    --查看ActiveMQ管理控制界面

    相关文章

      网友评论

          本文标题:电商项目(五)

          本文链接:https://www.haomeiwen.com/subject/qlxbrctx.html