前言
最近一直在看netty源码,观后感:很难看,于是为了屡清netty的设计思路,我参照netty源码手写一个山寨简版的“netty”,说是手写,其实也就是从源码复制出来核心的代码,并尽量保持命名,设计结构与源码基本一致,因为我的目的很明确:尝试以作者的角度理解netty的全貌
效果
最终山寨版的netty代码server端使用如下(代码没有引用任何netty的依赖)
public static void main(String[] args) {
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
EventLoopGroup workerGroup = new NioEventLoopGroup(4);
try {
ServerBootstrap bootstrap = new ServerBootstrap();
bootstrap.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.childHandler(new NettyServerHandler(), new NettyServerHandler2());
System.out.println("netty server start...");
bootstrap.bind(9000);
} finally {
}
}
其中NettyHandler:
public class NettyServerHandler implements ChannelInboundHandler {
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
System.out.println("thread: " + Thread.currentThread().getName());
System.out.println("msg:" + new String(((ByteBuffer) msg).array()));
// 传递给下个handler
ctx.fireChannelRead(msg);
}
}
看起来和真的netty其实差不多,最终执行的效果也比较符合预期
Netty与NIO
首先,在学习netty前,要了解它是干嘛的,从我们常使用的场景上看可以说NETTY其实就是对NIO编程的一种封装,所以在理解netty之前,nio的基础知识是必须要掌握的,下面是一个NIO编程的基础server端代码:
public class NioServer {
public static void main(String[] args) throws IOException, InterruptedException {
// 创建NIO ServerSocketChannel
ServerSocketChannel serverSocket = ServerSocketChannel.open();
serverSocket.socket().bind(new InetSocketAddress(9000));
serverSocket.configureBlocking(false);
// 打开Selector处理Channel,底层调用epoll_create
Selector selector = Selector.open();
// 把ServerSocketChannel注册到selector上,并且selector对客户端accept连接操作感兴趣,底层调用epoll_ctl
// SelectionKey registerKey = serverSocket.register(selector, SelectionKey.OP_ACCEPT);
SelectionKey serverRegisterKey = serverSocket.register(selector, 0);
serverRegisterKey.interestOps(SelectionKey.OP_ACCEPT);
// 测试attach
serverRegisterKey.attach(new NioServer());
System.out.println("服务启动成功");
while (true) {
// 阻塞等待需要处理的事件发生,即调用epoll_wait
selector.select();
// 获取selector中注册的全部事件的 SelectionKey 实例
Set<SelectionKey> selectionKeys = selector.selectedKeys();
Iterator<SelectionKey> iterator = selectionKeys.iterator();
// 遍历SelectionKey对事件进行处理
while (iterator.hasNext()) {
SelectionKey key = iterator.next();
// 测试attachment
System.out.println("attachment: "+ key.attachment());
// 测试key
System.out.println("key is serverRegisterKey:" + key.equals(serverRegisterKey));
// 如果是OP_ACCEPT事件,则进行连接获取和事件注册
if (key.isAcceptable()) {
ServerSocketChannel server = (ServerSocketChannel) key.channel();
SocketChannel socketChannel = server.accept();
socketChannel.configureBlocking(false);
// 这里只注册了读事件,如果需要给客户端发送数据可以注册写事件,底层调用epoll_ctl
socketChannel.register(selector, SelectionKey.OP_READ);
System.out.println("客户端连接成功");
} else if (key.isReadable()) { // 如果是OP_READ事件,则进行读取和打印
SocketChannel socketChannel = (SocketChannel) key.channel();
ByteBuffer byteBuffer = ByteBuffer.allocate(128);
int len = socketChannel.read(byteBuffer);
// 如果有数据,把数据打印出来
if (len > 0) {
System.out.println("接收到消息:" + new String(byteBuffer.array()));
} else if (len == -1) { // 如果客户端断开连接,关闭Socket
System.out.println("客户端断开连接");
socketChannel.close();
}
}
//从事件集合里删除本次处理的key,防止下次select重复处理
iterator.remove();
}
}
}
}
如果这里看不懂,可以参考Netty-理解selector是什么
上面的服务端代码有几个问题:
- 每次写基本都是大致固定的写法,很多代码可以封装
- 所有的请求都交给同一个线程处理,强求量一大就会出现问题
- 写起来非常蹩脚
而netty的存在也就是为了解决这些问题,通过封装让NIO的编程变的简单易用
所以下面我们就假装以作者的设计角度,尝试着写一个"山寨"的netty
总体思路
首先我们要把每次都会写的固定代码封装起来,比如服务端接受OP_ACCEPT事件,之后需要把accpet到的channel注册到多路复用器,这些都是基本固定代码,可以封装,而接受到客户端信息之后的处理方式是根据需求而定的,所以要暴露出来可以让用户自定义
其次就是最重要的线程模型,肯定不能所有的请求都同一个线程去处理,最次也得用个线程池
Channel的封装
回头看NIO的代码,出现了两种jdk的channel:ServerSocketChannel
和SocketChannel
,二者都继承了SelectableChannel,二者都有一些通用的行为,我们可以给它抽象出来一个channel类,里面封装了jdk的channel和感兴趣的事件,并提供了注册到多路复用器、绑定端口、设置非阻塞等行为,并可以继续泛化为服务端通道和客户端通道
Channel下的管道
回头看NIO的代码,每个channel发生事件后都会进行一些处理,所以这些处理的方式可以说是属于某个channel的,而一个channel下可能会有个步骤的处理工作,比如先解码再实际处理业务,我们可以把channel下每个处理步骤抽象成一个handler,多个handler互相连接组成channel下的一个管道(pipeline),这样channel的机构如下
pipeline用户可以通过给管道下的pipeline添加处理步骤来改变时间放生时的响应,以实现自定义的客户端通道发来信息的响应
而服务端的管道处理步骤基本是一定的,即读到channel注册到多路复用器,所以对应的handler我们可以封装一个在服务端通道建立时就绑定上
EventLoop
通道封装完了,通道发生事件后的处理步骤也可以用户自定义,下一步研究就是这些活到底谁来干,也就是方法写好了,下一步是用什么线程模型去执行这些方法能做到高效,所以现在需要的是一个执行者(Executor)
我们想想实际场景,一个服务端可能会受到多个客户端的连接请求,我们要做的是高效的让请求多时可以用多个线程去处理,同时希望线程数可以根据场景来配置,针对这种情况,大师Doug Lea给出了以一种解决思路,写在《Scalable IO in Java》中,有兴趣可以自行百度,最终Lea给出的模型大概如下
Scalable IO in Java主要思路是有一个主线程可以通过selector监听服务端通道,在发生连接事件后,把连接的通道注册到子线程池中某一个线程下的selector中,这个子线程就负责使用selector监听客户端通道,发生事件后处理
所以我们要有这样一种线程:
- 内部有多路复用器selector,可以注册多个channel,并在事件放生是执行channel下绑定的处理步骤
- 除了监控selector,也可以以执行给定任务
这种线程可以处理事件(Event),同时它一经启动就不会自动关闭,因为要监控selector,所以内部一定是个死循环Loop,所以这种执行特定任务的线程就叫做EventLoop,说的更直白点,就是一个会不断响应多路复用器事件的多channel处理者
而channel也可以注册到EventLoop,这样channel发生的事件就会由EventLoop按管道内部定义的handler执行处理
EventLoopEventLoop&EventLoopGroup
EventLoop是一个特定的线程,那么EventLoopGroup就是这种特定线程的组合,也就是特定线程池,内部包含固定数量的EventLoop,而其对外提供的服务和内部的EventLoop组一模一样,只不过选了某一个EventLoop去具体执行服务,所以二者的方法是一样的,在netty源码中,EventLoop继承了EventLoopGroup,看到这里不免很糊涂,确实很怪,但可以这样去理解:EventLoopGroup是一群一模一样的EventLoop的组合,所以EventLoop能干什么,EventLoopGroup也就能且只能干什么,这样EventLoop就可以看成一个特殊的EventLoopGroup,只不过是只有一个对象的EventLoopGroup
比如:有一个瓦匠群体,瓦匠群体的每个人都只会砌墙,那么群体对外能提供的服务即砌墙,只不过是选一个瓦匠去砌墙,而某个瓦匠对外提供的服务也是砌墙,那么他可以看做一个特殊的瓦匠群体
uml
结合以上的设计思路,再尽量贴近netty的源码,最终画出的uml图如下
netty
与上面的思路相比,更细致了多层的抽象类,尽量单一职责,主要是为了和netty基本结构一致
相比于netty,主要省略了EventExecutor和EventExecutorGroup,把对应的代码写在了EventLoop和EventLoopGroup中,省略了各种Unsafe内部类,把Unsafe的方法直接写在外部类
每个类的具体意义和实现代码中介绍
Channel管道
Channel
首先是抽象的channel接口,主要方法即绑定EventLoop,绑定端口和获取pipeline
public interface Channel {
/**
* 绑定事件持续处理器
* @param eventLoop
*/
void register(EventLoop eventLoop);
/**
* 获取事件持续处理器
* @return
*/
EventLoop eventLoop();
/**
* 通道内部的管道
* @return
*/
ChannelPipeline pipeline();
/**
* 绑定端口
* @param localAddress
*/
void bind(SocketAddress localAddress);
/**
* 开始读取,nio的实现即注册感兴趣的事件
*/
void beginRead();
}
ChannelHandler
即管道绑定的处理器,一个管道对应一个pipeline,一个pipeline用链表形式存储多个处理器,这个我简化了一下,只是空接口,他有两种子类ChannelInboundHandler和ChannelOutboundHandler,分别管道进入即读事件的处理器和管道流出即写事件的处理器
public interface ChannelHandler {
}
ChannelInboundHandler
管道进入事件处理器,与之对应管道返回事件处理器,我并没有写
public interface ChannelInboundHandler extends ChannelHandler{
void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception;
void channelReadComplete(ChannelHandlerContext ctx) throws Exception;
}
ChannelHandlerContext
由于用户再写处理器时需要得知上下文信息,即channel是哪个(可以通过channel写回数据),eventLoop是哪个,所以需要用一个上下文对象把handler包装起来,同时Context对象还保存了链表的关系,使得handler形成链表
public class ChannelHandlerContext {
private final ChannelHandler handler;
/**
* 链表
*/
volatile ChannelHandlerContext next;
volatile ChannelHandlerContext prev;
private final ChannelPipeline pipeline;
public ChannelHandlerContext(ChannelPipeline pipeline, ChannelHandler handler) {
this.pipeline = pipeline;
this.handler = handler;
}
/**
* 当前通道
* @return
*/
public Channel channel() {
return pipeline.channel();
}
/**
* 当前管道
* @return
*/
public ChannelPipeline pipeline() {
return pipeline;
}
/**
* 当前执行器
* @return
*/
public EventLoop executor() {
return channel().eventLoop();
}
public ChannelHandler handler() {
return handler;
}
/**
* 把信息传给链表下一个read节点去处理
* @param msg
* @return
*/
public ChannelHandlerContext fireChannelRead(final Object msg) {
findContextInbound().invokeChannelRead(msg);
return this;
}
/**
* 找到自己后面的Inbound处理器
* @return
*/
private ChannelHandlerContext findContextInbound() {
ChannelHandlerContext ctx = this;
do {
ctx = ctx.next;
} while (!(ctx.handler() instanceof ChannelInboundHandler));
return ctx;
}
/**
* 调用handler的channelRead方法
* @param msg
*/
private void invokeChannelRead(Object msg) {
try {
((ChannelInboundHandler) handler()).channelRead(this, msg);
} catch (Throwable t) {
// 如果当前的handler不是ChannelInboundHandler则报错
}
}
}
ChannelPipeline
保存链表的收尾,因为链表是双向链表(in和out反向),同时支持添加新处理器至链表
public class ChannelPipeline {
/**
* 管道的第一个处理器,管道的处理器是链式结构
*/
final ChannelHandlerContext head;
final ChannelHandlerContext tail;
/**
* 所在的通道
*/
private final Channel channel;
public ChannelPipeline(Channel channel) {
this.channel = channel;
head = new HeadContext(this);
tail = new TailContext(this);
// 头尾互指
head.next = tail;
tail.prev = head;
}
public final Channel channel() {
return channel;
}
/**
* 添加处理器
* @param handler
* @return
*/
public final ChannelPipeline addLast(ChannelHandler handler) {
// 把handler包装成上下文
ChannelHandlerContext newCtx = new ChannelHandlerContext(this, handler);
addLast0(newCtx);
return this;
}
/**
* 在链表结尾添加新的节点
* @param newCtx
*/
private void addLast0(ChannelHandlerContext newCtx) {
ChannelHandlerContext prev = tail.prev;
newCtx.prev = prev;
newCtx.next = tail;
prev.next = newCtx;
tail.prev = newCtx;
}
/**
* 结尾,简化处理
*/
final class TailContext extends ChannelHandlerContext {
public TailContext(ChannelPipeline pipeline) {
super(pipeline, null);
}
}
/**
* 头部简化处理
*/
final class HeadContext extends ChannelHandlerContext {
public HeadContext(ChannelPipeline pipeline) {
super(pipeline, null);
}
}
/**
* 开始处理read操作
* @param msg
* @return
*/
public final ChannelPipeline fireChannelRead(Object msg) {
head.fireChannelRead(msg);
return this;
}
public final ChannelPipeline fireChannelReadComplete() {
// 省略不写了,和fireChannelRead差不多道理
return this;
}
}
AbstractChannel
AbstractChannel 抽象的channel实现,主要实现了绑定EventLoop对象和初始化ChannelPipeline,也就是构造出Channel数据结构
public abstract class AbstractChannel implements Channel {
/**
* 父通道
*/
private final Channel parent;
/**
* 绑定的事件循环器
*/
private volatile EventLoop eventLoop;
/**
* 管道
*/
private final ChannelPipeline pipeline;
public AbstractChannel(Channel parent) {
this.parent = parent;
pipeline = newChannelPipeline();
}
protected ChannelPipeline newChannelPipeline() {
return new ChannelPipeline(this);
}
/**
* 返回绑定的事件处理器
*
* @return
*/
@Override
public EventLoop eventLoop() {
return eventLoop;
}
@Override
public ChannelPipeline pipeline() {
return pipeline;
}
@Override
public void beginRead() {
doBeginRead();
}
protected abstract void doBeginRead();
@Override
public void bind(SocketAddress localAddress) {
try {
doBind(localAddress);
} catch (Exception e) {
e.printStackTrace();
}
}
protected abstract void doBind(SocketAddress localAddress) throws Exception;
protected void doRegister() throws Exception {
// NOOP
}
/**
* Channel绑定eventLoop
*
* @param eventLoop
*/
@Override
public final void register(EventLoop eventLoop) {
// 省去乱七八遭的判断,源码实在内部类Unsafe下,所以是:AbstractChannel.this
AbstractChannel.this.eventLoop = eventLoop;
eventLoop.execute(() -> {
register0();
});
}
/**
* 实际注册
*/
private void register0() {
try {
doRegister();
// 开始读取感兴趣事件
beginRead();
} catch (Exception e) {
e.printStackTrace();
}
}
}
通过new ChannelPipeline(this),初始化了一个channel下的空管道,register方法只是存储EventLoop对象,而实际的doRegister抽象处理,因为不同的channel注册方式不一样,NIO是注册到多路复用器,其它的注册方式并非如此
AbstractNioChannel
这个就是专门处理NIO的抽象channel了,就可以实际去实现NIO的注册了
public abstract class AbstractNioChannel extends AbstractChannel {
/**
* java的channel 包括ServerSocketChannel和SocketChannel
*/
private final SelectableChannel ch;
private SelectionKey selectionKey;
/**
* 感兴趣的事件
*/
protected final int readInterestOp;
public AbstractNioChannel(Channel parent, SelectableChannel ch, int readInterestOp) {
super(parent);
this.ch = ch;
this.readInterestOp = readInterestOp;
try {
ch.configureBlocking(false);
} catch (IOException e) {
e.printStackTrace();
}
}
/**
* 返回java的channel
*
* @return
*/
protected SelectableChannel javaChannel() {
return ch;
}
/**
* 把通道注册到多路复用器
* @throws Exception
*/
@Override
protected void doRegister() throws Exception {
// 最后一个字段this,相当于selectionKey.attach(this),后续可以通过attachment()方法取到
// 由于多个channel注册到一个eventLoop,所以需要传递当前的channel以便eventLoop获取到事件时可以知道是哪个channel产生的事件
selectionKey = javaChannel().register(((NioEventLoop) eventLoop()).unwrappedSelector(), 0, this);
}
/**
* 注册感兴趣事件
*/
@Override
protected void doBeginRead() {
selectionKey.interestOps(readInterestOp);
}
/**
* 从 {@link SelectableChannel} 读取事件,源码是写在Unsafe里
*/
public abstract void read();
}
存储了SelectableChannel,即ServerSocketChannel和SocketChannel的共同父类
实现了doRegister方法,即把jdk的channel注册到EventLoop下面的多路复用器
实现了doBeginRead方法,即注册感兴趣事件
抽象了一个read方法,即从channel读取信息,由于客户端与服务端读取方法不一样,所以抽象出来
而客户端与服务端的read实现也是分别抽象了两个类来提交给channel的handler,即AbstractNioMessageChannel和AbstractNioByteChannel
AbstractNioMessageChannel
主要封装了通过抽象doReadMessages读取事件信息后传递给channel的管道
public abstract class AbstractNioMessageChannel extends AbstractNioChannel {
/**
* 读取到的缓存
*/
private final List<Object> readBuf = new ArrayList<Object>();
public AbstractNioMessageChannel(Channel parent, SelectableChannel ch, int readInterestOp) {
super(parent, ch, readInterestOp);
}
/**
* 从SelectableChannel中读取信息
*/
@Override
public void read() {
final ChannelPipeline pipeline = pipeline();
// 实际读取信息,由子类实现
doReadMessages(readBuf);
int size = readBuf.size();
for (int i = 0; i < size; i ++) {
// 调用管道的read处理器
pipeline.fireChannelRead(readBuf.get(i));
}
readBuf.clear();
}
protected abstract int doReadMessages(List<Object> buf);
}
其中doReadMessages实际读取信息,看一下子类NioServerSocketChannel如何实现
NioServerSocketChannel
NIO服务端通道,实现了doReadMessages,即读取客户端channel
public class NioServerSocketChannel extends AbstractNioMessageChannel {
/**
* 多路复用器提供者
*/
private static final SelectorProvider DEFAULT_SELECTOR_PROVIDER = SelectorProvider.provider();
/**
* 开启一个 java ServerSocketChannel
* @param provider
* @return
*/
private static ServerSocketChannel newSocket(SelectorProvider provider) {
try {
return provider.openServerSocketChannel();
} catch (IOException e) {
return null;
}
}
public NioServerSocketChannel() {
this(newSocket(DEFAULT_SELECTOR_PROVIDER));
}
public NioServerSocketChannel(ServerSocketChannel channel) {
super(null, channel, SelectionKey.OP_ACCEPT);
}
/**
* 覆盖,因为可以确定返回的是ServerSocketChannel
* @return
*/
@Override
protected ServerSocketChannel javaChannel() {
return (ServerSocketChannel) super.javaChannel();
}
/**
* 绑定端socket
* @param localAddress
* @throws Exception
*/
@Override
protected void doBind(SocketAddress localAddress) throws Exception {
javaChannel().socket().bind(localAddress);
}
/**
* 读取信息,作为SeverSocketChannel(服务端通道),读取信息即accept后的SocketChannel(客户端通道)
* @param buf
* @return
*/
@Override
protected int doReadMessages(List<Object> buf) {
SocketChannel ch = null;
try {
ch = javaChannel().accept();
} catch (IOException e) {
}
if (ch != null) {
buf.add(new NioSocketChannel(this, ch));
return 1;
}
return 0;
}
}
同时封装了ServerSocketChannel的创建,并和感兴趣SelectionKey.OP_ACCEPT
的事件传递给父类,实现了绑定端口javaChannel().socket().bind(localAddress)
AbstractNioByteChannel
与AbstractNioMessageChannel对应,是读取byte也就是字节并传递给channel的管道,实际的读取还是抽象的doReadBytes
public abstract class AbstractNioByteChannel extends AbstractNioChannel {
public AbstractNioByteChannel(Channel parent, SelectableChannel ch, int readInterestOp) {
super(parent, ch, readInterestOp);
}
@Override
public void read() {
final ChannelPipeline pipeline = pipeline();
// 这里做了简化处理,源码用的自封装ByteBuf
ByteBuffer byteBuf = ByteBuffer.allocate(128);
doReadBytes(byteBuf);
pipeline.fireChannelRead(byteBuf);
}
protected abstract int doReadBytes(ByteBuffer buf);
}
源码使用自己封装的ByteBuf,这里简化了,看一下它的子类即NioSocketChannel
NioSocketChannel
客户端的channel
public class NioSocketChannel extends AbstractNioByteChannel {
/**
* 多路复用器提供者
*/
private static final SelectorProvider DEFAULT_SELECTOR_PROVIDER = SelectorProvider.provider();
/**
* 开启一个 java SocketChannel 这个方法为客户端创建通道使用
*
* @param provider
* @return
*/
private static SocketChannel newSocket(SelectorProvider provider) {
try {
return provider.openSocketChannel();
} catch (IOException e) {
return null;
}
}
public NioSocketChannel() {
this(null, newSocket(DEFAULT_SELECTOR_PROVIDER));
}
/**
* SocketChannel 感兴趣的事件是READ
*
* @param parent
* @param channel
*/
public NioSocketChannel(Channel parent, SocketChannel channel) {
super(parent, channel, SelectionKey.OP_READ);
}
/**
* 覆盖,因为可以确定返回的是SocketChannel
*
* @return
*/
@Override
protected SocketChannel javaChannel() {
return (SocketChannel) super.javaChannel();
}
/**
* 绑定端socket
*
* @param localAddress
* @throws Exception
*/
@Override
protected void doBind(SocketAddress localAddress) throws Exception {
javaChannel().socket().bind(localAddress);
}
@Override
protected int doReadBytes(ByteBuffer buf) {
try {
return javaChannel().read(buf);
} catch (IOException e) {
throw new RuntimeException();
}
}
}
这个和Server端差不多,只不过一个读channel,一个读字节。感兴趣的事件是SelectionKey.OP_READ
到此channel相关类写完~
EventLoop事件循环器
EventLoopGroup
事件循环器组,和EventLoop提供一样的功能,同时可以选择下一个EventLoop且可以迭代
public interface EventLoopGroup extends Executor, Iterable<EventLoop>{
void register(Channel channel);
EventLoop next();
@Override
Iterator<EventLoop> iterator();
}
EventLoop
继承EventLoopGroup,并且可以查到父Group
public interface EventLoop extends EventLoopGroup {
EventLoopGroup parent();
}
SingleThreadEventExecutor
本来打算把EventExecutor去掉,代码写在EventLoop,但这个类太重要了,所以保留了下来
public abstract class SingleThreadEventExecutor implements Executor {
/**
* 默认任务列表长度
*/
protected static final int DEFAULT_MAX_PENDING_TASKS = 16;
/**
* 待完成的任务
*/
private final Queue<Runnable> taskQueue;
/**
* 实际工作者
*/
private final Executor executor;
/**
* 当前运行线程
*/
private volatile Thread thread;
/**
* ST_NOT_STARTED: 未启动, ST_STARTED 已启动
*/
private static final int ST_NOT_STARTED = 1;
private static final int ST_STARTED = 2;
/**
* 标记是否启动
*/
private volatile int state = ST_NOT_STARTED;
/**
* 原子启动标记更新器
*/
private static final AtomicIntegerFieldUpdater<SingleThreadEventExecutor> STATE_UPDATER =
AtomicIntegerFieldUpdater.newUpdater(SingleThreadEventExecutor.class, "state");
public SingleThreadEventExecutor(Executor executor) {
taskQueue = newTaskQueue(DEFAULT_MAX_PENDING_TASKS);
this.executor = executor;
}
/**
* 初始化一个新的任务对列
* @param maxPendingTasks
* @return
*/
protected Queue<Runnable> newTaskQueue(int maxPendingTasks) {
return new LinkedBlockingQueue<>(maxPendingTasks);
}
/**
* 添加任务
* @param task
*/
protected void addTask(Runnable task) {
if (task == null) {
throw new NullPointerException("task");
}
if (!taskQueue.offer(task)) {
throw new RejectedExecutionException("event executor terminated");
}
}
/**
* 检查是否有任务
*
* @return
*/
protected boolean hasTasks() {
return !taskQueue.isEmpty();
}
/**
* 运行所有任务
* @return
*/
protected boolean runAllTasks() {
// 省略乱七八糟的判断,把多个子方法简化
Runnable task = taskQueue.poll();
if (task == null) {
return false;
}
for (;;) {
task.run();
task = taskQueue.poll();
if (task == null) {
return true;
}
}
}
@Override
public void execute(Runnable task) {
addTask(task);
startThread();
}
/**
* 开启线程执行(判断已启动过不再启动)
*/
private void startThread() {
// 未启动才能启动,也就是只启动一次
if (state == ST_NOT_STARTED) {
// 再次CAS判断避免线程不安全
if (STATE_UPDATER.compareAndSet(this, ST_NOT_STARTED, ST_STARTED)) {
doStartThread();
}
}
}
/**
* 实际开启线程执行run方法
*/
private void doStartThread() {
// 使用真实的执行者执行任务
executor.execute(() -> {
// 保存执行的 线程
thread = Thread.currentThread();
// 省去乱起八遭的判断
SingleThreadEventExecutor.this.run();
// 如果执行结束,则报错
System.out.println("Buggy EventExecutor implementation; SingleThreadEventExecutor.confirmShutdown() must be called before run() implementation terminates");
});
}
/**
* 抽象run方法,是一个不能运行结束的方法(除非手动关闭),即loop
*/
protected abstract void run();
}
可以这么概括它:SingleThreadEventExecutor是一种特殊的任务执行器,第一次收到任务它就会启动(开启一个线程运行run方法,run方法是一个会一直执行的方法,否则报错),当任务来时并不是立即执行,它们会加入到自己的任务对列中,并且按自己的套路(run)去执行
SingleThreadEventLoop
单线程事件循环器
public abstract class SingleThreadEventLoop extends SingleThreadEventExecutor implements EventLoop {
private final Collection<EventLoop> selfCollection = Collections.<EventLoop>singleton(this);
private final EventLoopGroup parent;
public SingleThreadEventLoop(EventLoopGroup parent, Executor executor) {
super(executor);
this.parent = parent;
}
@Override
public EventLoopGroup parent() {
return parent;
}
@Override
public void register(Channel channel) {
// netty源码 promise.channel().unsafe().register(this, promise); 简化不区分unsafe,如下
channel.register(this);
}
@Override
public EventLoop next() {
return this;
}
@Override
public Iterator<EventLoop> iterator() {
return selfCollection.iterator();
}
}
这个类其实就是实现了parent(),和next() ,iterator()这种应对Group接口的方法,之所以敢称单线程,是因为他爹是单线程处理器SingleThreadEventExecutor,它的存在就是为了让子类不用再处理与Group的关系
NioEventLoop
重点来了,Nio版的事件事件循环器
public class NioEventLoop extends SingleThreadEventLoop {
private Selector selector;
private final SelectorProvider provider;
public NioEventLoop(NioEventLoopGroup parent, SelectorProvider selectorProvider, Executor executor) {
super(parent, executor);
this.provider = selectorProvider;
this.selector = openSelector();
}
public SelectorProvider selectorProvider() {
return provider;
}
private Selector openSelector() {
try {
return selectorProvider().openSelector();
} catch (IOException e) {
e.printStackTrace();
return null;
}
}
/**
* 由于没做装饰,所以selector即unwrappedSelector
*
* @return
*/
public Selector unwrappedSelector() {
return selector;
}
/**
* 运行
*/
@Override
protected void run() {
for (;;) {
try {
select();
} catch (IOException e) {
e.printStackTrace();
}
try {
processSelectedKeys();
} finally {
runAllTasks();
}
}
}
private void select() throws IOException {
// 拿到多路复用器
Selector selector = this.selector;
for (;;) {
// 等待,简化固定1秒
int selectedKeys = selector.select(1000);
// 如果有事件发生或当前有任务跳出循环
if (selectedKeys != 0 || hasTasks()) {
break;
}
}
}
private void processSelectedKeys() {
processSelectedKeysPlain(selector.selectedKeys());
}
private void processSelectedKeysPlain(Set<SelectionKey> selectedKeys) {
if (selectedKeys.isEmpty()) {
return;
}
Iterator<SelectionKey> i = selectedKeys.iterator();
for (;;) {
final SelectionKey k = i.next();
final Object a = k.attachment();
i.remove();
// 获取注册时绑定的参数
if (a instanceof AbstractNioChannel) {
processSelectedKey(k, (AbstractNioChannel) a);
} else {
// 由于手写简版只attach了AbstractNioChannel所以不会出现,但源码有其它的attach
}
if (!i.hasNext()) {
break;
}
}
}
/**
* 处理单个事件
* @param k
* @param ch
*/
private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
// 这个try是为了和源码尽量长得像,简版不处理异常
try {
// 获取发生的事件标识
int readyOps = k.readyOps();
// 如果是read事件 或 accpet事件
if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
// channel读取
ch.read();
}
} catch (Exception e) {
}
}
}
内部携带了一个多路复用器,作为一个SingleThreadEventExecutor,它的运行套路是不断的监听selector,如果任务队列有任务,就处理任务,这里我简化了代码,监听1秒再去查看是否有任务,没有再回来监听,源码是有个策略判断该执行任务还是该阻塞到selector上
MultithreadEventLoopGroup
它存在的意义和SingleThreadEventLoop差不多,帮助子类处理group和成员的关系,实现了next方法(这里我简化了用轮训,源码可以自定义chooser)
public abstract class MultithreadEventLoopGroup implements EventLoopGroup {
private final EventLoop[] children;
/**
* 为了迭代用
*/
private final Set<EventLoop> readonlyChildren;
public MultithreadEventLoopGroup(int nThreads, Executor executor) {
if (nThreads <= 0) {
throw new IllegalArgumentException();
}
if (executor == null) {
executor = new ThreadPerTaskExecutor(new DefaultThreadFactory());
}
this.children = new EventLoop[nThreads];
for (int i = 0; i < nThreads; i ++) {
children[i] = newChild(executor);
}
/**
* 为了迭代用
*/
Set<EventLoop> childrenSet = new LinkedHashSet<EventLoop>(children.length);
Collections.addAll(childrenSet, children);
readonlyChildren = Collections.unmodifiableSet(childrenSet);
}
protected abstract EventLoop newChild(Executor executor);
/**
* 源码用一个chooser对象选择子线程,这里简化一下,就轮训吧
* @return
*/
int i=0;
public EventLoop chooseNext() {
if (i>=children.length) {
i =0;
}
EventLoop child = children[i];
i++;
return child;
}
@Override
public void register(Channel channel) {
next().register(channel);
}
@Override
public EventLoop next() {
return chooseNext();
}
@Override
public Iterator<EventLoop> iterator() {
return readonlyChildren.iterator();
}
@Override
public void execute(Runnable command) {
next().execute(command);
}
}
抽象了newChild交给子类去实际创建组成员
NioEventLoopGroup
NioEventLoop的组,实现了newChild创建组成员即:NioEventLoop
public class NioEventLoopGroup extends MultithreadEventLoopGroup {
public NioEventLoopGroup(int nThreads) {
super(nThreads, null);
}
public NioEventLoopGroup(int nThreads, Executor executor) {
super(nThreads, executor);
}
@Override
protected EventLoop newChild(Executor executor) {
return new NioEventLoop(this, SelectorProvider.provider(), executor);
}
}
ThreadPerTaskExecutor 和 DefaultThreadFactory
这俩的存在主要是给提供真实线程,并统一命名
public class ThreadPerTaskExecutor implements Executor {
private final ThreadFactory threadFactory;
public ThreadPerTaskExecutor(ThreadFactory threadFactory) {
if (threadFactory == null) {
throw new NullPointerException("threadFactory");
}
this.threadFactory = threadFactory;
}
@Override
public void execute(Runnable command) {
threadFactory.newThread(command).start();
}
}
public class DefaultThreadFactory implements ThreadFactory {
private AtomicInteger no = new AtomicInteger(0);
@Override
public Thread newThread(Runnable r) {
return new Thread(r, "nio-thread-"+(no.incrementAndGet()));
}
}
Bootstrap
channel和eventLoop都定义完了,接下来就要给他们串联起来,做启动类了,由于有服务端和客户端两种启动类,所以还是抽象了一个Bootstrap
AbstractBootstrap
public abstract class AbstractBootstrap<B extends AbstractBootstrap<B, C>, C extends Channel> {
volatile EventLoopGroup group;
/**
* 源码使用工厂模式存储是channelFactory,这里简化处理
*/
private volatile Channel channel;
AbstractBootstrap() {
// Disallow extending from a different package.
}
private B self() {
return (B) this;
}
public B group(EventLoopGroup group) {
this.group = group;
return self();
}
public B channel(Class<? extends C> channelClass) {
try {
channel = channelClass.getConstructor().newInstance();
} catch (Exception e) {
}
return self();
}
public void bind(int inetPort) {
doBind(new InetSocketAddress(inetPort));
}
private void doBind(final SocketAddress localAddress) {
initAndRegister();
// 让channel绑定的线程去实际绑定
channel.eventLoop().execute(()->{
channel.bind(localAddress);
});
}
final void initAndRegister() {
init(channel);
group.register(channel);
}
abstract void init(Channel channel);
}
抽象了一个channel init方法,如果前面看懂了,这里应该能猜到主要是为了给ServerChannel添加默认的handler
ServerBootstrap
public class ServerBootstrap extends AbstractBootstrap<ServerBootstrap, NioServerSocketChannel> {
private volatile EventLoopGroup childGroup;
/**
* 这里简化处理
*/
private volatile ChannelHandler[] childHandlers;
public ServerBootstrap() { }
public ServerBootstrap group(EventLoopGroup parentGroup, EventLoopGroup childGroup) {
super.group(parentGroup);
this.childGroup = childGroup;
return this;
}
public ServerBootstrap childHandler(ChannelHandler... childHandlers) {
this.childHandlers = childHandlers;
return this;
}
@Override
void init(Channel channel) {
ChannelPipeline p = channel.pipeline();
// 这里给ServerChannel添加管道处理器,简化了代码
p.addLast(new ServerBootstrapAcceptor(childGroup, childHandlers));
}
private static class ServerBootstrapAcceptor implements ChannelInboundHandler {
private final EventLoopGroup childGroup;
private final ChannelHandler[] childHandlers;
private ServerBootstrapAcceptor(EventLoopGroup childGroup, ChannelHandler[] childHandlers) {
this.childGroup = childGroup;
this.childHandlers = childHandlers;
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
final Channel child = (Channel) msg;
for (ChannelHandler childHandler : childHandlers) {
child.pipeline().addLast(childHandler);
}
childGroup.register(child);
}
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
// 略
}
}
}
子类ServerBootstrapAcceptor 就是ServerSocketChannel默认的handler,通过init方法添加上
总结
到此,山寨版的netty写完了,累了一头汗,代码的命名和类命名尽量和源码保持一致,因为都是复制过来的,有些很复杂的地方做了简化处理,但个人感觉核心的代码除了bytebuf应该都写上了,使用方式开头有写基本和netty差不多,还是那句话,手写的目的是理解netty源码
篇幅有限,这里面的事好多代码都没细讲,但代码也摘的很轻量,完全可以自行理解,回头再看netty,基本就差不多
回想一下,其实netty的核心概念就总结出来了:channel维护了jdk的通道,并可以设置后置处理器实现,EventLoop是针对channel事件的专用线程,而EventLoopGroup是它们组合即专用线程池
网友评论