一、 消息的自定义监听
- 我们一般是在代码中编写 while 循环，调用 consumer.nextDelivery() 方法获取下一条消息，然后进行消费处理。
- 但是使用自定义的 Consumer 更加方便，解耦性也更强，这也是实际工作中最常用的方式。
data:image/s3,"s3://crabby-images/a28ee/a28ee848fc4a79b554ba7b16f057b8c8567a60ac" alt=""
image.png
/**
* 自定义消息监听
* 生产者
*
* @author yangHX
* createTime 2019/3/21 23:02
*/
public class Producer {
public static void main(String[] args) throws IOException, TimeoutException {
ConnectionFactory connectionFactory = RabbitMqUtil.getConnectionFactory();
Connection connection = connectionFactory.newConnection();
Channel channel = connection.createChannel();
String exchangeName = "test_consumer_exchange";
String routingKey = "consumer.save";
String msg = "Hello RabbitMQ Consumer Message";
for (int i = 5; i > 0; i--) {
channel.basicPublish(exchangeName, routingKey, true, null, msg.getBytes());
}
}
/**
 * Custom message listener demo: consumer side.
 *
 * <p>Declares the {@code test_consumer_exchange} topic exchange and the
 * {@code test_consumer_queue} queue, binds them with the pattern
 * {@code consumer.#}, and registers a {@link MyConsumer} as the callback
 * for deliveries from that queue.
 *
 * @author yangHX
 * createTime 2019/3/21 23:05
 */
public class Consumer {
    /**
     * Sets up the exchange, queue and binding, then starts consuming.
     *
     * @param args unused
     * @throws IOException      if any broker operation fails
     * @throws TimeoutException if opening the connection times out
     */
    public static void main(String[] args) throws IOException, TimeoutException {
        final String exchangeName = "test_consumer_exchange";
        final String queueName = "test_consumer_queue";
        final String routingKey = "consumer.#";

        // The connection is intentionally left open: deliveries arrive on it asynchronously.
        Channel channel = RabbitMqUtil.getConnectionFactory()
                .newConnection()
                .createChannel();

        // Topology: topic exchange -> queue, bound by the routing pattern above.
        channel.exchangeDeclare(exchangeName, "topic", true, false, null);
        channel.queueDeclare(queueName, true, false, false, null);
        channel.queueBind(queueName, exchangeName, routingKey);

        // Second argument is the auto-acknowledge flag; deliveries are handed to MyConsumer.
        MyConsumer listener = new MyConsumer(channel);
        channel.basicConsume(queueName, true, listener);
    }
}
/**
 * Custom message listener.
 *
 * <p>Extends {@code DefaultConsumer} and prints every delivery it receives
 * to standard error.
 *
 * @author yangHX
 * createTime 2019/3/21 23:09
 */
public class MyConsumer extends DefaultConsumer {
    /**
     * Creates a listener bound to the given channel.
     *
     * @param channel the channel passed on to {@code DefaultConsumer}
     */
    public MyConsumer(Channel channel) {
        super(channel);
    }

    /**
     * Prints the consumer tag, envelope, properties and body of a delivered message.
     *
     * @param consumerTag the consumer tag
     * @param envelope    the envelope of the delivery
     * @param properties  the message properties
     * @param body        the message body
     * @throws IOException declared by the overridden method; not thrown by this implementation
     */
    @Override
    public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
        // Decode with an explicit charset so the output does not depend on the platform default.
        String text = new String(body, java.nio.charset.StandardCharsets.UTF_8);
        System.err.println("___________consumer message____________________");
        System.err.println("consumerTag : " + consumerTag);
        System.err.println("envelope : " + envelope);
        System.err.println("properties : " + properties);
        System.err.println("body : " + text);
    }
}
网友评论