一、RibbitMQ的基础介绍
1. 为什么要使用MQ
2. 与其他MQ的区别
- ActiveMQ:使用Java开发,遵循JMS规范,使用方便,支持多种协议。但是有丢失消息的风险并且速度较慢
- RabbitMQ:使用Erlang开发(用于解决高并发的问题),可以解决并发问题。但是只支持AMQP协议且不能动态扩展
二、RabbitMQ的安装
1. 安装Erlang环境(这一步参照博客 <a>https://www.jianshu.com/p/27197d58e94c</a>)
-
安装阿里的yum源(我在安装的时候下载速度很慢,所以这边使用阿里的yum源来安装erlang)
wget -O /etc/yum.repos.d/CentOS-Base.repo http://mirrors.aliyun.com/repo/Centos-6.repo yum-y install make gcc gcc-c++kernel-devel m4 ncurses-devel openssl-devel java-devel unixODBC-devel
-
安装erlang的yum源
wget http://packages.erlang-solutions.com/erlang-solutions-1.0-1.noarch.rpm rpm -Uvh erlang-solutions-1.0-1.noarch.rpm rpm --import http://packages.erlang-solutions.com/rpm/erlang_solutions.asc
-
然后就可以直接安装erlang了
yum -y install erlang
-
通过下面的命令就可以查看是否安装完成了
erl
2. 安装RibbitMQ
-
下载Rabbit的yum源(下载下来的时候名字很乱,改个名字)
wget https://bintray.com/rabbitmq/rpm/download_file?file_path=rabbitmq-server%2Fv3.7.x%2Fel%2F7%2Fnoarch%2Frabbitmq-server-3.7.14-1.el7.noarch.rpm mv download_file\?file_path\=rabbitmq-server%2Fv3.7.x%2Fel%2F7%2Fnoarch%2Frabbitmq-server-3.7.14-1.el7.noarch.rpm rabbitmq-server-3.7.14-el7.noarch.rpm
-
安装
yum -y install rabbitmq-server-3.7.14-el7.noarch.rpm
- 如果报有依赖需要解决,就直接使用yum下载这个依赖就好了
-
启动服务
rabbitmq-server start
-
后台启动
rabbitmq-server -datached
- 启动后使用amqp协议,默认在5672端口
三、RabbitMQ初步使用
1. 搭建管理平台
-
初步搭建没有任何插件,我们使用下面的命令下载并启用RabbitMQ的管理地址
rabbitmq-plugins enable rabbitmq_management
-
现在就可以访问该节点的15672端口使用
guest/guest
来登陆管理界面-
如果不是在localhost下访问,我们还需要修改
/usr/lib/rabbitmq/lib/rabbitmq_server-3.7.14/ebin/rabbit.app
文件,将{loopback_users, [<<”guest”>>]}
改为{loopback_users, []}
{default_user, <<"guest">>}, {default_pass, <<"guest">>}, {default_user_tags, [administrator]}, {default_vhost, <<"/">>}, {default_permissions, [<<".*">>, <<".*">>, <<".*">>]}, {loopback_users, []}, {password_hashing_module, rabbit_password_hashing_sha256}, {server_properties, []},
再重启就OK了
-
Virtural Host 用于区分不同业务,每个VH都是独立的,互不影响的。不同的团队用不同的VH,相互隔离
-
2. 点对点简单队列
点对点简单队列:一个生产者投递消息给队列,只允许一个消费者进行消费,(如果存在消费者集群,则会均摊消费,使用取模算法)每个消息只会消费一次
生产者生产的消息直接投递给队列服务器,然后队列服务器直接推送或消费者自行拉取消息
-
ACK应答模式
- 自动应答:当消费者收到消息后,不论是否处理,消费者都会自动应答消费。
- 手动应答:消费者在代码里显式的回复ACK
-
导入依赖
<dependency> <groupId>com.rabbitmq</groupId> <artifactId>amqp-client</artifactId> <version>4.0.2</version> </dependency>
-
封装一个链接工具
public class MQConnectionUtils { /** * 创建新的链接 * @return */ public static Connection connect() throws IOException, TimeoutException { //创建链接工厂 ConnectionFactory connectionFactory = new ConnectionFactory(); //设置链接参数 connectionFactory.setHost("192.168.3.203"); connectionFactory.setUsername("guest"); connectionFactory.setPassword("guest"); connectionFactory.setPort(5672); connectionFactory.setVirtualHost("/"); return connectionFactory.newConnection(); } }
-
生产者
public class Producer { /**队列名称*/ private static final String QUEUE_NAME = "libi_QUEUE"; public static void main(String[] args) throws IOException, TimeoutException { //建立链接 Connection connection = MQConnectionUtils.connect(); //创建通道 Channel channel = connection.createChannel(); //创建一个队列 channel.queueDeclare(QUEUE_NAME,false,false,false,null); //创建消息 String message = "Libi_Message"; //发送消息 channel.basicPublish("",QUEUE_NAME,null,message.getBytes()); //关闭通道和链接 channel.close(); connection.close(); System.out.println("消息投递成功!"); } }
-
消费者
public class Consumer { /**队列名称*/ private static final String QUEUE_NAME = "libi_QUEUE"; public static void main(String[] args) throws IOException, TimeoutException { //建立链接 Connection connection = MQConnectionUtils.connect(); //创建通道 Channel channel = connection.createChannel(); //消费者关联一个队列 channel.queueDeclare(QUEUE_NAME,false,false,false,null); DefaultConsumer defaultConsumer = new DefaultConsumer(channel){ //使用匿名内部类重写获取消息的方法 @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { super.handleDelivery(consumerTag, envelope, properties, body); String msg = new String(body, "UTF-8"); System.out.println("活动生产者消息:"+msg); } }; //设置应答模式,true表示自动应答,false表示手动应答 channel.basicConsume(QUEUE_NAME,true,defaultConsumer); //关闭通道和链接 channel.close(); connection.close(); System.out.println("消息消费成功!"); } }
3. 公平队列(需要自己在代码里实现)
-
均摊消费的缺点:当消费者处理消息的能力不一致时,如果还是均摊处理信息,则会造成资源浪费(对消费慢点节点不公平),需要实现"能者多劳"
-
公平队列的实现思路(BaseQos方法):当有n个消费者在上一条消息还没有处理完成时(还没有发送ACK),消息队列就不会发送下一条消息给它,给另外一个消费者。在生产者通过如下代码开启Qos
channel.basicQos(n);
-
如果这时消费者在代码里忘记应答了,那么就会陷入阻塞
4.发布订阅模式
生产者投递消息给交换机,交换机根据路由策略转发到不同的队列服务器中,队列服务器再给消费者进行消费
-
交换机策略
-
Direct:直接交换机,一种带路由功能的交换机,一个队列会和一个交换机绑定,除此之外再绑定一个
routing_key
,当消息被发送的时候,需要指定一个binding_key
,这个消息被送达交换机的时候,就会被这个交换机送到指定的队列里面去。同样的一个binding_key
也是支持应用到多个队列中的。就是说直接交换机可以更具生产者的
routing_key
和消费者的binding_key
进行匹配,只有一样才会转发这个消息 -
Fanout:扇形交换机,它所能做的事情非常简单———广播消息。扇形交换机会把能接收到的消息全部发送给绑定在自己身上的队列。因为广播不需要“思考”,所以扇形交换机处理消息的速度也是所有的交换机类型里面最快的。
-
Topic:主题交换机,发送到主题交换机上的消息需要携带指定规则的
routing_key
,主题交换机会根据这个规则将数据发送到对应的(多个)队列上。主题交换机的
routing_key
需要有一定的规则,交换机和队列的binding_key
需要采用*.#.*.....
的格式,每个部分用.
分开,其中:-
*
表示一个单词 -
#
表示任意数量(零个或多个)单词。
假设有一条消息的
routing_key
为fast.rabbit.white
,那么带有这样binding_key
的几个队列都会接收这条消息: -
-
-
Handler:首都交换机,首部交换机是忽略
routing_key
的一种路由方式。路由器和交换机路由的规则是通过Headers
信息来交换的,这个有点像HTTP
的Headers
。将一个交换机声明成首部交换机,绑定一个队列的时候,定义一个Hash
的数据结构,消息发送的时候,会携带一组hash数据结构的信息,当Hash
的内容匹配上的时候,消息就会被写入队列。 -
生产者(在发送消息的时候传入exchange的参数)
/** * 使用Fanout类型的交换机,交换器转给发全部的队列 */ public class Producer { //交换机名称 static final String EXCHANGE_NAME = "fanout_destination"; public static void main(String[] args) throws IOException, TimeoutException { Connection connection = MQConnectionUtils.connect(); Channel channel = connection.createChannel(); //绑定交换机 channel.exchangeDeclare(EXCHANGE_NAME, "fanout"); String msg = "my_fanout_meg"; //发送消息(路由策略为空串) channel.basicPublish(EXCHANGE_NAME, "", null, msg.getBytes()); channel.close(); connection.close(); } }
-
消费者
public class EmailConsumer { private static String QUEUE_NAME = "Email_Queue"; public static void main(String[] args) throws IOException, TimeoutException { Connection connection = MQConnectionUtils.connect(); Channel channel = connection.createChannel(); //消费者声明队列 channel.queueDeclare(QUEUE_NAME, false, false, false, null); //消费者绑定交换机 channel.queueBind(QUEUE_NAME, Producer.EXCHANGE_NAME,""); //监听消息 DefaultConsumer defaultConsumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String msg = new String(body, "UTF-8"); System.out.println("邮件消费者:" + msg); } }; channel.basicConsume(QUEUE_NAME, defaultConsumer); } }
消费者的chanel和connection没有关闭,可以多启动几个,就会发现所有的消费者都可以收到生产者传入的信息
网友评论