代码分析
接着上次代码继续梳理,上一次分析到如下这个方法:
public static void fireChannelOpen(Channel channel) {
// Notify the parent handler.
if (channel.getParent() != null) {
fireChildChannelStateChanged(channel.getParent(), channel);
}
channel.getPipeline().sendUpstream(@1
new UpstreamChannelStateEvent(
channel, ChannelState.OPEN, Boolean.TRUE));
}
@1处的代码其实就是在展现着事件将在UpstreamHandler里面按照顺序开始流动,从head开始一直到tail...
* <pre>
*
* I/O Request
* via {@link Channel} or
* {@link ChannelHandlerContext}
* |
* +---------------------------------------------+--------------------+
* | ChannelPipeline | |
* | \|/ |
* | +----------------------+ +-----------+------------+ |
* | LAST | Upstream Handler N | | Downstream Handler M | LAST |
* | . +----------+-----------+ +-----------+------------+ . |
* | . /|\ | . |
* | . | \|/ . |
* | . +----------+-----------+ +-----------+------------+ . |
* | . | Upstream Handler N-1 | | Downstream Handler M-1 | . |
* | . +----------+-----------+ +-----------+------------+ . |
* | . /|\ . . |
* | . . . . |
* | . [ Going UPSTREAM ] [ Going DOWNSTREAM ] . |
* | . . . . |
* | . . \|/ . |
* | . +----------+-----------+ +-----------+------------+ . |
* | . | Upstream Handler 2 | | Downstream Handler 2 | . |
* | . +----------+-----------+ +-----------+------------+ . |
* | . /|\ | . |
* | . | \|/ . |
* | . +----------+-----------+ +-----------+------------+ . |
* | FIRST | Upstream Handler 1 | | Downstream Handler 1 | FIRST |
* | +----------+-----------+ +-----------+------------+ |
* | /|\ | |
* +------------------+--------------------------+--------------------+
* | \|/
* +------------------+--------------------------+--------------------+
* | I/O Threads (Transport Implementation) |
* +------------------------------------------------------------------+
进入到Pipeline的sendUpstream方法如下:
void sendUpstream(DefaultChannelHandlerContext ctx, ChannelEvent e) {
try {
((ChannelUpstreamHandler) ctx.getHandler()).handleUpstream(ctx, e);@2
} catch (Throwable t) {
notifyHandlerException(e, t);@3
}
}
@2就是从DefaultChannelHandlerContext 上面获取与之关联的handler,然后调用对应的handleUpstream方法,就当前这个例子来说,第一个UpstreamHandler是EchoHandler,第二个是Binder,所以首先调用EchoHandler的handleUpstream方法,由于EchoHandler继承的是SimpleChannelUpstreamHandler,所以也就是调用SimpleChannelUpstreamHandler的handleUpstream方法,进入到handleUpstream方法内,如下:
public void handleUpstream(
ChannelHandlerContext ctx, ChannelEvent e) throws Exception {
if (e instanceof MessageEvent) {
messageReceived(ctx, (MessageEvent) e);
} else if (e instanceof WriteCompletionEvent) {
WriteCompletionEvent evt = (WriteCompletionEvent) e;
writeComplete(ctx, evt);
} else if (e instanceof ChildChannelStateEvent) {
ChildChannelStateEvent evt = (ChildChannelStateEvent) e;
if (evt.getChildChannel().isOpen()) {
childChannelOpen(ctx, evt);
} else {
childChannelClosed(ctx, evt);
}
} else if (e instanceof ChannelStateEvent) {
ChannelStateEvent evt = (ChannelStateEvent) e;
switch (evt.getState()) {
case OPEN:
if (Boolean.TRUE.equals(evt.getValue())) {@4
channelOpen(ctx, evt);
} else {
channelClosed(ctx, evt);
}
break;
case BOUND:
if (evt.getValue() != null) {
channelBound(ctx, evt);
} else {
channelUnbound(ctx, evt);
}
break;
case CONNECTED:
if (evt.getValue() != null) {
channelConnected(ctx, evt);
} else {
channelDisconnected(ctx, evt);
}
break;
case INTEREST_OPS:
channelInterestChanged(ctx, evt);
break;
default:
ctx.sendDownstream(e);
}
} else if (e instanceof ExceptionEvent) {
exceptionCaught(ctx, (ExceptionEvent) e);
} else {
ctx.sendUpstream(e);
}
}
走的是@4分支,继续进去:
public void sendUpstream(ChannelEvent e) {
DefaultChannelHandlerContext next = getActualUpstreamContext(this.next);@5
if (next != null) {
DefaultChannelPipeline.this.sendUpstream(next, e);@6
}
}
@5获取当前Pipeline的下一个handler也就是Binder,然后继续执行sendUpstream方法,当前的state仍是open,会调用到Binder的channelOpen方法,代码如下:
@Override
public void channelOpen(
ChannelHandlerContext ctx,
ChannelStateEvent evt) {
try {
evt.getChannel().getConfig().setPipelineFactory(getPipelineFactory());
// Split options into two categories: parent and child.
Map<String, Object> allOptions = getOptions();
Map<String, Object> parentOptions = new HashMap<String, Object>();
for (Entry<String, Object> e: allOptions.entrySet()) {
if (e.getKey().startsWith("child.")) {
childOptions.put(
e.getKey().substring(6),
e.getValue());
} else if (!e.getKey().equals("pipelineFactory")) {
parentOptions.put(e.getKey(), e.getValue());
}
}
// Apply parent options.
evt.getChannel().getConfig().setOptions(parentOptions);
} finally {
ctx.sendUpstream(evt);@7
}
boolean finished = futureQueue.offer(evt.getChannel().bind(localAddress));@8
assert finished;
}
@7处的代码是继续传递这个事件,@8处的代码就是把open的channel绑定到指定的url,然后把结果放到队列里面。进入到bind方法里面如下,这个方法主要是把bind这个事件发送给ChannelDownstreamHandler处理:
public static ChannelFuture bind(Channel channel, SocketAddress localAddress) {
if (localAddress == null) {
throw new NullPointerException("localAddress");
}
ChannelFuture future = future(channel);
channel.getPipeline().sendDownstream(new DownstreamChannelStateEvent(@9
channel, future, ChannelState.BOUND, localAddress));
return future;
}
进入到@9sendDownstream方法里面如下:
public void sendDownstream(ChannelEvent e) {
DefaultChannelHandlerContext tail = getActualDownstreamContext(this.tail);
if (tail == null) {
try {
getSink().eventSunk(this, e);@10
return;
} catch (Throwable t) {
notifyHandlerException(e, t);
return;
}
}
sendDownstream(tail, e);
}
当前实例是没有DownstreamHandler的,所以代码会执行@10这里,然后进入到eventSunk和handleServerSocket方法:
public void eventSunk(
ChannelPipeline pipeline, ChannelEvent e) throws Exception {
Channel channel = e.getChannel();
if (channel instanceof NioServerSocketChannel) {
handleServerSocket(e);
} else if (channel instanceof NioSocketChannel) {
handleAcceptedSocket(e);
}
}
private void handleServerSocket(ChannelEvent e) {
if (!(e instanceof ChannelStateEvent)) {
return;
}
ChannelStateEvent event = (ChannelStateEvent) e;
NioServerSocketChannel channel =
(NioServerSocketChannel) event.getChannel();
ChannelFuture future = event.getFuture();
ChannelState state = event.getState();
Object value = event.getValue();
switch (state) {
case OPEN:
if (Boolean.FALSE.equals(value)) {
close(channel, future);
}
break;
case BOUND:
if (value != null) {
bind(channel, future, (SocketAddress) value);@11
} else {
close(channel, future);
}
break;
}
}
分支会走到@11分支,当前代码还在NioServerSocketPipelineSink类里面,然后进入到@11的方法如下:
private void bind(
NioServerSocketChannel channel, ChannelFuture future,
SocketAddress localAddress) {
boolean bound = false;
boolean bossStarted = false;
try {
channel.socket.socket().bind(localAddress, channel.getConfig().getBacklog());
bound = true;
future.setSuccess();@12
fireChannelBound(channel, channel.getLocalAddress());@13
Executor bossExecutor =
((NioServerSocketChannelFactory) channel.getFactory()).bossExecutor;
bossExecutor.execute(
new IoWorkerRunnable(
new ThreadRenamingRunnable(
new Boss(channel),@14
"New I/O server boss #" + id +
" (channelId: " + channel.getId() +
", " + channel.getLocalAddress() + ')')));
bossStarted = true;
} catch (Throwable t) {
future.setFailure(t);
fireExceptionCaught(channel, t);
} finally {
if (!bossStarted && bound) {
close(channel, future);
}
}
}
@12处代码:Marks this future as a success and notifies all listeners
@14处代码最终会运行Boss线程里面的run方法,内容如下:
public void run() {
final Thread currentThread = Thread.currentThread();
for (;;) {
try {
if (selector.select(1000) > 0) {
selector.selectedKeys().clear();
}
SocketChannel acceptedSocket = channel.socket.accept();@15
if (acceptedSocket != null) {
registerAcceptedChannel(acceptedSocket, currentThread);@16
}
} catch (SocketTimeoutException e) {
// Thrown every second to get ClosedChannelException
// raised.
} catch (CancelledKeyException e) {
// Raised by accept() when the server socket was closed.
} catch (ClosedSelectorException e) {
// Raised by accept() when the server socket was closed.
} catch (ClosedChannelException e) {
// Closed as requested.
break;
} catch (IOException e) {
logger.warn(
"Failed to accept a connection.", e);
try {
Thread.sleep(1000);
} catch (InterruptedException e1) {
// Ignore
}
}
}
closeSelector();
}
@15用来接收客户端来的请求,转换成代码就是一个SocketChannel 。
进入@16registerAcceptedChannel的代码,内容如下:
private void registerAcceptedChannel(SocketChannel acceptedSocket, Thread currentThread) {
try {
ChannelPipeline pipeline =
channel.getConfig().getPipelineFactory().getPipeline();
NioWorker worker = nextWorker();@17
worker.register(new NioAcceptedSocketChannel(@18
channel.getFactory(), pipeline, channel,
NioServerSocketPipelineSink.this, acceptedSocket,
worker, currentThread), null);
} catch (Exception e) {
logger.warn(
"Failed to initialize an accepted socket.", e);
try {
acceptedSocket.close();
} catch (IOException e2) {
logger.warn(
"Failed to close a partially accepted socket.",
e2);
}
}
}
@17处的代码是根据一定的规则从NioWorker数组里面获取一个线程,来处理当前请求。
@18进入register的代码里面:
void register(NioSocketChannel channel, ChannelFuture future) {
boolean server = !(channel instanceof NioClientSocketChannel);
Runnable registerTask = new RegisterTask(channel, future, server);
Selector selector;
synchronized (startStopLock) {
if (!started) {
// Open a selector if this worker didn't start yet.
try {
this.selector = selector = Selector.open();@19
} catch (Throwable t) {
throw new ChannelException(
"Failed to create a selector.", t);
}
// Start the worker thread with the new Selector.
String threadName =
(server ? "New I/O server worker #"
: "New I/O client worker #") + bossId + '-' + id;
boolean success = false;
try {
executor.execute(
new IoWorkerRunnable(
new ThreadRenamingRunnable(this, threadName)));@20
success = true;
} finally {
if (!success) {
// Release the Selector if the execution fails.
try {
selector.close();
} catch (Throwable t) {
logger.warn("Failed to close a selector.", t);
}
this.selector = selector = null;
// The method will return to the caller at this point.
}
}
} else {
// Use the existing selector if this worker has been started.
selector = this.selector;
}
assert selector != null && selector.isOpen();
started = true;
boolean offered = registerTaskQueue.offer(registerTask);
assert offered;
}
if (wakenUp.compareAndSet(false, true)) {
selector.wakeup();
}
}
@19说明:Open a selector if this worker didn't start yet
@20:会执行this线程的run方法,也就是NioWorker的run方法,内容如下:
public void run() {
thread = Thread.currentThread();
boolean shutdown = false;
Selector selector = this.selector;
for (;;) {
wakenUp.set(false);
if (CONSTRAINT_LEVEL != 0) {
selectorGuard.writeLock().lock();
// This empty synchronization block prevents the selector
// from acquiring its lock.
selectorGuard.writeLock().unlock();
}
try {
int selectedKeyCount = selector.select(500);
// 'wakenUp.compareAndSet(false, true)' is always evaluated
// before calling 'selector.wakeup()' to reduce the wake-up
// overhead. (Selector.wakeup() is an expensive operation.)
//
// However, there is a race condition in this approach.
// The race condition is triggered when 'wakenUp' is set to
// true too early.
//
// 'wakenUp' is set to true too early if:
// 1) Selector is waken up between 'wakenUp.set(false)' and
// 'selector.select(...)'. (BAD)
// 2) Selector is waken up between 'selector.select(...)' and
// 'if (wakenUp.get()) { ... }'. (OK)
//
// In the first case, 'wakenUp' is set to true and the
// following 'selector.select(...)' will wake up immediately.
// Until 'wakenUp' is set to false again in the next round,
// 'wakenUp.compareAndSet(false, true)' will fail, and therefore
// any attempt to wake up the Selector will fail, too, causing
// the following 'selector.select(...)' call to block
// unnecessarily.
//
// To fix this problem, we wake up the selector again if wakenUp
// is true immediately after selector.select(...).
// It is inefficient in that it wakes up the selector for both
// the first case (BAD - wake-up required) and the second case
// (OK - no wake-up required).
if (wakenUp.get()) {
selector.wakeup();
}
processRegisterTaskQueue();@21
processWriteTaskQueue();@22
if (selectedKeyCount > 0) {
processSelectedKeys(selector.selectedKeys());@23
}
// Exit the loop when there's nothing to handle.
// The shutdown flag is used to delay the shutdown of this
// loop to avoid excessive Selector creation when
// connections are registered in a one-by-one manner instead of
// concurrent manner.
if (selector.keys().isEmpty()) {
if (shutdown ||
executor instanceof ExecutorService && ((ExecutorService) executor).isShutdown()) {
synchronized (startStopLock) {
if (registerTaskQueue.isEmpty() && selector.keys().isEmpty()) {
started = false;
try {
selector.close();
} catch (IOException e) {
logger.warn(
"Failed to close a selector.", e);
} finally {
this.selector = null;
}
break;
} else {
shutdown = false;
}
}
} else {
// Give one more second.
shutdown = true;
}
} else {
shutdown = false;
}
} catch (Throwable t) {
logger.warn(
"Unexpected exception in the selector loop.", t);
// Prevent possible consecutive immediate failures that lead to
// excessive CPU consumption.
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
// Ignore.
}
}
}
}
@21和@22处理Queue里面的任务,@23处理具体的读或者写请求,也就是processSelectedKeys这个方法的内容如下:
private static void processSelectedKeys(Set<SelectionKey> selectedKeys) {
for (Iterator<SelectionKey> i = selectedKeys.iterator(); i.hasNext();) {
SelectionKey k = i.next();
i.remove();
try {
int readyOps = k.readyOps();
if ((readyOps & SelectionKey.OP_READ) != 0) {
if (!read(k)) {
// Connection already closed - no need to handle write.
continue;
}
}
if ((readyOps & SelectionKey.OP_WRITE) != 0) {
write(k);
}
} catch (CancelledKeyException e) {
close(k);
}
}
}
processSelectedKeys这个方法里面主要是遍历SelectionKey,然后处理具体的read或者write请求,东西还不少,放在下一篇内容里面继续分析......
总结:
感觉看是一个简单的东西,其实没那么简单,底层做了太多太多的东西,设计者的思想都在底层,任何看起来简单的东西都不会太简单的,慢慢体会吧。
网友评论