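Series
Netty Source Code Analysis - Bootstrap Server
Netty Source Code Analysis - Bootstrap Client
Netty Source Code Analysis - ChannelHandler
Netty Source Code Analysis - EventLoop Class Relationships
Netty Source Code Analysis - register
Netty Source Code Analysis - NioEventLoop Event Handling
Netty Source Code Analysis - The accept Process
Netty Source Code Analysis - ByteBuf
Netty Source Code Analysis - Sticky Packets and Packet Splitting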
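Introduction
- This article is based on netty-4.1.8.Final and mainly analyzes how a Netty client is initialized.
- Understand how Bootstrap, NioEventLoopGroup, NioEventLoop, and NioSocketChannel relate to one another.
- Understand how SelectorProvider, NioSocketChannelUnsafe, DefaultChannelPipeline, and DefaultChannelPromise relate to one another.
- Understand how a NioSocketChannel is initialized and registered.
- Understand how the Bootstrap client establishes a connection.
Basic concepts
Figure: Bootstrap relationship diagram
- Bootstrap holds an EventLoopGroup.
- An EventLoopGroup holds multiple EventLoop objects.
- Each EventLoop holds a Selector, which handles the events of that EventLoop.
- SocketChannel holds a reference to the EventLoop it is bound to.
- SocketChannel holds an Unsafe; all read, write, and connect operations are carried out by the Unsafe.
- SocketChannel holds a DefaultChannelPipeline, which maintains all handler objects.
- SocketChannel's ch field, of type SelectableChannel, is the object that performs the actual connection; it is created by SelectorProvider.openSocketChannel().
Netty client example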
public final class DiscardClient {
static final boolean SSL = System.getProperty("ssl") != null;
static final String HOST = System.getProperty("host", "127.0.0.1");
static final int PORT = Integer.parseInt(System.getProperty("port", "8009"));
static final int SIZE = Integer.parseInt(System.getProperty("size", "256"));
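public static void main(String[] args) throws Exception {
// Configure SSL; sslCtx stays null when SSL is disabled (as in the official Netty discard example)
final SslContext sslCtx;
if (SSL) {
sslCtx = SslContextBuilder.forClient()
.trustManager(InsecureTrustManagerFactory.INSTANCE).build();
} else {
sslCtx = null;
}
// Create the event-handling group
EventLoopGroup group = new NioEventLoopGroup();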
try {
// Create the Bootstrap object
Bootstrap b = new Bootstrap();
// Bind the group to the Bootstrap
b.group(group)
// Bind the channel type to the Bootstrap
.channel(NioSocketChannel.class)
// Bind the handler to the Bootstrap
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline p = ch.pipeline();
if (sslCtx != null) {
p.addLast(sslCtx.newHandler(ch.alloc(), HOST, PORT));
}
p.addLast(new DiscardClientHandler());
}
});
// Establish the connection
ChannelFuture f = b.connect(HOST, PORT).sync();
// Wait until the connection is closed.
f.channel().closeFuture().sync();
} finally {
group.shutdownGracefully();
}
}
}
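- The Netty client demo above follows a fixed sequence of steps:
- Create the EventLoopGroup.
- Create the Bootstrap.
- Bind the EventLoopGroup to the Bootstrap; the setter is fluent and returns the current object.
- Bind the channel type (NioSocketChannel) to the Bootstrap, again fluently.
- Bind the handler (a ChannelInitializer) to the Bootstrap, again fluently.
- Establish the connection with Bootstrap's connect(HOST, PORT).sync().
Introduction to the core objects
Bootstrap
Figure: Bootstrap class hierarchy
- The inheritance hierarchy of Bootstrap is shown in the figure above; the parent class AbstractBootstrap provides the common functionality.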
public class Bootstrap extends AbstractBootstrap<Bootstrap, Channel> {
private static final InternalLogger logger = InternalLoggerFactory.getInstance(Bootstrap.class);
private static final AddressResolverGroup<?> DEFAULT_RESOLVER = DefaultAddressResolverGroup.INSTANCE;
// BootstrapConfig holds a reference to this Bootstrap
private final BootstrapConfig config = new BootstrapConfig(this);
@SuppressWarnings("unchecked")
private volatile AddressResolverGroup<SocketAddress> resolver =
(AddressResolverGroup<SocketAddress>) DEFAULT_RESOLVER;
private volatile SocketAddress remoteAddress;
public Bootstrap() { }
}
public abstract class AbstractBootstrap<B extends AbstractBootstrap<B, C>, C extends Channel> implements Cloneable {
volatile EventLoopGroup group;
private volatile ChannelFactory<? extends C> channelFactory;
private volatile SocketAddress localAddress;
private final Map<ChannelOption<?>, Object> options = new LinkedHashMap<ChannelOption<?>, Object>();
private final Map<AttributeKey<?>, Object> attrs = new LinkedHashMap<AttributeKey<?>, Object>();
private volatile ChannelHandler handler;
AbstractBootstrap() {
}
public B group(EventLoopGroup group) {
this.group = group;
return (B) this;
}
public B channel(Class<? extends C> channelClass) {
return channelFactory(new ReflectiveChannelFactory<C>(channelClass));
}
public B channelFactory(ChannelFactory<? extends C> channelFactory) {
this.channelFactory = channelFactory;
return (B) this;
}
public B handler(ChannelHandler handler) {
this.handler = handler;
return (B) this;
}
}
public class ReflectiveChannelFactory<T extends Channel> implements ChannelFactory<T> {
private final Class<? extends T> clazz;
public ReflectiveChannelFactory(Class<? extends T> clazz) {
this.clazz = clazz;
}
public T newChannel() {
try {
return clazz.newInstance();
} catch (Throwable t) {
throw new ChannelException("Unable to create Channel from class " + clazz, t);
}
}
}
- Bootstrap extends AbstractBootstrap; its own core field is the BootstrapConfig.
- AbstractBootstrap holds the core fields group, channelFactory, options, attrs, and handler.
- AbstractBootstrap initializes these fields through fluent (chained) setters.
- channelFactory is set through channel(), which wraps the channel class in a ReflectiveChannelFactory.
- ReflectiveChannelFactory.newChannel() creates the channel object.
- The core fields of Bootstrap are therefore the EventLoopGroup, the ChannelHandler, and the ChannelFactory.
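The self-typed signature `B extends AbstractBootstrap<B, C>` is what allows the chained setters to return the concrete subclass. Below is a minimal JDK-only sketch of that pattern combined with a reflective factory. Every class name in it (`FluentBootstrapSketch`, `ToyChannel`, `ReflectiveFactory`, `AbstractBuilder`, `ClientBuilder`) is a hypothetical stand-in rather than a Netty type, and it uses `getDeclaredConstructor().newInstance()` instead of the `Class.newInstance()` call seen in the excerpt.

```java
// JDK-only sketch; every class here is a hypothetical stand-in, not a Netty type.
final class FluentBootstrapSketch {

    /** Stand-in for a channel class with a public no-arg constructor. */
    public static class ToyChannel {
        public ToyChannel() { }
    }

    /** Keeps the class and instantiates it on demand, like ReflectiveChannelFactory. */
    static final class ReflectiveFactory<T> {
        private final Class<? extends T> clazz;

        ReflectiveFactory(Class<? extends T> clazz) {
            if (clazz == null) {
                throw new NullPointerException("clazz");
            }
            this.clazz = clazz;
        }

        T newInstance() {
            try {
                return clazz.getDeclaredConstructor().newInstance();
            } catch (ReflectiveOperationException e) {
                throw new IllegalStateException("Unable to create instance of " + clazz, e);
            }
        }
    }

    /** B is the concrete builder type, so each setter can return it for chaining. */
    abstract static class AbstractBuilder<B extends AbstractBuilder<B, C>, C> {
        private String group;
        private ReflectiveFactory<? extends C> factory;

        @SuppressWarnings("unchecked")
        B group(String group) {
            this.group = group;
            return (B) this;
        }

        @SuppressWarnings("unchecked")
        B channel(Class<? extends C> channelClass) {
            this.factory = new ReflectiveFactory<C>(channelClass);
            return (B) this;
        }

        String group() {
            return group;
        }

        C newChannel() {
            return factory.newInstance();
        }
    }

    /** Concrete builder; its own setter stays reachable in the middle of a chain. */
    static final class ClientBuilder extends AbstractBuilder<ClientBuilder, ToyChannel> {
        private String remoteHost;

        ClientBuilder remoteHost(String remoteHost) {
            this.remoteHost = remoteHost;
            return this;
        }

        String remoteHost() {
            return remoteHost;
        }
    }

    public static void main(String[] args) {
        ClientBuilder b = new ClientBuilder()
                .group("worker-group")
                .channel(ToyChannel.class)
                .remoteHost("127.0.0.1");
        System.out.println(b.group() + " -> " + b.newChannel().getClass().getSimpleName());
    }
}
```

Because `group()` and `channel()` return `B` rather than the abstract base type, the subclass-only `remoteHost()` call can follow them in the same chain. Each `newChannel()` call produces a fresh instance.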
NioEventLoopGroup
Figure: NioEventLoopGroup class hierarchy
public class NioEventLoopGroup extends MultithreadEventLoopGroup {
public NioEventLoopGroup() {
this(0);
}
public NioEventLoopGroup(int nThreads) {
this(nThreads, (Executor) null);
}
public NioEventLoopGroup(int nThreads, Executor executor) {
// provider = sun.nio.ch.DefaultSelectorProvider.create();
this(nThreads, executor, SelectorProvider.provider());
}
public NioEventLoopGroup(
int nThreads, Executor executor, final SelectorProvider selectorProvider) {
this(nThreads, executor, selectorProvider, DefaultSelectStrategyFactory.INSTANCE);
}
public NioEventLoopGroup(int nThreads, Executor executor, final SelectorProvider selectorProvider,
final SelectStrategyFactory selectStrategyFactory) {
// executor is null by default
super(nThreads, executor, selectorProvider, selectStrategyFactory, RejectedExecutionHandlers.reject());
}
// Creates a NioEventLoop
protected EventLoop newChild(Executor executor, Object... args) throws Exception {
return new NioEventLoop(this, executor, (SelectorProvider) args[0],
((SelectStrategyFactory) args[1]).newSelectStrategy(), (RejectedExecutionHandler) args[2]);
}
}
public abstract class MultithreadEventLoopGroup extends MultithreadEventExecutorGroup implements EventLoopGroup {
private static final InternalLogger logger = InternalLoggerFactory.getInstance(MultithreadEventLoopGroup.class);
private static final int DEFAULT_EVENT_LOOP_THREADS;
protected MultithreadEventLoopGroup(int nThreads, Executor executor, Object... args) {
super(nThreads == 0 ? DEFAULT_EVENT_LOOP_THREADS : nThreads, executor, args);
}
}
public abstract class MultithreadEventExecutorGroup extends AbstractEventExecutorGroup {
private final EventExecutor[] children;
private final Set<EventExecutor> readonlyChildren;
private final AtomicInteger terminatedChildren = new AtomicInteger();
private final Promise<?> terminationFuture = new DefaultPromise(GlobalEventExecutor.INSTANCE);
private final EventExecutorChooserFactory.EventExecutorChooser chooser;
protected MultithreadEventExecutorGroup(int nThreads, Executor executor, Object... args) {
this(nThreads, executor, DefaultEventExecutorChooserFactory.INSTANCE, args);
}
protected MultithreadEventExecutorGroup(int nThreads, Executor executor,
EventExecutorChooserFactory chooserFactory, Object... args) {
if (executor == null) {
executor = new ThreadPerTaskExecutor(newDefaultThreadFactory());
}
children = new EventExecutor[nThreads];
for (int i = 0; i < nThreads; i ++) {
boolean success = false;
try {
children[i] = newChild(executor, args);
success = true;
} catch (Exception e) {
// non-essential code omitted
} finally {
// non-essential code omitted
}
}
chooser = chooserFactory.newChooser(children);
// non-essential code omitted
}
}
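- One of Bootstrap's core fields is the EventLoopGroup, here a NioEventLoopGroup.
- The class hierarchy of NioEventLoopGroup is shown in the figure above: it extends MultithreadEventLoopGroup. The part to focus on is initialization.
- The core fields of MultithreadEventExecutorGroup are children (the NioEventLoop instances) and chooser (which picks among them).
- NioEventLoopGroup holds a set of NioEventLoop objects and selects one through the chooser.
- NioEventLoopGroup.newChild() creates each NioEventLoop. The SelectorProvider passed in comes from SelectorProvider.provider(), which returns a singleton.
- As the event hub, NioEventLoopGroup holds its core fields (some defined in its parent classes), chiefly the NioEventLoop instances.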
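The initialization path above can be summarized as "resolve the thread count, then fill the children array". The sketch below mimics just that with plain JDK code. `GroupSizingSketch` and its methods are hypothetical names. The default of twice the processor count is an assumption about Netty 4.1 (the excerpt does not show how DEFAULT_EVENT_LOOP_THREADS is initialized), so check the value against the Netty source before relying on it.

```java
// JDK-only sketch; GroupSizingSketch and its methods are hypothetical names.
final class GroupSizingSketch {

    /** 0 means "use the default", assumed here to be twice the processor count. */
    static int resolveThreads(int requested, int availableProcessors) {
        int defaultThreads = Math.max(1, availableProcessors * 2);
        return requested == 0 ? defaultThreads : requested;
    }

    /** Fills the children array up front, one entry per event loop. */
    static String[] newChildren(int nThreads) {
        String[] children = new String[nThreads];
        for (int i = 0; i < nThreads; i++) {
            children[i] = "event-loop-" + i; // stands in for newChild(executor, args)
        }
        return children;
    }

    public static void main(String[] args) {
        int n = resolveThreads(0, Runtime.getRuntime().availableProcessors());
        System.out.println(newChildren(n).length + " event loops");
    }
}
```

This is why `new NioEventLoopGroup()` in the demo, which passes 0 down the constructor chain, still ends up with a non-empty set of event loops.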
NioEventLoop
Figure: NioEventLoop
public final class NioEventLoop extends SingleThreadEventLoop {
private static final InternalLogger logger = InternalLoggerFactory.getInstance(NioEventLoop.class);
private static final int CLEANUP_INTERVAL = 256; // XXX Hard-coded value, but won't need customization.
private static final boolean DISABLE_KEYSET_OPTIMIZATION =
SystemPropertyUtil.getBoolean("io.netty.noKeySetOptimization", false);
private static final int MIN_PREMATURE_SELECTOR_RETURNS = 3;
private static final int SELECTOR_AUTO_REBUILD_THRESHOLD;
// The bound event selector, created by the provider
Selector selector;
private SelectedSelectionKeySet selectedKeys;
// provider = SelectorProvider.provider(), which returns a singleton.
private final SelectorProvider provider;
private final AtomicBoolean wakenUp = new AtomicBoolean();
// Event selection strategy
private final SelectStrategy selectStrategy;
private volatile int ioRatio = 50;
private int cancelledKeys;
private boolean needsToSelectAgain;
NioEventLoop(NioEventLoopGroup parent, Executor executor, SelectorProvider selectorProvider,
SelectStrategy strategy, RejectedExecutionHandler rejectedExecutionHandler) {
super(parent, executor, false, DEFAULT_MAX_PENDING_TASKS, rejectedExecutionHandler);
// provider = sun.nio.ch.DefaultSelectorProvider.create()
provider = selectorProvider;
// selector = provider.openSelector();
selector = openSelector();
selectStrategy = strategy;
}
private Selector openSelector() {
final Selector selector;
try {
selector = provider.openSelector();
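} catch (IOException e) {
throw new ChannelException("failed to open a new selector", e);
}
// non-essential code omitted
return selector;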
}
}
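- NioEventLoop is the hub that processes events; its core fields are selector, provider, and selectStrategy.
- NioEventLoop is what a SocketChannel gets bound to.
- By default, NioEventLoop's provider comes from SelectorProvider.provider(), which falls back to sun.nio.ch.DefaultSelectorProvider.create().
- NioEventLoop's selector is created by provider.openSelector().
- SingleThreadEventLoop, the parent class of NioEventLoop, provides register(), which binds a channel to that particular NioEventLoop.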
NioSocketChannel
Figure: NioSocketChannel
public class NioSocketChannel extends AbstractNioByteChannel implements io.netty.channel.socket.SocketChannel {
// The SelectorProvider returned by SelectorProvider.provider(), which returns a singleton.
private static final SelectorProvider DEFAULT_SELECTOR_PROVIDER = SelectorProvider.provider();
private final SocketChannelConfig config;
public NioSocketChannel() {
this(DEFAULT_SELECTOR_PROVIDER);
}
public NioSocketChannel(SelectorProvider provider) {
// newSocket(provider) returns the channel created by SelectorProvider#openSocketChannel()
this(newSocket(provider));
}
private static SocketChannel newSocket(SelectorProvider provider) {
try {
// java.nio.channels.spi.SelectorProvider#openSocketChannel
return provider.openSocketChannel();
} catch (IOException e) {
throw new ChannelException("Failed to open a socket.", e);
}
}
public NioSocketChannel(SocketChannel socket) {
this(null, socket);
}
public NioSocketChannel(Channel parent, SocketChannel socket) {
super(parent, socket);
config = new NioSocketChannelConfig(this, socket.socket());
}
// newUnsafe() returns a NioSocketChannelUnsafe
protected AbstractNioUnsafe newUnsafe() {
return new NioSocketChannelUnsafe();
}
private final class NioSocketChannelUnsafe extends NioByteUnsafe {
@Override
protected Executor prepareToClose() {
try {
if (javaChannel().isOpen() && config().getSoLinger() > 0) {
// See https://github.com/netty/netty/issues/4449
doDeregister();
return GlobalEventExecutor.INSTANCE;
}
} catch (Throwable ignore) {
}
return null;
}
}
}
public abstract class AbstractNioByteChannel extends AbstractNioChannel {
protected AbstractNioByteChannel(Channel parent, SelectableChannel ch) {
super(parent, ch, SelectionKey.OP_READ);
}
}
public abstract class AbstractNioChannel extends AbstractChannel {
private final SelectableChannel ch;
protected final int readInterestOp;
volatile SelectionKey selectionKey;
boolean readPending;
private ChannelPromise connectPromise;
private ScheduledFuture<?> connectTimeoutFuture;
private SocketAddress requestedRemoteAddress;
protected AbstractNioChannel(Channel parent, SelectableChannel ch, int readInterestOp) {
super(parent);
this.ch = ch;
this.readInterestOp = readInterestOp;
try {
ch.configureBlocking(false);
} catch (IOException e) {}
}
protected SelectableChannel javaChannel() {
return ch;
}
}
public abstract class AbstractChannel extends DefaultAttributeMap implements Channel {
private final Channel parent;
private final ChannelId id;
private final Unsafe unsafe;
private final DefaultChannelPipeline pipeline;
private final VoidChannelPromise unsafeVoidPromise = new VoidChannelPromise(this, false);
private final CloseFuture closeFuture = new CloseFuture(this);
private volatile SocketAddress localAddress;
private volatile SocketAddress remoteAddress;
private volatile EventLoop eventLoop;
private volatile boolean registered;
private boolean strValActive;
private String strVal;
protected AbstractChannel(Channel parent) {
this.parent = parent;
id = newId();
// The unsafe object is created by the subclass method NioSocketChannel#newUnsafe
unsafe = newUnsafe();
pipeline = newChannelPipeline();
}
protected DefaultChannelPipeline newChannelPipeline() {
return new DefaultChannelPipeline(this);
}
}
- ReflectiveChannelFactory.newChannel() creates the NioSocketChannel.
- NioSocketChannel holds a SocketChannelConfig and a DefaultChannelPipeline.
- The DefaultChannelPipeline maintains the ChannelHandler objects.
- NioSocketChannel's Unsafe is a NioSocketChannelUnsafe.
- NioSocketChannel's ch is the object returned by SelectorProvider#openSocketChannel.
- javaChannel() returns ch, that is, the same object returned by SelectorProvider#openSocketChannel.
- NioSocketChannel is responsible for read, write, connect, and similar operations.
SelectorProvider
Figure: SelectorProvider
public abstract class SelectorProvider {
private static final Object lock = new Object();
private static SelectorProvider provider = null;
public static SelectorProvider provider() {
synchronized (lock) {
if (provider != null)
return provider;
return AccessController.doPrivileged(
new PrivilegedAction<SelectorProvider>() {
public SelectorProvider run() {
if (loadProviderFromProperty())
return provider;
if (loadProviderAsService())
return provider;
provider = sun.nio.ch.DefaultSelectorProvider.create();
return provider;
}
});
}
}
public abstract DatagramChannel openDatagramChannel() throws IOException;
public abstract DatagramChannel openDatagramChannel(ProtocolFamily family) throws IOException;
public abstract Pipe openPipe() throws IOException;
public abstract AbstractSelector openSelector() throws IOException;
public abstract ServerSocketChannel openServerSocketChannel() throws IOException;
public abstract SocketChannel openSocketChannel() throws IOException;
}
public abstract class SelectorProviderImpl extends SelectorProvider {
public SelectorProviderImpl() {
}
public DatagramChannel openDatagramChannel() throws IOException {
return new DatagramChannelImpl(this);
}
public DatagramChannel openDatagramChannel(ProtocolFamily var1) throws IOException {
return new DatagramChannelImpl(this, var1);
}
public Pipe openPipe() throws IOException {
return new PipeImpl(this);
}
public abstract AbstractSelector openSelector() throws IOException;
public ServerSocketChannel openServerSocketChannel() throws IOException {
return new ServerSocketChannelImpl(this);
}
public SocketChannel openSocketChannel() throws IOException {
return new SocketChannelImpl(this);
}
}
- The static method SelectorProvider#provider returns a singleton SelectorProvider; its core methods are openSelector(), openServerSocketChannel(), and openSocketChannel().
- SelectorProvider#openSelector() returns a Selector.
- SelectorProvider#openServerSocketChannel() returns a ServerSocketChannel.
- SelectorProvider#openSocketChannel() returns a SocketChannel.
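These points can be checked directly against the JDK, without Netty on the classpath. The following small sketch uses only `java.nio` calls; the class name `SelectorProviderSketch` and its method names are made up for illustration.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.nio.channels.spi.SelectorProvider;

// JDK-only sketch; SelectorProviderSketch is a hypothetical name.
final class SelectorProviderSketch {

    /** provider() hands back the same system-wide instance on every call. */
    static boolean sameProviderEveryTime() {
        return SelectorProvider.provider() == SelectorProvider.provider();
    }

    /** Opens a selector and a socket channel and checks that both point back at the provider. */
    static boolean opensFromSameProvider() {
        SelectorProvider provider = SelectorProvider.provider();
        try (Selector selector = provider.openSelector();
             SocketChannel channel = provider.openSocketChannel()) {
            return selector.isOpen()
                    && channel.isOpen()
                    && selector.provider() == provider
                    && channel.provider() == provider;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(sameProviderEveryTime() && opensFromSameProvider());
    }
}
```

In Netty terms, the selector corresponds to NioEventLoop's selector field and the socket channel to the ch field of NioSocketChannel; both originate from the same provider instance.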
NioSocketChannelUnsafe
Figure: NioSocketChannelUnsafe
private final class NioSocketChannelUnsafe extends NioByteUnsafe {}
protected class NioByteUnsafe extends AbstractNioUnsafe {
private void closeOnRead(ChannelPipeline pipeline) {}
private void handleReadException(ChannelPipeline pipeline, ByteBuf byteBuf, Throwable cause, boolean close,
RecvByteBufAllocator.Handle allocHandle) {}
public final void read() {}
}
protected abstract class AbstractNioUnsafe extends AbstractUnsafe implements NioUnsafe {
protected final void removeReadOp() { }
public final SelectableChannel ch() {}
public final void connect(
final SocketAddress remoteAddress, final SocketAddress localAddress, final ChannelPromise promise) {}
private void fulfillConnectPromise(ChannelPromise promise, boolean wasActive) {}
private void fulfillConnectPromise(ChannelPromise promise, Throwable cause) {}
public final void finishConnect() {}
protected final void flush0() {}
public final void forceFlush() {}
private boolean isFlushPending() {}
}
protected abstract class AbstractUnsafe implements Unsafe {
private volatile ChannelOutboundBuffer outboundBuffer = new ChannelOutboundBuffer(AbstractChannel.this);
private RecvByteBufAllocator.Handle recvHandle;
private boolean inFlush0;
/** true if the channel has never been registered, false otherwise */
private boolean neverRegistered = true;
private void assertEventLoop() {}
public RecvByteBufAllocator.Handle recvBufAllocHandle() {}
public final ChannelOutboundBuffer outboundBuffer() {}
public final SocketAddress localAddress() {}
public final SocketAddress remoteAddress() {}
public final void register(EventLoop eventLoop, final ChannelPromise promise) {}
private void register0(ChannelPromise promise) {}
public final void bind(final SocketAddress localAddress, final ChannelPromise promise) {}
public final void disconnect(final ChannelPromise promise) {}
public final void close(final ChannelPromise promise) {}
private void close(final ChannelPromise promise, final Throwable cause,
final ClosedChannelException closeCause, final boolean notify) {}
private void doClose0(ChannelPromise promise) {}
private void fireChannelInactiveAndDeregister(final boolean wasActive) {}
public final void closeForcibly() {}
public final void deregister(final ChannelPromise promise) {}
private void deregister(final ChannelPromise promise, final boolean fireChannelInactive) {}
public final void beginRead() {}
public final void write(Object msg, ChannelPromise promise) {}
public final void flush() {}
protected void flush0() {}
public final ChannelPromise voidPromise() {}
protected final boolean ensureOpen(ChannelPromise promise) {}
protected final void safeSetSuccess(ChannelPromise promise) {}
protected final void safeSetFailure(ChannelPromise promise, Throwable cause) {}
protected final void closeIfClosed() {}
private void invokeLater(Runnable task) {}
protected final Throwable annotateConnectException(Throwable cause, SocketAddress remoteAddress) {}
protected Executor prepareToClose() {}
}
interface Unsafe {
RecvByteBufAllocator.Handle recvBufAllocHandle();
SocketAddress localAddress();
SocketAddress remoteAddress();
void register(EventLoop eventLoop, ChannelPromise promise);
void bind(SocketAddress localAddress, ChannelPromise promise);
void connect(SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise);
void disconnect(ChannelPromise promise);
void close(ChannelPromise promise);
void closeForcibly();
void deregister(ChannelPromise promise);
void beginRead();
void write(Object msg, ChannelPromise promise);
void flush();
ChannelPromise voidPromise();
ChannelOutboundBuffer outboundBuffer();
}
- NioSocketChannelUnsafe is the Unsafe implementation that performs the actual connect, read, and similar operations.
DefaultChannelPipeline
Figure: TailContext and HeadContext
public class DefaultChannelPipeline implements ChannelPipeline {
static final InternalLogger logger = InternalLoggerFactory.getInstance(DefaultChannelPipeline.class);
private static final String HEAD_NAME = generateName0(HeadContext.class);
private static final String TAIL_NAME = generateName0(TailContext.class);
private static final FastThreadLocal<Map<Class<?>, String>> nameCaches =
new FastThreadLocal<Map<Class<?>, String>>() {
@Override
protected Map<Class<?>, String> initialValue() throws Exception {
return new WeakHashMap<Class<?>, String>();
}
};
final AbstractChannelHandlerContext head;
final AbstractChannelHandlerContext tail;
private final Channel channel;
private final ChannelFuture succeededFuture;
private final VoidChannelPromise voidPromise;
private final boolean touch = ResourceLeakDetector.isEnabled();
private Map<EventExecutorGroup, EventExecutor> childExecutors;
private MessageSizeEstimator.Handle estimatorHandle;
private boolean firstRegistration = true;
private PendingHandlerCallback pendingHandlerCallbackHead;
private boolean registered;
protected DefaultChannelPipeline(Channel channel) {
this.channel = ObjectUtil.checkNotNull(channel, "channel");
succeededFuture = new SucceededChannelFuture(channel, null);
voidPromise = new VoidChannelPromise(channel, true);
tail = new TailContext(this);
head = new HeadContext(this);
head.next = tail;
tail.prev = head;
}
}
- DefaultChannelPipeline holds head and tail, both of type AbstractChannelHandlerContext, and chains the handlers between them as a doubly linked list.
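To make the head/tail wiring concrete, here is a toy doubly linked list that starts out as head <-> tail and inserts new nodes just before tail. It is only a sketch of the data structure: `PipelineSketch` and `Node` are hypothetical names, and the real pipeline stores handler contexts rather than strings.

```java
import java.util.ArrayList;
import java.util.List;

// JDK-only sketch; PipelineSketch and Node are hypothetical, not Netty types.
final class PipelineSketch {

    static final class Node {
        final String name;
        Node prev;
        Node next;

        Node(String name) {
            this.name = name;
        }
    }

    final Node head = new Node("head");
    final Node tail = new Node("tail");

    PipelineSketch() {
        // Same wiring as the DefaultChannelPipeline constructor: head <-> tail.
        head.next = tail;
        tail.prev = head;
    }

    /** Inserts a node just before tail, which is what "add last" means here. */
    PipelineSketch addLast(String name) {
        Node node = new Node(name);
        Node prev = tail.prev;
        node.prev = prev;
        node.next = tail;
        prev.next = node;
        tail.prev = node;
        return this;
    }

    /** Walks the list from head to tail and returns the node names in order. */
    List<String> names() {
        List<String> names = new ArrayList<>();
        for (Node n = head; n != null; n = n.next) {
            names.add(n.name);
        }
        return names;
    }

    public static void main(String[] args) {
        // prints [head, ssl, discard, tail]
        System.out.println(new PipelineSketch().addLast("ssl").addLast("discard").names());
    }
}
```

The two sentinel nodes mean that insertion never has to special-case an empty list, which is presumably why the pipeline creates both of them in its constructor.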
DefaultChannelPromise
Figure: DefaultChannelPromise
public class DefaultChannelPromise extends DefaultPromise<Void> implements ChannelPromise, FlushCheckpoint {
private final Channel channel;
private long checkpoint;
public DefaultChannelPromise(Channel channel) {
this.channel = channel;
}
public DefaultChannelPromise(Channel channel, EventExecutor executor) {
super(executor);
this.channel = channel;
}
}
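- DefaultChannelPromise wraps the NioSocketChannel together with the NioEventLoop.
How NioSocketChannel is used
Figure: channel registration process
- Using a NioSocketChannel involves two phases: initialization and registration.
NioSocketChannel initialization and registration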
public class Bootstrap extends AbstractBootstrap<Bootstrap, Channel> {
private final BootstrapConfig config = new BootstrapConfig(this);
private volatile SocketAddress remoteAddress;
public Bootstrap() { }
private ChannelFuture doResolveAndConnect(final SocketAddress remoteAddress, final SocketAddress localAddress) {
// Initialize and register the channel.
final ChannelFuture regFuture = initAndRegister();
final Channel channel = regFuture.channel();
if (regFuture.isDone()) {
if (!regFuture.isSuccess()) {
return regFuture;
}
return doResolveAndConnect0(channel, remoteAddress, localAddress, channel.newPromise());
} else {
// related code omitted
return promise;
}
}
}
- On the client side, the SocketChannel is initialized during the connect operation, specifically when ChannelFuture f = b.connect(HOST, PORT).sync() runs.
- connect() internally calls doResolveAndConnect(), which in turn calls AbstractBootstrap#initAndRegister.
initAndRegister
public abstract class AbstractBootstrap<B extends AbstractBootstrap<B, C>, C extends Channel> implements Cloneable {
final ChannelFuture initAndRegister() {
Channel channel = null;
try {
// 1. newChannel
channel = channelFactory.newChannel();
// 2. initChannel
init(channel);
} catch (Throwable t) {
// code omitted
}
// 3. registerChannel; config().group() returns the NioEventLoopGroup
ChannelFuture regFuture = config().group().register(channel);
if (regFuture.cause() != null) {
if (channel.isRegistered()) {
channel.close();
} else {
channel.unsafe().closeForcibly();
}
}
// Return the regFuture
return regFuture;
}
}
- initAndRegister does three things: newChannel, initChannel, and registerChannel.
- newChannel creates the channel through channelFactory.newChannel().
- config().group() returns the NioEventLoopGroup, and calling register(channel) on it returns a ChannelFuture (regFuture).
newChannel - creation
public class ReflectiveChannelFactory<T extends Channel> implements ChannelFactory<T> {
private final Class<? extends T> clazz;
public ReflectiveChannelFactory(Class<? extends T> clazz) {
if (clazz == null) {
throw new NullPointerException("clazz");
}
this.clazz = clazz;
}
@Override
public T newChannel() {
try {
return clazz.newInstance();
} catch (Throwable t) {
throw new ChannelException("Unable to create Channel from class " + clazz, t);
}
}
}
public abstract class AbstractBootstrap<B extends AbstractBootstrap<B, C>, C extends Channel> implements Cloneable {
public B channel(Class<? extends C> channelClass) {
return channelFactory(new ReflectiveChannelFactory<C>(channelClass));
}
public B channelFactory(ChannelFactory<? extends C> channelFactory) {
this.channelFactory = channelFactory;
return (B) this;
}
}
- Bootstrap.channel() creates the channelFactory as a ReflectiveChannelFactory that holds the NioSocketChannel class.
- channelFactory.newChannel() runs ReflectiveChannelFactory#newChannel, which creates the NioSocketChannel.
initChannel - initialization
public class Bootstrap extends AbstractBootstrap<Bootstrap, Channel> {
void init(Channel channel) throws Exception {
// Add the handler to the NioSocketChannel's pipeline
ChannelPipeline p = channel.pipeline();
p.addLast(config.handler());
final Map<ChannelOption<?>, Object> options = options0();
synchronized (options) {
setChannelOptions(channel, options, logger);
}
final Map<AttributeKey<?>, Object> attrs = attrs0();
synchronized (attrs) {
for (Entry<AttributeKey<?>, Object> e: attrs.entrySet()) {
channel.attr((AttributeKey<Object>) e.getKey()).set(e.getValue());
}
}
}
}
- initChannel mainly adds the configured handler to the channel's pipeline.
- In addition, initChannel applies the configured channel options and attributes.
registerChannel - binding
public abstract class MultithreadEventLoopGroup extends MultithreadEventExecutorGroup implements EventLoopGroup {
public ChannelFuture register(Channel channel) {
// channel is a NioSocketChannel
return next().register(channel);
}
public EventLoop next() {
return (EventLoop) super.next();
}
}
public abstract class MultithreadEventExecutorGroup extends AbstractEventExecutorGroup {
// The set of NioEventLoop objects
private final EventExecutor[] children;
private final Set<EventExecutor> readonlyChildren;
private final AtomicInteger terminatedChildren = new AtomicInteger();
private final Promise<?> terminationFuture = new DefaultPromise(GlobalEventExecutor.INSTANCE);
// chooser = chooserFactory.newChooser(children);
private final EventExecutorChooserFactory.EventExecutorChooser chooser;
public EventExecutor next() {
// The chooser holds the NioEventLoop array, so this call picks one NioEventLoop
// e.g. PowerOfTwoEventExecutorChooser
return chooser.next();
}
}
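- NioEventLoopGroup's chooser.next() picks one of the NioEventLoop objects.
- There are two chooser implementations, PowerOfTwoEventExecutorChooser and GenericEventExecutorChooser. Which one is used depends on the number of event loops: a power-of-two count yields PowerOfTwoEventExecutorChooser, which is the case assumed here.
- chooser.next().register(channel) registers the NioSocketChannel with the chosen NioEventLoop.
chooser - selection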
public final class DefaultEventExecutorChooserFactory implements EventExecutorChooserFactory {
public static final DefaultEventExecutorChooserFactory INSTANCE = new DefaultEventExecutorChooserFactory();
private DefaultEventExecutorChooserFactory() { }
public EventExecutorChooser newChooser(EventExecutor[] executors) {
if (isPowerOfTwo(executors.length)) {
return new PowerOfTwoEventExecutorChooser(executors);
} else {
return new GenericEventExecutorChooser(executors);
}
}
private static boolean isPowerOfTwo(int val) {
return (val & -val) == val;
}
private static final class PowerOfTwoEventExecutorChooser implements EventExecutorChooser {
private final AtomicInteger idx = new AtomicInteger();
private final EventExecutor[] executors;
PowerOfTwoEventExecutorChooser(EventExecutor[] executors) {
this.executors = executors;
}
@Override
public EventExecutor next() {
return executors[idx.getAndIncrement() & executors.length - 1];
}
}
private static final class GenericEventExecutorChooser implements EventExecutorChooser {
private final AtomicInteger idx = new AtomicInteger();
private final EventExecutor[] executors;
GenericEventExecutorChooser(EventExecutor[] executors) {
this.executors = executors;
}
@Override
public EventExecutor next() {
return executors[Math.abs(idx.getAndIncrement() % executors.length)];
}
}
}
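- PowerOfTwoEventExecutorChooser.next() returns EventExecutor objects in round-robin order; these are the NioEventLoop objects, since NioEventLoop implements EventExecutor.
- GenericEventExecutorChooser and PowerOfTwoEventExecutorChooser differ only in how next() wraps the index: the former uses the % operator, the latter a bit mask with length - 1.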
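The two index computations are easy to compare side by side. The sketch below copies the three expressions from the excerpt into static helpers; `ChooserSketch`, `maskIndex`, `moduloIndex`, and `firstPicks` are hypothetical names introduced only for this illustration.

```java
import java.util.concurrent.atomic.AtomicInteger;

// JDK-only sketch of the two index computations; ChooserSketch is a hypothetical name.
final class ChooserSketch {

    static boolean isPowerOfTwo(int val) {
        return (val & -val) == val;
    }

    /** Power-of-two case: a bit mask replaces the modulo. */
    static int maskIndex(int counter, int length) {
        return counter & (length - 1);
    }

    /** General case: modulo, with Math.abs covering a counter that has wrapped negative. */
    static int moduloIndex(int counter, int length) {
        return Math.abs(counter % length);
    }

    /** Returns the first `picks` indices a round-robin chooser would select. */
    static int[] firstPicks(int length, int picks) {
        AtomicInteger idx = new AtomicInteger();
        int[] result = new int[picks];
        for (int i = 0; i < picks; i++) {
            int counter = idx.getAndIncrement();
            result[i] = isPowerOfTwo(length)
                    ? maskIndex(counter, length)
                    : moduloIndex(counter, length);
        }
        return result;
    }

    public static void main(String[] args) {
        System.out.println(java.util.Arrays.toString(firstPicks(4, 6))); // [0, 1, 2, 3, 0, 1]
        System.out.println(java.util.Arrays.toString(firstPicks(3, 5))); // [0, 1, 2, 0, 1]
    }
}
```

Both variants stay inside the array bounds after the counter overflows to a negative value: the mask simply drops the sign bit along with the other high bits, while the modulo variant relies on Math.abs.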
NioEventLoop#register
public abstract class SingleThreadEventLoop extends SingleThreadEventExecutor implements EventLoop {
@Override
public ChannelFuture register(Channel channel) {
// this is the NioEventLoop and channel is the NioSocketChannel;
// both are wrapped in a DefaultChannelPromise for registration.
return register(new DefaultChannelPromise(channel, this));
}
public ChannelFuture register(final ChannelPromise promise) {
ObjectUtil.checkNotNull(promise, "promise");
// promise is a DefaultChannelPromise
// promise.channel() returns the NioSocketChannel
// promise.channel().unsafe() returns the NioSocketChannelUnsafe
// this is the NioEventLoop
promise.channel().unsafe().register(this, promise);
return promise;
}
}
public class DefaultChannelPromise extends DefaultPromise<Void> implements ChannelPromise, FlushCheckpoint {
private final Channel channel;
private long checkpoint;
public DefaultChannelPromise(Channel channel, EventExecutor executor) {
// channel is the NioSocketChannel, executor is the NioEventLoop
super(executor);
this.channel = channel;
}
}
- The NioEventLoop and the NioSocketChannel are wrapped in a DefaultChannelPromise.
- register() returns that DefaultChannelPromise.
- promise.channel().unsafe() returns the NioSocketChannelUnsafe, which performs the registration.
NioSocketChannelUnsafe#register
Figure: NioSocketChannelUnsafe
public abstract class AbstractChannel extends DefaultAttributeMap implements Channel {
protected abstract class AbstractUnsafe implements Unsafe {
private volatile ChannelOutboundBuffer outboundBuffer = new ChannelOutboundBuffer(AbstractChannel.this);
private RecvByteBufAllocator.Handle recvHandle;
private boolean inFlush0;
/** true if the channel has never been registered, false otherwise */
private boolean neverRegistered = true;
@Override
public final void register(EventLoop eventLoop, final ChannelPromise promise) {
// some code omitted
// the NioEventLoop
AbstractChannel.this.eventLoop = eventLoop;
// Run the registration on the event loop thread, through execute() if we are not already on it
if (eventLoop.inEventLoop()) {
register0(promise);
} else {
try {
eventLoop.execute(new Runnable() {
@Override
public void run() {
register0(promise);
}
});
} catch (Throwable t) {
}
}
}
private void register0(ChannelPromise promise) {
try {
boolean firstRegistration = neverRegistered;
// Call doRegister()
doRegister();
neverRegistered = false;
registered = true;
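// Pipeline callbacks: invokeHandlerAddedIfNeeded() runs any pending handlerAdded() callbacks,
// which is where a ChannelInitializer gets to run initChannel()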
pipeline.invokeHandlerAddedIfNeeded();
safeSetSuccess(promise);
pipeline.fireChannelRegistered();
if (isActive()) {
if (firstRegistration) {
pipeline.fireChannelActive();
} else if (config().isAutoRead()) {
beginRead();
}
}
} catch (Throwable t) {
// Close the channel directly to avoid FD leak.
closeForcibly();
closeFuture.setClosed();
safeSetFailure(promise, t);
}
}
}
- NioSocketChannelUnsafe#register is the inherited AbstractUnsafe#register.
- AbstractUnsafe#register runs register0() on the NioEventLoop: directly if the caller is already on the event loop thread, otherwise through NioEventLoop.execute().
- AbstractUnsafe#register0() calls AbstractNioChannel#doRegister().
- AbstractChannel.this.eventLoop = eventLoop assigns the NioEventLoop to the NioSocketChannel.
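The "run it inline if we are on the loop thread, otherwise hand it to the loop" dispatch can be sketched with a single-thread executor from the JDK. `RegisterDispatchSketch` and `ToyEventLoop` are hypothetical names. Unlike Netty, which returns a promise immediately, this sketch blocks on a latch purely so that the result can be observed.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// JDK-only sketch; RegisterDispatchSketch and ToyEventLoop are hypothetical names.
final class RegisterDispatchSketch {

    /** A single-threaded loop that can tell whether the caller is its own thread. */
    static final class ToyEventLoop {
        private volatile Thread loopThread;
        private final ExecutorService executor = Executors.newSingleThreadExecutor(r -> {
            Thread t = new Thread(r, "toy-event-loop");
            t.setDaemon(true);
            loopThread = t;
            return t;
        });

        boolean inEventLoop() {
            return Thread.currentThread() == loopThread;
        }

        void execute(Runnable task) {
            executor.execute(task);
        }

        void shutdown() {
            executor.shutdown();
        }
    }

    /** Runs the registration body on the loop thread and returns that thread's name. */
    static String register(ToyEventLoop loop) {
        String[] ranOn = new String[1];
        CountDownLatch done = new CountDownLatch(1);
        Runnable register0 = () -> {
            ranOn[0] = Thread.currentThread().getName();
            done.countDown();
        };
        if (loop.inEventLoop()) {
            register0.run();          // already on the loop thread: run inline
        } else {
            loop.execute(register0);  // otherwise enqueue it for the loop thread
        }
        try {
            // Blocking here is only for the demo; Netty would return a promise instead.
            if (!done.await(5, TimeUnit.SECONDS)) {
                throw new IllegalStateException("registration did not run");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return ranOn[0];
    }

    public static void main(String[] args) {
        ToyEventLoop loop = new ToyEventLoop();
        System.out.println(register(loop)); // prints toy-event-loop
        loop.shutdown();
    }
}
```

Funnelling every registration through one thread means the channel's state is only ever touched by its event loop, so no locking is needed around it.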
AbstractNioChannel#doRegister
public abstract class AbstractNioChannel extends AbstractChannel {
private final SelectableChannel ch;
protected final int readInterestOp;
volatile SelectionKey selectionKey;
boolean readPending;
private ChannelPromise connectPromise;
private ScheduledFuture<?> connectTimeoutFuture;
private SocketAddress requestedRemoteAddress;
protected AbstractNioChannel(Channel parent, SelectableChannel ch, int readInterestOp) {
super(parent);
this.ch = ch;
this.readInterestOp = readInterestOp;
try {
ch.configureBlocking(false);
} catch (IOException e) {
}
}
protected void doRegister() throws Exception {
boolean selected = false;
for (;;) {
try {
// javaChannel() returns the object created by SelectorProvider.openSocketChannel()
// java.nio.channels.spi.SelectorProvider#openSocketChannel
selectionKey = javaChannel().register(eventLoop().selector, 0, this);
return;
} catch (CancelledKeyException e) {
if (!selected) {
eventLoop().selectNow();
selected = true;
} else {
throw e;
}
}
}
}
protected SelectableChannel javaChannel() {
// io.netty.channel.nio.AbstractNioChannel#ch
return ch;
}
}
- javaChannel() returns the ch object created by java.nio.channels.spi.SelectorProvider#openSocketChannel().
- eventLoop().selector is the NioEventLoop's selector, created by java.nio.channels.spi.SelectorProvider#openSelector().
- javaChannel().register() registers the native ch with the NioEventLoop's selector (with interest ops 0) and attaches the current NioSocketChannel to the resulting SelectionKey.
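The same registration call can be tried with the JDK alone. The sketch below registers a non-blocking SocketChannel with interest ops 0 and an attachment, then enables an interest op afterwards. `NioRegisterSketch` is a hypothetical name, and the statement that Netty enables OP_CONNECT or OP_READ in later steps reflects my understanding of the code paths that follow, not something shown in the excerpt.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.nio.channels.spi.SelectorProvider;

// JDK-only sketch; NioRegisterSketch is a hypothetical name.
final class NioRegisterSketch {

    /** Registers with ops 0 plus an attachment, then turns on OP_CONNECT afterwards. */
    static boolean registerWithZeroOps(Object attachment) {
        SelectorProvider provider = SelectorProvider.provider();
        try (Selector selector = provider.openSelector();
             SocketChannel ch = provider.openSocketChannel()) {
            ch.configureBlocking(false); // a channel must be non-blocking before register()
            SelectionKey key = ch.register(selector, 0, attachment);
            boolean registered = key.interestOps() == 0
                    && key.attachment() == attachment
                    && ch.keyFor(selector) == key;
            // Interest is switched on later, once the channel actually needs events.
            key.interestOps(SelectionKey.OP_CONNECT);
            return registered && key.interestOps() == SelectionKey.OP_CONNECT;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(registerWithZeroOps(new Object()));
    }
}
```

The attachment is what lets an event loop map a ready SelectionKey back to its owning channel object, which is the role `this` plays in the doRegister() call above.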
Bootstrap connect flow
Figure: connect flow
Bootstrap#doResolveAndConnect
public class Bootstrap extends AbstractBootstrap<Bootstrap, Channel> {
private ChannelFuture doResolveAndConnect(final SocketAddress remoteAddress, final SocketAddress localAddress) {
final ChannelFuture regFuture = initAndRegister();
final Channel channel = regFuture.channel();
if (regFuture.isDone()) {
if (!regFuture.isSuccess()) {
return regFuture;
}
// doResolveAndConnect0
return doResolveAndConnect0(channel, remoteAddress, localAddress, channel.newPromise());
} else {
// code omitted
}
}
private ChannelFuture doResolveAndConnect0(final Channel channel, SocketAddress remoteAddress,
final SocketAddress localAddress, final ChannelPromise promise) {
try {
final EventLoop eventLoop = channel.eventLoop();
final AddressResolver<SocketAddress> resolver = this.resolver.getResolver(eventLoop);
if (!resolver.isSupported(remoteAddress) || resolver.isResolved(remoteAddress)) {
// doConnect
doConnect(remoteAddress, localAddress, promise);
return promise;
}
// code omitted
}
private static void doConnect(
final SocketAddress remoteAddress, final SocketAddress localAddress, final ChannelPromise connectPromise) {
final Channel channel = connectPromise.channel();
channel.eventLoop().execute(new Runnable() {
@Override
public void run() {
if (localAddress == null) {
// connect
channel.connect(remoteAddress, connectPromise);
} else {
// connect
channel.connect(remoteAddress, localAddress, connectPromise);
}
connectPromise.addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
}
});
}
}
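- The channel here is a NioSocketChannel. Note that doConnect does not connect directly: it submits the channel.connect() call as a task to the channel's EventLoop.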
NioSocketChannel#connect
public abstract class AbstractChannel extends DefaultAttributeMap implements Channel {
@Override
public ChannelFuture bind(SocketAddress localAddress, ChannelPromise promise) {
return pipeline.bind(localAddress, promise);
}
@Override
public ChannelFuture connect(SocketAddress remoteAddress, ChannelPromise promise) {
return pipeline.connect(remoteAddress, promise);
}
}
- NioSocketChannel inherits connect() from AbstractChannel, which delegates to the connect() method of the channel's pipeline object.
DefaultChannelPipeline#connect
public class DefaultChannelPipeline implements ChannelPipeline {
@Override
public final ChannelFuture bind(SocketAddress localAddress, ChannelPromise promise) {
return tail.bind(localAddress, promise);
}
@Override
public final ChannelFuture connect(SocketAddress remoteAddress, ChannelPromise promise) {
return tail.connect(remoteAddress, promise);
}
@Override
public final ChannelFuture connect(
SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise) {
return tail.connect(remoteAddress, localAddress, promise);
}
}
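- DefaultChannelPipeline starts the connect() operation from its tail object.
- connect is an outbound operation, so it travels from tail toward head; inbound events travel the opposite way, from head toward tail.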
TailContext#connect
abstract class AbstractChannelHandlerContext extends DefaultAttributeMap
implements ChannelHandlerContext, ResourceLeakHint {
public ChannelFuture connect(SocketAddress remoteAddress, ChannelPromise promise) {
return connect(remoteAddress, null, promise);
}
public ChannelFuture connect(
final SocketAddress remoteAddress, final SocketAddress localAddress, final ChannelPromise promise) {
final AbstractChannelHandlerContext next = findContextOutbound();
EventExecutor executor = next.executor();
if (executor.inEventLoop()) {
next.invokeConnect(remoteAddress, localAddress, promise);
} else {
safeExecute(executor, new Runnable() {
@Override
public void run() {
next.invokeConnect(remoteAddress, localAddress, promise);
}
}, promise, null);
}
return promise;
}
}
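- The key line in the code above is final AbstractChannelHandlerContext next = findContextOutbound(). findContextOutbound starts from the current context (here the tail of the doubly linked list inside DefaultChannelPipeline) and walks backwards until it finds the first AbstractChannelHandlerContext whose outbound flag is true, then that context's invokeConnect method is called.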
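The backwards search can be modelled without Netty. The sketch below is an illustration under stated assumptions, not the library's implementation: OutboundWalkSketch, Ctx and connectPath are hypothetical names, head is modelled as outbound and tail as inbound-only, and each outbound context is assumed to pass the connect call on to the next outbound context before it.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of the pipeline's doubly linked list, not Netty code.
final class OutboundWalkSketch {

    static final class Ctx {
        final String name;
        final boolean outbound;
        Ctx prev;
        Ctx next;

        Ctx(String name, boolean outbound) {
            this.name = name;
            this.outbound = outbound;
        }
    }

    // Same loop shape as the method discussed above: step to prev until a
    // context flagged outbound is found.
    static Ctx findContextOutbound(Ctx from) {
        Ctx ctx = from;
        do {
            ctx = ctx.prev;
        } while (!ctx.outbound);
        return ctx;
    }

    // Builds head <-> handlers... <-> tail and returns the names of the
    // contexts a connect call visits when it starts at tail.
    static List<String> connectPath(String[] names, boolean[] outbound) {
        Ctx head = new Ctx("head", true);   // assumption: head handles outbound calls
        Ctx tail = new Ctx("tail", false);  // assumption: tail is inbound-only
        Ctx last = head;
        for (int i = 0; i < names.length; i++) {
            Ctx c = new Ctx(names[i], outbound[i]);
            last.next = c;
            c.prev = last;
            last = c;
        }
        last.next = tail;
        tail.prev = last;

        List<String> visited = new ArrayList<>();
        Ctx ctx = tail;
        while (ctx != head) {
            // each visited context forwards the call to the previous outbound one
            ctx = findContextOutbound(ctx);
            visited.add(ctx.name);
        }
        return visited;
    }

    public static void main(String[] args) {
        System.out.println(connectPath(
                new String[] {"decoder", "encoder", "discard"},
                new boolean[] {false, true, false}));
    }
}
```

With only an inbound handler in the pipeline, as in the DiscardClient example, the search skips it and lands directly on head, which matches the next step of the walkthrough.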
HeadContext#connect
abstract class AbstractChannelHandlerContext extends DefaultAttributeMap
implements ChannelHandlerContext, ResourceLeakHint {
private void invokeConnect(SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise) {
if (invokeHandler()) {
try {
((ChannelOutboundHandler) handler()).connect(this, remoteAddress, localAddress, promise);
} catch (Throwable t) {
notifyOutboundHandlerException(t, promise);
}
} else {
connect(remoteAddress, localAddress, promise);
}
}
}
public class DefaultChannelPipeline implements ChannelPipeline {
final class HeadContext extends AbstractChannelHandlerContext
implements ChannelOutboundHandler, ChannelInboundHandler {
public void connect(
ChannelHandlerContext ctx,
SocketAddress remoteAddress, SocketAddress localAddress,
ChannelPromise promise) throws Exception {
unsafe.connect(remoteAddress, localAddress, promise);
}
}
}
AbstractNioUnsafe#connect
public abstract class AbstractNioChannel extends AbstractChannel {
protected abstract class AbstractNioUnsafe extends AbstractUnsafe implements NioUnsafe {
public final void connect(
final SocketAddress remoteAddress, final SocketAddress localAddress, final ChannelPromise promise) {
if (!promise.setUncancellable() || !ensureOpen(promise)) {
return;
}
try {
boolean wasActive = isActive();
// doConnect
if (doConnect(remoteAddress, localAddress)) {
fulfillConnectPromise(promise, wasActive);
}
// related code omitted
} catch (Throwable t) {
promise.tryFailure(annotateConnectException(t, remoteAddress));
closeIfClosed();
}
}
}
}
NioSocketChannel#doConnect
public class NioSocketChannel extends AbstractNioByteChannel implements io.netty.channel.socket.SocketChannel {
protected boolean doConnect(SocketAddress remoteAddress, SocketAddress localAddress) throws Exception {
if (localAddress != null) {
doBind0(localAddress);
}
boolean success = false;
try {
boolean connected = SocketUtils.connect(javaChannel(), remoteAddress);
if (!connected) {
selectionKey().interestOps(SelectionKey.OP_CONNECT);
}
success = true;
return connected;
} finally {
if (!success) {
doClose();
}
}
}
}
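- javaChannel() here returns the Java NIO SocketChannel object that NioSocketChannel.newSocket created.
- Because the channel is non-blocking, connect may return false before the connection is established; in that case OP_CONNECT is set on the selection key so that the event loop is notified once the connection can be completed.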
SocketUtils#connect
public final class SocketUtils {
public static boolean connect(final SocketChannel socketChannel, final SocketAddress remoteAddress)
throws IOException {
try {
return AccessController.doPrivileged(new PrivilegedExceptionAction<Boolean>() {
@Override
public Boolean run() throws IOException {
return socketChannel.connect(remoteAddress);
}
});
} catch (PrivilegedActionException e) {
throw (IOException) e.getCause();
}
}
}
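- SocketChannel.connect is called here to start the socket connection at the Java NIO level. The call is wrapped in AccessController.doPrivileged so that it also works when a security manager is installed.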
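The non-blocking connect pattern used by doConnect can be reproduced with plain JDK NIO. The sketch below is an illustration, not Netty code: NonBlockingConnectSketch and connectToLocalServer are hypothetical names, the server is a throwaway loopback listener on an ephemeral port, and the select loop with finishConnect stands in for what an event loop would do after OP_CONNECT fires (that part of Netty is not shown in this article).

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;

// Hypothetical demo class, not part of Netty.
final class NonBlockingConnectSketch {

    // Connects a non-blocking client to a local listener and reports whether
    // the connection was established within roughly five seconds.
    static boolean connectToLocalServer() {
        try (ServerSocketChannel server = ServerSocketChannel.open();
             Selector selector = Selector.open();
             SocketChannel client = SocketChannel.open()) {
            // port 0 lets the OS pick a free port on the loopback interface
            server.bind(new InetSocketAddress(InetAddress.getLoopbackAddress(), 0));

            client.configureBlocking(false);
            SelectionKey key = client.register(selector, 0);

            // may return true at once, or false if the handshake is still pending
            boolean connected = client.connect(server.getLocalAddress());
            if (!connected) {
                // same step as in doConnect: ask to be told when connect can finish
                key.interestOps(SelectionKey.OP_CONNECT);
                long deadline = System.nanoTime() + 5_000_000_000L;
                while (!connected && System.nanoTime() < deadline) {
                    selector.select(1000);
                    if (key.isConnectable()) {
                        connected = client.finishConnect();
                    }
                    selector.selectedKeys().clear();
                }
                // stop selecting for OP_CONNECT once the attempt is over
                key.interestOps(key.interestOps() & ~SelectionKey.OP_CONNECT);
            }
            return client.isConnected();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("connected = " + connectToLocalServer());
    }
}
```

The design point is that connect never blocks the calling thread: a false return value only means "not finished yet", and completion is signalled later through the selector.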