建议大家如果没有看前一篇文章的时候,还是看一看第一篇文章,因为上篇文章的确把很多的概念都讲解的比较清楚。我发现有很多东西在单独使用rabbitmq是做不了的,例如自定义message投递的id,所以我希望快速的把这几篇介绍的博文写完,然后进入springboot的整合篇,但是我不建议新手一上来就开始使用springboot的整合,就想我在群里面听到的,不知道channel为何物更别提其他的概念了,只有一个稳扎稳打的基础在往高级的地方学习的时候才不费力。
一、简单工作队列
image.png我想大概这种模式的应用场景也就剩下了应用层面的解耦了吧,话不多话,下面直接用代码展示
二、生产者代码:
public class Producer {
public static final String QUEUE_NAME = "work_queue";
public static void main(String[] args) throws IOException, TimeoutException{
final Connection conn = ConnUtils.getConn();
final Channel channel = conn.createChannel();
boolean durable = true;
boolean exclusive = false;
boolean autoDelete = false;
channel.queueDeclare(QUEUE_NAME, durable, exclusive, autoDelete, null);
channel.confirmSelect();
channel.addConfirmListener(new ConfirmListener() {
@Override
public void handleAck(long deliveryTag, boolean multiple) throws IOException {
// 这个目前在单独使用rabbitmq的时候没有办法找到自定义这个消息标识的办法,但是在和springboot整合之后会提供这样的方法
System.out.println(multiple);
System.out.println("wtf 需要这么热吗:::::"+deliveryTag);
}
@Override
public void handleNack(long deliveryTag, boolean multiple) throws IOException {
System.out.println("啊哈哈,你被拒绝了……");
}
});
// 这个地方也可以搞一个线程来进行发送
channel.basicPublish("",QUEUE_NAME,null,"fuck 真他妈的热 ".getBytes());
channel.basicPublish("",QUEUE_NAME,null,"fuck 真他妈的热 +1".getBytes());
channel.basicPublish("",QUEUE_NAME,null,"fuck 真他妈的热 +2".getBytes());
channel.basicPublish("",QUEUE_NAME,null,"fuck 真他妈的热 +3".getBytes());
channel.basicPublish("",QUEUE_NAME,null,"fuck 真他妈的热 +4".getBytes());
channel.basicPublish("",QUEUE_NAME,null,"fuck 真他妈的热 +5".getBytes());
channel.close();
conn.close();
}
}
三、两个消费者(只需要把代码拷贝一份就可以了)
public class Consumer01 {
public static final String QUEUE_NAME = "work_queue";
public static void main(String[] args) throws IOException, TimeoutException {
Connection conn = ConnUtils.getConn();
final Channel channel = conn.createChannel();
channel.queueDeclare(QUEUE_NAME,true,false,false,null);
Consumer consumer = new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
long deliveryTag = envelope.getDeliveryTag();
System.out.println("Recv001"+"message == "+new String(body,"utf-8"));
channel.basicAck(deliveryTag,false);
}
};
channel.basicConsume(QUEUE_NAME,false,consumer);
}
}
先启动两个消费者,因为消息太少,如果先启动生产者,在启动消费者,一个消费者立马就消费完了。
四、结果分析
image.pngimage.png
我们发现两个消费者总是已奇偶的形式出现的,加入两个消费者的消费能力不一样,消费者1消费能力比较高,但是以这种模式的话,那么整个系统的消费能力的上线就有比较弱的消费者2来决定了。所以下面介绍一种公平分发模式:公平指的是能者多劳
我们在channel申明的下面加一行代码:我们分别设置consumer1的消费能力为3,consumer2的消费者能力为1
/**
* prefetchCount:告诉MQ不要同时给一个消费者推送超过prefetchCount个消息,
* 即一点prefetchCount个消息没有应答,该消费者就会发生阻塞
* global:指的是该设置是针对该consumer还是针对channel级别
*/
channel.basicQos(3,false);
下面我们在观察结果:
image.png
image.png
我们可以看到奇偶的模式不见了,而且消费者1的吞吐量是大于消费者2的
本节到这里就结束了,有很多的介绍希望大家多去看看前面的文章。
网友评论