隔了一周才开始写Kafka和RabbitMQ的Producer发消息的比较,这个一开始无从下手,静下心来才真正能够写出来,接下来咱们进入Producer发送消息比较的正题。
Kafka Producer发送消息流程
1.配置生产者客户端参数并与服务器Broker建立连接。
2.通过消息内容,消息key,partion,topic创建ProducerRecord。
3.对消息的key、value序列化处理
4.分区器对未设定分区的计算分区,然后将ProducerRecord通过RecordAccumulator消息累加器累加到各个不同分区的批次ProducerBatch中。
5.ProducerBatch是一个双端队列,一端取出ProducerRecord,另一端存入ProducerRecord。
6.ProducerRecord将消息缓存累加到一定程度,触发Sender线程发送消息。
7.Sender线程将Deque<ProducerBatch>>的保存形式转变成<Node,List<ProducerBatch>的形式。其中Node表示具体哪一个Broker。
8.Sender线程进一步将消息转换为<Node,Request>的形式。
9.Sender线程发送之前,最后将消息缓存到InFlightRequests中,具体形式为Map<NodeId,Deque<Request>,NodeId表示Broker节点ID,来表示发送的状态,是否发送完成。
10.Sender线程将<Node,Request>形式的数据提交给Selector发送。
11.Selector发送成功后更新InFlightRequests中的缓存消息。
12.发送完毕后关闭生产者实例。
RabbitMQ Producer发送消息流程
1.生产者连接RabbitMQ服务器,建立TCP连接Connection,创建信道Channel
// 设置连接工厂,实际就是配置RabbitMQ的客户端连接账号密码,RabbitMQ服务器地址
ConnectionFactory factory = new ConnectionFactory();
factory.setUsername("guest");
factory.setPassword("guest");
factory.setHost("127.0.0.1");
factory.setVirtualHost("/");
// 注意这个才是真正的连接到RabbitMQ服务器
Connection connection = factory.newConnection();
// 创建一条信道,共享上面的TCP连接,每个线程共享Connection但是独享Channel
// 避免创建大量TCP Connection
Channel channel = connection.createChannel();
2.生产者创建Exchange交换器,并设置相关属性,比如交换器类型、是否持久化等
String exchange = "test-exchange";
channel.exchangeDeclare(exchange, "direct", true);
3.生产者发送消息至RabbitMQ Broker,其中包含routingKey、交换器等信息
String routingKey = "test-routingKey";
byte[] body = "test".getBytes();
channel.basicPublish(exchange, routingKey, null, body);
4.相应的交换器根据接收到的routingKey 查找相匹配的队列,这一步实际在RabbitMQ服务器。
5.关闭信道、关闭连接。
总结一下
1.Kafka的Producer发送消息过程比较复杂经过了多次转换封装,而RabbitMQ的Producer发送消息过程较为简单,只是将body转换为byte后发送。这么来看Kafka的Producer处理效率不如RabbitMQ。
2.Kafka存在对多条消息的组合成批次发送消息而不是实时立即发送,而RabbitMQ从Producer源码部分没有看出来是批次,目前看是单独一条就发送一条。这么来看再发送量比较少的情况下,Kafka的Producer效率不如RabbitMQ。但是发送消息量大了之后,批次发送消息会比一次发送一条的情况效率要高。
3.Kafka在Producer端就已经知道发送到了哪个Broker,以及topic和partition,而RabbitMQ看来是在服务端匹配的routingKey和bindingKey匹配找到对应的queue,所以Producer实际不知道具体哪个queue。
网友评论