NIO在Zookeeper中的应用,我们今天讨论的是Zookeeper服务端启动流程中,使用Java NIO或者Netty创建socket网络通信的过程。
Zookeeper服务端启动过程中,会初始化ServerCnxnFactory,并创建ServerCnxnFactory主线程,这是一个守护线程。这个线程的具体实现是Java NIO或者Netty。也就是说,Zookeeper客户端与服务端通信的机制是NIO。也就是同步非阻塞IO。
OK,接下来,我们进入源码,分析实现细节。
一,NIO在Zookeeper源码中的入口
如图:我们的NIO的实现逻辑就在NIOServerCnxnFactory和NettyServerCnxnFactory中。

// 创建服务端实例
this.cnxnFactory = ServerCnxnFactory.createFactory();
// 配置服务端
this.cnxnFactory.configure(config.getClientPortAddress, config.getMaxClientCnxns());
这里主要配置了SASL登录,创建守护线程处理请求,创建socket网络通信。这里socket,使用的是Java NIO或者Netty。
this.cnxnFactory.configure()方法的实现有2个,NIOServerCnxnFactory和NettyServerCnxnFactory。接下来我们分别来介绍。
二,NIOServerCnxnFactory实现源码
NIOServerCnxnFactory继承了ServerCnxnFactory抽象类,并且实现了Runnable接口。
NIOServerCnxnFactory.configure(config.getClientPortAddress, config.getMaxClientCnxns())的关键源码如下:
// 配置SASL登录
this.configureSaslLogin();
// 创建一个守护线程,用来处理客户端请求
this.thread = new Thread(this, "NIOServerCxn.Factory:" + addr);
this.thread.setDaemon(true);
this.maxClientCnxns = maxcc;
// 开启通道Channel
this.ss = ServerSocketChannel.open();
this.ss.socket().setReuseAddress(true);
// 绑定socket地址
this.ss.socket().bind(addr);
// 设置线程的阻塞模式:false-非阻塞
// Adjusts this channel's blocking mode.
this.ss.configureBlocking(false);
// Registers this channel with the given
// selector, returning a selection key.
// 把通道注册到选择器selector上。
this.ss.register(this.selector, 16);
以上就是配置NIOServerCnxnFactory的关键代码。有两点需要补充说明一下,一个是设置地址复用,再一个就是缓冲Buffer的使用。
1,首先来看地址复用,这是一个难点,因为涉及到底层的TCP通信协议。
this.ss.socket().setReuseAddress(true)
要搞清楚这个地址复用的含义,首先来看一下接口SocketOptions,这个接口是干嘛的呢?看注释:Interface of methods to get/set socket option。意思就是用来设置socket的一些可选值。这个接口中定义了2个方法和14个属性。
2个方法如下:
// 设置socket可选值
public void setOption(int optId, Object value);
// 获取socket可选值
public void getOption(int optId);
14个属性如下:
// Disable Nagle's algorithm for this
// connection. Written data to the network
// is not buffered pending acknowledgment
// of perviously written data.
// 什么意思呢?就是说发送数据包不再多条
// 一起发送,而是来一条发一条。
// 这里的纳格尔算法可以了解一下。
TCP_NODELAY = 0x0001;
SO_BINDADDR = 0x000F;
// Sets SO_REUSEADDR for a socket.
// This is used only for MulticastSockets in
// java, and it is set by default for
// MulticastSockets.
// 个人理解:这个选项是设置socket是否支 // 持多路复用,多路复用是实现NIO的基
//础。因此NIO操作socket时会启用该选项。
SO_REUSEADDR = 0x000F;
SO_BRODCAST = 0x000F;
// Set which outgoing interface on which to // send multicast packets.
IP_MULTICAST_IF = 0x10;
// Same as IP_MULTICAST_IF. This option
// is introduced so that the behavior with
// IP_MULTICAST_IF will be kept the same
// as before, while this new option can
// support setting outgoing interfaces with
// either IPv4 and IPv6 address.
IP_MULTICAST_IF2 = 0x1f;
// This option enables or disables local
// loopback of multicast datagram.
// This option is enabled by default for
// multicast socket.
IP_MULTICAST_LOOP = 0x12;
IP_TOS = 0x3;
SO_LINGER = 0x0080;
SO_TIMEOUT = 0x1006;
SO_SNDBUF = 0x1001;
SO_RCVBUF = 0x1002;
SO_KEEPALIVE = 0x0008;
SO_OOBINLINE = 0x1003;
我们这里重点关注一下SO_REUSEADDR,我对这个选项的理解是,是否支持多路复用。这个仅仅是个人理解,如果理解有偏差,还请大家多多指正。
关于多路复用,这里我们来理解一下这个概念。多路复用,又叫multiplexing,全称I/O multiplexing。指的是,单个线程通过记录多个socket I/O流的方式来管理多个I/O流。
简单来说就是:单个线程管理多个连接的流操作。
OK,我们回到第一个问题,
this.ss.socket().setReuseAddress(true);
设置这个地址复用,其实就是NIO实现多路复用的一个必要条件。
2,接下来我们看缓冲Buffer。
我们都知道,NIO的实现,包含三个技术点:Selector、Channel、Buffer。
总得来说,NIO是面向通道和缓冲区的,也就是说,数据是从通道读取到缓冲区的,或者反过来从缓冲区写入通道。
这里还有一个关键点,就是选择器的工作原理。选择器提供选择执行已经就绪的任务的能力。从底层来看,选择器提供了询问通道是否已经就绪的能力。选择器允许单个线程处理多个通道。选择器的工作原理挺复杂的,有时间再研究,这里暂时不再讨论。
三,总结
OK,接下来我们总结一下如何使用NIO。
第一步:开启选择器。
SelectorProvider.provider().openSelector();
第二步:开启通道
SelectorProvider.provider().openServerSocketChannel();
第三步:设置地址复用
this.ss.socket().setReuseAddress(true);
第四步:绑定地址
this.ss.socket().bind(addr);
第五步:设置线程的阻塞模式:非阻塞
// Adjusts this channel's blocking mode.
this.ss.configureBlocking(false);
第六步:把通道注册到选择器selector上
this.ss.register(this.selector, 16);
到这里,我们的NIO就可以开始对外提供服务了,有的兄弟可能会疑惑,NIO三大件中的Buffer呢?怎么这里只提到了Selector和Channel?
Buffer其实是在数据处理的时候开始使用的,但是这块内存区域的分配其实在NIOServerCnxnFactory初始化的时候就已经分配了。
final ByteBuffer directBuffer = ByteBuffer.allocateDirect(65536);
网友评论