前提
服务器 IP
192.168.0.102
192.168.0.106
为了保证机器之间网络互通,建议关闭防火墙:
systemctl stop firewalld.service
1. Static 静态集群部署
在 broker 配置中添加其他的 broker 地址
<networkConnectors>
<networkConnector uri="static:(tcp://host1:61616,tcp://host2:61616)"/>
</networkConnectors>
1.1. 修改配置文件
修改 192.168.0.102 上的 /var/activemq/conf/activemq.xml,在<broker></broker>
标签中添加以下代码
<networkConnectors>
<networkConnector uri="static:(tcp://192.168.0.106:61616)"/>
</networkConnectors>
修改 192.168.0.106 上的 /var/activemq/conf/activemq.xml,在<broker></broker>
标签中添加以下代码
<networkConnectors>
<networkConnector uri="static:(tcp://192.168.0.102:61616)"/>
</networkConnectors>
1.2. 验证
启动所有 broker
1.2.1. web管理界面验证
在 web 管理界面的 NetWork 中可以查看到远程连接
192.168.0.106 有 192.168.0.102 的连接
i192.168.0.102 有 192.168.0.106 的连接
1.2.2. 实际验证
将生产者与消费者分别连接到不同的节点上,看生产者发送的消息,在消费者中能不能收到。
将生产者连接到 192.168.0.102
public class Producer {
public static void main(String[] args) {
// 生产者用192.168.0.102
String brokerUrl = "failover:(tcp://192.168.0.102:61616)?initialReconnectDelay=100";
new ProducerThread(brokerUrl, "queue1").start();
}
static class ProducerThread extends Thread {
String brokerUrl;
String destinationUrl;
public ProducerThread(String brokerUrl, String destinationUrl) {
this.brokerUrl = brokerUrl;
this.destinationUrl = destinationUrl;
}
@Override
public void run() {
ActiveMQConnectionFactory connectionFactory;
Connection conn;
Session session;
try {
// 1、创建连接工厂
connectionFactory = new ActiveMQConnectionFactory(brokerUrl);
// 2、创建连接对象md
conn = connectionFactory.createConnection();
conn.start();
// 3、创建会话
session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
// 4、创建点对点发送的目标
Destination destination = session.createQueue(destinationUrl);
// 5、创建生产者消息
MessageProducer producer = session.createProducer(destination);
// 设置生产者的模式,有两种可选 持久化 / 不持久化
producer.setDeliveryMode(DeliveryMode.PERSISTENT);
// 6、创建一条文本消息
String text = "Hello world!";
TextMessage message = session.createTextMessage(text);
for (int i = 0; i < 1; i++) {
// 7、发送消息
producer.send(message);
}
// 8、 关闭连接
session.close();
conn.close();
} catch (JMSException e) {
e.printStackTrace();
}
}
}
}
将消费者连接到 192.168.0.106
public class ConsumerNetowork {
public static void main(String[] args) throws InterruptedException {
// 消费者用192.168.0.106
String brokerUrl = "failover:(tcp://192.168.0.106:61616)?initialReconnectDelay=100";
ConsumerThread queue1 = new ConsumerThread(brokerUrl, "queue1");
queue1.start();
queue1.join();
}
}
class ConsumerThread extends Thread {
String brokerUrl;
String destinationUrl;
public ConsumerThread(String brokerUrl, String destinationUrl) {
this.brokerUrl = brokerUrl;
this.destinationUrl = destinationUrl;
}
@Override
public void run() {
ActiveMQConnectionFactory connectionFactory;
Connection conn;
Session session;
MessageConsumer consumer;
try {
// brokerURL http://activemq.apache.org/connection-configuration-uri.html
// 1、创建连接工厂
connectionFactory = new ActiveMQConnectionFactory(this.brokerUrl);
// 2、创建连接对象
conn = connectionFactory.createConnection();
conn.start(); // 一定要启动
// 3、创建会话(可以创建一个或者多个session)
session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
// 4、创建点对点接收的目标,queue - 点对点
Destination destination = session.createQueue(destinationUrl);
// 5、创建消费者消息 http://activemq.apache.org/destination-options.html
consumer = session.createConsumer(destination);
// 6、接收消息
consumer.setMessageListener(message -> {
try {
if (message instanceof TextMessage) {
System.out.println("收到文本消息:" + ((TextMessage) message).getText());
} else {
System.out.println(message);
}
} catch (JMSException e) {
e.printStackTrace();
}
});
} catch (JMSException e) {
e.printStackTrace();
}
}
}
打开消费者,然后用生产者发送到 192.168.0.102,可以看到 192.168.0.106 上的消费者可以接收到数据
INFO | Successfully connected to tcp://192.168.0.106:61616
收到文本消息:Hello world!
2. Dynamic 自动发现集群部署方
ActiveMQ 通过组播方式将自己的信息发送出去,接收到的信息的机器再来连接这个发送源。默认情况下,ActiveMQ 发送的是机器名,可以通过配置修改成发送IP地址。注意机器间的网络。
2.1. 修改配置文件
修改每台机器上的 /var/activemq/conf/activemq.xml,在<broker></broker>
标签中添加以下代码
<networkConnectors>
<networkConnector uri="multicast://default"/>
</networkConnectors>
修改 transportConnector,增加 discoveryUri 属性,并添加 publishedAddressPolicy
<transportConnector name="openwire" uri="tcp://0.0.0.0:61616?maximumConnections=1000&wireFormat.maxFrameSize=104857600" discoveryUri="multicast://default">
<publishedAddressPolicy>
<publishedAddressPolicy publishedHostStrategy="IPADDRESS"></publishedAddressPolicy>
</publishedAddressPolicy>
</transportConnector>
2.2. 验证
和上面静态的一样
网友评论