美文网首页消息中间件
【ActiveMQ】集群部署

【ActiveMQ】集群部署

作者: 佐蓝Gogoing | 来源:发表于2019-06-24 00:13 被阅读1次

    前提


    服务器 IP
    192.168.0.102
    192.168.0.106

    为了保证机器之间网络互通,建议关闭防火墙:systemctl stop firewalld.service

    1. Static 静态集群部署

    在 broker 配置中添加其他的 broker 地址

    <networkConnectors>
        <networkConnector uri="static:(tcp://host1:61616,tcp://host2:61616)"/>
    </networkConnectors>
    

    1.1. 修改配置文件

    修改 192.168.0.102 上的 /var/activemq/conf/activemq.xml,在<broker></broker>标签中添加以下代码

        <networkConnectors>
            <networkConnector uri="static:(tcp://192.168.0.106:61616)"/>
        </networkConnectors>
    

    修改 192.168.0.106 上的 /var/activemq/conf/activemq.xml,在<broker></broker>标签中添加以下代码

        <networkConnectors>
            <networkConnector uri="static:(tcp://192.168.0.102:61616)"/>
        </networkConnectors>
    

    1.2. 验证

    启动所有 broker

    1.2.1. web管理界面验证

    在 web 管理界面的 NetWork 中可以查看到远程连接

    192.168.0.106 有 192.168.0.102 的连接

    i

    192.168.0.102 有 192.168.0.106 的连接

    1.2.2. 实际验证

    将生产者与消费者分别连接到不同的节点上,看生产者发送的消息,在消费者中能不能收到。

    将生产者连接到 192.168.0.102

    public class Producer {
        public static void main(String[] args) {
            // 生产者用192.168.0.102
            String brokerUrl = "failover:(tcp://192.168.0.102:61616)?initialReconnectDelay=100";
            new ProducerThread(brokerUrl, "queue1").start();
        }
    
        static class ProducerThread extends Thread {
            String brokerUrl;
            String destinationUrl;
    
            public ProducerThread(String brokerUrl, String destinationUrl) {
                this.brokerUrl = brokerUrl;
                this.destinationUrl = destinationUrl;
            }
    
            @Override
            public void run() {
                ActiveMQConnectionFactory connectionFactory;
                Connection conn;
                Session session;
    
                try {
                    // 1、创建连接工厂
                    connectionFactory = new ActiveMQConnectionFactory(brokerUrl);
                    // 2、创建连接对象md
                    conn = connectionFactory.createConnection();
                    conn.start();
                    // 3、创建会话
                    session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
                    // 4、创建点对点发送的目标
                     Destination destination = session.createQueue(destinationUrl);
                    // 5、创建生产者消息
                    MessageProducer producer = session.createProducer(destination);
                    // 设置生产者的模式,有两种可选 持久化 / 不持久化
                    producer.setDeliveryMode(DeliveryMode.PERSISTENT);
                    // 6、创建一条文本消息
                    String text = "Hello world!";
                    TextMessage message = session.createTextMessage(text);
                    for (int i = 0; i < 1; i++) {
                        // 7、发送消息
                        producer.send(message);
                    }
                    // 8、 关闭连接
                    session.close();
                    conn.close();
                } catch (JMSException e) {
                    e.printStackTrace();
                }
            }
        }
    }
    

    将消费者连接到 192.168.0.106

    public class ConsumerNetowork {
        public static void main(String[] args) throws InterruptedException {
            // 消费者用192.168.0.106
            String brokerUrl = "failover:(tcp://192.168.0.106:61616)?initialReconnectDelay=100";
            ConsumerThread queue1 = new ConsumerThread(brokerUrl, "queue1");
            queue1.start();
            queue1.join();
        }
    }
    
    class ConsumerThread extends Thread {
    
        String brokerUrl;
        String destinationUrl;
    
        public ConsumerThread(String brokerUrl, String destinationUrl) {
            this.brokerUrl = brokerUrl;
            this.destinationUrl = destinationUrl;
        }
    
        @Override
        public void run() {
            ActiveMQConnectionFactory connectionFactory;
            Connection conn;
            Session session;
            MessageConsumer consumer;
    
            try {
                // brokerURL http://activemq.apache.org/connection-configuration-uri.html
                // 1、创建连接工厂
                connectionFactory = new ActiveMQConnectionFactory(this.brokerUrl);
                // 2、创建连接对象
                conn = connectionFactory.createConnection();
                conn.start(); // 一定要启动
                // 3、创建会话(可以创建一个或者多个session)
                session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
                // 4、创建点对点接收的目标,queue - 点对点
                Destination destination = session.createQueue(destinationUrl);
    
                // 5、创建消费者消息 http://activemq.apache.org/destination-options.html
                consumer = session.createConsumer(destination);
    
                // 6、接收消息
                consumer.setMessageListener(message -> {
                    try {
                        if (message instanceof TextMessage) {
                            System.out.println("收到文本消息:" + ((TextMessage) message).getText());
                        } else {
                            System.out.println(message);
                        }
                    } catch (JMSException e) {
                        e.printStackTrace();
                    }
                });
            } catch (JMSException e) {
                e.printStackTrace();
            }
        }
    }
    

    打开消费者,然后用生产者发送到 192.168.0.102,可以看到 192.168.0.106 上的消费者可以接收到数据

     INFO | Successfully connected to tcp://192.168.0.106:61616
    收到文本消息:Hello world!
    

    2. Dynamic 自动发现集群部署方

    ActiveMQ 通过组播方式将自己的信息发送出去,接收到的信息的机器再来连接这个发送源。默认情况下,ActiveMQ 发送的是机器名,可以通过配置修改成发送IP地址。注意机器间的网络

    2.1. 修改配置文件

    修改每台机器上的 /var/activemq/conf/activemq.xml,在<broker></broker>标签中添加以下代码

    <networkConnectors>
        <networkConnector uri="multicast://default"/>
    </networkConnectors>
    

    修改 transportConnector,增加 discoveryUri 属性,并添加 publishedAddressPolicy

    <transportConnector name="openwire" uri="tcp://0.0.0.0:61616?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600" discoveryUri="multicast://default">
         <publishedAddressPolicy>
             <publishedAddressPolicy publishedHostStrategy="IPADDRESS"></publishedAddressPolicy>
         </publishedAddressPolicy>
    </transportConnector>
    

    2.2. 验证

    和上面静态的一样

    相关文章

      网友评论

        本文标题:【ActiveMQ】集群部署

        本文链接:https://www.haomeiwen.com/subject/buqgqctx.html