netty源码看不懂?试着写一个吧

作者: pq217 | 来源:发表于2022-06-25 18:02 被阅读0次

前言

最近一直在看netty源码,观后感:很难看,于是为了屡清netty的设计思路,我参照netty源码手写一个山寨简版的“netty”,说是手写,其实也就是从源码复制出来核心的代码,并尽量保持命名,设计结构与源码基本一致,因为我的目的很明确:尝试以作者的角度理解netty的全貌

效果

最终山寨版的netty代码server端使用如下(代码没有引用任何netty的依赖)

public static void main(String[] args) {
    EventLoopGroup bossGroup = new NioEventLoopGroup(1);
    EventLoopGroup workerGroup = new NioEventLoopGroup(4);
    try {
        ServerBootstrap bootstrap = new ServerBootstrap();
        bootstrap.group(bossGroup, workerGroup)
                .channel(NioServerSocketChannel.class)
                .childHandler(new NettyServerHandler(), new NettyServerHandler2());
        System.out.println("netty server start...");
        bootstrap.bind(9000);
    } finally {
    }
}

其中NettyHandler:

public class NettyServerHandler implements ChannelInboundHandler {

    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
    }

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        System.out.println("thread: " + Thread.currentThread().getName());
        System.out.println("msg:" + new String(((ByteBuffer) msg).array()));
        // 传递给下个handler
        ctx.fireChannelRead(msg);
    }
}

看起来和真的netty其实差不多,最终执行的效果也比较符合预期

Netty与NIO

首先,在学习netty前,要了解它是干嘛的,从我们常使用的场景上看可以说NETTY其实就是对NIO编程的一种封装,所以在理解netty之前,nio的基础知识是必须要掌握的,下面是一个NIO编程的基础server端代码:

public class NioServer {

    public static void main(String[] args) throws IOException, InterruptedException {

        // 创建NIO ServerSocketChannel
        ServerSocketChannel serverSocket = ServerSocketChannel.open();
        serverSocket.socket().bind(new InetSocketAddress(9000));
        serverSocket.configureBlocking(false);
        // 打开Selector处理Channel,底层调用epoll_create
        Selector selector = Selector.open();
        // 把ServerSocketChannel注册到selector上,并且selector对客户端accept连接操作感兴趣,底层调用epoll_ctl
//        SelectionKey registerKey = serverSocket.register(selector, SelectionKey.OP_ACCEPT);
        SelectionKey serverRegisterKey = serverSocket.register(selector, 0);
        serverRegisterKey.interestOps(SelectionKey.OP_ACCEPT);
        // 测试attach
        serverRegisterKey.attach(new NioServer());
        System.out.println("服务启动成功");

        while (true) {
            // 阻塞等待需要处理的事件发生,即调用epoll_wait
            selector.select();
            // 获取selector中注册的全部事件的 SelectionKey 实例
            Set<SelectionKey> selectionKeys = selector.selectedKeys();
            Iterator<SelectionKey> iterator = selectionKeys.iterator();

            // 遍历SelectionKey对事件进行处理
            while (iterator.hasNext()) {
                SelectionKey key = iterator.next();
                // 测试attachment
                System.out.println("attachment: "+ key.attachment());
                // 测试key
                System.out.println("key is serverRegisterKey:" + key.equals(serverRegisterKey));
                // 如果是OP_ACCEPT事件,则进行连接获取和事件注册
                if (key.isAcceptable()) {
                    ServerSocketChannel server = (ServerSocketChannel) key.channel();
                    SocketChannel socketChannel = server.accept();
                    socketChannel.configureBlocking(false);
                    // 这里只注册了读事件,如果需要给客户端发送数据可以注册写事件,底层调用epoll_ctl
                    socketChannel.register(selector, SelectionKey.OP_READ);
                    System.out.println("客户端连接成功");
                } else if (key.isReadable()) {  // 如果是OP_READ事件,则进行读取和打印
                    SocketChannel socketChannel = (SocketChannel) key.channel();
                    ByteBuffer byteBuffer = ByteBuffer.allocate(128);
                    int len = socketChannel.read(byteBuffer);
                    // 如果有数据,把数据打印出来
                    if (len > 0) {
                        System.out.println("接收到消息:" + new String(byteBuffer.array()));
                    } else if (len == -1) { // 如果客户端断开连接,关闭Socket
                        System.out.println("客户端断开连接");
                        socketChannel.close();
                    }
                }
                //从事件集合里删除本次处理的key,防止下次select重复处理
                iterator.remove();
            }
        }
    }
}

如果这里看不懂,可以参考Netty-理解selector是什么

上面的服务端代码有几个问题:

  • 每次写基本都是大致固定的写法,很多代码可以封装
  • 所有的请求都交给同一个线程处理,强求量一大就会出现问题
  • 写起来非常蹩脚

而netty的存在也就是为了解决这些问题,通过封装让NIO的编程变的简单易用

所以下面我们就假装以作者的设计角度,尝试着写一个"山寨"的netty

总体思路

首先我们要把每次都会写的固定代码封装起来,比如服务端接受OP_ACCEPT事件,之后需要把accpet到的channel注册到多路复用器,这些都是基本固定代码,可以封装,而接受到客户端信息之后的处理方式是根据需求而定的,所以要暴露出来可以让用户自定义

其次就是最重要的线程模型,肯定不能所有的请求都同一个线程去处理,最次也得用个线程池

Channel的封装

回头看NIO的代码,出现了两种jdk的channel:ServerSocketChannelSocketChannel,二者都继承了SelectableChannel,二者都有一些通用的行为,我们可以给它抽象出来一个channel类,里面封装了jdk的channel和感兴趣的事件,并提供了注册到多路复用器、绑定端口、设置非阻塞等行为,并可以继续泛化为服务端通道和客户端通道

Channel
Channel下的管道

回头看NIO的代码,每个channel发生事件后都会进行一些处理,所以这些处理的方式可以说是属于某个channel的,而一个channel下可能会有个步骤的处理工作,比如先解码再实际处理业务,我们可以把channel下每个处理步骤抽象成一个handler,多个handler互相连接组成channel下的一个管道(pipeline),这样channel的机构如下

pipeline

用户可以通过给管道下的pipeline添加处理步骤来改变时间放生时的响应,以实现自定义的客户端通道发来信息的响应

而服务端的管道处理步骤基本是一定的,即读到channel注册到多路复用器,所以对应的handler我们可以封装一个在服务端通道建立时就绑定上

EventLoop

通道封装完了,通道发生事件后的处理步骤也可以用户自定义,下一步研究就是这些活到底谁来干,也就是方法写好了,下一步是用什么线程模型去执行这些方法能做到高效,所以现在需要的是一个执行者(Executor)

我们想想实际场景,一个服务端可能会受到多个客户端的连接请求,我们要做的是高效的让请求多时可以用多个线程去处理,同时希望线程数可以根据场景来配置,针对这种情况,大师Doug Lea给出了以一种解决思路,写在《Scalable IO in Java》中,有兴趣可以自行百度,最终Lea给出的模型大概如下

Scalable IO in Java

主要思路是有一个主线程可以通过selector监听服务端通道,在发生连接事件后,把连接的通道注册到子线程池中某一个线程下的selector中,这个子线程就负责使用selector监听客户端通道,发生事件后处理

所以我们要有这样一种线程:

  • 内部有多路复用器selector,可以注册多个channel,并在事件放生是执行channel下绑定的处理步骤
  • 除了监控selector,也可以以执行给定任务

这种线程可以处理事件(Event),同时它一经启动就不会自动关闭,因为要监控selector,所以内部一定是个死循环Loop,所以这种执行特定任务的线程就叫做EventLoop,说的更直白点,就是一个会不断响应多路复用器事件的多channel处理者

而channel也可以注册到EventLoop,这样channel发生的事件就会由EventLoop按管道内部定义的handler执行处理

EventLoop
EventLoop&EventLoopGroup

EventLoop是一个特定的线程,那么EventLoopGroup就是这种特定线程的组合,也就是特定线程池,内部包含固定数量的EventLoop,而其对外提供的服务和内部的EventLoop组一模一样,只不过选了某一个EventLoop去具体执行服务,所以二者的方法是一样的,在netty源码中,EventLoop继承了EventLoopGroup,看到这里不免很糊涂,确实很怪,但可以这样去理解:EventLoopGroup是一群一模一样的EventLoop的组合,所以EventLoop能干什么,EventLoopGroup也就能且只能干什么,这样EventLoop就可以看成一个特殊的EventLoopGroup,只不过是只有一个对象的EventLoopGroup
比如:有一个瓦匠群体,瓦匠群体的每个人都只会砌墙,那么群体对外能提供的服务即砌墙,只不过是选一个瓦匠去砌墙,而某个瓦匠对外提供的服务也是砌墙,那么他可以看做一个特殊的瓦匠群体

EventLoopGroup
uml

结合以上的设计思路,再尽量贴近netty的源码,最终画出的uml图如下


netty

与上面的思路相比,更细致了多层的抽象类,尽量单一职责,主要是为了和netty基本结构一致

相比于netty,主要省略了EventExecutor和EventExecutorGroup,把对应的代码写在了EventLoop和EventLoopGroup中,省略了各种Unsafe内部类,把Unsafe的方法直接写在外部类

每个类的具体意义和实现代码中介绍

Channel管道

Channel

首先是抽象的channel接口,主要方法即绑定EventLoop,绑定端口和获取pipeline

public interface Channel {
    /**
     * 绑定事件持续处理器
     * @param eventLoop
     */
    void register(EventLoop eventLoop);

    /**
     * 获取事件持续处理器
     * @return
     */
    EventLoop eventLoop();

    /**
     * 通道内部的管道
     * @return
     */
    ChannelPipeline pipeline();

    /**
     * 绑定端口
     * @param localAddress
     */
    void bind(SocketAddress localAddress);

    /**
     * 开始读取,nio的实现即注册感兴趣的事件
     */
    void beginRead();
}
ChannelHandler

即管道绑定的处理器,一个管道对应一个pipeline,一个pipeline用链表形式存储多个处理器,这个我简化了一下,只是空接口,他有两种子类ChannelInboundHandler和ChannelOutboundHandler,分别管道进入即读事件的处理器和管道流出即写事件的处理器

public interface ChannelHandler {
}
ChannelInboundHandler

管道进入事件处理器,与之对应管道返回事件处理器,我并没有写

public interface ChannelInboundHandler extends ChannelHandler{

    void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception;

    void channelReadComplete(ChannelHandlerContext ctx) throws Exception;
}
ChannelHandlerContext

由于用户再写处理器时需要得知上下文信息,即channel是哪个(可以通过channel写回数据),eventLoop是哪个,所以需要用一个上下文对象把handler包装起来,同时Context对象还保存了链表的关系,使得handler形成链表

public class ChannelHandlerContext {

    private final ChannelHandler handler;

    /**
     * 链表
     */
    volatile ChannelHandlerContext next;
    volatile ChannelHandlerContext prev;

    private final ChannelPipeline pipeline;

    public ChannelHandlerContext(ChannelPipeline pipeline, ChannelHandler handler) {
        this.pipeline = pipeline;
        this.handler = handler;
    }

    /**
     * 当前通道
     * @return
     */
    public Channel channel() {
        return pipeline.channel();
    }

    /**
     * 当前管道
     * @return
     */
    public ChannelPipeline pipeline() {
        return pipeline;
    }

    /**
     * 当前执行器
     * @return
     */
    public EventLoop executor() {
        return channel().eventLoop();
    }

    public ChannelHandler handler() {
        return handler;
    }

    /**
     * 把信息传给链表下一个read节点去处理
     * @param msg
     * @return
     */
    public ChannelHandlerContext fireChannelRead(final Object msg) {
        findContextInbound().invokeChannelRead(msg);
        return this;
    }

    /**
     * 找到自己后面的Inbound处理器
     * @return
     */
    private ChannelHandlerContext findContextInbound() {
        ChannelHandlerContext ctx = this;
        do {
            ctx = ctx.next;
        } while (!(ctx.handler() instanceof ChannelInboundHandler));
        return ctx;
    }

    /**
     * 调用handler的channelRead方法
     * @param msg
     */
    private void invokeChannelRead(Object msg) {
        try {
            ((ChannelInboundHandler) handler()).channelRead(this, msg);
        } catch (Throwable t) {
            // 如果当前的handler不是ChannelInboundHandler则报错
        }
    }

}
ChannelPipeline

保存链表的收尾,因为链表是双向链表(in和out反向),同时支持添加新处理器至链表

public class ChannelPipeline {
    /**
     * 管道的第一个处理器,管道的处理器是链式结构
     */
    final ChannelHandlerContext head;
    final ChannelHandlerContext tail;
    /**
     * 所在的通道
     */
    private final Channel channel;

    public ChannelPipeline(Channel channel) {
        this.channel = channel;
        head = new HeadContext(this);
        tail = new TailContext(this);
        // 头尾互指
        head.next = tail;
        tail.prev = head;
    }

    public final Channel channel() {
        return channel;
    }

    /**
     * 添加处理器
     * @param handler
     * @return
     */
    public final ChannelPipeline addLast(ChannelHandler handler) {
        // 把handler包装成上下文
        ChannelHandlerContext newCtx = new ChannelHandlerContext(this, handler);
        addLast0(newCtx);
        return this;
    }

    /**
     * 在链表结尾添加新的节点
     * @param newCtx
     */
    private void addLast0(ChannelHandlerContext newCtx) {
        ChannelHandlerContext prev = tail.prev;
        newCtx.prev = prev;
        newCtx.next = tail;
        prev.next = newCtx;
        tail.prev = newCtx;
    }

    /**
     * 结尾,简化处理
     */
    final class TailContext extends ChannelHandlerContext {
        public TailContext(ChannelPipeline pipeline) {
            super(pipeline, null);
        }
    }

    /**
     * 头部简化处理
     */
    final class HeadContext extends ChannelHandlerContext {
        public HeadContext(ChannelPipeline pipeline) {
            super(pipeline, null);
        }
    }

    /**
     * 开始处理read操作
     * @param msg
     * @return
     */
    public final ChannelPipeline fireChannelRead(Object msg) {
        head.fireChannelRead(msg);
        return this;
    }

    public final ChannelPipeline fireChannelReadComplete() {
        // 省略不写了,和fireChannelRead差不多道理
        return this;
    }
}
AbstractChannel

AbstractChannel 抽象的channel实现,主要实现了绑定EventLoop对象和初始化ChannelPipeline,也就是构造出Channel数据结构

public abstract class AbstractChannel implements Channel {

    /**
     * 父通道
     */
    private final Channel parent;

    /**
     * 绑定的事件循环器
     */
    private volatile EventLoop eventLoop;

    /**
     * 管道
     */
    private final ChannelPipeline pipeline;

    public AbstractChannel(Channel parent) {
        this.parent = parent;
        pipeline = newChannelPipeline();
    }

    protected ChannelPipeline newChannelPipeline() {
        return new ChannelPipeline(this);
    }

    /**
     * 返回绑定的事件处理器
     *
     * @return
     */
    @Override
    public EventLoop eventLoop() {
        return eventLoop;
    }

    @Override
    public ChannelPipeline pipeline() {
        return pipeline;
    }

    @Override
    public void beginRead() {
        doBeginRead();
    }

    protected abstract void doBeginRead();

    @Override
    public void bind(SocketAddress localAddress) {
        try {
            doBind(localAddress);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    protected abstract void doBind(SocketAddress localAddress) throws Exception;

    protected void doRegister() throws Exception {
        // NOOP
    }

    /**
     * Channel绑定eventLoop
     *
     * @param eventLoop
     */
    @Override
    public final void register(EventLoop eventLoop) {
        // 省去乱七八遭的判断,源码实在内部类Unsafe下,所以是:AbstractChannel.this
        AbstractChannel.this.eventLoop = eventLoop;
        eventLoop.execute(() -> {
            register0();
        });
    }

    /**
     * 实际注册
     */
    private void register0() {
        try {
            doRegister();
            // 开始读取感兴趣事件
            beginRead();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

通过new ChannelPipeline(this),初始化了一个channel下的空管道,register方法只是存储EventLoop对象,而实际的doRegister抽象处理,因为不同的channel注册方式不一样,NIO是注册到多路复用器,其它的注册方式并非如此

AbstractNioChannel

这个就是专门处理NIO的抽象channel了,就可以实际去实现NIO的注册了

public abstract class AbstractNioChannel extends AbstractChannel {

    /**
     * java的channel 包括ServerSocketChannel和SocketChannel
     */
    private final SelectableChannel ch;

    private SelectionKey selectionKey;

    /**
     * 感兴趣的事件
     */
    protected final int readInterestOp;

    public AbstractNioChannel(Channel parent, SelectableChannel ch, int readInterestOp) {
        super(parent);
        this.ch = ch;
        this.readInterestOp = readInterestOp;
        try {
            ch.configureBlocking(false);
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    /**
     * 返回java的channel
     *
     * @return
     */
    protected SelectableChannel javaChannel() {
        return ch;
    }

    /**
     * 把通道注册到多路复用器
     * @throws Exception
     */
    @Override
    protected void doRegister() throws Exception {
        // 最后一个字段this,相当于selectionKey.attach(this),后续可以通过attachment()方法取到
        // 由于多个channel注册到一个eventLoop,所以需要传递当前的channel以便eventLoop获取到事件时可以知道是哪个channel产生的事件
        selectionKey = javaChannel().register(((NioEventLoop) eventLoop()).unwrappedSelector(), 0, this);
    }

    /**
     * 注册感兴趣事件
     */
    @Override
    protected void doBeginRead() {
        selectionKey.interestOps(readInterestOp);
    }

    /**
     * 从 {@link SelectableChannel} 读取事件,源码是写在Unsafe里
     */
    public abstract void read();

}

存储了SelectableChannel,即ServerSocketChannel和SocketChannel的共同父类

实现了doRegister方法,即把jdk的channel注册到EventLoop下面的多路复用器

实现了doBeginRead方法,即注册感兴趣事件

抽象了一个read方法,即从channel读取信息,由于客户端与服务端读取方法不一样,所以抽象出来

而客户端与服务端的read实现也是分别抽象了两个类来提交给channel的handler,即AbstractNioMessageChannel和AbstractNioByteChannel

AbstractNioMessageChannel

主要封装了通过抽象doReadMessages读取事件信息后传递给channel的管道

public abstract class AbstractNioMessageChannel extends AbstractNioChannel {
    /**
     * 读取到的缓存
     */
    private final List<Object> readBuf = new ArrayList<Object>();

    public AbstractNioMessageChannel(Channel parent, SelectableChannel ch, int readInterestOp) {
        super(parent, ch, readInterestOp);
    }

    /**
     * 从SelectableChannel中读取信息
     */
    @Override
    public void read() {
        final ChannelPipeline pipeline = pipeline();
        // 实际读取信息,由子类实现
        doReadMessages(readBuf);
        int size = readBuf.size();
        for (int i = 0; i < size; i ++) {
            // 调用管道的read处理器
            pipeline.fireChannelRead(readBuf.get(i));
        }
        readBuf.clear();
    }

    protected abstract int doReadMessages(List<Object> buf);
}

其中doReadMessages实际读取信息,看一下子类NioServerSocketChannel如何实现

NioServerSocketChannel

NIO服务端通道,实现了doReadMessages,即读取客户端channel

public class NioServerSocketChannel extends AbstractNioMessageChannel {

    /**
     * 多路复用器提供者
     */
    private static final SelectorProvider DEFAULT_SELECTOR_PROVIDER = SelectorProvider.provider();

    /**
     * 开启一个 java ServerSocketChannel
     * @param provider
     * @return
     */
    private static ServerSocketChannel newSocket(SelectorProvider provider) {
        try {
            return provider.openServerSocketChannel();
        } catch (IOException e) {
            return null;
        }
    }

    public NioServerSocketChannel() {
        this(newSocket(DEFAULT_SELECTOR_PROVIDER));
    }

    public NioServerSocketChannel(ServerSocketChannel channel) {
        super(null, channel, SelectionKey.OP_ACCEPT);
    }

    /**
     * 覆盖,因为可以确定返回的是ServerSocketChannel
     * @return
     */
    @Override
    protected ServerSocketChannel javaChannel() {
        return (ServerSocketChannel) super.javaChannel();
    }

    /**
     * 绑定端socket
     * @param localAddress
     * @throws Exception
     */
    @Override
    protected void doBind(SocketAddress localAddress) throws Exception {
        javaChannel().socket().bind(localAddress);
    }

    /**
     * 读取信息,作为SeverSocketChannel(服务端通道),读取信息即accept后的SocketChannel(客户端通道)
     * @param buf
     * @return
     */
    @Override
    protected int doReadMessages(List<Object> buf) {
        SocketChannel ch = null;
        try {
            ch = javaChannel().accept();
        } catch (IOException e) {
        }
        if (ch != null) {
            buf.add(new NioSocketChannel(this, ch));
            return 1;
        }
        return 0;
    }

}

同时封装了ServerSocketChannel的创建,并和感兴趣SelectionKey.OP_ACCEPT的事件传递给父类,实现了绑定端口javaChannel().socket().bind(localAddress)

AbstractNioByteChannel

与AbstractNioMessageChannel对应,是读取byte也就是字节并传递给channel的管道,实际的读取还是抽象的doReadBytes

public abstract class AbstractNioByteChannel extends AbstractNioChannel {

    public AbstractNioByteChannel(Channel parent, SelectableChannel ch, int readInterestOp) {
        super(parent, ch, readInterestOp);
    }

    @Override
    public void read() {
        final ChannelPipeline pipeline = pipeline();
        // 这里做了简化处理,源码用的自封装ByteBuf
        ByteBuffer byteBuf = ByteBuffer.allocate(128);
        doReadBytes(byteBuf);
        pipeline.fireChannelRead(byteBuf);
    }

    protected abstract int doReadBytes(ByteBuffer buf);
}

源码使用自己封装的ByteBuf,这里简化了,看一下它的子类即NioSocketChannel

NioSocketChannel

客户端的channel

public class NioSocketChannel extends AbstractNioByteChannel {

    /**
     * 多路复用器提供者
     */
    private static final SelectorProvider DEFAULT_SELECTOR_PROVIDER = SelectorProvider.provider();

    /**
     * 开启一个 java SocketChannel 这个方法为客户端创建通道使用
     *
     * @param provider
     * @return
     */
    private static SocketChannel newSocket(SelectorProvider provider) {
        try {
            return provider.openSocketChannel();
        } catch (IOException e) {
            return null;
        }
    }

    public NioSocketChannel() {
        this(null, newSocket(DEFAULT_SELECTOR_PROVIDER));
    }

    /**
     * SocketChannel 感兴趣的事件是READ
     *
     * @param parent
     * @param channel
     */
    public NioSocketChannel(Channel parent, SocketChannel channel) {
        super(parent, channel, SelectionKey.OP_READ);
    }

    /**
     * 覆盖,因为可以确定返回的是SocketChannel
     *
     * @return
     */
    @Override
    protected SocketChannel javaChannel() {
        return (SocketChannel) super.javaChannel();
    }

    /**
     * 绑定端socket
     *
     * @param localAddress
     * @throws Exception
     */
    @Override
    protected void doBind(SocketAddress localAddress) throws Exception {
        javaChannel().socket().bind(localAddress);
    }

    @Override
    protected int doReadBytes(ByteBuffer buf) {
        try {
            return javaChannel().read(buf);
        } catch (IOException e) {
            throw new RuntimeException();
        }
    }
}

这个和Server端差不多,只不过一个读channel,一个读字节。感兴趣的事件是SelectionKey.OP_READ

到此channel相关类写完~

EventLoop事件循环器

EventLoopGroup

事件循环器组,和EventLoop提供一样的功能,同时可以选择下一个EventLoop且可以迭代

public interface EventLoopGroup extends Executor, Iterable<EventLoop>{

    void register(Channel channel);

    EventLoop next();

    @Override
    Iterator<EventLoop> iterator();
}
EventLoop

继承EventLoopGroup,并且可以查到父Group

public interface EventLoop extends EventLoopGroup {
    EventLoopGroup parent();
}
SingleThreadEventExecutor

本来打算把EventExecutor去掉,代码写在EventLoop,但这个类太重要了,所以保留了下来

public abstract class SingleThreadEventExecutor implements Executor {
    /**
     * 默认任务列表长度
     */
    protected static final int DEFAULT_MAX_PENDING_TASKS = 16;
    /**
     * 待完成的任务
     */
    private final Queue<Runnable> taskQueue;
    /**
     * 实际工作者
     */
    private final Executor executor;
    /**
     * 当前运行线程
     */
    private volatile Thread thread;

    /**
     * ST_NOT_STARTED: 未启动, ST_STARTED 已启动
     */
    private static final int ST_NOT_STARTED = 1;
    private static final int ST_STARTED = 2;

    /**
     * 标记是否启动
     */
    private volatile int state = ST_NOT_STARTED;

    /**
     * 原子启动标记更新器
     */
    private static final AtomicIntegerFieldUpdater<SingleThreadEventExecutor> STATE_UPDATER =
            AtomicIntegerFieldUpdater.newUpdater(SingleThreadEventExecutor.class, "state");

    public SingleThreadEventExecutor(Executor executor) {
        taskQueue = newTaskQueue(DEFAULT_MAX_PENDING_TASKS);
        this.executor = executor;
    }

    /**
     * 初始化一个新的任务对列
     * @param maxPendingTasks
     * @return
     */
    protected Queue<Runnable> newTaskQueue(int maxPendingTasks) {
        return new LinkedBlockingQueue<>(maxPendingTasks);
    }

    /**
     * 添加任务
     * @param task
     */
    protected void addTask(Runnable task) {
        if (task == null) {
            throw new NullPointerException("task");
        }
        if (!taskQueue.offer(task)) {
            throw new RejectedExecutionException("event executor terminated");
        }
    }

    /**
     * 检查是否有任务
     *
     * @return
     */
    protected boolean hasTasks() {
        return !taskQueue.isEmpty();
    }

    /**
     * 运行所有任务
     * @return
     */
    protected boolean runAllTasks() {
        // 省略乱七八糟的判断,把多个子方法简化
        Runnable task = taskQueue.poll();
        if (task == null) {
            return false;
        }
        for (;;) {
            task.run();
            task = taskQueue.poll();
            if (task == null) {
                return true;
            }
        }
    }

    @Override
    public void execute(Runnable task) {
        addTask(task);
        startThread();
    }

    /**
     * 开启线程执行(判断已启动过不再启动)
     */
    private void startThread() {
        // 未启动才能启动,也就是只启动一次
        if (state == ST_NOT_STARTED) {
            // 再次CAS判断避免线程不安全
            if (STATE_UPDATER.compareAndSet(this, ST_NOT_STARTED, ST_STARTED)) {
                doStartThread();
            }
        }
    }

    /**
     * 实际开启线程执行run方法
     */
    private void doStartThread() {
        // 使用真实的执行者执行任务
        executor.execute(() -> {
            // 保存执行的 线程
            thread = Thread.currentThread();
            // 省去乱起八遭的判断
            SingleThreadEventExecutor.this.run();
            // 如果执行结束,则报错
            System.out.println("Buggy EventExecutor implementation; SingleThreadEventExecutor.confirmShutdown() must be called before run() implementation terminates");
        });
    }

    /**
     * 抽象run方法,是一个不能运行结束的方法(除非手动关闭),即loop
     */
    protected abstract void run();

}

可以这么概括它:SingleThreadEventExecutor是一种特殊的任务执行器,第一次收到任务它就会启动(开启一个线程运行run方法,run方法是一个会一直执行的方法,否则报错),当任务来时并不是立即执行,它们会加入到自己的任务对列中,并且按自己的套路(run)去执行

SingleThreadEventLoop

单线程事件循环器

public abstract class SingleThreadEventLoop extends SingleThreadEventExecutor implements EventLoop {

    private final Collection<EventLoop> selfCollection = Collections.<EventLoop>singleton(this);

    private final EventLoopGroup parent;

    public SingleThreadEventLoop(EventLoopGroup parent, Executor executor) {
        super(executor);
        this.parent = parent;
    }

    @Override
    public EventLoopGroup parent() {
        return parent;
    }


    @Override
    public void register(Channel channel) {
        // netty源码  promise.channel().unsafe().register(this, promise); 简化不区分unsafe,如下
        channel.register(this);
    }

    @Override
    public EventLoop next() {
        return this;
    }

    @Override
    public Iterator<EventLoop> iterator() {
        return selfCollection.iterator();
    }

}

这个类其实就是实现了parent(),和next() ,iterator()这种应对Group接口的方法,之所以敢称单线程,是因为他爹是单线程处理器SingleThreadEventExecutor,它的存在就是为了让子类不用再处理与Group的关系

NioEventLoop

重点来了,Nio版的事件事件循环器

public class NioEventLoop extends SingleThreadEventLoop {

    private Selector selector;

    private final SelectorProvider provider;

    public NioEventLoop(NioEventLoopGroup parent, SelectorProvider selectorProvider, Executor executor) {
        super(parent, executor);
        this.provider = selectorProvider;
        this.selector = openSelector();
    }

    public SelectorProvider selectorProvider() {
        return provider;
    }

    private Selector openSelector() {
        try {
            return selectorProvider().openSelector();
        } catch (IOException e) {
            e.printStackTrace();
            return null;
        }
    }

    /**
     * 由于没做装饰,所以selector即unwrappedSelector
     *
     * @return
     */
    public Selector unwrappedSelector() {
        return selector;
    }

    /**
     * 运行
     */
    @Override
    protected void run() {
        for (;;) {
            try {
                select();
            } catch (IOException e) {
                e.printStackTrace();
            }
            try {
                processSelectedKeys();
            } finally {
                runAllTasks();
            }
        }
    }

    private void select() throws IOException {
        // 拿到多路复用器
        Selector selector = this.selector;
        for (;;) {
            // 等待,简化固定1秒
            int selectedKeys = selector.select(1000);
            // 如果有事件发生或当前有任务跳出循环
            if (selectedKeys != 0 || hasTasks()) {
                break;
            }
        }
    }

    private void processSelectedKeys() {
        processSelectedKeysPlain(selector.selectedKeys());
    }

    private void processSelectedKeysPlain(Set<SelectionKey> selectedKeys) {
        if (selectedKeys.isEmpty()) {
            return;
        }
        Iterator<SelectionKey> i = selectedKeys.iterator();
        for (;;) {
            final SelectionKey k = i.next();
            final Object a = k.attachment();
            i.remove();
            // 获取注册时绑定的参数
            if (a instanceof AbstractNioChannel) {
                processSelectedKey(k, (AbstractNioChannel) a);
            } else {
                // 由于手写简版只attach了AbstractNioChannel所以不会出现,但源码有其它的attach
            }
            if (!i.hasNext()) {
                break;
            }
        }
    }

    /**
     * 处理单个事件
     * @param k
     * @param ch
     */
    private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
        // 这个try是为了和源码尽量长得像,简版不处理异常
        try {
            // 获取发生的事件标识
            int readyOps = k.readyOps();
            // 如果是read事件 或 accpet事件
            if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
                // channel读取
                ch.read();
            }
        } catch (Exception e) {

        }
    }
}

内部携带了一个多路复用器,作为一个SingleThreadEventExecutor,它的运行套路是不断的监听selector,如果任务队列有任务,就处理任务,这里我简化了代码,监听1秒再去查看是否有任务,没有再回来监听,源码是有个策略判断该执行任务还是该阻塞到selector上

MultithreadEventLoopGroup

它存在的意义和SingleThreadEventLoop差不多,帮助子类处理group和成员的关系,实现了next方法(这里我简化了用轮训,源码可以自定义chooser)

public abstract class MultithreadEventLoopGroup implements EventLoopGroup {

    private final EventLoop[] children;
    /**
     * 为了迭代用
     */
    private final Set<EventLoop> readonlyChildren;

    public MultithreadEventLoopGroup(int nThreads, Executor executor) {
        if (nThreads <= 0) {
            throw new IllegalArgumentException();
        }
        if (executor == null) {
            executor = new ThreadPerTaskExecutor(new DefaultThreadFactory());
        }
        this.children = new EventLoop[nThreads];
        for (int i = 0; i < nThreads; i ++) {
            children[i] = newChild(executor);
        }

        /**
         * 为了迭代用
         */
        Set<EventLoop> childrenSet = new LinkedHashSet<EventLoop>(children.length);
        Collections.addAll(childrenSet, children);
        readonlyChildren = Collections.unmodifiableSet(childrenSet);
    }

    protected abstract EventLoop newChild(Executor executor);

    /**
     * 源码用一个chooser对象选择子线程,这里简化一下,就轮训吧
     * @return
     */
    int i=0;

    public EventLoop chooseNext() {
        if (i>=children.length) {
            i =0;
        }
        EventLoop child = children[i];
        i++;
        return child;
    }

    @Override
    public void register(Channel channel) {
        next().register(channel);
    }

    @Override
    public EventLoop next() {
        return chooseNext();
    }

    @Override
    public Iterator<EventLoop> iterator() {
        return readonlyChildren.iterator();
    }

    @Override
    public void execute(Runnable command) {
        next().execute(command);
    }
}

抽象了newChild交给子类去实际创建组成员

NioEventLoopGroup

NioEventLoop的组,实现了newChild创建组成员即:NioEventLoop

public class NioEventLoopGroup extends MultithreadEventLoopGroup {

    public NioEventLoopGroup(int nThreads) {
        super(nThreads, null);
    }

    public NioEventLoopGroup(int nThreads, Executor executor) {
        super(nThreads, executor);
    }

    @Override
    protected EventLoop newChild(Executor executor) {
        return new NioEventLoop(this, SelectorProvider.provider(), executor);
    }
}
ThreadPerTaskExecutor 和 DefaultThreadFactory

这俩的存在主要是给提供真实线程,并统一命名

public class ThreadPerTaskExecutor implements Executor {
    private final ThreadFactory threadFactory;

    public ThreadPerTaskExecutor(ThreadFactory threadFactory) {
        if (threadFactory == null) {
            throw new NullPointerException("threadFactory");
        }
        this.threadFactory = threadFactory;
    }

    @Override
    public void execute(Runnable command) {
        threadFactory.newThread(command).start();
    }
}
public class DefaultThreadFactory implements ThreadFactory {
    private AtomicInteger no = new AtomicInteger(0);
    @Override
    public Thread newThread(Runnable r) {
        return new Thread(r, "nio-thread-"+(no.incrementAndGet()));
    }
}

Bootstrap

channel和eventLoop都定义完了,接下来就要给他们串联起来,做启动类了,由于有服务端和客户端两种启动类,所以还是抽象了一个Bootstrap

AbstractBootstrap
public abstract class AbstractBootstrap<B extends AbstractBootstrap<B, C>, C extends Channel> {

    volatile EventLoopGroup group;

    /**
     * 源码使用工厂模式存储是channelFactory,这里简化处理
     */
    private volatile Channel channel;

    AbstractBootstrap() {
        // Disallow extending from a different package.
    }

    private B self() {
        return (B) this;
    }

    public B group(EventLoopGroup group) {
        this.group = group;
        return self();
    }

    public B channel(Class<? extends C> channelClass) {
        try {
            channel = channelClass.getConstructor().newInstance();
        } catch (Exception e) {
        }
        return self();
    }

    public void bind(int inetPort) {
        doBind(new InetSocketAddress(inetPort));
    }

    private void doBind(final SocketAddress localAddress) {
        initAndRegister();
        // 让channel绑定的线程去实际绑定
        channel.eventLoop().execute(()->{
            channel.bind(localAddress);
        });
    }

    final void initAndRegister() {
        init(channel);
        group.register(channel);
    }

    abstract void init(Channel channel);
}

抽象了一个channel init方法,如果前面看懂了,这里应该能猜到主要是为了给ServerChannel添加默认的handler

ServerBootstrap
public class ServerBootstrap extends AbstractBootstrap<ServerBootstrap, NioServerSocketChannel> {

    private volatile EventLoopGroup childGroup;

    /**
     * 这里简化处理
     */
    private volatile ChannelHandler[] childHandlers;

    public ServerBootstrap() { }

    public ServerBootstrap group(EventLoopGroup parentGroup, EventLoopGroup childGroup) {
        super.group(parentGroup);
        this.childGroup = childGroup;
        return this;
    }

    public ServerBootstrap childHandler(ChannelHandler... childHandlers) {
        this.childHandlers = childHandlers;
        return this;
    }

    @Override
    void init(Channel channel) {
        ChannelPipeline p = channel.pipeline();
        // 这里给ServerChannel添加管道处理器,简化了代码
        p.addLast(new ServerBootstrapAcceptor(childGroup, childHandlers));
    }

    private static class ServerBootstrapAcceptor implements ChannelInboundHandler {

        private final EventLoopGroup childGroup;
        private final ChannelHandler[] childHandlers;

        private ServerBootstrapAcceptor(EventLoopGroup childGroup, ChannelHandler[] childHandlers) {
            this.childGroup = childGroup;
            this.childHandlers = childHandlers;
        }

        @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
            final Channel child = (Channel) msg;
            for (ChannelHandler childHandler : childHandlers) {
                child.pipeline().addLast(childHandler);
            }
            childGroup.register(child);
        }

        @Override
        public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
            // 略
        }
    }
}

子类ServerBootstrapAcceptor 就是ServerSocketChannel默认的handler,通过init方法添加上

总结

到此,山寨版的netty写完了,累了一头汗,代码的命名和类命名尽量和源码保持一致,因为都是复制过来的,有些很复杂的地方做了简化处理,但个人感觉核心的代码除了bytebuf应该都写上了,使用方式开头有写基本和netty差不多,还是那句话,手写的目的是理解netty源码

篇幅有限,这里面的事好多代码都没细讲,但代码也摘的很轻量,完全可以自行理解,回头再看netty,基本就差不多

回想一下,其实netty的核心概念就总结出来了:channel维护了jdk的通道,并可以设置后置处理器实现,EventLoop是针对channel事件的专用线程,而EventLoopGroup是它们组合即专用线程池

相关文章

网友评论

    本文标题:netty源码看不懂?试着写一个吧

    本文链接:https://www.haomeiwen.com/subject/fdbivrtx.html