美文网首页
ActiveMQ(二)原生API

ActiveMQ(二)原生API

作者: 慕容鸿煊 | 来源:发表于2019-03-24 16:33 被阅读0次

    Maven配置

         <dependency>
                <groupId>org.apache.activemq</groupId>
                <artifactId>activemq-all</artifactId>
                <version>5.8.0</version>
            </dependency>
    

    生产者

    
    public class CdProducer {
    
        //默认连接用户名
        private static final String USERNAME
                = ActiveMQConnection.DEFAULT_USER;
        //默认连接密码
        private static final String PASSWORD
                = ActiveMQConnection.DEFAULT_PASSWORD;
        //默认连接地址
        private static final String BROKEURL
                = "failover://tcp://58.87.114.150:61616";
        //发送的消息数量
        private static final int SENDNUM = 10;
    
        public static void main(String[] args) {
            ConnectionFactory connectionFactory;
            Connection connection = null;
            Session session;
            Destination destination;
            MessageProducer messageProducer;
            connectionFactory = new ActiveMQConnectionFactory(USERNAME,PASSWORD,BROKEURL);
            try {
                connection = connectionFactory.createConnection();
                connection.start();
    
                session = connection.createSession(true,Session.AUTO_ACKNOWLEDGE);
                destination = session.createQueue("cd.queue,topic://cd.mark");
                //destination = session.createTopic("VirtualTopic.vtgroup");
                messageProducer = session.createProducer(destination);
                for(int i=0;i<SENDNUM;i++){
                    String msg = "CdProducer "+i+" "+System.currentTimeMillis();
                    TextMessage message = session.createTextMessage(msg);
                    System.out.println("发送消息:"+msg);
                    messageProducer.send(message);
                }
                session.commit();
            } catch (JMSException e) {
                e.printStackTrace();
            }finally {
                if(connection!=null){
                    try {
                        connection.close();
                    } catch (JMSException e) {
                        e.printStackTrace();
                    }
                }
            }
        }
    
    
    }
    
    

    消费者 queue

    public class CdConsumerQueueA {
    
        private static final String USERNAME
                = ActiveMQConnection.DEFAULT_USER;//默认连接用户名
        private static final String PASSWORD
                = ActiveMQConnection.DEFAULT_PASSWORD;//默认连接密码
        private static final String BROKEURL
                ="failover://tcp://58.87.114.150:61616";//默认连接地址
    
        public static void main(String[] args) {
            ConnectionFactory connectionFactory;//连接工厂
            Connection connection = null;//连接
    
            Session session;//会话 接受或者发送消息的线程
            Destination destination;//消息的目的地
    
            MessageConsumer messageConsumer;//消息的消费者
    
            //实例化连接工厂
            connectionFactory = new ActiveMQConnectionFactory(CdConsumerQueueA.USERNAME,
                    CdConsumerQueueA.PASSWORD, CdConsumerQueueA.BROKEURL);
    
            try {
                //通过连接工厂获取连接
                connection = connectionFactory.createConnection();
                //启动连接
                connection.start();
                //创建session
                session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
                //创建一个连接HelloWorld的消息队列
                //destination = session.createTopic("HelloTopic8");
                destination = session.createQueue("cd.queue");
    
                //创建消息消费者
                messageConsumer = session.createConsumer(destination);
                messageConsumer.setMessageListener(new MessageListener() {
                    public void onMessage(Message message) {
                        try {
                            System.out.println("Accept msg : "
                                    +((TextMessage)message).getText());
                        } catch (JMSException e) {
                            e.printStackTrace();
                        }
                    }
                });
    
    
            } catch (JMSException e) {
                e.printStackTrace();
            }
    
        }
    }
    

    消费者 topic

    public class CdConsumerTopicA {
    
        private static final String USERNAME
                = ActiveMQConnection.DEFAULT_USER;//默认连接用户名
        private static final String PASSWORD
                = ActiveMQConnection.DEFAULT_PASSWORD;//默认连接密码
        private static final String BROKEURL
                = ActiveMQConnection.DEFAULT_BROKER_URL;//默认连接地址
    
        public static void main(String[] args) {
            ConnectionFactory connectionFactory;//连接工厂
            Connection connection = null;//连接
    
            Session session;//会话 接受或者发送消息的线程
            Destination destination;//消息的目的地
    
            MessageConsumer messageConsumer;//消息的消费者
    
            //实例化连接工厂
            connectionFactory = new ActiveMQConnectionFactory(CdConsumerTopicA.USERNAME,
                    CdConsumerTopicA.PASSWORD, CdConsumerTopicA.BROKEURL);
    
            try {
                //通过连接工厂获取连接
                connection = connectionFactory.createConnection();
                //启动连接
                connection.start();
                //创建session
                session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
                //创建一个连接HelloWorld的消息队列
                //destination = session.createQueue("Consumer.A.VirtualTopic.vtgroup");
                destination = session.createTopic("cd.mark");
    
                //创建消息消费者
                messageConsumer = session.createConsumer(destination);
                messageConsumer.setMessageListener(new MessageListener() {
                    public void onMessage(Message message) {
                        try {
                            System.out.println("Accept msg : "
                                    +((TextMessage)message).getText());
                        } catch (JMSException e) {
                            e.printStackTrace();
                        }
                    }
                });
    
    
            } catch (JMSException e) {
                e.printStackTrace();
            }
    
        }
    }
    

    相关文章

      网友评论

          本文标题:ActiveMQ(二)原生API

          本文链接:https://www.haomeiwen.com/subject/lqvuvqtx.html