1.简述:回想老王打电话讲故事案例.
2.优势:解耦,异步,横向扩展,顺序保障,安全可靠...
3.JMS(java message service),是java平台中关于面向消息中间件的API,用于两个程序或分布式系统中发送消息,进行异步通信,是一种协议.AMQP与其类似,但可以跨平台跨语言.
4.ActiveMQ配置集群
ActiveMQ失效转移(failover),当其中一台消息服务器宕机时,能将消息转移到其他消息服务器上.
transportOptions参数说明:
randomize 默认为true,表示在uri连接时采用随机策略
initialReconnectDelay 默认为10毫秒,表示第一次尝试连接之间等待的时间
maxReconnectDelay 默认为30000毫秒,表示最长重连的时间间隔
静态连接器配置(适合服务器数量比较少的情况)
<networkConnectors>
<networkConnector uri="static:(tcp://127.0.0.1:61617,tcp://127.0.0.1:61618)"/>
</networkConnectors>
下面演示三台服务器的完美集群方案
实践:
1.在Linux下的/usr/local目录下新建activemqs的文件夹,并在该文件夹下继续创建一个共享数据的文件夹并命名为kahadb.
将activemq的安装包解压并cp3份至activemqs文件夹下并重命名为amq-a,amq-b,amq-c,并依次配置conf目录下的activemq.xml和jetty.xml配置文件
2.配置activemq.xml文件:
一共三个地方需要修改,第一个是共享文件夹(kahadb)地址,第二是注释掉不需要的uri,第三是添加networkConnector.值得注意的是,在配置非伪分布式mq时,共享文件夹地址需要重新建立所有服务器都能访问到的共享文件夹,具体实现可以百度.所有需要修改的地方如下图:
3.配置jetty.xml文件
将端口号修改为自身对应的端口号即可,以便后台管理页面访问查看数据.
4.至此,所有服务器已配完,依次启动三台服务器即可. ./amq-a/bin/activemq start ./amq-b/bin/activemq start ./amq-c/bin/activemq start
启动成功后通过ps -ef | grep -i activemq 和 netstat -anp | grep 61616 /61617/61618分别取查看是否启动成功和端口状态,确认无误后,分别3个服务器的防火墙端口,以便demo和后台管理页面可以成功访问.以amq-a为例:
开启防火墙61616端口:/sbin/iptables -I INPUT -p tcp --dport 61616 -j ACCEPT +回车 iptables-save +回车 ;
开启后台管理页面的8161端口: /sbin/iptables -I INPUT -p tcp --dport 8161 -j ACCEPT +回车 iptables-save +回车 ;
5.编写demo进行测试,下面贴出demo中的消息生产者代码和消费者代码:
/**
* 消息生产者
* @auther xpc 252645816@qq.com
* @date 2018/6/4 15:55
*/
public class AppProducer {
private static final String uri ="failover:(tcp://192.168.119.129:61617,tcp://192.168.119.129:61618)?randomize=true";
private static final String queueName ="queue-test";
public static void main(String[] args) {
try {
//1.创建连接工厂
ConnectionFactory connectionFactory =new ActiveMQConnectionFactory(uri);
//2.创建连接
Connection connection =connectionFactory.createConnection();
//3.启动连接
connection.start();
//4.创建会话
Session session =connection.createSession(false,Session.AUTO_ACKNOWLEDGE);
//5.创建一个目标
Destination destination =session.createQueue(queueName);
//6.创建一个生产者
MessageProducer producer =session.createProducer(destination);
for (int i =0;i <100;i++) {
//7.创建消息
TextMessage textMessage =session.createTextMessage("test" +i);
//8.发布消息
producer.send(textMessage);
System.out.println("发送消息test"+i);
}
//9.关闭连接
connection.close();
} catch (JMSException e) {
e.printStackTrace();
}
}
/**
* 消息消费者
* @auther xpc 252645816@qq.com
* @date 2018/6/4 18:51
*/
public class AppConsumer {
private static final String uri ="failover:(tcp://192.168.119.129:61616,tcp://192.168.119.129:61617,tcp://192.168.119.129:61618)?randomize=true";
private static final String queueName ="queue-test";
public static void main(String[] args) {
try {
//1.创建连接工厂
ConnectionFactory connectionFactory =new ActiveMQConnectionFactory(uri);
//2.创建连接
Connection connection =connectionFactory.createConnection();
//3.启动连接
connection.start();
//4.创建会话
Session session =connection.createSession(false,Session.AUTO_ACKNOWLEDGE);
//5.创建一个目标
Destination destination =session.createQueue(queueName);
//6.创建一个消费者
MessageConsumer consumer =session.createConsumer(destination);
//7.创建一个监听器
consumer.setMessageListener(new MessageListener() {
public void onMessage(Message message) {
TextMessage textMessage =(TextMessage) message;
try {
System.out.println("接收消息"+textMessage.getText());
} catch (JMSException e) {
e.printStackTrace();
}
}
});
} catch (JMSException e) {
e.printStackTrace();
}
}
}
6.分别运行消息生产者,消息消费者,并通过后台管理页面(linux的ip+端口号:8161...,用户名和密码默认都为admin)查看详情.
网友评论