在使用netty的时候,由于经常要自定义channelHander,去处理我们自己的业务,或者是使用特定的解码器,编码器。
demo代码如下,其中TimeClientHandler是自定义的一个channelHandler
public class TimeClient {
public static void main(String args[]) {
connect();
}
private static void connect() {
//用于客户端处通道的读写
EventLoopGroup work = new NioEventLoopGroup();
Bootstrap b = new Bootstrap();
b.group(work).option(ChannelOption.TCP_NODELAY, true).channel(NioSocketChannel.class)
.handler(new TimeClientHandler());
ChannelFuture cf = null;
try {
//一直阻塞,直到连接上服务端
cf = b.connect(ConnectConfig.getHost(), ConnectConfig.getPort()).sync();
//一直阻塞,直到该通道关闭
cf.channel().closeFuture().sync();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
//避免线程没有杀死
work.shutdownGracefully();
}
}
}
TimeClientHandler代码如下
public class TimeClientHandler extends ChannelInitializer<SocketChannel>{
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new LineBasedFrameDecoder(1024));
ch.pipeline().addLast(new StringDecoder(Charset.forName("UTF-8")));
}
}
那么channel是如何维护自己的channelHandler链呢?
首先有一个比较重要的概念。ChannelPipeline。每个channel都会有一个自己的channelPipeline。
在讲ChannelPipeline之前,先了解一下ChannelPipeline的结构。
ChannelPipeline是ChannelInboundInvoker与ChannelOutboundInvoker的子类。
结构比较简单,就不画图了。
ChannelInboundInvoker与ChannelOutboundInvoker其实都是接口,没有任何实现。
ChannelInboundInvoker
channelInBoundInvoker这个接口,有点像回调,都是channel完成各种类的操作之后,去调用对应的方法。
这个fire有点像发事件类似。
public interface ChannelInboundInvoker {
ChannelInboundInvoker fireChannelRegistered();
ChannelInboundInvoker fireChannelUnregistered();
ChannelInboundInvoker fireChannelActive();
ChannelInboundInvoker fireChannelInactive();
ChannelInboundInvoker fireExceptionCaught(Throwable cause);
ChannelInboundInvoker fireUserEventTriggered(Object event);
ChannelInboundInvoker fireChannelRead(Object msg);
ChannelInboundInvoker fireChannelReadComplete();
ChannelInboundInvoker fireChannelWritabilityChanged();
}
ChannelOutboundInvoker
ChannelOutboundInvoker是主动发起的,比如channel的的connect,disconnect等等。
public interface ChannelOutboundInvoker {
ChannelFuture bind(SocketAddress localAddress);
ChannelFuture connect(SocketAddress remoteAddress);
ChannelFuture connect(SocketAddress remoteAddress, SocketAddress localAddress);
ChannelFuture disconnect();
ChannelFuture close();
ChannelFuture deregister();
ChannelFuture bind(SocketAddress localAddress, ChannelPromise promise);
ChannelFuture connect(SocketAddress remoteAddress, ChannelPromise promise);
ChannelFuture connect(SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise);
ChannelFuture disconnect(ChannelPromise promise);
ChannelFuture close(ChannelPromise promise);
ChannelFuture deregister(ChannelPromise promise);
ChannelOutboundInvoker read();
ChannelFuture write(Object msg);
ChannelFuture write(Object msg, ChannelPromise promise);
ChannelOutboundInvoker flush();
ChannelFuture writeAndFlush(Object msg, ChannelPromise promise);
ChannelFuture writeAndFlush(Object msg);
ChannelPromise newPromise();
ChannelProgressivePromise newProgressivePromise();
ChannelFuture newSucceededFuture();
ChannelFuture newFailedFuture(Throwable cause);
ChannelPromise voidPromise();
}
ChannelPipeline
现在结合一下上述接口,说明了ChannelPipeline具备的基本功能,那就是即可以主动去做一些事情(如连接),也可以被动的去做一些事情(如作为回调,比如fireChannelReadComplete,读取完成之后去做些什么)。
另外ChannelPipeline提供的一些方法, 可以对ChannelHandler进行增删改查。另外对接口的一些方法进行了覆写,返回类型改为ChannelPipeline.
ChannelPipeline addFirst(String name, ChannelHandler handler);
ChannelPipeline addFirst(EventExecutorGroup group, String name, ChannelHandler handler);
ChannelPipeline addLast(String name, ChannelHandler handler);
ChannelPipeline addLast(EventExecutorGroup group, String name, ChannelHandler handler);
ChannelPipeline addBefore(String baseName, String name, ChannelHandler handler);
ChannelPipeline addBefore(EventExecutorGroup group, String baseName, String name, ChannelHandler handler);
ChannelPipeline addAfter(EventExecutorGroup group, String baseName, String name, ChannelHandler handler);
ChannelPipeline addFirst(ChannelHandler... handlers);
ChannelPipeline addFirst(EventExecutorGroup group, ChannelHandler... handlers);
ChannelPipeline addLast(ChannelHandler... handlers);
ChannelPipeline addLast(EventExecutorGroup group, ChannelHandler... handlers);
ChannelPipeline remove(ChannelHandler handler);
ChannelHandler remove(String name);
<T extends ChannelHandler> T remove(Class<T> handlerType);
ChannelHandler removeFirst();
ChannelHandler removeLast();
ChannelPipeline replace(ChannelHandler oldHandler, String newName, ChannelHandler newHandler);
ChannelHandler replace(String oldName, String newName, ChannelHandler newHandler);
<T extends ChannelHandler> T replace(Class<T> oldHandlerType, String newName,
ChannelHandler newHandler);
ChannelHandler first();
ChannelHandlerContext firstContext();
ChannelHandler last();
ChannelHandlerContext lastContext();
ChannelHandler get(String name);
ChannelHandlerContext context(ChannelHandler handler);
ChannelHandlerContext context(String name);
ChannelHandlerContext context(Class<? extends ChannelHandler> handlerType);
Channel channel();
List<String> names();
Map<String, ChannelHandler> toMap();
@Override
ChannelPipeline fireChannelRegistered();
@Override
ChannelPipeline fireChannelUnregistered();
@Override
ChannelPipeline fireChannelActive();
@Override
ChannelPipeline fireChannelInactive();
@Override
ChannelPipeline fireExceptionCaught(Throwable cause);
@Override
ChannelPipeline fireUserEventTriggered(Object event);
@Override
ChannelPipeline fireChannelRead(Object msg);
@Override
ChannelPipeline fireChannelReadComplete();
@Override
ChannelPipeline fireChannelWritabilityChanged();
@Override
ChannelPipeline flush();
介绍完以上的概念。
下面先说一下,ChannelPipeline是如何初始化的。
这里又不得不回到Channel的初始化。
ChannelPipeline初始化
channelPipeline的初始化代码如下,channel进行实例化的时候,会顺便将channelPipeline实例化
public abstract class AbstractChannel extends DefaultAttributeMap implements Channel {
protected AbstractChannel(Channel parent) {
this.parent = parent;
id = newId();
unsafe = newUnsafe();
channel的实例化以及初始化
pipeline = newChannelPipeline();
}
protected DefaultChannelPipeline newChannelPipeline() {
//由此看来channelPipeline的类型是DefaultChannelPipeline
// 下面看看DefaultChannelPipeline的构造方法
return new DefaultChannelPipeline(this);
}
}
channelPipeline的实例化
public class DefaultChannelPipeline implements ChannelPipeline {
final AbstractChannelHandlerContext head;
final AbstractChannelHandlerContext tail;
protected DefaultChannelPipeline(Channel channel) {
this.channel = ObjectUtil.checkNotNull(channel, "channel");
succeededFuture = new SucceededChannelFuture(channel, null);
voidPromise = new VoidChannelPromise(channel, true);
//创建tail head的容器,是一个双向链表。AbstractChannelHandlerContext 是前面提到的ChannelInboundInvoker以及ChannelOutboundInvoker的子类
tail = new TailContext(this);
head = new HeadContext(this);
head.next = tail;
tail.prev = head;
}
final class TailContext extends AbstractChannelHandlerContext implements ChannelInboundHandler {
TailContext(DefaultChannelPipeline pipeline) {
super(pipeline, null, TAIL_NAME, true, false);
setAddComplete();
}
}
final class HeadContext extends AbstractChannelHandlerContext
implements ChannelOutboundHandler, ChannelInboundHandler {
private final Unsafe unsafe;
HeadContext(DefaultChannelPipeline pipeline) {
super(pipeline, null, HEAD_NAME, false, true);
unsafe = pipeline.channel().unsafe();
setAddComplete();
}
}
}
tail以及head的实例化, 从代码中是可以看出tail是属于InBound类型,而head是属于OutBound类型
abstract class AbstractChannelHandlerContext extends DefaultAttributeMap
implements ChannelHandlerContext, ResourceLeakHint {
AbstractChannelHandlerContext(DefaultChannelPipeline pipeline, EventExecutor executor, String name,
boolean inbound, boolean outbound) {
this.name = ObjectUtil.checkNotNull(name, "name");
this.pipeline = pipeline;
this.executor = executor;
this.inbound = inbound;
this.outbound = outbound;
// Its ordered if its driven by the EventLoop or the given Executor is an instanceof OrderedEventExecutor.
ordered = executor == null || executor instanceof OrderedEventExecutor;
}
}
ChannelPipeline初始化完之后,接下来就是看如何将ChannelHandlerContext放到channelPipeline中
如何将ChannelHandlerContext放到channelPipeline中
public abstract class AbstractBootstrap<B extends AbstractBootstrap<B, C>, C extends Channel> implements Cloneable {
final ChannelFuture initAndRegister() {
Channel channel = null;
try {
实例化通道
channel = channelFactory.newChannel();
初始化通道,在初始化通道的时候,会构建一个新的ChannelHandlerContext,并且加入到channel对应的ChannelPipeline中。具体还是看看Init方法内部,是在BootStrap中实现的
===========================
init(channel);
} catch (Throwable t) {
if (channel != null) {
channel.unsafe().closeForcibly();
}
return new DefaultChannelPromise(channel, GlobalEventExecutor.INSTANCE).setFailure(t);
}
省略
}
}
public class Bootstrap extends AbstractBootstrap<Bootstrap, Channel> {
@Override
@SuppressWarnings("unchecked")
void init(Channel channel) throws Exception {
ChannelPipeline p = channel.pipeline();
//这个handler其实就是我们自己定义的handler。具体看DefalutChannelPipeline中的实现。
p.addLast(config.handler());
省略.....
}
DefalutChannelPipeline中addLast的实现。
public class DefaultChannelPipeline implements ChannelPipeline {
@Override
public final ChannelPipeline addLast(EventExecutorGroup group, String name, ChannelHandler handler) {
final AbstractChannelHandlerContext newCtx;
synchronized (this) {
//检查是否已经加过了
checkMultiplicity(handler);
//生成新的channelHandlerContext.这个group传进来的是空,而channelPipeline跟channel有关系,EventExecutorGroup 就会使用channel所绑定的EventLoop
newCtx = newContext(group, filterName(name, handler), handler);
============================这个方法需要重点看看,其实说白了就是将这个ChannelHandlerContext插入到Head以及tail中间。代码在下面。
addLast0(newCtx);
if (!registered) {
newCtx.setAddPending();
callHandlerCallbackLater(newCtx, true);
return this;
}
下面说白了就是将该ChannelHandlerContext的handlerState改为ADD_COMPLETE
EventExecutor executor = newCtx.executor();
if (!executor.inEventLoop()) {
newCtx.setAddPending();
executor.execute(new Runnable() {
@Override
public void run() {
callHandlerAdded0(newCtx);
}
});
return this;
}
}
callHandlerAdded0(newCtx);
return this;
}
private void addLast0(AbstractChannelHandlerContext newCtx) {
AbstractChannelHandlerContext prev = tail.prev;
newCtx.prev = prev;
newCtx.next = tail;
prev.next = newCtx;
tail.prev = newCtx;
}
}
通过以上,就将新生成的ChannelHandlerContext插入到ChannelPipeline中去了。
如何将ChannelHandler放入ChannelPipeline中
其实是在初始化完channel,后面进行注册的时候。
具体的代码位置。
public abstract class AbstractChannel extends DefaultAttributeMap implements Channel {
private final DefaultChannelPipeline pipeline;
protected abstract class AbstractUnsafe implements Unsafe {
private void register0(ChannelPromise promise) {
try {
if (!promise.setUncancellable() || !ensureOpen(promise)) {
return;
}
boolean firstRegistration = neverRegistered;
//通道进行注册。
doRegister();
neverRegistered = false;
registered = true;
pipeline.invokeHandlerAddedIfNeeded();
safeSetSuccess(promise);
通道对应的pipeline进行注册事件的发起。....................................................这里是需要重点看的。
因为pipeline对应的实现类是DefaultChannelPipeline。所以直接进入方法内部查看
pipeline.fireChannelRegistered();
if (isActive()) {
if (firstRegistration) {
pipeline.fireChannelActive();
} else if (config().isAutoRead()) {
beginRead();
}
}
} catch (Throwable t) {
closeForcibly();
closeFuture.setClosed();
safeSetFailure(promise, t);
}
}
}
}
public class DefaultChannelPipeline implements ChannelPipeline {
@Override
public final ChannelPipeline fireChannelRegistered() {
AbstractChannelHandlerContext.invokeChannelRegistered(head);
return this;
}
}
abstract class AbstractChannelHandlerContext extends DefaultAttributeMap
implements ChannelHandlerContext, ResourceLeakHint {
static void invokeChannelRegistered(final AbstractChannelHandlerContext next) {
EventExecutor executor = next.executor();
//因为一开始注册就是在异步中发起的,所以必然在eventLoop的线程中。
if (executor.inEventLoop()) {
所以逻辑会走到这里。是一个private方法。
具体往里面走============================================
next.invokeChannelRegistered();
} else {
executor.execute(new Runnable() {
@Override
public void run() {
next.invokeChannelRegistered();
}
});
}
}
private void invokeChannelRegistered() {
这个方法是true,因为传进来的head,本身的handlerState == ADD_COMPLETE,所以会返回true
if (invokeHandler()) {
try {
head实现的方法就是返回自身。而channelRegistered这个方法,也被覆写了。
接下来看看headContext对于channelRegistered的实现。=====================
((ChannelInboundHandler) handler()).channelRegistered(this);
} catch (Throwable t) {
notifyHandlerException(t);
}
} else {
fireChannelRegistered();
}
}
}
public class DefaultChannelPipeline implements ChannelPipeline {
final class HeadContext extends AbstractChannelHandlerContext
implements ChannelOutboundHandler, ChannelInboundHandler {
@Override
public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
invokeHandlerAddedIfNeeded();
fireChannelRegister方法是在AbstractChannelHandlerContext中实现的
ctx.fireChannelRegistered();
}
}
}
abstract class AbstractChannelHandlerContext extends DefaultAttributeMap
implements ChannelHandlerContext, ResourceLeakHint {
//因为一开始传进来是head,说白了就是找到第一个为InBound的handlerContext
//因为一开始就是next,所以这个东西,就是我们上面生成的ctx(ChannelHandlerContext),巧了一开始我们定义的channelHander恰好是ChannelInboundHandler的子类,且生成ctx的时候,inbound属性为true。
private AbstractChannelHandlerContext findContextInbound() {
AbstractChannelHandlerContext ctx = this;
do {
ctx = ctx.next;
} while (!ctx.inbound);
return ctx;
}
@Override
public ChannelHandlerContext fireChannelRegistered() {
//这个方法的返回值就是前面生成的ctx。往下走.....就是下面这个方法了。
invokeChannelRegistered(findContextInbound());
return this;
}
static void invokeChannelRegistered(final AbstractChannelHandlerContext next) {
EventExecutor executor = next.executor();
if (executor.inEventLoop()) {
next.invokeChannelRegistered();
} else {
executor.execute(new Runnable() {
@Override
public void run() {
next.invokeChannelRegistered();
}
});
}
}
private void invokeChannelRegistered() {
if (invokeHandler()) {
try {
handler方法的实现在DefalutChannelHandlerContext。就是返回ChannelHandler.其实就是我们自己定义的ChannelHandler. 我们自己定义的ChannelHandler就是TimeClientHandler。
channelRegistered这个方法是在ChannelInitializer中实现的。
因此看具体实现。
((ChannelInboundHandler) handler()).channelRegistered(this);
} catch (Throwable t) {
notifyHandlerException(t);
}
} else {
fireChannelRegistered();
}
}
}
public abstract class ChannelInitializer<C extends Channel> extends ChannelInboundHandlerAdapter {
@Override
@SuppressWarnings("unchecked")
public final void channelRegistered(ChannelHandlerContext ctx) throws Exception {
这个方法就会调用,我们去实现的initChannel方法了。
if (initChannel(ctx)) {
再发起channelRegister.因为可能我们不止添加一个ChannelInitializer。还有剩余的ChannelHandler需要被加进去。
ctx.pipeline().fireChannelRegistered();
} else {
ctx.fireChannelRegistered();
}
}
@SuppressWarnings("unchecked")
private boolean initChannel(ChannelHandlerContext ctx) throws Exception {
//判断是否初始化过,initMap是线程安全的map。相当于一个锁的操作。
if (initMap.putIfAbsent(ctx, Boolean.TRUE) == null) {
try {
再往里面走。这里就是我们自己实现的方法了。
initChannel((C) ctx.channel());
} catch (Throwable cause) {
exceptionCaught(ctx, cause);
} finally {
最后把生成的ctx移除。从链表中移除
remove(ctx);
}
return true;
}
return false;
}
private AbstractChannelHandlerContext remove(final AbstractChannelHandlerContext ctx) {
assert ctx != head && ctx != tail;
synchronized (this) {
remove0(ctx);
if (!registered) {
callHandlerCallbackLater(ctx, false);
return ctx;
}
EventExecutor executor = ctx.executor();
if (!executor.inEventLoop()) {
executor.execute(new Runnable() {
@Override
public void run() {
callHandlerRemoved0(ctx);
}
});
return ctx;
}
}
callHandlerRemoved0(ctx);
return ctx;
}
private static void remove0(AbstractChannelHandlerContext ctx) {
AbstractChannelHandlerContext prev = ctx.prev;
AbstractChannelHandlerContext next = ctx.next;
prev.next = next;
next.prev = prev;
}
}
public class TimeClientHandler extends ChannelInitializer<SocketChannel>{
@Override
protected void initChannel(SocketChannel ch) throws Exception {
重复上述的过程。。。
ch.pipeline().addLast(new LineBasedFrameDecoder(1024));
ch.pipeline().addLast(new StringDecoder(Charset.forName("UTF-8")));
ch.pipeline().addLast(new TimeClientChannelHandlerAdapter());
}
}
添加的ChannelHandler是如何传播的呢?
public class TimeClientHandler extends ChannelInitializer<SocketChannel>{
@Override
protected void initChannel(SocketChannel ch) throws Exception {
重复上述的过程。。。
ch.pipeline().addLast(new LineBasedFrameDecoder(1024));
ch.pipeline().addLast(new StringDecoder(Charset.forName("UTF-8")));
}
}
public abstract class ChannelInitializer<C extends Channel> extends ChannelInboundHandlerAdapter {
@Override
@SuppressWarnings("unchecked")
public final void channelRegistered(ChannelHandlerContext ctx) throws Exception {
这个方法就会调用,我们去实现的initChannel方法了。
if (initChannel(ctx)) {
再发起channelRegister.因为可能我们不止添加一个ChannelInitializer(实现这个接口可以自行添加ChannelHandler)。还有剩余的ChannelHandler需要被加进去。
原先一开始是 head -> ctx(包含了我们自定义的TimeClientHandler ) -> tail
后面变成head ->LineBasedFrameDecoder->StringDecoder->tail
所以再次fire的时候,当然是会把新加进去的channelHandler的channelRegistered方法执行一遍。
其实像LineBasedFrameDecoder,StringDecoder的channelRegistered都是用父类的实现,所以都是简单的做一个传播,并没有太复杂的实现。===========================代码如下
ctx.pipeline().fireChannelRegistered();
} else {
ctx.fireChannelRegistered();
}
}
}
因为我们自定义添加的LineBasedFrameDecoder,StringDecoder都是ChannelInboundHandlerAdapter 的子类
public class ChannelInboundHandlerAdapter extends ChannelHandlerAdapter implements ChannelInboundHandler {
@Override
public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
//再次传播
ctx.fireChannelRegistered();
}
}
至此,就完了~也说明了一个东西。InBound的channelHandler是从前到后去执行的。
总结一下。
1.在channel实例化的时候,先实例化ChannelPipeline.在此过程中会实例化head和tail,这俩个channelHandlerContext的链。
2.随后,在channel初始化完,就要进行注册。
3.注册的过程中,当channel注册到对应的selector中后,就要将对应的ChannelHandler都注册到ctx(channelHandlerContext)链中。
4.首先会从链的头部,head开始,找到第一个为Inbound的ctx,逐级传播。如果在中间遇到实现了ChannelInitializer的子类,可能会往中间插入ctx,再把旧的ctx从链中移除,然后再次逐级传播(避免有ctx没有被插入链中),然后再次从head逐级传播。当然很多channelHandler的注册方法,其实只是为了传播,没有对channelPipeline中的ctx链做任何操作。
网友评论