相关概念:
-
Producer:消息产生者
-
Consumer: 消息消费者
-
Broker:消息中间件的服务节点,对于Rabbitmq来说,一个Rabbitmq broker可以认为是一个Rabbitmq的服务实例。
-
Connection,与Broker的tcp长连接,Producer和Consumer都需要建立连接之后才可以使用
-
Channel,建立在Connection基础上,每个线程分配一个channel,类似于NIO的多路复用,节省连接资源。大部分RabbitMQ的操作和核心概念都是基于Channel的,需要特别注意。
-
Queue:队列,RabbitMQ中用于存储消息的容器。而且RabbitMQ中的消息只能存储在Queue中,这点跟kafka不同,kafka只能将消息放在topic中,而kafka中的queue只是topic实际存储文件中的位移标识。
多个consumer可以消费同一个queue中的消息,这时候消息的处理是互斥的,即一个消息只能被一个consumer处理。
-
Exchange:还是不翻译成中文了,太怪。在producer将消息发到Broker中时,是通过exchange按照一定规则转发到不同的queue中,而不是直接放入queue中。
-
RoutingKey:producer在发送消息给exchange时,一般会指定一个RoutingKey,用来指定这个消息的路由规则。
-
BindingKey:RoutingKey和BindingKey匹配时(注意不是相同,可能是模糊匹配),exchange才会把消息发送到对应的queue中
exchange类型
- fanout,会把消息路由到所有绑定的queue中,无视RoutingKey和BindingKey
- direct,只能将消息路由到RoutingKey和BindingKey完全匹配的queue,queue可以有多个,只要匹配就行
- topic,可以支持# * 等通配符匹配RoutingKey和BindingKey
- headers,不常用,不会依赖RoutingKey,而是根据消息内容中的headers属性跟exchange绑定的内容进行匹配,性能较低,不过非常灵活。
一些关键点
大部分情况下,按照最简单的方式使用就好了,作为工具书去查询《RabbitMQ实战详解》里面的配置。
- channel.basicQos是设置一个channel中consumer所能保持的最大未确认消息,也就是说,如果一个channel中的qos值已经到了最大,那么rabbitmq就不会继续往这个channel中push对应的消息。
- rabbitmq的顺序一致性其实是无法保证的,
- 比如事务消息或者发送消息确认,当发送失败需要重试时,这一条(批)数据跟其之后的数据在producer端就不一致了
- 如果producer发送的时候设置了不同的超时时间,并且也设置了死信队列,那么消费者在处理死信队列的时候,也会出现数据顺序与发送顺序不同的情况。
- 设置优先级,也会导致顺序一致性收到影响。
- 死信队列、延迟队列、消费优先级、持久化等,查询工具书即可。
web端管理
- 使用rabbitmq-plugins enable rabbitmq_management 来启用web管理插件,重启才会生效.
- 使用rabbitmq-plugins list来查看正在使用的插件。
- 访问 server_ip:15672可以访问,guest用户无法登陆远程服务器,需要使用上面创建的root:root123 用户/密码来登陆
HelloWorld
加入maven依赖:
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>5.2.0</version>
</dependency>
producer代码:
package rabbitmq.server;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.MessageProperties;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class RabbitMQProducer {
private static final String exchange_name = "exchange_siyu";
private static final String routing_key = "routing_key_siyu";
private static final String queue_name = "queue_siyu";
private static final String rabbitmq_server_ip_addr = "10.199.189.30";
private static final int rabbitmq_server_port = 5672;
public static void main(String[] args) throws IOException, TimeoutException {
// 连接工厂类
ConnectionFactory connectionFactory = new ConnectionFactory();
// 设置连接属性及用户名密码,用户、密码要通过rabbitmqctl设置过权限
connectionFactory.setHost(rabbitmq_server_ip_addr);
connectionFactory.setPort(rabbitmq_server_port);
connectionFactory.setUsername("root");
connectionFactory.setPassword("root123"); // 如果用户名密码不匹配,会连接失败
// 建立连接,一个tcp长连接
Connection connection = connectionFactory.newConnection();
// 创建信道,主要操作通过channel执行,可以认为channel是虚拟化出来的一个Connection,用于复用
Channel channel = connection.createChannel();
// 定义路由,direct是point-2-piont的,直接到对应的单个queue中
channel.exchangeDeclare(exchange_name,"direct",true,false,null);
// 定义queue
channel.queueDeclare(queue_name,true,false,false,null);
// 通过routingkey 绑定queue和exchange
channel.queueBind(queue_name,exchange_name,routing_key);
// 开始发送消息
String message = "Hello World!!";
/* MessageProperties中预置了一部分消息的参数,比如PERSIST_TEXT_PLAIN,其中的定义如下:
*
*
public static final BasicProperties PERSISTENT_TEXT_PLAIN =
new BasicProperties("text/plain",
null,
null,
2,
0, null, null, null,
null, null, null, null,
null, null);
* */
channel.basicPublish(exchange_name,routing_key, MessageProperties.PERSISTENT_TEXT_PLAIN,message.getBytes());
// 关闭channel和connection
channel.close();
connection.close();
}
}
Consumer
package rabbitmq.client;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
public class RabbitMQConsumer {
private static final String queue_name = "queue_siyu";
private static final String rabbitmq_server_ip_address = "10.199.189.30";
private static final int port = 5672;
public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
Address[] addresses = new Address[]{
new Address(rabbitmq_server_ip_address,port)
};
// 长连接工厂
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setUsername("root");
connectionFactory.setPassword("root123");
// 这里创建连接跟server端不同,传入了address
Connection connection = connectionFactory.newConnection(addresses);
// 创建channel
final Channel channel = connection.createChannel();
channel.basicQos(64);// ?? 设置客户端最多接收未被ack的消息个数
// 创建消费
final Consumer consumer = new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("receive msg:" + new String(body));
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
channel.basicAck(envelope.getDeliveryTag(),false); // 发送ack之后,消息会在queue中被删除
}
};
channel.basicConsume(queue_name,consumer);
TimeUnit.SECONDS.sleep(5);
// 如果先关闭connection,再关闭channel,就会抛出异常:
// com.rabbitmq.client.AlreadyClosedException: connection is already closed due to clean connection shutdown;
// 所以这里一定要注意关闭的顺序
channel.close();
connection.close();
}
}
网友评论