背景
RabbitMQ是一个由erlang开发的AMQP(Advanved Message Queue)的开源实现
应用场景
- 异步处理
- 应用解耦(服务解耦)
API
下载rabbitMQ自己看
简单实战搭建
- 生产者
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
* @author create by yingjie.chen on 2018/3/27.
* @version 2018/3/27 14:54
*/
public class Producer {
public static void main(String[] args) throws IOException, TimeoutException {
//创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
factory.setUsername("admin");
factory.setPassword("admin");
//设置 RabbitMQ 地址
factory.setHost("172.16.7.45");
//建立到代理服务器到连接
Connection conn = factory.newConnection();
//获得信道
Channel channel = conn.createChannel();
//声明交换器
String exchangeName = "hello-exchange";
channel.exchangeDeclare(exchangeName, "direct", true);
String routingKey = "hola";
//发布消息
byte[] messageBodyBytes = "quit".getBytes();
channel.basicPublish(exchangeName, routingKey, null, messageBodyBytes);
channel.close();
conn.close();
}
}
- 消费者
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
* @author create by yingjie.chen on 2018/3/27.
* @version 2018/3/27 15:17
*/
public class Consumer {
public static void main(String[] args) throws IOException, TimeoutException {
ConnectionFactory factory = new ConnectionFactory();
factory.setUsername("admin");
factory.setPassword("admin");
factory.setHost("172.16.7.45");
//建立到代理服务器到连接
Connection conn = factory.newConnection();
//获得信道
final Channel channel = conn.createChannel();
//声明交换器
String exchangeName = "hello-exchange";
channel.exchangeDeclare(exchangeName, "direct", true);
//声明队列
String queueName = channel.queueDeclare().getQueue();
String routingKey = "hola";
//绑定队列,通过键 hola 将队列和交换器绑定起来
channel.queueBind(queueName, exchangeName, routingKey);
while(true) {
//消费消息
boolean autoAck = false;
String consumerTag = "";
channel.basicConsume(queueName, autoAck, consumerTag, new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag,
Envelope envelope,
AMQP.BasicProperties properties,
byte[] body) throws IOException {
String routingKey = envelope.getRoutingKey();
String contentType = properties.getContentType();
System.out.println("消费的路由键:" + routingKey);
System.out.println("消费的内容类型:" + contentType);
long deliveryTag = envelope.getDeliveryTag();
//确认消息
channel.basicAck(deliveryTag, false);
System.out.println("消费的消息体内容:");
String bodyStr = new String(body, "UTF-8");
System.out.println(bodyStr);
}
});
}
}
}
原理
- Client 用过EXCHANGE 指定routingkey 就能找到消息所在的队列Queue
- Client 订阅就是监控绑定的的队列,有消息会直接取到,消费者处理完返回信息原理也是一样,通过不一样的队列(一般都是queue后面加个queueName_xxxx)
网友评论