一段时间没有更新了,今天来划划水吧!今天准备盘盘 rabbitmq ,首先我们得知道它是个啥!
就不百度百科了,说我理解的吧,rabbitmq是一套开源的消息队列服务,它的同类型产品有Kafka (apache的), ActiveMQ, RocketMQ (阿里的)等。当然这些产品都有自己的特点,没有谁好谁坏,如何选型视场景而定。这里盘rabbitmq,因为中小型企业用的多。
那什么是消息队列呢?我们可以把消息队列比作是一个存放消息的容器,这个容器以队列的形式呈现。队列嘛,跟食堂排队打饭一样,先排队的先打饭。而“消息”指的是在两台计算机间传送的数据单位。比方说你给别人在qq上发送一条数据,这个数据就是一个消息。
同步处理你不禁要问,这玩意有啥用?官方回答是给分布式系统解决应用解耦,异步消息,流量削锋等问题,实现高性能,高可用,可伸缩和最终一致性架构。可怕,还是完全不知道是干啥的吧。大佬们都喜欢拿这么一张图扯。
异步处理上面这个图呢就是同步处理的意思(不要被他左上角的字蒙骗。)意思就是你在一个网站上注册一个账号的过程,这个过程包含三件事,注册信息写入数据库、给你邮箱发送注册成功的信息, 给你手机发送短信。发短信必须在发完邮件之后执行。如果这三件事做完后才给你浏览器发送注册成功的页面时,这个过程你会等150ms。你可能觉得这个时间已经很快了,丝毫不影响啥,可勤奋的程序员们老想着把这个时间再缩短,他们认为这还是太慢了,因为可能还有其他情况耗费一些时间,比如网络再故意延迟一会了,那可就让人难受了。下面这个图呢就是能再优化这个时间的异步处理啦。
这个图呢把发邮件和短信做成并行的,同时开始执行,就是说发短信不用等发完邮件才执行。
接下来说解耦, 下面这个图就是两个系统耦合了,库存系统直接调用订单系统的接口。这样订单系统接口一变,也要去改库存系统的代码。要是这两个系统分别是两拨人去开发,一方随便改改接口,说不定两拨人能打起来。
耦合
下面就是解耦场景,库存系统直接跟消息队列打交道,要是订单系统随便改接口代码,消息队列会打死他的,放心!就不需要库存系统这帮人出手了,你说要是库存系统和消息队列吵起来了咋整,放心,消息队列是别人牛逼的人写的,它俩吵起来,八成都会是库存系统做错了!
解耦
流量削锋一般在秒杀,团抢活动这些场景出现。双十一都不陌生,突然在这天访问量剧增。使用消息队列来抗住。
image
用户的请求,服务器接收后,首先写入消息队列。假如消息队列长度超过最大数量,则直接抛弃用户请求或跳转到错误页面。
异步消息,解耦,流量削锋这些消息队列的作用说完了(当然还有作用,自行百度吧,大佬们),该盘代码了。
使用之前呢需要安装rabbitmq 这里不细说,因为比较简单。给个windows上安装的博客 : windows上rabbitmq安装参考
),唯一注意的是使用rabbitmq需要erlang语言环境,就像java应用需要jdk环境一样。玩linux的敲几句命令就完事了,就不用给教程了吧(手动滑稽)。
接下来会慢慢更新 rabbitmq 五种模式的测试代码。源码存放于junan的码云仓库
第一种 简单队列
简单队列
简单队列存在一个生产者,一个消费者,一个队列。
首先需要建立普通maven项目,导入下依赖,只需要这一个依赖即可。
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>5.4.3</version>
</dependency>
然后需要个connection工具类,它作用嘛用于和rabbitmq的建立连接。
package com.junan.rabbitmqTest.utils;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
* <p>
* rabbitmq 工具类产生 rabbitmq 连接
* </p>
*
* @author junan
* @version 1.0.0
* @since 19-5-29
*/
public class ConnectionsUtil {
//设置 rabbitmq 服务ip地址
private static String host = "localhost";
//设置 rabbitmq 服务端口
private static Integer port = 5672;
//设置 rabbitmq 服务登录用户名
private static String username = "admin";
//设置 rabbitmq 服务虚拟主机名
private static String virtualHost = "/test";
//设置 rabbitmq 服务登录密码
private static String password = "chenjunan";
//通过这个方法获取连接
public static Connection getConnection(){
ConnectionFactory factory = new ConnectionFactory();
factory.setHost(host);
factory.setPort(port);
factory.setUsername(username);
factory.setPassword(password);
factory.setVirtualHost(virtualHost);
try {
return factory.newConnection();
} catch (IOException e) {
e.printStackTrace();
} catch (TimeoutException e) {
e.printStackTrace();
}
return null;
}
}
使用这个工具类是对应修改static属性即可。然后建立生产者。
package com.junan.rabbitmqTest.simple;
import com.junan.rabbitmqTest.utils.ConnectionsUtil;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
* <p>
* 简单消息队列生产者
* </p>
*
* @author junan
* @version 1.0.0
* @since 19-5-29
*/
public class Producer {
//给队列取的名字
private static final String QUEUE_NAME = "rabbitmq_simple";
public static void main(String[] args) {
//从工具类获取连接
Connection connection = ConnectionsUtil.getConnection();
try {
//从连接获取通道
Channel channel = connection.createChannel();
//声明队列(具体参数请查api)
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
//从通道发送 字节消息
channel.basicPublish("", QUEUE_NAME, null, "hello rabbitmq".getBytes());
//关闭通道和连接
channel.close();
connection.close();
System.out.println("<== 已发送一条消息! ==>");
} catch (IOException e) {
e.printStackTrace();
} catch (TimeoutException e) {
e.printStackTrace();
}
}
}
接下来是消费者。
package com.junan.rabbitmqTest.simple;
import com.junan.rabbitmqTest.utils.ConnectionsUtil;
import com.rabbitmq.client.*;
import java.io.IOException;
/**
* <p>
* 简单消息队列消费者
* </p>
*
* @author junan
* @version 1.0.0
* @since 19-5-29
*/
public class Consumer {
//给队列取的名字
private static final String QUEUE_NAME = "rabbitmq_simple";
public static void main(String[] args) {
//从工具类获取连接
Connection connection = ConnectionsUtil.getConnection();
try {
//从连接获取通道
Channel channel = connection.createChannel();
//声明队列(具体参数请查api)
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
//创建消费者(这里只是创建)
DefaultConsumer consumer = new DefaultConsumer(channel) {
//重写这个方法:事件模型,当有消息传来时执行这个方法,相当于listener
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
super.handleDelivery(consumerTag, envelope, properties, body);
//打印消息内容
System.out.println(new String(body));
}
};
//这里开始消费 ,需要把创建的消费者传入
channel.basicConsume(QUEUE_NAME, true, consumer);
} catch (IOException e) {
e.printStackTrace();
}
}
}
先运行消费者Consumer的main方法。再运行生产者Producer中的main方法发送一条消息,看Consumer消费者能不能收到,反正我是收到了(嘻嘻)。
producer
consumer
以上代码基本使用了rabbitmq的简单模式用生产者给消费者发送了一条消息。
第二种 work 模式
work
这种模式和简单模式的区别就是可以有多个消费者进行消费,我这里使用两个消费者演示。具体参考代码
这次的生产者一共生产50条消息共两个消费者消费。
package com.junan.rabbitmqTest.work;
import com.junan.rabbitmqTest.utils.ConnectionsUtil;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
/**
* <p>
* 工作消息队列生产者
* </p>
*
* @author junan
* @version 1.0.0
* @since 19-5-30
*/
@SuppressWarnings("all")
public class Producer {
//给队列取的名字
private static final String QUEUE_NAME = "rabbitmq_work";
public static void main(String[] args) {
Connection connection = ConnectionsUtil.getConnection();
Channel channel = null;
try {
channel = connection.createChannel();
channel.queueDeclareNoWait(QUEUE_NAME, false, false, false, null);
//发送50个消息
for (int i = 0; i < 50; i++) {
String msg = " hello " + i;
channel.basicPublish("", QUEUE_NAME, null, msg.getBytes());
Thread.sleep(100);
}
System.out.println("<========== 生产者已发送 50 条消息! ==========>");
} catch (Exception e) {
e.printStackTrace();
}finally {
try {
if(channel != null)
channel.close();
if(connection != null)
connection.close();
} catch (Exception e) {
e.printStackTrace();
}
}
}
}
以下是两个消费者。
package com.junan.rabbitmqTest.work;
import com.junan.rabbitmqTest.utils.ConnectionsUtil;
import com.rabbitmq.client.*;
import java.io.IOException;
/**
* <p>
* 工作队列消费者一号
* </p>
*
* @author junan
* @version 1.0.0
* @since 19-5-30
*/
@SuppressWarnings("all")
public class Consumer1 {
//给队列取的名字
private static final String QUEUE_NAME = "rabbitmq_work";
public static void main(String[] args) {
Connection connection = ConnectionsUtil.getConnection();
Channel channel = null;
try {
channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
Consumer consumer = new DefaultConsumer(channel) {
//重写这个方法:事件模型,当有消息传来时执行这个方法,相当于listener
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
super.handleDelivery(consumerTag, envelope, properties, body);
System.out.println("<== consumer1 ==> " + new String(body));
try {
//这里每次循环休息200ms, 让两个消费者休息时间不同, 看他的运行结果
Thread.sleep(200);
} catch (Exception e) {
e.printStackTrace();
}
}
};
channel.basicConsume(QUEUE_NAME, true, consumer);
System.out.println("<========== 消费者一号启动! ==========>");
} catch (Exception e) {
e.printStackTrace();
}
}
}
package com.junan.rabbitmqTest.work;
import com.junan.rabbitmqTest.utils.ConnectionsUtil;
import com.rabbitmq.client.*;
import java.io.IOException;
/**
* <p>
* 工作队列消费者二号
* </p>
*
* @author junan
* @version 1.0.0
* @since 19-5-30
*/
@SuppressWarnings("all")
public class Consumer2 {
//给队列取的名字
private static final String QUEUE_NAME = "rabbitmq_work";
public static void main(String[] args) {
Connection connection = ConnectionsUtil.getConnection();
Channel channel = null;
try {
channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
super.handleDelivery(consumerTag, envelope, properties, body);
System.out.println("<== consumer2 ==> " + new String(body, "utf-8"));
try {
//这里每次循环休息400ms, 让两个消费者休息时间不同, 看他的运行结果
Thread.sleep(400);
} catch (Exception e) {
e.printStackTrace();
}
}
};
channel.basicConsume(QUEUE_NAME, true, consumer);
System.out.println("<========== 消费者二号启动! ==========>");
} catch (Exception e) {
e.printStackTrace();
}
}
}
先启动两个消费者等待消息,然后启动生产者给消息队列发送消息。
结果如下:
消费者1号
消费者2号
可以看出我们虽然设置了这两个消费者每次消费休息不同时长。可是从结果看,这两个消费者采用轮询的方式消费这些消息。也就是消息队列给这两个消费者消息很公平,一人一个的给。不管你忙碌还是空闲。这种方式相比简单队列能减轻一部分压力。
第三种 公平分发
这种方式能需要消费者手动确认收到消息,rabbitmq才会给他分发下一条消息。注意对比work模式的代码
生产者 需要对channel设置每次值发送一条消息给消费者
package com.junan.rabbitmqTest.workFair;
import com.junan.rabbitmqTest.utils.ConnectionsUtil;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
/**
* <p>
* 公平分发队列生产者
* </p>
*
* @author junan
* @version 1.0.0
* @since 19-5-30
*/
@SuppressWarnings("all")
public class Producer {
//给队列取的名字
private static final String QUEUE_NAME = "rabbitmq_work";
public static void main(String[] args) {
Connection connection = ConnectionsUtil.getConnection();
Channel channel = null;
try {
channel = connection.createChannel();
//限制每次发送一条消息给消费者
channel.basicQos(1);
channel.queueDeclareNoWait(QUEUE_NAME, false, false, false, null);
//发送50个消息
for (int i = 0; i < 50; i++) {
String msg = " hello " + i;
channel.basicPublish("", QUEUE_NAME, null, msg.getBytes());
Thread.sleep(100);
}
System.out.println("<========== 生产者已发送 50 条消息! ==========>");
} catch (Exception e) {
e.printStackTrace();
}finally {
try {
if(channel != null)
channel.close();
if(connection != null)
connection.close();
} catch (Exception e) {
e.printStackTrace();
}
}
}
}
消费者1号 (二号类似就不粘代码了,) 注意设置限制每次发送一条消息给消费者和设置autoAck为false(basicConsume这个方法的第二个参数)
package com.junan.rabbitmqTest.workFair;
import com.junan.rabbitmqTest.utils.ConnectionsUtil;
import com.rabbitmq.client.*;
import java.io.IOException;
/**
* <p>
* 公平分发消费者一号
* </p>
*
* @author junan
* @version 1.0.0
* @since 19-5-30
*/
@SuppressWarnings("all")
public class Consumer1 {
//给队列取的名字
private static final String QUEUE_NAME = "rabbitmq_work";
public static void main(String[] args) {
Connection connection = ConnectionsUtil.getConnection();
try {
final Channel channel = connection.createChannel();
//限制每次发送一条消息给消费者
channel.basicQos(1);
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
Consumer consumer = new DefaultConsumer(channel) {
//重写这个方法:事件模型,当有消息传来时执行这个方法,相当于listener
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
super.handleDelivery(consumerTag, envelope, properties, body);
System.out.println("<== consumer1 ==> " + new String(body));
try {
//这里每次循环休息200ms, 让两个消费者休息时间不同, 看他的运行结果
Thread.sleep(200);
} catch (Exception e) {
e.printStackTrace();
}
//设置返回的确认消息
channel.basicAck(envelope.getDeliveryTag(), false);
}
};
//设置不自动确认消息,当自动应答等于true的时候,表示当消费者一收到消息就表示消费者收到了消息,消费者收到了消息就会立即从队列中删除。
channel.basicConsume(QUEUE_NAME, false, consumer);
System.out.println("<========== 消费者一号启动! ==========>");
} catch (Exception e) {
e.printStackTrace();
}
}
}
接下来看结果
消费者1号
消费者2号
可以看到这个效果就比较满意了,谁的空闲时间多就能多拿到一下消息。能充分利用消费者的能力。
ps: autoAck :这是一个boolean参数,等于true的时候,表示当消费者一收到消息就表示消费者收到了消息,消费者收到了消息就会立即从队列中删除。
---------------------------------------------------------------------- 有时间继续更新。
网友评论