1.新建bootstrap(用户调用层)
//serverbootstrap初始化什么也不做
public ServerBootstrap() {
}
2.为Bootstrap新增组件(用户调用层)
b.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.childOption(ChannelOption.TCP_NODELAY, true)
.childAttr(AttributeKey.newInstance("childAttr"), "childAttrValue")
.handler(new ServerHandler())
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) {
ch.pipeline().addLast(new AuthHandler());
}
});
3.正式启动(进入源码):
ChannelFuture f = b.bind(8888).sync();
- 四大步骤启动:
- 创建服务端channel
- 初始化服务端channel
- 注册selector
- 端口绑定
对应代码调用:
- AbstractBootstrap bind()
- initAndRegister()->channelFactory.newChannel();//制造一个channel
- initAndRegister()->init(channel);//初始化channel
- initAndRegister()->register//注册selector
- dobind()//绑定端口
3.1创建服务端channel
通过绑定.class,工厂反射创建channel;
AbstractBootstrap.java
public ChannelFuture bind(int inetPort) {
return this.bind(new InetSocketAddress(inetPort));//step:1
}
...
public ChannelFuture bind(SocketAddress localAddress) {
this.validate();
if (localAddress == null) {
throw new NullPointerException("localAddress");
} else {
return this.doBind(localAddress);//step:2
}
}
...
private ChannelFuture doBind(final SocketAddress localAddress) {
final ChannelFuture regFuture = this.initAndRegister();//step:3 //...
}
...
final ChannelFuture initAndRegister() {
Channel channel = null;
try {
channel = this.channelFactory.newChannel();//3.1内容制造一个channel
this.init(channel);// 3.2内容:初始化
}
//...
}
可以看到通过channelFactoryinitAndRegister()
制造了一个channel,channelFactory的产生代码
ReflectiveChannelFactory.java
private final Class<? extends T> clazz;
...
public T newChannel() {
try {
return (Channel)this.clazz.newInstance();//反射机制
}
//...
}
这里显然是在之前已经绑定好了工厂生产的产品,是在用户自己初始化的时候绑定的
b.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.....
是在AbstractBootstrap.java 中这个函数创建了上面的工厂,并对channel类型绑定
public B channel(Class<? extends C> channelClass) {
if (channelClass == null) {
throw new NullPointerException("channelClass");
} else {
return this.channelFactory((io.netty.channel.ChannelFactory)(new ReflectiveChannelFactory(channelClass)));
}
}
3.1.2 NioServerSocketChannel实例化的过程
1.反射的东东看完了,来看下channel是怎么new 出来的:
NioServerSocketChannel.java
public NioServerSocketChannel() {
this(newSocket(DEFAULT_SELECTOR_PROVIDER));
}
//...
private static java.nio.channels.ServerSocketChannel newSocket(SelectorProvider provider) {
try {
return provider.openServerSocketChannel();
}
//...
}
而其中的这个provider是
NioServerSocketChannel.java
import java.nio.channels.spi.SelectorProvider;
class NioServerSocketChannel{
//...
private static final SelectorProvider DEFAULT_SELECTOR_PROVIDER = SelectorProvider.provider();
//...
}
2.使channel非阻塞
NioServerSocketChannel.java
public NioServerSocketChannel(java.nio.channels.ServerSocketChannel channel) {
super((Channel)null, channel, 16);// step1:调用父类AbstractNioMessageChannel
//...
}
上述代码调用AbstractNioMessageChannel的构造函数,而后者也是直接调用AbstractNioChannel的构造函数,对于16
这个参数:应该是TCP的Accept
属性,将这个服务端socket设为接收请求的socket;
AbstractNioChannel.java
protected AbstractNioChannel(Channel parent, SelectableChannel ch, int readInterestOp) {
//...
try {
ch.configureBlocking(false);//step2:这一部将jdk底层调用产生的channel变成非阻塞
}
//...
}
3.创建channel有关的的id,unsafe,pipline等
在2的最后一步try
非阻塞前,AbstrctNioChannel调用其父类AbstrctChannel
AbstrctChannel.java
protected AbstractChannel(Channel parent) {
this.parent = parent;
this.id = this.newId(); //channel唯一id
this.unsafe = this.newUnsafe();//tcp读写底层类
this.pipeline = this.newChannelPipeline();//pipline创建
}
4.然后回到NioServerSocketChannel,TCP参数配置存放在这个NioServerSocketChannelConfig
里面
public NioServerSocketChannel(java.nio.channels.ServerSocketChannel channel) {
//...
this.config = new NioServerSocketChannel.NioServerSocketChannelConfig(
this, this.javaChannel().socket()
);
}
3.2初始化服务端channel
在init函数里面有四个步骤:
3.2.1 配置自身channel的option
void init(Channel channel) throws Exception {
Map<ChannelOption<?>, Object> options = this.options0();
synchronized(options) {
channel.config().setOptions(options);// 1
}
Map<AttributeKey<?>, Object> attrs = this.attrs0();//2
//...
}
这一块的option默认是空的,而设置的地方就是在下图中最开始bootstrap的那一堆链式调用初始化里面自己指定,然后在上面代码中options()
调用获取
同理,也会顺着将创建channel前就设置的attribute正式绑定到channel(如上面代码
//2
)
3.2.2配置客户端产生连接后的childChannel的option
这一步类似上一步,获取到childChannel的option和attribute,但这里是暂存,暂存原因在3.2.4解释:
void init(Channel channel) throws Exception {
//...3.2.1部分...
synchronized(this.childOptions) {
currentChildOptions = (Entry[])this.childOptions
.entrySet()
.toArray(newOptionArray(this.childOptions.size()));
}
final Entry[] currentChildAttrs;
synchronized(this.childAttrs) {
currentChildAttrs = (Entry[])this
.childAttrs
.entrySet()
.toArray(newAttrArray(this.childAttrs.size()));
}
3.2.3配置服务端的pipline
3.1中看到服务端的serversocketchannel绑定了一个Pipline,这里主要在pipline里面添加了一个channelhandler
这个handler也是用户在bootstrap一堆链式调用里面设置的,没有就算了
void init(Channel channel) throws Exception {
//...3.2.1 3.2.2部分...
ChannelPipeline p = channel.pipeline();
p.addLast(new ChannelHandler[]{new ChannelInitializer<Channel>() {
public void initChannel(Channel ch) throws Exception {
final ChannelPipeline pipeline = ch.pipeline();
ChannelHandler handler = ServerBootstrap.this.config.handler();
if (handler != null) {
pipeline.addLast(new ChannelHandler[]{handler});
}
//...3.2.4
}
3.2.4添加ServerBootstrapAcceptor用于把用户连接分配给线程
这里是个核心逻辑,本来就是一个普通的Nio封装,但是这一步将Acceptor作为了handller放在了最前面;
p.addLast(new ChannelHandler[]{new ChannelInitializer<Channel>() {
public void initChannel(Channel ch) throws Exception {
//... 3.2.3 ...内容
ch.eventLoop().execute(new Runnable() {
public void run() {
pipeline.addLast(new ChannelHandler[]{
new ServerBootstrap
.ServerBootstrapAcceptor(currentChildGroup // here
,currentChildHandler //之前暂存的用户配置的handler放在这个后面
,currentChildOptions, //之前暂存的用户配置的op放在这个后面
currentChildAttrs)}); //之前暂存的用户配置的attr放在这个后面
}
});
}
}});
至此,已经完成了启动过程的2/4步;
3.3 注册selector
同样在abstractBootStrap.java的initAndRegister()
,完成上述两步之后,有一个函数
ChannelFuture regFuture = this.config().group().register(channel);
这里调用了AbstractChannel.register(channel)
,又分为几步
- this.eventloop=eventloop 绑定线程
- regitster0() 核心注册逻辑
- doRegister()调用jdk把jdk原生channel注册到selector
- 回调a:invokeHandlerAddedifNeed() 触发用户在Handler中编写的,channel注册的时候回调函数
- 回调b:fireChannelRegister() 将注册成功的这个事件通知listener,触发用户在Handler中编写的的回调
- 回调c:fireChannelActive();同上,都是用户写的回调触发
3.3.1绑定eventLoop
AbstractChannel.java
channel绑定eventLoop后,其后续的事件都会在eventLoop中
public final void register(EventLoop eventLoop, final ChannelPromise promise) {
//....
AbstractChannel.this.eventLoop = eventLoop; //3.3.1
if (eventLoop.inEventLoop()) {
this.register0(promise); //3.3.2 a
} else {
try {
eventLoop.execute(new Runnable() {
public void run() {
AbstractUnsafe.this.register0(promise);//3.3.2 b
}
});
}
//....
3.3.2 注册selector
在《java读源码之netty》中,注册是执行的上述代码的3.3.2 a,然而自己调试的时候是在3.3.2 b处,看了下这个判断函数,是为了判断channel绑定的EventLoop启动线程与当前线程相同,相同表示已经启动,不同则有两种可能:未启动或者线程不同
public boolean inEventLoop(Thread thread) {
return thread == this.thread;
}
因为这个监听的ServerSocketChannel是第一个channel,此时还没有已经开启运行的eventloop,这个时候就需要执行3.3.2b的启动了,eventLoop新开了一个线程,并且将channel在里面绑定;
但不管是3.2.2a 还是3.2.2b,最终都进入了——
private void register0(ChannelPromise promise) {
//...
AbstractChannel.this.doRegister(); //step 1
//...
AbstractChannel.this.pipeline.invokeHandlerAddedIfNeeded();//step 2a
this.safeSetSuccess(promise);
AbstractChannel.this.pipeline.fireChannelRegistered();//step 2b
if (AbstractChannel.this.isActive()) {
if (firstRegistration) {
AbstractChannel.this.pipeline.fireChannelActive(); //step 2c
} else if (AbstractChannel.this.config().isAutoRead()) {
this.beginRead();
}
}
- step1 .doRegister()方法注册到selector
AbstractNioChannel.java
protected void doRegister() throws Exception {
boolean selected = false;
while(true) {
try {
this.selectionKey = this.javaChannel()
.register(this.eventLoop().selector, 0, this); //here
return;
}
这里的this.javaChannel()
this指的是netty封装后的channel,通过.javaChannel获取到其内部jdk底层的channel。
.register( xx,xx,xx) 第一个参数就是正常的jdk底层selector,第二个参数是一些设置(这里没设置),this是指netty自己的封装的channel,作为一个attachment放进去,这样当channel事件发生,就能通过selector直接获取到netty封装好的channel。
-
step2
- 回调a 对应用户在Handlder中写的handler中的
@Override public void handlerAdded(ChannelHandlerContext ctx) { System.out.println("handlerAdded"); }
- 回调b 对应用户在Handlder中写的handler中的
@Override public void channelRegistered(ChannelHandlerContext ctx) { System.out.println("channelRegistered"); }
- 回调c 对应用户在Handlder中写的handler中的,但是这里并不是和step a b 紧接着调用,而是在active之后(第四步的端口绑定后)再次调用这个函数后才会active
@Override public void channelActive(ChannelHandlerContext ctx) { System.out.println("channelActive"); }
3.4 端口绑定
前面三步都是通过bind()
调用doBind()
再调用的initAndRegister()
中实现,而端口绑定则是在doBind()
中调用dobind0()
实现,跳出了之前所关注的焦点initAndRegitser()
在AbstractBootstrap.java中的dobind0()
最终会调用AbstractChannel.java的Bind()
方法
public final void bind(SocketAddress localAddress, ChannelPromise promise) {
//...
try {
AbstractChannel.this.doBind(localAddress); //step 3.4.1
}
//...
if (!wasActive && AbstractChannel.this.isActive()) {
this.invokeLater(new Runnable() {
public void run() {
AbstractChannel.this.pipeline
.fireChannelActive();//step 3.4.2也就是在这调用用户的channelActive方法
}
});
}
- step 3.4.1 这一步最终调用的底层的channel的jdk原生bind绑定端口
- step 3.4.2 fireChannelActive()触发pipline上的一系列active回调,最后将端口设置为允许接收请求
网友评论