美文网首页
工作队列之公平分发

工作队列之公平分发

作者: 寂静的春天1988 | 来源:发表于2019-04-07 15:15 被阅读0次

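我们发现轮询分发有一个比较大的缺点:没有充分利用资源。即使某个消费者已经处理完自己分到的消息,也只能空闲等待,不能去分担其他消费者尚未处理的消息。
公平分发的做法是:消费者通过 basicQos(1) 告诉 RabbitMQ,在上一条消息被确认之前不要再给它投递新消息。因此一定要将消息应答改为手动应答!(自动应答模式下消息一投递即视为已确认,basicQos 的限制不起作用。)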

生产者1

package com.demo.workFair;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

import com.demo.util.RabbitMQ;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;

/**
 * Producer for the work-queue "fair dispatch" demo.
 *
 * <p>Publishes a fixed batch of text messages to {@code queue3} through the
 * default exchange so that competing consumers can pick them up.
 */
public class RabbitMQSendMq1 {
    /** Name of the queue the messages are published to. */
    private static final String QUEUE_NAME = "queue3";
    /** Number of messages published per run. */
    private static final int MESSAGE_COUNT = 20;
    /** Base delay in milliseconds; message {@code i} is followed by a pause of {@code i * 10} ms. */
    private static final long DELAY_STEP_MILLIS = 10L;

    /**
     * Publishes {@code "hello world0"} .. {@code "hello world19"} to the queue,
     * echoing each message to standard output and pausing a little longer
     * after every message.
     *
     * <p>The channel and the connection are always closed before returning,
     * even when publishing fails or the thread is interrupted.
     *
     * @throws IOException          if the broker cannot be reached or a channel operation fails
     * @throws TimeoutException     if opening or closing the connection/channel times out
     * @throws InterruptedException if the thread is interrupted while pausing between messages
     */
    public void sendMq1() throws IOException, TimeoutException, InterruptedException {
        Connection connection = RabbitMQ.getConnection();
        try {
            Channel channel = connection.createChannel();
            try {
                // Arguments after the name: durable=false, exclusive=false, autoDelete=false, no extra args.
                channel.queueDeclare(QUEUE_NAME, false, false, false, null);
                // No basicQos here: the prefetch limit only restricts deliveries to
                // consumers, so setting it on a publish-only channel has no effect.
                for (int i = 0; i < MESSAGE_COUNT; i++) {
                    String msg = "hello world" + i;
                    // Explicit charset so the payload does not depend on the JVM default encoding.
                    byte[] payload = msg.getBytes(java.nio.charset.StandardCharsets.UTF_8);
                    // Empty exchange name = default exchange, which routes by queue name.
                    channel.basicPublish("", QUEUE_NAME, null, payload);
                    System.out.println(msg);
                    Thread.sleep(i * DELAY_STEP_MILLIS);
                }
            } finally {
                // Skip close() on a channel the broker already shut down so the
                // original failure is not masked by a secondary exception.
                if (channel.isOpen()) {
                    channel.close();
                }
            }
        } finally {
            if (connection.isOpen()) {
                connection.close();
            }
        }
    }

    /**
     * Runs the producer once.
     *
     * @param args ignored
     * @throws IOException          see {@link #sendMq1()}
     * @throws TimeoutException     see {@link #sendMq1()}
     * @throws InterruptedException see {@link #sendMq1()}
     */
    public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
        RabbitMQSendMq1 producer = new RabbitMQSendMq1();
        producer.sendMq1();
    }
}

消费者1

package com.demo.workFair;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

import com.demo.util.RabbitMQ;
import com.rabbitmq.client.AMQP.BasicProperties;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;

public class RabbitMQGetMq1 {
    /**
     *    消费者1
     * @throws IOException
     * @throws TimeoutException
     */
    public void getMq1() throws IOException, TimeoutException {
        //创建连接
        Connection  connection=RabbitMQ.getConnection();
        //得到通道
        Channel channel =connection.createChannel();
        //得到队列
        channel.queueDeclare("queue3", false, false, false, null);
        //每次最多只发送一个消息
        channel.basicQos(1);
        DefaultConsumer consumer=new DefaultConsumer(channel) {
            public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body) throws IOException {
                String msg=new String(body);
                try {
                    Thread.sleep(2000);
                    System.out.println("消费者1"+msg);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }finally {
                    //手动消息回执
                    channel.basicAck(envelope.getDeliveryTag(), false);
                }
                
            };
        };
        //自动应答改成false
        channel.basicConsume("queue3", false, consumer);
    }
    public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
        RabbitMQGetMq1 c=new RabbitMQGetMq1();
        c.getMq1();
    }
}

消费者2

package com.demo.workFair;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

import com.demo.util.RabbitMQ;
import com.rabbitmq.client.AMQP.BasicProperties;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;

public class RabbitMQGetMq2 {
    //2、工作队列之轮询分发
    /**
     *    消费者2
     * @throws IOException
     * @throws TimeoutException
     */
    public void getMq2() throws IOException, TimeoutException {
        //创建连接
        Connection  connection=RabbitMQ.getConnection();
        //得到通道
        Channel channel =connection.createChannel();
        //得到队列
        channel.queueDeclare("queue3", false, false, false, null);
        //每次最多只发送一个消息
        channel.basicQos(1);
        DefaultConsumer consumer=new DefaultConsumer(channel) {
            public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body) throws IOException {
                String msg=new String(body);
                try {
                    System.out.println("消费者2"+msg);
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }finally {
                    //手动消息回执
                    channel.basicAck(envelope.getDeliveryTag(), false);
                }
            };
        };
        //自动应答改成false
        channel.basicConsume("queue3", false, consumer);
    }
    public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
        RabbitMQGetMq2 c=new RabbitMQGetMq2();
        c.getMq2();
    }
}

启动后会明显发现消费者2处理了更多的消息:消费者2处理每条消息只需1秒,而消费者1需要2秒,处理得快的消费者会更早确认并拿到下一条消息。

相关文章

网友评论

      本文标题:工作队列之公平分发

      本文链接:https://www.haomeiwen.com/subject/uvrsiqtx.html