Socket Explained in Detail

Author: lizb | Published: 2019-01-25 13:42

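    The previous two articles analyzed the TCP and UDP protocols; this one looks at sockets. That groundwork makes sockets much easier to understand, and studying sockets in turn deepens the understanding of TCP and UDP. After several days of reading material and tracing code I have a reasonably clear picture of how sockets work. Because the information available online is scattered, I want to collect it in one place for anyone trying to understand sockets. With that goal, this article walks through how sockets work.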

    The article is fairly long, so here are the points it covers:
    1. What is a socket?
    2. Using Socket in Java
    3. Source code analysis of Socket in Java
    4. How sockets handle TCP in detail on Linux


    1. What is a socket?

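    Processes can communicate through pipes, named pipes, signals, message queues, shared memory, and semaphores, but all of these require both processes to be on the same host. How can two processes communicate when they are on different hosts?
    Computer networking provides the TCP/IP protocol suite, which makes exactly this possible, as the figure below shows: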


    Figure: where the socket layer sits

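    As the figure shows, the socket is an abstraction layer between the application layer and the transport layer. Why does it exist? Communication would be possible without it, but every protocol would then expose its own interface and the application would have to handle each protocol's details itself, which makes development harder and software less extensible. BSD UNIX therefore introduced the socket. It hides the communication details of the individual protocols, so programmers can use the socket interface to let processes on different, interconnected hosts talk to each other without dealing with the protocol itself. This is comparable to the system calls an operating system provides for the underlying hardware: through them we can use the disk (file operations) and memory without doing the disk I/O or memory management ourselves. A socket is the same kind of thing: an abstraction over TCP/IP that exposes a single interface through which the protocol suite can be used in a uniform, convenient way.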

    2. Using Socket in Java

    Java offers two kinds of sockets: Socket, which is based on TCP, and DatagramSocket, which is based on UDP. This article covers only the TCP-based Socket. Here is how it is used:

    Client: create a Socket for the given host and port, get its output stream, and write data.

            Socket socket = null;
            OutputStream os = null;
    
            try {
                socket = new Socket("192.168.1.106",9200);
                os = socket.getOutputStream();
                os.write("hello server !".getBytes());
            } catch (IOException e) {
                e.printStackTrace();
            }finally {
                try {
                    if(os != null){
                        os.close();
                    }
                    if(socket != null){
                        socket.close();
                    }
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
    

    Server: create a ServerSocket bound to the given port and start listening on it.

          ServerSocket serverSocket;
            try {
                serverSocket = new ServerSocket(9200);
            } catch (IOException e) {   
                return;
            }
    
            while (isRunning){
                try {
                    Socket client = serverSocket.accept();
                    handleClient(client);
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
    
            try {
                serverSocket.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
    
    

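    This is only a minimal example. Real code usually needs more, such as handing the work in handleClient(client) to a thread pool and closing the client socket when done. The usage here is kept simple because the main goal is to read the source code.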
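
The client and server fragments above can be combined into one runnable program. The following is a minimal sketch, not the author's code: the class name LoopbackEcho and the methods readAll, handleClient, and roundTrip are names chosen for illustration, the server binds to an ephemeral loopback port instead of 9200, and the handler simply echoes the request. It uses try-with-resources so that sockets are closed on every path, and a small thread pool for the accept side.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.InetAddress;
import java.net.ServerSocket;
import java.net.Socket;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class LoopbackEcho {

    /** Reads until the peer closes its sending side (read() returns -1). */
    static byte[] readAll(InputStream in) throws IOException {
        ByteArrayOutputStream buf = new ByteArrayOutputStream();
        byte[] chunk = new byte[1024];
        int n;
        while ((n = in.read(chunk)) != -1) {
            buf.write(chunk, 0, n);
        }
        return buf.toByteArray();
    }

    /** Hypothetical handler: echoes the request back, then closes the client socket. */
    static void handleClient(Socket client) {
        try (Socket c = client) {
            byte[] request = readAll(c.getInputStream());
            OutputStream out = c.getOutputStream();
            out.write(request);
            out.flush();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    /** Starts a one-shot server on an ephemeral loopback port and sends msg through it. */
    static String roundTrip(String msg) throws IOException {
        InetAddress loopback = InetAddress.getLoopbackAddress();
        ExecutorService pool = Executors.newFixedThreadPool(2);
        try (ServerSocket server = new ServerSocket(0, 50, loopback)) {
            // accept() blocks, so it runs on a pool thread.
            pool.submit(() -> {
                try {
                    handleClient(server.accept());
                } catch (IOException e) {
                    e.printStackTrace();
                }
            });
            try (Socket socket = new Socket(loopback, server.getLocalPort())) {
                socket.getOutputStream().write(msg.getBytes(StandardCharsets.UTF_8));
                socket.shutdownOutput(); // tells the server the request is complete
                return new String(readAll(socket.getInputStream()), StandardCharsets.UTF_8);
            }
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) throws IOException {
        System.out.println(roundTrip("hello server !"));
    }
}
```

A long-running server would keep calling accept() in a loop and submit each accepted Socket to the pool; the one-shot form here only keeps the sketch short.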

    3. Source code analysis of Socket in Java

    3.1 Server side: ServerSocket source code

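    To make things easier to follow, we start with the server-side ServerSocket. A port to bind is normally given at construction time, so look at the constructors: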

        public ServerSocket(int port) throws IOException {
            this(port, 50, null);
        }
    
        public ServerSocket(int port, int backlog) throws IOException {
            this(port, backlog, null);
        }
      
        public ServerSocket(int port, int backlog, InetAddress bindAddr) throws IOException {
            setImpl();
            if (port < 0 || port > 0xFFFF)
                throw new IllegalArgumentException(
                           "Port value out of range: " + port);
            if (backlog < 1)
              backlog = 50;
            try {
                bind(new InetSocketAddress(bindAddr, port), backlog);
            } catch(SecurityException e) {
                close();
                throw e;
            } catch(IOException e) {
                close();
                throw e;
            }
        }
    

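    ServerSocket has three constructors that take a port. The backlog parameter is the maximum number of established connections that can wait in the queue, that is, connections that accept has not yet taken. It is fine if this is not clear yet; the parameter is covered in detail later. When it is not specified, or is less than 1, it defaults to 50. InetAddress wraps an IP address (and optionally its host name); the port is passed separately and the two are combined in an InetSocketAddress.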
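
The argument checks in the three-argument constructor can be observed from application code. This is a small sketch under stated assumptions: the class name ServerSocketCtorDemo and its two methods are made up for illustration, and the loopback address is used as bindAddr so nothing is exposed on the network. An out-of-range port is rejected with IllegalArgumentException before any binding happens, and port 0 asks the operating system to pick a free port.

```java
import java.io.IOException;
import java.net.InetAddress;
import java.net.ServerSocket;

public class ServerSocketCtorDemo {

    /** Returns true if the constructor rejects the port with IllegalArgumentException. */
    static boolean rejectsPort(int port) throws IOException {
        try (ServerSocket ignored = new ServerSocket(port, 50, InetAddress.getLoopbackAddress())) {
            return false;
        } catch (IllegalArgumentException e) {
            return true; // "Port value out of range"
        }
    }

    /** Binds to port 0 with the given backlog and returns the port the system chose. */
    static int ephemeralPort(int backlog) throws IOException {
        try (ServerSocket server = new ServerSocket(0, backlog, InetAddress.getLoopbackAddress())) {
            return server.getLocalPort();
        }
    }

    public static void main(String[] args) throws IOException {
        System.out.println("rejects -1: " + rejectsPort(-1));
        System.out.println("rejects 65536: " + rejectsPort(0x10000));
        // A backlog below 1 is replaced by the default of 50 rather than rejected.
        System.out.println("ephemeral port: " + ephemeralPort(0));
    }
}
```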
    The constructor first calls setImpl(); its source is:

        private void setImpl() {
            if (factory != null) {
                impl = factory.createSocketImpl();
                checkOldImpl();
            } else {
                // No need to do a checkOldImpl() here, we know it's an up to date
                // SocketImpl!
                impl = new SocksSocketImpl();
            }
            if (impl != null)
                impl.setServerSocket(this);
        }
    

    factory is of type SocketImplFactory, a factory class that creates SocketImpl instances; impl is of type SocketImpl.

        /**
         * The factory for all server sockets.
         */
        private static SocketImplFactory factory = null;
    
        /**
         * The implementation of this Socket.
         */
        private SocketImpl impl;
    

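    Unless a factory has been installed beforehand with ServerSocket.setSocketFactory, factory is null when the constructor runs, so the impl field of the ServerSocket is assigned a SocksSocketImpl object. impl.setServerSocket(this) then associates the ServerSocket with that SocksSocketImpl.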
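
The decision made in setImpl() is a common pattern: a process-wide factory that is normally absent, with a built-in default as fallback. The following is a simplified model of that pattern only, not JDK code; Impl, ImplSelectionSketch, setFactory, and newImpl are hypothetical names, and a java.util.function.Supplier stands in for SocketImplFactory.

```java
import java.util.function.Supplier;

public class ImplSelectionSketch {

    /** Hypothetical stand-in for SocketImpl. */
    interface Impl {
        String name();
    }

    /** Plays the role of the static factory field: null until someone installs one. */
    private static Supplier<Impl> factory = null;

    /** Installs a factory; allowed only once in this model. */
    static synchronized void setFactory(Supplier<Impl> f) {
        if (factory != null) {
            throw new IllegalStateException("factory already defined");
        }
        factory = f;
    }

    /** Same decision as setImpl(): use the factory if present, otherwise the default. */
    static synchronized Impl newImpl() {
        if (factory != null) {
            return factory.get();
        }
        return () -> "default"; // corresponds to new SocksSocketImpl() in the source above
    }

    public static void main(String[] args) {
        System.out.println(newImpl().name());   // no factory installed yet
        setFactory(() -> () -> "custom");
        System.out.println(newImpl().name());   // now comes from the factory
    }
}
```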

    Back in the constructor, the port is validated and backlog is normalized, and finally bind(new InetSocketAddress(bindAddr, port), backlog) is called to bind the port. Its source is:

    
        public void bind(SocketAddress endpoint, int backlog) throws IOException {
            if (isClosed())
                throw new SocketException("Socket is closed");
            if (!oldImpl && isBound())
                throw new SocketException("Already bound");
            if (endpoint == null)
                endpoint = new InetSocketAddress(0);
            if (!(endpoint instanceof InetSocketAddress))
                throw new IllegalArgumentException("Unsupported address type");
            InetSocketAddress epoint = (InetSocketAddress) endpoint;
            if (epoint.isUnresolved())
                throw new SocketException("Unresolved address");
            if (backlog < 1)
              backlog = 50;
            try {
                SecurityManager security = System.getSecurityManager();
                if (security != null)
                    security.checkListen(epoint.getPort());
                getImpl().bind(epoint.getAddress(), epoint.getPort());
                getImpl().listen(backlog);
                bound = true;
            } catch(SecurityException e) {
                bound = false;
                throw e;
            } catch(IOException e) {
                bound = false;
                throw e;
            }
        }
    

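    The method starts with a series of checks and then obtains the SecurityManager, the security policy manager that is normally consulted before unsafe or sensitive operations. Here, before listening, checkListen() verifies that the calling thread is allowed to wait for connection requests on the given port, and throws a SecurityException if it is not. Next comes getImpl(), whose source is: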

        SocketImpl getImpl() throws SocketException {
            if (!created)
                createImpl();
            return impl;
        }
    
        void createImpl() throws SocketException {
            if (impl == null)
                setImpl();
            try {
                impl.create(true);
                created = true;
            } catch (IOException e) {
                throw new SocketException(e.getMessage());
            }
        }
    
        /**
         * Creates a socket with a boolean that specifies whether this
         * is a stream socket (true) or an unconnected UDP socket (false).
         */
        protected synchronized void create(boolean stream) throws IOException {
            this.stream = stream;
            if (!stream) {
                ResourceManager.beforeUdpCreate();
                // only create the fd after we know we will be able to create the socket
                fd = new FileDescriptor();
                try {
                    socketCreate(false);
                } catch (IOException ioe) {
                    ResourceManager.afterUdpClose();
                    fd = null;
                    throw ioe;
                }
            } else {
                fd = new FileDescriptor();
                socketCreate(true);
            }
            if (socket != null)
                socket.setCreated();
            if (serverSocket != null)
                serverSocket.setCreated();
        }
    

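    created is initially false, so createImpl() is called. impl was already set to a SocksSocketImpl object, so the create(boolean stream) method that SocksSocketImpl inherits is invoked. As the code shows, passing true creates a stream socket, that is, a TCP socket, and passing false creates a datagram (UDP) socket. Back in ServerSocket.bind, once getImpl() has returned the impl object, its bind and listen methods are called. Here is the source of bind: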

        /**
         * Binds the socket to the specified address of the specified local port.
         * @param address the address
         * @param lport the port
         */
        protected synchronized void bind(InetAddress address, int lport) throws IOException {
           synchronized (fdLock) {
                if (!closePending && (socket == null || !socket.isBound())) {
                    NetHooks.beforeTcpBind(fd, address, lport);
                }
            }
            socketBind(address, lport);
            if (socket != null)
                socket.setBound();
            if (serverSocket != null)
                serverSocket.setBound();
        }
    

    It calls socketBind(address, lport); the source of socketBind is:

        void socketBind(InetAddress address, int port) throws IOException {
            if (fd == null || !fd.valid()) {
                throw new SocketException("Socket closed");
            }
    
            IoBridge.bind(fd, address, port);
    
            this.address = address;
            if (port == 0) {
                // Now that we're a connected socket, let's extract the port number that the system
                // chose for us and store it in the Socket object.
                localport = IoBridge.getLocalInetSocketAddress(fd).getPort();
            } else {
                localport = port;
            }
        }
    

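    Note that this socketBind is the one in the java package of the Android SDK; in the JDK it is a native method. Google's engineers presumably rewrote this class, but in both cases the call ends up in the platform's native code. Next, IoBridge.bind is called: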

    public static void bind(FileDescriptor fd, InetAddress address, int port) throws SocketException {
            if (address instanceof Inet6Address) {
                Inet6Address inet6Address = (Inet6Address) address;
                if (inet6Address.getScopeId() == 0 && inet6Address.isLinkLocalAddress()) {
                    // Linux won't let you bind a link-local address without a scope id.
                    // Find one.
                    NetworkInterface nif = NetworkInterface.getByInetAddress(address);
                    if (nif == null) {
                        throw new SocketException("Can't bind to a link-local address without a scope id: " + address);
                    }
                    try {
                        address = Inet6Address.getByAddress(address.getHostName(), address.getAddress(), nif.getIndex());
                    } catch (UnknownHostException ex) {
                        throw new AssertionError(ex); // Can't happen.
                    }
                }
            }
            try {
                Libcore.os.bind(fd, address, port);
            } catch (ErrnoException errnoException) {
                if (errnoException.errno == EADDRINUSE || errnoException.errno == EADDRNOTAVAIL ||
                    errnoException.errno == EPERM || errnoException.errno == EACCES) {
                    throw new BindException(errnoException.getMessage(), errnoException);
                } else {
                    throw new SocketException(errnoException.getMessage(), errnoException);
                }
            }
        }
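
The errno mapping at the end of IoBridge.bind is what an application sees as java.net.BindException. A minimal sketch, assuming a platform where two TCP listeners cannot share the same address and port (the usual case on Linux); the class name BindConflictDemo and the method secondBindFails are illustrative names:

```java
import java.io.IOException;
import java.net.BindException;
import java.net.InetAddress;
import java.net.ServerSocket;

public class BindConflictDemo {

    /** Returns true if binding a second listener to an occupied port fails with BindException. */
    static boolean secondBindFails() throws IOException {
        InetAddress loopback = InetAddress.getLoopbackAddress();
        try (ServerSocket first = new ServerSocket(0, 50, loopback)) {
            int port = first.getLocalPort(); // the port is now in use by 'first'
            try (ServerSocket second = new ServerSocket(port, 50, loopback)) {
                return false; // not expected while 'first' is still listening
            } catch (BindException e) {
                return true;  // typically "Address already in use"
            }
        }
    }

    public static void main(String[] args) throws IOException {
        System.out.println("second bind fails: " + secondBindFails());
    }
}
```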
    
    

    As the IoBridge.bind source shows, it ends by calling Libcore.os.bind(fd, address, port). So what is Libcore.os? Take a look:

    package libcore.io;
    
    public final class Libcore {
        private Libcore() { }
    
        /**
         * Direct access to syscalls. Code should strongly prefer using {@link #os}
         * unless it has a strong reason to bypass the helpful checks/guards that it
         * provides.
         */
        public static Os rawOs = new Linux();
    
        /**
         * Access to syscalls with helpful checks/guards.
         */
        public static Os os = new BlockGuardOs(rawOs);
    }
    

    So the call goes to the bind method of BlockGuardOs, which in turn calls bind on the rawOs object, and almost every method of the Linux class is native:

    public final class Linux implements Os {
        Linux() { }
    
        public native FileDescriptor accept(FileDescriptor fd, SocketAddress peerAddress) throws ErrnoException, SocketException;
        public native void bind(FileDescriptor fd, InetAddress address, int port) throws ErrnoException, SocketException;
        public native void bind(FileDescriptor fd, SocketAddress address) throws ErrnoException, SocketException;
        public native void listen(FileDescriptor fd, int backlog) throws ErrnoException;
        public native void connect(FileDescriptor fd, InetAddress address, int port) throws ErrnoException, SocketException;
        public native void connect(FileDescriptor fd, SocketAddress address) throws ErrnoException, SocketException;
        public native void close(FileDescriptor fd) throws ErrnoException;
        ...
        
    

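    The main methods are listed here once so that the code does not need to be repeated later.
    That completes bind. Back in ServerSocket.bind, the program goes on to call listen, which follows almost exactly the same path as bind and also ends in a native method, so it is not repeated here. After listen returns, bound = true records that the port was bound and is being listened on. In short, the ServerSocket constructor does two things: bind and listen. From then on the socket listens for connection requests from clients on that port; once the three-way handshake completes, the connection is established and placed in a queue, where it waits for the application to take it by calling accept. This process is described in detail later.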
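
The queueing described above can be seen in a single thread: a client is able to connect, and even send data, before the server has called accept() at all, because the connection is established and queued once the ServerSocket is listening. A minimal sketch on the loopback interface; AcceptQueueDemo and connectBeforeAccept are illustrative names, and the 2-second accept timeout is only a safety net:

```java
import java.io.IOException;
import java.net.InetAddress;
import java.net.ServerSocket;
import java.net.Socket;

public class AcceptQueueDemo {

    /** Connects and writes one byte before accept() is called; returns the byte read after accept(). */
    static int connectBeforeAccept() throws IOException {
        InetAddress loopback = InetAddress.getLoopbackAddress();
        try (ServerSocket server = new ServerSocket(0, 50, loopback);
             // The constructor returns once the connection is established,
             // even though nobody has called accept() yet.
             Socket client = new Socket(loopback, server.getLocalPort())) {
            client.getOutputStream().write(42);
            client.getOutputStream().flush();

            server.setSoTimeout(2000); // avoid hanging forever if something goes wrong
            try (Socket accepted = server.accept()) { // takes the queued connection
                return accepted.getInputStream().read();
            }
        }
    }

    public static void main(String[] args) throws IOException {
        System.out.println("byte received: " + connectBeforeAccept());
    }
}
```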

    After constructing the ServerSocket, the application calls accept repeatedly to obtain connections. The source of accept is:

        /**
         * Listens for a connection to be made to this socket and accepts
         * it. The method blocks until a connection is made.
         */
        public Socket accept() throws IOException {
            if (isClosed())
                throw new SocketException("Socket is closed");
            if (!isBound())
                throw new SocketException("Socket is not bound yet");
            Socket s = new Socket((SocketImpl) null);
            implAccept(s);
            return s;
        }
    

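    The comment states that the method blocks until the queue holds an established connection. The code also shows that every call to accept creates a new Socket to receive the connection. Next is implAccept, whose source is: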

        /**
         * Subclasses of ServerSocket use this method to override accept()
         * to return their own subclass of socket.  So a FooServerSocket
         * will typically hand this method an <i>empty</i> FooSocket.  On
         * return from implAccept the FooSocket will be connected to a client.
         */
        protected final void implAccept(Socket s) throws IOException {
            SocketImpl si = null;
            try {
                if (s.impl == null)
                  s.setImpl();
                else {
                    s.impl.reset();
                }
                si = s.impl;
                s.impl = null;
                si.address = new InetAddress();
                si.fd = new FileDescriptor();
                getImpl().accept(si);
    
                SecurityManager security = System.getSecurityManager();
                if (security != null) {
                    security.checkAccept(si.getInetAddress().getHostAddress(),
                                         si.getPort());
                }
            } catch (IOException e) {
                if (si != null)
                    si.reset();
                s.impl = si;
                throw e;
            } catch (SecurityException e) {
                if (si != null)
                    si.reset();
                s.impl = si;
                throw e;
            }
            s.impl = si;
            s.postAccept();
        }
    

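    Socket.setImpl is implemented essentially the same way as ServerSocket.setImpl, so both the variable si and the field s.impl refer to a SocksSocketImpl object. si is only a temporary variable; at the end the prepared SocksSocketImpl object is assigned back to s.impl.
    getImpl().accept(si) calls the accept method that SocksSocketImpl inherits, whose source is: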

    /**
         * Accepts connections.
         * @param s the connection
         */
        protected void accept(SocketImpl s) throws IOException {
            acquireFD();
            try {
                BlockGuard.getThreadPolicy().onNetwork();
                socketAccept(s);
            } finally {
                releaseFD();
            }
        }
    

    It first acquires the file descriptor, then calls socketAccept(s), and finally releases the file descriptor. The source of socketAccept is:

    void socketAccept(SocketImpl s) throws IOException {
            if (fd == null || !fd.valid()) {
                throw new SocketException("Socket closed");
            }
    
            // poll() with a timeout of 0 means "poll for zero millis", but a Socket timeout == 0 means
            // "wait forever". When timeout == 0 we pass -1 to poll.
            if (timeout <= 0) {
                IoBridge.poll(fd, POLLIN | POLLERR, -1);
            } else {
                IoBridge.poll(fd, POLLIN | POLLERR, timeout);
            }
    
            InetSocketAddress peerAddress = new InetSocketAddress();
            try {
                FileDescriptor newfd = Libcore.os.accept(fd, peerAddress);
    
                s.fd.setInt$(newfd.getInt$());
                s.address = peerAddress.getAddress();
                s.port = peerAddress.getPort();
            } catch (ErrnoException errnoException) {
                if (errnoException.errno == EAGAIN) {
                    throw new SocketTimeoutException(errnoException);
                } else if (errnoException.errno == EINVAL || errnoException.errno == EBADF) {
                    throw new SocketException("Socket closed");
                }
                errnoException.rethrowAsSocketException();
            }
    
            s.localport = IoBridge.getLocalInetSocketAddress(s.fd).getPort();
        }
    
    

    Blocking is implemented by calling IoBridge.poll, which ultimately relies on Linux waiting for events on the file descriptor. See the comment on poll:

        /**
         * Wait for some event on a file descriptor, blocks until the event happened or timeout period
         * passed. See poll(2) and @link{android.system.Os.Poll}.
         *
         * @throws SocketException if poll(2) fails.
         * @throws SocketTimeoutException if the event has not happened before timeout period has passed.
         */
        public static void poll(FileDescriptor fd, int events, int timeout)
                throws SocketException, SocketTimeoutException {
            StructPollfd[] pollFds = new StructPollfd[]{ new StructPollfd() };
            pollFds[0].fd = fd;
            pollFds[0].events = (short) events;
    
            try {
                int ret = android.system.Os.poll(pollFds, timeout);
                if (ret == 0) {
                    throw new SocketTimeoutException("Poll timed out");
                }
            } catch (ErrnoException e) {
                e.rethrowAsSocketException();
            }
        }
    

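    Eventually, when the connection queue holds an established connection, the thread is woken up and continues. The connection information is stored in the SocketImpl that socketAccept received as its parameter s, which is the object si refers to in implAccept. implAccept then assigns that object back to s.impl of the Socket and calls s.postAccept(), whose source is: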

        /**
         * set the flags after an accept() call.
         */
        final void postAccept() {
            connected = true;
            created = true;
            bound = true;
        }
    
    

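    It only sets a few state flags, after which the Socket s is returned to the application. That completes the analysis of ServerSocket.accept(). In summary: the thread blocks until the connection queue holds a new connection and is then woken up; the connection information is stored in a SocketImpl object, that SocketImpl is wrapped in a Socket object, and the Socket is returned to the application.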
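
Two parts of this summary are visible through the public API. The sketch below is illustrative only (AcceptDemo and its method names are made up): with SO_TIMEOUT set, a blocked accept() gives up with SocketTimeoutException, which matches the timed poll in socketAccept; and a Socket returned by accept() already reports itself as connected and bound, which matches the flags set in postAccept().

```java
import java.io.IOException;
import java.net.InetAddress;
import java.net.ServerSocket;
import java.net.Socket;
import java.net.SocketTimeoutException;

public class AcceptDemo {

    /** Returns true if accept() times out when no client connects within timeoutMs. */
    static boolean acceptTimesOut(int timeoutMs) throws IOException {
        try (ServerSocket server = new ServerSocket(0, 50, InetAddress.getLoopbackAddress())) {
            server.setSoTimeout(timeoutMs); // 0 would mean "block forever"
            try (Socket unexpected = server.accept()) {
                return false; // somebody connected, so no timeout happened
            } catch (SocketTimeoutException e) {
                return true;
            }
        }
    }

    /** Returns true if a freshly accepted socket is connected, bound, and not closed. */
    static boolean acceptedSocketIsReady() throws IOException {
        InetAddress loopback = InetAddress.getLoopbackAddress();
        try (ServerSocket server = new ServerSocket(0, 50, loopback);
             Socket client = new Socket(loopback, server.getLocalPort());
             Socket accepted = server.accept()) {
            return accepted.isConnected() && accepted.isBound() && !accepted.isClosed();
        }
    }

    public static void main(String[] args) throws IOException {
        System.out.println("accept timed out: " + acceptTimesOut(200));
        System.out.println("accepted socket ready: " + acceptedSocketIsReady());
    }
}
```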

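    The analysis above covers bind, listen, and accept on the server side; all of them are ultimately implemented by calls into the underlying system. Next comes the client side.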

    3.2 Client side: Socket source code

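    Socket provides six public constructors that take a host address and port, but all of them delegate to one private constructor to create the object.
    Here is the source of that private constructor: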

    private Socket(InetAddress[] addresses, int port, SocketAddress localAddr,
                boolean stream) throws IOException {
            if (addresses == null || addresses.length == 0) {
                throw new SocketException("Impossible: empty address list");
            }
    
            for (int i = 0; i < addresses.length; i++) {
                setImpl();
                try {
                    InetSocketAddress address = new InetSocketAddress(addresses[i], port);
                    createImpl(stream);
                    if (localAddr != null) {
                        bind(localAddr);
                    }
                    connect(address);
                    break;
                } catch (IOException | IllegalArgumentException | SecurityException e) {
                    try {
                        // Android-changed:
                        // Do not call #close, classes that extend this class may do not expect a call
                        // to #close coming from the superclass constructor.
                        impl.close();
                        closed = true;
                    } catch (IOException ce) {
                        e.addSuppressed(ce);
                    }
    
                    // Only stop on the last address.
                    if (i == addresses.length - 1) {
                        throw e;
                    }
                }
    
                // Discard the connection state and try again.
                impl = null;
                created = false;
                bound = false;
                closed = false;
            }
        }
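
The loop in the constructor above tries each resolved address in turn and only fails after the last one. Application code can do something similar with the public API. This is a rough sketch, not the Android implementation: ConnectFallback, connectAny, and connectsToLocalListener are hypothetical names, and the per-address timeout is an addition for illustration.

```java
import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.net.Socket;
import java.net.SocketException;

public class ConnectFallback {

    /** Tries each address in order and returns the first socket that connects. */
    static Socket connectAny(InetAddress[] addresses, int port, int timeoutMs) throws IOException {
        if (addresses == null || addresses.length == 0) {
            throw new SocketException("empty address list");
        }
        IOException last = null;
        for (InetAddress address : addresses) {
            Socket socket = new Socket(); // fresh, unconnected socket per attempt
            try {
                socket.connect(new InetSocketAddress(address, port), timeoutMs);
                return socket;
            } catch (IOException e) {
                try {
                    socket.close(); // discard the failed attempt
                } catch (IOException ce) {
                    e.addSuppressed(ce);
                }
                last = e; // remember the failure and move on to the next address
            }
        }
        throw last; // every address failed; report the last error
    }

    /** Self-check against a listener on the loopback interface. */
    static boolean connectsToLocalListener() throws IOException {
        InetAddress loopback = InetAddress.getLoopbackAddress();
        try (ServerSocket server = new ServerSocket(0, 50, loopback);
             Socket s = connectAny(new InetAddress[] { loopback }, server.getLocalPort(), 1000)) {
            return s.isConnected();
        }
    }

    /** Returns true if an empty address list is rejected with SocketException. */
    static boolean rejectsEmptyList() throws IOException {
        try (Socket s = connectAny(new InetAddress[0], 80, 1000)) {
            return false;
        } catch (SocketException e) {
            return true;
        }
    }
}
```

In real use the address array would typically come from InetAddress.getAllByName(host), so that a host with several addresses is still reachable when the first one is down.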
    
    

    Note that the private constructor above is the version in the java package of the Android SDK; the JDK version is simpler:

     private Socket(SocketAddress address, SocketAddress localAddr,
                       boolean stream) throws IOException {
            setImpl();
    
            // backward compatibility
            if (address == null)
                throw new NullPointerException();
    
            try {
                createImpl(stream);
                if (localAddr != null)
                    bind(localAddr);
                connect(address);
            } catch (IOException | IllegalArgumentException | SecurityException e) {
                try {
                    close();
                } catch (IOException ce) {
                    e.addSuppressed(ce);
                }
                throw e;
            }
        }
    

    The two are much the same: first setImpl(), then createImpl(stream), and finally connect(address). The Android version just does some extra work, which we skip here and go straight into connect:

        /**
         * Connects this socket to the server with a specified timeout value.
         * A timeout of zero is interpreted as an infinite timeout. The connection
         * will then block until established or an error occurs.
         */
        public void connect(SocketAddress endpoint, int timeout) throws IOException {
            if (endpoint == null)
                throw new IllegalArgumentException("connect: The address can't be null");
    
            if (timeout < 0)
              throw new IllegalArgumentException("connect: timeout can't be negative");
    
            if (isClosed())
                throw new SocketException("Socket is closed");
    
            if (!oldImpl && isConnected())
                throw new SocketException("already connected");
    
            if (!(endpoint instanceof InetSocketAddress))
                throw new IllegalArgumentException("Unsupported address type");
    
            InetSocketAddress epoint = (InetSocketAddress) endpoint;
            InetAddress addr = epoint.getAddress ();
            int port = epoint.getPort();
            checkAddress(addr, "connect");
    
            SecurityManager security = System.getSecurityManager();
            if (security != null) {
                if (epoint.isUnresolved())
                    security.checkConnect(epoint.getHostName(), port);
                else
                    security.checkConnect(addr.getHostAddress(), port);
            }
            if (!created)
                createImpl(true);
            if (!oldImpl)
                impl.connect(epoint, timeout);
            else if (timeout == 0) {
                if (epoint.isUnresolved())
                    impl.connect(addr.getHostName(), port);
                else
                    impl.connect(addr, port);
            } else
                throw new UnsupportedOperationException("SocketImpl.connect(addr, timeout)");
            connected = true;
            /*
             * If the socket was not bound before the connect, it is now because
             * the kernel will have picked an ephemeral port & a local address
             */
            bound = true;
        }
    
    

    As with bind on the server side, it begins with a series of checks and ends by calling the connect method of SocksSocketImpl. Here is the source of connect in SocksSocketImpl:

        /**
         * Connects the Socks Socket to the specified endpoint. It will first
         * connect to the SOCKS proxy and negotiate the access. If the proxy
         * grants the connections, then the connect is successful and all
         * further traffic will go to the "real" endpoint.
         */
        @Override
        protected void connect(SocketAddress endpoint, int timeout) throws IOException {
    
            ... (omitted)
    
            // Connects to the SOCKS server
            try {
                   privilegedConnect(server, serverPort, remainingMillis(deadlineMillis));
             } catch (IOException e) {
                    throw new SocketException(e.getMessage());
             }
    
             ... (omitted)
    
        }
    
    

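    This method is long and complex, and the Android and JDK versions differ in places. We do not go into that here; it is enough to know that it calls privilegedConnect(server, serverPort, remainingMillis(deadlineMillis)). Following that call: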

    private synchronized void privilegedConnect(final String host,final int port, final int timeout) throws IOException{
            try {
                AccessController.doPrivileged(
                    new java.security.PrivilegedExceptionAction<Void>() {
                        public Void run() throws IOException {
                                  superConnectServer(host, port, timeout);
                                  cmdIn = getInputStream();
                                  cmdOut = getOutputStream();
                                  return null;
                              }
                          });
            } catch (java.security.PrivilegedActionException pae) {
                throw (IOException) pae.getException();
            }
        }
    

    It calls AccessController.doPrivileged, which runs the run method of the PrivilegedExceptionAction object and therefore executes superConnectServer(host, port, timeout). That method simply calls connect on the parent class AbstractPlainSocketImpl, so we look directly at connect in AbstractPlainSocketImpl:

     /**
         * Creates a socket and connects it to the specified address on
         * the specified port.
         * @param address the address
         * @param timeout the timeout value in milliseconds, or zero for no timeout.
         * @throws IOException if connection fails
         * @throws  IllegalArgumentException if address is null or is a
         *          SocketAddress subclass not supported by this socket
         * @since 1.4
         */
        protected void connect(SocketAddress address, int timeout)
                throws IOException {
            boolean connected = false;
            try {
                if (address == null || !(address instanceof InetSocketAddress))
                    throw new IllegalArgumentException("unsupported address type");
                InetSocketAddress addr = (InetSocketAddress) address;
                if (addr.isUnresolved())
                    throw new UnknownHostException(addr.getHostName());
                this.port = addr.getPort();
                this.address = addr.getAddress();
    
                connectToAddress(this.address, port, timeout);
                connected = true;
            } finally {
                if (!connected) {
                    try {
                        close();
                    } catch (IOException ioe) {
                        /* Do nothing. If connect threw an exception then
                           it will be passed up the call stack */
                    }
                }
            }
        }
    
    

    Clearly, it calls connectToAddress(this.address, port, timeout):

      private void connectToAddress(InetAddress address, int port, int timeout) throws IOException {
            if (address.isAnyLocalAddress()) {
                doConnect(InetAddress.getLocalHost(), port, timeout);
            } else {
                doConnect(address, port, timeout);
            }
        }
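
The branch in connectToAddress depends on InetAddress.isAnyLocalAddress(), which is true only for the wildcard address (0.0.0.0 for IPv4), not for loopback or ordinary addresses. A small sketch that makes this concrete; WildcardAddressDemo, isWildcard, and effectiveTarget are illustrative names, and effectiveTarget only models the decision, since InetAddress.getLocalHost() depends on the machine's host name configuration:

```java
import java.net.InetAddress;
import java.net.UnknownHostException;

public class WildcardAddressDemo {

    /** Parses an IP literal (no DNS lookup is needed) and reports whether it is the wildcard address. */
    static boolean isWildcard(String ipLiteral) throws UnknownHostException {
        return InetAddress.getByName(ipLiteral).isAnyLocalAddress();
    }

    /** Models the decision in connectToAddress: the wildcard is replaced by the local host. */
    static InetAddress effectiveTarget(InetAddress requested) throws UnknownHostException {
        return requested.isAnyLocalAddress() ? InetAddress.getLocalHost() : requested;
    }

    public static void main(String[] args) throws UnknownHostException {
        System.out.println("0.0.0.0 wildcard? " + isWildcard("0.0.0.0"));
        System.out.println("127.0.0.1 wildcard? " + isWildcard("127.0.0.1"));
        System.out.println("target for 127.0.0.1: "
                + effectiveTarget(InetAddress.getByName("127.0.0.1")));
    }
}
```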
    

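    connectToAddress checks whether the target is the wildcard (any-local) address; if so it connects to the local host instead, otherwise to the given address. Either way it calls doConnect: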

       /**
         * The workhorse of the connection operation.  Tries several times to
         * establish a connection to the given <host, port>.  If unsuccessful,
         * throws an IOException indicating what went wrong.
         */
    
        synchronized void doConnect(InetAddress address, int port, int timeout) throws IOException {
            synchronized (fdLock) {
                if (!closePending && (socket == null || !socket.isBound())) {
                    NetHooks.beforeTcpConnect(fd, address, port);
                }
            }
            try {
                acquireFD();
                try {
                    BlockGuard.getThreadPolicy().onNetwork();
                    socketConnect(address, port, timeout);
                    /* socket may have been closed during poll/select */
                    synchronized (fdLock) {
                        if (closePending) {
                            throw new SocketException ("Socket closed");
                        }
                    }
                    // If we have a ref. to the Socket, then sets the flags
                    // created, bound & connected to true.
                    // This is normally done in Socket.connect() but some
                    // subclasses of Socket may call impl.connect() directly!
                    if (socket != null) {
                        socket.setBound();
                        socket.setConnected();
                    }
                } finally {
                    releaseFD();
                }
            } catch (IOException e) {
                close();
                throw e;
            }
        }
    

    This in turn calls socketConnect(address, port, timeout). The method is abstract in AbstractPlainSocketImpl and is implemented by PlainSocketImpl, the direct parent class of SocksSocketImpl. Its source is:

        void socketConnect(InetAddress address, int port, int timeout) throws IOException {
            if (fd == null || !fd.valid()) {
                throw new SocketException("Socket closed");
            }
    
            IoBridge.connect(fd, address, port, timeout);
    
            this.address = address;
            this.port = port;
    
            if (localport == 0) {
                // If socket is pending close, fd becomes an AF_UNIX socket and calling
                // getLocalInetSocketAddress will fail.
                // http://b/34645743
                if (!isClosedOrPending()) {
                    localport = IoBridge.getLocalInetSocketAddress(fd).getPort();
                }
            }
        }
    

    It calls IoBridge.connect; here is that method:

      /**
         * Connects socket 'fd' to 'inetAddress' on 'port', with a the given 'timeoutMs'.
         * Use timeoutMs == 0 for a blocking connect with no timeout.
         */
        public static void connect(FileDescriptor fd, InetAddress inetAddress, int port, int timeoutMs) throws SocketException, SocketTimeoutException {
            try {
                connectErrno(fd, inetAddress, port, timeoutMs);
            } catch (ErrnoException errnoException) {
                if (errnoException.errno == EHOSTUNREACH) {
                    throw new NoRouteToHostException("Host unreachable");
                }
                if (errnoException.errno == EADDRNOTAVAIL) {
                    throw new NoRouteToHostException("Address not available");
                }
                throw new ConnectException(connectDetail(fd, inetAddress, port, timeoutMs,
                        errnoException), errnoException);
            } catch (SocketException ex) {
                throw ex; // We don't want to doubly wrap these.
            } catch (SocketTimeoutException ex) {
                throw ex; // We don't want to doubly wrap these.
            } catch (IOException ex) {
                throw new SocketException(ex);
            }
        }
    

    It calls connectErrno directly, whose source is:

    private static void connectErrno(FileDescriptor fd, InetAddress inetAddress, int port, int timeoutMs) throws ErrnoException, IOException {
            // With no timeout, just call connect(2) directly.
            if (timeoutMs <= 0) {
                Libcore.os.connect(fd, inetAddress, port);
                return;
            }
    
            // For connect with a timeout, we:
            //   1. set the socket to non-blocking,
            //   2. connect(2),
            //   3. loop using poll(2) to decide whether we're connected, whether we should keep
            //      waiting, or whether we've seen a permanent failure and should give up,
            //   4. set the socket back to blocking.
    
            // 1. set the socket to non-blocking.
            IoUtils.setBlocking(fd, false);
    
            // 2. call connect(2) non-blocking.
            long finishTimeNanos = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeoutMs);
            try {
                Libcore.os.connect(fd, inetAddress, port);
                IoUtils.setBlocking(fd, true); // 4. set the socket back to blocking.
                return; // We connected immediately.
            } catch (ErrnoException errnoException) {
                if (errnoException.errno != EINPROGRESS) {
                    throw errnoException;
                }
                // EINPROGRESS means we should keep trying...
            }
    
            // 3. loop using poll(2).
            int remainingTimeoutMs;
            do {
                remainingTimeoutMs =
                        (int) TimeUnit.NANOSECONDS.toMillis(finishTimeNanos - System.nanoTime());
                if (remainingTimeoutMs <= 0) {
                    throw new SocketTimeoutException(connectDetail(fd, inetAddress, port, timeoutMs,
                            null));
                }
            } while (!IoBridge.isConnected(fd, inetAddress, port, timeoutMs, remainingTimeoutMs));
            IoUtils.setBlocking(fd, true); // 4. set the socket back to blocking.
        }
    
    

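    connectErrno implements the connect + poll logic. If no timeout is set, it calls Libcore.os.connect(fd, inetAddress, port) straight away to establish the connection; as listed earlier, this is a native method implemented by the underlying system. Reading on, the comments make the rest clear: when a timeout is set, it performs a non-blocking connect and then polls for events until the connection succeeds, fails permanently, or the timeout expires. Now back to the run method inside privilegedConnect: if connect throws no exception, the connection has succeeded, and getInputStream() and getOutputStream() are then called to initialize the input and output streams.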

    
        protected synchronized InputStream getInputStream() throws IOException {
            synchronized (fdLock) {
                if (isClosedOrPending())
                    throw new IOException("Socket Closed");
                if (shut_rd)
                    throw new IOException("Socket input is shutdown");
                if (socketInputStream == null)
                    socketInputStream = new SocketInputStream(this);
            }
            return socketInputStream;
        }
    
        protected synchronized OutputStream getOutputStream() throws IOException {
            synchronized (fdLock) {
                if (isClosedOrPending())
                    throw new IOException("Socket Closed");
                if (shut_wr)
                    throw new IOException("Socket output is shutdown");
                if (socketOutputStream == null)
                    socketOutputStream = new SocketOutputStream(this);
            }
            return socketOutputStream;
        }
    

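    At this point, the input and output streams of the SocksSocketImpl member held by Socket have been initialized and are ready for the application to use.
    To summarize the client-side Socket: when we construct a Socket with a server address and port, it uses its SocksSocketImpl object to call into the underlying system and connect to the server. Once the connection is established, the input and output streams of SocksSocketImpl are initialized for our application to use.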
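
    The client flow summarized above can be sketched with the public java.net API. This is a minimal illustration rather than the code path analyzed in this article: the class name ClientConnectSketch, the one-shot loopback echo server, and the 2000 ms timeout are assumptions made for the example, and it assumes Java 11 or later (for readAllBytes and transferTo).

```java
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.UncheckedIOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.net.Socket;
import java.nio.charset.StandardCharsets;

// Hypothetical demo class, not part of the JDK or of the article's source.
class ClientConnectSketch {

    /** Connects with a timeout, sends one message, and returns everything the peer sends back. */
    static String sendAndReceive(InetAddress host, int port, int timeoutMs, String message) {
        try (Socket socket = new Socket()) {
            // Unconnected socket + connect(address, timeout): the timeout variant of connect.
            socket.connect(new InetSocketAddress(host, port), timeoutMs);
            socket.setSoTimeout(timeoutMs); // also bound the blocking reads below

            // Once connected, the streams are handed out to the application.
            OutputStream os = socket.getOutputStream();
            InputStream is = socket.getInputStream();

            os.write(message.getBytes(StandardCharsets.UTF_8));
            os.flush();
            socket.shutdownOutput(); // half-close so the peer sees end-of-stream

            return new String(is.readAllBytes(), StandardCharsets.UTF_8);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    /** Starts a one-shot echo server on an ephemeral loopback port and talks to it. */
    static String demo(String message) {
        InetAddress loopback = InetAddress.getLoopbackAddress();
        try (ServerSocket server = new ServerSocket(0, 50, loopback)) {
            Thread echo = new Thread(() -> {
                try (Socket peer = server.accept()) {
                    peer.getInputStream().transferTo(peer.getOutputStream());
                } catch (IOException ignored) {
                    // Demo only: the client side will surface any failure.
                }
            });
            echo.setDaemon(true);
            echo.start();
            return sendAndReceive(loopback, server.getLocalPort(), 2000, message);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(demo("hello server !"));
    }
}
```

    Passing a positive timeout to connect is what selects the non-blocking connect followed by polling in the implementation discussed above; with a timeout of 0 the call blocks until the system reports success or failure.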

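    4. How Socket handles TCP in detail on Linux

    A single diagram is enough to understand how Socket handles TCP in detail:


    Processing flow

    In the previous article we analyzed the three-way handshake that establishes a TCP connection, but we only looked at the packet contents, not at how the socket layer in the server's kernel handles it. Let's continue with that now.
    First, we need to know that the Linux kernel protocol stack manages TCP connections with two queues. One is the SYN queue (the half-open connection queue, which holds requests in the SYN_RECV state). The other is the ACCEPT queue (which holds connections that are in the ESTABLISHED state but have not yet been taken by the application through accept). Both queues are implemented in the kernel; once the server has bound a port and started listening on it, the SYN queue and the ACCEPT queue for that port are set up. As shown in the diagram, the whole process is: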

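    1. When a SYN packet reaches the server, the kernel puts the request into the SYN queue (the incomplete-handshake queue) and sends a SYN+ACK packet back to the client.
    2. Some time later, when the client's ACK for the server's SYN arrives, the kernel removes the connection from the SYN queue and puts it into the ACCEPT queue (the completed-handshake queue).
    3. When the server calls accept in step 3 of the diagram, it simply takes an already established connection socket out of the ACCEPT queue.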
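
    The three steps above can be observed from Java: a client's connect returns successfully before the server has called accept at all, because the kernel finishes the handshake and parks the connection in the ACCEPT queue. The sketch below is an illustration on the loopback interface; the class name AcceptQueueSketch and the 2000 ms timeouts are assumptions made for the example.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.net.Socket;

// Hypothetical demo class, not part of the JDK or of the article's source.
class AcceptQueueSketch {

    /** Returns true if the client finished connecting before the server ever called accept(). */
    static boolean connectCompletesBeforeAccept() {
        try (ServerSocket server = new ServerSocket(0, 50, InetAddress.getLoopbackAddress());
             Socket client = new Socket()) {
            // Steps 1 and 2: the kernel completes the handshake; accept() has not been called yet.
            client.connect(
                    new InetSocketAddress(server.getInetAddress(), server.getLocalPort()), 2000);
            boolean connectedBeforeAccept = client.isConnected();

            // Step 3: accept() only takes the already established connection out of the queue.
            server.setSoTimeout(2000); // fail fast instead of hanging if nothing is queued
            try (Socket accepted = server.accept()) {
                return connectedBeforeAccept && accepted.getPort() == client.getLocalPort();
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(connectCompletesBeforeAccept());
    }
}
```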

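    Now let's discuss the problems that can arise here.
    Earlier, when analyzing the Java source, we saw that creating a ServerSocket takes a backlog parameter, which defaults to 50. So what exactly is this parameter?
    Before Linux kernel 2.2, the backlog parameter limited the length of both the SYN queue and the ACCEPT queue, and the two queues were the same size. Since 2.2, the two queues have separate limits: the backlog argument applies to the ACCEPT queue, and the SYN queue has its own setting.

    The length of the first queue (the SYN queue) is set by /proc/sys/net/ipv4/tcp_max_syn_backlog, which is commonly 1024 by default (the exact default depends on the kernel version and available memory). If syncookies are enabled, there is effectively no limit.

    The length of the second queue (the ACCEPT queue) is capped by /proc/sys/net/core/somaxconn, which defaults to 128 on the kernels discussed here (4096 since Linux 5.4). A backlog passed to listen that exceeds this value is silently truncated to it. With a limit of 128, at most 129 established connections can wait for accept.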
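
    As a rough illustration of how these settings combine, the sketch below reads the two /proc entries named above and computes the limit that a requested backlog would actually get. The class name BacklogSketch and its helper methods are assumptions made for this example; on systems without these files the fallback values are returned, and Java 11 or later is assumed for Files.readString.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;

// Hypothetical helper class, not part of the JDK or of the article's source.
class BacklogSketch {

    /** Linux silently caps the backlog passed to listen() at net.core.somaxconn. */
    static int effectiveBacklog(int requestedBacklog, int somaxconn) {
        return Math.min(requestedBacklog, somaxconn);
    }

    /** Reads an integer setting from /proc, or returns the fallback if it is unavailable. */
    static int readSysctl(String path, int fallback) {
        try {
            return Integer.parseInt(Files.readString(Path.of(path)).trim());
        } catch (IOException | NumberFormatException e) {
            return fallback; // not Linux, no permission, or unexpected file content
        }
    }

    public static void main(String[] args) {
        // The fallbacks are the defaults quoted in the text, used only when /proc is unreadable.
        int synBacklog = readSysctl("/proc/sys/net/ipv4/tcp_max_syn_backlog", 1024);
        int somaxconn = readSysctl("/proc/sys/net/core/somaxconn", 128);
        System.out.println("tcp_max_syn_backlog = " + synBacklog);
        System.out.println("somaxconn = " + somaxconn);
        System.out.println("effective limit for ServerSocket's default backlog of 50 = "
                + effectiveBacklog(50, somaxconn));
    }
}
```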

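    So the question is: what happens to a new request that arrives when these queues are full?

    For the SYN queue, if it is full the request is simply dropped, i.e. the new SYN packet is discarded. This case is easy to deal with: the client receives no reply and sends the SYN again, and the server keeps dropping it until the queue has a free slot. If the client never receives a reply, it gives up after a few retransmissions.

    Handling of the ACCEPT queue is a bit more complicated, and there are two cases:

    1. If the server has sysctl_tcp_abort_on_overflow set, it sends an RST to the client and deletes the connection. This is not set by default.

    2. If sysctl_tcp_abort_on_overflow is not set, the server only sets the acked flag on the connection request block. The connection-establishment timer then walks the half-open connection table and retransmits the SYN-ACK, and the process above repeats. Once the number of retransmissions exceeds the SYN-ACK retry threshold, the connection is removed from the half-open connection list.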
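
    The two cases can be captured in a small toy model. This is not kernel code: the class AcceptOverflowModel, its Outcome values, and the single retry counter are hypothetical names invented here to mirror the description above, and the model tracks only one half-open connection at a time.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Toy model of the behaviour described in the text; all names are hypothetical.
class AcceptOverflowModel {

    enum Outcome { MOVED_TO_ACCEPT_QUEUE, RESET_SENT, KEPT_IN_SYN_QUEUE, DROPPED_AFTER_RETRIES }

    private final Deque<String> acceptQueue = new ArrayDeque<>();
    private final int acceptQueueLimit;
    private final boolean abortOnOverflow; // stands in for sysctl_tcp_abort_on_overflow
    private final int maxSynAckRetries;    // stands in for the SYN-ACK retry threshold
    private int synAckRetries = 0;         // retries used by the one modelled half-open connection

    AcceptOverflowModel(int acceptQueueLimit, boolean abortOnOverflow, int maxSynAckRetries) {
        this.acceptQueueLimit = acceptQueueLimit;
        this.abortOnOverflow = abortOnOverflow;
        this.maxSynAckRetries = maxSynAckRetries;
    }

    /** The handshake ACK (or a later data packet) arrives for a half-open connection. */
    Outcome onAck(String connection) {
        if (acceptQueue.size() < acceptQueueLimit) {
            acceptQueue.add(connection);
            return Outcome.MOVED_TO_ACCEPT_QUEUE;
        }
        // Queue full: case 1 resets the client, case 2 only marks the request as acked.
        return abortOnOverflow ? Outcome.RESET_SENT : Outcome.KEPT_IN_SYN_QUEUE;
    }

    /** The establishment timer fires: retransmit the SYN-ACK, or give up once retries run out. */
    Outcome onSynAckTimer() {
        if (synAckRetries >= maxSynAckRetries) {
            return Outcome.DROPPED_AFTER_RETRIES;
        }
        synAckRetries++;
        return Outcome.KEPT_IN_SYN_QUEUE;
    }

    /** The application calls accept(), freeing one slot; returns null if the queue is empty. */
    String accept() {
        return acceptQueue.poll();
    }
}
```

    In this model a connection that was kept in the SYN queue moves to the ACCEPT queue on its next ACK as soon as accept has freed a slot, which is the situation the rest of this section turns on.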

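    Let's walk through the second case:

    1. The server receives the ACK and tries to move the connection to the ACCEPT queue, but because the ACCEPT queue is full it only marks the connection as acked. The connection is not moved to the ACCEPT queue, and no resources such as sendbuf and recvbuf are allocated for it.
    2. After sending the ACK, the client considers the connection established and starts sending data on it.
    3. The server receives the data packet. Because the ACCEPT queue is still full, the server again only marks the connection as acked, arms the connection-establishment timer, and returns.
    4. The client gets no ACK for the data it just sent, so it retransmits the data packet.
    5. Same as above.
    6. Same as above.
    7. Same as above.
    8. The server's connection-establishment timer fires and walks the half-open connection list, finds the connection that was marked acked, and retransmits the SYN-ACK to the client. Note that the number of retransmissions is limited; once they are used up, the server simply deletes the half-open connection.
    9. On receiving the SYN-ACK, the client, based on the ack value and using the SACK algorithm, retransmits only the content indicated by the last ack.
    10. The server receives the data packet; if the ACCEPT queue is still full, it again only marks the connection as acked and returns.
    11. After waiting for a while, the client decides the connection is unusable and sends FIN, ACK to the server. The client's state becomes FIN_WAIT1, and after a further wait the connection is no longer visible on the client (it has been torn down).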

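    However, suppose that at step 10 above, when the server receives the data packet, the server process has just finished handling a request and taken a connection off the ACCEPT queue. The ACCEPT queue now has a free slot, so the server puts the requested connection into it. The server application then sees this connection as ESTABLISHED, even though it no longer exists on the client. When the application later calls read on this connection to read from the socket, the read blocks indefinitely because the client exited long ago. You might think the server should not establish this connection and that this is a kernel bug. But the kernel follows the RFC: during the three-way handshake it does not check the FIN flag and only processes the SYN, ACK, and RST flags. From the application's point of view, the fixes are to read in non-blocking mode or to read with a select timeout; alternatively, the client can close the connection with RST instead of FIN.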
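
    In Java, the simplest application-level guard of this kind is a read timeout set with Socket.setSoTimeout. The sketch below is an illustration under stated assumptions: the class name ReadTimeoutSketch is invented here, and a connected but silent loopback client stands in for the vanished client, since the original scenario is hard to reproduce on demand.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.InetAddress;
import java.net.ServerSocket;
import java.net.Socket;
import java.net.SocketTimeoutException;

// Hypothetical demo class, not part of the JDK or of the article's source.
class ReadTimeoutSketch {

    /** Returns true if read() gave up with SocketTimeoutException instead of blocking forever. */
    static boolean readTimesOut(int timeoutMs) {
        try (ServerSocket server = new ServerSocket(0, 50, InetAddress.getLoopbackAddress());
             // This client connects and then never sends anything.
             Socket silentClient = new Socket(server.getInetAddress(), server.getLocalPort());
             Socket accepted = server.accept()) {
            accepted.setSoTimeout(timeoutMs); // bound every blocking read on this connection
            try {
                accepted.getInputStream().read();
                return false; // data or end-of-stream arrived, which this demo does not expect
            } catch (SocketTimeoutException expected) {
                return true; // the server thread is free to close the socket and move on
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(readTimesOut(200));
    }
}
```

    After a timeout the socket itself is still valid, so the server can decide whether to retry the read or close the connection.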

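    Application

    Given the analysis above, what the application layer should do is avoid congestion in the ACCEPT queue as far as possible. This is why servers such as Tomcat use a dedicated thread that does nothing but call accept, so that connections are always picked up promptly. Other servers, such as Nginx, call accept and perform other I/O in the same thread. Why is that?
    The reason lies in blocking versus non-blocking mode. An application can put the listening socket into non-blocking mode (blocking is the default), and the two modes make accept behave differently:
    1. On a blocking socket, the first phase of accept is waiting for the ACCEPT queue to become non-empty. How long this takes is not fixed; it depends on whether any client has initiated a TCP connection, and it may be long.

    2. On a non-blocking socket, accept has no phase of waiting for the ACCEPT queue to become non-empty. It either succeeds and returns an established connection or fails immediately, so it never blocks in the accept phase.

    So, in an enterprise-grade server process, if a thread both uses accept to obtain new connections and goes on to read and write byte streams on those connections, the sockets involved are usually set to non-blocking. A call to accept then cannot tie up its thread for long, and the thread can get on with other work promptly.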
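
    The difference between the two modes can be sketched with java.nio, where a non-blocking ServerSocketChannel.accept() returns null instead of waiting when no connection is queued. This is a minimal illustration, not how Tomcat or Nginx are implemented; the class name NonBlockingAcceptSketch and the 2000 ms select timeout are assumptions made for the example.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;

// Hypothetical demo class, not part of the JDK or of the article's source.
class NonBlockingAcceptSketch {

    /** Returns { accept() was null while idle, a connection was accepted once one was queued }. */
    static boolean[] demo() {
        try (Selector selector = Selector.open();
             ServerSocketChannel server = ServerSocketChannel.open()) {
            server.bind(new InetSocketAddress(InetAddress.getLoopbackAddress(), 0), 50);
            server.configureBlocking(false);
            server.register(selector, SelectionKey.OP_ACCEPT);

            // No client yet: a non-blocking accept() returns null immediately.
            boolean nullWhenIdle = (server.accept() == null);

            try (SocketChannel client = SocketChannel.open(server.getLocalAddress())) {
                boolean accepted = false;
                // One thread can wait on many sockets here and do other I/O in between.
                if (selector.select(2000) > 0) {
                    selector.selectedKeys().clear();
                    try (SocketChannel peer = server.accept()) {
                        accepted = (peer != null);
                    }
                }
                return new boolean[] { nullWhenIdle, accepted };
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        boolean[] result = demo();
        System.out.println("null while idle: " + result[0] + ", accepted when ready: " + result[1]);
    }
}
```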

