美文网首页
【工作】Presto Hive读取代码阅读笔记

【工作】Presto Hive读取代码阅读笔记

作者: 苏柏亚的星空 | 来源:发表于2021-08-18 16:57 被阅读0次

    PS:基于 presto-0.258

    整体流程

    接收语句

    DispatchManager createQueryInternal
        queryPreparer.prepareQuer // preparedQuery [封装Statement]
            dispatchQueryFactory.createDispatchQuery => DispatchQuery 
                resourceGroupManager.submit(preparedQuery.getStatement(), dq, selectionContext, queryExecutor)
    

    提交成功

    InternalResourceGroup run (LocalDispatchQuery)
        InternalResourceGroup startInBackground
            LocalDispatchQuery waitForMinimumWorkers
                LocalDispatchQuery startExecution
                    SqlQueryExecution start
    

    开始执行

        PlanRoot plan = analyzeQuery();
        planDistribution(plan);
        scheduler.start(); // SqlQueryScheduler
    

    一些细节

    hive表的元数据访问

    元数据总体由 HiveMetadata维护,里面包含metastore连接,partitionManager以及一些辅助方法。

    获取表的元数据

    
            StatementAnalyzer visitTable
                TableMetadata tableMetadata = metadata.getTableMetadata(session, tableHandle.get());
                    ConnectorMetadata metadata = getMetadata(session, connectorId); -> HiveMetadata
                        解析一些 
                            HiveStorageFormat 
                            properties 
                            partitionedBy 
                            bucketProperty 
                            preferredOrderingColumns 
                            orcBloomFilterColumns
                            orcBloomFilterFfp
                            comment
                        等信息
                    封装到ConnectorTableMetadata
    

    Source Split的切分

            从plan里createStageScheduler
                splitSourceProvider // 这里会出现HiveTableLayoutHandle 描述了表的目录 分区 字段 谓词等 甚至有tableParameters
                    HiveSplitSource allAtOnce //返回的是HiveSplitSource实例 封装了一个AsyncQueue队列去存储split
                        HiveSplitSource getNextBatch //这是每一批
                            BackgroundHiveSplitLoader loadSplits //这里触发分区 文件的迭代 和split计算 。。。
                                StoragePartitionLoader loadPartition //这里有个 DirectoryLister 【重点关注】
                                    这里夹杂几种情况
                                        SymlinkTextInputFormat
                                        shouldUseFileSplitsFromInputFormat(inputFormat))
                                            InputSplit[] splits = inputFormat.getSplits(jobConf, 0); 去拿到split 。。
                                        if (tableBucketInfo.isPresent()) {
                                    不同情况解析split的逻辑不一样
                                    正常情况是非bucket普通表
                                    是用DirectoryLister去list分区目录path 一个文件对应一个InternalHiveSplit(也可能被path filter过滤)
                                        Optional<InternalHiveSplit> createInternalHiveSplit(HiveFileInfo fileInfo
                                        这里的逻辑:
                                            1)提取 List<HostAddress> addresses
                                            2)计算分区这个文件的相对路径 URI relativePath = partitionInfo.getPath().relativize(path.toUri());
    
    • 上面返回的只是InternalHiveSplit 还需要在 HiveSplitSource的getNextBatch里变成HiveSplit

    • queues.borrowBatchAsync(bucketNumber xxx 触发future list目录任务 。。

    • 最后对外输出的是 HiveSplit【封装了一大堆东西。。基于maxSplitSize算出来的 即一个文件 可能有多个】

    • 关于split元数据这块比spark调度要好很多 因为是流式的 不是静态的集合 。。 内存需求会少很多。

    • 最主要的是ListenableFuture<?> future = hiveSplitSource.addToQueue(splits.next());

    • 最后输出的HiveSplit在一个PerBucket + AsyncQueue 组合的复杂的队列缓存结构里

    节点选择 [SOFT Affinity scheduler]

    • 这里实际上是用path的哈希取模所有节点 得到固定的目标节点映射列表
      (好像忽略了文件实际位置。。但是因为这有缓存 包括文件的 所以可能是综合考虑 如果是hard的话 是不是可能不均衡 ?)
    • 貌似只适合于存算分离的架构。。
    • 如果是存算一体的 建议选HARD Affinity ,即类似spark的preference local node

    缓存(Raptorx中的特性)

    • 1)文件 cache 【coordinater上 放内存】【done】
                本质是guava的Cache<Path, List<HiveFileInfo>> cache类实例 分区目录也假设为不动的。。
                This can only be applied to sealed directories
                    见:StoragePartitionLoader.createInternalHiveSplitIterator 
                        boolean cacheable = isUseListDirectoryCache(session);
                        if (partition.isPresent()) {
                            // Use cache only for sealed partitions
                            cacheable &= partition.get().isSealedPartition();
                        }
    
                文件的list是根据 hdfs 的 remoteIterator 迭代的 。。不像spark 跑了并行任务去获取location信息 全部一起缓存 。。
    
    • 2)tail/footer cache【在节点上 也是放内存】
                注:OrcDataSource这个类和tail/footer没关系 只是封装了流读取的一些入口 
                这个类是必须要打开至少一次ORC文件的 
    
                HiveClientModule -> createOrcFileTailSource 里决定了是否启用缓存 。。
                    Cache<OrcDataSourceId, OrcFileTail> cache
    
                具体来说
    
                OrcReader里面的两个主要元数据 都来自 orcFileTailSource提供的OrcFileTail // Slice 里保存了 byte[]
                    private final Footer footer; // 文件级别的统计 stripe摘要
                    private final Metadata metadata; //stripe级的统计 
                    
                还有stripe的StripeMetadataSource -> 这个类提供获取StripeFooter的方法 
                    (StripeFooter 包含一堆Stream 即各列数据信息 以及索引信息 StripeReader会用 selectRowGroups )
                    这里面会判断是否要缓存isCachedStream 
                    return streamKind == BLOOM_FILTER || streamKind == ROW_INDEX; 
    
                注意:这个方法调用时是传入OrcDataSource的 所以能拿到ORC文件流 但是之后就不需要这个流了。seek 等也不需要了。
                OrcFileTail orcFileTail = orcFileTailSource.getOrcFileTail(OrcDataSource orcDataSource)
    

    谓词裁剪(plan层)

    • 1)分区裁剪
                SqlQueryExecution analyzeQuery
                    logicalPlanner plan
                        IterativeOptimizer【这个类类似于scala里面的模式匹配 不同的规则去catch其对应的语法树节点去执行逻辑】
                        而所有的规则都在 PlanOptimizers 去添加 每个匹配逻辑是一个Rule类的实现
                            如PickTableLayout 有一个规则是pickTableLayoutForPredicate
                                hivePartitionResult = partitionManager.getPartitions(
                                    这里如果有谓词 where 就会把tablescan替换成FilterNode(里边包含tablescan)
                这样就完成了查询计划的替换
    
                分区裁剪过程【这里很抽象 谓词传递 命名很不好理解 。。。】
    
    • 2)谓词表示体系

    重要
    这里要解释一个较Domain的类。。实际上就是表示某个值的范围(离散值,范围,无穷等)
    以及其服务类:TupleDomain 。。是限定了字段 + 值范围的组合
    (PS:这命名实在让人别扭。)

                参考 TestTupleDomainFilter 
                还搞了个缓存去防止多次解析 。。
                TupleDomainFilterCache -> Converting Domain into TupleDomainFilter is expensive, hence, we use a cache keyed on Domain
    
                传递到下游的时候 是TupleDomain<Subfield> domainPredicate 
                这里面Subfield是一个可以多层表达的字段表示
                TupleDomain 是一个泛型Map 大概就是<字段 值范围>的一个模式。
    
                Constraint<ColumnHandle> 
                    // 这又是另一个表示条件的类 。。里面封装了 TupleDomain<T> summary; 
                    // 和另一个 Optional<Predicate<Map<T, NullableValue>>> predicate 这个是Java Function接口里面的Predicate 
                    // 有几个主要方法 and/or/test -> 得到返回值是Boolean抽象 。
    
                这里面涉及到的泛型有
                    ColumnHandle -> 一个空接口 这是presto spi 定义的 各个connector可能有不同实现 
                    Map<Column, Domain> effectivePredicate -> 这个Column就是Hive元数据里Table下的列,获取分区列表时候用到
                    HiveColumnHandle -> hive的实现 
                    HivePartition -> Map<ColumnHandle, NullableValue> getKeys() //表示field -> value 
    

    读split逻辑

            具体的task读的是 hiveSplit 
    
            弄清楚split切分逻辑【】
    
            worker上的调用链:
            PrioritizedSplitRunner process
                DriverSplitRunner processFor
                    Driver processInternal
                        xxOperator getOutput -> 触发计算
                            HivePageSourceProvider createHivePageSource
                                OrcBatchPageSourceFactory createOrcPageSource
                                    之后就是ORC的解析 OrcReader -> OrcRecordReader 去读取到presto的page相关逻辑了。
    

    是否缓存文件footer元数据 不只是开启了cache配置 还需要选择的split节点在期望节点里 才会去缓存 。即 和nodeSelector策略有关。而且这个缓存 是以每个文件粒度调度的 。(包含在hiveSplit里面)

    梳理stage/task/driver/split的并发关系

    • Query 根据SQL语句生成查询执行计划,进而生成可以执行的查询(Query),一个查询执行由Stage、Task、Driver、Split、Operator和DataSource组成
    • Stage 执行查询阶段 Stage之间是树状的结构 ,RootStage 将结果返回给coordinator ,SourceStage接收coordinator数据 其他stage都有上下游 stage分为四种 single(root)、Fixed、source、coordinator_only(DML or DDL)
    • Exchange 两个stage数据的交换通过Exchange 两种Exchange ;Output Buffer (生产数据的stage通过此传给下游stage)Exchange Client (下游消费);如果stage 是source 直接通过connector 读数据
    • 一个Task包含一或多个Driver,是作用于一个Split的一系列Operator集合。一个Driver用于处理一个Split产生相应输出,输出由Task收集并传递给下游Stage中的Task

    核心问题
    1)task个数
    正常就是1个stage节点个数个,而presto会尽可能使用资源。每个stage每个节点都有一个task。(当然是非root stage)
    2)driver个数
    其实就是split个数
    3)split个数(根据stage的类型不同而不同)

        single(root)-> 1个
        coordinator only -> 元数据操作 也是一个
        如果是source的stage -> 由connector的splitmanager决定
        一个文件最少一个split
        remainingInitialSplits 有个参数影响了maxSplitBytes // 如果计算次数少于remainingInitialSplits 会采用 maxInitialSplitSize
            否则用配置的maxSplitSize去滚动每个文件生成HiveSplit
                    (最后2个split会平衡 避免过小的split 导致时间不太均衡...)
          hive.max-split-size
          hive.max-initial-splits(默认200 不调节也行。。需要调节 maxInitialSplitSize 如果不设置就是默认 maxSplitSize/2 )
          hive.max-initial-split-size
    
        如果是中间stage -> hash_partition_count 这个session 参数?还是 task.concurrency ?
    

    举例说明:对与读取hive表来说,1G的数据,设置 hive.max-split-size = 64MB,hive.max-initial-split-size= 64MB,最后才会得到期望的1G/64MB个source split

    线程并发模型

    • task.max-worker-threads // worker启动的线程池的大小,即工作线程个数
    • task.concurrency // set session task_concurrency=1; 这个影响 agg/join 的并发
    • task.min-drivers // 默认是 task.max-worker-threads x2 ,worker最少在执行的split数,如果有足够资源和任务
    • task.min-drivers-per-task // task最少并行执行的split数
    • initial_splits_per_node // 。。(应该是调度时候)
    在taskExecutor的enqueueSplits里
            for (SplitRunner taskSplit : taskSplits) {
                xxx
                scheduleTaskIfNecessary(taskHandle); //按task级别调度 会用到 task.min-drivers-per-task 即可并发运行的split 
    
                addNewEntrants(); 
                //在资源变动( 如task remove/split finish/等时候 去尝试去调度更多split 【这里比较模糊。。】用到 task.min-drivers 参数 )
                //比如 task.min-drivers-per-task 是4 task.min-drivers是10 则相当于进行了2次调度 。。
            }
    
        在Presto中有一个配置query.execution-policy,它有两个选项,一个是all-at-once,另一个是 phased // set session execution_policy='phased'; 
    
        线程和并发模型:
            SqlTaskExecutionFactory -> SqlTaskExecution
            Coordinator分发Task到对应Worker,通过HttpClient发送给节点上TaskResource提供的RESTful接口
            Worker启动一个SqlTaskExecution对象或者更新对应对象需要处理的Split
                这里能看到每个split其实对应一个driverSplitRunner(这个类里面有DriverSplitRunnerFactory)
                        // Enqueue driver runners with split lifecycle for this plan node and driver life cycle combination.
                        ImmutableList.Builder<DriverSplitRunner> runners = ImmutableList.builder();
                        for (ScheduledSplit scheduledSplit : pendingSplits.removeAllSplits()) {
                            // create a new driver for the split
                            runners.add(partitionedDriverRunnerFactory.createDriverRunner(scheduledSplit, lifespan));
                        }
                        enqueueDriverSplitRunner(false, runners.build());
    
                        在DriverSplitRunner的Process方法里
                        this.driver = driverSplitRunnerFactory.createDriver(driverContext, partitionedSplit);
    
            TaskExecutor 封装了TaskRunner(执行split的地方 PrioritizedSplitRunner(实现类是DriverSplitRunner))
            TaskExecutor 里具体执行任务是是一个线程池
                    config.getMaxWorkerThreads(), // 这个是启动的固定线程池 。。不同SQL不同task都在里面执行 。。线程池大小是固定的:task.max-worker-threads
                    config.getMinDrivers(),// 这个默认是上面 x 2 不知有什么用?
                    config.getMinDriversPerTask(), // ?
                    config.getMaxDriversPerTask(),
            PrioritizedSplitRunner实现了时间片机制(固定1秒去执行split 挑选优先级)
            这种调度是不是牺牲了部分性能 换取迭代 优先级 多租户 多任务管理 结果快速反馈机制。。。
            
            PrioritizedSplitRunner里实际运行的是Driver,封装的一堆Operatior 如表Scan/filter/limit/taskoutPut 作用在split上
    
    
    

    相关文章

      网友评论

          本文标题:【工作】Presto Hive读取代码阅读笔记

          本文链接:https://www.haomeiwen.com/subject/nawgbltx.html