ActiveMQ-API(二)

作者: Airycode | 来源:发表于2018-05-30 14:22 被阅读71次

    MessageProducer:MessageProducer是一个由Session创建的对象,用来向Destination发送消息。
    void send(Destination destination,Message message);
    void send(Destination destination,Message message,int deliveryMode,int priority,long timeToLive);
    void send(Message message);
    void send(Destination destination,Message message,int deliveryMode,int priority,long timeToLive);其中deliveryMode为传送模式,priority为消息优先级,timeToLive为消息过期时间。
    ActiveMQ支持两种消息传送模式:PERSISTENT和NON_PERSISTENT两种。如果不指定传送模式,那么默认是持久性消息。如果容忍消息丢失,那么使用非持久性消息可以改善性能和减少存储的开销。
    activemq.xml配置

    <policyEntry queue="queueName" prioritizedMessages="true" />
    

    消息优先级从0-9是个级别,0-4是普通消息,5-9是加急消息。如果不指定优先级,则默认是4.JMS不要求严格按照这十个优先级发送消息,但是必须保证单次加急消息要先与普通消息到达,但也仅仅是先到达,并不能保证顺序消费机制。
    默认情况下,消息永不会过期,如果消息在特定周期内失去意义,那么可以设置过期时间,时间单位为毫秒。
    测试api代码:

    package bhz.mq.helloworld;
    
    import java.util.concurrent.TimeUnit;
    
    import javax.jms.Connection;
    import javax.jms.ConnectionFactory;
    import javax.jms.DeliveryMode;
    import javax.jms.Destination;
    import javax.jms.MessageProducer;
    import javax.jms.Session;
    import javax.jms.TextMessage;
    
    import org.apache.activemq.ActiveMQConnectionFactory;
    
    public class Sender {
    
        public static void main(String[] args) throws Exception{
            //第一步:建立ConnectionFactory工厂对象,需要填入用户名,密码,以及要链接的地址,均使用默认即可,默认的端口为"tcp://localhost:61616"
            ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("bhz","bhz","tcp://localhost:61616");
            //第二步:通过ConnectionFactory工厂对象我们创建一个Connection链接,并且调用Connection的start方法开启链接,Connection默认是关闭的。
            Connection connection = connectionFactory.createConnection();
            connection.start();
            //第三步:通过Connection对象创建Session会话(上下文环境对象),用于接收消息,参数配置1为是否启用事务,参数配置2为签收模式,一般我们设置自动签收。
            Session session = connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE);
            //第四步:通过Session创建Destination对象,指的是一个客户端用来指定生产消息目标和消费消息来源的对象,在PTP模式中
            Destination destination = session.createQueue("first");
            //第五步:我们需要通过Session对象创建消息的发送和接收对象(生产者和消费者)MessageProducer/MessageConsumer
            MessageProducer producer = session.createProducer(null);
            //第六步:我们可以使用MessageProducer的setDeliveryMode方法为其设置持久化特性和非持久化特性(DeliveryMode),我们稍后介绍。
            //producer.setDeliveryMode(DeliveryMode.PERSISTENT);
            //第七步:最后我们使用JMS规范的TextMessage形式创建数据(通过Session对象),并用MessageProducer的send方法发送数据,同理客户端使用receive方法进行接收数据。
            for (int i=0;i<9;i++) {
                TextMessage msg = session.createTextMessage("我是消息内容"+i);
                //第一个参数 目标地址
                //第二个参数 具体的数据信息
                //第三个参数 传送数据的模式
                //第四个参数 优先级
                //第五个参数 消息的过期时间
                producer.send(destination,msg,DeliveryMode.PERSISTENT,i,1000*5);
                
                TimeUnit.SECONDS.sleep(1);
            }
            if (connection != null) {
                connection.close();
            }
        }
        
    }
    
    package bhz.mq.helloworld;
    
    import javax.jms.Connection;
    import javax.jms.ConnectionFactory;
    import javax.jms.Destination;
    import javax.jms.JMSException;
    import javax.jms.MessageConsumer;
    import javax.jms.Session;
    import javax.jms.TextMessage;
    
    import org.apache.activemq.ActiveMQConnectionFactory;
    
    public class Receiver {
    
        public static void main(String[] args) throws Exception {
            //第一步:建立ConnectionFactory工厂对象,需要填入用户名,密码,以及要链接的地址,均使用默认即可,默认的端口为"tcp://localhost:61616"
            ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("bhz","bhz","tcp://localhost:61616");
            //第二步:通过ConnectionFactory工厂对象我们创建一个Connection链接,并且调用Connection的start方法开启链接,Connection默认是关闭的。
            Connection connection = connectionFactory.createConnection();
            connection.start();
            //第三步:通过Connection对象创建Session会话(上下文环境对象),用于接收消息,参数配置1为是否启用事务,参数配置2为签收模式,一般我们设置自动签收。
            Session session = connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE);
            //第四步:通过Session创建Destination对象,指的是一个客户端用来指定生产消息目标和消费消息来源的对象,在PTP模式中,Destination被称作Queue即队列,在发布订阅模式中就是Topic
            Destination destination = session.createQueue("first");
            //第五步:通过Session创建MessageConsumer
            MessageConsumer consumer = session.createConsumer(destination);
            while (true) {
                TextMessage msg = (TextMessage) consumer.receive();
                //手动签收
                //msg.acknowledge();
                System.out.println("消费数据:"+msg.getText());
            }
        }
        
    }
    
    

    测试优先级

    <!--
        Licensed to the Apache Software Foundation (ASF) under one or more
        contributor license agreements.  See the NOTICE file distributed with
        this work for additional information regarding copyright ownership.
        The ASF licenses this file to You under the Apache License, Version 2.0
        (the "License"); you may not use this file except in compliance with
        the License.  You may obtain a copy of the License at
    
        http://www.apache.org/licenses/LICENSE-2.0
    
        Unless required by applicable law or agreed to in writing, software
        distributed under the License is distributed on an "AS IS" BASIS,
        WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
        See the License for the specific language governing permissions and
        limitations under the License.
    -->
    <!-- START SNIPPET: example -->
    <beans
      xmlns="http://www.springframework.org/schema/beans"
      xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
      xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
      http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core.xsd">
    
        <!-- Allows us to use system properties as variables in this configuration file -->
        <bean class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer">
            <property name="locations">
                <value>file:${activemq.conf}/credentials.properties</value>
            </property>
        </bean>
    
       <!-- Allows accessing the server log -->
        <bean id="logQuery" class="io.fabric8.insight.log.log4j.Log4jLogQuery"
              lazy-init="false" scope="singleton"
              init-method="start" destroy-method="stop">
        </bean>
    
        <!--
            The <broker> element is used to configure the ActiveMQ broker.
        -->
        <broker xmlns="http://activemq.apache.org/schema/core" brokerName="localhost" dataDirectory="${activemq.data}">
    
            <destinationPolicy>
                <policyMap>
                  <policyEntries>
                    <policyEntry topic=">" >
                        <!-- The constantPendingMessageLimitStrategy is used to prevent
                             slow topic consumers to block producers and affect other consumers
                             by limiting the number of messages that are retained
                             For more information, see:
    
                             http://activemq.apache.org/slow-consumer-handling.html
    
                        -->
                      <pendingMessageLimitStrategy>
                        <constantPendingMessageLimitStrategy limit="1000"/>
                      </pendingMessageLimitStrategy>
                    </policyEntry>
                    <policyEntry queue="first" prioritizedMessages="true" />
                  </policyEntries>
                </policyMap>
            </destinationPolicy>
    
    
            <!--
                The managementContext is used to configure how ActiveMQ is exposed in
                JMX. By default, ActiveMQ uses the MBean server that is started by
                the JVM. For more information, see:
    
                http://activemq.apache.org/jmx.html
            -->
            <managementContext>
                <managementContext createConnector="false"/>
            </managementContext>
    
            <!--
                Configure message persistence for the broker. The default persistence
                mechanism is the KahaDB store (identified by the kahaDB tag).
                For more information, see:
    
                http://activemq.apache.org/persistence.html
            -->
            <persistenceAdapter>
                <jdbcPersistenceAdapter dataSource="#mysql-ds"/>
                <!--<kahaDB directory="${activemq.data}/kahadb"/>-->
            </persistenceAdapter>
    
    
              <!--
                The systemUsage controls the maximum amount of space the broker will
                use before disabling caching and/or slowing down producers. For more information, see:
                http://activemq.apache.org/producer-flow-control.html
              -->
              <systemUsage>
                <systemUsage>
                    <memoryUsage>
                        <memoryUsage percentOfJvmHeap="70" />
                    </memoryUsage>
                    <storeUsage>
                        <storeUsage limit="100 gb"/>
                    </storeUsage>
                    <tempUsage>
                        <tempUsage limit="50 gb"/>
                    </tempUsage>
                </systemUsage>
            </systemUsage>
    
            <!--
                The transport connectors expose ActiveMQ over a given protocol to
                clients and other brokers. For more information, see:
    
                http://activemq.apache.org/configuring-transports.html
            -->
            <transportConnectors>
                <!-- DOS protection, limit concurrent connections to 1000 and frame size to 100MB -->
                <transportConnector name="openwire" uri="tcp://0.0.0.0:61616?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
                <transportConnector name="amqp" uri="amqp://0.0.0.0:5672?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
                <transportConnector name="stomp" uri="stomp://0.0.0.0:61613?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
                <transportConnector name="mqtt" uri="mqtt://0.0.0.0:1883?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
                <transportConnector name="ws" uri="ws://0.0.0.0:61614?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
            </transportConnectors>
    
            <!-- destroy the spring context on shutdown to stop jetty -->
            <shutdownHooks>
                <bean xmlns="http://www.springframework.org/schema/beans" class="org.apache.activemq.hooks.SpringContextHook" />
            </shutdownHooks>
    
            <plugins>
                <simpleAuthenticationPlugin>
                    <users>
                        <authenticationUser username="bhz" password="bhz" groups="users,admins"/>
                    </users>
                </simpleAuthenticationPlugin>
            </plugins>
            
        </broker>
        
        <bean id="mysql-ds" class="org.apache.commons.dbcp.BasicDataSource" destroy-method="close">
           <property name="driverClassName" value="com.mysql.jdbc.Driver"/>
           <property name="url" value="jdbc:mysql://localhost:3306/test?relaxAutoCommit=true"/>
           <property name="username" value="root"/>
           <property name="password" value="root"/>
           <property name="poolPreparedStatements" value="true"/>
      </bean>
        
    
        <!--
            Enable web consoles, REST and Ajax APIs and demos
            The web consoles requires by default login, you can disable this in the jetty.xml file
    
            Take a look at ${ACTIVEMQ_HOME}/conf/jetty.xml for more details
        -->
        <import resource="jetty.xml"/>
    
    </beans>
    <!-- END SNIPPET: example -->
    

    测试代码:

    package bhz.mq.helloworld;
    
    import java.util.concurrent.TimeUnit;
    
    import javax.jms.Connection;
    import javax.jms.ConnectionFactory;
    import javax.jms.DeliveryMode;
    import javax.jms.Destination;
    import javax.jms.MessageProducer;
    import javax.jms.Session;
    import javax.jms.TextMessage;
    
    import org.apache.activemq.ActiveMQConnectionFactory;
    
    public class Sender {
    
        public static void main(String[] args) throws Exception{
            //第一步:建立ConnectionFactory工厂对象,需要填入用户名,密码,以及要链接的地址,均使用默认即可,默认的端口为"tcp://localhost:61616"
            ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("bhz","bhz","tcp://localhost:61616");
            //第二步:通过ConnectionFactory工厂对象我们创建一个Connection链接,并且调用Connection的start方法开启链接,Connection默认是关闭的。
            Connection connection = connectionFactory.createConnection();
            connection.start();
            //第三步:通过Connection对象创建Session会话(上下文环境对象),用于接收消息,参数配置1为是否启用事务,参数配置2为签收模式,一般我们设置自动签收。
            Session session = connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE);
            //第四步:通过Session创建Destination对象,指的是一个客户端用来指定生产消息目标和消费消息来源的对象,在PTP模式中
            Destination destination = session.createQueue("first");
            //第五步:我们需要通过Session对象创建消息的发送和接收对象(生产者和消费者)MessageProducer/MessageConsumer
            MessageProducer producer = session.createProducer(null);
            //第六步:我们可以使用MessageProducer的setDeliveryMode方法为其设置持久化特性和非持久化特性(DeliveryMode),我们稍后介绍。
            //producer.setDeliveryMode(DeliveryMode.PERSISTENT);
            //第七步:最后我们使用JMS规范的TextMessage形式创建数据(通过Session对象),并用MessageProducer的send方法发送数据,同理客户端使用receive方法进行接收数据。
            for (int i=0;i<9;i++) {
                TextMessage msg = session.createTextMessage("我是消息内容"+i);
                //第一个参数 目标地址
                //第二个参数 具体的数据信息
                //第三个参数 传送数据的模式
                //第四个参数 优先级
                //第五个参数 消息的过期时间
                producer.send(destination,msg,DeliveryMode.PERSISTENT,i,1000*60);
                
                //TimeUnit.SECONDS.sleep(1);
            }
            if (connection != null) {
                connection.close();
            }
        }
        
    }
    
    package bhz.mq.helloworld;
    
    import javax.jms.Connection;
    import javax.jms.ConnectionFactory;
    import javax.jms.Destination;
    import javax.jms.JMSException;
    import javax.jms.MessageConsumer;
    import javax.jms.Session;
    import javax.jms.TextMessage;
    
    import org.apache.activemq.ActiveMQConnectionFactory;
    
    public class Receiver {
    
        public static void main(String[] args) throws Exception {
            //第一步:建立ConnectionFactory工厂对象,需要填入用户名,密码,以及要链接的地址,均使用默认即可,默认的端口为"tcp://localhost:61616"
            ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("bhz","bhz","tcp://localhost:61616");
            //第二步:通过ConnectionFactory工厂对象我们创建一个Connection链接,并且调用Connection的start方法开启链接,Connection默认是关闭的。
            Connection connection = connectionFactory.createConnection();
            connection.start();
            //第三步:通过Connection对象创建Session会话(上下文环境对象),用于接收消息,参数配置1为是否启用事务,参数配置2为签收模式,一般我们设置自动签收。
            Session session = connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE);
            //第四步:通过Session创建Destination对象,指的是一个客户端用来指定生产消息目标和消费消息来源的对象,在PTP模式中,Destination被称作Queue即队列,在发布订阅模式中就是Topic
            Destination destination = session.createQueue("first");
            //第五步:通过Session创建MessageConsumer
            MessageConsumer consumer = session.createConsumer(destination);
            while (true) {
                TextMessage msg = (TextMessage) consumer.receive();
                //手动签收
                //msg.acknowledge();
                System.out.println("消费数据:"+msg.getText());
            }
        }
        
    }
    
    

    相关文章

      网友评论

        本文标题:ActiveMQ-API(二)

        本文链接:https://www.haomeiwen.com/subject/xlbtsftx.html