美文网首页大数据架构社区
第四届阿里中间件性能挑战赛总结和思考

第四届阿里中间件性能挑战赛总结和思考

作者: maskwang520 | 来源:发表于2018-07-14 11:38 被阅读577次

    随着复赛今天截止,为期两个月的挑战赛也终于结束了.这两个月里很大一部分时间花在这上面,有过欢乐,也有为分数刷不上去而发愁.作为第一次参加比赛,对比赛结果还算是满意吧.而在这个过程中,对多线程知识,netty,nio等知识的深入认识.下面是对比赛的总结和思考.排名如下

    image.png
    初赛:《Service Mesh Agent for Apache Dubbo (Incubating) 》
    • 赛题的思考

    题目看起来是让我们实现一个rpc agent.因为官方已经给出了consumer和provider,选手就是要实现两个代理,第一个代理是consumer-agent,负责把consumer的调用通过自定义协议发动给Provider-agent.第二个代理就是provider-agent.他的任务就是接收Consumer-agent通过网络发动过来的消息,然后通过dubbo调用provider.最后把结果返回给consumer-agent.整个系统的调用图如下:


    image.png
    • 设计和实现.

    整个调用过程如下所示:


    未命名文件.jpg
    • ①处这里采用Netty Http应用作为服务端,处理Consumer发送过来的http请求.
    • ②处这里就是在Consumer-agent开启Netty Client,Provider-agent端开启Netty Server进行请求和响应.
    • ③ Provider-agent通Netty Client去调用Provider的服务.
    • ④ Provider把结果返回给Consumer-agent.
    • ⑤ Consumer-agent把结果封装成HttpResponse返回给客户端.

    Provider提供的服务如下:

    public interface IHelloService {
    
      /**
       * 计算传入参数的哈希值.
       *
       * @param str 随机字符串
       * @return 该字符串的哈希值
       */
      int hash(String str);
    }
    

    整个代码我放在github中,这里不对整个代码做分析,只分析出关键的点.

    负载均衡

    如下图,3个provider的负载能力如下,那么我们可以选择负载均衡算法的时候,把这个考虑进去.我选择是随机加权算法.根据大家的一致认同,small:meddium:large = 1:2:2.


    Selection_020.png

    所有的服务都运行在docker环境中,而用的etcd作为服务发现的组件.事先并不知道那台机器是small,large,meddium.那么我们可以考虑把参数加上启动参数.一旦服务启动,这些信息,都会注册到etcd中.然后取出来,做相应的判断就行.


    Selection_021.png

    在etcd做服务发现的时候,把型号信息转换成比例注册上去

    //small 1; meddium和large是2.
    if(val.equals("small")) {
                    endpoints.add(new Endpoint(host, port, 1));
                }else{
                    endpoints.add(new Endpoint(host, port, 2));
                }
    

    Consumer在选择那个Provider的时候就可以根据以上的信息,轮询选择一个.

    //向endpoints加入5个实例,small一个,meddium和large都是2个.
    if (null == endpoints) {
                synchronized (ConsumerAgentHttpServerHandler.class) {
                    if (null == endpoints) {
                        endpoints = RegistryInstance.getInstance().find("com.alibaba.dubbo.performance.demo.provider.IHelloService");
                        ListIterator<Endpoint> it = endpoints.listIterator();
                        while (it.hasNext()){
                            Endpoint temp = it.next();
                            if(temp.getSize()==2) {
                                it.add(temp);
                            }
                        }
                    }
                }
            }
            int id = count.getAndIncrement();
            if(id>=4){
                count.set(0);
                id=4;
            }
            // 简单的负载均衡,随机取一个
            Endpoint endpoint = endpoints.get(id);
    

    这样一个随机加权的算法就实现了.

    EventLoop复用

    当我们创建Provident-agent的时候,我们是否可以考虑Eventloop的复用,这样每个请求从接收到发动都是用同一个线程处理的,没有上下文切换.另外一个,这样做好处,把channel和Eventloop绑定起来,也就限定了channel的个数,相当于做了一个channel的缓存(因为channel的数量得控制).一举两得.

    private void providerServerStart(int port) {
            EventLoopGroup bossGroup = new NioEventLoopGroup();
            EventLoopGroup workerGroup = new NioEventLoopGroup(4);
            putMap(workerGroup);
            try {
                ServerBootstrap sbs = new ServerBootstrap().group(bossGroup, workerGroup)
                        .channel(NioServerSocketChannel.class)
                        .localAddress(new InetSocketAddress(port))
                        .childHandler(new ProviderAgentHttpServerChannelInitializer());
                LOGGER.info("provider netty server start");
                ChannelFuture future = sbs.bind(port).sync();
                future.channel().closeFuture().sync();
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                workerGroup.shutdownGracefully();
                bossGroup.shutdownGracefully();
            }
        }
    
     //预先把channel设置好,复用上面的eventloop.
        public void putMap(EventLoopGroup group) {
            for (EventExecutor executor : group) {
                try {
                    map.put((EventLoop) executor, connecManager.getChannel( (EventLoop) executor));
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }
    
    回调的设计

    当Provider返回给结果后,那我们应该如何把结果返回给Consumer-agent呢,也就是它如何记住之前的通道.这里采用的是一个回调的设计.这样就能够记住上下文,也就是记住过来时候的ChannelHandlerContext,通过这个把结果返回回去.

     @Override
        protected void channelRead0(ChannelHandlerContext ctx, FullHttpRequest msg) throws Exception {
            Map<String, String> data = HttpParser.parse(msg);
            handle(new RequestWrapper(data.get("interface"),
                    data.get("method"),
                    data.get("parameterTypesString"),
                    data.get("parameter")), (result) -> {
                FullHttpResponse response = new DefaultFullHttpResponse(HTTP_1_1, OK, Unpooled.wrappedBuffer(result.getBytes()));
                response.headers().set(CONTENT_TYPE, "text/plain");
                response.headers().setInt(CONTENT_LENGTH, response.content().readableBytes());
                ctx.write(response);
                ctx.writeAndFlush(Unpooled.EMPTY_BUFFER);
            },ctx.channel().eventLoop());
        }
    

    当结果返回后,通过回调调用回调函数的逻辑

    //拿到结果后,回调
     public void done(RpcResponse response){
            this.response = response;
            sender.accept(new String(response.getBytes()).trim());
        }
    
    • 反思和思考
    可以批量flush,批量decode(来源于朋友徐靖峰的思想)
    image.png

    Netty 提供了一个方便的解码工具类 ByteToMessageDecoder ,如图上半部分所示,这个类具备 accumulate 批量解包能力,可以尽可能的从 socket 里读取字节,然后同步调用 decode 方法,解码出业务对象,并组成一个 List 。最后再循环遍历该 List ,依次提交到 ChannelPipeline 进行处理。此处我们做了一个细小的改动,如图下半部分所示,即将提交的内容从单个 command ,改为整个 List 一起提交,如此能减少 pipeline 的执行次数,同时提升吞吐量。这个模式在低并发场景,并没有什么优势,而在高并发场景下对提升吞吐量有不小的性能提升。

    负载均衡

    上面我的做法有点硬编码的意思,而且随机的话,而且不确定性有点大.那是是否可以考虑根据调用的次数来做负载均衡,也就是说,给句每个Provider请求的次数,尽量把请求分给请求量少的Provider,当然这个量还是得加权.实现的复杂性有点高.

    限流

    经过朋友提醒,是否可以尝试下,限流,也就是说不放那么多请求进取,只通过一部分来请求,待完成之后,再放另外一部分,这个可以尝试用令牌桶来实现.处于理论阶段,没实际尝试过.

    编码

    我做的处理里面都是采用的jdk自带的编码方式.如果采用kryo,protobuf的方式,性能上也会有一定的提升.

    我的代码:https://github.com/maskwang520/springforall.git


    复赛:实现一个进程内的队列引擎,单机可支持100万队列以上,能够承受2亿消息的存取.
    • 赛题的思考
      题目要求有5个:
      1.各个阶段线程数在20~30左右
      2.发送阶段:消息大小在50字节左右,消息条数在20亿条左右,也即发送总数据在100G左右
      3.索引校验阶段:会对所有队列的索引进行随机校验;平均每个队列会校验1~2次;
      4.顺序消费阶段:挑选20%的队列进行全部读取和校验;
      5.发送阶段最大耗时不能超过1800s;索引校验阶段和顺序消费阶段加在一起,最大耗时也不能超过1800s;超时会被判断为评测失败。

    100万个queue,20亿消息,如果放内存是完全不现实的,内存肯定会爆.接下来自然想到把消息存放到文件中,内存中只放索引就行.但是内存存放索引,是20亿消息的消息,索引自然是由(消息起始位置+长度)构成.但是这样的Map<queue,Index>存放的索引有20亿,疯狂的FullGc是不可避免的,Full Gc一多,Tps自然上不去.后来想到,消息按块存储(多个消息存在一个块中),索引的时候按块索引.这样就能把Map里面存的只有100万(queue的个数),示意图如下:

    未命名文件.png
    Block的设计
    public class Block {
        //开始位置
        public long startPosition;
    
        //长度
        public int length;
        //Block中已经存放的消息的条数
        public int size;
    
        public Block(Long startPosition, int length) {
            this.startPosition = startPosition;
            this.length = length;
            this.size = 0;
        }
    }
    
    • 因为一个queue中可能有多个Block,在消息检索的时候给出的是在队列中的偏移量,那么size这个域方便后面消息检索的时候判断在哪个block中.

    • 消息缓存的设计
      因为每当来一个消息都要flush到文件中去,这样Io的时间就太多了,题目的关键点在于如何减少Io的时间.所以可以采用消息的缓存来处理.每当来一个消息,就放入缓存中,当缓存中超过10次消息的时候,就同步写入到文件中去.这样的话,相当于每10次写,才做一次Io.

    public class DataCache {
       //消息缓存
        public ByteBuffer dataBuffer = ByteBuffer.allocate(1024);
        public int count;
    }
    

    这里将缓存的大小设置为1024Byte,当然你也可以设置成更大.这里有个小Tips.缓存的消息最好设置成Block的大小.这样当缓存满了之后,就可以直接写入到一个Block块中,而不用接着上一个Block写(上面一个Block写),这样设计,写入更简单,每次flush到文件的时候,只要新开辟一个新的Block,而不用管之前的Block.

        //以块为索引,一个队列可能有多个块,且块的写入有顺序,所有用List来存Block.
        public Map<String, List<Block>> blockMap = new ConcurrentHashMap<>();
        public Map<String, DataCache> cacheMap = new ConcurrentHashMap<>();
    
    消息的存储

    因为不可能每个队列的消息都用一个文件来存放,所以这里用hash来把文件限定在32个.一个queue的Block必须在一个文件里面.不同queue的Block可以在一个文件里面.

     //根据队列的名字hash到对应的文件中,共32个文件
        int hashFile(String queueName) {
            return queueName.hashCode() & 0x1f;
            //return 0;
        }
    

    还存在一个问题就是,往一个文件中写入消息的时候,什么位置写,因为按块写.所以已经写过的块不能用.只能从新开辟一个块,块与块之间尽可能紧凑.

        //block的大小为1024,根据当前文件已经存在的写的位置,找到下一个比该位置大的,且是1024的倍数
        public long getLeastBlockPosition(long length) {
            if (length == 0) {
                return 0;
            }
            int initSize = 1 << 10;
            int i = 1;
            while (i * initSize <= length) {
                i++;
            }
            //定义到可用的块的第一个位置
            return i * initSize;
        }
    
    消息存放

    这里采用的是原生的filechannel去读写.本打算用mmap去写的,经过一位朋友提醒,mmap在这个场景下不合适.原因是不是长期读写,写完就释放,不是长期的.

    public void put(String queueName, byte[] message) {
            int hash = hashFile(queueName);
            String path = DIRPATH + hash + ".txt";
            lock.lock();
            //创建文件
            File file = new File(path);
            if (!file.exists()) {
                try {
                    file.createNewFile();
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
    
            if (!blockMap.containsKey(queueName)) {
                List<Block> list = new ArrayList();
                blockMap.put(queueName, list);
            }
    
            if (!cacheMap.containsKey(queueName)) {
                DataCache dataCache = new DataCache();
                cacheMap.put(queueName, dataCache);
            }
    
            DataCache dataCache = cacheMap.get(queueName);
          //每10次flush到文件中
            if (dataCache.count == 10) {
                FileChannel fileChannel = null;
                // long fileLength = 0;
                try {
                    fileChannel = new RandomAccessFile(file, "rw").getChannel();
                    //fileLength = raf.length();
                } catch (Exception e) {
                    e.printStackTrace();
                }
    
                long blockPosition;
                try {
                    blockPosition = getLeastBlockPosition(getLeastBlockPosition(fileChannel.size()));
                    Block block = new Block(blockPosition, dataCache.dataBuffer.position());
                    block.size = 10;
                    blockMap.get(queueName).add(block);
                    dataCache.dataBuffer.flip();
                    fileChannel.position(blockPosition);
                    fileChannel.write(dataCache.dataBuffer);
                    dataCache.dataBuffer.clear();
    
                } catch (Exception e) {
                    e.printStackTrace();
                }finally {
                    try {
                        fileChannel.close();
                    }catch (Exception e){
                        e.printStackTrace();
                    }
                }
            } else {
                //放入缓存中
                dataCache.dataBuffer.putInt(message.length);
                dataCache.dataBuffer.put(message);
                dataCache.count++;
            }
    
    
            lock.unlock();
    
    
        }
    
    消息获取

    消息获取的思路是根据队列名,找到该队列对应的List<Block>,然后根据偏移量,找到属于哪个block.找到具体的Block后,然后遍历Block,找到偏移量的开始位置,取相应数量的消息即可.

    public Collection<byte[]> get(String queueName, long offset, long num) {
    
            //队列不存在
            if (!blockMap.containsKey(queueName)) {
                return EMPTY;
            }
            //消息集合
            List<byte[]> msgs = new ArrayList();
            List<Block> blocks = blockMap.get(queueName);
    
            int hash = hashFile(queueName);
            String path = DIRPATH + hash + ".txt";
            FileChannel fileChannel = null;
            int size = blocks.get(0).size;
            int eleNum = 0;
            //记录了目标block所在的下标
            int blockNum = 0;
            lock.lock();
            try {
                fileChannel = new RandomAccessFile(new File(path), "rw").getChannel();
                for (int i = 1; i < blocks.size() && size < offset; i++, blockNum++) {
                    size += blocks.get(i).size;
                }
    
                size = size - blocks.get(blockNum).size;
    
    
                for (int i = blockNum; i < blocks.size(); i++) {
                    //size+=blocks.get(i).size;
                    // size-=blocks.get(i).size;
                    int length = blocks.get(i).length;
                    MappedByteBuffer buffer = fileChannel.map(FileChannel.MapMode.READ_WRITE, blocks.get(i).startPosition, length);
                    int sum = 0;
                    while (sum < length && size < offset) {
                        int len = buffer.getInt();
                        sum += 4;
                        sum += len;
                        buffer.position(sum);
                        size++;
                    }
    
                    if (size >= offset) {
                        while (buffer.position() < length && eleNum <= num) {
                            int len = buffer.getInt();
                            byte[] temp = new byte[len];
                            buffer.get(temp, 0, len);
                            eleNum++;
                            msgs.add(temp);
                        }
                        if (eleNum > num) {
                            break;
                        }
                    }
    
                }
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
    
                try {
                    fileChannel.close();
                } catch (Exception e) {
                    e.printStackTrace();
                }
                lock.unlock();
    
            }
    
            return msgs;
        }
    
    思考
    • 将所有的ByteBuf池化,包括缓存的那部分ByteBuf.通过ThreadLocal,将ByteBuf与线程绑定起来,后面申请Buffer,直接从对应的线程里面去申请即可.
    • 在写入的时候,可以不同步写,实现异步写.由一个线程去异步flush到文件里面
    • 当读取消息块达到临界点的时候,由单线程申请buffer资源来预读后面的消息块存入,并缓存.
      我的代码:https://github.com/maskwang520/messagequeue.git

    相关文章

      网友评论

      本文标题:第四届阿里中间件性能挑战赛总结和思考

      本文链接:https://www.haomeiwen.com/subject/pikdpftx.html