Connection 事件处理相关类
- ConnectionEventType:定义了三种 Connection 相关事件
- ConnectionEventHandler:Connection 事件处理器,处理两类事件
- Netty 定义的事件:例如 connect,channelActive 等
- SOFABolt 定义的事件:事件类型 ConnectionEventType
- RpcConnectionEventHandler:ConnectionEventHandler 的子类,重写了其 channelInactive 方法
- ConnectionEventListener:Connection 事件监听器,存储处理对应 ConnectionEventType 的 ConnectionEventProcessor 列表
- ConnectionEventProcessor:真正的 Connection 事件处理器接口
基本原理
- 实现 ConnectionEventProcessor 接口,编写自定义的事件处理类
- 将自定义的事件处理类添加到 ConnectionEventListener 中
- 当触发 ConnectionEventType 相关事件时,ConnectionEventHandler 通知监听器 ConnectionEventListener,ConnectionEventListener 取出 ConnectionEventType 的自定义事件处理器列表,执行其 onEvent 方法
一、使用姿势
事件处理器
=========================== 连接处理器 ===========================
/**
 * Sample processor for connect events: prints a greeting that includes the
 * remote address it is given.
 */
public class MyCONNECTEventProcessor implements ConnectionEventProcessor {

    /**
     * Prints {@code "hello, " + remoteAddr} to standard output.
     *
     * @param remoteAddr address of the remote peer, as supplied by the caller
     * @param conn       the connection the event refers to (unused here)
     */
    @Override
    public void onEvent(String remoteAddr, Connection conn) {
        final String greeting = "hello, " + remoteAddr;
        System.out.println(greeting);
    }
}
=========================== 断开处理器 ===========================
/**
 * Sample processor for close events: prints a farewell that includes the
 * remote address it is given.
 */
public class MyCLOSEEventProcessor implements ConnectionEventProcessor {

    /**
     * Prints {@code "bye, " + remoteAddr} to standard output.
     *
     * @param remoteAddr address of the remote peer, as supplied by the caller
     * @param conn       the connection the event refers to (unused here)
     */
    @Override
    public void onEvent(String remoteAddr, Connection conn) {
        final String farewell = "bye, " + remoteAddr;
        System.out.println(farewell);
    }
}
服务端
// Create a server on port 8888 and register the business-request processor.
RpcServer server = new RpcServer(8888);
server.registerUserProcessor(new MyServerUserProcessor());

// Register one processor per connection event type before starting the server.
MyCONNECTEventProcessor serverConnectProcessor = new MyCONNECTEventProcessor();
server.addConnectionEventProcessor(ConnectionEventType.CONNECT, serverConnectProcessor);
MyCLOSEEventProcessor serverCloseProcessor = new MyCLOSEEventProcessor();
server.addConnectionEventProcessor(ConnectionEventType.CLOSE, serverCloseProcessor);

server.start();
客户端
// Create the client and register one processor per connection event type
// before initializing it.
RpcClient client = new RpcClient();

MyCONNECTEventProcessor clientConnectProcessor = new MyCONNECTEventProcessor();
client.addConnectionEventProcessor(ConnectionEventType.CONNECT, clientConnectProcessor);
MyCLOSEEventProcessor clientCloseProcessor = new MyCLOSEEventProcessor();
client.addConnectionEventProcessor(ConnectionEventType.CLOSE, clientCloseProcessor);

client.init();
二、源码分析
2.1 服务端
/**
 * Excerpt of the RPC server showing only the connection-event wiring:
 * how the ConnectionEventHandler is created, given its listener (and, optionally,
 * a connection manager), added to the Netty child pipeline, and how the CONNECT
 * event is published for each accepted channel. Lines reading "..." mark code
 * omitted from the excerpt.
 */
public class RpcServer extends AbstractRemotingServer implements RemotingServer {
...
/** connection event handler */
private ConnectionEventHandler connectionEventHandler;
/** connection event listener */
private ConnectionEventListener connectionEventListener = new ConnectionEventListener();
/** connection manager */
private DefaultConnectionManager connectionManager;
/**
 * Builds the connection event handler and installs it in the child channel pipeline.
 */
protected void doInit() {
...
// The server-side "manage connection" switch is on
if (this.switches().isOn(GlobalSwitch.SERVER_MANAGE_CONNECTION_SWITCH)) {
// Create the ConnectionEventHandler (RPC variant)
this.connectionEventHandler = new RpcConnectionEventHandler(switches());
// Create the connection manager
this.connectionManager = new DefaultConnectionManager(new RandomSelectStrategy());
// Hand the connectionManager to the ConnectionEventHandler
this.connectionEventHandler.setConnectionManager(this.connectionManager);
// Hand the connectionEventListener to the ConnectionEventHandler
this.connectionEventHandler.setConnectionEventListener(this.connectionEventListener);
} else {
// Create the plain ConnectionEventHandler (no connection manager is set in this branch)
this.connectionEventHandler = new ConnectionEventHandler(switches());
// Hand the connectionEventListener to the ConnectionEventHandler
this.connectionEventHandler.setConnectionEventListener(this.connectionEventListener);
}
...
this.bootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel channel) {
...
// Add the connectionEventHandler to the Netty pipeline
pipeline.addLast("connectionEventHandler", connectionEventHandler);
...
createConnection(channel);
}
/**
 * Wraps the accepted channel in a Connection and publishes the CONNECT event.
 */
private void createConnection(SocketChannel channel) {
Url url = addressParser.parse(RemotingUtil.parseRemoteAddress(channel));
if (switches().isOn(GlobalSwitch.SERVER_MANAGE_CONNECTION_SWITCH)) {
connectionManager.add(new Connection(channel, url), url.getUniqueKey());
} else {
// The new Connection is not stored here; presumably its constructor binds it to the
// channel (userEventTriggered later reads it from the Connection.CONNECTION attribute)
// -- TODO confirm against the Connection constructor.
new Connection(channel, url);
}
// Publish the ConnectionEventType.CONNECT event
channel.pipeline().fireUserEventTriggered(ConnectionEventType.CONNECT);
}
});
}
/**
 * Registers a processor for the given connection event type by delegating to
 * the connection event listener.
 */
public void addConnectionEventProcessor(ConnectionEventType type, ConnectionEventProcessor processor) {
this.connectionEventListener.addConnectionEventProcessor(type, processor);
}
}
2.2 客户端
/**
 * Excerpt of the RPC client showing only the connection-event wiring: the event
 * handler, listener and connection manager are created as field initializers, and
 * init() finishes the wiring. Lines reading "..." mark code omitted from the excerpt.
 */
public class RpcClient extends AbstractConfigurableInstance {
/** connection event handler */
private ConnectionEventHandler connectionEventHandler = new RpcConnectionEventHandler(switches());
/** reconnect manager */
private ReconnectManager reconnectManager;
/** connection event listener */
private ConnectionEventListener connectionEventListener = new ConnectionEventListener();
/** connection manager */
private DefaultConnectionManager connectionManager = new DefaultConnectionManager(connectionSelectStrategy, connectionFactory, connectionEventHandler, connectionEventListener, switches());
/**
 * Initializes the connection manager and, when the reconnect switch is on,
 * creates a ReconnectManager and hands it to the connection event handler.
 */
public void init() {
...
this.connectionManager.init();
...
// Reconnect switch
if (switches().isOn(GlobalSwitch.CONN_RECONNECT_SWITCH)) {
// Create the ReconnectManager
reconnectManager = new ReconnectManager(connectionManager);
// Hand the ReconnectManager to the connectionEventHandler so that it can schedule a
// reconnect when channelInactive fires
connectionEventHandler.setReconnectManager(reconnectManager);
}
}
/**
 * Registers a processor for the given connection event type by delegating to
 * the connection event listener.
 */
public void addConnectionEventProcessor(ConnectionEventType type,
ConnectionEventProcessor processor) {
this.connectionEventListener.addConnectionEventProcessor(type, processor);
}
}
======================== DefaultConnectionManager ==========================
/**
 * Wires this manager and its event listener into the connection event handler,
 * then initializes the connection factory with that same handler.
 */
public void init() {
    final ConnectionEventHandler handler = this.connectionEventHandler;
    // The handler keeps a reference to this manager so that a connection can be
    // removed from it when its channel becomes inactive.
    handler.setConnectionManager(this);
    // The handler forwards connection events to this listener.
    handler.setConnectionEventListener(this.connectionEventListener);
    this.connectionFactory.init(handler);
}
======================== AbstractConnectionFactory ==========================
/**
 * Configures the client bootstrap so that every new channel gets the given
 * connection event handler in its pipeline under the name "connectionEventHandler".
 * Lines reading "..." mark code omitted from the excerpt.
 *
 * @param connectionEventHandler handler instance added to each channel's pipeline
 */
public void init(final ConnectionEventHandler connectionEventHandler) {
...
bootstrap.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel channel) {
...
// The same handler instance is added to every channel initialized here
pipeline.addLast("connectionEventHandler", connectionEventHandler);
...
}
});
}
不论是服务端还是客户端,其实本质都在做一件事情:创建 ConnectionEventHandler 实例并添加到 Netty 的 pipeline 中。
之后当有 ConnectionEvent 触发时(无论是 Netty 定义的事件被触发,还是 SOFABolt 定义的事件被触发),ConnectionEventHandler 会通过异步线程执行器通知 ConnectionEventListener,ConnectionEventListener 将消息派发给具体的 ConnectionEventProcessor 实现类。具体源码如下:
2.3 事件处理机制核心部分
======================== ConnectionEventListener ==========================
/**
 * Listens for connection events and dispatches each one to the processors
 * registered for its event type.
 *
 * <p>Processors are registered from the caller's thread, while
 * {@code ConnectionEventHandler} runs dispatch on its event executor thread, so
 * the per-type processor lists must tolerate a concurrent add during iteration.
 * They are therefore copy-on-write lists: dispatch iterates over a stable
 * snapshot and registration never throws
 * {@link java.util.ConcurrentModificationException} in the dispatching thread.
 */
public class ConnectionEventListener {

    /** Processors per event type, invoked in registration order. */
    private final ConcurrentHashMap<ConnectionEventType, List<ConnectionEventProcessor>> processors = new ConcurrentHashMap<ConnectionEventType, List<ConnectionEventProcessor>>(3);

    /**
     * Dispatches an event to every processor registered for {@code type}.
     * Does nothing when no processor has been registered for that type.
     *
     * @param type       the event type being dispatched
     * @param remoteAddr remote address passed through to each processor
     * @param conn       connection passed through to each processor
     */
    public void onEvent(ConnectionEventType type, String remoteAddr, Connection conn) {
        List<ConnectionEventProcessor> processorList = this.processors.get(type);
        if (processorList != null) {
            for (ConnectionEventProcessor processor : processorList) {
                processor.onEvent(remoteAddr, conn);
            }
        }
    }

    /**
     * Registers a processor for the given event type.
     *
     * @param type      the event type the processor should receive
     * @param processor the processor to append to that type's list
     */
    public void addConnectionEventProcessor(ConnectionEventType type,
                                            ConnectionEventProcessor processor) {
        List<ConnectionEventProcessor> processorList = this.processors.get(type);
        if (processorList == null) {
            List<ConnectionEventProcessor> created = new java.util.concurrent.CopyOnWriteArrayList<ConnectionEventProcessor>();
            // putIfAbsent returns the list another thread installed first, if any;
            // use that one so both registrations end up in the same list.
            List<ConnectionEventProcessor> existing = this.processors.putIfAbsent(type, created);
            processorList = (existing != null) ? existing : created;
        }
        processorList.add(processor);
    }
}
======================== ConnectionEventProcessor ==========================
/**
 * Callback interface for handling a connection event.
 */
public interface ConnectionEventProcessor {

    /**
     * Handles one connection event.
     *
     * @param remoteAddr address of the remote peer the event refers to
     * @param conn       the connection the event refers to
     */
    void onEvent(String remoteAddr, Connection conn);
}
======================== ConnectionEventHandler ==========================
/**
* Netty duplex handler that logs channel status events and turns connection
* lifecycle events into ConnectionEventType notifications, which it hands to the
* ConnectionEventListener through a ConnectionEventExecutor.
* Lines reading "..." mark code omitted from this excerpt.
*/
@Sharable
public class ConnectionEventHandler extends ChannelDuplexHandler {
private ConnectionManager connectionManager;
private ConnectionEventListener eventListener;
private ConnectionEventExecutor eventExecutor;
private ReconnectManager reconnectManager;
private GlobalSwitch globalSwitch;
/**
* Logs the connect attempt (when info logging is enabled) and passes the call on.
*/
@Override
public void connect(ChannelHandlerContext ctx, SocketAddress remoteAddress,
SocketAddress localAddress, ChannelPromise promise) throws Exception {
if (logger.isInfoEnabled()) {
...
}
super.connect(ctx, remoteAddress, localAddress, promise);
}
...
/**
* Propagates channelInactive, optionally schedules a reconnect, then raises the
* CLOSE event for the connection bound to the channel.
*/
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
...
super.channelInactive(ctx);
// NOTE(review): raw Attribute type; the value is cast to Connection below.
Attribute attr = ctx.channel().attr(Connection.CONNECTION);
if (null != attr) {
// Schedule a reconnect; this is why ConnectionEventHandler holds a reconnectManager reference
if (this.globalSwitch != null
&& this.globalSwitch.isOn(GlobalSwitch.CONN_RECONNECT_SWITCH)) {
Connection conn = (Connection) attr.get();
if (reconnectManager != null) {
// NOTE(review): conn is dereferenced without a null check -- confirm the
// attribute is always populated when this point is reached.
reconnectManager.addReconnectTask(conn.getUrl());
}
}
// Raise the ConnectionEventType.CLOSE event.
// remoteAddress is not declared in the visible lines; presumably it is computed in the
// elided part at the top of this method -- verify against the full source.
onEvent((Connection) attr.get(), remoteAddress, ConnectionEventType.CLOSE);
}
}
/**
* Handles user events: a ConnectionEventType.CONNECT event is turned into a CONNECT
* notification; any other ConnectionEventType is consumed without being forwarded;
* events of other classes are passed to the superclass.
*/
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object event) throws Exception {
if (event instanceof ConnectionEventType) {
switch ((ConnectionEventType) event) {
case CONNECT:
Channel channel = ctx.channel();
if (null != channel) {
Connection connection = channel.attr(Connection.CONNECTION).get();
// Raise the ConnectionEventType.CONNECT event.
// NOTE(review): connection is dereferenced without a null check -- confirm the
// attribute is always set before CONNECT is fired.
this.onEvent(connection, connection.getUrl().getOriginUrl(), ConnectionEventType.CONNECT);
}
break;
default:
return;
}
} else {
super.userEventTriggered(ctx, event);
}
}
/**
* Submits a task to the event executor that calls the listener's onEvent with the
* given arguments. Does nothing when no listener has been set.
*/
private void onEvent(Connection conn, String remoteAddress, ConnectionEventType type) {
if (this.eventListener != null) {
// 1. Create a task that calls ConnectionEventListener#onEvent
// 2. Run that task on the ConnectionEventExecutor
this.eventExecutor.onEvent(new Runnable() {
@Override
public void run() {
ConnectionEventHandler.this.eventListener.onEvent(type, remoteAddress, conn);
}
});
}
}
/**
* Sets the listener that receives connection events and lazily creates the
* executor used to notify it. A null listener is ignored.
*/
public void setConnectionEventListener(ConnectionEventListener listener) {
if (listener != null) {
// Store the ConnectionEventListener
this.eventListener = listener;
// Create the ConnectionEventExecutor (asynchronous event executor) on first use
if (this.eventExecutor == null) {
this.eventExecutor = new ConnectionEventExecutor();
}
}
}
/**
* Runs event tasks on a thread pool with a core and maximum size of one thread and
* a queue bounded at 10000 tasks.
*/
public class ConnectionEventExecutor {
// No rejection handler is passed, so the ThreadPoolExecutor default applies when the
// queue is full. The boolean passed to NamedThreadFactory presumably marks the thread
// as a daemon -- TODO confirm.
ExecutorService executor = new ThreadPoolExecutor(1, 1, 60L, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(10000), new NamedThreadFactory("Bolt-conn-event-executor", true));
/**
* Submits the task to the underlying executor.
*/
public void onEvent(Runnable event) {
executor.execute(event);
}
}
}
======================== RpcConnectionEventHandler ==========================
public class RpcConnectionEventHandler extends ConnectionEventHandler {
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
Connection conn = ctx.channel().attr(Connection.CONNECTION).get();
if (conn != null) {
// 这就是 ConnectionEventHandler 持有 ConnectionManager 引用的原因
this.getConnectionManager().remove(conn);
}
super.channelInactive(ctx);
}
}
事件的处理流程
事件触发 -> ConnectionEventHandler(或其子类 RpcConnectionEventHandler) -> [ ConnectionEventListener -> ConnectionEventProcessor ]
方括号内的操作由 ConnectionEventExecutor 异步执行
事件的触发有两种:Netty定义的事件(例如 channelInactive)和 SOFABolt 定义的事件,前者直接在 Netty 定义的事件触发方法中进行(例如 channelInactive),后者在 userEventTriggered 方法中进行触发。
事件的触发时机
- ConnectionEventType.CONNECT
- AbstractConnectionFactory # createConnection(客户端)
- RpcServer # doInit # childHandler # initChannel # createConnection(服务端)
- ConnectionEventType.CLOSE
- ConnectionEventHandler # channelInactive
======================== 客户端创建连接 ==========================
/**
 * Opens a channel to the address described by {@code url}, wraps it in a
 * {@code Connection}, and publishes the CONNECT event on the channel's pipeline.
 *
 * @param url target address together with its connect timeout, protocol and version
 * @return the newly created connection
 * @throws Exception if creating the channel fails
 */
@Override
public Connection createConnection(Url url) throws Exception {
    Channel channel = doCreateConnection(url.getIp(), url.getPort(), url.getConnectTimeout());
    ProtocolCode protocolCode = ProtocolCode.fromBytes(url.getProtocol());
    Connection created = new Connection(channel, protocolCode, url.getVersion(), url);
    // Publish the ConnectionEventType.CONNECT event
    channel.pipeline().fireUserEventTriggered(ConnectionEventType.CONNECT);
    return created;
}
网友评论