美文网首页
zk源码阅读35:Server与Client的网络I/O(四):

zk源码阅读35:Server与Client的网络I/O(四):

作者: 赤子心_d709 | 来源:发表于2017-08-13 15:38 被阅读95次

    摘要

    本节讲解ServerCnxnFactory的nio实现,NIOServerCnxnFactory,是33节讲的NIOServerCnxn的工厂类

    本节讲解内容如下

    简介
    属性
    函数
      configure完成Factory的初始配置
      start和startup分别完成集群,非集群模式下的server启动
      run方法以及内部调用的方法,不断完成
        线程不断监听ACCEPT事件完成连接
        监听READ,WRITE事件调用NIOServerCnxn#doIO完成相关IO操作
    

    简介

    这里贴一下ServerCnxn,ServerCnxnFactory以及netty,nio两种实现的关系

    ServerCnxn以及ServerCnxnFactory

    本节以及前面三节讲解了除netty以外的四个类

    类继承,实现关系如下

    NIOServerCnxnFactory继承,实现关系

    属性

        ServerSocketChannel ss;
    
        final Selector selector = Selector.open();
    
        /**
         * We use this buffer to do efficient socket I/O. Since there is a single
         * sender thread per NIOServerCnxn instance, we can use a member variable to
         * only allocate it once.
        */
        final ByteBuffer directBuffer = ByteBuffer.allocateDirect(64 * 1024);
    
        final HashMap<InetAddress, Set<NIOServerCnxn>> ipMap =
            new HashMap<InetAddress, Set<NIOServerCnxn>>( );//每个client地址对应的连接集合
    
        int maxClientCnxns = 60;//默认最大允许的client数量
    
        Thread thread;//后台跑的线程
    

    NIO相关部分不展开。

    函数

    get,set,close,shutdown等相关函数不展开

    configure

    继承父类方法,初始化thread,完成socket相关配置

        public void configure(InetSocketAddress addr, int maxcc) throws IOException {
            configureSaslLogin();
    
            thread = new ZooKeeperThread(this, "NIOServerCxn.Factory:" + addr);
            thread.setDaemon(true);
            maxClientCnxns = maxcc;
            this.ss = ServerSocketChannel.open();
            ss.socket().setReuseAddress(true);
            LOG.info("binding to port " + addr);
            ss.socket().bind(addr);
            ss.configureBlocking(false);
            ss.register(selector, SelectionKey.OP_ACCEPT);
        }
    

    不同模式的启动方法

    分为单机和集群模式,

        @Override
        public void start() {//集群版启动
            // ensure thread is started once and only once
            if (thread.getState() == Thread.State.NEW) {
                thread.start();
            }
        }
    
        @Override
        public void startup(ZooKeeperServer zks) throws IOException,
                InterruptedException {//单机版启动
            //启动IO线程
            start();
            setZooKeeperServer(zks);
            //从log和snapshot恢复database和session,并重新生成一个最新的snapshot文件
            zks.startdata();
            //启动sessionTracker线程,初始化IO请求的处理链,并启动每个processor线程
            zks.startup();
        }  
    

    线程运行相关

    上面thread.start()进入NIOServerCnxnFactory#run中,源码如下

    run方法

        public void run() {
            while (!ss.socket().isClosed()) {//只要socket没有close
                try {
                    selector.select(1000);
                    Set<SelectionKey> selected;
                    synchronized (this) {
                        selected = selector.selectedKeys();
                    }
                    ArrayList<SelectionKey> selectedList = new ArrayList<SelectionKey>(
                            selected);
                    Collections.shuffle(selectedList);//随机打乱
                    for (SelectionKey k : selectedList) {
                        if ((k.readyOps() & SelectionKey.OP_ACCEPT) != 0) {//获取client的连接请求
                            SocketChannel sc = ((ServerSocketChannel) k
                                    .channel()).accept();
                            InetAddress ia = sc.socket().getInetAddress();
                            int cnxncount = getClientCnxnCount(ia);//获取client对应的连接数
                            if (maxClientCnxns > 0 && cnxncount >= maxClientCnxns){
                                LOG.warn("Too many connections from " + ia
                                         + " - max is " + maxClientCnxns );
                                sc.close();
                            } else {
                                LOG.info("Accepted socket connection from "
                                         + sc.socket().getRemoteSocketAddress());
                                sc.configureBlocking(false);
                                SelectionKey sk = sc.register(selector,
                                        SelectionKey.OP_READ);//注册read事件
                                NIOServerCnxn cnxn = createConnection(sc, sk);//创建连接,构造NIOServerCnxn
                                sk.attach(cnxn);//selecionKey带上一个附件cnxn
                                addCnxn(cnxn);//加入ipMap记录
                            }
                        } else if ((k.readyOps() & (SelectionKey.OP_READ | SelectionKey.OP_WRITE)) != 0) {
                            NIOServerCnxn c = (NIOServerCnxn) k.attachment();//取出附件cnxn
                            c.doIO(k);//处理IO操作
                        } else {
                            if (LOG.isDebugEnabled()) {
                                LOG.debug("Unexpected ops in select "
                                          + k.readyOps());
                            }
                        }
                    }
                    selected.clear();
                } catch (RuntimeException e) {
                    LOG.warn("Ignoring unexpected runtime exception", e);
                } catch (Exception e) {
                    LOG.warn("Ignoring exception", e);
                }
            }
            closeAll();//结束关闭所有连接
            LOG.info("NIOServerCnxn factory exited run method");
        }
    

    里面调用,涉及的函数如下

    getClientCnxnCount

    获取同一个client的连接数

        private int getClientCnxnCount(InetAddress cl) {//获取同一个client地址的连接数
            // The ipMap lock covers both the map, and its contents
            // (that is, the cnxn sets shouldn't be modified outside of
            // this lock)
            synchronized (ipMap) {
                Set<NIOServerCnxn> s = ipMap.get(cl);//拿到对应ServerCnxn集合
                if (s == null) return 0;
                return s.size();
            }
        }
    

    createConnection

    创建NIOServerCnxn

        protected NIOServerCnxn createConnection(SocketChannel sock,//创建NIO连接
                SelectionKey sk) throws IOException {
            return new NIOServerCnxn(zkServer, sock, sk, this);
        }
    

    addCnxn

    //在验证最大连接数条件ok之后,添加Cnxn记录

        private void addCnxn(NIOServerCnxn cnxn) {
            synchronized (cnxns) {
                cnxns.add(cnxn);
                synchronized (ipMap){
                    InetAddress addr = cnxn.sock.socket().getInetAddress();
                    Set<NIOServerCnxn> s = ipMap.get(addr);
                    if (s == null) {
                        // in general we will see 1 connection from each
                        // host, setting the initial cap to 2 allows us
                        // to minimize mem usage in the common case
                        // of 1 entry --  we need to set the initial cap
                        // to 2 to avoid rehash when the first entry is added
                        s = new HashSet<NIOServerCnxn>(2);
                        s.add(cnxn);
                        ipMap.put(addr,s);
                    } else {
                        s.add(cnxn);
                    }
                }
            }
        }
    

    思考

    为什么单机和集群的启动方式不一样,一个用start一个用startup

    单机版:ServerCnxnFactory#startup
    集群版:ServerCnxnFactory#start
    

    区别在于单机可以直接从日志,快照等恢复数据
    而集群则根据角色划分,涉及到数据同步等

    变量ipMap的意义是什么

    记录一个ip对应的ServerCnxn列表,用于管理一个ip最大允许的连接数

    如何通过SelectionKey关联到对应的NIOServerCnxn的

    这里用了SelectionKey的attach特性进行关联,方便进行NIOServerCnxn.doIO的调用

    SelectionKey关联NIOServerCnxn

    问题

    NIOServerCnxnFactory#run为什么shuffle就绪的SelectionKey

    refer

    相关文章

      网友评论

          本文标题:zk源码阅读35:Server与Client的网络I/O(四):

          本文链接:https://www.haomeiwen.com/subject/wnuolxtx.html