安装Erlang语言
RabbitMQ 是基于 Erlang 语言开发的,安装Erlang语言
百度网盘:
链接:https://pan.baidu.com/s/1yMiGsaN0V-50v4fL7TlL0A
提取码:29vx
1、otp_win64_18.1安装,配置path环境变量
2、cmd测试:erl
图片.png
安装RabbitMQ
百度网盘:
链接:https://pan.baidu.com/s/1yMiGsaN0V-50v4fL7TlL0A
提取码:29vx
1、rabbitmq-server-3.6.5.exe安装
2、运行cmd,配置插件:"E:\system\RabbitMQ\RabbitMQ\rabbitmq_server-3.6.5\sbin\rabbitmq-plugins.bat" enable rabbitmq_management
3、重启:net stop RabbitMQ && net start RabbitMQ
4、访问:http://127.0.0.1:15672/
5、账号:guest 密码:guest
图片.png
模式
协议:与ActiveMQ不一样, Rabbitmq 使用的是一种叫做 AMQP 的协议来通信,这种模式可以解决复杂的业务需求
模式:与ActiveMQ不同,ActiveMQ消息放到队列等待消费者获取,RabbitMQ拿到
消息交给交换机,交换机通过策略决定发给哪个队列
fanout模板代码
1、pom.xml
<dependencies>
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>3.6.5</version>
</dependency>
<dependency>
<groupId>cn.hutool</groupId>
<artifactId>hutool-all</artifactId>
<version>4.3.1</version>
</dependency>
</dependencies>
2、判断RabbitMQ是否启动
package com.llhc;
import javax.swing.JOptionPane;
import cn.hutool.core.util.NetUtil;
public class RabbitMQUtil {
public static void main(String[] args) {
checkServer();
}
public static void checkServer() {
if(NetUtil.isUsableLocalPort(15672)) {
JOptionPane.showMessageDialog(null, "RabbitMQ 服务器未启动 ");
System.exit(1);
}
}
}
3、消息发送
package com.llhc;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
/**
* 消息生成者
*/
public class TestProducer {
public final static String EXCHANGE_NAME="fanout_exchange";
public static void main(String[] args) throws IOException, TimeoutException {
RabbitMQUtil.checkServer();
//创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
//设置RabbitMQ相关信息
factory.setHost("localhost");
//创建一个新的连接
Connection connection = factory.newConnection();
//创建一个通道
Channel channel = connection.createChannel();
channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
for (int i = 0; i < 100; i++) {
String message = "direct 消息 " +i;
//发送消息到队列中
channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes("UTF-8"));
System.out.println("发送消息: " + message);
}
//关闭通道和连接
channel.close();
connection.close();
}
}
4、消息接收
package com.llhc;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
import cn.hutool.core.util.RandomUtil;
public class TestDriectCustomer {
public final static String EXCHANGE_NAME="fanout_exchange";
public static void main(String[] args) throws IOException, TimeoutException {
//为当前消费者取随机名
String name = "consumer-"+ RandomUtil.randomString(5);
//判断服务器是否启动
RabbitMQUtil.checkServer();
// 创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
//设置RabbitMQ地址
factory.setHost("localhost");
//创建一个新的连接
Connection connection = factory.newConnection();
//创建一个通道
Channel channel = connection.createChannel();
//交换机声明(参数为:交换机名称;交换机类型)
channel.exchangeDeclare(EXCHANGE_NAME,"fanout");
//获取一个临时队列
String queueName = channel.queueDeclare().getQueue();
//队列与交换机绑定(参数为:队列名称;交换机名称;routingKey忽略)
channel.queueBind(queueName,EXCHANGE_NAME,"");
System.out.println(name +" 等待接受消息");
//DefaultConsumer类实现了Consumer接口,通过传入一个频道,
// 告诉服务器我们需要那个频道的消息,如果频道中有消息,就会执行回调函数handleDelivery
Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope,
AMQP.BasicProperties properties, byte[] body)
throws IOException {
String message = new String(body, "UTF-8");
System.out.println(name + " 接收到消息 '" + message + "'");
}
};
//自动回复队列应答 -- RabbitMQ中的消息确认机制
channel.basicConsume(queueName, true, consumer);
}
}
direct模板代码
1、pom.xml
<dependencies>
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>3.6.5</version>
</dependency>
<dependency>
<groupId>cn.hutool</groupId>
<artifactId>hutool-all</artifactId>
<version>4.3.1</version>
</dependency>
</dependencies>
2、判断RabbitMQ是否启动
package com.llhc;
import javax.swing.JOptionPane;
import cn.hutool.core.util.NetUtil;
public class RabbitMQUtil {
public static void main(String[] args) {
checkServer();
}
public static void checkServer() {
if(NetUtil.isUsableLocalPort(15672)) {
JOptionPane.showMessageDialog(null, "RabbitMQ 服务器未启动 ");
System.exit(1);
}
}
}
3、生产者
package cn.how2j;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
/**
* 消息生成者
*/
public class TestDriectProducer {
public final static String QUEUE_NAME="direct_queue";
public static void main(String[] args) throws IOException, TimeoutException {
RabbitMQUtil.checkServer();
//创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
//设置RabbitMQ相关信息
factory.setHost("localhost");
//创建一个新的连接
Connection connection = factory.newConnection();
//创建一个通道
Channel channel = connection.createChannel();
for (int i = 0; i < 100; i++) {
String message = "direct 消息 " +i;
//发送消息到队列中
channel.basicPublish("", QUEUE_NAME, null, message.getBytes("UTF-8"));
System.out.println("发送消息: " + message);
}
//关闭通道和连接
channel.close();
connection.close();
}
}
4、接收者
package cn.how2j;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
import cn.hutool.core.util.RandomUtil;
public class TestDriectCustomer {
private final static String QUEUE_NAME = "direct_queue";
public static void main(String[] args) throws IOException, TimeoutException {
//为当前消费者取随机名
String name = "consumer-"+ RandomUtil.randomString(5);
//判断服务器是否启动
RabbitMQUtil.checkServer();
// 创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
//设置RabbitMQ地址
factory.setHost("localhost");
//创建一个新的连接
Connection connection = factory.newConnection();
//创建一个通道
Channel channel = connection.createChannel();
//声明要关注的队列
channel.queueDeclare(QUEUE_NAME, false, false, true, null);
System.out.println(name +" 等待接受消息");
//DefaultConsumer类实现了Consumer接口,通过传入一个频道,
// 告诉服务器我们需要那个频道的消息,如果频道中有消息,就会执行回调函数handleDelivery
Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope,
AMQP.BasicProperties properties, byte[] body)
throws IOException {
String message = new String(body, "UTF-8");
System.out.println(name + " 接收到消息 '" + message + "'");
}
};
//自动回复队列应答 -- RabbitMQ中的消息确认机制
channel.basicConsume(QUEUE_NAME, true, consumer);
}
}
topic模板代码
1、pom.xml
<dependencies>
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>3.6.5</version>
</dependency>
<dependency>
<groupId>cn.hutool</groupId>
<artifactId>hutool-all</artifactId>
<version>4.3.1</version>
</dependency>
</dependencies>
2、判断RabbitMQ是否启动
package com.llhc;
import javax.swing.JOptionPane;
import cn.hutool.core.util.NetUtil;
public class RabbitMQUtil {
public static void main(String[] args) {
checkServer();
}
public static void checkServer() {
if(NetUtil.isUsableLocalPort(15672)) {
JOptionPane.showMessageDialog(null, "RabbitMQ 服务器未启动 ");
System.exit(1);
}
}
}
3、生产者
package cn.how2j;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
/**
* 消息生成者
*/
public class TestProducer {
public final static String EXCHANGE_NAME="topics_exchange";
public static void main(String[] args) throws IOException, TimeoutException {
RabbitMQUtil.checkServer();
//创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
//设置RabbitMQ相关信息
factory.setHost("localhost");
//创建一个新的连接
Connection connection = factory.newConnection();
//创建一个通道
Channel channel = connection.createChannel();
channel.exchangeDeclare(EXCHANGE_NAME, "topic");
String[] routing_keys = new String[] { "usa.news", "usa.weather",
"europe.news", "europe.weather" };
String[] messages = new String[] { "美国新闻", "美国天气",
"欧洲新闻", "欧洲天气" };
for (int i = 0; i < routing_keys.length; i++) {
String routingKey = routing_keys[i];
String message = messages[i];
channel.basicPublish(EXCHANGE_NAME, routingKey, null, message
.getBytes());
System.out.printf("发送消息到路由:%s, 内容是: %s%n ", routingKey,message);
}
//关闭通道和连接
channel.close();
connection.close();
}
}
4、接收usa*
package cn.how2j;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
import cn.hutool.core.util.RandomUtil;
public class TestCustomer4USA {
public final static String EXCHANGE_NAME="topics_exchange";
public static void main(String[] args) throws IOException, TimeoutException {
//为当前消费者取名称
String name = "consumer-usa";
//判断服务器是否启动
RabbitMQUtil.checkServer();
// 创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
//设置RabbitMQ地址
factory.setHost("localhost");
//创建一个新的连接
Connection connection = factory.newConnection();
//创建一个通道
Channel channel = connection.createChannel();
//交换机声明(参数为:交换机名称;交换机类型)
channel.exchangeDeclare(EXCHANGE_NAME,"topic");
//获取一个临时队列
String queueName = channel.queueDeclare().getQueue();
//接受 USA 信息
channel.queueBind(queueName, EXCHANGE_NAME, "usa.*");
System.out.println(name +" 等待接受消息");
//DefaultConsumer类实现了Consumer接口,通过传入一个频道,
// 告诉服务器我们需要那个频道的消息,如果频道中有消息,就会执行回调函数handleDelivery
Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope,
AMQP.BasicProperties properties, byte[] body)
throws IOException {
String message = new String(body, "UTF-8");
System.out.println(name + " 接收到消息 '" + message + "'");
}
};
//自动回复队列应答 -- RabbitMQ中的消息确认机制
channel.basicConsume(queueName, true, consumer);
}
}
5、接收 *.news
package cn.how2j;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
import cn.hutool.core.util.RandomUtil;
public class TestCustomer4News {
public final static String EXCHANGE_NAME="topics_exchange";
public static void main(String[] args) throws IOException, TimeoutException {
//为当前消费者取名称
String name = "consumer-news";
//判断服务器是否启动
RabbitMQUtil.checkServer();
// 创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
//设置RabbitMQ地址
factory.setHost("localhost");
//创建一个新的连接
Connection connection = factory.newConnection();
//创建一个通道
Channel channel = connection.createChannel();
//交换机声明(参数为:交换机名称;交换机类型)
channel.exchangeDeclare(EXCHANGE_NAME,"topic");
//获取一个临时队列
String queueName = channel.queueDeclare().getQueue();
//接受 USA 信息
channel.queueBind(queueName, EXCHANGE_NAME, "*.news");
System.out.println(name +" 等待接受消息");
//DefaultConsumer类实现了Consumer接口,通过传入一个频道,
// 告诉服务器我们需要那个频道的消息,如果频道中有消息,就会执行回调函数handleDelivery
Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope,
AMQP.BasicProperties properties, byte[] body)
throws IOException {
String message = new String(body, "UTF-8");
System.out.println(name + " 接收到消息 '" + message + "'");
}
};
//自动回复队列应答 -- RabbitMQ中的消息确认机制
channel.basicConsume(queueName, true, consumer);
}
}
网友评论