git地址链接:https://github.com/smipo/mykafka-0.7
服务端启动类是kafka,但是启动之前需要指定配置文件server.properties。0.7版本不涉及副本的概念,分区是配置文件配置的,且topic的创建是由生产者获取每个服务端分区数目在服务端创建多少个分区,服务端最重要的功能是日志的管理,网络通讯,向zk注册服务端的信息,以下分别说说概要。
日志的管理:
主要两个部分组成log包和message包,log包主要是管理文件的,message包是管理存储文件信息格式的。log包比较简单主要是两个类:
1.LogManager负责管理Log类,文件定时清理,topic的创建等。
2.Log主要负责管理多个文件读取,写入等。
message包下主要类:
1.Message存储信息的格式
2.MessageSet创建Message,读取,写入。ByteBufferMessageSet主要负责存储在字节缓冲区中的消息序列,负责管理Message,FileMessageSet负责管理单个文件的。
3.CompressionCodec管理文件消息是否需要压缩的,这个相对简单。
关于日志大概介绍完了,因为比较简单不在详细赘述,在简单赘述下0.7版本和0.10版本关于偏移量offset的管理,文件名即为该文件偏移量的起始值是个long型,0.10版本添加了偏移量索引(OffsetIndex),我们先看下0.7版本offset的读取,代码在Log.findRange
QQ图片20200320213305.png
首先需要找到偏移量所在的文件,但是0.7版本没有对多文件偏移量的管理,所以只能遍历文件查找是否有对应的offset,然后就是遍历文件时,需要大量读取文件内容并且需要频繁创建ByteBuffer,影响性能。0.10版本添加了对多文件偏移量的管理代码在Log.scala
private val segments: ConcurrentNavigableMap[java.lang.Long, LogSegment] = new ConcurrentSkipListMap[java.lang.Long, LogSegment]
采用了跳表,当读取offset时,首先会根据segments.floorEntry(startOffset)获取具体Offset索引文件,在获取数据文件获取真正的数据(floorEntry方法是获取小于或等于给定的键,跳表前面有文章介绍),OffsetIndex只存储偏移量和数据位置,并且MappedByteBuffer管理,关于MappedByteBuffer介绍请移步到https://www.jianshu.com/p/f90866dcbffc
网络通讯核心类:(network包)
1.Receive接收客户端请求的基类
2.Send发送给客户端请求的基类
3.SocketServer网络通信,实现了Reactor模型,内部类Acceptor相当于netty的boss线程,内部类Processor相当于netty的work线程。
4.server包中KafkaRequestHandlers是处理客户端请求的类
关于向zk注册信息很简单没什么可讲的,感兴趣看下KafkaZooKeeper类。
网友评论