美文网首页消息中间件
【ActiveMQ】主从数据共享部署

【ActiveMQ】主从数据共享部署

作者: 佐蓝Gogoing | 来源:发表于2019-06-23 23:28 被阅读0次

    主从共享部署分为Shared FileSystem Master-SlaveShared DatabaseMaster-Slave,数据源分别为文件系统(如 NAS)和数据库,这里以 MySQL 数据库为例。

    1. 添加数据库驱动

    为了让 ActiveMQ 支持数据库,先要在${ACTICEMQ_HOME}/lib/extra添加数据库的驱动包

    [root@localhost extra]# pwd
    /var/activemq/lib/extra
    [root@localhost extra]# ls
    mqtt-client-1.15.jar  mysql-connector-java-5.1.47.jar
    

    2. 修改配置文件

    开打 ${ACTICEMQ_HOME}/conf/activemq.xml
    在 broker 节点 添加 persistent="true" 使其支持持久化

    <broker xmlns="http://activemq.apache.org/schema/core" brokerName="localhost" persistent="true" dataDirectory="${activemq.data}" schedulerSupport="true">
    

    修改持久化适配器为数据库

    <persistenceAdapter>
        <jdbcPersistenceAdapter dataSource="#mysql-ds" useDatabaseLock="false" transactionIsolation="4"/>
    </persistenceAdapter>
    

    在 broker 节点后添加数据源

    <bean id="mysql-ds" class="org.apache.commons.dbcp2.BasicDataSource" destroy-method="close">
        <property name="driverClassName" value="com.mysql.jdbc.Driver"/>
        <property name="url" value="jdbc:mysql://192.168.0.100:3306/test_activemq?relaxAutoCommit=true"/>
        <property name="username" value="root"/>
        <property name="password" value="root123"/>
        <property name="poolPreparedStatements" value="true"/>
    </bean>
    

    3. 启动多个 ActiveMQ

    启动多个配置文件相同的 ActiveMQ,多个 ActiveMQ 开启时,同一时间只有一个服务器可以拿到 MySQL 的连接。

    4. 代码

    消费者

    public class ConsumerFailover {
        public static void main(String[] args) throws InterruptedException {
            // 非failover的公共参数配置通过nested.*,例如 failover:(...)?nested.wireFormat.maxInactivityDuration=1000
            // ?randomize=false 随机选择,默认是顺序
            // 指定优先切换 failover:(tcp://host1:61616,tcp://host2:61616,tcp://host3:61616)?priorityBackup=true&priorityURIs=tcp://local1:61616,tcp://local2:61616
            // maxReconnectDelay重连的最大间隔时间(毫秒)
            String brokerUrl = "failover:(tcp://192.168.0.102:61616,tcp://192.168.0.106:61616)?initialReconnectDelay=100";
            ConsumerThread queue1 = new ConsumerThread(brokerUrl, "queue1");
            queue1.start();
            queue1.join();
        }
    }
    
    class ConsumerThread extends Thread {
    
        String brokerUrl;
        String destinationUrl;
    
        public ConsumerThread(String brokerUrl, String destinationUrl) {
            this.brokerUrl = brokerUrl;
            this.destinationUrl = destinationUrl;
        }
    
        @Override
        public void run() {
            ActiveMQConnectionFactory connectionFactory;
            Connection conn;
            Session session;
            MessageConsumer consumer;
    
            try {
                // brokerURL http://activemq.apache.org/connection-configuration-uri.html
                // 1、创建连接工厂
                connectionFactory = new ActiveMQConnectionFactory(this.brokerUrl);
                // 2、创建连接对象
                conn = connectionFactory.createConnection();
                conn.start(); // 一定要启动
                // 3、创建会话(可以创建一个或者多个session)
                session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
                // 4、创建点对点接收的目标,queue - 点对点
                Destination destination = session.createQueue(destinationUrl);
    
                // 5、创建消费者消息 http://activemq.apache.org/destination-options.html
                consumer = session.createConsumer(destination);
    
                // 6、接收消息
                consumer.setMessageListener(message -> {
                    try {
                        if (message instanceof TextMessage) {
                            System.out.println("收到文本消息:" + ((TextMessage) message).getText());
                        } else {
                            System.out.println(message);
                        }
                    } catch (JMSException e) {
                        e.printStackTrace();
                    }
                });
            } catch (JMSException e) {
                e.printStackTrace();
            }
        }
    }
    

    启动后客户端会按顺序去连接服务器,连接上了第一个

     INFO | Successfully connected to tcp://192.168.0.106:61616
    

    生产者

    public class Producer {
        public static void main(String[] args) {
            // 添加两个中间件,可以实现故障转移
            String brokerUrl = "failover:(tcp://192.168.0.102:61616,tcp://192.168.0.106:61616)?initialReconnectDelay=100";
            new ProducerThread(brokerUrl, "queue1").start();
        }
    
        static class ProducerThread extends Thread {
            String brokerUrl;
            String destinationUrl;
    
            public ProducerThread(String brokerUrl, String destinationUrl) {
                this.brokerUrl = brokerUrl;
                this.destinationUrl = destinationUrl;
            }
    
            @Override
            public void run() {
                ActiveMQConnectionFactory connectionFactory;
                Connection conn;
                Session session;
    
                try {
                    // 1、创建连接工厂
                    connectionFactory = new ActiveMQConnectionFactory(brokerUrl);
                    // 2、创建连接对象md
                    conn = connectionFactory.createConnection();
                    conn.start();
                    // 3、创建会话
                    session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
                    // 4、创建点对点发送的目标
                     Destination destination = session.createQueue(destinationUrl);
                    // 5、创建生产者消息
                    MessageProducer producer = session.createProducer(destination);
                    // 设置生产者的模式,有两种可选 持久化 / 不持久化
                    producer.setDeliveryMode(DeliveryMode.PERSISTENT);
                    // 6、创建一条文本消息
                    String text = "Hello world!";
                    TextMessage message = session.createTextMessage(text);
                    for (int i = 0; i < 1; i++) {
                        // 7、发送消息
                        producer.send(message);
                    }
                    // 8、 关闭连接
                    session.close();
                    conn.close();
                } catch (JMSException e) {
                    e.printStackTrace();
                }
            }
        }
    }
    

    运行以下生产者的代码,可以看到消费者接收到了

     INFO | Successfully connected to tcp://192.168.0.106:61616
    收到文本消息:Hello world!
    

    5. 模拟故障

    通过手动关闭 ActiveMQ 模拟故障场景,消费者断开后连接到第二个服务器

    java.io.EOFException
        at java.io.DataInputStream.readInt(DataInputStream.java:392)
        at org.apache.activemq.openwire.OpenWireFormat.unmarshal(OpenWireFormat.java:268)
        at org.apache.activemq.transport.tcp.TcpTransport.readCommand(TcpTransport.java:240)
        at org.apache.activemq.transport.tcp.TcpTransport.doRun(TcpTransport.java:232)
        at org.apache.activemq.transport.tcp.TcpTransport.run(TcpTransport.java:215)
        at java.lang.Thread.run(Thread.java:748)
     INFO | Successfully connected to tcp://192.168.0.106:61616
    

    相关文章

      网友评论

        本文标题:【ActiveMQ】主从数据共享部署

        本文链接:https://www.haomeiwen.com/subject/gfgwqctx.html