美文网首页
工作队列之轮询分发

工作队列之轮询分发

作者: 寂静的春天1988 | 来源:发表于2019-04-07 14:10 被阅读0次

    直接上代码
    生产者1

    package com.demo.controller;
    
    import java.io.IOException;
    import java.util.concurrent.TimeoutException;
    
    import com.demo.util.RabbitMQ;
    import com.rabbitmq.client.AMQP.BasicProperties;
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.DefaultConsumer;
    import com.rabbitmq.client.Envelope;
    
    public class RabbitMQSendMq1 {
        // 2. Work queue with round-robin dispatch

        /** Name of the queue the demo messages are published to. */
        private static final String QUEUE_NAME = "queue1";

        /** Number of messages published by one call to {@link #sendMq1()}. */
        private static final int MESSAGE_COUNT = 20;

        /**
         * Producer 1: publishes {@code "hello world0"} .. {@code "hello world19"}
         * to {@code queue1} through the default exchange, pausing {@code i * 10}
         * milliseconds after the i-th message.
         *
         * <p>The channel and the connection are closed (channel first) even when
         * declaring, publishing or sleeping fails.
         *
         * @throws IOException          declared by the RabbitMQ client calls used here
         * @throws TimeoutException     declared by the RabbitMQ client calls used here
         * @throws InterruptedException if the thread is interrupted while pausing between messages
         */
        public void sendMq1() throws IOException, TimeoutException, InterruptedException {
            // try-with-resources closes the channel, then the connection, on every exit path.
            try (Connection connection = RabbitMQ.getConnection();
                    Channel channel = connection.createChannel()) {
                // Arguments: durable=false, exclusive=false, autoDelete=false, no extra arguments.
                channel.queueDeclare(QUEUE_NAME, false, false, false, null);

                for (int i = 0; i < MESSAGE_COUNT; i++) {
                    String msg = "hello world" + i;
                    // Explicit charset so the payload bytes do not depend on the platform default.
                    byte[] payload = msg.getBytes(java.nio.charset.StandardCharsets.UTF_8);
                    // Empty exchange name = default exchange; the routing key is the queue name.
                    channel.basicPublish("", QUEUE_NAME, null, payload);
                    Thread.sleep(i * 10);
                }
            }
        }

        /**
         * Runs the producer once.
         *
         * @param args ignored
         */
        public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
            RabbitMQSendMq1 producer = new RabbitMQSendMq1();
            producer.sendMq1();
        }
    }
    
    

    消费者1

    package com.demo.controller;
    
    import java.io.IOException;
    import java.util.concurrent.TimeoutException;
    
    import com.demo.util.RabbitMQ;
    import com.rabbitmq.client.AMQP.BasicProperties;
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.DefaultConsumer;
    import com.rabbitmq.client.Envelope;
    
    public class RabbitMQGetMq1 {
        /**
         *    消费者1
         * @throws IOException
         * @throws TimeoutException
         */
        public void getMq1() throws IOException, TimeoutException {
            //创建连接
            Connection  connection=RabbitMQ.getConnection();
            //得到通道
            Channel channel =connection.createChannel();
            //得到队列
            channel.queueDeclare("queue1", false, false, false, null);
            DefaultConsumer consumer=new DefaultConsumer(channel) {
                public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body) throws IOException {
                    String msg=new String(body);
                    try {
                        Thread.sleep(2000);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    System.out.println("1==="+msg);
                };
            };
            channel.basicConsume("queue1", true, consumer);
        }
        public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
            RabbitMQGetMq1 c=new RabbitMQGetMq1();
            c.getMq1();
        }
    }
    
    package com.demo.controller;
    
    import java.io.IOException;
    import java.util.concurrent.TimeoutException;
    
    import com.demo.util.RabbitMQ;
    import com.rabbitmq.client.AMQP.BasicProperties;
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.DefaultConsumer;
    import com.rabbitmq.client.Envelope;
    
    public class RabbitMQGetMq2 {
        //2、工作队列之轮询分发
        /**
         *    消费者2
         * @throws IOException
         * @throws TimeoutException
         */
        public void getMq2() throws IOException, TimeoutException {
            //创建连接
            Connection  connection=RabbitMQ.getConnection();
            //得到通道
            Channel channel =connection.createChannel();
            //得到队列
            channel.queueDeclare("queue1", false, false, false, null);
            DefaultConsumer consumer=new DefaultConsumer(channel) {
                public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body) throws IOException {
                    String msg=new String(body);
                    try {
                        Thread.sleep(1000);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    System.out.println("2==="+msg);
                };
            };
            channel.basicConsume("queue1", true, consumer);
        }
        public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
            RabbitMQGetMq2 c=new RabbitMQGetMq2();
            c.getMq2();
        }
    }
    

    运行上述代码可以发现:即使消费者2处理消息的速度更快,它也没有消费更多的消息,两个消费者仍然是以“你一个、我一个”的方式轮流收到消息。
    这是因为消费者使用了自动确认(autoAck=true)且没有设置预取数量(basicQos),RabbitMQ 默认按轮询方式把消息平均分发给各个消费者,而不考虑各自的处理速度。

    相关文章

      网友评论

          本文标题:工作队列之轮询分发

          本文链接:https://www.haomeiwen.com/subject/abcbvqtx.html