美文网首页
linux下 activemq集群配置

linux下 activemq集群配置

作者: 老汉健身 | 来源:发表于2018-06-05 11:06 被阅读6次

    1.简述:回想老王打电话讲故事案例.

    2.优势:解耦,异步,横向扩展,顺序保障,安全可靠...

    3.JMS(java message service),是java平台中关于面向消息中间件的API,用于两个程序或分布式系统中发送消息,进行异步通信,是一种协议.AMQP与其类似,但可以跨平台跨语言.

    4.ActiveMQ配置集群

    ActiveMQ失效转移(failover),当其中一台消息服务器宕机时,能将消息转移到其他消息服务器上.

    transportOptions参数说明:

    randomize 默认为true,表示在uri连接时采用随机策略

    initialReconnectDelay 默认为10毫秒,表示第一次尝试连接之间等待的时间

    maxReconnectDelay 默认为30000毫秒,表示最长重连的时间间隔

    静态连接器配置(适合服务器数量比较少的情况)

    <networkConnectors>

            <networkConnector uri="static:(tcp://127.0.0.1:61617,tcp://127.0.0.1:61618)"/>

    </networkConnectors>

    下面演示三台服务器的完美集群方案

    实践:

    1.在Linux下的/usr/local目录下新建activemqs的文件夹,并在该文件夹下继续创建一个共享数据的文件夹并命名为kahadb.

    将activemq的安装包解压并cp3份至activemqs文件夹下并重命名为amq-a,amq-b,amq-c,并依次配置conf目录下的activemq.xml和jetty.xml配置文件

    2.配置activemq.xml文件:

    一共三个地方需要修改,第一个是共享文件夹(kahadb)地址,第二是注释掉不需要的uri,第三是添加networkConnector.值得注意的是,在配置非伪分布式mq时,共享文件夹地址需要重新建立所有服务器都能访问到的共享文件夹,具体实现可以百度.所有需要修改的地方如下图:

    3.配置jetty.xml文件

    将端口号修改为自身对应的端口号即可,以便后台管理页面访问查看数据.

    4.至此,所有服务器已配完,依次启动三台服务器即可. ./amq-a/bin/activemq start      ./amq-b/bin/activemq start   ./amq-c/bin/activemq start

    启动成功后通过ps -ef | grep -i activemq 和 netstat -anp | grep 61616 /61617/61618分别取查看是否启动成功和端口状态,确认无误后,分别3个服务器的防火墙端口,以便demo和后台管理页面可以成功访问.以amq-a为例:

    开启防火墙61616端口:/sbin/iptables -I INPUT -p tcp --dport 61616 -j ACCEPT +回车 iptables-save +回车 ;

    开启后台管理页面的8161端口: /sbin/iptables -I INPUT -p tcp --dport 8161 -j ACCEPT +回车 iptables-save +回车 ;

    5.编写demo进行测试,下面贴出demo中的消息生产者代码和消费者代码:

    /**

    * 消息生产者

    * @auther xpc 252645816@qq.com

    * @date 2018/6/4 15:55

    */

    public class AppProducer {

        private static final String uri ="failover:(tcp://192.168.119.129:61617,tcp://192.168.119.129:61618)?randomize=true";

    private static final String queueName ="queue-test";

    public static void main(String[] args) {

            try {

                //1.创建连接工厂

                ConnectionFactory connectionFactory =new ActiveMQConnectionFactory(uri);

    //2.创建连接

                Connection connection =connectionFactory.createConnection();

    //3.启动连接

                connection.start();

    //4.创建会话

                Session session =connection.createSession(false,Session.AUTO_ACKNOWLEDGE);

    //5.创建一个目标

                Destination destination =session.createQueue(queueName);

    //6.创建一个生产者

                MessageProducer producer =session.createProducer(destination);

    for (int i =0;i <100;i++) {

                    //7.创建消息

                    TextMessage textMessage =session.createTextMessage("test" +i);

    //8.发布消息

                    producer.send(textMessage);

    System.out.println("发送消息test"+i);

    }

                //9.关闭连接

                connection.close();

    } catch (JMSException e) {

                e.printStackTrace();

    }

    }

    /**

    * 消息消费者

    * @auther xpc 252645816@qq.com

    * @date 2018/6/4 18:51

    */

    public class AppConsumer {

        private static final String uri ="failover:(tcp://192.168.119.129:61616,tcp://192.168.119.129:61617,tcp://192.168.119.129:61618)?randomize=true";

    private static final String queueName ="queue-test";

    public static void main(String[] args) {

            try {

                //1.创建连接工厂

                ConnectionFactory connectionFactory =new ActiveMQConnectionFactory(uri);

    //2.创建连接

                Connection connection =connectionFactory.createConnection();

    //3.启动连接

                connection.start();

    //4.创建会话

                Session session =connection.createSession(false,Session.AUTO_ACKNOWLEDGE);

    //5.创建一个目标

                Destination destination =session.createQueue(queueName);

    //6.创建一个消费者

                MessageConsumer consumer =session.createConsumer(destination);

    //7.创建一个监听器

                consumer.setMessageListener(new MessageListener() {

                    public void onMessage(Message message) {

                        TextMessage textMessage =(TextMessage) message;

    try {

                            System.out.println("接收消息"+textMessage.getText());

    } catch (JMSException e) {

                            e.printStackTrace();

    }

    }

                });

    } catch (JMSException e) {

                e.printStackTrace();

    }

    }

    }

    6.分别运行消息生产者,消息消费者,并通过后台管理页面(linux的ip+端口号:8161...,用户名和密码默认都为admin)查看详情.

    相关文章

      网友评论

          本文标题:linux下 activemq集群配置

          本文链接:https://www.haomeiwen.com/subject/katysftx.html