我们发现轮询分发会有一个比较大的缺点,就是没有充分的利用资源,即使一个消费者消费完毕也只能等待其他消费者不能继续消费。
一定要将消息应答改为手动应答!
生产者1
package com.demo.workFair;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
import com.demo.util.RabbitMQ;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
public class RabbitMQSendMq1 {
//2、工作队列之公平分发
/**
* 生产者1
* @throws IOException
* @throws TimeoutException
* @throws InterruptedException
*/
public void sendMq1() throws IOException, TimeoutException, InterruptedException {
//创建连接
Connection connection=RabbitMQ.getConnection();
//得到通道
Channel channel =connection.createChannel();
//得到队列
channel.queueDeclare("queue3", false, false, false, null);
//每次最多只发送一个消息
channel.basicQos(1);
for (int i = 0; i < 20; i++) {
String msg="hello world"+i;
channel.basicPublish("", "queue3", null, msg.getBytes());
System.out.println(msg);
Thread.sleep(i*10);
}
channel.close();
connection.close();
}
public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
RabbitMQSendMq1 c=new RabbitMQSendMq1();
c.sendMq1();
}
}
消费者1
package com.demo.workFair;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
import com.demo.util.RabbitMQ;
import com.rabbitmq.client.AMQP.BasicProperties;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
public class RabbitMQGetMq1 {
/**
* 消费者1
* @throws IOException
* @throws TimeoutException
*/
public void getMq1() throws IOException, TimeoutException {
//创建连接
Connection connection=RabbitMQ.getConnection();
//得到通道
Channel channel =connection.createChannel();
//得到队列
channel.queueDeclare("queue3", false, false, false, null);
//每次最多只发送一个消息
channel.basicQos(1);
DefaultConsumer consumer=new DefaultConsumer(channel) {
public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body) throws IOException {
String msg=new String(body);
try {
Thread.sleep(2000);
System.out.println("消费者1"+msg);
} catch (InterruptedException e) {
e.printStackTrace();
}finally {
//手动消息回执
channel.basicAck(envelope.getDeliveryTag(), false);
}
};
};
//自动应答改成false
channel.basicConsume("queue3", false, consumer);
}
public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
RabbitMQGetMq1 c=new RabbitMQGetMq1();
c.getMq1();
}
}
消费者2
package com.demo.workFair;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
import com.demo.util.RabbitMQ;
import com.rabbitmq.client.AMQP.BasicProperties;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
public class RabbitMQGetMq2 {
//2、工作队列之轮询分发
/**
* 消费者2
* @throws IOException
* @throws TimeoutException
*/
public void getMq2() throws IOException, TimeoutException {
//创建连接
Connection connection=RabbitMQ.getConnection();
//得到通道
Channel channel =connection.createChannel();
//得到队列
channel.queueDeclare("queue3", false, false, false, null);
//每次最多只发送一个消息
channel.basicQos(1);
DefaultConsumer consumer=new DefaultConsumer(channel) {
public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body) throws IOException {
String msg=new String(body);
try {
System.out.println("消费者2"+msg);
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}finally {
//手动消息回执
channel.basicAck(envelope.getDeliveryTag(), false);
}
};
};
//自动应答改成false
channel.basicConsume("queue3", false, consumer);
}
public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
RabbitMQGetMq2 c=new RabbitMQGetMq2();
c.getMq2();
}
}
启动后会明显发现消费者2处理了更多的消息。
网友评论