美文网首页
(4)activemq的基本使用

(4)activemq的基本使用

作者: Mrsunup | 来源:发表于2018-12-14 16:52 被阅读0次

    如果服务端没有安装activemq可以参考文:https://www.jianshu.com/p/41fe776151e9
    如果不是很了解jms的规范可以参考文章:https://www.jianshu.com/p/c6355fafdd6f

    1.queue的事务发送和非事务发送

    • 事务queue发送
    /**
     * @Project: activemq
     * @description:  队列的事务发送
     * @author: sunkang
     * @create: 2018-12-08 22:28
     * @ModificationHistory who      when       What
     **/
    public class JMSQueueTransactionProducer {
        public static void main(String[] args) {
            ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://192.168.44.129:61616");
            Connection connection = null;
            try {
                connection = connectionFactory.createConnection();
                connection.start();
                //当createSession的第一参数为true,表示事务发送,为false表示非事务发送,事务发送必须要commit,
                Session session = connection.createSession(Boolean.TRUE,Session.AUTO_ACKNOWLEDGE);
                 //创建目的地
                Destination  destination  = session.createQueue("muqueue");
                //创建发送者
                MessageProducer producer  = session.createProducer(destination);
                //发送信息
                for(int i=0;i<10;i++){
                    //创建文本信息
                    TextMessage message = session.createTextMessage("helloworld");
                    //创建map信息
                    MapMessage mapMessage =   session.createMapMessage();
                    mapMessage.setString("123","123");
                    mapMessage.setString("456","456");
                    //创建流对象信息
                    StreamMessage  streamMessage  = session.createStreamMessage();
                    streamMessage.writeString("i am stramMessage");
                    producer.send(message);
                    producer.send(mapMessage);
                    producer.send(streamMessage);
                }
                //事务发送,必须要commit才能发送成功
                session.commit();
                session.close();
            } catch (JMSException e) {
                e.printStackTrace();
            }finally {
                if( connection !=null){
                    try {
                        connection.close();
                    } catch (JMSException e) {
                        e.printStackTrace();
                    }
                }
            }
        }
    }
    

    执行完之后发现控制台的myque多了30条的数据


    • 非事务的queue发送
    /**
     * @Project: activemq
     * @description:  非事务的queue发送
     * @author: sunkang
     * @create: 2018-12-14 15:23
     **/
    public class JMSQueueNoTransactionProducer {
        public static void main(String[] args) {
            ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://192.168.44.129:61616");
            Connection connection = null;
            try {
                connection = connectionFactory.createConnection();
                connection.start();
                //当createSession的第一参数为true,表示事务发送,为false表示非事务发送
                //当非事务发送的时候,第二个参数有三个选择:
                //     Session.AUTO_ACKNOWLEDGE 表示回话自动确认
                //    Session.CLIENT_ACKNOWLEDGE 表示需要消费者调用消息的 acknowledge方法进行确认消息,会确认之前的所有的消息,这个对消息提供者无效
                //    Session.DUPS_OK_ACKNOWLEDGE 表示消费者的消息延迟确认,这个对消息提供者无效
                Session session = connection.createSession(Boolean.FALSE,Session.AUTO_ACKNOWLEDGE);
                //创建目的地
                Destination destination  = session.createQueue("muqueue");
                //创建发送者
                MessageProducer producer  = session.createProducer(destination);
                //发送信息
                for(int i=0;i<10;i++){
                    //创建文本信息
                    TextMessage message = session.createTextMessage("helloworld");
                    //创建map信息
                    MapMessage mapMessage =   session.createMapMessage();
                    mapMessage.setString("123","123");
                    mapMessage.setString("456","456");
                    //创建流对象信息
                    StreamMessage streamMessage  = session.createStreamMessage();
                    streamMessage.writeString("i am stramMessage");
                    producer.send(message);
                    producer.send(mapMessage);
                    producer.send(streamMessage);
                }
    
            } catch (JMSException e) {
                e.printStackTrace();
            }finally {
                if( connection !=null){
                    try {
                        connection.close();
                    } catch (JMSException e) {
                        e.printStackTrace();
                    }
                }
            }
        }
    }
    

    2.queue的消息接收

    在这里演示通过消费者receive的方式事务接收和通过消费者设置MessageListener的监听器的方式主动接收消息

    • 消费者receive的方式事务确认消息
    /**
     * @Project: activemq
     * @description: 消费者receive的方式事务接收
     * @author: sunkang
     * @create: 2018-12-08 22:56
     * @ModificationHistory who      when       What
     **/
    public class JMSQueueClientReceiver {
        public static void main(String[] args) {
            ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://192.168.44.129:61616");
            Connection connection = null;
            try {
                connection = connectionFactory.createConnection();
                connection.start();
                //客户端的事务型
                Session session = connection.createSession(Boolean.TRUE,Session.AUTO_ACKNOWLEDGE);
                //创建目的地
                Destination destination  = session.createQueue("muqueue");
                //创建发送者
                MessageConsumer consumer  = session.createConsumer(destination);
                //接收消息
                while(true){
                    Message message   = consumer.receive();
                    parseMessageTypeAndPrint(message);
                    //消息确认
                    session.commit();
                }
            } catch (JMSException e) {
                e.printStackTrace();
            }finally {
                if( connection !=null){
                    try {
                        connection.close();
                    } catch (JMSException e) {
                        e.printStackTrace();
                    }
                }
            }
        }
        //解析消息的类型,并打印输入消息
        public static void  parseMessageTypeAndPrint(Message message){
            //支持六种对象,接收端需要根据六种对象来判断
            if(message instanceof BytesMessage ){
    
            }else if(message instanceof  StreamMessage){
                StreamMessage  streamMessage = (StreamMessage) message;
                try {
                    String str = streamMessage.readString();
                    System.out.println("streamMessage:"+ str);
                } catch (JMSException e) {
                    e.printStackTrace();
                }
    
            }else if(message instanceof TextMessage){
                TextMessage  textMessage = (TextMessage) message;
                try {
                    System.out.println(textMessage.getText());
                } catch (JMSException e) {
                    e.printStackTrace();
                }
            }else if(message instanceof  ObjectMessage){
    
            }else if (message instanceof  MapMessage){
                MapMessage  mapMessage = (MapMessage) message;
                Enumeration<String> enumerations = null;
                try {
                    enumerations = mapMessage.getMapNames();
                    while(enumerations.hasMoreElements()){
                        String key = enumerations.nextElement();
                        Object value   = mapMessage.getObject(key);
                        System.out.println(key+ ":"+value);
                    }
                } catch (JMSException e) {
                    e.printStackTrace();
                }
            }else if( message instanceof  Message){
    
            }
        }
    }
    
    • 消费者设置监听器的方式手动确认消息
    /**
     * @Project: activemq
     * @description: 消费者设置监听器的方式手动确认消息
     * @author: sunkang
     * @create: 2018-12-08 23:07
     * @ModificationHistory who      when       What
     **/
    public class JMSQueueListenerReceiver {
        public static void main(String[] args) {
            ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://192.168.44.129:61616");
            Connection connection = null;
            try {
                connection = connectionFactory.createConnection();
                connection.start();
                //设置非事务的手动确认模式
                final Session session = connection.createSession(Boolean.FALSE,Session.CLIENT_ACKNOWLEDGE);
                //创建目的地
                Destination destination  = session.createQueue("muqueue");
                //创建发送者
                MessageConsumer consumer  = session.createConsumer(destination);
                //创建监听的方式接受消息
                MessageListener listener  =new MessageListener() {
                    @Override
                    public void onMessage(Message message) {
                        parseMessageTypeAndPrint(message);
                        //消息手动确认接收
                        try {
                            message.acknowledge();
                        } catch (JMSException e) {
                            e.printStackTrace();
                        }
                    }
                };
                consumer.setMessageListener(listener);
                //让线程阻塞
                System.in.read();
            } catch (JMSException e) {
                e.printStackTrace();
            } catch (IOException e) {
                e.printStackTrace();
            } finally {
                if( connection !=null){
                    try {
                        connection.close();
                    } catch (JMSException e) {
                        e.printStackTrace();
                    }
                }
            }
        }
        //解析消息的类型,并打印输入消息
        public static void  parseMessageTypeAndPrint(Message message){
            //支持六种对象,接收端需要根据六种对象来判断
            if(message instanceof BytesMessage ){
    
            }else if(message instanceof  StreamMessage){
                StreamMessage  streamMessage = (StreamMessage) message;
                try {
                    String str = streamMessage.readString();
                    System.out.println("streamMessage:"+ str);
                } catch (JMSException e) {
                    e.printStackTrace();
                }
    
            }else if(message instanceof TextMessage){
                TextMessage  textMessage = (TextMessage) message;
                try {
                    System.out.println(textMessage.getText());
                } catch (JMSException e) {
                    e.printStackTrace();
                }
            }else if(message instanceof  ObjectMessage){
    
            }else if (message instanceof  MapMessage){
                MapMessage  mapMessage = (MapMessage) message;
                Enumeration<String> enumerations = null;
                try {
                    enumerations = mapMessage.getMapNames();
                    while(enumerations.hasMoreElements()){
                        String key = enumerations.nextElement();
                        Object value   = mapMessage.getObject(key);
                        System.out.println(key+ ":"+value);
                    }
                } catch (JMSException e) {
                    e.printStackTrace();
                }
            }else if( message instanceof  Message){
    
            }
        }
    }
    

    3.topic的消息发送和消息的topic的消息接收

    下面演示了简单的topic的创建的过程,设置了消息的持久化为非持久化,并且消息发送的过程开启了事务

    • topic的消息发送
    /**
     * @Project: activemq
     * @description: topic的消息发送
     * @author: sunkang
     * @create: 2018-12-08 22:28
     * @ModificationHistory who      when       What
     **/
    public class JMSTopicProducer {
        public static void main(String[] args) {
            ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://192.168.44.129:61616");
            Connection connection = null;
            try {
                 connection = connectionFactory.createConnection();
                 connection.start();
                 Session session = connection.createSession(Boolean.TRUE,Session.AUTO_ACKNOWLEDGE);
                 //创建目的地
                Destination  destination  = session.createTopic("myTopic");
                //创建发送者
                MessageProducer producer  = session.createProducer(destination);
                //设置持久化模型,消息不持久化
                producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
                //创建需要发送的信息
                TextMessage message = session.createTextMessage("Hello myfriend");
                //发送信息
                producer.send(message);
                session.commit();
                session.close();
            } catch (JMSException e) {
                e.printStackTrace();
            }finally {
                if( connection !=null){
                    try {
                        connection.close();
                    } catch (JMSException e) {
                        e.printStackTrace();
                    }
                }
            }
        }
    }
    
    • 看下控制台的topic输出,发现多了个myTopic的主题,并且message的进队列为的数量为1

    • 消息的topic的消息接收

    由于topic上的生产者和消费者之间有时间上的相关性。订阅一个主题的消费者只能消费自它订阅之后发布的消息,这会让后来订阅的消费者发送消息的丢失,于是就有了离线的持久化订阅

    在这里显示两种消费者的订阅方式 ,一种为非持久化订阅方式,一种为持久化订阅方式,持久化订阅方式只能对pub/sub模型起作用,对p2p模型不起作用,因为p2p模型不存在时间的相关性。

    (1)消费者的非持久化订阅接收消息
    非持久化订阅方式需要先启动消费者来监听消息,然后在启动消息提供者发送消息,才可以接收到消息

    /**
     * @Project: activemq
     * @description: topic非持久化订阅方式
     * @author: sunkang
     * @create: 2018-12-08 22:56
     * @ModificationHistory who      when       What
     **/
    public class JMSTopicReceiver {
        public static void main(String[] args) {
            ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://192.168.44.129:61616");
            Connection connection = null;
            try {
                connection = connectionFactory.createConnection();
                connection.setClientID("topicId");
                connection.start();
                Session session = connection.createSession(Boolean.TRUE,Session.AUTO_ACKNOWLEDGE);
                //创建目的地
                Destination destination  = session.createTopic("myTopic");
                //创建发送者
                MessageConsumer consumer  = session.createConsumer(destination);
                TextMessage textMessagemessage = (TextMessage) consumer.receive();
                System.out.println(textMessagemessage.getText());
    
                session.commit();
                session.close();
            } catch (JMSException e) {
                e.printStackTrace();
            }finally {
                if( connection !=null){
                    try {
                        connection.close();
                    } catch (JMSException e) {
                        e.printStackTrace();
                    }
                }
            }
        }
    }
    
    • (2)消费者的持久化订阅接收消息

    持久订阅时,客户端向 JMS 服务器注册一个自己身份的 ID,当这个客户端处于离线时,JMS Provider 会为这个 ID 保存所有发送到主题的消息,当客户再次连接到 JMS Provider 时,会根据自己的 ID 得到所有当自己处于离线时发送到主题的消息

    需要先启动消费端先往JMS 服务器注册身份id,然后关闭消费端,然后启动消息提供者,然后在启动消费端,这个时候消费者就可以消费到离线的消息了

    当启动消息提供者发送消息的时候,看监控中心的变化,此时topicId标识与myTopic进行了关联了


    然后启动消费者消费者时,就可以消费离线的消息了

    代码入下

    /**
     * @Project: activemq
     * @description: 持久化订阅的接收离线消息方式
     * @author: sunkang
     * @create: 2018-12-08 22:56
     * @ModificationHistory who      when       What
     **/
    public class JMSPersistentTopicReceiver {
        public static void main(String[] args) {
            ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://192.168.44.129:61616");
            Connection connection = null;
            try {
                connection = connectionFactory.createConnection();
                //设置自己的身份id
                connection.setClientID("topicId");
                connection.start();
                Session session = connection.createSession(Boolean.TRUE,Session.AUTO_ACKNOWLEDGE);
                //创建目的地
                Topic destination  = session.createTopic("myTopic");
                //创建持久化订阅
                MessageConsumer consumer  = session.createDurableSubscriber(destination,"topicId");
                TextMessage textMessagemessage = (TextMessage) consumer.receive();
                System.out.println(textMessagemessage.getText());
                session.commit();
                session.close();
            } catch (JMSException e) {
                e.printStackTrace();
            }finally {
                if( connection !=null){
                    try {
                        connection.close();
                    } catch (JMSException e) {
                        e.printStackTrace();
                    }
                }
            }
        }
    }
    

    相关文章

      网友评论

          本文标题:(4)activemq的基本使用

          本文链接:https://www.haomeiwen.com/subject/exjghqtx.html