美文网首页
jms--消息推送规范(代码书写)

jms--消息推送规范(代码书写)

作者: 李霖神谷 | 来源:发表于2020-06-08 09:34 被阅读0次

    1.书写生产者消息生产的代码:

    package com.shuai;
    
    import org.apache.activemq.ActiveMQConnectionFactory;
    
    import javax.jms.*;
    
    /**
     * 向mq推送消息步奏
     */
    public class JmsActivemq {
        public static final String ACTIVEURL = "tcp://192.168.29.128:61616";
        public static final String QUEUE1 = "queue1";
    
        public static void main(String[] args) throws JMSException {
    //        1.创建连接工厂
            ActiveMQConnectionFactory activeMqConnectionFactory = new ActiveMQConnectionFactory(ACTIVEURL);
    //        2.生产连接,并开启连接
            Connection connection = activeMqConnectionFactory.createConnection();
            connection.start();
    //        3.创建session会话
            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
    //        4.创建目的地
            Queue queue = session.createQueue(QUEUE1);
    //        5.创建生产者
            MessageProducer producer = session.createProducer(queue);
    //        6.创建三个消息
            for (int i = 1; i <= 3; i++) {
                //        7.生产者发送消息到创建好的队列里面
                TextMessage textMessage = session.createTextMessage("刘德华" + i);
                producer.send(textMessage);
            }
    //        8.关闭资源
            producer.close();
            session.close();
            connection.close();
            System.out.println("over");
        }
    }
    

    mq效果:


    image.png

    第一框是发送的消息个数,第二个框是等待的消息个数
    2.书写消费者接收消息的代码:
    (1)正常的消费者消费消息:

    package com.shuai;
    
    import org.apache.activemq.ActiveMQConnectionFactory;
    
    import javax.jms.*;
    
    public class JmsActiveCustemer {
        public static final String ACTIVEURL = "tcp://192.168.29.128:61616";
        public static final String QUEUE1 = "queue1";
    
        public static void main(String[] args) throws JMSException {
            //1.创建连接工厂
            ActiveMQConnectionFactory mqConnectionFactory = new ActiveMQConnectionFactory(ACTIVEURL);
    //        2.创建连接
            Connection connection = mqConnectionFactory.createConnection();
            connection.start();
    //        3.创建session
            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
    //        4.创建目的地
            Queue queue = session.createQueue(QUEUE1);
    //        5.创建消费者
            MessageConsumer consumer = session.createConsumer(queue);
    //        6.接受消息打印
            while (true) {
                TextMessage message = (TextMessage) consumer.receive();
                if (null != message) {
                    System.out.println(message.getText());
                } else {
                    break;
                }
            }
    //        7.关闭资源
            consumer.close();
            session.close();
            connection.close();
        }
    
    }
    
    image.png

    (2)使用监听器消费消息,生产者生产一条消息,这边就会监听一条消息, 使用监听器需要添加System.in.read();这条语句保证控制台不关闭。
    consumer.setMessageListener(new MessageListener() {
    @Override
    public void onMessage(Message message) {
    if (null!=message && message instanceof TextMessage){
    TextMessage textMessage=(TextMessage)message;
    System.out.println(textMessage);
    }
    }
    });
    // 保证控制台不关闭
    System.in.read();
    当你开启两个消费者的时候,生产6个消息,每一个消费者消费3个消息
    目的地一共有两种,这一种是队列还有一种是主题,当生产者生产6个消息的时候对于每一个主题都会接收到6个消息。这里要注意先启动消费者在启动生产,如果先启动生产没有订阅的话,生产的消息就是费消息。
    3.MQ的可靠性:
    (1).持久化内容:
    // 持久化操作如果MQ挂了之前生产的消息还是不会丢失,如果设置为非持久化时服务挂之前的消息就会丢失。默认的是持久化操作。主题持久化操作需要在创建主题之后再将连接开启。
    producer.setDeliveryMode(DeliveryMode.PERSISTENT);

    (2)创建session所带参数事务与签收:
    // 3.创建session会话
    Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
    第一个参数是是否使用事务,设置为true如果出现错误的话会回滚。设置之后需要在关闭连接之前进行提交。签收的话可以根据自己的需求设置为手动或者自动签收。

    相关文章

      网友评论

          本文标题:jms--消息推送规范(代码书写)

          本文链接:https://www.haomeiwen.com/subject/hmpvzhtx.html