一、pipeline初始化
- Pipeline在创建Channel的时候被创建
调用newChannelPipeline()方法创建Channel对应的Pipeline,创建tail节点和head节点通过prev/next组成双向链表 - Pipeline节点数据结构:ChannelHandlerContext[存储自定义属性,Inbound/Outbound事件传播]
- Pipeline中的两大哨兵:head和tail
tail节点传播Inbound事件,用于异常未处理警告/Msg未处理建议处理收尾
head节点传播Outbound事件,用于传播事件/读写事件委托Unsafe操作
无论是服务端还是客户端Channel都会调用父类AbstractChannel构造方法
protected AbstractChannel(Channel parent) {
this.parent = parent;
id = newId();
unsafe = newUnsafe();
pipeline = newChannelPipeline();
}
创建Pipeline时,创建head、tail两个ChannelHandlerContext,形成双向链表结构。
ChannelPipeline
protected DefaultChannelPipeline(Channel channel) {
this.channel = ObjectUtil.checkNotNull(channel, "channel");
succeededFuture = new SucceededChannelFuture(channel, null);
voidPromise = new VoidChannelPromise(channel, true);
tail = new TailContext(this);
head = new HeadContext(this);
head.next = tail;
tail.prev = head;
}
二、ChannelHandlerContext
ChannelHandlerContext .pngpublic interface ChannelHandlerContext extends AttributeMap, ChannelInboundInvoker, ChannelOutboundInvoker {
/**
* Return the {@link Channel} which is bound to the {@link ChannelHandlerContext}.
*/
Channel channel();
/**
* Returns the {@link EventExecutor} which is used to execute an arbitrary task.
*/
EventExecutor executor();
/**
* The unique name of the {@link ChannelHandlerContext}.The name was used when then {@link ChannelHandler}
* was added to the {@link ChannelPipeline}. This name can also be used to access the registered
* {@link ChannelHandler} from the {@link ChannelPipeline}.
*/
String name();
/**
* The {@link ChannelHandler} that is bound this {@link ChannelHandlerContext}.
*/
ChannelHandler handler();//ChannelHandler 处理方法
/**
* Return {@code true} if the {@link ChannelHandler} which belongs to this context was removed
* from the {@link ChannelPipeline}. Note that this method is only meant to be called from with in the
* {@link EventLoop}.
*/
boolean isRemoved();
@Override
ChannelHandlerContext fireChannelRegistered();
@Override
ChannelHandlerContext fireChannelUnregistered();
@Override
ChannelHandlerContext fireChannelActive();
@Override
ChannelHandlerContext fireChannelInactive();
@Override
ChannelHandlerContext fireExceptionCaught(Throwable cause);
@Override
ChannelHandlerContext fireUserEventTriggered(Object evt);
@Override
ChannelHandlerContext fireChannelRead(Object msg);
@Override
ChannelHandlerContext fireChannelReadComplete();
@Override
ChannelHandlerContext fireChannelWritabilityChanged();
@Override
ChannelHandlerContext read();
@Override
ChannelHandlerContext flush();
/**
* Return the assigned {@link ChannelPipeline}
*/
ChannelPipeline pipeline();//当前pipeline
/**
* Return the assigned {@link ByteBufAllocator} which will be used to allocate {@link ByteBuf}s.
*/
ByteBufAllocator alloc();
/**
* @deprecated Use {@link Channel#attr(AttributeKey)}
*/
@Deprecated
@Override
<T> Attribute<T> attr(AttributeKey<T> key);
/**
* @deprecated Use {@link Channel#hasAttr(AttributeKey)}
*/
@Deprecated
@Override
<T> boolean hasAttr(AttributeKey<T> key);
}
abstract class AbstractChannelHandlerContext extends DefaultAttributeMap
implements ChannelHandlerContext, ResourceLeakHint {
private static final InternalLogger logger = InternalLoggerFactory.getInstance(AbstractChannelHandlerContext.class);
volatile AbstractChannelHandlerContext next;
volatile AbstractChannelHandlerContext prev;
...
}
DefaultChannelHandlerContext
final class DefaultChannelHandlerContext extends AbstractChannelHandlerContext {
private final ChannelHandler handler;
DefaultChannelHandlerContext(
DefaultChannelPipeline pipeline, EventExecutor executor, String name, ChannelHandler handler) {
super(pipeline, executor, name, isInbound(handler), isOutbound(handler));
if (handler == null) {
throw new NullPointerException("handler");
}
this.handler = handler;
}
@Override
public ChannelHandler handler() {
return handler;
}
private static boolean isInbound(ChannelHandler handler) {
return handler instanceof ChannelInboundHandler;//判断isInbound
}
private static boolean isOutbound(ChannelHandler handler) {
return handler instanceof ChannelOutboundHandler;//判断isOutbound
}
}
TailContext,主要做收尾的工作,如异常没处理或者消息没处理进行警告
final class TailContext extends AbstractChannelHandlerContext implements ChannelInboundHandler {
TailContext(DefaultChannelPipeline pipeline) {
super(pipeline, null, TAIL_NAME, true, false);//inbound传true,outbound传false
setAddComplete();//设置为已经添加
}
@Override
public ChannelHandler handler() {
return this;
}
@Override
public void channelRegistered(ChannelHandlerContext ctx) throws Exception { }
@Override
public void channelUnregistered(ChannelHandlerContext ctx) throws Exception { }
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception { }
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception { }
@Override
public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception { }
@Override
public void handlerAdded(ChannelHandlerContext ctx) throws Exception { }
@Override
public void handlerRemoved(ChannelHandlerContext ctx) throws Exception { }
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
// This may not be a configuration error and so don't log anything.
// The event may be superfluous for the current pipeline configuration.
ReferenceCountUtil.release(evt);
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
onUnhandledInboundException(cause);
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
onUnhandledInboundMessage(msg);
}
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { }
}
protected void onUnhandledInboundException(Throwable cause) {
try {
//捕获未处理的异常信息
logger.warn(
"An exceptionCaught() event was fired, and it reached at the tail of the pipeline. " +
"It usually means the last handler in the pipeline did not handle the exception.",
cause);
} finally {
ReferenceCountUtil.release(cause);//进行释放,防止内存泄漏
}
}
protected void onUnhandledInboundMessage(Object msg) {
try {
//未处理的msg
logger.debug(
"Discarded inbound message {} that reached at the tail of the pipeline. " +
"Please check your pipeline configuration.", msg);
} finally {
ReferenceCountUtil.release(msg);
}
}
HeadContext,主要进行事件传播、委托unsafe 进行读写操作。
final class HeadContext extends AbstractChannelHandlerContext
implements ChannelOutboundHandler, ChannelInboundHandler {
private final Unsafe unsafe;
HeadContext(DefaultChannelPipeline pipeline) {
super(pipeline, null, HEAD_NAME, false, true);//inbound设置为false,outbound设置为true
unsafe = pipeline.channel().unsafe();//进行读写操作
setAddComplete();
}
@Override
public ChannelHandler handler() {
return this;
}
@Override
public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
// NOOP
}
@Override
public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
// NOOP
}
@Override
public void bind(
ChannelHandlerContext ctx, SocketAddress localAddress, ChannelPromise promise)
throws Exception {
unsafe.bind(localAddress, promise);
}
@Override
public void connect(
ChannelHandlerContext ctx,
SocketAddress remoteAddress, SocketAddress localAddress,
ChannelPromise promise) throws Exception {
unsafe.connect(remoteAddress, localAddress, promise);
}
@Override
public void disconnect(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception {
unsafe.disconnect(promise);
}
@Override
public void close(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception {
unsafe.close(promise);
}
@Override
public void deregister(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception {
unsafe.deregister(promise);
}
@Override
public void read(ChannelHandlerContext ctx) {
unsafe.beginRead();
}
@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
unsafe.write(msg, promise);
}
@Override
public void flush(ChannelHandlerContext ctx) throws Exception {
unsafe.flush();
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
ctx.fireExceptionCaught(cause);
}
@Override
public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
invokeHandlerAddedIfNeeded();
ctx.fireChannelRegistered();
}
@Override
public void channelUnregistered(ChannelHandlerContext ctx) throws Exception {
ctx.fireChannelUnregistered();
// Remove all handlers sequentially if channel is closed and unregistered.
if (!channel.isOpen()) {
destroy();
}
}
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
ctx.fireChannelActive();
readIfIsAutoRead();
}
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
ctx.fireChannelInactive();
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ctx.fireChannelRead(msg);
}
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
ctx.fireChannelReadComplete();
readIfIsAutoRead();
}
private void readIfIsAutoRead() {
if (channel.config().isAutoRead()) {
channel.read();
}
}
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
ctx.fireUserEventTriggered(evt);
}
@Override
public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception {
ctx.fireChannelWritabilityChanged();
}
}
三、添加ChannelHandler
添加ChannelHandler->addLast():
- 判断是否重复添加
通过checkMultiplicity()方法强制转换ChannelHandler判断是否可共享&是否被添加过 - 创建节点并添加至链表
通过filterName()检查重复名称调用newContext()封装ChannelHandler创建节点
调用addLast0()方法通过链表的方式添加到Channel的Pipeline - 回调添加完成事件
调用callHandlerAdded0()方法添加ChannelInitializer执行handlerAdded()方法回调自定义的添加完成事件删除自身节点
EventLoopGroup boss = new NioEventLoopGroup();//继承线程池ScheduledExecutorService
EventLoopGroup worker = new NioEventLoopGroup();
ServerBootstrap bootstrap = new ServerBootstrap();
bootstrap.group(boss, worker);
bootstrap.channel(NioServerSocketChannel.class);//利用反射构造NioServerSocketChannel实例
bootstrap.option(ChannelOption.SO_BACKLOG, 2048);
bootstrap.option(ChannelOption.SO_KEEPALIVE, true);
bootstrap.option(ChannelOption.TCP_NODELAY, true);
bootstrap.handler(new LoggingServerHandler());//handler与childHandler不同
bootstrap.childHandler(new ChannelInitializer<SocketChannel>() {//ChannelInitializer,添加自定义hander
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new MyChannelHandler1());
ch.pipeline().addLast(new MyChannelHandler2());
ch.pipeline().addLast(new MyChannelHandler3());
}
});
ChannelFuture f = bootstrap.bind(port).sync();//bind方法实现
f.addListener((ChannelFutureListener) future -> {
if (future.isSuccess()) {
//启动成功
}
});
f.channel().closeFuture().sync();
addLast方法
@Override
public final ChannelPipeline addLast(ChannelHandler... handlers) {
return addLast(null, handlers);
}
@Override
public final ChannelPipeline addLast(EventExecutorGroup executor, ChannelHandler... handlers) {
if (handlers == null) {
throw new NullPointerException("handlers");
}
for (ChannelHandler h: handlers) {
if (h == null) {
break;
}
addLast(executor, null, h);
}
return this;
}
@Override
public final ChannelPipeline addLast(EventExecutorGroup group, String name, ChannelHandler handler) {
final AbstractChannelHandlerContext newCtx;
synchronized (this) {
checkMultiplicity(handler);//判断是否被重复添加
newCtx = newContext(group, filterName(name, handler), handler);//创建一个节点
addLast0(newCtx);//添加到链表
// If the registered is false it means that the channel was not registered on an eventloop yet.
// In this case we add the context to the pipeline and add a task that will call
// ChannelHandler.handlerAdded(...) once the channel is registered.
if (!registered) {
newCtx.setAddPending();
callHandlerCallbackLater(newCtx, true);
return this;
}
EventExecutor executor = newCtx.executor();
if (!executor.inEventLoop()) {//如果不在当前EventLoop中,则添加到任务队列,异步执行
newCtx.setAddPending();
executor.execute(new Runnable() {
@Override
public void run() {
callHandlerAdded0(newCtx);
}
});
return this;
}
}
callHandlerAdded0(newCtx);//如果在当前EventLoop中,执行执行
return this;
}
private static void checkMultiplicity(ChannelHandler handler) {
if (handler instanceof ChannelHandlerAdapter) {
ChannelHandlerAdapter h = (ChannelHandlerAdapter) handler;
if (!h.isSharable() && h.added) {//非共享(@Sharable注解)并且已经添加过,则抛出重复添加的异常
throw new ChannelPipelineException(
h.getClass().getName() +
" is not a @Sharable handler, so can't be added or removed multiple times.");
}
h.added = true;
}
}
private String filterName(String name, ChannelHandler handler) {
if (name == null) {
return generateName(handler);
}
checkDuplicateName(name);//检查名字是否重复
return name;
}
private void checkDuplicateName(String name) {
if (context0(name) != null) {
throw new IllegalArgumentException("Duplicate handler name: " + name);
}
}
private AbstractChannelHandlerContext context0(String name) {
AbstractChannelHandlerContext context = head.next;
while (context != tail) {
if (context.name().equals(name)) {
return context;
}
context = context.next;
}
return null;
}
//向双向链表中添加hander
private void addLast0(AbstractChannelHandlerContext newCtx) {
AbstractChannelHandlerContext prev = tail.prev;
newCtx.prev = prev;
newCtx.next = tail;
prev.next = newCtx;
tail.prev = newCtx;
}
//回调用户方法
private void callHandlerAdded0(final AbstractChannelHandlerContext ctx) {
try {
ctx.handler().handlerAdded(ctx);//调用用户方法(服务端启动时ChannelInitializer中initChannel)
ctx.setAddComplete();
} catch (Throwable t) {
boolean removed = false;
try {
remove0(ctx);
try {
ctx.handler().handlerRemoved(ctx);
} finally {
ctx.setRemoved();
}
removed = true;
} catch (Throwable t2) {
if (logger.isWarnEnabled()) {
logger.warn("Failed to remove a handler: " + ctx.name(), t2);
}
}
if (removed) {
fireExceptionCaught(new ChannelPipelineException(
ctx.handler().getClass().getName() +
".handlerAdded() has thrown an exception; removed.", t));
} else {
fireExceptionCaught(new ChannelPipelineException(
ctx.handler().getClass().getName() +
".handlerAdded() has thrown an exception; also failed to remove.", t));
}
}
}
//ChannelInitializer中handlerAdded方法
public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
if (ctx.channel().isRegistered()) {
// This should always be true with our current DefaultChannelPipeline implementation.
// The good thing about calling initChannel(...) in handlerAdded(...) is that there will be no ordering
// surprises if a ChannelInitializer will add another ChannelInitializer. This is as all handlers
// will be added in the expected order.
initChannel(ctx);
}
}
//ChannelInitializer中initChannel方法
private boolean initChannel(ChannelHandlerContext ctx) throws Exception {
if (initMap.putIfAbsent(ctx, Boolean.TRUE) == null) { // Guard against re-entrance.
try {
initChannel((C) ctx.channel());//执行initChannel
} catch (Throwable cause) {
// Explicitly call exceptionCaught(...) as we removed the handler before calling initChannel(...).
// We do so to prevent multiple calls to initChannel(...).
exceptionCaught(ctx, cause);
} finally {
remove(ctx);//将自身节点删除
}
return true;
}
return false;
}
final void setAddComplete() {
for (;;) {
int oldState = handlerState;
//自旋,compareAndSet设置handlerState
if (oldState == REMOVE_COMPLETE || HANDLER_STATE_UPDATER.compareAndSet(this, oldState, ADD_COMPLETE)) {
return;
}
}
}
四、删除ChannelHander
删除ChannelHander的使用场景:
AuthHandler 权限控制demo
public class AuthHandler extends SimpleChannelInboundHandler<ByteBuf> {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
//
}
@Override
protected void channelRead0(ChannelHandlerContext ctx, ByteBuf password) throws Exception {
if (paas(password)) {
ctx.pipeline().remove(this);//通过密码验证删除自身,否则关闭连接
} else {
ctx.close();
}
}
private boolean paas(ByteBuf password) {
return false;
}
}
删除ChannelHandler过程:
- 找到节点
通过getContextOrDie()方法使用context()方法从head的next遍历获取封装的ChannelHandlerContext节点 - 链表的删除
调用remove()方法删除节点使用remove0()方法获取当前节点的prev和next,prev的next置为当前节点的next,next的prev置为当前节点的prev - 回调删除Handler事件
调用callHandlerRemoved0()方法获取当前节点的ChannelHandler使用handlerRemove()方法回调删除Handler事件
#DefaultChannelPipeline类
@Override
public final ChannelPipeline remove(ChannelHandler handler) {
remove(getContextOrDie(handler));
return this;
}
#DefaultChannelPipeline类
private AbstractChannelHandlerContext getContextOrDie(ChannelHandler handler) {
AbstractChannelHandlerContext ctx = (AbstractChannelHandlerContext) context(handler);
if (ctx == null) {
throw new NoSuchElementException(handler.getClass().getName());
} else {
return ctx;
}
}
#DefaultChannelPipeline类
public final ChannelHandlerContext context(ChannelHandler handler) {
if (handler == null) {
throw new NullPointerException("handler");
}
AbstractChannelHandlerContext ctx = head.next;
for (;;) {
if (ctx == null) {
return null;
}
if (ctx.handler() == handler) {//找到该hander节点
return ctx;
}
ctx = ctx.next;
}
}
//删除方法
private AbstractChannelHandlerContext remove(final AbstractChannelHandlerContext ctx) {
assert ctx != head && ctx != tail;//当前节点不是head也不是tail
synchronized (this) {
remove0(ctx);
// If the registered is false it means that the channel was not registered on an eventloop yet.
// In this case we remove the context from the pipeline and add a task that will call
// ChannelHandler.handlerRemoved(...) once the channel is registered.
if (!registered) {
callHandlerCallbackLater(ctx, false);
return ctx;
}
EventExecutor executor = ctx.executor();
if (!executor.inEventLoop()) {
executor.execute(new Runnable() {
@Override
public void run() {
callHandlerRemoved0(ctx);
}
});
return ctx;
}
}
callHandlerRemoved0(ctx);//回调删除hander方法
return ctx;
}
//双向链表的删除
private static void remove0(AbstractChannelHandlerContext ctx) {
AbstractChannelHandlerContext prev = ctx.prev;
AbstractChannelHandlerContext next = ctx.next;
prev.next = next;
next.prev = prev;
}
private void callHandlerRemoved0(final AbstractChannelHandlerContext ctx) {
// Notify the complete removal.
try {
try {
ctx.handler().handlerRemoved(ctx);//回调用户定义的删除ChannelHander事件
} finally {
ctx.setRemoved();
}
} catch (Throwable t) {
fireExceptionCaught(new ChannelPipelineException(
ctx.handler().getClass().getName() + ".handlerRemoved() has thrown an exception.", t));
}
}
五、 inBound事件的传播
- 何为inBound事件以及ChannelInboundHandler
inBound事件包括Registered事件、Active事件、Read事件回调
ChannelInboundHandler添加到Pipeline通过instanceof判断当前ChannelHandler类型设置inBound为true标识为ChannelInboundHandler处理inBound事件 - ChannelRead事件的传播
按照添加ChannelRead事件顺序正序[添加ChannelInboundHandler顺序正序]传播[从head节点/当前节点事件传播],最终传播到tail节点释放对象 - SimpleInboundHandler处理器
channelRead()方法调用ReferenceCountUtil的release()方法自动释放对象
public class ChannelInboundHandlerAdapter extends ChannelHandlerAdapter implements ChannelInboundHandler {
/**
* Calls {@link ChannelHandlerContext#fireChannelRegistered()} to forward
* to the next {@link ChannelInboundHandler} in the {@link ChannelPipeline}.
*
* Sub-classes may override this method to change behavior.
*/
@Override
public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
ctx.fireChannelRegistered();
}
/**
* Calls {@link ChannelHandlerContext#fireChannelUnregistered()} to forward
* to the next {@link ChannelInboundHandler} in the {@link ChannelPipeline}.
*
* Sub-classes may override this method to change behavior.
*/
@Override
public void channelUnregistered(ChannelHandlerContext ctx) throws Exception {
ctx.fireChannelUnregistered();
}
/**
* Calls {@link ChannelHandlerContext#fireChannelActive()} to forward
* to the next {@link ChannelInboundHandler} in the {@link ChannelPipeline}.
*
* Sub-classes may override this method to change behavior.
*/
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
ctx.fireChannelActive();
}
/**
* Calls {@link ChannelHandlerContext#fireChannelInactive()} to forward
* to the next {@link ChannelInboundHandler} in the {@link ChannelPipeline}.
*
* Sub-classes may override this method to change behavior.
*/
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
ctx.fireChannelInactive();
}
/**
* Calls {@link ChannelHandlerContext#fireChannelRead(Object)} to forward
* to the next {@link ChannelInboundHandler} in the {@link ChannelPipeline}.
*
* Sub-classes may override this method to change behavior.
*/
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ctx.fireChannelRead(msg);
}
/**
* Calls {@link ChannelHandlerContext#fireChannelReadComplete()} to forward
* to the next {@link ChannelInboundHandler} in the {@link ChannelPipeline}.
*
* Sub-classes may override this method to change behavior.
*/
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
ctx.fireChannelReadComplete();
}
/**
* Calls {@link ChannelHandlerContext#fireUserEventTriggered(Object)} to forward
* to the next {@link ChannelInboundHandler} in the {@link ChannelPipeline}.
*
* Sub-classes may override this method to change behavior.
*/
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
ctx.fireUserEventTriggered(evt);
}
/**
* Calls {@link ChannelHandlerContext#fireChannelWritabilityChanged()} to forward
* to the next {@link ChannelInboundHandler} in the {@link ChannelPipeline}.
*
* Sub-classes may override this method to change behavior.
*/
@Override
public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception {
ctx.fireChannelWritabilityChanged();
}
/**
* Calls {@link ChannelHandlerContext#fireExceptionCaught(Throwable)} to forward
* to the next {@link ChannelHandler} in the {@link ChannelPipeline}.
*
* Sub-classes may override this method to change behavior.
*/
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
throws Exception {
ctx.fireExceptionCaught(cause);
}
}
测试demo
public final class Server {
public static void main(String[] args) throws Exception {
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.childOption(ChannelOption.TCP_NODELAY, true)
.childAttr(AttributeKey.newInstance("childAttr"), "childAttrValue")
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) {
ch.pipeline().addLast(new InBoundHandlerA());
ch.pipeline().addLast(new InBoundHandlerB());
ch.pipeline().addLast(new InBoundHandlerC());
}
});
ChannelFuture f = b.bind(8888).sync();
f.channel().closeFuture().sync();
} finally {
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
}
public class InBoundHandlerB extends ChannelInboundHandlerAdapter {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
System.out.println("InBoundHandlerB: " + msg);
ctx.fireChannelRead(msg);//向后传播
}
@Override
public void channelActive(ChannelHandlerContext ctx) {//链接建立时
ctx.channel().pipeline().fireChannelRead("hello world");//触发fireChannelRead
}
}
ChannelInboundHandlerAdapter 执行顺序,按添加顺序
InBoundHandler测试输出.png
inBound事件传播.png
inBound事件传播.png
注意:
直接调用pipeline().fireChannelRead("hello world");是从head节点往后传播;
而ChannelInboundHandlerAdapter中channelRead调用ctx.fireChannelRead,是从当前节点往后传播。
#AbstractChannelHandlerContext
@Override
public ChannelHandlerContext fireChannelRead(final Object msg) {
invokeChannelRead(findContextInbound(), msg);//findContextInbound寻找next Inbound
return this;
}
static void invokeChannelRead(final AbstractChannelHandlerContext next, Object msg) {
final Object m = next.pipeline.touch(ObjectUtil.checkNotNull(msg, "msg"), next);
EventExecutor executor = next.executor();
if (executor.inEventLoop()) {
next.invokeChannelRead(m);
} else {
executor.execute(new Runnable() {
@Override
public void run() {
next.invokeChannelRead(m);
}
});
}
}
private void invokeChannelRead(Object msg) {
if (invokeHandler()) {//判断执行handler还是向后传递
try {
((ChannelInboundHandler) handler()).channelRead(this, msg);
} catch (Throwable t) {
notifyHandlerException(t);
}
} else {
fireChannelRead(msg);
}
}
private AbstractChannelHandlerContext findContextInbound() {
AbstractChannelHandlerContext ctx = this;
do {
ctx = ctx.next;
} while (!ctx.inbound);//直到下一个inbound
return ctx;
}
SimpleChannelInboundHandler
在没有调用fireChannelRead继续往后传播直到tail节点时,需要手动释放资源。而SimpleChannelInboundHandler可以帮助自动释放。
public class AuthHandler extends SimpleChannelInboundHandler<ByteBuf> {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
//
}
//在channelRead0中实现业务逻辑
@Override
protected void channelRead0(ChannelHandlerContext ctx, ByteBuf password) throws Exception {
if (paas(password)) {
ctx.pipeline().remove(this);
} else {
ctx.close();
}
}
private boolean paas(ByteBuf password) {
return false;
}
}
SimpleChannelInboundHandler的channelRead方法
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
boolean release = true;
try {
if (acceptInboundMessage(msg)) {
@SuppressWarnings("unchecked")
I imsg = (I) msg;
channelRead0(ctx, imsg);
} else {
release = false;
ctx.fireChannelRead(msg);
}
} finally {
if (autoRelease && release) {
ReferenceCountUtil.release(msg);//释放资源
}
}
}
六、outBound事件的传播
- 何为outBound事件以及ChannelOutboundHandler
outBound事件,主要是服务端主动向客户端发起,如[发起事件]包括Bind事件[端口绑定]、Connect事件[连接/断连]、Close事件[关闭]、取消注册事件、Write事件以及Flush事件回调
ChannelOutboundHandler添加到Pipeline通过instanceof判断当前ChannelHandler类型设置outBound为true标识为ChannelOutboundHandler处理outBound事件 - write()事件的传播
通过write()方法按照添加Write事件顺序进行倒序[添加ChannelOutboundHandler倒序倒序]传播,从tail节点事件传播[ChannelPipeline#write()]/从当前节点事件传播[ChannelHandlerContext#write()],最终传播到head节点调用Unsafe的write()方法
outBound.png
package io.netty.channel;
import java.net.SocketAddress;
/**
* {@link ChannelHandler} which will get notified for IO-outbound-operations.
*/
public interface ChannelOutboundHandler extends ChannelHandler {
/**
* Called once a bind operation is made.
*
* @param ctx the {@link ChannelHandlerContext} for which the bind operation is made
* @param localAddress the {@link SocketAddress} to which it should bound
* @param promise the {@link ChannelPromise} to notify once the operation completes
* @throws Exception thrown if an error occurs
*/
void bind(ChannelHandlerContext ctx, SocketAddress localAddress, ChannelPromise promise) throws Exception;
/**
* Called once a connect operation is made.
*
* @param ctx the {@link ChannelHandlerContext} for which the connect operation is made
* @param remoteAddress the {@link SocketAddress} to which it should connect
* @param localAddress the {@link SocketAddress} which is used as source on connect
* @param promise the {@link ChannelPromise} to notify once the operation completes
* @throws Exception thrown if an error occurs
*/
void connect(
ChannelHandlerContext ctx, SocketAddress remoteAddress,
SocketAddress localAddress, ChannelPromise promise) throws Exception;
/**
* Called once a disconnect operation is made.
*
* @param ctx the {@link ChannelHandlerContext} for which the disconnect operation is made
* @param promise the {@link ChannelPromise} to notify once the operation completes
* @throws Exception thrown if an error occurs
*/
void disconnect(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception;
/**
* Called once a close operation is made.
*
* @param ctx the {@link ChannelHandlerContext} for which the close operation is made
* @param promise the {@link ChannelPromise} to notify once the operation completes
* @throws Exception thrown if an error occurs
*/
void close(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception;
/**
* Called once a deregister operation is made from the current registered {@link EventLoop}.
*
* @param ctx the {@link ChannelHandlerContext} for which the close operation is made
* @param promise the {@link ChannelPromise} to notify once the operation completes
* @throws Exception thrown if an error occurs
*/
void deregister(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception;
/**
* Intercepts {@link ChannelHandlerContext#read()}.
*/
void read(ChannelHandlerContext ctx) throws Exception;
/**
* Called once a write operation is made. The write operation will write the messages through the
* {@link ChannelPipeline}. Those are then ready to be flushed to the actual {@link Channel} once
* {@link Channel#flush()} is called
*
* @param ctx the {@link ChannelHandlerContext} for which the write operation is made
* @param msg the message to write
* @param promise the {@link ChannelPromise} to notify once the operation completes
* @throws Exception thrown if an error occurs
*/
void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception;
/**
* Called once a flush operation is made. The flush operation will try to flush out all previous written messages
* that are pending.
*
* @param ctx the {@link ChannelHandlerContext} for which the flush operation is made
* @throws Exception thrown if an error occurs
*/
void flush(ChannelHandlerContext ctx) throws Exception;
}
write事件传播
public final class Server {
public static void main(String[] args) throws Exception {
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.childOption(ChannelOption.TCP_NODELAY, true)
.childAttr(AttributeKey.newInstance("childAttr"), "childAttrValue")
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) {
ch.pipeline().addLast(new OutBoundHandlerA());
ch.pipeline().addLast(new OutBoundHandlerB());
ch.pipeline().addLast(new OutBoundHandlerC());
}
});
ChannelFuture f = b.bind(8888).sync();
f.channel().closeFuture().sync();
} finally {
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
}
public class OutBoundHandlerB extends ChannelOutboundHandlerAdapter {
@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
System.out.println("OutBoundHandlerB: " + msg);
ctx.write(msg, promise);//从当前节点往前传播
}
@Override
public void handlerAdded(final ChannelHandlerContext ctx) {
ctx.executor().schedule(() -> {//定时写数据,模拟给客户端的相应
ctx.channel().write("hello world");//从tail节点往前传播
//ctx.write("hello world");//从当前节点往前传播
}, 3, TimeUnit.SECONDS);
}
}
注意:
ctx.channel().write("hello world");//从tail节点往前传播
ctx.write("hello world");//从当前节点往前传播
输出与添加hander的顺序相反。即outBound事件传播顺序和添加顺序相反。
outBound事件传播.png
outBound事件传播.png
@Override
public ChannelFuture write(Object msg, ChannelPromise promise) {
return pipeline.write(msg, promise);
}
#AbstractChannelHandlerContext类
@Override
public ChannelFuture write(final Object msg, final ChannelPromise promise) {
if (msg == null) {
throw new NullPointerException("msg");
}
try {
if (isNotValidPromise(promise, true)) {
ReferenceCountUtil.release(msg);
// cancelled
return promise;
}
} catch (RuntimeException e) {
ReferenceCountUtil.release(msg);
throw e;
}
write(msg, false, promise);
return promise;
}
#AbstractChannelHandlerContext类
private void write(Object msg, boolean flush, ChannelPromise promise) {
AbstractChannelHandlerContext next = findContextOutbound();//向前寻找outbound
final Object m = pipeline.touch(msg, next);
EventExecutor executor = next.executor();
if (executor.inEventLoop()) {
if (flush) {
next.invokeWriteAndFlush(m, promise);
} else {
next.invokeWrite(m, promise);
}
} else {
AbstractWriteTask task;
if (flush) {
task = WriteAndFlushTask.newInstance(next, m, promise);
} else {
task = WriteTask.newInstance(next, m, promise);
}
safeExecute(executor, task, promise, m);
}
}
private AbstractChannelHandlerContext findContextOutbound() {
AbstractChannelHandlerContext ctx = this;
do {
ctx = ctx.prev;//向前寻找outbound
} while (!ctx.outbound);
return ctx;
}
private void invokeWrite(Object msg, ChannelPromise promise) {
if (invokeHandler()) {
invokeWrite0(msg, promise);
} else {
write(msg, promise);
}
}
private void invokeWrite0(Object msg, ChannelPromise promise) {
try {
((ChannelOutboundHandler) handler()).write(this, msg, promise);//调用自定义hander的write方法
} catch (Throwable t) {
notifyOutboundHandlerException(t, promise);
}
}
最终调用HeadContext的write,调用unsafe写数据给客户端
@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception
{
unsafe.write(msg, promise);
}
七、异常的传播
- 异常的触发链
调用channelRead()方法抛异常使用notifyHandlerException()方法发起Exception事件,通过fireExceptionCaught()方法按照添加事件顺序正序[添加ChannelHandler顺序正序]从当前节点触发传播到后置节点next,最终传播到tail节点调用onUnhandledInboundException()方法打印Pipeline未处理异常告警。
只按添加顺序,与是Inbound节点还是OutBound节点无关。 - 异常处理的最佳实践
Pipeline添加ChannelHandler最后添加ExceptionCaughtHandler异常捕获处理器,所有异常都归异常捕获处理器按照异常类型处理
demo
package com.imooc.netty.ch6.exceptionspread;
import com.sun.corba.se.impl.presentation.rmi.ExceptionHandler;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.util.AttributeKey;
/**
* @author
*/
public final class Server {
public static void main(String[] args) throws Exception {
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.childOption(ChannelOption.TCP_NODELAY, true)
.childAttr(AttributeKey.newInstance("childAttr"), "childAttrValue")
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) {
ch.pipeline().addLast(new InBoundHandlerA());
ch.pipeline().addLast(new InBoundHandlerB());
ch.pipeline().addLast(new InBoundHandlerC());
ch.pipeline().addLast(new OutBoundHandlerA());
ch.pipeline().addLast(new OutBoundHandlerB());
ch.pipeline().addLast(new OutBoundHandlerC());
ch.pipeline().addLast(new ExceptionCaughtHandler());//异常处理器
}
});
ChannelFuture f = b.bind(8888).sync();
f.channel().closeFuture().sync();
} finally {
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
}
public class InBoundHandlerB extends ChannelInboundHandlerAdapter {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
throw new BusinessException("from InBoundHandlerB");//模拟异常触发
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
System.out.println("InBoundHandlerB.exceptionCaught()");
ctx.fireExceptionCaught(cause);//传递异常
}
}
public class OutBoundHandlerA extends ChannelOutboundHandlerAdapter {
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
System.out.println("OutBoundHandlerA.exceptionCaught()");
ctx.fireExceptionCaught(cause);//传递异常
}
}
public class ExceptionCaughtHandler extends ChannelInboundHandlerAdapter {
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
// ..
if (cause instanceof BusinessException) {
System.out.println("BusinessException");
}
}
}
输出.png
按添加顺序,这里InBoundHandlerA不输出
异常传播.png
异常传播.png
异常传播.png
#AbstractChannelHandlerContext类
private void invokeChannelRead(Object msg) {
if (invokeHandler()) {
try {
((ChannelInboundHandler) handler()).channelRead(this, msg);
} catch (Throwable t) {
notifyHandlerException(t);
}
} else {
fireChannelRead(msg);
}
}
private void notifyHandlerException(Throwable cause) {
if (inExceptionCaught(cause)) {
if (logger.isWarnEnabled()) {
logger.warn(
"An exception was thrown by a user handler " +
"while handling an exceptionCaught event", cause);
}
return;
}
invokeExceptionCaught(cause);
}
private static boolean inExceptionCaught(Throwable cause) {
do {
StackTraceElement[] trace = cause.getStackTrace();
if (trace != null) {
for (StackTraceElement t : trace) {
if (t == null) {
break;
}
if ("exceptionCaught".equals(t.getMethodName())) {
return true;
}
}
}
cause = cause.getCause();
} while (cause != null);
return false;
}
private void invokeExceptionCaught(final Throwable cause) {
if (invokeHandler()) {
try {
handler().exceptionCaught(this, cause);//调用ChannelHander中定义的exceptionCaught方法
} catch (Throwable error) {
if (logger.isDebugEnabled()) {
logger.debug(
"An exception {}" +
"was thrown by a user handler's exceptionCaught() " +
"method while handling the following exception:",
ThrowableUtil.stackTraceToString(error), cause);
} else if (logger.isWarnEnabled()) {
logger.warn(
"An exception '{}' [enable DEBUG level for full stacktrace] " +
"was thrown by a user handler's exceptionCaught() " +
"method while handling the following exception:", error, cause);
}
}
} else {
fireExceptionCaught(cause);//
}
}
public ChannelHandlerContext fireExceptionCaught(final Throwable cause) {
invokeExceptionCaught(next, cause);
return this;
}
//next,传播到下一个
static void invokeExceptionCaught(final AbstractChannelHandlerContext next, final Throwable cause) {
ObjectUtil.checkNotNull(cause, "cause");
EventExecutor executor = next.executor();
if (executor.inEventLoop()) {
next.invokeExceptionCaught(cause);
} else {
try {
executor.execute(new Runnable() {
@Override
public void run() {
next.invokeExceptionCaught(cause);
}
});
} catch (Throwable t) {
if (logger.isWarnEnabled()) {
logger.warn("Failed to submit an exceptionCaught() event.", t);
logger.warn("The exceptionCaught() event that was failed to submit was:", cause);
}
}
}
}
//一直到tail节点
protected void onUnhandledInboundException(Throwable cause) {
try {
//捕获未处理的异常信息
logger.warn(
"An exceptionCaught() event was fired, and it reached at the tail of the pipeline. " +
"It usually means the last handler in the pipeline did not handle the exception.",
cause);
} finally {
ReferenceCountUtil.release(cause);//进行释放,防止内存泄漏
}
}
网友评论