协议定制与数据序列化
1、长连接这里我们肯定是基于TCP的,而TCP协议其实默认已经支持长连接,但是socket连接存在随时断开的情况,这就需要有比较好的协议保障连接状态的检测。
2、定制数据序列化格式,建议使用protobuf或者thrift而不是htttp中常用的json,可以减少序列化与反序列化的开销。当然如果用一些其他的协议,你可能需要自己实现encoder decoder了,TCP是流,上层协议对TCP的流是要做分包粘包处理的,注意好对handler中channelRead和channelReadComplete的方法的复写。
基于Netty 设计的客户端架构
1、我们会需要设计一个客户端,就像netty的官方demo中做的那样,定义好bootstrap和nioEventLoopGroup。注意NioEventLoopGroup是可以复用的,线程池复用对客户端比较重要,在断线重连的时候会排上用场。
我以采用webSocket协议为例
mClientHandler = new ClientHandler(sURI); //客户端收到分包处理完的数据,然后开始分发
mMessageHandler = new MessageHandler(mHashMap, mBussinessCodeHelper); // 真正处理业务代码的handler
bootstrap.group(mWorkGroup)
.channel(NioSocketChannel.class)
.option(ChannelOption.SO_KEEPALIVE, true)
.option(ChannelOption.TCP_NODELAY, true)
.remoteAddress(sURI.getHost(), sURI.getPort());
bootstrap.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
pipeline.addLast(new TbbLoggerHandler());
pipeline.addLast(new IdleStateHandler(200, 180, 0, TimeUnit.SECONDS)); //读超时与写超时检测的handler, 读超时200s比写超时时间长一些,发生读超时的时候直接断开重连了。
pipeline.addLast(new HttpClientCodec());
pipeline.addLast(new HttpObjectAggregator(MAX_CONTENT_LENGTH));
pipeline.addLast(mTbbClientHandler);
pipeline.addLast(mTbbMessageHandler);
}
});
try {
mChannel = bootstrap.connect().sync().channel();
mChannel.closeFuture().sync(); // 会阻塞
XGLog.logger_d(mChannel);
} catch (Exception e) {
XGLog.logger_d("exception " + e);
e.printStackTrace();
} finally {
XGLog.logger_d("workerGroup shall shutdown " + TextUtils.isEmpty(mToken));
if (!TextUtils.isEmpty(mToken)) {
mWorkGroup.schedule(new Runnable() {
@Override
public void run() {
connect(); // 断线重连,这里简单处理,就是断了以后每隔2s 尝试连接一次,其实为了省电需要限制次数并倍增间隔时间的
}
}, 2, TimeUnit.SECONDS);
}
}
2、设计好你的handler, netty框架的运用精髓基本都在handler当中,包括处理流解包然后处理业务最后发送数据,几乎全可以包含在handler当中,客户端主动发送数据依赖于channel,简单点讲就是channel 的 writeAndFlush,向缓冲区写数据并刷新缓冲区,刷新的操作其实就是发送数据了,socket的操作本质上都抽象成IO动作。一个简单的handler的例子,不一定能正常运行,只是作为例子,最为关键的几个方法
(1) channelRead0(ChannelHandlerContext ctx, Object msg)
处理解包后的数据,也可以分发数据包给下个handler
(2) channelActivie(ChannelHandlerContext ctx)
通道建立了,这个时候相当于tcp握手了
(3) channelInActive(ChannelHandlerContext ctx)
tcp断开连接
(4) excepitonCaught(ChannelHandlerContext ctx, Throwable cause)
异常处理,最好要处理,不处理也别忘了吧throwable发给下handler,这个一定得做
(5) userEventTriggered(final ChannelHandlerContext ctx, Object evt)
处理一些自定义的事件,包括读超时写超时这样的事件,充分体现了netty事件驱动的特点
@Sharable
public class ClientHandler extends SimpleChannelInboundHandler<Object> {
private static final int BLOCKING_QUEUE_SIZE = 1 << 12;
private static final Queue<MCProtocolPB.MCProtocol> mQueue = new LinkedList<>();
private static final long IDLE_TIME = (long) (5 * 1e9);
/**
* 用于 WebSocket 的握手
*/
private WebSocketClientHandshaker mHandshaker;
/**
*
*/
private ChannelPromise mChannelPromise;
private final PingWebSocketFrame mPingWebSocketFrame = new PingWebSocketFrame();
private final CloseWebSocketFrame mCloseWebSocketFrame = new CloseWebSocketFrame();
private ChannelHandlerContext mChannelHandlerContext;
/**
* 唯一的构造类
*
* @param uri WebSocket uri
*/
public ClientHandler(URI uri) {
mHandshaker = WebSocketClientHandshakerFactory.newHandshaker(uri, WebSocketVersion.V13, null, true, new DefaultHttpHeaders());
}
@Override
protected void channelRead0(final ChannelHandlerContext ctx, final Object msg) throws Exception {
if (!mHandshaker.isHandshakeComplete()) {
try {
mHandshaker.finishHandshake(ctx.channel(), (FullHttpResponse) msg);
mChannelPromise.setSuccess();
while (!mQueue.isEmpty()) {
ctx.writeAndFlush(mQueue.poll());
}
ctx.fireUserEventTriggered(Event.CONNECTED); //发送websocket协议连接正式建立的事件
} catch (WebSocketHandshakeException e) {
mChannelPromise.setFailure(e);
}
}
if (msg instanceof WebSocketFrame) {
ctx.fireChannelRead(((WebSocketFrame) msg).retain());
}
}
/**
*
* @param ctx
* @throws Exception
*/
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
super.channelActive(ctx);
mChannelHandlerContext = ctx;
mHandshaker.handshake(ctx.channel());
ctx.writeAndFlush(mPingWebSocketFrame.retain());
}
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
super.channelInactive(ctx);
ctx.fireUserEventTriggered(Event.DISCONNECTED);
}
@Override
public void channelUnregistered(ChannelHandlerContext ctx) throws Exception {
super.channelUnregistered(ctx);
XGLog.logger_e("channel unregistered");
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
XGLog.logger_e(cause.toString());
super.exceptionCaught(ctx, cause);
if (!mChannelPromise.isDone()) {
mChannelPromise.setFailure(cause);
}
cause.printStackTrace();
ctx.close();
}
@Override
public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
super.handlerRemoved(ctx);
XGLog.logger_d("handler removed");
}
/**
*
*
*
* @param ctx
* @throws Exception
*/
@Override
public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
super.handlerAdded(ctx);
XGLog.logger_i("handler added");
mChannelPromise = ctx.newPromise();
}
/**
* 端口闲时 发送心跳包 处理的方法
*
*/
@Override
public void userEventTriggered(final ChannelHandlerContext ctx, Object evt) throws Exception {
if (evt instanceof IdleStateEvent) {
final IdleStateEvent event = (IdleStateEvent) evt;
ctx.executor().execute(new Runnable() {
@Override
public void run() {
handleIdleEvent(ctx, event);
}
});
super.userEventTriggered(ctx, evt);
} else if (Event.REQUEST_TIME_OUT.equals(evt)) {
XGLog.logger_i("REQUEST triggered already");
}
}
@Override
public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception {
super.channelWritabilityChanged(ctx);
}
/**
* 处理{@link IdleStateEvent}
*
* @param ctx
* @param event
*/
private void handleIdleEvent(final ChannelHandlerContext ctx, IdleStateEvent event) {
IdleState state = event.state();
if (IdleState.READER_IDLE.equals(state)) {
XGLog.logger_e("READ IDLE");
} else if (IdleState.WRITER_IDLE.equals(state)) {
XGLog.logger_e("WRITE IDLE");
ctx.writeAndFlush(mPingWebSocketFrame.retain()).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
} else if (IdleState.ALL_IDLE.equals(state)) {
XGLog.logger_e("ALL IDLE");
}
}
long ticksInNanos() {
return System.nanoTime();
}
}
3、考虑好你的断线重连的情况,建议每次客户端发送数据后,服务端都给回包,如果链路长时间空闲,那么触发写超时事件,发送心跳包给服务端,其实也可以反过来服务端给客户端发数据,然后如果还发生读超时事件,相当于对方没有给回包,那么断开连接,尝试重连。
public class MyLoggerHandler extends LoggingHandler {
private static final long IDLE_TIME = (long) (9.9 * 1e9);
private long mLastWriteTime = -1;
private ScheduledFuture mScheduledFuture;
public MyLoggerHandler() {
super(LogLevel.INFO);
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
super.channelRead(ctx, msg);
XGLog.logger_i("read message " + msg);
long current = ticksInNanos();
long delta = Math.abs(current - mLastWriteTime);
if (delta < IDLE_TIME) {
if (mScheduledFuture != null) {
mScheduledFuture.cancel(false);
}
}
}
@Override
public void write(final ChannelHandlerContext ctx, final Object msg, ChannelPromise promise) throws Exception {
super.write(ctx, msg, promise);
XGLog.logger_i("TbbLoggerHandler write message ");
mScheduledFuture = ctx.executor().schedule(new Runnable() {
@Override
public void run() {
long current = ticksInNanos();
long delta = Math.abs(current - mLastWriteTime);
XGLog.logger_i("current " + current + " last " + mLastWriteTime + " delta " + delta);
if (delta > IDLE_TIME) {
ctx.close();
}
}
}, 10, TimeUnit.SECONDS); // 10s 内没有收到服务端回执,断线重连
mLastWriteTime = ticksInNanos();
}
long ticksInNanos() {
return System.nanoTime();
}
}
4、如果客户端主动发起请求,那么通过我们的Client的channel引用,可以向服务端发送数据。
5、由于netty可以主动发起事件,在netty里处理完了数据如果要更新UI或者数据库,那么你需要设计一个简单的适配层,通过事件机制来触发事情就会变得简单。
针对网络波动情况的处理
1、如果发生可以主动检测到的链路断开的情况,一定会触发channelRemoved,然后channel会变成inActive,然后那个connect().sync()也就不再阻塞了,然后往下走,我们的代码中其实已经可以主动间隔2s去重连了。NioEventLoopGroup.exectue()类似于jdk的线程池,可以定时触发一个事件。
try {
mChannel = bootstrap.connect().sync().channel();
mChannel.closeFuture().sync(); // 会阻塞
XGLog.logger_d(mChannel);
} catch (Exception e) {
XGLog.logger_d("exception " + e);
e.printStackTrace();
} finally {
XGLog.logger_d("workerGroup shall shutdown " + TextUtils.isEmpty(mToken));
if (!TextUtils.isEmpty(mToken)) {
mWorkGroup.schedule(new Runnable() {
@Override
public void run() {
connect(); // 断线重连,这里简单处理,就是断了以后每隔2s 尝试连接一次,其实为了省电需要限制次数并倍增间隔时间的
}
}, 2, TimeUnit.SECONDS);
}
}
2、如果发生延时很长的情况,如果发送请求10s内没有读事件发生,那么你需要考虑重新建立连接了,简单的做法就是ChannelHandlerContext.close(),利用 1 中的NioEventLoopGroup线程池 mWorkGroup定时尝试连接,如果连接成功,该线程就阻塞,只有断开的时候才会跑到需要重连的地方。
3、如果打过电话或者检测到网络切换,那么你也需要断开然后重连,因为你的在移动网IP地址基本就变了,所以重连吧,谁让我们基于TCP/IP呢。这种情况需要借助Android的一些组件比如BroadCastReceiver来检测,与netty关系不大。
网友评论