美文网首页NIO
NIO源码阅读(4)-SocketChannel

NIO源码阅读(4)-SocketChannel

作者: allanYan | 来源:发表于2016-12-22 17:29 被阅读0次

    概述

    ServerSocketChannel主要用于服务端,而在客户端,经常打交道的是SocketChannel,这篇文章将介绍SocketChannel是如何实现的。

    实例化

    在之前介绍SelectorProvider的时候曾经介绍过,NIO channel的创建都是通过SelecorProvider实现的:

    public SocketChannel openSocketChannel() throws IOException {
            return new SocketChannelImpl(this);
        }
    

    当然SocketChannel也提供了快捷方法open:

    public static SocketChannel open() throws IOException {
        return SelectorProvider.provider().openSocketChannel();
    }
    
    public static SocketChannel open(SocketAddress remote)
        throws IOException
    {
        SocketChannel sc = open();//调用SelectorProvider实现,默认是堵塞的
        try {
            sc.connect(remote);
        } catch (Throwable x) {
            try {
                sc.close();
            } catch (Throwable suppressed) {
                x.addSuppressed(suppressed);
            }
            throw x;
        }
        assert sc.isConnected();
        return sc;
    }
    

    SocketChannel是个抽象类,SelectorProvider返回的是SocketChannelImpl,继承自SocketChannel;一般情况下,当采用异步方式时,使用不带参数的open方法比较常见,而且会调用configureBlocking设置非堵塞;

    SocketChannelImpl构造函数定义如下:

    SocketChannelImpl(SelectorProvider sp) throws IOException {
        super(sp);
        //调用inux的socket函数,true表示TCP
        this.fd = Net.socket(true);
        //由于FileDescriptor未提供访问fdVal的方法,通过JNI获取
        this.fdVal = IOUtil.fdVal(fd);
        this.state = ST_UNCONNECTED;//设置状态为未连接
    }
    

    connect

    调用connect方法连接到远程服务器,其源码如下:

    public boolean connect(SocketAddress sa) throws IOException {
        int localPort = 0;
        //注意加速顺序,整个类保存一致
        synchronized (readLock) {
            synchronized (writeLock) {
                ensureOpenAndUnconnected();//检查连接状态
                InetSocketAddress isa = Net.checkAddress(sa);
                synchronized (blockingLock()) {
                    int n = 0;
                    try {
                        try {
                           //支持线程中断,通过设置当前线程的Interruptible blocker属性实现,由于前面已经介绍过多次,此处不再介绍
                            begin();
                            synchronized (stateLock) {
                               //默认为open, 除非调用了close方法
                                if (!isOpen()) {
                                    return false;
                                }
                                //只有未绑定本地地址也就是说未调用bind方法才执行,该方法在ServerSocketChannel中介绍过
                                if (localAddress == null) {
                                    NetHooks.beforeTcpConnect(fd,
                                                           isa.getAddress(),
                                                           isa.getPort());
                                }
                                //记录当前线程
                                readerThread = NativeThread.current();
                            }
                            for (;;) {
                                InetAddress ia = isa.getAddress();
                                if (ia.isAnyLocalAddress())
                                    ia = InetAddress.getLocalHost();
                               //调用Linux的connect函数实现,如果采用堵塞模式,会一直等待,直到成功或出现异常,后面会介绍
                                n = Net.connect(fd,
                                                ia,
                                                isa.getPort());
                                if (  (n == IOStatus.INTERRUPTED)
                                      && isOpen())
                                    continue;
                                break;
                            }
    
                        } finally {
                           //清空readerThread
                            readerCleanup();
                           //和begin成对出现,当线程中断时,抛出ClosedByInterruptException
                            end((n > 0) || (n == IOStatus.UNAVAILABLE));
                            assert IOStatus.check(n);
                        }
                    } catch (IOException x) {
                        close(); //出现异常,关闭channel
                        throw x;
                    }
                    synchronized (stateLock) {
                        remoteAddress = isa;
                        if (n > 0) {//如果连接成功,更新状态为ST_CONNECTED
                            state = ST_CONNECTED;
                           //如果未调用bind方法,操作系统内核会自动分配地址和端口;否则返回bind的地址和端口
                            if (isOpen())
                                localAddress = Net.localAddress(fd);
                            return true;
                        }
                       //如果是非堵塞模式,而且未立即返回成功,更新状态为ST_PENDING;
                       //由此可见,该状态只有非堵塞时才会存在
                        if (!isBlocking())
                            state = ST_PENDING;
                        else
                            assert false;
                    }
                }
                return false;
            }
        }
    }
    

    上面的代码中会调用Net.connect方法,该方法最终会调用native方法:

    JNIEXPORT jint JNICALL
    Java_sun_nio_ch_Net_connect0(JNIEnv *env, jclass clazz, jboolean preferIPv6,
                                 jobject fdo, jobject iao, jint port)
    {
        SOCKADDR sa;
        int sa_len = SOCKADDR_LEN;
        int rv;
        //地址转换为struct sockaddr格式
        if (NET_InetAddressToSockaddr(env, iao, port, (struct sockaddr *) &sa,
                                      &sa_len, preferIPv6) != 0)
        {
          return IOS_THROWN;
        }
       //传入fd和sockaddr,与远程服务器建立连接,一般就是TCP三次握手
       //如果设置了configureBlocking(false),不会堵塞,否则会堵塞一直到超时或出现异常
        rv = connect(fdval(env, fdo), (struct sockaddr *)&sa, sa_len);
        if (rv != 0) {//0表示连接成功,失败时通过errno获取具体原因
            if (errno == EINPROGRESS) {//非堵塞,连接还未建立(-2)
                return IOS_UNAVAILABLE;
            } else if (errno == EINTR) {//中断(-3)
                return IOS_INTERRUPTED;
            }
            return handleSocketError(env, errno); //出错
        }
        return 1;//连接建立,一般TCP连接连接都需要时间,因此除非是本地网络,一般情况下非堵塞模式返回IOS_UNAVAILABLE比较多;
    }
    

    从上面可以看到,如果是非堵塞,而且连接未马上建立成功,此时状态为ST_PENDING,那么什么时候会变为ST_CONNECTED呢?是否有什么方法可以查询状态或者等待连接完成呢?

    finishConnect

    带着上面的问题,我们一起看看finishConnect的实现,代码比较长,我只保留比较重要的部分:

    public boolean finishConnect() throws IOException {
        synchronized (readLock) {
            synchronized (writeLock) {
                int n = 0;
                try {
                    try {
                        synchronized (blockingLock()) {
                            if (!isBlocking()) {//非堵塞模式
                                for (;;) {
                                    n = checkConnect(fd, false,
                                                     readyToConnect);
                                    if (  (n == IOStatus.INTERRUPTED)
                                          && isOpen())
                                        continue;
                                    break;//除非被中断,否则退出
                                }
                            } else {//堵塞模式
                                for (;;) {
                                    n = checkConnect(fd, true,
                                                     readyToConnect);
                                    if (n == 0) {//除非>0,否则自旋,继续等待
                                        continue;
                                    }
                                    if (  (n == IOStatus.INTERRUPTED)
                                          && isOpen())
                                        continue;
                                    break;
                                }
                            }
                        }
                    } finally {
                        synchronized (stateLock) {
                            if (state == ST_KILLPENDING) {//调用了close方法
                                kill();
                                n = 0;
                            }
                        }
                    }
                } catch (IOException x) { //异常发生,关闭channel
                    close();
                    throw x;
                }
                if (n > 0) {//连接成功
                    synchronized (stateLock) {
                        state = ST_CONNECTED;//更新状态
                        if (isOpen())
                            localAddress = Net.localAddress(fd);
                    }
                    return true;
                }
                return false;
            }
        }
    }
    

    从上面看到,如果是堵塞模式,会一直循环检查状态,直到成功或发生异常;而非堵塞模式下,检查完,马上结束循环;
    上面的代码是通过checkConnect检查连接状态,下面看看它是如何实现的:

    JNIEXPORT jint JNICALL
    Java_sun_nio_ch_SocketChannelImpl_checkConnect(JNIEnv *env, jobject this,
                                                   jobject fdo, jboolean block,
                                                   jboolean ready)
    {
        int error = 0;
        socklen_t n = sizeof(int);
        jint fd = fdval(env, fdo);//获取FileDescriptor中的fd
        int result = 0;
        struct pollfd poller;
    
        poller.revents = 1;//返回的事件
        if (!ready) {
            poller.fd = fd;//文件描述符
            poller.events = POLLOUT;//请求的事件:写事件
            poller.revents = 0;//返回的事件
           //第3个参数表示超时时间(毫秒)
           //-1表示永远不会超时,0表示立即返回,不阻塞进程
            result = poll(&poller, 1, block ? -1 : 0);
            if (result < 0) {//小于0表示调用失败
                JNU_ThrowIOExceptionWithLastError(env, "Poll failed");
                return IOS_THROWN;
            }
            //非堵塞时,0表示没有准备好的连接
            if (!block && (result == 0))
                return IOS_UNAVAILABLE;
        }
    
        if (poller.revents) {//准备好写或出现错误的socket数量>0
            errno = 0;
            result = getsockopt(fd, SOL_SOCKET, SO_ERROR, &error, &n);
            if (result < 0) {//出错
                handleSocketError(env, errno);
                return JNI_FALSE;
            } else if (error) {//发生错误
                handleSocketError(env, error);
                return JNI_FALSE;
            }
            return 1;//socket已经准备好,可写,即连接已经建立好
        }
        return 0;
    }
    

    从上面的源码看到,底层是通过poll查询socket的状态,从而判断连接是否建立成功;
    由于在非堵塞模式下,finishConnect方法会立即返回,因此不大建议用循环的方式判断连接是否建立,而是建议注册到Selector,通过ops=OP_CONNECT获取连接完成的SelectionKey,然后调用finishConnect完成连接的建立;
    那么finishConnect是否可以不调用呢?答案是否,因为只有finishConnect中会将状态更新为ST_CONNECTED,而在调用read和write时都会对状态进行判断;

    另外还有特别说一下的是translateReadyOps方法,在EpollSelectorImpl的doSelect方法中会调用channel的translateAndSetReadyOps方法,在该方法中设置SocketChannel的readyToConnect变量;从上面代码知道,finishConnect的时候,如果发现readyToConnect=true,将不会调用poll来查询状态;

    相关文章

      网友评论

        本文标题:NIO源码阅读(4)-SocketChannel

        本文链接:https://www.haomeiwen.com/subject/tmqhvttx.html