美文网首页
netty实战-netty client连接池设计

netty实战-netty client连接池设计

作者: Sam同学 | 来源:发表于2018-02-14 13:17 被阅读0次

    概述


    最近有很多网友在咨询netty client中,netty的channel连接池应该如何设计。这是个稍微有些复杂的主题,牵扯到蛮多技术点,要想在网上找到相关的又相对完整的参考文章,确实不太容易。

    在本篇文章中,会给出<font color="red">其中</font>一种解决方案,并且附带完整的可运行的代码。如果网友有更好的方案,可以回复本文,我们一起讨论讨论,一起开阔思路和眼界。

    阅读本文之前需要具备一些基础知识

    1、知道netty的一些基础知识,比如ByteBuf类的相关api;
    2、知道netty的执行流程;
    3、 必须阅读过我之前写的netty实战-自定义解码器处理半包消息,因为本文部分代码来自这篇文章。

    现在微服务非常的热门,也有很多公司在用。微服务框架中,如果是使用thrift、grpc来作为数据序列化框架的话,通常都会生成一个SDK给客户端用户使用。客户端只要使用这个SDK,就可以方便的调用服务端的微服务接口。本文讨论的就是使用SDK的netty客户端,它的netty channel连接池的设计方案。至于netty http client的channel连接池设计,基于http的,是另外一个主题了,需要另外写文章来讨论的。


    netty channel连接池设计


    DB连接池中,当某个线程获取到一个db connection后,在读取数据或者写数据时,如果线程没有操作完,这个db connection一直被该线程<font color="red">独占</font>着,直到线程执行完任务。如果netty client的channel连接池设计也是使用这种独占的方式的话,有几个问题。

    1、netty中channel的writeAndFlush方法,调用完后是不用等待返回结果的,writeAndFlush一被调用,马上返回。对于这种情况,是完全没必要让线程独占一个channel的。
    2、使用类似DB pool的方式,从池子里拿连接,用完后返回,这里的一进一出,需要考虑并发锁的问题。另外,如果请求量很大的时候,连接会不够用,其他线程也只能等待其他线程释放连接。

    因此不太建议使用上面的方式来设计netty channel连接池,channel独占的代价太大了。可以使用<font color="red">Channel数组</font>的形式, <font color="red">复用</font>netty的channel。当线程要需要Channel的时候,随机从数组选中一个Channel,如果Channel还未建立,则创建一个。如果线程选中的Channel已经建立了,则复用这个Channel。

    这里写图片描述

    假设channel数组的长度为4

    private Channel[] channels = new Channel[4];
    

    当外部系统请求client的时候,client从channels数组中随机挑选一个channel,如果该channel尚未建立,则触发建立channel的逻辑。无论有多少请求,都是复用这4个channel。假设有10个线程,那么部分线程可能会使用相同的channel来发送数据和接收数据。因为是随机选择一个channel的,多个线程命中同一个channel的机率还是很大的。如下图

    这里写图片描述

    10个线程中,可能有3个线程都是使用channel2来发送数据的。这个会引入另外一个问题。thread1通过channel2发送一条消息msg1到服务端,thread2也通过channel2发送一条消息msg2到服务端,当服务端处理完数据,通过channel2返回数据给客户端的时候,如何区分哪条消息是哪个线程的呢?如果不做区分,万一thread1拿到的结果其实是thread2要的结果,怎么办?

    那么如何做到让thread1和thread2拿到它们自己想要的结果呢?

    之前我在netty实战-自定义解码器处理半包消息一文中提到,自定义消息的时候,通常会在消息中加入一个序列号,用来唯一标识消息的。当thread1发送消息时,往消息中插入一个唯一的消息序列号,同时为thread1建立一个callback回调程序,当服务端返回消息的时候,根据消息中的序列号从对应的callback程序获取结果。这样就可以解决上面说到的问题。

    消息格式

    这里写图片描述

    消息、消息seq以及callback对应关系

    这里写图片描述 这里写图片描述

    OK,下面就基于上面的设计来进行编码。


    代码


    先来实现netty客户端,设置10个线程并发获取channel,为了达到真正的并发,利用CountDownLatch来做开关,同时channel连接池设置4个channel。

    package nettyinaction.nettyclient.channelpool.client;
    
    import io.netty.buffer.ByteBuf;
    import io.netty.buffer.UnpooledByteBufAllocator;
    import io.netty.channel.Channel;
    import nettyinaction.nettyclient.channelpool.ChannelUtils;
    import nettyinaction.nettyclient.channelpool.IntegerFactory;
    
    import java.util.HashMap;
    import java.util.Map;
    import java.util.concurrent.CountDownLatch;
    
    public class SocketClient {
        public static void main(String[] args) throws InterruptedException {
            //当所有线程都准备后,开闸,让所有线程并发的去获取netty的channel
            final CountDownLatch countDownLatchBegin = new CountDownLatch(1);
    
            //当所有线程都执行完任务后,释放主线程,让主线程继续执行下去
            final CountDownLatch countDownLatchEnd = new CountDownLatch(10);
    
            //netty channel池
            final NettyChannelPool nettyChannelPool = new NettyChannelPool();
    
            final Map<String, String> resultsMap = new HashMap<>();
            //使用10个线程,并发的去获取netty channel
            for (int i = 0; i < 10; i++) {
                new Thread(new Runnable() {
                    @Override
                    public void run() {
                        try {
                            //先让线程block住
                            countDownLatchBegin.await();
    
                            Channel channel = null;
                            try {
                                channel = nettyChannelPool.syncGetChannel();
                            } catch (InterruptedException e) {
                                e.printStackTrace();
                            }
    
                            //为每个线程建立一个callback,当消息返回的时候,在callback中获取结果
                            CallbackService callbackService = new CallbackService();
                            //给消息分配一个唯一的消息序列号
                            int seq = IntegerFactory.getInstance().incrementAndGet();
                            //利用Channel的attr方法,建立消息与callback的对应关系
                            ChannelUtils.putCallback2DataMap(channel,seq,callbackService);
    
                            synchronized (callbackService) {
                                UnpooledByteBufAllocator allocator = new UnpooledByteBufAllocator(false);
                                ByteBuf buffer = allocator.buffer(20);
                                buffer.writeInt(ChannelUtils.MESSAGE_LENGTH);
    
                                buffer.writeInt(seq);
                                String threadName = Thread.currentThread().getName();
                                buffer.writeBytes(threadName.getBytes());
                                buffer.writeBytes("body".getBytes());
    
                                //给netty 服务端发送消息,异步的,该方法会立刻返回
                                channel.writeAndFlush(buffer);
    
                                //等待返回结果
                                callbackService.wait();
    
                                //解析结果,这个result在callback中已经解析到了。
                                ByteBuf result = callbackService.result;
                                int length = result.readInt();
                                int seqFromServer = result.readInt();
    
                                byte[] head = new byte[8];
                                result.readBytes(head);
                                String headString = new String(head);
    
                                byte[] body = new byte[4];
                                result.readBytes(body);
                                String bodyString = new String(body);
                                resultsMap.put(threadName, seqFromServer + headString + bodyString);
                            }
                        } catch (Exception e) {
                            e.printStackTrace();
                        }
                        finally {
                            countDownLatchEnd.countDown();
                        }
                    }
                }).start();
            }
    
            //开闸,让10个线程并发获取netty channel
            countDownLatchBegin.countDown();
    
            //等10个线程执行完后,打印最终结果
            countDownLatchEnd.await();
            System.out.println("resultMap="+resultsMap);
        }
    
        public static class CallbackService{
            public volatile ByteBuf result;
            public void receiveMessage(ByteBuf receiveBuf) throws Exception {
                synchronized (this) {
                    result = receiveBuf;
                    this.notify();
                }
            }
        }
    }
    
    

    其中IntegerFactory类用于生成消息的唯一序列号

    package nettyinaction.nettyclient.channelpool;
    
    
    import java.util.concurrent.atomic.AtomicInteger;
    
    public class IntegerFactory {
        private static class SingletonHolder {
            private static final AtomicInteger INSTANCE = new AtomicInteger();
        }
    
        private IntegerFactory(){}
    
        public static final AtomicInteger getInstance() {
            return SingletonHolder.INSTANCE;
        }
    }
    
    

    而ChannelUtils类则用于建立channel、消息序列号和callback程序的对应关系。

    package nettyinaction.nettyclient.channelpool;
    
    import io.netty.channel.Channel;
    import io.netty.util.AttributeKey;
    
    import java.util.Map;
    
    public class ChannelUtils {
        public static final int MESSAGE_LENGTH = 16;
        public static final AttributeKey<Map<Integer, Object>> DATA_MAP_ATTRIBUTEKEY = AttributeKey.valueOf("dataMap");
        public static <T> void putCallback2DataMap(Channel channel, int seq, T callback) {
            channel.attr(DATA_MAP_ATTRIBUTEKEY).get().put(seq, callback);
        }
    
        public static <T> T removeCallback(Channel channel, int seq) {
            return (T) channel.attr(DATA_MAP_ATTRIBUTEKEY).get().remove(seq);
        }
    }
    
    

    NettyChannelPool则负责创建netty的channel。

    package nettyinaction.nettyclient.channelpool.client;
    
    
    import io.netty.bootstrap.Bootstrap;
    import io.netty.channel.*;
    import io.netty.channel.nio.NioEventLoopGroup;
    import io.netty.channel.socket.SocketChannel;
    import io.netty.channel.socket.nio.NioSocketChannel;
    import io.netty.handler.logging.LogLevel;
    import io.netty.handler.logging.LoggingHandler;
    import io.netty.util.Attribute;
    import nettyinaction.nettyclient.channelpool.ChannelUtils;
    import nettyinaction.nettyclient.channelpool.SelfDefineEncodeHandler;
    
    import java.util.Map;
    import java.util.Random;
    import java.util.concurrent.ConcurrentHashMap;
    
    public class NettyChannelPool {
        private Channel[] channels;
        private Object [] locks;
        private static final int MAX_CHANNEL_COUNT = 4;
    
        public NettyChannelPool() {
            this.channels = new Channel[MAX_CHANNEL_COUNT];
            this.locks = new Object[MAX_CHANNEL_COUNT];
            for (int i = 0; i < MAX_CHANNEL_COUNT; i++) {
                this.locks[i] = new Object();
            }
        }
    
        /**
         * 同步获取netty channel
         */
        public Channel syncGetChannel() throws InterruptedException {
            //产生一个随机数,随机的从数组中获取channel
            int index = new Random().nextInt(MAX_CHANNEL_COUNT);
            Channel channel = channels[index];
            //如果能获取到,直接返回
            if (channel != null && channel.isActive()) {
                return channel;
            }
    
            synchronized (locks[index]) {
                channel = channels[index];
                //这里必须再次做判断,当锁被释放后,之前等待的线程已经可以直接拿到结果了。
                if (channel != null && channel.isActive()) {
                    return channel;
                }
    
                //开始跟服务端交互,获取channel
                channel = connectToServer();
    
                channels[index] = channel;
            }
    
            return channel;
        }
    
        private Channel connectToServer() throws InterruptedException {
            EventLoopGroup eventLoopGroup = new NioEventLoopGroup();
            Bootstrap bootstrap = new Bootstrap();
            bootstrap.group(eventLoopGroup)
                     .channel(NioSocketChannel.class)
                     .option(ChannelOption.SO_KEEPALIVE, Boolean.TRUE)
                     .option(ChannelOption.TCP_NODELAY, Boolean.TRUE)
                     .handler(new LoggingHandler(LogLevel.INFO))
                     .handler(new ChannelInitializer<SocketChannel>() {
                         @Override
                         protected void initChannel(SocketChannel ch) throws Exception {
                             ChannelPipeline pipeline = ch.pipeline();
                             pipeline.addLast(new SelfDefineEncodeHandler());
                             pipeline.addLast(new SocketClientHandler());
                         }
                     });
    
            ChannelFuture channelFuture = bootstrap.connect("localhost", 8899);
            Channel channel = channelFuture.sync().channel();
    
            //为刚刚创建的channel,初始化channel属性
            Attribute<Map<Integer,Object>> attribute = channel.attr(ChannelUtils.DATA_MAP_ATTRIBUTEKEY);
            ConcurrentHashMap<Integer, Object> dataMap = new ConcurrentHashMap<>();
            attribute.set(dataMap);
            return channel;
        }
    }
    
    

    先使用构造方法,初始化channels数组,长度为4。NettyChannelPool类有两个关键的地方。
    第一个是获取channel的时候必须加上锁。另外一个是当获取到channel后,利用channel的属性,创建一个Map,后面需要利用这个Map建立消息序列号和callback程序的对应关系。

    //初始化channel属性
            Attribute<Map<Integer,Object>> attribute = channel.attr(ChannelUtils.DATA_MAP_ATTRIBUTEKEY);
            ConcurrentHashMap<Integer, Object> dataMap = new ConcurrentHashMap<>();
            attribute.set(dataMap);
    

    这个map就是我们上面看到的


    这里写图片描述

    Map的put的动作,就是在SocketClient类中的

    ChannelUtils.putCallback2DataMap(channel,seq,callbackService);
    

    执行的。客户端处理消息还需要两个hanlder辅助,一个是处理半包问题,一个是接收服务端的返回的消息。

    SelfDefineEncodeHandler类用于处理半包消息

    package nettyinaction.nettyclient.channelpool;
    
    import io.netty.buffer.ByteBuf;
    import io.netty.channel.ChannelHandlerContext;
    import io.netty.handler.codec.ByteToMessageDecoder;
    
    import java.util.List;
    
    public class SelfDefineEncodeHandler extends ByteToMessageDecoder {
        @Override
        protected void decode(ChannelHandlerContext ctx, ByteBuf bufferIn, List<Object> out) throws Exception {
            if (bufferIn.readableBytes() < 4) {
                return;
            }
    
            int beginIndex = bufferIn.readerIndex();
            int length = bufferIn.readInt();
    
            if (bufferIn.readableBytes() < length) {
                bufferIn.readerIndex(beginIndex);
                return;
            }
    
            bufferIn.readerIndex(beginIndex + 4 + length);
    
            ByteBuf otherByteBufRef = bufferIn.slice(beginIndex, 4 + length);
    
            otherByteBufRef.retain();
    
            out.add(otherByteBufRef);
        }
    }
    
    

    SocketClientHandler类用于接收服务端返回的消息,<font color="red">并且根据消息序列号获取对应的callback程序</font>

    package nettyinaction.nettyclient.channelpool.client;
    
    import io.netty.buffer.ByteBuf;
    import io.netty.channel.Channel;
    import io.netty.channel.ChannelHandlerContext;
    import io.netty.channel.ChannelInboundHandlerAdapter;
    import nettyinaction.nettyclient.channelpool.ChannelUtils;
    
    public class SocketClientHandler extends ChannelInboundHandlerAdapter {
        @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
            Channel channel = ctx.channel();
    
            ByteBuf responseBuf = (ByteBuf)msg;
            responseBuf.markReaderIndex();
    
            int length = responseBuf.readInt();
            int seq = responseBuf.readInt();
    
            responseBuf.resetReaderIndex();
    
            //获取消息对应的callback
            SocketClient.CallbackService callbackService = ChannelUtils.<SocketClient.CallbackService>removeCallback(channel, seq);
            callbackService.receiveMessage(responseBuf);
        }
    }
    

    到此客户端程序编写完毕。至于服务端的代码,则比较简单,这里直接贴上代码。

    package nettyinaction.nettyclient.channelpool.server;
    
    import io.netty.bootstrap.ServerBootstrap;
    import io.netty.channel.ChannelFuture;
    import io.netty.channel.ChannelInitializer;
    import io.netty.channel.ChannelPipeline;
    import io.netty.channel.EventLoopGroup;
    import io.netty.channel.nio.NioEventLoopGroup;
    import io.netty.channel.socket.SocketChannel;
    import io.netty.channel.socket.nio.NioServerSocketChannel;
    import io.netty.handler.logging.LogLevel;
    import io.netty.handler.logging.LoggingHandler;
    import nettyinaction.nettyclient.channelpool.SelfDefineEncodeHandler;
    
    public class SocketServer {
        public static void main(String[] args) throws InterruptedException {
            EventLoopGroup parentGroup = new NioEventLoopGroup();
            EventLoopGroup childGroup = new NioEventLoopGroup();
    
            try {
                ServerBootstrap serverBootstrap = new ServerBootstrap();
                serverBootstrap.group(parentGroup, childGroup)
                               .channel(NioServerSocketChannel.class)
                               .handler(new LoggingHandler(LogLevel.INFO))
                               .childHandler(new ChannelInitializer<SocketChannel>() {
                                    @Override
                                    protected void initChannel(SocketChannel ch) throws Exception {
                                        ChannelPipeline pipeline = ch.pipeline();
                                        pipeline.addLast(new SelfDefineEncodeHandler());
                                        pipeline.addLast(new BusinessServerHandler());
                                    }
                               });
    
                ChannelFuture channelFuture = serverBootstrap.bind(8899).sync();
                channelFuture.channel().closeFuture().sync();
            }
            finally {
                parentGroup.shutdownGracefully();
                childGroup.shutdownGracefully();
            }
        }
    }
    
    package nettyinaction.nettyclient.channelpool.server;
    
    import io.netty.buffer.ByteBuf;
    import io.netty.buffer.UnpooledByteBufAllocator;
    import io.netty.channel.Channel;
    import io.netty.channel.ChannelHandlerContext;
    import io.netty.channel.ChannelInboundHandlerAdapter;
    import nettyinaction.nettyclient.channelpool.ChannelUtils;
    
    public class BusinessServerHandler extends ChannelInboundHandlerAdapter {
        @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
            Channel channel = ctx.channel();
            ByteBuf buf = (ByteBuf)msg;
            //1、读取消息长度
            int length = buf.readInt();
    
            //2、读取消息序列号
            int seq = buf.readInt();
    
            //3、读取消息头部
            byte[] head = new byte[8];
            buf.readBytes(head);
            String headString = new String(head);
    
            //4、读取消息体
            byte[] body = new byte[4];
            buf.readBytes(body);
            String bodyString = new String(body);
    
            //5、新建立一个缓存区,写入内容,返回给客户端
            UnpooledByteBufAllocator allocator = new UnpooledByteBufAllocator(false);
            ByteBuf responseBuf = allocator.buffer(20);
            responseBuf.writeInt(ChannelUtils.MESSAGE_LENGTH);
            responseBuf.writeInt(seq);
            responseBuf.writeBytes(headString.getBytes());
            responseBuf.writeBytes(bodyString.getBytes());
    
            //6、将数据写回到客户端
            channel.writeAndFlush(responseBuf);
        }
    }
    
    
    

    运行服务端代码和客户端代码,期望的结果是

    10个线程发送消息后,能从服务端获取到正确的对应的返回信息,这些信息不会发生错乱,各个线程都能拿到自己想要的结果,不会发生错读的情况。

    运行后的结果如下


    <font color="red">Thread-3</font>=9<font color="red">Thread-3</font>body,
    <font color="red">Thread-4</font>=8<font color="red">Thread-4</font>body,
    Thread-5=5Thread-5body,
    Thread-6=1Thread-6body,
    Thread-7=3Thread-7body,
    Thread-8=10Thread-8body,
    Thread-9=4Thread-9body,
    Thread-0=7Thread-0body,
    Thread-1=6Thread-1body,
    Thread-2=2Thread-2body


    通过观察结果,可以知道10个线程并发获取channel后,部分线程共享一个channel,但是10个线程能仍然能正确获取到结果。


    代码细节解析


    1、等待服务端的返回

    由于 channel.writeAndFlush是异步的,必须有一种机制来让线程等待服务端返回结果。这里采用最原始的wait和notify方法。当writeAndFlush调用后,立刻让当前线程wait住,放置在callbackservice对象的等待列表中,当服务器端返回消息时,客户端的SocketClientHandler类中的channelRead方法会被执行,解析完数据后,从channel的attr属性中获取DATA_MAP_ATTRIBUTEKEY 这个key对应的map。并根据解析出来的seq从map中获取事先放置好的callbackservice对象,执行它的receiveMessage方法。将receiveBuf这个存放结果的缓存区对象赋值到callbackservice的result属性中。并调用callbackservice对象的notify方法,唤醒wait在callbackservice对象的线程,让其继续往下执行。


    2、产生消息序列号

                            int seq = IntegerFactory.getInstance().incrementAndGet();
    
    

    为了演示的方便,这里是产生单服务器全局唯一的序列号。如果请求量大的话,就算是AtomicInteger是CAS操作,也会产生很多的竞争。建议产生channel级别的唯一序列号,降低竞争。只要保证在一个channel内的消息的序列号是不重复的即可。

    至于其他的一些代码细节,读者可以自己再细看。

    相关文章

      网友评论

          本文标题:netty实战-netty client连接池设计

          本文链接:https://www.haomeiwen.com/subject/nkdetftx.html