待完善
Channel、EventLoop和ChannelFuture
Channel
、EventLoop
和ChannelFuture
这些类组合在一起,可以被认为是Netty网络抽象的代表:
-
Channel
—Socket; -
EventLoop
—控制流、多线程处理、并发; -
ChannelFuture
—异步通知
Channel
暂略
EventLoop
EventLoop
定义了Netty的核心抽象,用于处理连接的生命周期中所发生的事件。
-
一个
EventLoopGroup
包含一个或者多个EventLoop
; -
一个
EventLoop
在它的生命周期内只和一个Thread
绑定; -
所有由
EventLoop
处理的I/O事件都将在它专有的Thread
上被处理; -
一个
Channel
在它的生命周期内只注册于一个EventLoop
; -
一个
EventLoop
可能会被分配给一个或多个Channel
。
注意,在这种设计中,一定程度上消除了对于同步的需要。
ChannelFuture
暂略
ChannelHandler和ChannelPipeline
ChannelHandler
Handles an I/O event or intercepts an I/O operation, and forwards it to its next handler in its ChannelPipeline
.
-
ChannelHandler
public interface ChannelHandler { // ChannelHandler添加到ChannelPipeline中时被调用 void handlerAdded(ChannelHandlerContext ctx) throws Exception; // ChannelHandler从ChannelPipeline中移除时被调用 void handlerRemoved(ChannelHandlerContext ctx) throws Exception; // 处理过程中在ChannelPipeline中有错误产生时被调用 void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception; }
-
图片.pngChannelHandler
层次结构如下图所示:ChannelHandler
itself does not provide many methods, but you usually have to implement one of its subtypes:-
ChannelInboundHandler
to handle inbound I/O events, and -
ChannelOutboundHandler
to handle outbound I/O operations.
Alternatively, the following adapter classes are provided for your convenience:
-
ChannelInboundHandlerAdapter
to handle inbound I/O events, -
ChannelOutboundHandlerAdapter
to handle outbound I/O operations, and -
ChannelDuplexHandler
to handle both inbound and outbound events
-
A ChannelHandler often needs to store some stateful information?
-
ChannelInboundHandler
public interface ChannelInboundHandler extends ChannelHandler { void channelRegistered(ChannelHandlerContext ctx) throws Exception; void channelUnregistered(ChannelHandlerContext ctx) throws Exception; void channelActive(ChannelHandlerContext ctx) throws Exception; void channelInactive(ChannelHandlerContext ctx) throws Exception; void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception; void channelReadComplete(ChannelHandlerContext ctx) throws Exception; void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception; void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception; void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception; }
-
ChannelInboundHandlerAdapter
public class ChannelInboundHandlerAdapter extends ChannelHandlerAdapter implements ChannelInboundHandler { @Override public void channelRegistered(ChannelHandlerContext ctx) throws Exception { ctx.fireChannelRegistered(); } @Override public void channelUnregistered(ChannelHandlerContext ctx) throws Exception { ctx.fireChannelUnregistered(); } @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { ctx.fireChannelActive(); } @Override public void channelInactive(ChannelHandlerContext ctx) throws Exception { ctx.fireChannelInactive(); } @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { ctx.fireChannelRead(msg); } @Override public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { ctx.fireChannelReadComplete(); } @Override public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception { ctx.fireUserEventTriggered(evt); } @Override public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception { ctx.fireChannelWritabilityChanged(); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { ctx.fireExceptionCaught(cause); } }
ChannelInboundHandlerAdapter
的channelRead
方法处理完消息后不会自动释放消息,若想自动释放收到的消息,可以使用SimpleChannelInboundHandler
。Usually,
channelRead()
handler method is implemented like the following:@Override public void channelRead(ChannelHandlerContext ctx, Object msg) { try { // Do something with msg } finally { ReferenceCountUtil.release(msg); } }
-
ChannelOutboundHandler
public interface ChannelOutboundHandler extends ChannelHandler { void bind(ChannelHandlerContext ctx, SocketAddress localAddress, ChannelPromise promise) throws Exception; void connect(ChannelHandlerContext ctx, SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise) throws Exception; void disconnect(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception; void close(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception; void deregister(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception; void read(ChannelHandlerContext ctx) throws Exception; void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception; void flush(ChannelHandlerContext ctx) throws Exception; }
-
ChannelOutboundHandlerAdapter
```java
public class ChannelOutboundHandlerAdapter extends ChannelHandlerAdapter implements ChannelOutboundHandler {
@Override
public void bind(ChannelHandlerContext ctx, SocketAddress localAddress,
ChannelPromise promise) throws Exception {
ctx.bind(localAddress, promise);
}
@Override
public void connect(ChannelHandlerContext ctx, SocketAddress remoteAddress,
SocketAddress localAddress, ChannelPromise promise) throws Exception {
ctx.connect(remoteAddress, localAddress, promise);
}
@Override
public void disconnect(ChannelHandlerContext ctx, ChannelPromise promise)
throws Exception {
ctx.disconnect(promise);
}
@Override
public void close(ChannelHandlerContext ctx, ChannelPromise promise)
throws Exception {
ctx.close(promise);
}
@Override
public void deregister(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception {
ctx.deregister(promise);
}
@Override
public void read(ChannelHandlerContext ctx) throws Exception {
ctx.read();
}
@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
ctx.write(msg, promise);
}
@Override
public void flush(ChannelHandlerContext ctx) throws Exception {
ctx.flush();
}
}
```
ChannelHandlerContext
使ChannelHandler
能够与其ChannelPipeline
和其他处理程序进行交互。除其他事项外,处理程序可以通知ChannelPipeline
中的下一个ChannelHandler
,也可以动态修改它所属的ChannelPipeline
。
-
Notify
You can notify the closest handler in the same ChannelPipeline by calling one of the various methods provided here.
-
Modifying a pipeline
You can get the
ChannelPipeline
your handler belongs to by callingpipeline()
. A non-trivial application could insert, remove, or replace handlers in the pipeline dynamically at runtime. -
Retrieving for later use
You can keep the ChannelHandlerContext for later use, such as triggering an event outside the handler methods, even from a different thread.
public class MyHandler extends ChannelDuplexHandler { private ChannelHandlerContext ctx; public void beforeAdd(ChannelHandlerContext ctx) { this.ctx = ctx; } public void login(String username, password) { ctx.write(new LoginMessage(username, password)); } ... }
-
Storing stateful information
attr(AttributeKey)
allow you to store and access stateful information that is related with a handler and its context. Please refer toChannelHandler
to learn various recommended ways to manage stateful information. -
A handler can have more than one context
Please note that a
ChannelHandler
instance can be added to more than oneChannelPipeline
. It means a singleChannelHandler
instance can have more than oneChannelHandlerContext
and therefore the single instance can be invoked with differentChannelHandlerContexts
if it is added to one or moreChannelPipelines
more than once.For example, the following handler will have as many independent AttributeKeys as how many times it is added to pipelines, regardless if it is added to the same pipeline multiple times or added to different pipelines multiple times:
public class FactorialHandler extends ChannelInboundHandlerAdapter { private final AttributeKey<Integer> counter = AttributeKey.valueOf("counter"); // This handler will receive a sequence of increasing integers starting // from 1. @Override public void channelRead(ChannelHandlerContext ctx, Object msg) { Integer a = ctx.attr(counter).get(); if (a == null) { a = 1; } attr.set(a * (Integer) msg); } } // Different context objects are given to "f1", "f2", "f3", and "f4" even if // they refer to the same handler instance. Because the FactorialHandler // stores its state in a context object (using an AttributeKey), the factorial is // calculated correctly 4 times once the two pipelines (p1 and p2) are active. FactorialHandler fh = new FactorialHandler(); ChannelPipeline p1 = Channels.pipeline(); p1.addLast("f1", fh); p1.addLast("f2", fh); ChannelPipeline p2 = Channels.pipeline(); p2.addLast("f3", fh); p2.addLast("f4", fh);
ChannelPipeline
ChannelPipeline
提供了ChannelHandler
链的容器,并定义了用于在该链上传播入站和出站事件流的API。当Channel
被创建时,它会被自动地分配到它专属的ChannelPipeline
。
-
Creation of a pipeline
==Each channel has its own pipeline and it is created automatically== when a new channel is created.
-
How an event flows in a pipeline
The following diagram describes how I/O events are processed by
ChannelHandler
s in aChannelPipeline
typically. An I/O event is handled by either aChannelInboundHandler
or aChannelOutboundHandler
and be forwarded to its closest handler by calling the event propagation methods defined inChannelHandlerContext
, such asChannelHandlerContext.fireChannelRead(Object)
andChannelHandlerContext.write(Object)
.I/O Request via Channel or ChannelHandlerContext +---------------------------------------------------+---------------+ | ChannelPipeline | | | \|/ | | +---------------------+ +-----------+----------+ | | | Inbound Handler N | | Outbound Handler 1 | | | +----------+----------+ +-----------+----------+ | | /|\ | | | | \|/ | | +----------+----------+ +-----------+----------+ | | | Inbound Handler N-1 | | Outbound Handler 2 | | | +----------+----------+ +-----------+----------+ | | /|\ . | | . . | | ChannelHandlerContext.fireIN_EVT() ChannelHandlerContext.OUT_EVT()| | [ method call] [method call] | | . . | | . \|/ | | +----------+----------+ +-----------+----------+ | | | Inbound Handler 2 | | Outbound Handler M-1 | | | +----------+----------+ +-----------+----------+ | | /|\ | | | | \|/ | | +----------+----------+ +-----------+----------+ | | | Inbound Handler 1 | | Outbound Handler M | | | +----------+----------+ +-----------+----------+ | | /|\ | | +---------------+-----------------------------------+---------------+ | \|/ +---------------+-----------------------------------+---------------+ | | | | | [ Socket.read() ] [ Socket.write() ] | | | | Netty Internal I/O Threads (Transport Implementation) | +-------------------------------------------------------------------+
An inbound event is handled by the inbound handlers in the ==bottom-up direction== as shown on the left side of the diagram. An inbound handler usually handles the inbound data generated by the I/O thread on the bottom of the diagram. The inbound data is often read from a remote peer via the actual input operation such as
SocketChannel.read(ByteBuffer)
. If an inbound event goes beyond the top inbound handler, it is discarded silently, or logged if it needs your attention.An outbound event is handled by the outbound handler in the ==top-down direction== as shown on the right side of the diagram. An outbound handler usually generates or transforms the outbound traffic such as write requests. If an outbound event goes beyond the bottom outbound handler, it is handled by an I/O thread associated with the Channel. The I/O thread often performs the actual output operation such as
SocketChannel.write(ByteBuffer)
. -
Forwarding an event to the next handler
a handler has to invoke the event propagation methods in
ChannelHandlerContext
to forward an event to its next handler. Those methods include:-
Inbound event propagation methods:
ChannelHandlerContext.fireChannelRegistered()
ChannelHandlerContext.fireChannelActive()
ChannelHandlerContext.fireChannelRead(Object)
ChannelHandlerContext.fireChannelReadComplete()
ChannelHandlerContext.fireExceptionCaught(Throwable)
ChannelHandlerContext.fireUserEventTriggered(Object)
ChannelHandlerContext.fireChannelWritabilityChanged()
ChannelHandlerContext.fireChannelInactive()
ChannelHandlerContext.fireChannelUnregistered()
-
Outbound event propagation methods:
ChannelHandlerContext.bind(SocketAddress, ChannelPromise)
ChannelHandlerContext.connect(SocketAddress, SocketAddress, ChannelPromise)
ChannelHandlerContext.write(Object, ChannelPromise)
ChannelHandlerContext.flush()
ChannelHandlerContext.read()
ChannelHandlerContext.disconnect(ChannelPromise)
ChannelHandlerContext.close(ChannelPromise)
ChannelHandlerContext.deregister(ChannelPromise)
-
-
Building a pipeline
A user is supposed to have one or more
ChannelHandler
s in a pipeline to receive I/O events (e.g. read) and to request I/O operations (e.g. write and close). For example, a typical server will have the following handlers in each channel's pipeline, but your mileage may vary depending on the complexity and characteristics of the protocol and business logic:- Protocol Decoder - translates binary data (e.g.
ByteBuf
) into a Java object. - Protocol Encoder - translates a Java object into binary data.
- Business Logic Handler - performs the actual business logic (e.g. database access).
and it could be represented as shown in the following example:
static final EventExecutorGroup group = new DefaultEventExecutorGroup(16); ... ChannelPipeline pipeline = ch.pipeline(); pipeline.addLast("decoder", new MyProtocolDecoder()); pipeline.addLast("encoder", new MyProtocolEncoder()); // Tell the pipeline to run MyBusinessLogicHandler's event handler methods // in a different thread than an I/O thread so that the I/O thread is not blocked by // a time-consuming task. // If your business logic is fully asynchronous or finished very quickly, you don't // need to specify a group. pipeline.addLast(group, "handler", new MyBusinessLogicHandler());
- Protocol Decoder - translates binary data (e.g.
-
Thread safety
A
ChannelHandler
can be added or removed at any time because aChannelPipeline
is thread safe. For example, you can insert an encryption handler when sensitive information is about to be exchanged, and remove it after the exchange.
ServerBootstrap和Bootstrap
在深入地学习了ChannelPipeline
、ChannelHandler
和EventLoop
之后,你接下来 的问题可能是:“如何将这些部分组织起来,成为一个可实际运行的应用程序呢?”
答案是?==“引导”(Bootstrapping)==。简单来说,引导一个应用程序是指对它进行配置,并使它运行起来的过程—尽管该过程的具体细节可能并不如它的定义那样简单,尤其是对于一个网络应用程序来说。
引导类层次结构
图片.pngServerBootstrap
和Bootstrap
分别作用于==服务器==和==客户端==。ServerBootstrap
致力于使用一个==父Channel==来接受来自客户端的连接,并创建==子Channel==以用于它们之间的通信;而客户端只需要==一个单独的、没有父Channel的Channel==来用于所有的网络交互(这也适用于无连接的传输协议,如UDP,因为它们并不是每个连接都需要一个单独的Channel)。
public abstract class AbstractBootstrap<B extends AbstractBootstrap<B, C>,
C extends Channel> implements Cloneable
子类型B
是其父类型的一个类型参数,因此可以返回到运行时实例的引用以 支持方法的链式调用(也就是所谓的流式语法)
为什么引导类是
Cloneable
?
你有时可能会需要创建多个具有类似配置或者完全相同配置的 Channel 。为了支持这种模式而又不 需 要 为 每 个 Channel 都 创 建 并 配 置 一 个 新 的 引 导 类 实 例 , AbstractBootstrap 被 标 记 为 了 Cloneable 。在一个已经配置完成的引导类实例上调用 clone() 方法将返回另一个可以立即使用的引 导类实例。
注意,这种方式只会创建引导类实例的 EventLoopGroup 的一个浅拷贝,所以,后者 将在所有克 隆的 Channel 实例之间共享。这是可以接受的,因为通常这些克隆的 Channel 的生命周期都很短暂,一 个典型的场景是——创建一个 Channel 以进行一次HTTP请求。
Bootstrap
Bootstrap
类被用于客户端或者使用了无连接协议的应用程序中。Bootstrap
类的API如下:
-
Bootstrap group(EventLoopGroup)
设置用于处理Channel所有事件的EventLoopGroup
-
Bootstrap channel( Class<? extends C>)
-
Bootstrap channelFactory(ChannelFactory<? extends C>)
channel()
方法指定了Channel的实现类。如果该实现类没提供默认的构造函数 , 可以通过调用channelFactory()
方法来指定一个工厂类,它将会被bind()
方法调用。 -
Bootstrap localAddress(SocketAddress)
指定Channel应该绑定到的本地地址。如果没有指定,则将由操作系统创建一个随机的地址。或者,也可以通过
bind()
或者connect()
方法指定localAddress。 -
<T> Bootstrap option(ChannelOption<T> option, T value)
设置
ChannelOption
, 其将被应用到每个新创建的Channel的ChannelConfig。 这些选项将会通过bind()
或者connect()
方法设置到Channel ,不管哪个先被调用。这个方法在Channel已经被创建后再调用将不会有任何的效果。支持的ChannelOption取决于使用的 Channel类型。 -
<T> Bootstrap attr( Attribute<T> key, T value)
指定新创建的Channel的属性值。这些属性值是通过
bind()
或者connect()
方法设置到Channel的,具体取决于谁最先被调用。这个方法在Channel被创建后将不会有任何的效果。 -
Bootstrap handler(ChannelHandler)
设置将被添加到
ChannelPipeline
以接收事件通知的ChannelHandler
。 -
Bootstrap clone()
创建一个当前Bootstrap的克隆,其具有和原始的Bootstrap相同的设置信息。
-
Bootstrap remoteAddress(SocketAddress)
设置远程地址。或者,也可以通过
connect()
方法来指定它。 -
ChannelFuture connect()
连接到远程节点并返回一个ChannelFuture,其将会在连接操作完成后接收到通知。
-
ChannelFuture bind()
绑定Channel并返回一个ChannelFuture,其将会在绑定操作完成后接收到通知,在那之后必须调用
Channel.connect()
方法来建立连接。
Bootstrap
类负责为客户端和使用无连接协议的应用程序创建Channel,如图所示:
在引导的过程中,在调用
bind()
或者connect()
方法之前,必须调用以下方法来设置所需的组件:
- group();
- channel()或者channelFactory();
- handler()
如果不这样做,则将会导致IllegalStateException 。对handler()方法的调用尤其重要,因为它需要配置好ChannelPipeline 。
ServerBootstrap
ServerBootstrap
的API如下:
-
group
设置ServerBootstrap要用的EventLoopGroup。这个EventLoopGroup将用于ServerChannel和被接受的子Channel的I/O处理。
-
channel
设置将要被实例化的ServerChannel类。
-
channelFactory
如果不能通过默认的构造函数创建Channel,那么可以提供一个ChannelFactory。
-
localAddress
指定ServerChannel应该绑定到的本地地址。如果没有指定,则将由操作系统使用一个随机地址。或者,可以通过
bind()
方法来指定该localAddress。 -
option
指定要应用到新创建的ServerChannel的ChannelConfig的
ChannelOption
。这些选项将会通过bind()
方法设置到Channel。在bind()
方法被调用之后,设置或者改变ChannelOption
都不会有任何的效果。所支持的ChannelOption取决于所使用的Channel类型。 -
childOption
指定当
子Channel
被接受时,应用到==子Channel的ChannelConfig==的ChannelOption
。所支持的ChannelOption取决于所使用的Channel的类型。 -
attr
指定
ServerChannel
上的属性,属性将会通过bind()
方法设置给Channel。在调用bind()
方法之后改变它们将不会有任何的效果 -
childAttr
将属性设置给已经被接受的
子Channel
。 -
handler
设置被添加到ServerChannel的ChannelPipeline中的ChannelHandler。
-
childHandler
设置将被添加到已被接受的子Channel的ChannelPipeline中的ChannelHandler。
handler()
方法和childHandler()
方法之间的区别是:前者所添加的ChannelHandler由接受子Channel的ServerChannel处理,而childHandler()方法所添加ChannelHandler将由已被接受的子Channel处理,其代表一个绑定到远程节点的套接字。 -
clone
克隆一个设置和原始的ServerBootstrap相同的ServerBootstrap。
-
bind
绑定ServerChannel并且返回一个ChannelFuture成后收到通知(带着成功或者失败的结果)
ServerBootstrap在bind()
方法被调用时创建了一个ServerChannel
,并且该ServerChannel
管理了多个子Channel
。
从Channel引导客户端?
ChannelInitializer
在引导的过程中调用了handler()
或者childHandler()
方法来添加单个的ChannelHandler
。对于简单的应用程序来说可能已经足够了,但是它不能满足更加复杂的需求。
可以通过在ChannelPipeline
中将它们链接在一起来部署尽可能多的ChannelHandler
,Netty提供了一个特殊的ChannelInitializer
类。
A special
ChannelInboundHandler
which offers an easy way to initialize aChannel
once it was registered to itsEventLoop
. Implementations are most often used in the context ofBootstrap.handler(ChannelHandler)
,ServerBootstrap.handler(ChannelHandler)
andServerBootstrap.childHandler(ChannelHandler)
to setup theChannelPipeline
of aChannel
.
public abstract class ChannelInitializer<C extends Channel>
extends ChannelInboundHandlerAdapter
它定义了下面的方法:
protected abstract void initChannel(C ch) throws Exception;
这个方法提供了一种将多个ChannelHandler
添加到一个ChannelPipeline
中的简便方法。只需要简单地向Bootstrap
或ServerBootstrap
的实例提供你的ChannelInitializer
实现即可,并且一旦Channel
被注册到了它的EventLoop
之后,就会调用你的initChannel
方法。在该方法返回之后,ChannelInitializer
的实例将会从ChannelPipeline
中移除它自己。
示例代码如下:
public class MyChannelInitializer extends ChannelInitializer {
public void initChannel(Channel channel) {
channel.pipeline().addLast("myHandler", new MyHandler());
}
}
ServerBootstrap bootstrap = ...;
...
bootstrap.childHandler(new MyChannelInitializer());
...
ChannelOption和属性
使用option()
方法可以将ChannelOption
应用到引导,你所提供的值将会被自动应用到引导所创建的所有Channel
(这样就可以不用在每个Channel创建时都手动配置它。)。可用的ChannelOption
包括了底层连接的详细信息,如keep-alive或者超时属性以及缓存区设置。
Netty应用程序通常与组织的专有软件集成在一起,而像Channel
这样的组件可能甚至会在正常的Netty生命周期之外被使用。 在某些常用的属性和数据不可用时, Netty提供了 AttributeMap
抽象(一个由Channel
和引导类提供的集合)以及AttributeKey<T>
(一个用于插入和获取属性值的泛型类)。使用这些工具,便可以安全地将任何类型的数据项与客户端和服务器Channel
(包含ServerChannel的子Channel)相关联了。
有点重要?
优雅关闭
优雅是指干净地释放资源。关闭Netty应用程序并没有太多的魔法,最重要的是需要关闭EventLoopGroup
,它将处理任何挂起的事件和任务,并且随后释放所有活动的线程。
// 创建处理 I/O 的 EventLoopGroup
EventLoopGroup group = new NioEventLoopGroup();
Bootstrap bootstrap = new Bootstrap();
// 创建一个 Bootstrap 类的实例并配置它
bootstrap.group(group)
.channel(NioSocketChannel.class);
...
Future<?> future = group.shutdownGracefully();
// block until the group has shutdown
future.syncUninterruptibly();
资源管理
每当通过调用 ChannelInboundHandler.channelRead()或者 ChannelOutbound- Handler.write()方法来处理数据时,你都需要确保没有任何的资源泄漏。
为了帮助你诊断潜在的(资源泄漏)问题,Netty提供了class ResourceLeakDetector1, 它将对你应用程序的缓冲区分配做大约 1%的采样来检测内存泄露。
网友评论