public final class EchoClient {
static final boolean SSL = System.getProperty("ssl") != null;
static final String HOST = System.getProperty("host", "");
static final int PORT = Integer.parseInt(System.getProperty("port", "8007"));
static final int SIZE = Integer.parseInt(System.getProperty("size", "256"));
public static void main(String[] args) throws Exception {
// Configure SSL.
final SslContext sslCtx;
if (SSL) {
sslCtx = SslContextBuilder.forClient()
.trustManager(InsecureTrustManagerFactory.INSTANCE).build();
} else {
sslCtx = null;
// Configure the client.
EventLoopGroup group = new NioEventLoopGroup();
try {
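Bootstrap b = new Bootstrap();
b.group(group)
.channel(NioSocketChannel.class)
// option(...) stores the option in the options map of AbstractBootstrap; Bootstrap.init(Channel channel) later applies these options to the new channel
.option(ChannelOption.TCP_NODELAY, true)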
.handler(new ChannelInitializer<SocketChannel>() {
public void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline p = ch.pipeline();
if (sslCtx != null) {
p.addLast(sslCtx.newHandler(ch.alloc(), HOST, PORT));
//p.addLast(new LoggingHandler(LogLevel.INFO));
p.addLast(new EchoClientHandler());
// Start the client.
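// connect() includes instantiating the channel (a channel can be thought of as an abstraction of a socket connection): Bootstrap.connect -> Bootstrap.doResolveAndConnect -> AbstractBootstrap.initAndRegister, which ends in channelFactory.newChannel(); the factory is the ReflectiveChannelFactory set earlier
// NioSocketChannel eventually calls newSocket to open a new Java NIO SocketChannel, and finally invokes the parent constructor AbstractChannel(Channel parent)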
ChannelFuture f = b.connect(HOST, PORT).sync();
// Wait until the connection is closed.
f.channel().closeFuture().sync();
} finally {
// Shut down the event loop to terminate all threads.
group.shutdownGracefully();
- Key points of the analysis: 1. how NioSocketChannel is initialized; 2. how the handlers are added.
public B channel(Class<? extends C> channelClass) {
if (channelClass == null) {
throw new NullPointerException("channelClass");
return channelFactory(new ReflectiveChannelFactory<C>(channelClass));
Netty is built on top of Java NIO, which earlier articles have already covered. Netty's Channel is clearly the same kind of abstraction as the channel in NIO.
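Going back to the channel(...) method shown above: it only records the class and defers the actual instantiation to a factory. The following is a minimal sketch of that idea in plain Java. The name SimpleReflectiveFactory is hypothetical and is not Netty's class; it only assumes the target type has a public no-argument constructor.

```java
// Hypothetical stand-in for the idea behind ReflectiveChannelFactory:
// remember a class now, create instances from it later via reflection.
final class SimpleReflectiveFactory<T> {
    private final Class<? extends T> clazz;

    SimpleReflectiveFactory(Class<? extends T> clazz) {
        if (clazz == null) {
            throw new NullPointerException("clazz");
        }
        this.clazz = clazz;
    }

    // Each call invokes the public no-arg constructor and returns a fresh object.
    T newInstance() {
        try {
            return clazz.getConstructor().newInstance();
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException("Unable to create instance of " + clazz, e);
        }
    }
}
```

In the bootstrap, the class passed in would be NioSocketChannel.class, and the factory call corresponds to channelFactory.newChannel().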
ChannelFuture f = b.connect(HOST, PORT).sync();
private ChannelFuture doResolveAndConnect(final SocketAddress remoteAddress, final SocketAddress localAddress) {
final ChannelFuture regFuture = initAndRegister();
final Channel channel = regFuture.channel();
if (regFuture.isDone()) {
if (!regFuture.isSuccess()) {
return regFuture;
return doResolveAndConnect0(channel, remoteAddress, localAddress, channel.newPromise());
} else {
// Registration future is almost always fulfilled already, but just in case it's not.
final PendingRegistrationPromise promise = new PendingRegistrationPromise(channel);
regFuture.addListener(new ChannelFutureListener() {
public void operationComplete(ChannelFuture future) throws Exception {
// Directly obtain the cause and do a null check so we only need one volatile read in case of a
// failure.
Throwable cause = future.cause();
if (cause != null) {
// Registration on the EventLoop failed so fail the ChannelPromise directly to not cause an
// IllegalStateException once we try to access the EventLoop of the Channel.
} else {
// Registration was successful, so set the correct executor to use.
// See https://github.com/netty/netty/issues/2586
doResolveAndConnect0(channel, remoteAddress, localAddress, promise);
return promise;
- The two most important methods above are initAndRegister, which creates and registers the channel, and doResolveAndConnect0, which connects the channel.
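The control flow of doResolveAndConnect (connect immediately if registration has finished, otherwise connect from a listener) can be modeled with CompletableFuture. This is only a sketch: RegisterThenConnect, connectAfter and doConnect are hypothetical names, and the "connection" is simulated by completing a future with a string.

```java
import java.util.concurrent.CompletableFuture;

final class RegisterThenConnect {
    // 'registration' plays the role of regFuture; the returned future plays the role of the promise.
    static CompletableFuture<String> connectAfter(CompletableFuture<Void> registration, String remote) {
        if (registration.isDone()) {
            if (registration.isCompletedExceptionally()) {
                // Registration already failed: propagate the failure, never connect.
                CompletableFuture<String> failed = new CompletableFuture<>();
                registration.whenComplete((ignored, cause) -> failed.completeExceptionally(cause));
                return failed;
            }
            // Registration already succeeded: connect right away.
            return doConnect(remote, new CompletableFuture<>());
        }
        // Registration still pending: connect (or fail) once it completes.
        CompletableFuture<String> promise = new CompletableFuture<>();
        registration.whenComplete((ignored, cause) -> {
            if (cause != null) {
                promise.completeExceptionally(cause);
            } else {
                doConnect(remote, promise);
            }
        });
        return promise;
    }

    // Simulated connect: completes the promise instead of touching the network.
    static CompletableFuture<String> doConnect(String remote, CompletableFuture<String> promise) {
        promise.complete("connected:" + remote);
        return promise;
    }
}
```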
Source analysis of initAndRegister:
// Instantiates the channel and registers it
final ChannelFuture initAndRegister() {
Channel channel = null;
try {
channel = channelFactory.newChannel();
} catch (Throwable t) {
if (channel != null) {
// channel can be null if newChannel crashed (eg SocketException("too many open files"))
// as the Channel is not registered yet we need to force the usage of the GlobalEventExecutor
return new DefaultChannelPromise(channel, GlobalEventExecutor.INSTANCE).setFailure(t);
// as the Channel is not registered yet we need to force the usage of the GlobalEventExecutor
return new DefaultChannelPromise(new FailedChannel(), GlobalEventExecutor.INSTANCE).setFailure(t);
// 2. group() on BootstrapConfig is inherited from its parent class AbstractBootstrapConfig; it returns bootstrap.group(), i.e. the group we added to the bootstrap
// 3. So the call ends up in the register method of MultithreadEventLoopGroup, the parent class of NioEventLoopGroup. That method returns next().register(channel);
// 4. next() is implemented by MultithreadEventExecutorGroup, the parent class of MultithreadEventLoopGroup. It returns chooser.next();
// The chooser comes from DefaultEventExecutorChooserFactory via chooserFactory.newChooser(children); see the constructor of MultithreadEventExecutorGroup (parent of MultithreadEventLoopGroup, which is the parent of NioEventLoopGroup): this(nThreads, executor, DefaultEventExecutorChooserFactory.INSTANCE, args);
// Recall that the NioEventLoopGroup constructor eventually calls the MultithreadEventExecutorGroup constructor MultithreadEventExecutorGroup(int nThreads, Executor executor, EventExecutorChooserFactory chooserFactory, Object... args)
// In that constructor, children[i] = newChild(executor, args); deserves attention, because the next() method above returns an element of the children array. newChild is implemented in NioEventLoopGroup
// It returns new NioEventLoop(this, executor, (SelectorProvider) args[0],
// ((SelectStrategyFactory) args[1]).newSelectStrategy(), (RejectedExecutionHandler) args[2]); so next().register(channel) resolves to the register method of SingleThreadEventLoop, the parent class of NioEventLoop
// 5. promise.channel().unsafe().register(this, promise); obtains the channel's unsafe() object for low-level operations and calls its register.
// 6. AbstractUnsafe.register calls register0 to register the Channel; AbstractUnsafe.register0 in turn calls AbstractNioChannel#doRegister
// 7. AbstractNioChannel.doRegister calls javaChannel().register(eventLoop().selector, 0, this) to register the Java NIO SocketChannel behind the Channel with the Selector of an eventLoop, passing the current Channel as the attachment.
ChannelFuture regFuture = config().group().register(channel); // register the channel
if (regFuture.cause() != null) {
if (channel.isRegistered()) {
} else {
// If we are here and the promise is not failed, it's one of the following cases:
// 1) If we attempted registration from the event loop, the registration has been completed at this point.
// i.e. It's safe to attempt bind() or connect() now because the channel has been registered.
// 2) If we attempted registration from the other thread, the registration request has been successfully
// added to the event loop's task queue for later execution.
// i.e. It's safe to attempt bind() or connect() now:
// because bind() or connect() will be executed *after* the scheduled registration task is executed
// because register(), bind(), and connect() are all bound to the same thread.
return regFuture;
- First, creation: ReflectiveChannelFactory takes the NioSocketChannel.class we passed in at the start and creates a new object from it.
* Create a new instance
public NioSocketChannel() {
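The no-argument constructor delegates with DEFAULT_SELECTOR_PROVIDER, where DEFAULT_SELECTOR_PROVIDER = SelectorProvider.provider(). The role of SelectorProvider was covered in an earlier article: it selects a different I/O implementation depending on the operating system.
Following the code further, the constructor calls newSocket, which returns a Java NIO SocketChannel.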
public NioSocketChannel(Channel parent, SocketChannel socket) {
super(parent, socket);
config = new NioSocketChannelConfig(this, socket.socket());
protected AbstractChannel(Channel parent) {
this.parent = parent;
id = newId();
unsafe = newUnsafe();
pipeline = newChannelPipeline();
NioSocketChannel.newSocket(DEFAULT_SELECTOR_PROVIDER) opens a new Java NIO SocketChannel. AbstractChannel(Channel parent) then initializes the fields of AbstractChannel:
parent is set to null
unsafe is instantiated through newUnsafe(); its type is the inner class AbstractNioByteChannel.NioByteUnsafe
pipeline is a newly created instance from new DefaultChannelPipeline(this). This matches the documentation: Each channel has its own pipeline and it is created automatically when a new channel is created.
Fields in AbstractNioChannel:
SelectableChannel ch is set to the Java SocketChannel, i.e. the Java NIO SocketChannel returned by NioSocketChannel#newSocket.
readInterestOp is set to SelectionKey.OP_READ
SelectableChannel ch is switched to non-blocking mode with ch.configureBlocking(false)
Fields in NioSocketChannel:
SocketChannelConfig config = new NioSocketChannelConfig(this, socket.socket())
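The NIO-level part of the steps above (open a channel through the SelectorProvider, switch it to non-blocking mode, note the read interest op) can be reproduced with JDK classes alone. A minimal sketch, where NioChannelSetup and describe are hypothetical names used only for illustration:

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.channels.SelectionKey;
import java.nio.channels.SocketChannel;
import java.nio.channels.spi.SelectorProvider;

final class NioChannelSetup {
    // Opens a SocketChannel the way newSocket does, makes it non-blocking,
    // and reports its state as a short string.
    static String describe() {
        SelectorProvider provider = SelectorProvider.provider();
        try (SocketChannel ch = provider.openSocketChannel()) {
            ch.configureBlocking(false);
            int readInterestOp = SelectionKey.OP_READ;
            boolean readSupported = (ch.validOps() & readInterestOp) != 0;
            return "open=" + ch.isOpen()
                    + " blocking=" + ch.isBlocking()
                    + " readSupported=" + readSupported;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

No connection is made here; the channel is only created and configured, which is all that happens during construction.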
channel = channelFactory.newChannel();
- Next, what the init method does (it is implemented in the subclass Bootstrap)
void init(Channel channel) throws Exception {
ChannelPipeline p = channel.pipeline();
// config.handler() returns bootstrap.handler(), i.e. the ChannelInitializer we set at the start; see the analysis above
p.addLast(config.handler());
final Map<ChannelOption<?>, Object> options = options0();
synchronized (options) {
setChannelOptions(channel, options, logger);
final Map<AttributeKey<?>, Object> attrs = attrs0();
synchronized (attrs) {
for (Entry<AttributeKey<?>, Object> e: attrs.entrySet()) {
channel.attr((AttributeKey<Object>) e.getKey()).set(e.getValue());
- Next we look at the other important call in initAndRegister: config().group().register(channel), which registers the channel.
public ChannelFuture register(Channel channel) {
return next().register(channel);
public EventExecutor next() {
return chooser.next();
protected MultithreadEventExecutorGroup(int nThreads, Executor executor,
EventExecutorChooserFactory chooserFactory, Object... args) {
if (nThreads <= 0) {
throw new IllegalArgumentException(String.format("nThreads: %d (expected: > 0)", nThreads));
if (executor == null) {
executor = new ThreadPerTaskExecutor(newDefaultThreadFactory());
// Create an EventExecutor array of size nThreads
children = new EventExecutor[nThreads];
for (int i = 0; i < nThreads; i ++) {
boolean success = false;
try {
children[i] = newChild(executor, args);
success = true;
} catch (Exception e) {
// TODO: Think about if this is a good exception type
throw new IllegalStateException("failed to create a child event loop", e);
} finally {
if (!success) {
for (int j = 0; j < i; j ++) {
children[j].shutdownGracefully();
}
for (int j = 0; j < i; j ++) {
EventExecutor e = children[j];
try {
while (!e.isTerminated()) {
e.awaitTermination(Integer.MAX_VALUE, TimeUnit.SECONDS);
} catch (InterruptedException interrupted) {
// Let the caller handle the interruption.
// newChooser in the DefaultEventExecutorChooserFactory implementation builds the chooser that picks a suitable EventExecutor from the children array; the strategy depends on the number of threads
chooser = chooserFactory.newChooser(children);
final FutureListener<Object> terminationListener = new FutureListener<Object>() {
public void operationComplete(Future<Object> future) throws Exception {
if (terminatedChildren.incrementAndGet() == children.length) {
for (EventExecutor e: children) {
Set<EventExecutor> childrenSet = new LinkedHashSet<EventExecutor>(children.length);
Collections.addAll(childrenSet, children);
readonlyChildren = Collections.unmodifiableSet(childrenSet);
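This constructor is long. The part we care about is chooser = chooserFactory.newChooser(children);
The newChooser method of the DefaultEventExecutorChooserFactory implementation builds the chooser that picks a suitable EventExecutor from the children array, with a strategy that depends on the number of threads.
From children = new EventExecutor[nThreads]; and children[i] = newChild(executor, args); we can see how the array is filled: newChild is implemented in NioEventLoopGroup and returns a NioEventLoop.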
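The two selection strategies differ only in how the counter is mapped to an index: a bit mask when the array length is a power of two, a modulo otherwise. A small self-contained sketch of that round-robin logic follows. RoundRobinChooser is a hypothetical name, and unlike Netty this sketch folds both strategies into one class.

```java
import java.util.concurrent.atomic.AtomicInteger;

final class RoundRobinChooser<T> {
    private final AtomicInteger idx = new AtomicInteger();
    private final T[] items;
    private final boolean powerOfTwo;

    RoundRobinChooser(T[] items) {
        if (items.length == 0) {
            throw new IllegalArgumentException("items must not be empty");
        }
        this.items = items.clone();
        this.powerOfTwo = isPowerOfTwo(items.length);
    }

    // True when exactly one bit is set, e.g. 1, 2, 4, 8.
    static boolean isPowerOfTwo(int val) {
        return (val & -val) == val;
    }

    // Returns the elements in order, wrapping around at the end of the array.
    T next() {
        int i = idx.getAndIncrement();
        if (powerOfTwo) {
            // Masking is equivalent to modulo for power-of-two lengths.
            return items[i & (items.length - 1)];
        }
        // Math.abs keeps the index non-negative after the counter overflows.
        return items[Math.abs(i % items.length)];
    }
}
```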
public EventExecutorChooser newChooser(EventExecutor[] executors) {
if (isPowerOfTwo(executors.length)) {
return new PowerOfTwoEventExecutorChooser(executors);
//return executors[idx.getAndIncrement() & executors.length - 1];
} else {
return new GenericEventExecutorChooser(executors);
//return executors[Math.abs(idx.getAndIncrement() % executors.length)];
private void register0(SelectableChannel ch, int interestOps, NioTask<?> task) {
try {
ch.register(unwrappedSelector, interestOps, task);
} catch (Exception e) {
throw new EventLoopException("failed to register a channel", e);
The code above is the familiar registration call from plain Java NIO; see the NioEventLoop source for more details.
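For comparison, here is what that registration looks like with the JDK alone: a non-blocking channel is registered with interest set 0 and an attachment, and the interest ops are switched on afterwards. SelectorRegistration and registerAndReadAttachment are hypothetical names for this sketch; in Netty the attachment is the Netty Channel itself.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;

final class SelectorRegistration {
    // Registers a fresh non-blocking SocketChannel with the selector and
    // returns the attachment carried by the resulting SelectionKey.
    static Object registerAndReadAttachment(Object attachment) {
        try (Selector selector = Selector.open();
             SocketChannel ch = SocketChannel.open()) {
            ch.configureBlocking(false);
            // Interest set 0: the channel is known to the selector but selects nothing yet.
            SelectionKey key = ch.register(selector, 0, attachment);
            // Interest ops can be enabled later, for example for reads.
            key.interestOps(SelectionKey.OP_READ);
            return key.attachment();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```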
Initialization of EventLoopGroup (which holds the Java NIO selectors)
EventLoopGroup (in fact MultithreadEventExecutorGroup) maintains a children array of type EventExecutor with size nThreads, which forms a thread pool
If a pool size is specified when instantiating NioEventLoopGroup, nThreads is that value; otherwise it is the number of processor cores * 2
MultithreadEventExecutorGroup calls the abstract method newChild to fill the children array
The abstract method newChild is implemented in NioEventLoopGroup and returns a NioEventLoop instance.
NioEventLoop fields:
SelectorProvider provider: obtained in the NioEventLoopGroup constructor through SelectorProvider.provider()
Selector selector: obtained in the NioEventLoop constructor through selector = provider.openSelector().
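The initialization steps listed above can be condensed into a small model: a group that sizes a children array, fills it through newChild, and gives every child its own Selector. MiniLoopGroup and MiniLoop are hypothetical names; the sketch leaves out threads and executors entirely and only assumes the default size rule described above.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.channels.Selector;
import java.nio.channels.spi.SelectorProvider;

final class MiniLoopGroup implements AutoCloseable {
    // Each child owns exactly one Selector, opened from the shared provider.
    static final class MiniLoop {
        final Selector selector;

        MiniLoop(SelectorProvider provider) {
            try {
                this.selector = provider.openSelector();
            } catch (IOException e) {
                throw new UncheckedIOException("failed to open a new selector", e);
            }
        }
    }

    private final MiniLoop[] children;

    // nThreads == 0 means "use the default", here processor cores * 2.
    MiniLoopGroup(int nThreads) {
        if (nThreads < 0) {
            throw new IllegalArgumentException("nThreads: " + nThreads + " (expected: >= 0)");
        }
        int n = nThreads == 0 ? Runtime.getRuntime().availableProcessors() * 2 : nThreads;
        SelectorProvider provider = SelectorProvider.provider();
        children = new MiniLoop[n];
        for (int i = 0; i < n; i++) {
            children[i] = newChild(provider);
        }
    }

    MiniLoop newChild(SelectorProvider provider) {
        return new MiniLoop(provider);
    }

    int size() {
        return children.length;
    }

    MiniLoop child(int i) {
        return children[i];
    }

    @Override
    public void close() {
        for (MiniLoop child : children) {
            try {
                child.selector.close();
            } catch (IOException e) {
                throw new UncheckedIOException(e);
            }
        }
    }
}
```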
private ChannelFuture doResolveAndConnect0(final Channel channel, SocketAddress remoteAddress,
final SocketAddress localAddress, final ChannelPromise promise) {
try {
final EventLoop eventLoop = channel.eventLoop();
final AddressResolver<SocketAddress> resolver = this.resolver.getResolver(eventLoop);
if (!resolver.isSupported(remoteAddress) || resolver.isResolved(remoteAddress)) {
// Resolver has no idea about what to do with the specified remote address or it's resolved already.
doConnect(remoteAddress, localAddress, promise);
return promise;
return promise;
private static void doConnect(
final SocketAddress remoteAddress, final SocketAddress localAddress, final ChannelPromise connectPromise) {
// This method is invoked before channelRegistered() is triggered. Give user handlers a chance to set up
// the pipeline in its channelRegistered() implementation.
final Channel channel = connectPromise.channel();
channel.eventLoop().execute(new Runnable() {
public void run() {
if (localAddress == null) {
channel.connect(remoteAddress, connectPromise);
} else {
channel.connect(remoteAddress, localAddress, connectPromise);
public ChannelFuture connect(SocketAddress remoteAddress, ChannelPromise promise) {
return pipeline.connect(remoteAddress, promise);
public final ChannelFuture connect(SocketAddress remoteAddress, ChannelPromise promise) {
return tail.connect(remoteAddress, promise);
protected DefaultChannelPipeline(Channel channel) {
this.channel = ObjectUtil.checkNotNull(channel, "channel");
succeededFuture = new SucceededChannelFuture(channel, null);
voidPromise = new VoidChannelPromise(channel, true);
tail = new TailContext(this);
head = new HeadContext(this);
head.next = tail;
tail.prev = head;
public ChannelFuture connect(
final SocketAddress remoteAddress, final SocketAddress localAddress, final ChannelPromise promise) {
if (remoteAddress == null) {
throw new NullPointerException("remoteAddress");
if (isNotValidPromise(promise, false)) {
// cancelled
return promise;
// Starting from the tail of the doubly linked list inside DefaultChannelPipeline, walk backwards until the first AbstractChannelHandlerContext whose outbound flag is true, then call its invokeConnect
final AbstractChannelHandlerContext next = findContextOutbound();
// next eventually resolves to HeadContext, whose connect calls unsafe.connect(remoteAddress, localAddress, promise);
EventExecutor executor = next.executor();
if (executor.inEventLoop()) {
// Once an outbound Context is found, its invokeConnect method is called, which in turn calls connect on the ChannelHandler associated with that Context:
next.invokeConnect(remoteAddress, localAddress, promise);
} else {
safeExecute(executor, new Runnable() {
public void run() {
next.invokeConnect(remoteAddress, localAddress, promise);
}, promise, null);
return promise;
private void invokeConnect(SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise) {
if (invokeHandler()) {
try {
((ChannelOutboundHandler) handler()).connect(this, remoteAddress, localAddress, promise);
} catch (Throwable t) {
notifyOutboundHandlerException(t, promise);
} else {
connect(remoteAddress, localAddress, promise);
As shown, findContextOutbound inside connect follows the call chain tail -> custom outbound handlers -> head.
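To make that traversal concrete, here is a minimal model of the doubly linked list with a head and a tail context. MiniPipeline, Ctx and connectPath are hypothetical names, and the sketch assumes every outbound handler simply forwards the connect call to the next outbound context.

```java
import java.util.ArrayList;
import java.util.List;

final class MiniPipeline {
    static final class Ctx {
        final String name;
        final boolean outbound;
        Ctx prev;
        Ctx next;

        Ctx(String name, boolean outbound) {
            this.name = name;
            this.outbound = outbound;
        }
    }

    // In this model head handles outbound operations and tail does not.
    private final Ctx head = new Ctx("head", true);
    private final Ctx tail = new Ctx("tail", false);

    MiniPipeline() {
        head.next = tail;
        tail.prev = head;
    }

    // Inserts a new context just before tail.
    void addLast(String name, boolean outbound) {
        Ctx ctx = new Ctx(name, outbound);
        Ctx prev = tail.prev;
        ctx.prev = prev;
        ctx.next = tail;
        prev.next = ctx;
        tail.prev = ctx;
    }

    // Walks backwards from 'from' to the nearest outbound context.
    static Ctx findContextOutbound(Ctx from) {
        Ctx ctx = from;
        do {
            ctx = ctx.prev;
        } while (!ctx.outbound);
        return ctx;
    }

    // Names of the contexts a connect call visits, starting at tail and ending at head.
    List<String> connectPath() {
        List<String> visited = new ArrayList<>();
        Ctx ctx = tail;
        while (ctx != head) {
            ctx = findContextOutbound(ctx);
            visited.add(ctx.name);
        }
        return visited;
    }
}
```

Inbound-only contexts are skipped, so with one outbound handler in the middle the path is that handler followed by head.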
public void connect(
ChannelHandlerContext ctx,
SocketAddress remoteAddress, SocketAddress localAddress,
ChannelPromise promise) throws Exception {
unsafe.connect(remoteAddress, localAddress, promise);
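**In unsafe.connect, unsafe is the object that operates on the underlying NIO layer; the details will be covered later. What matters here is that the call ultimately reaches Java NIO's socketChannel.connect(remoteAddress).**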
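At the JDK level, a non-blocking connect usually looks like the sketch below: connect may return false, in which case the channel waits for OP_CONNECT and then calls finishConnect. This is the general NIO pattern, not a copy of Netty's code. NonBlockingConnect and connectToLocalServer are hypothetical names, and the example assumes it may bind a throwaway server on the loopback interface.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;

final class NonBlockingConnect {
    // Connects a non-blocking client to a local server on an ephemeral port
    // and reports whether the connection was established.
    static boolean connectToLocalServer() {
        try (ServerSocketChannel server = ServerSocketChannel.open();
             Selector selector = Selector.open();
             SocketChannel client = SocketChannel.open()) {
            server.bind(new InetSocketAddress(InetAddress.getLoopbackAddress(), 0));
            client.configureBlocking(false);

            // In non-blocking mode connect may return before the handshake completes.
            boolean connected = client.connect(server.getLocalAddress());
            if (!connected) {
                SelectionKey key = client.register(selector, SelectionKey.OP_CONNECT);
                // Wait (up to 5 seconds) until the connection can be finished.
                if (selector.select(5000) > 0 && key.isConnectable()) {
                    connected = client.finishConnect();
                }
            }
            return connected && client.isConnected();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```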
In summary, registering a Channel means associating it with its EventLoop. This reflects a Netty rule: every Channel is bound to one specific EventLoop, and all I/O operations of that Channel run on that EventLoop. Once the Channel and the EventLoop are associated, the register method of the underlying Java NIO SocketChannel is called to register that SocketChannel with the chosen selector. These two steps complete the registration of a Netty Channel.
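The thread-affinity rule can be sketched with a single-threaded executor: work submitted from the loop thread runs directly, work from any other thread is queued, which is the same inEventLoop check used in connect above. MiniEventLoop and MiniChannel are hypothetical names and model only the association, not any real I/O.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

final class MiniEventLoop implements AutoCloseable {
    private volatile Thread loopThread;
    private final ExecutorService executor = Executors.newSingleThreadExecutor(r -> {
        Thread t = new Thread(r, "mini-event-loop");
        t.setDaemon(true);
        loopThread = t;
        return t;
    });

    boolean inEventLoop() {
        return Thread.currentThread() == loopThread;
    }

    // Runs the task on the loop thread: directly if already there, otherwise by queueing it.
    void execute(Runnable task) {
        if (inEventLoop()) {
            task.run();
        } else {
            executor.execute(task);
        }
    }

    @Override
    public void close() {
        executor.shutdown();
    }
}

final class MiniChannel {
    private MiniEventLoop loop;

    // A channel is bound to exactly one loop, and only once.
    void register(MiniEventLoop loop) {
        if (this.loop != null) {
            throw new IllegalStateException("registered to an event loop already");
        }
        this.loop = loop;
    }

    // Every I/O operation is funneled through the loop the channel is bound to.
    void io(Runnable operation) {
        if (loop == null) {
            throw new IllegalStateException("channel is not registered");
        }
        loop.execute(operation);
    }
}
```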