ChannelHandler
ChannelHandler基本上是我们第一次接触Netty就会碰到的对象,我们自定义的各种ChannelHandler主要用于处理我们系统的各种业务逻辑,比如发生了active事件后的处理逻辑,发生了读事件的处理逻辑,下面先来看一下ChannelHandler的类继承图:
image.png
ChannelHandler被分为两部分,分别为ChannelOutboundHandler与ChannelInboundHandler。其中ChannelInboundHandler提供了一些方法再接收数据或Channel状态改变时被调用,ChannelOutboundHandler则提供了与网络I/O相关的方法。
同时Netty也提供了相应的Adapter,主要是为了我们编码的方便,我们可以通过继承Adapter,这样ChannelHandler里便只需要关注需要重写的方法。而不是实现所有接口的方法。
StringDecoder源码
我们来关注一下StringDecoder这个类,StringDecoder用于对读入的数据根据指定的字符编码进行转换。StringDecoder继承MessageToMessageDecoder,而MessageToMessageDecoder继承ChannelInboundHandlerAdapter。StringDecoder便是一个典型的ChannelInboundHandler啦,先来看看MessageToMessageDecoder里都有那些内容,源码如下:
public abstract class MessageToMessageDecoder<I> extends ChannelInboundHandlerAdapter {
//matcher用于检验是否对msg进行Decoder
private final TypeParameterMatcher matcher;
/**
* Create a new instance which will try to detect the types to match out of the type parameter of the class.
*/
protected MessageToMessageDecoder() {
matcher = TypeParameterMatcher.find(this, MessageToMessageDecoder.class, "I");
}
/**
* Create a new instance
*
* @param inboundMessageType The type of messages to match and so decode
*/
protected MessageToMessageDecoder(Class<? extends I> inboundMessageType) {
matcher = TypeParameterMatcher.get(inboundMessageType);
}
/**
* Returns {@code true} if the given message should be handled. If {@code false} it will be passed to the next
* {@link ChannelInboundHandler} in the {@link ChannelPipeline}.
*/
public boolean acceptInboundMessage(Object msg) throws Exception {
return matcher.match(msg);
}
//可以看出MessageToMessageDecoder只对 channelRead进行了重写,这就是Adapter提供的好处
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
//这里的out是个list对象
CodecOutputList out = CodecOutputList.newInstance();
try {
//acceptInboundMessage判断是否对msg进行解析
if (acceptInboundMessage(msg)) {
@SuppressWarnings("unchecked")
I cast = (I) msg;
try {
//这是个留给子类实现的方法啦, 也就是我们的StringDecoder里会实现的方法啦
decode(ctx, cast, out);
} finally {
ReferenceCountUtil.release(cast);
}
} else {
out.add(msg);
}
} catch (DecoderException e) {
throw e;
} catch (Exception e) {
throw new DecoderException(e);
} finally {
int size = out.size();
//对out里的对象触发fireChannelRead,让其它的channelhandler处理
for (int i = 0; i < size; i ++) {
ctx.fireChannelRead(out.getUnsafe(i));
}
out.recycle();
}
}
protected abstract void decode(ChannelHandlerContext ctx, I msg, List<Object> out) throws Exception;
}
MessageToMessageDecoder方法只做了两件事:1:判断当前个对象是否需要调用decode方法,2:将decode结果的对象调用fireChannelRead方法交给其它的ChannelHandler处理。StringDecoder类里的方法就更简单了,源码如下:
@Sharable
public class StringDecoder extends MessageToMessageDecoder<ByteBuf> {
// TODO Use CharsetDecoder instead.
//传入字节码
private final Charset charset;
/**
* Creates a new instance with the current system character set.
*/
public StringDecoder() {
this(Charset.defaultCharset());
}
/**
* Creates a new instance with the specified character set.
*/
public StringDecoder(Charset charset) {
if (charset == null) {
throw new NullPointerException("charset");
}
this.charset = charset;
}
@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf msg, List<Object> out) throws Exception {
//这里对msg进行处理
out.add(msg.toString(charset));
}
}
StringEncoder源码
我们再来关注一下StringEncoder的处理流程,StringEncoder用于对需要写的数据进行字符编码,StringEncoder继承自MessageToMessageEncoder,而MessageToMessageEncoder又继承ChannelOutboundHandlerAdapter。下面是MessageToMessageEncoder的源码:
public abstract class MessageToMessageEncoder<I> extends ChannelOutboundHandlerAdapter {
private final TypeParameterMatcher matcher;
/**
* Create a new instance which will try to detect the types to match out of the type parameter of the class.
*/
protected MessageToMessageEncoder() {
matcher = TypeParameterMatcher.find(this, MessageToMessageEncoder.class, "I");
}
/**
* Create a new instance
*
* @param outboundMessageType The type of messages to match and so encode
*/
protected MessageToMessageEncoder(Class<? extends I> outboundMessageType) {
matcher = TypeParameterMatcher.get(outboundMessageType);
}
/**
* Returns {@code true} if the given message should be handled. If {@code false} it will be passed to the next
* {@link ChannelOutboundHandler} in the {@link ChannelPipeline}.
*/
public boolean acceptOutboundMessage(Object msg) throws Exception {
return matcher.match(msg);
}
//只需要关注这个方法啦,这里会对面要写的数据进行encode
@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
CodecOutputList out = null;
try {
//跟上面decode一样,需要验证msg能不能处理
if (acceptOutboundMessage(msg)) {
out = CodecOutputList.newInstance();
@SuppressWarnings("unchecked")
I cast = (I) msg;
try {
//具体的encode留给子类处理
encode(ctx, cast, out);
} finally {
ReferenceCountUtil.release(cast);
}
if (out.isEmpty()) {
out.recycle();
out = null;
throw new EncoderException(
StringUtil.simpleClassName(this) + " must produce at least one message.");
}
} else {
ctx.write(msg, promise);
}
} catch (EncoderException e) {
throw e;
} catch (Throwable t) {
throw new EncoderException(t);
} finally {
//out不为空的话,就会调用ctx的witer方法触发写数据的逻辑啦
if (out != null) {
final int sizeMinusOne = out.size() - 1;
if (sizeMinusOne == 0) {
ctx.write(out.get(0), promise);
} else if (sizeMinusOne > 0) {
// Check if we can use a voidPromise for our extra writes to reduce GC-Pressure
// See https://github.com/netty/netty/issues/2525
ChannelPromise voidPromise = ctx.voidPromise();
boolean isVoidPromise = promise == voidPromise;
for (int i = 0; i < sizeMinusOne; i ++) {
ChannelPromise p;
if (isVoidPromise) {
p = voidPromise;
} else {
p = ctx.newPromise();
}
ctx.write(out.getUnsafe(i), p);
}
ctx.write(out.getUnsafe(sizeMinusOne), promise);
}
out.recycle();
}
}
}
protected abstract void encode(ChannelHandlerContext ctx, I msg, List<Object> out) throws Exception;
}
MessageToMessageEncoder类里也只做了三件事:1:判断当前的对象是否需要进行encoder。2:调用子类encoder方法对对象进行encoder。3:将encoder好了的对象调用发送逻辑。下面是StringEncoder源码:
public class StringEncoder extends MessageToMessageEncoder<CharSequence> {
// TODO Use CharsetEncoder instead.
private final Charset charset;
/**
* Creates a new instance with the current system character set.
*/
public StringEncoder() {
this(Charset.defaultCharset());
}
/**
* Creates a new instance with the specified character set.
*/
public StringEncoder(Charset charset) {
if (charset == null) {
throw new NullPointerException("charset");
}
this.charset = charset;
}
@Override
protected void encode(ChannelHandlerContext ctx, CharSequence msg, List<Object> out) throws Exception {
if (msg.length() == 0) {
return;
}
//根据 charset将String转成ByteBuf对象
out.add(ByteBufUtil.encodeString(ctx.alloc(), CharBuffer.wrap(msg), charset));
}
}
ChannelPipeline
ChannelPipeline用于组织ChannelHandlerContext(内部含有ChannelHandler),在Netty里采用的是双端链表的方式来管理ChannelHandlerContext。在ChannelPipeline里提供了各种对双端链表处理的方法,同时也提供了各种触发ChannelHandlerContext的方法,比如:fireChannelActive方法,下面是部分源码:
public class DefaultChannelPipeline implements ChannelPipeline {
//双端链表的head对象
final AbstractChannelHandlerContext head;
//双端链表的tail对象
final AbstractChannelHandlerContext tail;
//持用的channel对象
private final Channel channel;
private final ChannelFuture succeededFuture;
private final VoidChannelPromise voidPromise;
private final boolean touch = ResourceLeakDetector.isEnabled();
private Map<EventExecutorGroup, EventExecutor> childExecutors;
private volatile MessageSizeEstimator.Handle estimatorHandle;
private boolean firstRegistration = true;
//的链表的未位增加一个ChannelHandler
public final ChannelPipeline addLast(ChannelHandler handler) {
return addLast(null, handler);
}
//的链表的未位增加一个ChannelHandler ,需要传入这个ChannelHandler的名称
@Override
public final ChannelPipeline addLast(String name, ChannelHandler handler) {
return addLast(null, name, handler);
}
//最终会调用到这个方法来对channelHandler处理
@Override
public final ChannelPipeline addLast(EventExecutorGroup group, String name, ChannelHandler handler) {
final AbstractChannelHandlerContext newCtx;
//这是一个同步方法,需要锁住这个pipeline对象
synchronized (this) {
//参数合法性验证
checkMultiplicity(handler);
//这里会将ChannelHandler 包装成ChannelHandlerContext对象,这也就是为什么双端链表里存的是ChannelHandlerContext啦其中filterName会对为null的name生成一个名称
newCtx = newContext(group, filterName(name, handler), handler);
//这里才是具体处理链表的方法啦
addLast0(newCtx);
// If the registered is false it means that the channel was not registered on an eventloop yet.
// In this case we add the context to the pipeline and add a task that will call
// ChannelHandler.handlerAdded(...) once the channel is registered.
if (!registered) {
newCtx.setAddPending();
callHandlerCallbackLater(newCtx, true);
return this;
}
EventExecutor executor = newCtx.executor();
if (!executor.inEventLoop()) {
newCtx.setAddPending();
executor.execute(new Runnable() {
@Override
public void run() {
callHandlerAdded0(newCtx);
}
});
return this;
}
}
callHandlerAdded0(newCtx);
return this;
}
//下面的方法是对链表进行操作的代码
private void addLast0(AbstractChannelHandlerContext newCtx) {
AbstractChannelHandlerContext prev = tail.prev;
newCtx.prev = prev;
newCtx.next = tail;
prev.next = newCtx;
tail.prev = newCtx;
}
//pipeline里提供了类似fireChannelActive方法,这些方法最络会调用到channelHandler对应的方法上
@Override
public final ChannelPipeline fireChannelActive() {
AbstractChannelHandlerContext.invokeChannelActive(head);
return this;
}
}
ChannelHandlerContext
ChannelHandlerContext对于连接ChannelHandler与ChannelPipeline。
ChannelHandlerContext内部持有ChannelHandler对象,同时又是ChannelPipeline链表里的节点,串起了ChannelPipeline的整个逻辑,下面来看看ChannelHandlerContext最重要的类AbstractChannelHandlerContext源码:
abstract class AbstractChannelHandlerContext extends DefaultAttributeMap
implements ChannelHandlerContext, ResourceLeakHint {
//当前ChannelHandlerContext指向的下一个ChannelHandlerContext
volatile AbstractChannelHandlerContext next;
//当前ChannelHandlerContext指向的前一个ChannelHandlerContext
volatile AbstractChannelHandlerContext prev;
//用于标识channelHanlder是否为inbound
private final boolean inbound;
//用于标识channelHanlder是否为outbound
private final boolean outbound;
//同时也持胡pipeline对象
private final DefaultChannelPipeline pipeline;
//channelHandler取的名称
private final String name;
//是否需要排序
private final boolean ordered;
//构造方法如下
AbstractChannelHandlerContext(DefaultChannelPipeline pipeline, EventExecutor executor, String name,
boolean inbound, boolean outbound) {
this.name = ObjectUtil.checkNotNull(name, "name");
this.pipeline = pipeline;
this.executor = executor;
this.inbound = inbound;
this.outbound = outbound;
// Its ordered if its driven by the EventLoop or the given Executor is an instanceof OrderedEventExecutor.
ordered = executor == null || executor instanceof OrderedEventExecutor;
}
//这个方法是个static方法,用于给pipeline对象调用,
static void invokeChannelActive(final AbstractChannelHandlerContext next) {
EventExecutor executor = next.executor();
if (executor.inEventLoop()) {
//会触发ChannelHandlerContext的invokeChannelActive方法
next.invokeChannelActive();
} else {
executor.execute(new Runnable() {
@Override
public void run() {
next.invokeChannelActive();
}
});
}
}
//active的逻辑会调用到这个方法里
private void invokeChannelActive() {
//确认当前channelhandler的状态
if (invokeHandler()) {
try {
//最络会调用到channelhandler的channelActive方法,其中handler()方法是留给子类实现的可以看DefaultChannelHandlerContext源码部分
((ChannelInboundHandler) handler()).channelActive(this);
} catch (Throwable t) {
notifyHandlerException(t);
}
} else {
fireChannelActive();
}
}
}
DefaultChannelHandlerContext源码就很简单了,提供了一个handler方法用于得到当前的ChannelHandler和判断当前ChannelHandler的类型。代码如下:
package io.netty.channel;
import io.netty.util.concurrent.EventExecutor;
final class DefaultChannelHandlerContext extends AbstractChannelHandlerContext {
private final ChannelHandler handler;
DefaultChannelHandlerContext(
DefaultChannelPipeline pipeline, EventExecutor executor, String name, ChannelHandler handler) {
super(pipeline, executor, name, isInbound(handler), isOutbound(handler));
if (handler == null) {
throw new NullPointerException("handler");
}
this.handler = handler;
}
@Override
public ChannelHandler handler() {
return handler;
}
private static boolean isInbound(ChannelHandler handler) {
return handler instanceof ChannelInboundHandler;
}
private static boolean isOutbound(ChannelHandler handler) {
return handler instanceof ChannelOutboundHandler;
}
}
ChannelHandlerContext里作为ChannelPipeline的链表节点,决定着事件是否进行向下流转,如果想让事件向下流转,只需要通过ChannelHandlerContext调用相应的fire方法就行了
网友评论