美文网首页
2、JMS开发步骤(队列)

2、JMS开发步骤(队列)

作者: 金石_832e | 来源:发表于2020-03-20 11:26 被阅读0次

    队列一对一

    image.png
    image.png image.png
    观察两者是不是有些似曾相识
    都是一个套路

    依赖

            <!--activemq-->
            <dependency>
                <groupId>org.apache.activemq</groupId>
                <artifactId>activemq-all</artifactId>
                <version>5.15.11</version>
            </dependency>
            <dependency>
                <groupId>org.apache.xbean</groupId>
                <artifactId>xbean-spring</artifactId>
                <version>3.16</version>
            </dependency>
    

    ActiveMQ下的哪个版本,就装哪个<version>Linux中下载的版本号</version>


    不废话直接上代码(生产者)

    package com.yd.telnet.modular.activemq;
    
    import org.apache.activemq.ActiveMQConnectionFactory;
    
    import javax.jms.*;
    
    /**
     * @author 
     * @Date 2020/3/19
     */
    public class JmsProduce {
        //为什么是tcp,看源码!
        public static final String DEFAULT_BROKER_URL = "tcp://192.168.0.105:61616";
        public static final String QUEUE_NAME = "queue01";
        public static void main(String[] args) throws JMSException {
            // 1、创建连接工厂
            ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(DEFAULT_BROKER_URL);
            // 2、通过连接工厂,获取连接connection
            Connection connection = activeMQConnectionFactory.createConnection();
            // 3、启动连接
            connection.start();
            // 4、创建会话session
            // 两个参数,事务、签收
            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            // 5、创建接收的对象(队列还是主题)
            // destination目的地(queue队列、topic主题)
            //Destination destination = session.createQueue(QUEUE_NAME);
            Queue queue = session.createQueue(QUEUE_NAME);
            // 6、创建消息生产者
            MessageProducer producer = session.createProducer(queue);
            // 7、通过使用producer产生三条消息发送到队列里面
            for(int i = 0;i<3;i++){
                // 逐一创建消息
                TextMessage textMessage = session.createTextMessage("msg------------" + i);
                // 通过producer发送给mq
                producer.send(textMessage);
            }
            // 8、关闭资源(先进后出,同jdbc)
            producer.close();
            session.close();
            connection.close();
        }
    }
    

    看效果

    image.png
    再运行一次,且删除掉其中的三条记录看结果(可以将删除的三条当做已消费)
    image.png
    image.png

    总结
    当有一个消息进入这个队列是,等待消费的消息是1,进入队列的消息是1。
    当消息消费后,等待消费的消息是0,进入队列的消息是1,出队列的消息是1。
    再来一条消息时,等待消费的消息是1,进入队列的消息就是2(只增不减)


    不废话直接上代码(消费者)

    package com.yd.telnet.modular.activemq;
    
    import org.apache.activemq.ActiveMQConnectionFactory;
    
    import javax.jms.*;
    
    /**
     * @author 
     * @Date 2020/3/19
     */
    public class JmsConsumer {
        public static final String DEFAULT_BROKER_URL = "tcp://192.168.0.105:61616";
        public static final String QUEUE_NAME = "queue01";
        public static void main(String[] args) throws JMSException {
            // 1、创建连接工厂
            ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(DEFAULT_BROKER_URL);
            // 2、通过连接工厂,获取连接connection
            Connection connection = activeMQConnectionFactory.createConnection();
            // 3、启动连接
            connection.start();
            // 4、创建会话session
            // 两个参数,事务、签收
            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            // 5、创建接收的对象(队列还是主题)
            // destination目的地(queue队列、topic主题)
            //Destination destination = session.createQueue(QUEUE_NAME);
            Queue queue = session.createQueue(QUEUE_NAME);
            // 6、创建消息消费者
            MessageConsumer consumer = session.createConsumer(queue);
            while(true){
                TextMessage textMessage = (TextMessage) consumer.receive();
                if(null != textMessage){
                    System.out.println("消费者处理消息"+ textMessage.getText());
                }else{
                    break;
                }
            }
            // 7、关闭资源
            consumer.close();
            session.close();
            connection.close();
        }
    }
    

    运行代码后

    image.png
    image.png

    由于consumer.receive()这个方法是一个不离不弃,啥时候activemq停止啥时候结束,所以产生receive的第二种写法


    image.png

    receive属于同步运行阻塞,所以又有了监听的方式

    package com.yd.telnet.modular.activemq;
    
    import org.apache.activemq.ActiveMQConnectionFactory;
    
    import javax.jms.*;
    import java.io.IOException;
    
    /**
     * @author 张思博
     * @Date 2020/3/19
     */
    public class JmsConsumer {
        public static final String DEFAULT_BROKER_URL = "tcp://192.168.0.105:61616";
        public static final String QUEUE_NAME = "queue01";
        public static void main(String[] args) throws JMSException, IOException {
            // 1、创建连接工厂
            ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(DEFAULT_BROKER_URL);
            // 2、通过连接工厂,获取连接connection
            Connection connection = activeMQConnectionFactory.createConnection();
            // 3、启动连接
            connection.start();
            // 4、创建会话session
            // 两个参数,事务、签收
            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            // 5、创建接收的对象(队列还是主题)
            // destination目的地(queue队列、topic主题)
            //Destination destination = session.createQueue(QUEUE_NAME);
            Queue queue = session.createQueue(QUEUE_NAME);
            // 6、创建消息消费者
            MessageConsumer consumer = session.createConsumer(queue);
            // 7、通过监听的方式来消费消息
            consumer.setMessageListener(new MessageListener() {
                @Override
                public void onMessage(Message message) {
                    if(null != message && message instanceof TextMessage){
                        TextMessage textMessage = (TextMessage) message;
                        try {
                            System.out.println("监听消费者消费消息"+textMessage.getText());
                        } catch (JMSException e) {
                            e.printStackTrace();
                        }
                    }
                }
            });
            // 8、保持控制台不灭
            System.in.read();
            // 9、关闭资源
            consumer.close();
            session.close();
            connection.close();
        }
    }
    

    该种方式:有消息就消费,没消息就等待

    消息的消费者接收消息可以采用两种方式:
    1.consumer.receive()或 consumer.receive(int timeout);(同步阻塞)
    2.注册一个MessageListener.
    采用第一种方式,消息的接收者会一直等待下去,直到有消息到达或者超时。后一种方式会注册一个监听器,当有消息到达的时候,会调用它的onMessage()方法。(异步非阻塞)
    可以用监听的方式启动两个消费者,生产的消息会被平均分配(自己试)


    image.png

    相关文章

      网友评论

          本文标题:2、JMS开发步骤(队列)

          本文链接:https://www.haomeiwen.com/subject/tudlyhtx.html