美文网首页Netty
netty源码分析 - EventLoop类关系

netty源码分析 - EventLoop类关系

作者: 晴天哥_王志 | 来源:发表于2020-03-31 21:49 被阅读0次

系列

Netty源码分析 - Bootstrap服务端
Netty源码分析 - Bootstrap客户端
netty源码分析 - ChannelHandler
netty源码分析 - EventLoop类关系
netty源码分析 - register分析
Netty源码分析 - NioEventLoop事件处理
netty源码分析 - accept过程分析
Netty源码分析 - ByteBuf
Netty源码分析 - 粘包和拆包问题

开篇

  • 这篇文章的目的主要是为了梳理下EventLoop和NioEventLoopGroup的类关系,慎重阅读,有点枯燥。
  • 顺带解释了下Channel+NioEventLoopGroup+EventLoop的注册流程。

NioEventLoop

NioEventLoop
  • NioEventLoop的继承关系主线如上图所示,NioEventLoop --> SingleThreadEventLoop --> SingleThreadEventExecutor --> AbstractScheduledEventExecutor --> AbstractEventExecutor --> AbstractExecutorService,其中AbstractEventExecutor作为连接JDK原生Executor和Netty的Executor的桥梁。

AbstractExecutorService

AbstractExecutorService
  • Executor接口只有一个execute()方法。
  • ExecutorService接口新增了判断是否shutdown相关的方法、提交任务的submit方法、方法调用的invoke相关的方法。
  • AbstractExecutorService新增了newTaskFor的相关方法,并重载实现了submit的相关方法。
  • AbstractExecutorService作为JDK的Executor对象(如ThreadPoolExecutor)的父类,比较值得研究下。

AbstractEventExecutor

AbstractEventExecutor
  • AbstractEventExecutor作为Netty的Executor的基类,起着承上启下的作用。
  • AbstractEventExecutor的核心包括submit和schedule其实都是重写了父类的AbstractExecutorService的方法,不过schedule内部没有任何实现。

AbstractScheduledEventExecutor

AbstractScheduledEventExecutor
  • AbstractScheduledEventExecutor类的核心方法都和schedule()有关,也是真正实现schedule功能的相关的方法。

SingleThreadEventExecutor

SingleThreadEventExecutor
  • SingleThreadEventExecutor包含任务队列Queue<Runnable> taskQueue,这个任务队列是保存所有任务的核心数据结构,是一个LinkedBlockingQueue数据结构的对象。
  • SingleThreadEventExecutor的execute()方法负责提交Runnable对象到对应的taskQueue对象中。

SingleThreadEventLoop

image.png
  • SingleThreadEventLoop#register把eventLoop对象绑定对应的channel当中。

NioEventLoop

NioEventLoop
  • NioEventLoop#run方法用于处理该NioEventLoop的任务,是任务执行的核心调度入口。
  • NioEventLoop#processSelectedKeys是由NioEventLoop#run内部触发的,负责处理事件。
public final class NioEventLoop extends SingleThreadEventLoop {
    NioEventLoop(NioEventLoopGroup parent, Executor executor, SelectorProvider selectorProvider,
                 SelectStrategy strategy, RejectedExecutionHandler rejectedExecutionHandler) {
        super(parent, executor, false, DEFAULT_MAX_PENDING_TASKS, rejectedExecutionHandler);
        if (selectorProvider == null) {
            throw new NullPointerException("selectorProvider");
        }
        if (strategy == null) {
            throw new NullPointerException("selectStrategy");
        }
        provider = selectorProvider;
        selector = openSelector();
        selectStrategy = strategy;
    }
}



public abstract class SingleThreadEventLoop extends SingleThreadEventExecutor implements EventLoop {

    protected SingleThreadEventLoop(EventLoopGroup parent, Executor executor,
                                    boolean addTaskWakesUp, int maxPendingTasks,
                                    RejectedExecutionHandler rejectedExecutionHandler) {
        super(parent, executor, addTaskWakesUp, maxPendingTasks, rejectedExecutionHandler);
        tailTasks = newTaskQueue(maxPendingTasks);
    }
}


public abstract class SingleThreadEventExecutor extends AbstractScheduledEventExecutor implements OrderedEventExecutor {

    private final Queue<Runnable> taskQueue;
    protected SingleThreadEventExecutor(EventExecutorGroup parent, Executor executor,
                                        boolean addTaskWakesUp, int maxPendingTasks,
                                        RejectedExecutionHandler rejectedHandler) {
        super(parent);
        this.addTaskWakesUp = addTaskWakesUp;
        this.maxPendingTasks = Math.max(16, maxPendingTasks);
        this.executor = ObjectUtil.checkNotNull(executor, "executor");
        taskQueue = newTaskQueue(this.maxPendingTasks);
        rejectedExecutionHandler = ObjectUtil.checkNotNull(rejectedHandler, "rejectedHandler");
    }
}
  • NioEventLoop的初始化过程,主要关注核心的几个变量包括selectorProvider、selector、selectStrategy、taskQueue。
  • selector:这个就是jdk中nio的多路复用器Selector对象,熟悉nio类的人应该很清楚了,每个NioEventLoop会绑定一个selector对象。调用它的select()或者selectNow方法来读取io事件。
  • selectStrategy:选择策略SelectStrategy对象。它提供了一种控制select循环行为的能力,要么是select方法阻塞线程,要么是继续循环两种行为。
  • provider:SelectorProvider类的对象,它是Selector的工厂类,调用它的openSelector方法即可打开一个新的selector对象。
  • ioRatio:表示处理io事件时间占比,默认值是50,即各占一半时间。

NioEventLoopGroup

NioEventLoopGroup

EventExecutorGroup

EventExecutorGroup

AbstractEventExecutorGroup

AbstractEventExecutorGroup

MultithreadEventExecutorGroup

public abstract class MultithreadEventExecutorGroup extends AbstractEventExecutorGroup {

    private final EventExecutor[] children;
    private final Set<EventExecutor> readonlyChildren;
    private final AtomicInteger terminatedChildren = new AtomicInteger();
    private final Promise<?> terminationFuture = new DefaultPromise(GlobalEventExecutor.INSTANCE);
    // NioEvengLoop的选择器chooser
    private final EventExecutorChooserFactory.EventExecutorChooser chooser;


    public EventExecutor next() {
        return chooser.next();
    }
}
MultithreadEventExecutorGroup

MultithreadEventLoopGroup

MultithreadEventLoopGroup
public abstract class MultithreadEventExecutorGroup extends AbstractEventExecutorGroup {

    private final EventExecutor[] children;
    private final Set<EventExecutor> readonlyChildren;
    private final AtomicInteger terminatedChildren = new AtomicInteger();
    private final Promise<?> terminationFuture = new DefaultPromise(GlobalEventExecutor.INSTANCE);
    private final EventExecutorChooserFactory.EventExecutorChooser chooser;

    public EventExecutor next() {
        return chooser.next();
    }
}

public abstract class MultithreadEventLoopGroup extends MultithreadEventExecutorGroup implements EventLoopGroup {

    @Override
    public EventLoop next() {
        return (EventLoop) super.next();
    }

    @Override
    public ChannelFuture register(Channel channel) {
        return next().register(channel);
    }

    @Override
    public ChannelFuture register(ChannelPromise promise) {
        return next().register(promise);
    }

    @Deprecated
    @Override
    public ChannelFuture register(Channel channel, ChannelPromise promise) {
        return next().register(channel, promise);
    }
}
  • MultithreadEventLoopGroup的register()方法负责选中其中一种NioEventLoop注册channel。

EventLoop注册channel流程

public abstract class AbstractBootstrap<B extends AbstractBootstrap<B, C>, C extends Channel> implements Cloneable {

    final ChannelFuture initAndRegister() {
        Channel channel = null;
        try {
            channel = channelFactory.newChannel();
            init(channel);
        } catch (Throwable t) {
            // 省略代码
        }
        // channle绑定eventLoop对象
        ChannelFuture regFuture = config().group().register(channel);
        if (regFuture.cause() != null) {
            if (channel.isRegistered()) {
                channel.close();
            } else {
                channel.unsafe().closeForcibly();
            }
        }

        return regFuture;
    }
}


public abstract class MultithreadEventLoopGroup extends MultithreadEventExecutorGroup implements EventLoopGroup {
    public ChannelFuture register(Channel channel) {
        return next().register(channel);
    }
}


public abstract class MultithreadEventExecutorGroup extends AbstractEventExecutorGroup {

    public EventExecutor next() {
        return chooser.next();
    }
}
  • config().group().register(channel)实现了NioEventLoopGroup通过chooser.next()选择NioEventLoop绑定channel。

相关文章

网友评论

    本文标题:netty源码分析 - EventLoop类关系

    本文链接:https://www.haomeiwen.com/subject/njosehtx.html