美文网首页个人学习一些收藏Netty
Netty源码分析 - Bootstrap客户端

Netty源码分析 - Bootstrap客户端

作者: 晴天哥_王志 | 来源:发表于2020-03-21 18:48 被阅读0次

系列

Netty源码分析 - Bootstrap服务端
Netty源码分析 - Bootstrap客户端
netty源码分析 - ChannelHandler
netty源码分析 - EventLoop类关系
netty源码分析 - register分析
Netty源码分析 - NioEventLoop事件处理
netty源码分析 - accept过程分析
Netty源码分析 - ByteBuf
Netty源码分析 - 粘包和拆包问题

开篇

  • 本文基于netty-4.1.8.Final版本进行分析,主要是分析Netty Client初始化过程。
  • 理解Bootstrap、NioEventLoopGroup、NioEventLoop、NioSocketChannel之间的联系。
  • 理解SelectorProvider、NioSocketChannelUnsafe、DefaultChannelPipeline、DefaultChannelPromise之间的联系。
  • 理解NioSocketChannel的初始化和注册流程。
  • 理解Bootstrap客户端连接过程连接过程。

基本概念

Bootstrap关系图
  • Bootstrap包含一个EventLoopGroup对象。
  • EventLoopGroup对象包含多个EventLoop对象。
  • EventLoop对象包含Selector负责处理EventLoop的事件。



SocketChannel关系图
  • SocketChannel包含EventLoop,记录绑定对应的EventLoop对象。
  • SocketChannel包含Unsafe,所有的读写连接动作由Unsafe去实现。
  • SocketChannel包含DefaultChannelPipeline对象,维护所有handler对象。
  • SocketChannel的SelectableChannel类型的ch对象执行真正连接的对象,由SelectorProvider.openSocketChannel()创建。

Netty Client案例

public final class DiscardClient {

    static final boolean SSL = System.getProperty("ssl") != null;
    static final String HOST = System.getProperty("host", "127.0.0.1");
    static final int PORT = Integer.parseInt(System.getProperty("port", "8009"));
    static final int SIZE = Integer.parseInt(System.getProperty("size", "256"));

    public static void main(String[] args) throws Exception {
        // 创建事件处理Group
        EventLoopGroup group = new NioEventLoopGroup();

        try {
            // 创建Bootstrap对象
            Bootstrap b = new Bootstrap();
            // 绑定group到Bootstrap
            b.group(group)
              // 绑定channel到Bootstrap
             .channel(NioSocketChannel.class)
              // 绑定handler到Bootstrap
             .handler(new ChannelInitializer<SocketChannel>() {
                 @Override
                 protected void initChannel(SocketChannel ch) throws Exception {
                     ChannelPipeline p = ch.pipeline();
                     if (sslCtx != null) {
                         p.addLast(sslCtx.newHandler(ch.alloc(), HOST, PORT));
                     }
                     p.addLast(new DiscardClientHandler());
                 }
             });

            // 建立连接
            ChannelFuture f = b.connect(HOST, PORT).sync();

            // Wait until the connection is closed.
            f.channel().closeFuture().sync();
        } finally {
            group.shutdownGracefully();
        }
    }
}
  • Netty客户端demo如上图所示,固定的操作步骤。
  • 创建EventLoopGroup对象
  • 创建Bootstrap对象
  • 绑定EventLoopGroup到Bootstrap对象,采用链式编程风格,直接返回了当前对象。
  • 绑定channel(NioSocketChannel)到Bootstrap对象,采用链式编程风格。
  • 绑定handler(ChannelInitializer)到Bootstrap对象,采用链式编程风格。
  • 建立连接,Bootstrap的connect(HOST, PORT).sync()方法。

对象基本概念介绍

Bootstrap

Bootstrap
  • Bootstrap的继承关系如上图,父类AbstractBootstrap提供通用功能。

public class Bootstrap extends AbstractBootstrap<Bootstrap, Channel> {

    private static final InternalLogger logger = InternalLoggerFactory.getInstance(Bootstrap.class);
    private static final AddressResolverGroup<?> DEFAULT_RESOLVER = DefaultAddressResolverGroup.INSTANCE;
    // BootstrapConfig包含Bootstrap对象
    private final BootstrapConfig config = new BootstrapConfig(this);

    @SuppressWarnings("unchecked")
    private volatile AddressResolverGroup<SocketAddress> resolver =
            (AddressResolverGroup<SocketAddress>) DEFAULT_RESOLVER;
    private volatile SocketAddress remoteAddress;

    public Bootstrap() { }
}

public abstract class AbstractBootstrap<B extends AbstractBootstrap<B, C>, C extends Channel> implements Cloneable {

    volatile EventLoopGroup group;
    private volatile ChannelFactory<? extends C> channelFactory;
    private volatile SocketAddress localAddress;
    private final Map<ChannelOption<?>, Object> options = new LinkedHashMap<ChannelOption<?>, Object>();
    private final Map<AttributeKey<?>, Object> attrs = new LinkedHashMap<AttributeKey<?>, Object>();
    private volatile ChannelHandler handler;

    AbstractBootstrap() {
    }

    public B group(EventLoopGroup group) {
        this.group = group;
        return (B) this;
    }

    public B channel(Class<? extends C> channelClass) {
        return channelFactory(new ReflectiveChannelFactory<C>(channelClass));
    }

    public B channelFactory(ChannelFactory<? extends C> channelFactory) {
        this.channelFactory = channelFactory;
        return (B) this;
    }

    public B handler(ChannelHandler handler) {
        this.handler = handler;
        return (B) this;
    }
}

public class ReflectiveChannelFactory<T extends Channel> implements ChannelFactory<T> {

    private final Class<? extends T> clazz;

    public ReflectiveChannelFactory(Class<? extends T> clazz) {
        this.clazz = clazz;
    }

    public T newChannel() {
        try {
            return clazz.newInstance();
        } catch (Throwable t) {
        }
    }
}
  • Bootstrap继承自AbstractBootstrap,核心对象包含BootstrapConfig。
  • AbstractBootstrap包含核心变量group、channelFactory、options、attrs、handler。
  • AbstractBootstrap提供链式编程风格来初始化这些变量。
  • AbstractBootstrap的channelFactory是通过channel()方法来设置的,通过ReflectiveChannelFactory来进行封装。
  • ReflectiveChannelFactory的newChannel()方法来创建channel对象。
  • Bootstrap的核心变量包括EventLoopGroup、ChannelHandler、ChannelFactory。

NioEventLoopGroup

NioEventLoopGroup
public class NioEventLoopGroup extends MultithreadEventLoopGroup {

    public NioEventLoopGroup() {
        this(0);
    }

    public NioEventLoopGroup(int nThreads) {
        this(nThreads, (Executor) null);
    }

    public NioEventLoopGroup(int nThreads, Executor executor) {
        // provider = sun.nio.ch.DefaultSelectorProvider.create();
        this(nThreads, executor, SelectorProvider.provider());
    }

    public NioEventLoopGroup(
            int nThreads, Executor executor, final SelectorProvider selectorProvider) {
        this(nThreads, executor, selectorProvider, DefaultSelectStrategyFactory.INSTANCE);
    }

    public NioEventLoopGroup(int nThreads, Executor executor, final SelectorProvider selectorProvider,
                             final SelectStrategyFactory selectStrategyFactory) {
        // executor默认为null
        super(nThreads, executor, selectorProvider, selectStrategyFactory, RejectedExecutionHandlers.reject());
    }

    // 创建NioEventLoop对象
    protected EventLoop newChild(Executor executor, Object... args) throws Exception {
        return new NioEventLoop(this, executor, (SelectorProvider) args[0],
            ((SelectStrategyFactory) args[1]).newSelectStrategy(), (RejectedExecutionHandler) args[2]);
    }
}


public abstract class MultithreadEventLoopGroup extends MultithreadEventExecutorGroup implements EventLoopGroup {

    private static final InternalLogger logger = InternalLoggerFactory.getInstance(MultithreadEventLoopGroup.class);
    private static final int DEFAULT_EVENT_LOOP_THREADS;

    protected MultithreadEventLoopGroup(int nThreads, Executor executor, Object... args) {
        super(nThreads == 0 ? DEFAULT_EVENT_LOOP_THREADS : nThreads, executor, args);
    }
}


public abstract class MultithreadEventExecutorGroup extends AbstractEventExecutorGroup {

    private final EventExecutor[] children;
    private final Set<EventExecutor> readonlyChildren;
    private final AtomicInteger terminatedChildren = new AtomicInteger();
    private final Promise<?> terminationFuture = new DefaultPromise(GlobalEventExecutor.INSTANCE);
    private final EventExecutorChooserFactory.EventExecutorChooser chooser;

    protected MultithreadEventExecutorGroup(int nThreads, Executor executor, Object... args) {
        this(nThreads, executor, DefaultEventExecutorChooserFactory.INSTANCE, args);
    }

    protected MultithreadEventExecutorGroup(int nThreads, Executor executor,
                                            EventExecutorChooserFactory chooserFactory, Object... args) {

        if (executor == null) {
            executor = new ThreadPerTaskExecutor(newDefaultThreadFactory());
        }

        children = new EventExecutor[nThreads];

        for (int i = 0; i < nThreads; i ++) {
            boolean success = false;
            try {
                children[i] = newChild(executor, args);
                success = true;
            } catch (Exception e) {
                // 省略非核心代码
            } finally {
                // 省略非核心代码
            }
        }

        chooser = chooserFactory.newChooser(children);
        // 省略非核心代码
    }
}
  • Bootstrap的核心变量包括EventLoopGroup对象NioEventLoopGroup。
  • NioEventLoopGroup的类继承关系如上图所示,继承MultithreadEventLoopGroup,核心关注初始化过程。
  • MultithreadEventExecutorGroup的核心变量children(NioEventLoop)、chooser(选择器)。
  • NioEventLoopGroup包含一系列NioEventLoop对象,通过chooser进行选择。
  • NioEventLoopGroup的newChild()方法负责创建NioEventLoop对象,传递的SelectorProvider对象是SelectorProvider.provider()生成的,是个单例方法。
  • NioEventLoopGroup作为事件中心包含核心的变量,部分定义在其父类当中,主要是包含了NioEventLoop的对象。

NioEventLoop

NioEventLoop
public final class NioEventLoop extends SingleThreadEventLoop {

    private static final InternalLogger logger = InternalLoggerFactory.getInstance(NioEventLoop.class);
    private static final int CLEANUP_INTERVAL = 256; // XXX Hard-coded value, but won't need customization.
    private static final boolean DISABLE_KEYSET_OPTIMIZATION =
            SystemPropertyUtil.getBoolean("io.netty.noKeySetOptimization", false);
    private static final int MIN_PREMATURE_SELECTOR_RETURNS = 3;
    private static final int SELECTOR_AUTO_REBUILD_THRESHOLD;

     // 绑定事件选择器,由provider创建
    Selector selector;
    private SelectedSelectionKeySet selectedKeys;
    // provider = SelectorProvider.provider(),单例方法。
    private final SelectorProvider provider;
    private final AtomicBoolean wakenUp = new AtomicBoolean();
    // 事件选择策略
    private final SelectStrategy selectStrategy;
    private volatile int ioRatio = 50;
    private int cancelledKeys;
    private boolean needsToSelectAgain;

    NioEventLoop(NioEventLoopGroup parent, Executor executor, SelectorProvider selectorProvider,
                 SelectStrategy strategy, RejectedExecutionHandler rejectedExecutionHandler) {

        super(parent, executor, false, DEFAULT_MAX_PENDING_TASKS, rejectedExecutionHandler);

        // provider = sun.nio.ch.DefaultSelectorProvider.create()
        provider = selectorProvider;
        // selector = provider.openSelector();
        selector = openSelector();
        selectStrategy = strategy;
    }

    private Selector openSelector() {
        final Selector selector;
        try {
            selector = provider.openSelector();
        } catch (IOException e) {
        }
}
}
  • NioEventLoop作为处理event的处理中心,内部包含核心变量selector、provider、selectStrategy。
  • NioEventLoop是SocketChannel进行绑定的载体。
  • NioEventLoop的provider由sun.nio.ch.DefaultSelectorProvider.create()生成。
  • NioEventLoop的selector由provider.openSelector()生成。
  • SingleThreadEventLoop作为NioEventLoop的父类,提供register()方法解决channel绑定到指定NioEventLoop的方法。

NioSocketChannel

NioSocketChannel
public class NioSocketChannel extends AbstractNioByteChannel implements io.netty.channel.socket.SocketChannel {
    // SelectorProvider.provider()生成的SelectorProvider对象,单例方法。
    private static final SelectorProvider DEFAULT_SELECTOR_PROVIDER = SelectorProvider.provider();
    private final SocketChannelConfig config;

    public NioSocketChannel() {
        this(DEFAULT_SELECTOR_PROVIDER);
    }

    public NioSocketChannel(SelectorProvider provider) {
        // newSocket(provider)返回由SelectorProvider#openSocketChannel()返回channel对象
        this(newSocket(provider));
    }

    private static SocketChannel newSocket(SelectorProvider provider) {
        try {
            // java.nio.channels.spi.SelectorProvider#openSocketChannel
            return provider.openSocketChannel();
        } catch (IOException e) {
        }
    }

    public NioSocketChannel(SocketChannel socket) {
        this(null, socket);
    }

    public NioSocketChannel(Channel parent, SocketChannel socket) {
        super(parent, socket);
        config = new NioSocketChannelConfig(this, socket.socket());
    }

    // newUnsafe返回的是NioSocketChannelUnsafe对象
    protected AbstractNioUnsafe newUnsafe() {
        return new NioSocketChannelUnsafe();
    }

    private final class NioSocketChannelUnsafe extends NioByteUnsafe {
        @Override
        protected Executor prepareToClose() {
            try {
                if (javaChannel().isOpen() && config().getSoLinger() > 0) {
                    // See https://github.com/netty/netty/issues/4449
                    doDeregister();
                    return GlobalEventExecutor.INSTANCE;
                }
            } catch (Throwable ignore) {
            }
            return null;
        }
    }

    private final SocketChannelConfig config;
}


public abstract class AbstractNioByteChannel extends AbstractNioChannel {

    protected AbstractNioByteChannel(Channel parent, SelectableChannel ch) {
        super(parent, ch, SelectionKey.OP_READ);
    }
}


public abstract class AbstractNioChannel extends AbstractChannel {
    private final SelectableChannel ch;
    protected final int readInterestOp;
    volatile SelectionKey selectionKey;
    boolean readPending;
    private ChannelPromise connectPromise;
    private ScheduledFuture<?> connectTimeoutFuture;
    private SocketAddress requestedRemoteAddress;

    protected AbstractNioChannel(Channel parent, SelectableChannel ch, int readInterestOp) {
        super(parent);
        this.ch = ch;
        this.readInterestOp = readInterestOp;
        try {
            ch.configureBlocking(false);
        } catch (IOException e) {}
    }

    protected SelectableChannel javaChannel() {
        return ch;
    }
}


public abstract class AbstractChannel extends DefaultAttributeMap implements Channel {

    private final Channel parent;
    private final ChannelId id;
    private final Unsafe unsafe;
    private final DefaultChannelPipeline pipeline;
    private final VoidChannelPromise unsafeVoidPromise = new VoidChannelPromise(this, false);
    private final CloseFuture closeFuture = new CloseFuture(this);
    private volatile SocketAddress localAddress;
    private volatile SocketAddress remoteAddress;
    private volatile EventLoop eventLoop;
    private volatile boolean registered;
    private boolean strValActive;
    private String strVal;

    protected AbstractChannel(Channel parent) {
        this.parent = parent;
        id = newId();
        // 调用子类NioSocketChannel#newUnsafe创建的unsafe对象
        unsafe = newUnsafe();
        pipeline = newChannelPipeline();
    }

    protected DefaultChannelPipeline newChannelPipeline() {
        return new DefaultChannelPipeline(this);
    }
}
  • ReflectiveChannelFactory的ReflectiveChannelFactory的newChannel()方法来创建NioSocketChannel对象。
  • NioSocketChannel包括SocketChannelConfig、DefaultChannelPipeline对象。
  • NioSocketChannel的DefaultChannelPipeline负责维护channelHandler对象。
  • NioSocketChannel的Unsafe为NioSocketChannelUnsafe对象。
  • NioSocketChannel的ch为SelectorProvider#openSocketChannel对象。
  • NioSocketChannel的javaChannel()方法返回ch,即SelectorProvider#openSocketChannel对象。
  • NioSocketChannel负责执行读写连接操作等动作。

SelectorProvider

SelectorProvider
public abstract class SelectorProvider {
    private static final Object lock = new Object();
    private static SelectorProvider provider = null;

    public static SelectorProvider provider() {
        synchronized (lock) {
            if (provider != null)
                return provider;
            return AccessController.doPrivileged(
                new PrivilegedAction<SelectorProvider>() {
                    public SelectorProvider run() {
                            if (loadProviderFromProperty())
                                return provider;
                            if (loadProviderAsService())
                                return provider;
                            provider = sun.nio.ch.DefaultSelectorProvider.create();
                            return provider;
                        }
                    });
        }
    }

    public abstract DatagramChannel openDatagramChannel() throws IOException;
    public abstract DatagramChannel openDatagramChannel(ProtocolFamily family) throws IOException;
    public abstract Pipe openPipe() throws IOException;
    public abstract AbstractSelector openSelector() throws IOException;
    public abstract ServerSocketChannel openServerSocketChannel() throws IOException;
    public abstract SocketChannel openSocketChannel() throws IOException;
}
public abstract class SelectorProviderImpl extends SelectorProvider {
    public SelectorProviderImpl() {
    }

    public DatagramChannel openDatagramChannel() throws IOException {
        return new DatagramChannelImpl(this);
    }

    public DatagramChannel openDatagramChannel(ProtocolFamily var1) throws IOException {
        return new DatagramChannelImpl(this, var1);
    }

    public Pipe openPipe() throws IOException {
        return new PipeImpl(this);
    }

    public abstract AbstractSelector openSelector() throws IOException;

    public ServerSocketChannel openServerSocketChannel() throws IOException {
        return new ServerSocketChannelImpl(this);
    }

    public SocketChannel openSocketChannel() throws IOException {
        return new SocketChannelImpl(this);
    }
}
  • SelectorProvider#provider静态方法返回单例SelectorProvider对象,核心方法包括openSelector()、openServerSocketChannel()、openSocketChannel()方法。
  • SelectorProvider#openSelector()返回selector对象。
  • SelectorProvider#openServerSocketChannel()返回ServerSocketChannel对象。
  • SelectorProvider#openSocketChannel()返回SocketChannel对象。

NioSocketChannelUnsafe

NioSocketChannelUnsafe
private final class NioSocketChannelUnsafe extends NioByteUnsafe {}

protected class NioByteUnsafe extends AbstractNioUnsafe {

    private void closeOnRead(ChannelPipeline pipeline) {}
    private void handleReadException(ChannelPipeline pipeline, ByteBuf byteBuf, Throwable cause, boolean close,
            RecvByteBufAllocator.Handle allocHandle) {}
    public final void read() {}
}

protected abstract class AbstractNioUnsafe extends AbstractUnsafe implements NioUnsafe {

    protected final void removeReadOp() { }
    public final SelectableChannel ch() {}
    public final void connect(
            final SocketAddress remoteAddress, final SocketAddress localAddress, final ChannelPromise promise) {}
    private void fulfillConnectPromise(ChannelPromise promise, boolean wasActive) {}
    private void fulfillConnectPromise(ChannelPromise promise, Throwable cause) {}
    public final void finishConnect() {}
    protected final void flush0() {}
    public final void forceFlush() {}
    private boolean isFlushPending() {}
}

protected abstract class AbstractUnsafe implements Unsafe {

    private volatile ChannelOutboundBuffer outboundBuffer = new ChannelOutboundBuffer(AbstractChannel.this);
    private RecvByteBufAllocator.Handle recvHandle;
    private boolean inFlush0;
    /** true if the channel has never been registered, false otherwise */
    private boolean neverRegistered = true;

    private void assertEventLoop() {}
    public RecvByteBufAllocator.Handle recvBufAllocHandle() {}
    public final ChannelOutboundBuffer outboundBuffer() {}
    public final SocketAddress localAddress() {}
    public final SocketAddress remoteAddress() {}
    public final void register(EventLoop eventLoop, final ChannelPromise promise) {}
    private void register0(ChannelPromise promise) {}
    public final void bind(final SocketAddress localAddress, final ChannelPromise promise) {}
    public final void disconnect(final ChannelPromise promise) {}
    public final void close(final ChannelPromise promise) {}
    private void close(final ChannelPromise promise, final Throwable cause,
                       final ClosedChannelException closeCause, final boolean notify) {}
    private void doClose0(ChannelPromise promise) {}
    private void fireChannelInactiveAndDeregister(final boolean wasActive) {}
    public final void closeForcibly() {}
    public final void deregister(final ChannelPromise promise) {}
    private void deregister(final ChannelPromise promise, final boolean fireChannelInactive) {}
    public final void beginRead() {}
    public final void write(Object msg, ChannelPromise promise) {}
    public final void flush() {}
    protected void flush0() {}
    public final ChannelPromise voidPromise() {}
    protected final boolean ensureOpen(ChannelPromise promise) {}
    protected final void safeSetSuccess(ChannelPromise promise) {}
    protected final void safeSetFailure(ChannelPromise promise, Throwable cause) {}
    protected final void closeIfClosed() {}
    private void invokeLater(Runnable task) {}
    protected final Throwable annotateConnectException(Throwable cause, SocketAddress remoteAddress) {}
    protected Executor prepareToClose() {}
}

interface Unsafe {
    RecvByteBufAllocator.Handle recvBufAllocHandle();
    SocketAddress localAddress();
    SocketAddress remoteAddress();
    void register(EventLoop eventLoop, ChannelPromise promise);
    void bind(SocketAddress localAddress, ChannelPromise promise);
    void connect(SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise);
    void disconnect(ChannelPromise promise);
    void close(ChannelPromise promise);
    void closeForcibly();
    void deregister(ChannelPromise promise);
    void beginRead();
    void write(Object msg, ChannelPromise promise);
    void flush();
    ChannelPromise voidPromise();
    ChannelOutboundBuffer outboundBuffer();
}
  • NioSocketChannelUnsafe作为Unsafe的对象,负责执行真正的连接读取等操作。

DefaultChannelPipeline

TailContext HeadContext
public class DefaultChannelPipeline implements ChannelPipeline {

    static final InternalLogger logger = InternalLoggerFactory.getInstance(DefaultChannelPipeline.class);

    private static final String HEAD_NAME = generateName0(HeadContext.class);
    private static final String TAIL_NAME = generateName0(TailContext.class);

    private static final FastThreadLocal<Map<Class<?>, String>> nameCaches =
            new FastThreadLocal<Map<Class<?>, String>>() {
        @Override
        protected Map<Class<?>, String> initialValue() throws Exception {
            return new WeakHashMap<Class<?>, String>();
        }
    };

    final AbstractChannelHandlerContext head;
    final AbstractChannelHandlerContext tail;

    private final Channel channel;
    private final ChannelFuture succeededFuture;
    private final VoidChannelPromise voidPromise;
    private final boolean touch = ResourceLeakDetector.isEnabled();

    private Map<EventExecutorGroup, EventExecutor> childExecutors;
    private MessageSizeEstimator.Handle estimatorHandle;
    private boolean firstRegistration = true;
    private PendingHandlerCallback pendingHandlerCallbackHead;
    private boolean registered;

    protected DefaultChannelPipeline(Channel channel) {
        this.channel = ObjectUtil.checkNotNull(channel, "channel");
        succeededFuture = new SucceededChannelFuture(channel, null);
        voidPromise =  new VoidChannelPromise(channel, true);

        tail = new TailContext(this);
        head = new HeadContext(this);

        head.next = tail;
        tail.prev = head;
    }
}
  • DefaultChannelPipeline包含AbstractChannelHandlerContext类型的head和tail串联对应的handler对象。

DefaultChannelPromise

DefaultChannelPromise
public class DefaultChannelPromise extends DefaultPromise<Void> implements ChannelPromise, FlushCheckpoint {

    private final Channel channel;
    private long checkpoint;

    public DefaultChannelPromise(Channel channel) {
        this.channel = channel;
    }

    public DefaultChannelPromise(Channel channel, EventExecutor executor) {
        super(executor);
        this.channel = channel;
    }
}
  • DefaultChannelPromise负责封装NioSocketChannel对象和NioEventLoop对象。

NioSocketChannel使用流程

channel注册过程
  • NioSocketChannel的使用过程包括NioSocketChannel的初始化和注册两个过程。

NioSocketChannel初始化注册过程

public class Bootstrap extends AbstractBootstrap<Bootstrap, Channel> {

    private final BootstrapConfig config = new BootstrapConfig(this);
    private volatile SocketAddress remoteAddress;

    public Bootstrap() { }

    private ChannelFuture doResolveAndConnect(final SocketAddress remoteAddress, final SocketAddress localAddress) {
        // 初始化channel的过程。
        final ChannelFuture regFuture = initAndRegister();
        final Channel channel = regFuture.channel();

        if (regFuture.isDone()) {
            if (!regFuture.isSuccess()) {
                return regFuture;
            }
            return doResolveAndConnect0(channel, remoteAddress, localAddress, channel.newPromise());
        } else {
            // 省略相关代码
            return promise;
        }
    }
}
  • Bootstrap客户端下的SocketChannel是在进行connect操作的时候进行初始化,具体是在执行ChannelFuture f = b.connect(HOST, PORT).sync()的动作。
  • connect()内部会执行doResolveAndConnect() => AbstractBootstrap#initAndRegister。

initAndRegister

public abstract class AbstractBootstrap<B extends AbstractBootstrap<B, C>, C extends Channel> implements Cloneable {

    final ChannelFuture initAndRegister() {
        Channel channel = null;
        try {
            // 1.newChannel
            channel = channelFactory.newChannel();
            // 2.initChannel
            init(channel);
        } catch (Throwable t) {
           // 省略代码
        }
        // 3.registerChannel,config().group()返回NioEventLoopGroup对象
        ChannelFuture regFuture = config().group().register(channel);
        if (regFuture.cause() != null) {
            if (channel.isRegistered()) {
                channel.close();
            } else {
                channel.unsafe().closeForcibly();
            }
        }
        // 返回regFuture对象
        return regFuture;
    }
}
  • initAndRegister执行3件事包括newChannel、initChannel、registerChannel三个动作。
  • newChannel是通过channelFactory.newChannel()来实现channel的创建。
  • config().group().register(channel)返回的是NioEventLoopGroup对象。

newChannel - 新建

public class ReflectiveChannelFactory<T extends Channel> implements ChannelFactory<T> {

    private final Class<? extends T> clazz;

    public ReflectiveChannelFactory(Class<? extends T> clazz) {
        if (clazz == null) {
            throw new NullPointerException("clazz");
        }
        this.clazz = clazz;
    }

    @Override
    public T newChannel() {
        try {
            return clazz.newInstance();
        } catch (Throwable t) {
            throw new ChannelException("Unable to create Channel from class " + clazz, t);
        }
    }
}


public abstract class AbstractBootstrap<B extends AbstractBootstrap<B, C>, C extends Channel> implements Cloneable {

    public B channel(Class<? extends C> channelClass) {
        return channelFactory(new ReflectiveChannelFactory<C>(channelClass));
    }

    public B channelFactory(ChannelFactory<? extends C> channelFactory) {
        this.channelFactory = channelFactory;
        return (B) this;
    }
}
  • channelFactory在Bootstrap.channel()中生成ReflectiveChannelFactory对象包含NioSocketChannel类。
  • channelFactory.newChannel()执行ReflectiveChannelFactory#newChannel生成NioSocketChannel对象。

initChannel - 初始化

public class Bootstrap extends AbstractBootstrap<Bootstrap, Channel> {

    void init(Channel channel) throws Exception {
        // 将handler对象加到NioSocketChannel的pipeline当中
        ChannelPipeline p = channel.pipeline();
        p.addLast(config.handler());

        final Map<ChannelOption<?>, Object> options = options0();
        synchronized (options) {
            setChannelOptions(channel, options, logger);
        }

        final Map<AttributeKey<?>, Object> attrs = attrs0();
        synchronized (attrs) {
            for (Entry<AttributeKey<?>, Object> e: attrs.entrySet()) {
                channel.attr((AttributeKey<Object>) e.getKey()).set(e.getValue());
            }
        }
    }
}
  • initChannel的过程主要绑定对应的handler到channel的pipeline对象当中。
  • initChannel的额外操作就是绑定一些属性信息。

registerChannel - 绑定

public abstract class MultithreadEventLoopGroup extends MultithreadEventExecutorGroup implements EventLoopGroup {

    public ChannelFuture register(Channel channel) {
        // channel为NioSocketChannel
        return next().register(channel);
    }

    public EventLoop next() {
        return (EventLoop) super.next();
    }
}

public abstract class MultithreadEventExecutorGroup extends AbstractEventExecutorGroup {
    // NioEventLoop对象的集合
    private final EventExecutor[] children;
    private final Set<EventExecutor> readonlyChildren;
    private final AtomicInteger terminatedChildren = new AtomicInteger();
    private final Promise<?> terminationFuture = new DefaultPromise(GlobalEventExecutor.INSTANCE);
    // chooser = chooserFactory.newChooser(children);
    private final EventExecutorChooserFactory.EventExecutorChooser chooser;

    public EventExecutor next() {
        // chooser包含的是NioEventLoop的集合,相当于选择NioEventLoop的过程
        // PowerOfTwoEventExecutorChooser
        return chooser.next();
    }
}
  • NioEventLoopGroup的chooser.next()选择其中一个NioEventLoop的过程。
  • chooser有PowerOfTwoEventExecutorChooser和GenericEventExecutorChooser两类,这里指的是PowerOfTwoEventExecutorChooser。
  • chooser.next().register(channel)选择一个NioEventLoop注册NioSocketChannel的channel对象。

chooser -选择

public final class DefaultEventExecutorChooserFactory implements EventExecutorChooserFactory {

    public static final DefaultEventExecutorChooserFactory INSTANCE = new DefaultEventExecutorChooserFactory();

    private DefaultEventExecutorChooserFactory() { }

    public EventExecutorChooser newChooser(EventExecutor[] executors) {
        if (isPowerOfTwo(executors.length)) {
            return new PowerOfTwoEventExecutorChooser(executors);
        } else {
            return new GenericEventExecutorChooser(executors);
        }
    }

    private static boolean isPowerOfTwo(int val) {
        return (val & -val) == val;
    }

    private static final class PowerOfTwoEventExecutorChooser implements EventExecutorChooser {
        private final AtomicInteger idx = new AtomicInteger();
        private final EventExecutor[] executors;

        PowerOfTwoEventExecutorChooser(EventExecutor[] executors) {
            this.executors = executors;
        }

        @Override
        public EventExecutor next() {
            return executors[idx.getAndIncrement() & executors.length - 1];
        }
    }

    private static final class GenericEventExecutorChooser implements EventExecutorChooser {
        private final AtomicInteger idx = new AtomicInteger();
        private final EventExecutor[] executors;

        GenericEventExecutorChooser(EventExecutor[] executors) {
            this.executors = executors;
        }

        @Override
        public EventExecutor next() {
            return executors[Math.abs(idx.getAndIncrement() % executors.length)];
        }
    }
}
  • PowerOfTwoEventExecutorChooser的next()方法按照轮询返回EventExecutor对象即NioEventLoop对象,NioEventLoop的实现接口里面有EventExecutor。
  • GenericEventExecutorChooser和PowerOfTwoEventExecutorChooser的next()区别在于取模的逻辑而已。

NioEventLoop#register

public abstract class SingleThreadEventLoop extends SingleThreadEventExecutor implements EventLoop {

    @Override
    public ChannelFuture register(Channel channel) {
        // this指的是NioEventLoop对象,channel为NioSocketChannel,
        // 封装为DefaultChannelPromise对象进行注册。
        return register(new DefaultChannelPromise(channel, this));
    }

    public ChannelFuture register(final ChannelPromise promise) {
        ObjectUtil.checkNotNull(promise, "promise");
        // promise为DefaultChannelPromise对象
        // promise.channel()返回NioSocketChannel
        // promise.channel().unsafe()返回NioSocketChannelUnsafe对象
        // this为NioEventLoop对象
        promise.channel().unsafe().register(this, promise);
        return promise;
    }
}


public class DefaultChannelPromise extends DefaultPromise<Void> implements ChannelPromise, FlushCheckpoint {
    private final Channel channel;
    private long checkpoint;

    public DefaultChannelPromise(Channel channel, EventExecutor executor) {
        // channel是NioSocketChannel对象,executor为NioEventLoop对象
        super(executor);
        this.channel = channel;
    }
}
  • 将NioEventLoop和NioSocketChannel封装为DefaultChannelPromise对象。
  • register()返回DefaultChannelPromise对象。
  • promise.channel().unsafe()返回NioSocketChannelUnsafe对象进行注册。

NioSocketChannelUnsafe#register

NioSocketChannelUnsafe
public abstract class AbstractChannel extends DefaultAttributeMap implements Channel {

    protected abstract class AbstractUnsafe implements Unsafe {

        private volatile ChannelOutboundBuffer outboundBuffer = new ChannelOutboundBuffer(AbstractChannel.this);
        private RecvByteBufAllocator.Handle recvHandle;
        private boolean inFlush0;
        /** true if the channel has never been registered, false otherwise */
        private boolean neverRegistered = true;

        @Override
        public final void register(EventLoop eventLoop, final ChannelPromise promise) {
            // 省略部分代码
            
            // NioEventLoop对象
            AbstractChannel.this.eventLoop = eventLoop;
            // 由eventLoop执行execute()来执行注册动作
            if (eventLoop.inEventLoop()) {
                register0(promise);
            } else {
                try {
                    eventLoop.execute(new Runnable() {
                        @Override
                        public void run() {
                            register0(promise);
                        }
                    });
                } catch (Throwable t) {
                }
            }
        }

        private void register0(ChannelPromise promise) {
            try {
                boolean firstRegistration = neverRegistered;
                // 执行doRegister()方法
                doRegister();

                neverRegistered = false;
                registered = true;
                // pipeline的一系列操作,暂时不知道什么原理。
                pipeline.invokeHandlerAddedIfNeeded();
                safeSetSuccess(promise);
                pipeline.fireChannelRegistered();

                if (isActive()) {
                    if (firstRegistration) {
                        pipeline.fireChannelActive();
                    } else if (config().isAutoRead()) {
                        beginRead();
                    }
                }
            } catch (Throwable t) {
                // Close the channel directly to avoid FD leak.
                closeForcibly();
                closeFuture.setClosed();
                safeSetFailure(promise, t);
            }
        }
    }
  • NioSocketChannelUnsafe#register执行AbstractUnsafe#register的动作。
  • AbstractUnsafe#register执行的是NioEventLoop.execute()来执行register0()方法。
  • AbstractUnsafe#register0()方法内部执行AbstractNioChannel#doRegister()操作。
  • AbstractChannel.this.eventLoop = eventLoop赋值NioEventLoop给NioSocketChannel对象。

AbstractNioChannel#doRegister

public abstract class AbstractNioChannel extends AbstractChannel {

    private final SelectableChannel ch;
    protected final int readInterestOp;
    volatile SelectionKey selectionKey;
    boolean readPending;
    private ChannelPromise connectPromise;
    private ScheduledFuture<?> connectTimeoutFuture;
    private SocketAddress requestedRemoteAddress;

    protected AbstractNioChannel(Channel parent, SelectableChannel ch, int readInterestOp) {
        super(parent);
        this.ch = ch;
        this.readInterestOp = readInterestOp;
        try {
            ch.configureBlocking(false);
        } catch (IOException e) {
        }
    }

    protected void doRegister() throws Exception {
        boolean selected = false;
        for (;;) {
            try {
                // javaChannel()返回的是SelectorProvider.openSocketChannel()返回对象
                // java.nio.channels.spi.SelectorProvider#openSocketChannel
                selectionKey = javaChannel().register(eventLoop().selector, 0, this);
                return;
            } catch (CancelledKeyException e) {
                if (!selected) {
                    eventLoop().selectNow();
                    selected = true;
                } else {
                    throw e;
                }
            }
        }
    }

    protected SelectableChannel javaChannel() {
        // io.netty.channel.nio.AbstractNioChannel#ch
        return ch;
    }
}
  • javaChannel()返回的是java.nio.channels.spi.SelectorProvider#openSocketChannel()生成的ch对象。
  • eventLoop().selector返回的是NioEventLoop的selector对象,由java.nio.channels.spi.SelectorProvider#openSelector()返回。
  • javaChannel().register()将原生的ch对象注册到NioEventLoop对象中的selector当中,同时绑定了当前的NioSocketChannel对象。

Bootstrap connect流程

connect流程

Bootstrap#doResolveAndConnect

public class Bootstrap extends AbstractBootstrap<Bootstrap, Channel> {

    private ChannelFuture doResolveAndConnect(final SocketAddress remoteAddress, final SocketAddress localAddress) {
        final ChannelFuture regFuture = initAndRegister();
        final Channel channel = regFuture.channel();

        if (regFuture.isDone()) {
            if (!regFuture.isSuccess()) {
                return regFuture;
            }
            // doResolveAndConnect0
            return doResolveAndConnect0(channel, remoteAddress, localAddress, channel.newPromise());
        } else {
            // 省略代码
        }
    }

    private ChannelFuture doResolveAndConnect0(final Channel channel, SocketAddress remoteAddress,
                                               final SocketAddress localAddress, final ChannelPromise promise) {
        try {
            final EventLoop eventLoop = channel.eventLoop();
            final AddressResolver<SocketAddress> resolver = this.resolver.getResolver(eventLoop);

            if (!resolver.isSupported(remoteAddress) || resolver.isResolved(remoteAddress)) {
                // doConnect
                doConnect(remoteAddress, localAddress, promise);
                return promise;
            }
        // 省略代码
    }

    private static void doConnect(
            final SocketAddress remoteAddress, final SocketAddress localAddress, final ChannelPromise connectPromise) {

        final Channel channel = connectPromise.channel();
        channel.eventLoop().execute(new Runnable() {
            @Override
            public void run() {
                if (localAddress == null) {
                    // connect
                    channel.connect(remoteAddress, connectPromise);
                } else {
                    // connect
                    channel.connect(remoteAddress, localAddress, connectPromise);
                }
                connectPromise.addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
            }
        });
    }
}
  • 这里channel的类型就是NioSocketChannel。

NioSocketChannel#connect

public abstract class AbstractChannel extends DefaultAttributeMap implements Channel {

    @Override
    public ChannelFuture bind(SocketAddress localAddress, ChannelPromise promise) {
        return pipeline.bind(localAddress, promise);
    }

    @Override
    public ChannelFuture connect(SocketAddress remoteAddress, ChannelPromise promise) {
        return pipeline.connect(remoteAddress, promise);
    }
}
  • 调用NioSocketChannel的pipeline对象的connect()方法进行连接。

DefaultChannelPipeline#connect

public class DefaultChannelPipeline implements ChannelPipeline {
    @Override
    public final ChannelFuture bind(SocketAddress localAddress, ChannelPromise promise) {
        return tail.bind(localAddress, promise);
    }

    @Override
    public final ChannelFuture connect(SocketAddress remoteAddress, ChannelPromise promise) {
        return tail.connect(remoteAddress, promise);
    }

    @Override
    public final ChannelFuture connect(
            SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise) {
        return tail.connect(remoteAddress, localAddress, promise);
    }
}
  • 调用DefaultChannelPipeline的tail对象开始执行connect()动作。
  • tail的执行顺序从后往前,因为head的执行顺序从前往后,两者相对应。

TailContext#connect

abstract class AbstractChannelHandlerContext extends DefaultAttributeMap
        implements ChannelHandlerContext, ResourceLeakHint {

    public ChannelFuture connect(SocketAddress remoteAddress, ChannelPromise promise) {
        return connect(remoteAddress, null, promise);
    }

    public ChannelFuture connect(
            final SocketAddress remoteAddress, final SocketAddress localAddress, final ChannelPromise promise) {

        final AbstractChannelHandlerContext next = findContextOutbound();
        EventExecutor executor = next.executor();
        if (executor.inEventLoop()) {
            next.invokeConnect(remoteAddress, localAddress, promise);
        } else {
            safeExecute(executor, new Runnable() {
                @Override
                public void run() {
                    next.invokeConnect(remoteAddress, localAddress, promise);
                }
            }, promise, null);
        }
        return promise;
    }
}
  • 上面的代码中有一个关键的地方, 即 final AbstractChannelHandlerContext next = findContextOutbound(), 这里调用 findContextOutbound 方法, 从 DefaultChannelPipeline 内的双向链表的 tail 开始, 不断向前寻找第一个 outbound 为 true 的 AbstractChannelHandlerContext, 然后调用它的 invokeConnect 方法。

HeadContext#connect

abstract class AbstractChannelHandlerContext extends DefaultAttributeMap
        implements ChannelHandlerContext, ResourceLeakHint {

    private void invokeConnect(SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise) {
        if (invokeHandler()) {
            try {
                ((ChannelOutboundHandler) handler()).connect(this, remoteAddress, localAddress, promise);
            } catch (Throwable t) {
                notifyOutboundHandlerException(t, promise);
            }
        } else {
            connect(remoteAddress, localAddress, promise);
        }
    }
}


public class DefaultChannelPipeline implements ChannelPipeline {

    final class HeadContext extends AbstractChannelHandlerContext
            implements ChannelOutboundHandler, ChannelInboundHandler {
        public void connect(
                ChannelHandlerContext ctx,
                SocketAddress remoteAddress, SocketAddress localAddress,
                ChannelPromise promise) throws Exception {

            unsafe.connect(remoteAddress, localAddress, promise);
        }
}

AbstractNioUnsafe#connect

public abstract class AbstractNioChannel extends AbstractChannel {
    protected abstract class AbstractNioUnsafe extends AbstractUnsafe implements NioUnsafe {

        public final void connect(
                final SocketAddress remoteAddress, final SocketAddress localAddress, final ChannelPromise promise) {
            if (!promise.setUncancellable() || !ensureOpen(promise)) {
                return;
            }

            try {
                boolean wasActive = isActive();
                // doConnect
                if (doConnect(remoteAddress, localAddress)) {
                    fulfillConnectPromise(promise, wasActive);
                } 
                // 省略相关代码
            } catch (Throwable t) {
                promise.tryFailure(annotateConnectException(t, remoteAddress));
                closeIfClosed();
            }
        }
}

NioSocketChannel#doConnect

public class NioSocketChannel extends AbstractNioByteChannel implements io.netty.channel.socket.SocketChannel {

    protected boolean doConnect(SocketAddress remoteAddress, SocketAddress localAddress) throws Exception {
        if (localAddress != null) {
            doBind0(localAddress);
        }

        boolean success = false;
        try {
            boolean connected = SocketUtils.connect(javaChannel(), remoteAddress);
            if (!connected) {
                selectionKey().interestOps(SelectionKey.OP_CONNECT);
            }
            success = true;
            return connected;
        } finally {
            if (!success) {
                doClose();
            }
        }
    }
}
  • 获取 Java NIO SocketChannel, 从 NioSocketChannel.newSocket 返回的 SocketChannel 对象。

SocketUtils#connect

public final class SocketUtils {

    public static boolean connect(final SocketChannel socketChannel, final SocketAddress remoteAddress)
            throws IOException {
        try {
            return AccessController.doPrivileged(new PrivilegedExceptionAction<Boolean>() {
                @Override
                public Boolean run() throws IOException {
                    return socketChannel.connect(remoteAddress);
                }
            });
        } catch (PrivilegedActionException e) {
            throw (IOException) e.getCause();
        }
    }
}
  • 调用 SocketChannel.connect方法完成Java NIO层面上的Socket的连接。

相关文章

网友评论

    本文标题:Netty源码分析 - Bootstrap客户端

    本文链接:https://www.haomeiwen.com/subject/fhnsehtx.html