摘要
本节讲解ServerCnxnFactory的nio实现,NIOServerCnxnFactory,是33节讲的NIOServerCnxn的工厂类
本节讲解内容如下
简介
属性
函数
configure完成Factory的初始配置
start和startup分别完成集群,非集群模式下的server启动
run方法以及内部调用的方法,不断完成
线程不断监听ACCEPT事件完成连接
监听READ,WRITE事件调用NIOServerCnxn#doIO完成相关IO操作
简介
这里贴一下ServerCnxn,ServerCnxnFactory以及netty,nio两种实现的关系
ServerCnxn以及ServerCnxnFactory本节以及前面三节讲解了除netty以外的四个类
类继承,实现关系如下
NIOServerCnxnFactory继承,实现关系属性
ServerSocketChannel ss;
final Selector selector = Selector.open();
/**
* We use this buffer to do efficient socket I/O. Since there is a single
* sender thread per NIOServerCnxn instance, we can use a member variable to
* only allocate it once.
*/
final ByteBuffer directBuffer = ByteBuffer.allocateDirect(64 * 1024);
final HashMap<InetAddress, Set<NIOServerCnxn>> ipMap =
new HashMap<InetAddress, Set<NIOServerCnxn>>( );//每个client地址对应的连接集合
int maxClientCnxns = 60;//默认最大允许的client数量
Thread thread;//后台跑的线程
NIO相关部分不展开。
函数
get,set,close,shutdown等相关函数不展开
configure
继承父类方法,初始化thread,完成socket相关配置
public void configure(InetSocketAddress addr, int maxcc) throws IOException {
configureSaslLogin();
thread = new ZooKeeperThread(this, "NIOServerCxn.Factory:" + addr);
thread.setDaemon(true);
maxClientCnxns = maxcc;
this.ss = ServerSocketChannel.open();
ss.socket().setReuseAddress(true);
LOG.info("binding to port " + addr);
ss.socket().bind(addr);
ss.configureBlocking(false);
ss.register(selector, SelectionKey.OP_ACCEPT);
}
不同模式的启动方法
分为单机和集群模式,
@Override
public void start() {//集群版启动
// ensure thread is started once and only once
if (thread.getState() == Thread.State.NEW) {
thread.start();
}
}
@Override
public void startup(ZooKeeperServer zks) throws IOException,
InterruptedException {//单机版启动
//启动IO线程
start();
setZooKeeperServer(zks);
//从log和snapshot恢复database和session,并重新生成一个最新的snapshot文件
zks.startdata();
//启动sessionTracker线程,初始化IO请求的处理链,并启动每个processor线程
zks.startup();
}
线程运行相关
上面thread.start()进入NIOServerCnxnFactory#run中,源码如下
run方法
public void run() {
while (!ss.socket().isClosed()) {//只要socket没有close
try {
selector.select(1000);
Set<SelectionKey> selected;
synchronized (this) {
selected = selector.selectedKeys();
}
ArrayList<SelectionKey> selectedList = new ArrayList<SelectionKey>(
selected);
Collections.shuffle(selectedList);//随机打乱
for (SelectionKey k : selectedList) {
if ((k.readyOps() & SelectionKey.OP_ACCEPT) != 0) {//获取client的连接请求
SocketChannel sc = ((ServerSocketChannel) k
.channel()).accept();
InetAddress ia = sc.socket().getInetAddress();
int cnxncount = getClientCnxnCount(ia);//获取client对应的连接数
if (maxClientCnxns > 0 && cnxncount >= maxClientCnxns){
LOG.warn("Too many connections from " + ia
+ " - max is " + maxClientCnxns );
sc.close();
} else {
LOG.info("Accepted socket connection from "
+ sc.socket().getRemoteSocketAddress());
sc.configureBlocking(false);
SelectionKey sk = sc.register(selector,
SelectionKey.OP_READ);//注册read事件
NIOServerCnxn cnxn = createConnection(sc, sk);//创建连接,构造NIOServerCnxn
sk.attach(cnxn);//selecionKey带上一个附件cnxn
addCnxn(cnxn);//加入ipMap记录
}
} else if ((k.readyOps() & (SelectionKey.OP_READ | SelectionKey.OP_WRITE)) != 0) {
NIOServerCnxn c = (NIOServerCnxn) k.attachment();//取出附件cnxn
c.doIO(k);//处理IO操作
} else {
if (LOG.isDebugEnabled()) {
LOG.debug("Unexpected ops in select "
+ k.readyOps());
}
}
}
selected.clear();
} catch (RuntimeException e) {
LOG.warn("Ignoring unexpected runtime exception", e);
} catch (Exception e) {
LOG.warn("Ignoring exception", e);
}
}
closeAll();//结束关闭所有连接
LOG.info("NIOServerCnxn factory exited run method");
}
里面调用,涉及的函数如下
getClientCnxnCount
获取同一个client的连接数
private int getClientCnxnCount(InetAddress cl) {//获取同一个client地址的连接数
// The ipMap lock covers both the map, and its contents
// (that is, the cnxn sets shouldn't be modified outside of
// this lock)
synchronized (ipMap) {
Set<NIOServerCnxn> s = ipMap.get(cl);//拿到对应ServerCnxn集合
if (s == null) return 0;
return s.size();
}
}
createConnection
创建NIOServerCnxn
protected NIOServerCnxn createConnection(SocketChannel sock,//创建NIO连接
SelectionKey sk) throws IOException {
return new NIOServerCnxn(zkServer, sock, sk, this);
}
addCnxn
//在验证最大连接数条件ok之后,添加Cnxn记录
private void addCnxn(NIOServerCnxn cnxn) {
synchronized (cnxns) {
cnxns.add(cnxn);
synchronized (ipMap){
InetAddress addr = cnxn.sock.socket().getInetAddress();
Set<NIOServerCnxn> s = ipMap.get(addr);
if (s == null) {
// in general we will see 1 connection from each
// host, setting the initial cap to 2 allows us
// to minimize mem usage in the common case
// of 1 entry -- we need to set the initial cap
// to 2 to avoid rehash when the first entry is added
s = new HashSet<NIOServerCnxn>(2);
s.add(cnxn);
ipMap.put(addr,s);
} else {
s.add(cnxn);
}
}
}
}
思考
为什么单机和集群的启动方式不一样,一个用start一个用startup
单机版:ServerCnxnFactory#startup
集群版:ServerCnxnFactory#start
区别在于单机可以直接从日志,快照等恢复数据
而集群则根据角色划分,涉及到数据同步等
变量ipMap的意义是什么
记录一个ip对应的ServerCnxn列表,用于管理一个ip最大允许的连接数
如何通过SelectionKey关联到对应的NIOServerCnxn的
这里用了SelectionKey的attach特性进行关联,方便进行NIOServerCnxn.doIO的调用
SelectionKey关联NIOServerCnxn问题
NIOServerCnxnFactory#run为什么shuffle就绪的SelectionKey
refer
无
网友评论