美文网首页
集群代码实现

集群代码实现

作者: 长孙俊明 | 来源:发表于2019-10-22 14:08 被阅读0次

    AMQP 协议版本

    import org.apache.qpid.jms.JmsConnectionFactory;
    
    import javax.jms.*;
    
    /**
     * Minimal AMQP producer: connects to the broker cluster through a failover URL
     * and sends a single persistent text message to a queue.
     */
    public class Producer {

        /**
         * Starts one producer thread against the three-node failover URL.
         *
         * @param args unused
         */
        public static void main(String[] args) {
            // Single-broker alternative: "amqp://120.25.242.46:5672"
            String protocol = "failover:(amqp://120.24.52.100:5672,amqp://120.79.71.22:5672,amqp://47.106.142.44:5672)";
            new ProducerThread(protocol, "queue1").start();
        }

        /** Thread that sends one text message to the given queue and then disconnects. */
        static class ProducerThread extends Thread {
            String brokerUrl;
            String destinationUrl;

            /**
             * @param brokerUrl      connection URI handed to the connection factory
             * @param destinationUrl name of the queue to send to
             */
            public ProducerThread(String brokerUrl, String destinationUrl) {
                this.brokerUrl = brokerUrl;
                this.destinationUrl = destinationUrl;
            }

            @Override
            public void run() {
                Connection conn = null;
                try {
                    // 1. Create the connection factory (no username / password)
                    JmsConnectionFactory connectionFactory = new JmsConnectionFactory(null, null, brokerUrl);
                    // 2. Create and start the connection
                    conn = connectionFactory.createConnection();
                    conn.start();
                    // 3. Create a non-transacted, auto-acknowledge session
                    Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
                    // 4. Create the destination the message is sent to
                    Destination destination = session.createQueue(destinationUrl);
                    // 5. Create a message producer for that destination
                    MessageProducer producer = session.createProducer(destination);
                    // 6. Configure delivery mode and priority
                    producer.setDeliveryMode(DeliveryMode.PERSISTENT);
                    producer.setPriority(7);
                    // 7. Send the message through the producer
                    TextMessage textMessage = session.createTextMessage("11111111");
                    producer.send(textMessage);
                } catch (Exception e) {
                    e.printStackTrace();
                } finally {
                    // Always release the connection, even when a step above failed;
                    // closing a JMS connection also closes its sessions and producers.
                    closeQuietly(conn);
                }
            }

            /** Closes the connection if it was opened, reporting (not propagating) any failure. */
            private static void closeQuietly(Connection conn) {
                if (conn == null) {
                    return;
                }
                try {
                    conn.close();
                } catch (JMSException e) {
                    e.printStackTrace();
                }
            }
        }
    }
    
    import org.apache.qpid.jms.JmsConnectionFactory;
    
    import javax.jms.*;
    
    /**
     * Simple consumer launcher: starts one {@code ConsumerThread} on queue "queue1".
     */
    // http://activemq.apache.org/consumer-features.html
    public class Consumer {
        public static void main(String[] args) {
            // Single-broker URL, kept for reference only; the failover URL below is the one used.
            final String singleBrokerUrl = "amqp://120.25.242.46:5672";
            final String clusterUrl = "failover:(amqp://120.24.52.100:5672,amqp://120.79.71.22:5672,amqp://47.106.142.44:5672)";
            final String queueName = "queue1";

            Thread worker = new ConsumerThread(clusterUrl, queueName);
            worker.start();
        }
    }
    
    /**
     * Thread that connects to the broker, listens asynchronously on a queue for about
     * one second, prints every message it receives, and then disconnects.
     */
    class ConsumerThread extends Thread {

        String brokerUrl;
        String destinationUrl;

        /**
         * @param brokerUrl      connection URI handed to the connection factory
         * @param destinationUrl name of the queue to consume from
         */
        public ConsumerThread(String brokerUrl, String destinationUrl) {
            this.brokerUrl = brokerUrl;
            this.destinationUrl = destinationUrl;
        }

        @Override
        public void run() {
            Connection conn = null;

            try {
                // brokerURL
                // http://activemq.apache.org/connection-configuration-uri.html
                // 1. Create the connection factory (no username / password)
                JmsConnectionFactory connectionFactory = new JmsConnectionFactory(null, null, this.brokerUrl);

                // 2. Create the connection; it must be started or no messages are delivered
                conn = connectionFactory.createConnection();
                conn.start();

                // 3. Create a session (one or more sessions may be created per connection)
                Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);

                // 4. Create the destination to consume from (Topic or Queue)
                Destination destination = session.createQueue(destinationUrl);

                // 5. Create the consumer http://activemq.apache.org/destination-options.html
                MessageConsumer consumer = session.createConsumer(destination);

                // 6. Receive messages asynchronously
                consumer.setMessageListener(new MessageListener() {

                    @Override
                    public void onMessage(Message message) {
                        if (message instanceof TextMessage) {
                            try {
                                System.out.println("Time: " + System.currentTimeMillis() + " 收到文本消息:"
                                        + ((TextMessage) message).getText());
                            } catch (JMSException e) {
                                e.printStackTrace();
                            }
                        }
                        else {
                            System.out.println("message=" + message);
                        }
                    }
                });
                try {
                    // Give the listener a second to receive something before shutting down
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    // Restore the interrupt flag so callers can still observe the interruption
                    Thread.currentThread().interrupt();
                    e.printStackTrace();
                }
            } catch (JMSException e) {
                e.printStackTrace();
            } finally {
                // Always release the connection, even when setup failed part-way;
                // closing a JMS connection also closes its sessions and consumers.
                if (conn != null) {
                    try {
                        conn.close();
                    } catch (JMSException e) {
                        e.printStackTrace();
                    }
                }
            }
        }
    }
    
    <dependency>
                <groupId>org.apache.qpid</groupId>
                <artifactId>qpid-jms-client</artifactId>
                <version>0.37.0</version>
            </dependency>
            <dependency>
                <groupId>org.fusesource.mqtt-client</groupId>
                <artifactId>mqtt-client</artifactId>
                <version>1.12</version>
            </dependency>
    

    TCP 协议版本

    package com.example.activemq.cluster_queue.tcp;
    
    import javax.jms.Connection;
    import javax.jms.DeliveryMode;
    import javax.jms.Destination;
    import javax.jms.JMSException;
    import javax.jms.MessageProducer;
    import javax.jms.Session;
    import javax.jms.TextMessage;
    
    import org.apache.activemq.ActiveMQConnectionFactory;
    
    /**
     * Simple producer: connects to the broker cluster over the TCP failover transport
     * and sends one non-persistent text message to a queue.
     */
    public class Producer {

        /**
         * Starts one producer thread against the three-node failover URL.
         *
         * @param args unused
         */
        public static void main(String[] args) {
            String protocol = "failover:(tcp://120.24.52.100:61616,tcp://120.79.71.22:61616,tcp://47.106.142.44:61616)";
            new ProducerThread(protocol, "queue3").start();
        }

        /** Thread that sends one text message to the given queue and then disconnects. */
        static class ProducerThread extends Thread {
            String brokerUrl;
            String destinationUrl;

            /**
             * @param brokerUrl      connection URI handed to the connection factory
             * @param destinationUrl name of the queue to send to
             */
            public ProducerThread(String brokerUrl, String destinationUrl) {
                this.brokerUrl = brokerUrl;
                this.destinationUrl = destinationUrl;
            }

            @Override
            public void run() {
                Connection conn = null;

                try {
                    // 1. Create the connection factory
                    ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(brokerUrl);

                    // 2. Create and start the connection
                    conn = connectionFactory.createConnection();
                    conn.start();

                    // 3. Create a session (one or more sessions may be created per connection)
                    Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);

                    // 4. Create the destination to send to (Topic or Queue)
                    Destination destination = session.createQueue(destinationUrl);

                    // 5. Create a message producer for that destination
                    MessageProducer producer = session.createProducer(destination);
                    // Delivery mode (persistent / non-persistent) and priority
                    producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
                    producer.setPriority(7);

                    // 6. Build one text message
                    String text = "Hello world! From: " + Thread.currentThread().getName() + " : "
                            + System.currentTimeMillis();
                    TextMessage message = session.createTextMessage(text);

                    // 7. Send the message through the producer
                    System.out.println("Sent message: " + text);
                    producer.send(message);
                } catch (JMSException e) {
                    e.printStackTrace();
                } finally {
                    // 8. Clean up: always release the connection, even when a step above failed;
                    // closing a JMS connection also closes its sessions and producers.
                    closeQuietly(conn);
                }
            }

            /** Closes the connection if it was opened, reporting (not propagating) any failure. */
            private static void closeQuietly(Connection conn) {
                if (conn == null) {
                    return;
                }
                try {
                    conn.close();
                } catch (JMSException e) {
                    e.printStackTrace();
                }
            }
        }
    }
    
    
    import javax.jms.Connection;
    import javax.jms.Destination;
    import javax.jms.JMSException;
    import javax.jms.Message;
    import javax.jms.MessageConsumer;
    import javax.jms.MessageListener;
    import javax.jms.Session;
    import javax.jms.TextMessage;
    
    import org.apache.activemq.ActiveMQConnectionFactory;
    
    /**
     * Simple consumer launcher: starts one {@code ConsumerThread} on queue "queue3"
     * and then blocks reading from standard input.
     */
    // http://activemq.apache.org/failover-transport-reference.html
    public class FailoverConsumer {
        public static void main(String[] args) throws Exception {
            final String clusterUrl =
                    "failover:(tcp://120.24.52.100:61616,tcp://120.79.71.22:61616,tcp://47.106.142.44:61616)";
            final String queueName = "queue3";

            Thread worker = new ConsumerThread(clusterUrl, queueName);
            worker.start();

            // Block on stdin after starting the consumer thread
            System.in.read();
        }
    }
    
    /**
     * Thread that connects to the broker and registers an asynchronous listener that
     * prints every message received from a queue. On success the connection is
     * deliberately left open so the listener keeps receiving; it is closed only if
     * setup fails part-way.
     */
    class ConsumerThread extends Thread {

        String brokerUrl;
        String destinationUrl;

        /**
         * @param brokerUrl      connection URI handed to the connection factory
         * @param destinationUrl name of the queue to consume from
         */
        public ConsumerThread(String brokerUrl, String destinationUrl) {
            this.brokerUrl = brokerUrl;
            this.destinationUrl = destinationUrl;
        }

        @Override
        public void run() {
            Connection conn = null;

            try {
                // brokerURL
                // http://activemq.apache.org/connection-configuration-uri.html
                // 1. Create the connection factory
                ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(this.brokerUrl);

                // 2. Create the connection; it must be started or no messages are delivered
                conn = connectionFactory.createConnection();
                conn.start();

                // 3. Create a session (one or more sessions may be created per connection)
                Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);

                // 4. Create the destination to consume from (Topic or Queue)
                Destination destination = session.createQueue(destinationUrl);

                // 5. Create the consumer http://activemq.apache.org/destination-options.html
                MessageConsumer consumer = session.createConsumer(destination);

                // 6. Receive messages asynchronously
                consumer.setMessageListener(new MessageListener() {

                    @Override
                    public void onMessage(Message message) {
                        if (message instanceof TextMessage) {
                            try {
                                System.out.println(
                                        Thread.currentThread().getName() + " 收到文本消息:" + ((TextMessage) message).getText());
                            } catch (JMSException e) {
                                e.printStackTrace();
                            }
                        } else {
                            System.out.println(message);
                        }
                    }
                });

                // Intentionally NOT closing consumer / session / connection here:
                // the listener must stay registered to keep receiving messages.
            } catch (JMSException e) {
                e.printStackTrace();
                // Setup failed: release the half-initialised connection instead of leaking it
                if (conn != null) {
                    try {
                        conn.close();
                    } catch (JMSException closeFailure) {
                        closeFailure.printStackTrace();
                    }
                }
            }
        }
    }
    
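    <!-- TCP(OpenWire)版本的示例使用 org.apache.activemq.ActiveMQConnectionFactory,需要 activemq-client 依赖(版本请与 broker 保持一致) -->
    <dependency>
        <groupId>org.apache.activemq</groupId>
        <artifactId>activemq-client</artifactId>
        <version>5.15.10</version>
    </dependency>
    <dependency>
        <groupId>org.apache.qpid</groupId>
        <artifactId>qpid-jms-client</artifactId>
        <version>0.37.0</version>
    </dependency>
    <dependency>
        <groupId>org.fusesource.mqtt-client</groupId>
        <artifactId>mqtt-client</artifactId>
        <version>1.12</version>
    </dependency>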
    

    相关文章

      网友评论

          本文标题:集群代码实现

          本文链接:https://www.haomeiwen.com/subject/tvsqvctx.html