简介
Echoes back any received data from a client,本编文章会分析一下server端代码的启动流程,梳理一下自己的理解。
代码分析
public static void main(String[] args) throws Exception {
// Configure the server.
ServerBootstrap bootstrap = new ServerBootstrap(@1
new NioServerSocketChannelFactory(
Executors.newCachedThreadPool(),
Executors.newCachedThreadPool()));
// Set up the default event pipeline.
EchoHandler handler = new EchoHandler();@2
bootstrap.getPipeline().addLast("handler", handler);@3
// Bind and start to accept incoming connections.
bootstrap.bind(new InetSocketAddress(8080));@4
// Start performance monitor.
new ThroughputMonitor(handler).start();
}
@1:处其实就是构建一个BootStrap对象,同时在构造方法里面初始化一些必要的参数,发现很多开元框架都喜欢在构造方法里面去做一些初始化操作。分别指定用来执行boss threads和I/O worker threads的Executor。
@2:构建一个Handler对象,可以是自定义的,用来处理具体的业务。
@3:构建一个DefaultChannelPipeline对象,并把涉及到的Handler加入到上面。
@4:绑定端口,开始监听客户端发过来的请求。
代码细节
@1:我们可以进入到NioServerSocketChannelFactory构造函数里面:
public NioServerSocketChannelFactory(
Executor bossExecutor, Executor workerExecutor,
int workerCount) {
if (bossExecutor == null) {
throw new NullPointerException("bossExecutor");
}
if (workerExecutor == null) {
throw new NullPointerException("workerExecutor");
}
if (workerCount <= 0) {
throw new IllegalArgumentException(
"workerCount (" + workerCount + ") " +
"must be a positive integer.");
}
this.bossExecutor = bossExecutor;@5
this.workerExecutor = workerExecutor;@6
sink = new NioServerSocketPipelineSink(workerExecutor, workerCount);@7
}
NioServerSocketPipelineSink(Executor workerExecutor, int workerCount) {@8
workers = new NioWorker[workerCount];
for (int i = 0; i < workers.length; i ++)
workers[i] = new NioWorker(id, i + 1, workerExecutor);
}
}
@5和@6 分别是把CachedThreadPool赋值给final类型的bossExecutor 和workerExecutor ,一般发现源码里面有很多对象都会定义成final的,其实这有利于程序的编译,明确指出一些不会发生变化的变量和对象,同时可以告诉别人这个对象是线程安全的,不可以修改等等。
@7:进入到NioServerSocketPipelineSink代码里面看下,这个方法主要做的工作是:初始化NioWorker数组并赋值。@8处:默认工作线程数组的大小是当前点电脑的cpu核数。你也可以自定义线程池,自定义各个参数。
我们可也进入到@4的代码:如下核心方法bind,它主要是用来创建一个和指定地址绑定的channel。
public Channel bind(final SocketAddress localAddress) {
if (localAddress == null) {
throw new NullPointerException("localAddress");
}
final BlockingQueue<ChannelFuture> futureQueue =
new LinkedBlockingQueue<ChannelFuture>();
ChannelPipeline bossPipeline = pipeline();
bossPipeline.addLast("binder", new Binder(localAddress, futureQueue));
ChannelHandler parentHandler = getParentHandler();
if (parentHandler != null) {
bossPipeline.addLast("userHandler", parentHandler);
}
Channel channel = getFactory().newChannel(bossPipeline);@9
// Wait until the future is available.
ChannelFuture future = null;
do {
try {
future = futureQueue.poll(Integer.MAX_VALUE, TimeUnit.SECONDS);
} catch (InterruptedException e) {
// Ignore
}
} while (future == null);
// Wait for the future.
future.awaitUninterruptibly();
if (!future.isSuccess()) {
future.getChannel().close().awaitUninterruptibly();
throw new ChannelException("Failed to bind to: " + localAddress, future.getCause());
}
return channel;
}
重点来看下@9处的代码:主要是create和open一个channel,并且附加上指定的ChannelPipeline。它是ChannelFactory的方法,进入到对应的实现类:NioServerSocketChannelFactory:
public ServerSocketChannel newChannel(ChannelPipeline pipeline) {
return new NioServerSocketChannel(this, pipeline, sink);@10
}
sink参数是在上一步初始化过的,进入到NioServerSocketChannel构造方法里面:
NioServerSocketChannel(
ChannelFactory factory,
ChannelPipeline pipeline,
ChannelSink sink) {
super(factory, pipeline, sink);
try {
socket = ServerSocketChannel.open();@11
} catch (IOException e) {
throw new ChannelException(
"Failed to open a server socket.", e);
}
try {
socket.configureBlocking(false);
} catch (IOException e) {
try {
socket.close();
} catch (IOException e2) {
logger.warn(
"Failed to close a partially initialized socket.", e2);
}
throw new ChannelException("Failed to enter non-blocking mode.", e);
}
config = new DefaultServerSocketChannelConfig(socket.socket());
fireChannelOpen(this);@12
}
感觉这个方法才是重点,点击进去@11处的代码:
public static SelectorProvider provider() {
synchronized (lock) {
if (provider != null)
return provider;
return AccessController.doPrivileged(
new PrivilegedAction<SelectorProvider>() {
public SelectorProvider run() {
if (loadProviderFromProperty())
return provider;
if (loadProviderAsService())
return provider;
provider = sun.nio.ch.DefaultSelectorProvider.create();
return provider;
}
});
}
}
provider方法返回的是此Java虚拟机调用的系统范围的默认SelectorProvider ,这一点和操作系统有点关系。
然后进入到@12处的代码如下:
public static void fireChannelOpen(Channel channel) {
// Notify the parent handler.
if (channel.getParent() != null) {
fireChildChannelStateChanged(channel.getParent(), channel);
}
channel.getPipeline().sendUpstream(
new UpstreamChannelStateEvent(
channel, ChannelState.OPEN, Boolean.TRUE));
}
这个方法主要是发送一个channelOpen event到当前channel关联的pipeline的handler上面,然后进行对应的业务处理,这个启动服务类上面绑定了两个handler,一个Binder,一个EchoHandler。重点是pipeline的sendUpstream方法,里面引出来的东西有很多,放到下一个文章里面分析。
总结
工匠精神......我来了......不要老羡慕别人是大佬,其实你自己很快也将成为大佬.....
网友评论