美文网首页
第三章-高级特性3:消息自定义监听

第三章-高级特性3:消息自定义监听

作者: asfafjwefhfuer | 来源:发表于2019-03-21 23:37 被阅读0次

一、 消息的自定义监听

  1. 我们一般就是在代码中编写while循环。进行consumer.nextDelivery 方法进行获取下一条消息,然后进行消费处理。
  2. 但是我们使用自定义的Consumer更加的方便,解耦性更加的强,也是实际工作中最常用的使用方式。
image.png

/**
* 自定义消息监听
* 生产者
*
* @author yangHX
* createTime  2019/3/21 23:02
*/
public class Producer {

   public static void main(String[] args) throws IOException, TimeoutException {
       ConnectionFactory connectionFactory = RabbitMqUtil.getConnectionFactory();
       Connection connection = connectionFactory.newConnection();
       Channel channel = connection.createChannel();


       String exchangeName = "test_consumer_exchange";
       String routingKey = "consumer.save";

       String msg = "Hello RabbitMQ Consumer Message";

       for (int i = 5; i > 0; i--) {
           channel.basicPublish(exchangeName, routingKey, true, null, msg.getBytes());
       }
   }



/**
* 自定义消息监听 消费者
*
* @author yangHX
* createTime  2019/3/21 23:05
*/
public class Consumer {

   public static void main(String[] args) throws IOException, TimeoutException {
       ConnectionFactory connectionFactory = RabbitMqUtil.getConnectionFactory();
       Connection connection = connectionFactory.newConnection();
       Channel channel = connection.createChannel();

       String exchangeName = "test_consumer_exchange";
       String routingKey = "consumer.#";
       String queueName = "test_consumer_queue";

       channel.exchangeDeclare(exchangeName, "topic", true, false, null);
       channel.queueDeclare(queueName, true, false, false, null);
       channel.queueBind(queueName, exchangeName, routingKey);

       channel.basicConsume(queueName, true, new MyConsumer(channel));

   }
}





/**
* 自定义消息监听
*
* @author yangHX
* createTime  2019/3/21 23:09
*/
public class MyConsumer extends DefaultConsumer {


   public MyConsumer(Channel channel) {
       super(channel);
   }


   /**
    * @param consumerTag 消费标记
    * @param envelope    里面重要消息
    * @param properties  配置信息
    * @param body        消息体
    * @throws IOException ex
    */
   @Override
   public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
       System.err.println("___________consumer message____________________");
       System.err.println("consumerTag        : " + consumerTag);
       System.err.println("envelope           : " + envelope);
       System.err.println("properties         : " + properties);
       System.err.println("body               : " + new String(body));

       ;
   }
}



相关文章

网友评论

      本文标题:第三章-高级特性3:消息自定义监听

      本文链接:https://www.haomeiwen.com/subject/itahvqtx.html