RabbitMQ

作者: 叫我小码哥 | 来源:发表于2020-03-18 21:01 被阅读0次

    RabbitMQ简介

    RabbitMQ是实现了高级消息队列协议(AMQP)的开源消息代理软件(亦称面向消息的中间件)。RabbitMQ服务器是用Erlang语言编写的,而群集和故障转移是构建在开放电信平台框架上的。

    什么是MQ

    MQ 我们可以理解为消息队列,具有先进先出的特点。

    RabbitMQ特点

    1.解耦,新模块的引入,使其代码改动量最小。
    2.削峰,设置流量缓存池,使得后端的服务按照自身的吞吐量进行消费。
    3.异步,将非关联引用的链路异步优化并提升系统的吞吐能力。

    RabbitMQ模式

    1.Simple 简单队列:

    单个提供者,单个消费者 。提供者将消息发送到队列中,消费者从队列中获取消息。
    缺点:耦合性高,生产者和消费者一一对应,队列名变更,生产者和消费者需要同时变更。
    如图1所示:


    图1

    java 代码
    RabbitMQ依赖

    <dependency>
          <groupId>com.rabbitmq</groupId>
          <artifactId>amqp-client</artifactId>
          <version>5.7.0</version>
        </dependency>
    

    java链接MQ的工具类

    public class ConnectionUtils {
    
        /**
         * 获取MQ链接
         * @return
         */
        public static Connection getConnection() throws IOException, TimeoutException {
            //创建一个工场
            ConnectionFactory factory = new ConnectionFactory();
            //服务地址
            factory.setHost("127.0.0.1");
            //服务端口
            factory.setPort(5672);
            //vhost
            factory.setVirtualHost("/branch_virtual");
            //用户名
            factory.setUsername("admin");
            //密码
            factory.setPassword("admin");
            return factory.newConnection();
        }
    }
    
    

    服务发送方

    public class Provider {
        private static  final String QUEUE_NAME = "mqName";
        public static void main(String[] args) throws IOException, TimeoutException {
            Connection connection =  ConnectionUtils.getConnection();
            //从链接获取一个通道
            Channel channel = connection.createChannel();
            //创建队列声明
            channel.queueDeclare(QUEUE_NAME,false,false,false,null);
    
            String msg= "hello resive";
            channel.basicPublish("",QUEUE_NAME,null,msg.getBytes());
            System.out.println("send succeed !");
            channel.close();
            connection.close();
        }
    }
    

    服务接收方

    public class Receive {
        private static  final String QUEUE_NAME = "mqName";
        public static void main(String[] args) throws IOException, TimeoutException {
            Connection connection =  ConnectionUtils.getConnection();
            Channel channel = connection.createChannel();
            //队列声明
            channel.queueDeclare(QUEUE_NAME,false,false,false,null);
    
            DefaultConsumer consumer = new DefaultConsumer(channel){
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope,
                                           AMQP.BasicProperties properties, byte[] body) throws IOException {
    
                    String msg = new String(body,"utf-8");
                    System.out.println("receive msh :"+msg);
                }
            };
            //监听队列
            channel.basicConsume(QUEUE_NAME,true,consumer);
        }
    }
    
    1.Work queues 工作队列

    一个生产者把消息生产到队列中,一个或者多个消费者进行消费。

    图2
    java代码(轮询分发)
    消费者往队列发送消息
    缺点:autoAck=true自动确认模式一旦rabbitmq将消息分发给消费者,就会从内存中删除。如果删除正在执行的消费者,就会出现消息丢失的现象。
    public class Provider {
        private static  final String QUEUE_NAME = "work_queue";
        public static void main(String[] args) throws Exception{
            Connection connection =  ConnectionUtils.getConnection();
            //从链接获取一个通道
            Channel channel = connection.createChannel();
            //创建队列声明
            channel.queueDeclare(QUEUE_NAME,false,false,false,null);
    
            for(int i=0;i<61;i++){
                String msg= "this is +"+i+"+ msg";
                System.out.println(msg);
                channel.basicPublish("",QUEUE_NAME,null,msg.getBytes());
                Thread.sleep(100);
            }
            channel.close();
            connection.close();
        }
    }
    

    消费者1

    public class Receive {
        private static  final String QUEUE_NAME = "work_queue";
        public static void main(String[] args) throws Exception{
            Connection connection =  ConnectionUtils.getConnection();
            Channel channel = connection.createChannel();
            channel.queueDeclare(QUEUE_NAME,false,false,false,null);
    
            DefaultConsumer consumer = new DefaultConsumer(channel){
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope,
                                           AMQP.BasicProperties properties, byte[] body) throws IOException {
    
                    String msg = new String(body,"utf-8");
                    System.out.println("comsumer 1 :"+msg);
                    try {
                        Thread.sleep(700);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
    
                }
            };
            boolean autoAck = true;
            channel.basicConsume(QUEUE_NAME,autoAck,consumer);
       }
    }
    

    消费者2

    public class ReceiveTwo {
        private static  final String QUEUE_NAME = "work_queue";
        public static void main(String[] args) throws Exception{
            Connection connection =  ConnectionUtils.getConnection();
            Channel channel = connection.createChannel();
            channel.queueDeclare(QUEUE_NAME,false,false,false,null);
    
            DefaultConsumer consumer = new DefaultConsumer(channel){
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope,
                                           AMQP.BasicProperties properties, byte[] body) throws IOException {
    
                    String msg = new String(body,"utf-8");
                    System.out.println("comsumer 2 :"+msg);
                    try {
                        Thread.sleep(200);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
    
                }
            };
            boolean autoAck = true;
            channel.basicConsume(QUEUE_NAME,autoAck,consumer);
       }
    }
    
    3.fairdipatch 公平分发

    一个生产者多个消费者模式下的情况,将消费者改为手动回执的形式。
    java代码(公平分发)
    首先在提供者者中设置basicQos=1,然后在消费者中设置basicAck方法和
    autoAck=false手动模式如果一个消费者挂了,就会交付给其他消费者,rabbitmq支持消息应答,消费者发送一个消息告诉rabbitmq已处理完成,这时rabbitmq会删除内存中的消息。
    消息应答默认是打开的 false。

    public class Provider {
        private static  final String QUEUE_NAME = "work_queue";
        public static void main(String[] args) throws Exception{
            Connection connection =  ConnectionUtils.getConnection();
            //从链接获取一个通道
            Channel channel = connection.createChannel();
            //创建队列声明
            channel.queueDeclare(QUEUE_NAME,false,false,false,null);
    
            /**
             * 每个消费者 发送确认之前,消息队列不发送下一个消息到消费者.
             * 限制发送给通哟个消费者不得超过一条数据。
             */
    
    
            int prefetchCount = 1;
            channel.basicQos(prefetchCount);
            for(int i=0;i<61;i++){
                String msg= "this is +"+i+"+ msg";
                System.out.println(msg);
                channel.basicPublish("",QUEUE_NAME,null,msg.getBytes());
                Thread.sleep(100);
            }
            channel.close();
            connection.close();
        }
    }
    
    public class Receive {
        private static  final String QUEUE_NAME = "work_queue";
        public static void main(String[] args) throws Exception{
            Connection connection =  ConnectionUtils.getConnection();
            final Channel channel = connection.createChannel();
            channel.queueDeclare(QUEUE_NAME,false,false,false,null);
    
            channel.basicQos(1);
    
            DefaultConsumer consumer = new DefaultConsumer(channel){
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope,
                                           AMQP.BasicProperties properties, byte[] body) throws IOException {
    
                    String msg = new String(body,"utf-8");
                    System.out.println("comsumer 1 :"+msg);
                    //手动回执
                    channel.basicAck(envelope.getDeliveryTag(),false);
                    try {
                        Thread.sleep(700);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
    
                }
            };
            boolean autoAck = false;
            channel.basicConsume(QUEUE_NAME,autoAck,consumer);
       }
    }
    
    public class ReceiveTwo {
        private static  final String QUEUE_NAME = "work_queue";
        public static void main(String[] args) throws Exception{
            Connection connection =  ConnectionUtils.getConnection();
            final Channel channel = connection.createChannel();
            channel.queueDeclare(QUEUE_NAME,false,false,false,null);
            channel.basicQos(1);
            DefaultConsumer consumer = new DefaultConsumer(channel){
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope,
                                           AMQP.BasicProperties properties, byte[] body) throws IOException {
    
                    String msg = new String(body,"utf-8");
                    System.out.println("comsumer 2 :"+msg);
                    channel.basicAck(envelope.getDeliveryTag(),false);
                    try {
                        Thread.sleep(200);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
    
                }
            };
            boolean autoAck = false;
            channel.basicConsume(QUEUE_NAME,autoAck,consumer);
       }
    }
    
    4.订阅模式

    1.一个生产者,多个消费者。
    2.每个消费者都有自己的队列。
    3.生产者都没有直接把消息发送队列,而是发送到交换价
    4.每个队列都要绑定到交换机上
    5生产者发送消息,进过交换机到达队列,就能实现一个消息被多个消费者消费。

    图3
    public class Provider {
        private static  final String EXCHANGE_NAME = "exchange_fanout";
        public static void main(String[] args) throws Exception{
            Connection connection =  ConnectionUtils.getConnection();
            //从链接获取一个通道
            Channel channel = connection.createChannel();
            /**
             * 声明交换机
             * fanout 分发
             */
            channel.exchangeDeclare(EXCHANGE_NAME,"fanout");
            String msg = "订阅模式";
            channel.basicPublish(EXCHANGE_NAME,"",null,msg.getBytes());
            System.out.println("消费者已发送消息,发送内容: "+msg);
            channel.close();
            connection.close();
        }
    }
    
    public class ReceiveTwo {
        private static  final String EXCHANGE_NAME = "exchange_fanout_send_sms";
        private static  final String FACTOR_NAME = "exchange_fanout";
        public static void main(String[] args) throws Exception{
            Connection connection =  ConnectionUtils.getConnection();
            final Channel channel = connection.createChannel();
            //队列声明
            channel.queueDeclare(EXCHANGE_NAME,false,false,false,null);
            //绑定队列到交换机
            channel.queueBind(EXCHANGE_NAME,FACTOR_NAME,"");
    
            channel.basicQos(1);
    
            DefaultConsumer consumer = new DefaultConsumer(channel){
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope,
                                           AMQP.BasicProperties properties, byte[] body) throws IOException {
    
                    String msg = new String(body,"utf-8");
                    System.out.println("comsumer 2 :"+msg);
                    channel.basicAck(envelope.getDeliveryTag(),false);
                    try {
                        Thread.sleep(700);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
    
                }
            };
            boolean autoAck = false;
            channel.basicConsume(EXCHANGE_NAME,autoAck,consumer);
    
        }
    }
    
    public class Receive {
        private static  final String EXCHANGE_NAME = "exchange_fanout_send_email";
        private static  final String FACTOR_NAME = "exchange_fanout";
        public static void main(String[] args) throws Exception{
            Connection connection =  ConnectionUtils.getConnection();
            final Channel channel = connection.createChannel();
            //队列声明
            channel.queueDeclare(EXCHANGE_NAME,false,false,false,null);
            //绑定队列到交换机
            channel.queueBind(EXCHANGE_NAME,FACTOR_NAME,"");
    
            channel.basicQos(1);
    
            DefaultConsumer consumer = new DefaultConsumer(channel){
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope,
                                           AMQP.BasicProperties properties, byte[] body) throws IOException {
    
                    String msg = new String(body,"utf-8");
                    System.out.println("comsumer 1 :"+msg);
                    channel.basicAck(envelope.getDeliveryTag(),false);
                    try {
                        Thread.sleep(700);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
    
                }
            };
            boolean autoAck = false;
            channel.basicConsume(EXCHANGE_NAME,autoAck,consumer);
    
       }
    }
    
    5.路由模式
    图4
    public class Provider {
        private static  final String EXCHANGE_NAME = "echange_direct";
        public static void main(String[] args) throws Exception{
            Connection connection =  ConnectionUtils.getConnection();
            //从链接获取一个通道
            Channel channel = connection.createChannel();
    
            channel.exchangeDeclare(EXCHANGE_NAME,"direct");
            //创建队列声明
    
           String msg = "hello diret";
           String routingKey="error";
           channel.basicPublish(EXCHANGE_NAME,routingKey,null,msg.getBytes());
           System.out.println("direct send :"+msg);
            channel.close();
            connection.close();
        }
    }
    
    public class Receive {
        private static  final String EXCHANGE_NAME = "echange_direct";
        private static  final String QUEUE_NAME = "direct_queue";
        public static void main(String[] args) throws Exception{
            Connection connection =  ConnectionUtils.getConnection();
            Channel channel = connection.createChannel();
    
    
    
            channel.queueDeclare(QUEUE_NAME,false,false,false,null);
            channel.basicQos(1);
    
            channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"error");
            DefaultConsumer consumer = new DefaultConsumer(channel){
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope,
                                           AMQP.BasicProperties properties, byte[] body) throws IOException {
    
                    String msg = new String(body,"utf-8");
                    System.out.println("comsumer 1 :"+msg);
                     channel.basicAck(envelope.getDeliveryTag(),false);
                    try {
                        Thread.sleep(700);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
    
                }
            };
            boolean autoAck = true;
            channel.basicConsume(QUEUE_NAME,autoAck,consumer);
       }
    }
    
    public class ReceiveTwo {
        private static  final String EXCHANGE_NAME = "echange_direct";
        private static  final String QUEUE_NAME = "direct_queue";
        public static void main(String[] args) throws Exception{
            Connection connection =  ConnectionUtils.getConnection();
            Channel channel = connection.createChannel();
    
    
    
            channel.queueDeclare(QUEUE_NAME,false,false,false,null);
            channel.basicQos(1);
    
            channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"error");
            channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"info");
            DefaultConsumer consumer = new DefaultConsumer(channel){
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope,
                                           AMQP.BasicProperties properties, byte[] body) throws IOException {
    
                    String msg = new String(body,"utf-8");
                   channel.basicAck(envelope.getDeliveryTag(),false);
                    System.out.println("comsumer 2 :"+msg);
                    try {
                        Thread.sleep(700);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
    
                }
            };
            boolean autoAck = true;
            channel.basicConsume(QUEUE_NAME,autoAck,consumer);
        }
    }
    
    6.top exchange

    将路由和某个模式匹配。


    图5
    public class Provider {
        private static  final String EXCHANGE_NAME = "exchange_topic";
        public static void main(String[] args) throws Exception{
            Connection connection =  ConnectionUtils.getConnection();
            //从链接获取一个通道
            Channel channel = connection.createChannel();
            //创建队列声明
    
            channel.exchangeDeclare(EXCHANGE_NAME,"topic");
            String msg = "this is msg...";
            channel.basicPublish(EXCHANGE_NAME,"data.select",null,msg.getBytes());
    
            System.out.println("msg is send......");
            channel.close();
            connection.close();
        }
    }
    
    public class Receive {
        private static  final String EXCHANGE_NAME = "exchange_topic";
        private static  final String QUEUE_NAME = "queue_topic";
        public static void main(String[] args) throws Exception{
            Connection connection =  ConnectionUtils.getConnection();
            final Channel channel = connection.createChannel();
            channel.queueDeclare(QUEUE_NAME,false,false,false,null);
            channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"data.insert");
            channel.basicQos(1);
            DefaultConsumer consumer = new DefaultConsumer(channel){
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope,
                                           AMQP.BasicProperties properties, byte[] body) throws IOException {
    
                    String msg = new String(body,"utf-8");
                    System.out.println("comsumer 1 :"+msg);
                    channel.basicAck(envelope.getDeliveryTag(),false);
                    try {
                        Thread.sleep(700);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
    
                }
            };
            boolean autoAck = true;
            channel.basicConsume(QUEUE_NAME,autoAck,consumer);
       }
    }
    
    public class ReceiveTwo {
        private static  final String EXCHANGE_NAME = "exchange_topic";
        private static  final String QUEUE_NAME = "queue_topic_two";
        public static void main(String[] args) throws Exception{
            Connection connection =  ConnectionUtils.getConnection();
            final Channel channel = connection.createChannel();
            channel.queueDeclare(QUEUE_NAME,false,false,false,null);
            channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"data.#");
            channel.basicQos(1);
            DefaultConsumer consumer = new DefaultConsumer(channel){
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope,
                                           AMQP.BasicProperties properties, byte[] body) throws IOException {
    
                    String msg = new String(body,"utf-8");
                    System.out.println("comsumer 2 :"+msg);
                    channel.basicAck(envelope.getDeliveryTag(),false);
                    try {
                        Thread.sleep(700);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
    
                }
            };
            boolean autoAck = true;
            channel.basicConsume(QUEUE_NAME,autoAck,consumer);
        }
    }
    

    这就是RabbitMQ常见的5种模式,及其代码演示。

    RabbitMQ消息确认机制(事务+confirm)

    RabbitMQ为我们提供了两种方式:
    方式一:通过AMQP事务机制实现,这也是从AMQP协议层面提供的解决方案;
    方式二:通过将channel设置成confirm模式来实现;

    AMQP模式:

    该模式吞吐量较低。使用的是同步的模式。主要是用channel.txSelect()开启事务,使用channel.txRollback();回滚事务。代码如下所示:

    /**
     * 使用事务的模式来确定消息有没有到达Rabbit服务器
     */
    public class Provider {
        private static  final String QUEUE_NAME = "queue_tx";
        public static void main(String[] args) throws Exception {
            Connection connection =  ConnectionUtils.getConnection();
            //从链接获取一个通道
            Channel channel = connection.createChannel();
            channel.queueDeclare(QUEUE_NAME,false,false,false,null);
            String msg = "tx  msg";
            try {
                //开启事务
                channel.txSelect();
                channel.basicPublish("",QUEUE_NAME,null,msg.getBytes());
                System.out.println("this is msg send");
                channel.txCommit();
            }catch (Exception e){
                //事务回滚
                channel.txRollback();
                System.out.println("send msg txRollBack");
            }
            channel.close();
            connection.close();
    
        }
    }
    
    public class Receive {
        private static  final String QUEUE_NAME = "queue_tx";
        public static void main(String[] args) throws Exception{
            Connection connection =  ConnectionUtils.getConnection();
            Channel channel = connection.createChannel();
            channel.queueDeclare(QUEUE_NAME,false,false,false,null);
            channel.basicConsume(QUEUE_NAME,true,new DefaultConsumer(channel){
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope,
                                           AMQP.BasicProperties properties, byte[] body) throws IOException {
    
                    String msg = new String(body,"utf-8");
                    System.out.println("comsumer  :"+msg);
                }
            });
    
        }
    }
    
    Confirm模式:

    生产者将信道设置成confirm模式,一旦信道进入confirm模式,所有在该信道上面发布的消息都会被指派一个唯一的ID(从1开始),一旦消息被投递到所有匹配的队列之后,broker就会发送一个确认给生产者(包含消息的唯一ID),这就使得生产者知道消息已经正确到达目的队列了,如果消息和队列是可持久化的,那么确认消息会将消息写入磁盘之后发出,broker回传给生产者的确认消息中deliver-tag域包含了确认消息的序列号,此外broker也可以设置basic.ack的multiple域,表示到这个序列号之前的所有消息都已经得到了处理。
    Confirm使用的是异步的模式。使用channel,confirmSelect()开启事务。
    模式主要分为三种
    1.普通发送:waitForConfirms()
    2.批量发 waitForConfirms()
    3.异步,提送回调。
    普通发送代码如下所示:

    /**
     * 普通模式
     */
    public class Provider {
        private static  final String QUEUE_NAME = "queue_confirm_pt";
        public static void main(String[] args) throws Exception {
            Connection connection =  ConnectionUtils.getConnection();
            Channel channel = connection.createChannel();
            channel.queueDeclare(QUEUE_NAME,false,false,false,null);
    
            //将channel设置为普通模式
            channel.confirmSelect();
            String msg = "普通模式";
            channel.basicPublish("",QUEUE_NAME,null,msg.getBytes());
            if(!channel.waitForConfirms()){
                System.out.println("send failed");
            }else{
                System.out.println("send succeed");
            }
            channel.close();
            connection.close();
    
        }
    }
    
    public class Receive {
        private static  final String QUEUE_NAME = "queue_confirm_pt";
        public static void main(String[] args) throws Exception{
            Connection connection =  ConnectionUtils.getConnection();
            Channel channel = connection.createChannel();
            channel.queueDeclare(QUEUE_NAME,false,false,false,null);
            channel.basicConsume(QUEUE_NAME,true,new DefaultConsumer(channel){
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope,
                                           AMQP.BasicProperties properties, byte[] body) throws IOException {
    
                    String msg = new String(body,"utf-8");
                    System.out.println("comsumer  :"+msg);
                }
            });
    
        }
    }
    

    批量发送相对于普通模式的优点是效率高,缺点是一条失败全部失败。 代码如下所示:

    public class Provider2 {
        private static  final String QUEUE_NAME = "queue_confirm_pt_2";
        public static void main(String[] args) throws Exception {
            Connection connection =  ConnectionUtils.getConnection();
            Channel channel = connection.createChannel();
            channel.queueDeclare(QUEUE_NAME,false,false,false,null);
    
            //将channel设置为普通模式
            channel.confirmSelect();
            String msg = "普通模式";
            //批量发送
            for(int i=0;i<10;i++){
                String data = msg+" "+i;
                channel.basicPublish("",QUEUE_NAME,null,data.getBytes());
            }
    
    
            if(!channel.waitForConfirms()){
                System.out.println("send failed");
            }else{
                System.out.println("send succeed");
            }
            channel.close();
            connection.close();
    
        }
    }
    
    public class Receive {
        private static  final String QUEUE_NAME = "queue_confirm_pt_2";
        public static void main(String[] args) throws Exception{
            Connection connection =  ConnectionUtils.getConnection();
            Channel channel = connection.createChannel();
            channel.queueDeclare(QUEUE_NAME,false,false,false,null);
            channel.basicConsume(QUEUE_NAME,true,new DefaultConsumer(channel){
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope,
                                           AMQP.BasicProperties properties, byte[] body) throws IOException {
    
                    String msg = new String(body,"utf-8");
                    System.out.println("comsumer  :"+msg);
                }
            });
    
        }
    }
    

    异步监听模式,效率高,单个的失败不会影响其它数据的发送。 代码如下所示:

    /**
     * 异步回调模式
     */
    public class Provider {
        private static  final String QUEUE_NAME = "queue_confirm_pt_3";
        public static void main(String[] args) throws Exception {
            Connection connection =  ConnectionUtils.getConnection();
            Channel channel = connection.createChannel();
            channel.queueDeclare(QUEUE_NAME,false,false,false,null);
    
            //将channel设置为普通模式
            channel.confirmSelect();
    
            final SortedSet<Long> confirmSet = Collections.synchronizedSortedSet(new TreeSet<Long>());
    
            channel.addConfirmListener(new ConfirmListener() {
                @Override
                public void handleAck(long l, boolean b) throws IOException {
                    if(b){
                        System.out.println("handle ACK succeed");
                        confirmSet.headSet(l+1).clear();
                    }else{
                        System.out.println("handle ACK false");
                        confirmSet.remove(l);
                    }
                }
    
                @Override
                public void handleNack(long l, boolean b) throws IOException {
                    if(b){
                        System.out.println("handle ACK succeed");
                        confirmSet.headSet(l+1).clear();
                    }else{
                        System.out.println("handle ACK false");
                        confirmSet.remove(l);
                    }
                }
            });
            String msg = "回调模式";
            while(true){
                long data = channel.getNextPublishSeqNo();
                channel.basicPublish("",QUEUE_NAME,null,msg.getBytes());
                confirmSet.add(data);
            }
    
        }
    }
    
    public class Receive {
        private static  final String QUEUE_NAME = "queue_confirm_pt_3";
        public static void main(String[] args) throws Exception{
            Connection connection =  ConnectionUtils.getConnection();
            Channel channel = connection.createChannel();
            channel.queueDeclare(QUEUE_NAME,false,false,false,null);
            channel.basicConsume(QUEUE_NAME,true,new DefaultConsumer(channel){
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope,
                                           AMQP.BasicProperties properties, byte[] body) throws IOException {
    
                    String msg = new String(body,"utf-8");
                    System.out.println("comsumer  :"+msg);
                }
            });
    
        }
    }
    

    相关文章

      网友评论

          本文标题:RabbitMQ

          本文链接:https://www.haomeiwen.com/subject/csxcyhtx.html