RabbitMQ

作者: 叫我小码哥 | 来源:发表于2020-03-18 21:01 被阅读0次

RabbitMQ简介

RabbitMQ是实现了高级消息队列协议(AMQP)的开源消息代理软件(亦称面向消息的中间件)。RabbitMQ服务器是用Erlang语言编写的,而群集和故障转移是构建在开放电信平台框架上的。

什么是MQ

MQ 我们可以理解为消息队列,具有先进先出的特点。

RabbitMQ特点

1.解耦,新模块的引入,使其代码改动量最小。
2.削峰,设置流量缓存池,使得后端的服务按照自身的吞吐量进行消费。
3.异步,将非关联引用的链路异步优化并提升系统的吞吐能力。

RabbitMQ模式

1.Simple 简单队列:

单个提供者,单个消费者 。提供者将消息发送到队列中,消费者从队列中获取消息。
缺点:耦合性高,生产者和消费者一一对应,队列名变更,生产者和消费者需要同时变更。
如图1所示:


图1

java 代码
RabbitMQ依赖

<dependency>
      <groupId>com.rabbitmq</groupId>
      <artifactId>amqp-client</artifactId>
      <version>5.7.0</version>
    </dependency>

java链接MQ的工具类

public class ConnectionUtils {

    /**
     * 获取MQ链接
     * @return
     */
    public static Connection getConnection() throws IOException, TimeoutException {
        //创建一个工场
        ConnectionFactory factory = new ConnectionFactory();
        //服务地址
        factory.setHost("127.0.0.1");
        //服务端口
        factory.setPort(5672);
        //vhost
        factory.setVirtualHost("/branch_virtual");
        //用户名
        factory.setUsername("admin");
        //密码
        factory.setPassword("admin");
        return factory.newConnection();
    }
}

服务发送方

public class Provider {
    private static  final String QUEUE_NAME = "mqName";
    public static void main(String[] args) throws IOException, TimeoutException {
        Connection connection =  ConnectionUtils.getConnection();
        //从链接获取一个通道
        Channel channel = connection.createChannel();
        //创建队列声明
        channel.queueDeclare(QUEUE_NAME,false,false,false,null);

        String msg= "hello resive";
        channel.basicPublish("",QUEUE_NAME,null,msg.getBytes());
        System.out.println("send succeed !");
        channel.close();
        connection.close();
    }
}

服务接收方

public class Receive {
    private static  final String QUEUE_NAME = "mqName";
    public static void main(String[] args) throws IOException, TimeoutException {
        Connection connection =  ConnectionUtils.getConnection();
        Channel channel = connection.createChannel();
        //队列声明
        channel.queueDeclare(QUEUE_NAME,false,false,false,null);

        DefaultConsumer consumer = new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope,
                                       AMQP.BasicProperties properties, byte[] body) throws IOException {

                String msg = new String(body,"utf-8");
                System.out.println("receive msh :"+msg);
            }
        };
        //监听队列
        channel.basicConsume(QUEUE_NAME,true,consumer);
    }
}
1.Work queues 工作队列

一个生产者把消息生产到队列中,一个或者多个消费者进行消费。

图2
java代码(轮询分发)
消费者往队列发送消息
缺点:autoAck=true自动确认模式一旦rabbitmq将消息分发给消费者,就会从内存中删除。如果删除正在执行的消费者,就会出现消息丢失的现象。
public class Provider {
    private static  final String QUEUE_NAME = "work_queue";
    public static void main(String[] args) throws Exception{
        Connection connection =  ConnectionUtils.getConnection();
        //从链接获取一个通道
        Channel channel = connection.createChannel();
        //创建队列声明
        channel.queueDeclare(QUEUE_NAME,false,false,false,null);

        for(int i=0;i<61;i++){
            String msg= "this is +"+i+"+ msg";
            System.out.println(msg);
            channel.basicPublish("",QUEUE_NAME,null,msg.getBytes());
            Thread.sleep(100);
        }
        channel.close();
        connection.close();
    }
}

消费者1

public class Receive {
    private static  final String QUEUE_NAME = "work_queue";
    public static void main(String[] args) throws Exception{
        Connection connection =  ConnectionUtils.getConnection();
        Channel channel = connection.createChannel();
        channel.queueDeclare(QUEUE_NAME,false,false,false,null);

        DefaultConsumer consumer = new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope,
                                       AMQP.BasicProperties properties, byte[] body) throws IOException {

                String msg = new String(body,"utf-8");
                System.out.println("comsumer 1 :"+msg);
                try {
                    Thread.sleep(700);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }

            }
        };
        boolean autoAck = true;
        channel.basicConsume(QUEUE_NAME,autoAck,consumer);
   }
}

消费者2

public class ReceiveTwo {
    private static  final String QUEUE_NAME = "work_queue";
    public static void main(String[] args) throws Exception{
        Connection connection =  ConnectionUtils.getConnection();
        Channel channel = connection.createChannel();
        channel.queueDeclare(QUEUE_NAME,false,false,false,null);

        DefaultConsumer consumer = new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope,
                                       AMQP.BasicProperties properties, byte[] body) throws IOException {

                String msg = new String(body,"utf-8");
                System.out.println("comsumer 2 :"+msg);
                try {
                    Thread.sleep(200);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }

            }
        };
        boolean autoAck = true;
        channel.basicConsume(QUEUE_NAME,autoAck,consumer);
   }
}
3.fairdipatch 公平分发

一个生产者多个消费者模式下的情况,将消费者改为手动回执的形式。
java代码(公平分发)
首先在提供者者中设置basicQos=1,然后在消费者中设置basicAck方法和
autoAck=false手动模式如果一个消费者挂了,就会交付给其他消费者,rabbitmq支持消息应答,消费者发送一个消息告诉rabbitmq已处理完成,这时rabbitmq会删除内存中的消息。
消息应答默认是打开的 false。

public class Provider {
    private static  final String QUEUE_NAME = "work_queue";
    public static void main(String[] args) throws Exception{
        Connection connection =  ConnectionUtils.getConnection();
        //从链接获取一个通道
        Channel channel = connection.createChannel();
        //创建队列声明
        channel.queueDeclare(QUEUE_NAME,false,false,false,null);

        /**
         * 每个消费者 发送确认之前,消息队列不发送下一个消息到消费者.
         * 限制发送给通哟个消费者不得超过一条数据。
         */


        int prefetchCount = 1;
        channel.basicQos(prefetchCount);
        for(int i=0;i<61;i++){
            String msg= "this is +"+i+"+ msg";
            System.out.println(msg);
            channel.basicPublish("",QUEUE_NAME,null,msg.getBytes());
            Thread.sleep(100);
        }
        channel.close();
        connection.close();
    }
}
public class Receive {
    private static  final String QUEUE_NAME = "work_queue";
    public static void main(String[] args) throws Exception{
        Connection connection =  ConnectionUtils.getConnection();
        final Channel channel = connection.createChannel();
        channel.queueDeclare(QUEUE_NAME,false,false,false,null);

        channel.basicQos(1);

        DefaultConsumer consumer = new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope,
                                       AMQP.BasicProperties properties, byte[] body) throws IOException {

                String msg = new String(body,"utf-8");
                System.out.println("comsumer 1 :"+msg);
                //手动回执
                channel.basicAck(envelope.getDeliveryTag(),false);
                try {
                    Thread.sleep(700);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }

            }
        };
        boolean autoAck = false;
        channel.basicConsume(QUEUE_NAME,autoAck,consumer);
   }
}
public class ReceiveTwo {
    private static  final String QUEUE_NAME = "work_queue";
    public static void main(String[] args) throws Exception{
        Connection connection =  ConnectionUtils.getConnection();
        final Channel channel = connection.createChannel();
        channel.queueDeclare(QUEUE_NAME,false,false,false,null);
        channel.basicQos(1);
        DefaultConsumer consumer = new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope,
                                       AMQP.BasicProperties properties, byte[] body) throws IOException {

                String msg = new String(body,"utf-8");
                System.out.println("comsumer 2 :"+msg);
                channel.basicAck(envelope.getDeliveryTag(),false);
                try {
                    Thread.sleep(200);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }

            }
        };
        boolean autoAck = false;
        channel.basicConsume(QUEUE_NAME,autoAck,consumer);
   }
}
4.订阅模式

1.一个生产者,多个消费者。
2.每个消费者都有自己的队列。
3.生产者都没有直接把消息发送队列,而是发送到交换价
4.每个队列都要绑定到交换机上
5生产者发送消息,进过交换机到达队列,就能实现一个消息被多个消费者消费。

图3
public class Provider {
    private static  final String EXCHANGE_NAME = "exchange_fanout";
    public static void main(String[] args) throws Exception{
        Connection connection =  ConnectionUtils.getConnection();
        //从链接获取一个通道
        Channel channel = connection.createChannel();
        /**
         * 声明交换机
         * fanout 分发
         */
        channel.exchangeDeclare(EXCHANGE_NAME,"fanout");
        String msg = "订阅模式";
        channel.basicPublish(EXCHANGE_NAME,"",null,msg.getBytes());
        System.out.println("消费者已发送消息,发送内容: "+msg);
        channel.close();
        connection.close();
    }
}
public class ReceiveTwo {
    private static  final String EXCHANGE_NAME = "exchange_fanout_send_sms";
    private static  final String FACTOR_NAME = "exchange_fanout";
    public static void main(String[] args) throws Exception{
        Connection connection =  ConnectionUtils.getConnection();
        final Channel channel = connection.createChannel();
        //队列声明
        channel.queueDeclare(EXCHANGE_NAME,false,false,false,null);
        //绑定队列到交换机
        channel.queueBind(EXCHANGE_NAME,FACTOR_NAME,"");

        channel.basicQos(1);

        DefaultConsumer consumer = new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope,
                                       AMQP.BasicProperties properties, byte[] body) throws IOException {

                String msg = new String(body,"utf-8");
                System.out.println("comsumer 2 :"+msg);
                channel.basicAck(envelope.getDeliveryTag(),false);
                try {
                    Thread.sleep(700);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }

            }
        };
        boolean autoAck = false;
        channel.basicConsume(EXCHANGE_NAME,autoAck,consumer);

    }
}
public class Receive {
    private static  final String EXCHANGE_NAME = "exchange_fanout_send_email";
    private static  final String FACTOR_NAME = "exchange_fanout";
    public static void main(String[] args) throws Exception{
        Connection connection =  ConnectionUtils.getConnection();
        final Channel channel = connection.createChannel();
        //队列声明
        channel.queueDeclare(EXCHANGE_NAME,false,false,false,null);
        //绑定队列到交换机
        channel.queueBind(EXCHANGE_NAME,FACTOR_NAME,"");

        channel.basicQos(1);

        DefaultConsumer consumer = new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope,
                                       AMQP.BasicProperties properties, byte[] body) throws IOException {

                String msg = new String(body,"utf-8");
                System.out.println("comsumer 1 :"+msg);
                channel.basicAck(envelope.getDeliveryTag(),false);
                try {
                    Thread.sleep(700);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }

            }
        };
        boolean autoAck = false;
        channel.basicConsume(EXCHANGE_NAME,autoAck,consumer);

   }
}
5.路由模式
图4
public class Provider {
    private static  final String EXCHANGE_NAME = "echange_direct";
    public static void main(String[] args) throws Exception{
        Connection connection =  ConnectionUtils.getConnection();
        //从链接获取一个通道
        Channel channel = connection.createChannel();

        channel.exchangeDeclare(EXCHANGE_NAME,"direct");
        //创建队列声明

       String msg = "hello diret";
       String routingKey="error";
       channel.basicPublish(EXCHANGE_NAME,routingKey,null,msg.getBytes());
       System.out.println("direct send :"+msg);
        channel.close();
        connection.close();
    }
}
public class Receive {
    private static  final String EXCHANGE_NAME = "echange_direct";
    private static  final String QUEUE_NAME = "direct_queue";
    public static void main(String[] args) throws Exception{
        Connection connection =  ConnectionUtils.getConnection();
        Channel channel = connection.createChannel();



        channel.queueDeclare(QUEUE_NAME,false,false,false,null);
        channel.basicQos(1);

        channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"error");
        DefaultConsumer consumer = new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope,
                                       AMQP.BasicProperties properties, byte[] body) throws IOException {

                String msg = new String(body,"utf-8");
                System.out.println("comsumer 1 :"+msg);
                 channel.basicAck(envelope.getDeliveryTag(),false);
                try {
                    Thread.sleep(700);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }

            }
        };
        boolean autoAck = true;
        channel.basicConsume(QUEUE_NAME,autoAck,consumer);
   }
}
public class ReceiveTwo {
    private static  final String EXCHANGE_NAME = "echange_direct";
    private static  final String QUEUE_NAME = "direct_queue";
    public static void main(String[] args) throws Exception{
        Connection connection =  ConnectionUtils.getConnection();
        Channel channel = connection.createChannel();



        channel.queueDeclare(QUEUE_NAME,false,false,false,null);
        channel.basicQos(1);

        channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"error");
        channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"info");
        DefaultConsumer consumer = new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope,
                                       AMQP.BasicProperties properties, byte[] body) throws IOException {

                String msg = new String(body,"utf-8");
               channel.basicAck(envelope.getDeliveryTag(),false);
                System.out.println("comsumer 2 :"+msg);
                try {
                    Thread.sleep(700);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }

            }
        };
        boolean autoAck = true;
        channel.basicConsume(QUEUE_NAME,autoAck,consumer);
    }
}
6.top exchange

将路由和某个模式匹配。


图5
public class Provider {
    private static  final String EXCHANGE_NAME = "exchange_topic";
    public static void main(String[] args) throws Exception{
        Connection connection =  ConnectionUtils.getConnection();
        //从链接获取一个通道
        Channel channel = connection.createChannel();
        //创建队列声明

        channel.exchangeDeclare(EXCHANGE_NAME,"topic");
        String msg = "this is msg...";
        channel.basicPublish(EXCHANGE_NAME,"data.select",null,msg.getBytes());

        System.out.println("msg is send......");
        channel.close();
        connection.close();
    }
}
public class Receive {
    private static  final String EXCHANGE_NAME = "exchange_topic";
    private static  final String QUEUE_NAME = "queue_topic";
    public static void main(String[] args) throws Exception{
        Connection connection =  ConnectionUtils.getConnection();
        final Channel channel = connection.createChannel();
        channel.queueDeclare(QUEUE_NAME,false,false,false,null);
        channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"data.insert");
        channel.basicQos(1);
        DefaultConsumer consumer = new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope,
                                       AMQP.BasicProperties properties, byte[] body) throws IOException {

                String msg = new String(body,"utf-8");
                System.out.println("comsumer 1 :"+msg);
                channel.basicAck(envelope.getDeliveryTag(),false);
                try {
                    Thread.sleep(700);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }

            }
        };
        boolean autoAck = true;
        channel.basicConsume(QUEUE_NAME,autoAck,consumer);
   }
}
public class ReceiveTwo {
    private static  final String EXCHANGE_NAME = "exchange_topic";
    private static  final String QUEUE_NAME = "queue_topic_two";
    public static void main(String[] args) throws Exception{
        Connection connection =  ConnectionUtils.getConnection();
        final Channel channel = connection.createChannel();
        channel.queueDeclare(QUEUE_NAME,false,false,false,null);
        channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"data.#");
        channel.basicQos(1);
        DefaultConsumer consumer = new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope,
                                       AMQP.BasicProperties properties, byte[] body) throws IOException {

                String msg = new String(body,"utf-8");
                System.out.println("comsumer 2 :"+msg);
                channel.basicAck(envelope.getDeliveryTag(),false);
                try {
                    Thread.sleep(700);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }

            }
        };
        boolean autoAck = true;
        channel.basicConsume(QUEUE_NAME,autoAck,consumer);
    }
}

这就是RabbitMQ常见的5种模式,及其代码演示。

RabbitMQ消息确认机制(事务+confirm)

RabbitMQ为我们提供了两种方式:
方式一:通过AMQP事务机制实现,这也是从AMQP协议层面提供的解决方案;
方式二:通过将channel设置成confirm模式来实现;

AMQP模式:

该模式吞吐量较低。使用的是同步的模式。主要是用channel.txSelect()开启事务,使用channel.txRollback();回滚事务。代码如下所示:

/**
 * 使用事务的模式来确定消息有没有到达Rabbit服务器
 */
public class Provider {
    private static  final String QUEUE_NAME = "queue_tx";
    public static void main(String[] args) throws Exception {
        Connection connection =  ConnectionUtils.getConnection();
        //从链接获取一个通道
        Channel channel = connection.createChannel();
        channel.queueDeclare(QUEUE_NAME,false,false,false,null);
        String msg = "tx  msg";
        try {
            //开启事务
            channel.txSelect();
            channel.basicPublish("",QUEUE_NAME,null,msg.getBytes());
            System.out.println("this is msg send");
            channel.txCommit();
        }catch (Exception e){
            //事务回滚
            channel.txRollback();
            System.out.println("send msg txRollBack");
        }
        channel.close();
        connection.close();

    }
}
public class Receive {
    private static  final String QUEUE_NAME = "queue_tx";
    public static void main(String[] args) throws Exception{
        Connection connection =  ConnectionUtils.getConnection();
        Channel channel = connection.createChannel();
        channel.queueDeclare(QUEUE_NAME,false,false,false,null);
        channel.basicConsume(QUEUE_NAME,true,new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope,
                                       AMQP.BasicProperties properties, byte[] body) throws IOException {

                String msg = new String(body,"utf-8");
                System.out.println("comsumer  :"+msg);
            }
        });

    }
}
Confirm模式:

生产者将信道设置成confirm模式,一旦信道进入confirm模式,所有在该信道上面发布的消息都会被指派一个唯一的ID(从1开始),一旦消息被投递到所有匹配的队列之后,broker就会发送一个确认给生产者(包含消息的唯一ID),这就使得生产者知道消息已经正确到达目的队列了,如果消息和队列是可持久化的,那么确认消息会将消息写入磁盘之后发出,broker回传给生产者的确认消息中deliver-tag域包含了确认消息的序列号,此外broker也可以设置basic.ack的multiple域,表示到这个序列号之前的所有消息都已经得到了处理。
Confirm使用的是异步的模式。使用channel,confirmSelect()开启事务。
模式主要分为三种
1.普通发送:waitForConfirms()
2.批量发 waitForConfirms()
3.异步,提送回调。
普通发送代码如下所示:

/**
 * 普通模式
 */
public class Provider {
    private static  final String QUEUE_NAME = "queue_confirm_pt";
    public static void main(String[] args) throws Exception {
        Connection connection =  ConnectionUtils.getConnection();
        Channel channel = connection.createChannel();
        channel.queueDeclare(QUEUE_NAME,false,false,false,null);

        //将channel设置为普通模式
        channel.confirmSelect();
        String msg = "普通模式";
        channel.basicPublish("",QUEUE_NAME,null,msg.getBytes());
        if(!channel.waitForConfirms()){
            System.out.println("send failed");
        }else{
            System.out.println("send succeed");
        }
        channel.close();
        connection.close();

    }
}
public class Receive {
    private static  final String QUEUE_NAME = "queue_confirm_pt";
    public static void main(String[] args) throws Exception{
        Connection connection =  ConnectionUtils.getConnection();
        Channel channel = connection.createChannel();
        channel.queueDeclare(QUEUE_NAME,false,false,false,null);
        channel.basicConsume(QUEUE_NAME,true,new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope,
                                       AMQP.BasicProperties properties, byte[] body) throws IOException {

                String msg = new String(body,"utf-8");
                System.out.println("comsumer  :"+msg);
            }
        });

    }
}

批量发送相对于普通模式的优点是效率高,缺点是一条失败全部失败。 代码如下所示:

public class Provider2 {
    private static  final String QUEUE_NAME = "queue_confirm_pt_2";
    public static void main(String[] args) throws Exception {
        Connection connection =  ConnectionUtils.getConnection();
        Channel channel = connection.createChannel();
        channel.queueDeclare(QUEUE_NAME,false,false,false,null);

        //将channel设置为普通模式
        channel.confirmSelect();
        String msg = "普通模式";
        //批量发送
        for(int i=0;i<10;i++){
            String data = msg+" "+i;
            channel.basicPublish("",QUEUE_NAME,null,data.getBytes());
        }


        if(!channel.waitForConfirms()){
            System.out.println("send failed");
        }else{
            System.out.println("send succeed");
        }
        channel.close();
        connection.close();

    }
}
public class Receive {
    private static  final String QUEUE_NAME = "queue_confirm_pt_2";
    public static void main(String[] args) throws Exception{
        Connection connection =  ConnectionUtils.getConnection();
        Channel channel = connection.createChannel();
        channel.queueDeclare(QUEUE_NAME,false,false,false,null);
        channel.basicConsume(QUEUE_NAME,true,new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope,
                                       AMQP.BasicProperties properties, byte[] body) throws IOException {

                String msg = new String(body,"utf-8");
                System.out.println("comsumer  :"+msg);
            }
        });

    }
}

异步监听模式,效率高,单个的失败不会影响其它数据的发送。 代码如下所示:

/**
 * 异步回调模式
 */
public class Provider {
    private static  final String QUEUE_NAME = "queue_confirm_pt_3";
    public static void main(String[] args) throws Exception {
        Connection connection =  ConnectionUtils.getConnection();
        Channel channel = connection.createChannel();
        channel.queueDeclare(QUEUE_NAME,false,false,false,null);

        //将channel设置为普通模式
        channel.confirmSelect();

        final SortedSet<Long> confirmSet = Collections.synchronizedSortedSet(new TreeSet<Long>());

        channel.addConfirmListener(new ConfirmListener() {
            @Override
            public void handleAck(long l, boolean b) throws IOException {
                if(b){
                    System.out.println("handle ACK succeed");
                    confirmSet.headSet(l+1).clear();
                }else{
                    System.out.println("handle ACK false");
                    confirmSet.remove(l);
                }
            }

            @Override
            public void handleNack(long l, boolean b) throws IOException {
                if(b){
                    System.out.println("handle ACK succeed");
                    confirmSet.headSet(l+1).clear();
                }else{
                    System.out.println("handle ACK false");
                    confirmSet.remove(l);
                }
            }
        });
        String msg = "回调模式";
        while(true){
            long data = channel.getNextPublishSeqNo();
            channel.basicPublish("",QUEUE_NAME,null,msg.getBytes());
            confirmSet.add(data);
        }

    }
}
public class Receive {
    private static  final String QUEUE_NAME = "queue_confirm_pt_3";
    public static void main(String[] args) throws Exception{
        Connection connection =  ConnectionUtils.getConnection();
        Channel channel = connection.createChannel();
        channel.queueDeclare(QUEUE_NAME,false,false,false,null);
        channel.basicConsume(QUEUE_NAME,true,new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope,
                                       AMQP.BasicProperties properties, byte[] body) throws IOException {

                String msg = new String(body,"utf-8");
                System.out.println("comsumer  :"+msg);
            }
        });

    }
}

相关文章

网友评论

      本文标题:RabbitMQ

      本文链接:https://www.haomeiwen.com/subject/csxcyhtx.html