这里博主推荐大家阅读由朱忠华先生编写的<<RabbitMq实战指南>>,这里有详细的客户端开发接口的说明,例如com.rabbotmq.client包的使用
运行流程
生产者
-
生产者连接到RabbitMq Broker,建立一个连接即程序里的Connection,开启一个信道
ConnectionFactory factory = new ConnectionFactory(); // "guest"/"guest" by default, limited to localhost connections factory.setUsername(userName); factory.setPassword(password); factory.setVirtualHost(virtualHost); factory.setHost(hostName); factory.setPort(portNumber); Connection conn = factory.newConnection(); Channel channel = conn.createChannel();
-
生产者声明一个交换器,并设置相关属性,这里声明交换器的类型,是否持久化等,很重要哦,可以自行查看api
(交换器也可以转发消息给别的交换器)
channel.exchangeDeclare(exchangeName, "direct", true);
-
生产者声明一个队列并设置属性,比如排他性,是否持久化,是否过期删除等
String queueName = channel.queueDeclare().getQueue(); or channel.queueBind(queueName, exchangeName, routingKey);
-
生产者通过路由键将交换器和队列进行绑定起来
channel.queueBind(queueName, exchangeName, routingKey)
-
生产者发送信息到RabbitMq Broker,其中包含路由键、交换器信息
byte[] messageBodyBytes = "Hello, world!".getBytes(); channel.basicPublish(exchangeName, routingKey, null, messageBodyBytes);
-
对应交换器根据收到的路由键匹配发送的队列`(重点type: topic、direct)
-
找到后,把消息存入队列
-
找不到,会退给生产者
-
关闭信道,关闭连接
channel.close(); conn.close();
消费者
-
消费者连接到RabbitMq Broker,建立一个连接即程序里的Connection,开启一个信道
-
消费者向对应队列请求消费消息,
-
等待RabbitMq Broker回应并投递相应的队列消息给消费者,消费者然后进行回调处理
(回调在轮询或者是阻塞里)
-
消费者完成回调,确认消息已经被成功消费
(这里确认消息可以是自动应答,也可手动,建议是手动)
boolean autoAck = false; channel.basicConsume(queueName, autoAck, "myConsumerTag", new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String routingKey = envelope.getRoutingKey(); String contentType = properties.getContentType(); long deliveryTag = envelope.getDeliveryTag(); //。。。。。 // 手动应答 acknowledge receipt of the message //Delivery Tag 用来标识信道中投递的消息 //第二个参数是requeue 如果是true表示通知RabbitMq把这个消息重新存入队列并发送给下一个消费者,反之删除 channel.basicAck(deliveryTag, false); } });
-
RabbitMq 删除对应已经被消费者确认消费的消息
-
关闭信道
-
关闭连接
重点
- 以上过程不是绝对的,已经表明
- Connection连接就是一条TCP连接,在实际项目中不要多次创建Connection,他是可以复用的,类似NIO。因为建立TCP连接开销很大。
- Channel信道是建立在Connection上虚拟连接,RabbitMq处理的每条AMQP指令都是通过信道完成。
- 每个线程把持一个信道,所以信道复用了Connection的TCP连接。同时RabbitMq确保了每个线程的私密性,就像独立的连接一样。
- Channel不建议在线程间贡献,不然很酸爽
- Connection可以复用不代表项目中只能创建一个,可以根据实际情况创建多个Connection,并让信道均摊到这些Connection上。
网友评论