美文网首页
NettyClient.java

NettyClient.java

作者: 上海马超23 | 来源:发表于2018-11-14 21:09 被阅读0次
public class NettyClientStream extends AbstractClientStream {
    // 发送请求报文
    @Override
    public void send(Object packet) {
        channel.writeAndFlush(packet);
    }
}

// consumer的client端
public class NettyClient extends AbstractClient {
    @Override
    public ClientStream connect(final ConnectionID connectionID) {
        Bootstrap bootstrap = new Bootstrap();
        // EventLoopGroup其实就是管理线程
        bootstrap.group(Holder.WORKER_POOL)//
                // 禁用了Nagle算法,允许小包的发送。对于延时敏感型,同时数据传输量比较小的应用,开启TCP_NODELAY选项无疑是一个正确的选择。
                .option(ChannelOption.TCP_NODELAY, true)//
                // 如果其绑定的ip和port和一个处于TIME_WAIT状态的socket冲突时,内核将忽略这种冲突
                .option(ChannelOption.SO_REUSEADDR, true)//
                // 默认使用对象池
                .option(ChannelOption.ALLOCATOR, Holder.byteBufAllocator)//
                .option(ChannelOption.AUTO_CLOSE, Boolean.TRUE)
                // 一个连接的远端关闭时本地端是否关闭,默认值为False。值为False时,连接自动关闭;
                .option(ChannelOption.ALLOW_HALF_CLOSURE, Boolean.FALSE)
                .channel(NioSocketChannel.class)//
                .handler(new ChannelInitializer<NioSocketChannel>() {

                    @Override
                    protected void initChannel(NioSocketChannel ch) throws Exception {
                        ch.pipeline()
                                .addLast("protocol", new NettyProtocolHandler())
                                .addLast("clientIdleHandler", new IdleStateHandler(getHbSentInterval(), 0, 0))
                                .addLast("clientHandler",
                                        new NettyClientStreamHandler(NettyClient.this, connectionID,
                                                clientStreamLifecycleListeners, clientStreamMessageListeners));
                    }
                });

        //int connectTimeout = connectionID.getParameter(TRConstants.CONNECT_TIMEOUT_KEY, 4000);
        int connectTimeout = connectionID.getServiceURL().getParameter(CONNECT_TIMEOUT_KEY, 4000);
        if (connectTimeout < 1000) {
            connectTimeout = 4000;
        }
        bootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, connectTimeout);

        if (isWaterMarkEnabled()) {
            bootstrap.option(ChannelOption.WRITE_BUFFER_WATER_MARK,
                    new WriteBufferWaterMark(getLowWaterMark(), getHighWaterMark()));
        }

        String targetIP = connectionID.getServiceURL().getHost();
        int targetPort = connectionID.getServiceURL().getPort();
        ChannelFuture future = bootstrap.connect(new InetSocketAddress(targetIP, targetPort));
        future.awaitUninterruptibly();

        ClientStream result = null;
        if (future.isSuccess()) {
            if (StreamUtils.streamOfChannel(future.channel()) == null) {
                NettyClientStream clientStream = new NettyClientStream(connectionID, future.channel());
                clientStream.setClient(this);
                StreamUtils.bindChannel(future.channel(), clientStream);
            }
            result = (ClientStream) StreamUtils.streamOfChannel(future.channel());
        }

        return result;
    }
}

public class Bootstrap extends AbstractBootstrap<Bootstrap, Channel> {
    // Bootstrap.connect实质是调用doConnect
    private static void doConnect(
            final SocketAddress remoteAddress, final SocketAddress localAddress, final ChannelPromise connectPromise) {

        final Channel channel = connectPromise.channel();
        channel.eventLoop().execute(new Runnable() {
            @Override
            public void run() {
                channel.connect(remoteAddress, connectPromise);
                connectPromise.addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
            }
        });
    }

}

// 客户端的启动类父类,使用自限定范式
public abstract class AbstractBootstrap<B extends AbstractBootstrap<B, C>, C extends Channel> implements Cloneable {

    final ChannelFuture initAndRegister() {
        // 这里通过Class的反射newInstance创建channel实例
        Channel channel = channelFactory.newChannel();
        // init的实现是在子类: 客户端Bootstrap和服务端ServerBootstrap
        init(channel);
        // 将初始化好的 Channel 注册到 EventGroup 中
        ChannelFuture regFuture = config().group().register(channel);
        return regFuture;
    }
}

// 客户端NIO的Channel实现
public class NioSocketChannel extends AbstractNioByteChannel implements io.netty.channel.socket.SocketChannel {
    // 构造方法,DEFAULT_SELECTOR_PROVIDER和操作系统有关,OSX上是KQueueSelectorProvider
    public NioSocketChannel() {
        this(DEFAULT_SELECTOR_PROVIDER);
    }
}

public abstract class AbstractNioByteChannel extends AbstractNioChannel {
    // 父类的构造函数,parent是null
    protected AbstractNioByteChannel(Channel parent, SelectableChannel ch) {
        super(parent, ch, SelectionKey.OP_READ);
    }

    // 上一个super的实现
    protected AbstractNioChannel(Channel parent, SelectableChannel ch, int readInterestOp) {
        super(parent);
        this.ch = ch;
        this.readInterestOp = readInterestOp;
        // NIO默认设置成非阻塞
        ch.configureBlocking(false);
    }

    // 上一个super的实现
    protected AbstractChannel(Channel parent) {
        this.parent = parent;
        id = newId();
        // NioSocketChannelUnsafe实例
        unsafe = newUnsafe();
        // 每个channel有它自己的管道
        pipeline = newChannelPipeline();
    }

    // AbstractBootstrap.initAndRegister -> group().register(channel) -> MultithreadEventLoopGroup.register
    //  -> 通过 next() 获取一个可用的 SingleThreadEventLoop ->  SingleThreadEventLoop.register -> AbstractUnsafe.register
    //  -> AbstractUnsafe.register0
    private void register0(ChannelPromise promise) {
        // 调用具体channel子类的方法,NIO是把channel注册到eventLoop的selector上
        doRegister();

        pipeline.fireChannelRegistered();
    }

    @Override
    protected void doRegister() throws Exception {
        // 将底层的socketChannel注册到eventLoop的selector上
        selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this);
        return;
    }
}

public abstract class AbstractChannel extends DefaultAttributeMap implements Channel {
    @Override
    public final void register(EventLoop eventLoop, final ChannelPromise promise) {
        // channel和eventloop产生引用关联
        AbstractChannel.this.eventLoop = eventLoop;

        if (eventLoop.inEventLoop()) {
            register0(promise);
        } else {
            try {
                eventLoop.execute(new Runnable() {
                    @Override
                    public void run() {
                        register0(promise);
                    }
                });
            } catch (Throwable t) {
                logger.warn(
                        "Force-closing a channel whose registration task was not accepted by an event loop: {}",
                        AbstractChannel.this, t);
                closeForcibly();
                closeFuture.setClosed();
                safeSetFailure(promise, t);
            }
        }
    }

    @Override
    public ChannelFuture connect(SocketAddress remoteAddress, ChannelPromise promise) {
        return pipeline.connect(remoteAddress, promise);
    }
}

protected abstract class AbstractNioUnsafe extends AbstractUnsafe implements NioUnsafe {
    @Override
    public final void connect(
            final SocketAddress remoteAddress, final SocketAddress localAddress, final ChannelPromise promise) {
        // 调用java nio的connect
        doConnect(remoteAddress, localAddress);
        // 触发fireChannelActive事件,发送通道激活消息,Inbound事件的起点
        fulfillConnectPromise(promise, wasActive);
    }
}

protected abstract class AbstractUnsafe implements Unsafe {
     @Override
    public final void register(EventLoop eventLoop, final ChannelPromise promise) {

        AbstractChannel.this.eventLoop = eventLoop;

        // 判断是在eventLoop子线程,还是在主线程
        if (eventLoop.inEventLoop()) {
            register0(promise);
        } else {
            // 执行子线程注册
            eventLoop.execute(new Runnable() {
                @Override
                public void run() {
                    register0(promise);
                }
            });
        }
    }
}

public class DefaultChannelPipeline implements ChannelPipeline {

    protected DefaultChannelPipeline(Channel channel) {
        this.channel = ObjectUtil.checkNotNull(channel, "channel");

        // 管道机制的关键:双向链表结构
        tail = new TailContext(this); // tail是ChannelInboundHandler接口
        head = new HeadContext(this); // head是ChannelOutboundHandler

        head.next = tail;
        tail.prev = head;
    }

    // Bootstrap的connect实质是channel的connect -> pipeline.connect
    // 从链表的tail节点开始往前找,默认一直找到head节点的connect
    @Override
    public final ChannelFuture connect(SocketAddress remoteAddress) {
        return tail.connect(remoteAddress);
    }
}

final class HeadContext extends AbstractChannelHandlerContext
            implements ChannelOutboundHandler, ChannelInboundHandler {

    private final Unsafe unsafe;

    HeadContext(DefaultChannelPipeline pipeline) {
        // head调用父类构造是false+true, tail相反是true+false
        super(pipeline, null, HEAD_NAME, false, true);
        unsafe = pipeline.channel().unsafe();
        setAddComplete();
    }

    // 默认会一直传递到最后一个OutBoundChannelHandler
    @Override
    public void connect(ChannelHandlerContext ctx,
            SocketAddress remoteAddress, SocketAddress localAddress,
            ChannelPromise promise) throws Exception {
        unsafe.connect(remoteAddress, localAddress, promise);
    }
}

abstract class AbstractChannelHandlerContext extends DefaultAttributeMap
        implements ChannelHandlerContext, ResourceLeakHint {
    @Override
    public ChannelFuture connect(
        final SocketAddress remoteAddress, final SocketAddress localAddress, final ChannelPromise promise) {

        // 从 DefaultChannelPipeline 内的双向链表的 tail 开始, 不断向前寻找第一个 outbound 为 true 的 AbstractChannelHandlerContext
        final AbstractChannelHandlerContext next = findContextOutbound();
        // 调用outbound是true的context的 invokeConnect 方法
        next.invokeConnect(remoteAddress, localAddress, promise);
        return promise;
    }

    // 注册channel时候触发
    @Override
    public ChannelHandlerContext fireChannelRegistered() {
        // findContextInbound 获取pipeline双向链表里第1个inbound的ChannelHandlerContext, 也就是ChannelInitializer的ChannelHandlerContext
        // ChannelInitializer就是Bootstrap.handler(new ChannelInitializer(...))里添加的自定义ChannelHandler
        invokeChannelRegistered(findContextInbound());
        return this;
    }

    private void invokeChannelRegistered() {
        // handler返回的就是context下的ChannelHandler,执行注册
        // 如果是ChannelInitializer,就会触发在Bootstrap里自定义的initChannel方法,remove自己替换真正的ChannelHandler
        ((ChannelInboundHandler) handler()).channelRegistered(this);
    }
}


public abstract class ChannelInitializer<C extends Channel> extends ChannelInboundHandlerAdapter {
    public final void channelRegistered(ChannelHandlerContext ctx) throws Exception {
        if (initChannel(ctx)) {
            ctx.pipeline().fireChannelRegistered();
        } else {
            ctx.fireChannelRegistered();
        }
    }
}

相关文章

网友评论

      本文标题:NettyClient.java

      本文链接:https://www.haomeiwen.com/subject/xjssfqtx.html