美文网首页
Doris 源码分析 (五) gRpc 与 thrift 接口

Doris 源码分析 (五) gRpc 与 thrift 接口

作者: 走在成长的道路上 | 来源:发表于2021-10-14 12:07 被阅读0次

FEBE 交互

任务队列及交互

FE 端与 BE 端均存在一个任务 Queue,如上图所示,从当前版本来看 thrift 实现的与 BE 之间的交互主要是用 AgentClient 来承载,而查看代码仅用于 snapshot 的管理过程,其他的并非通过该接口实现。

这里存在疑问 gRpcthrift 接口混着用的目的,不太明确,有了解的小伙伴如果方便可以告知下我哟, 现谢谢咯

SQL 执行过程

Doris 源码分析 (二) 代码结构分析 中的主架构图所示,大多数是来自客户端请求,通过 QeServer 中封装的 MysqlServer 来接受 Mysql 客户端的请求,具体流程如下:

Fe 端主要以 java 为主,如下:

  • ReadListener 接受处理数据请求传递给 ConnectProcessor.processOnce() 函数
  • 通过 ConnectProcessor.dispatch() 函数将命令进行任务区分,以 query 为例
  • 通过 ConnectProcessor.handleQuery() 函数将 originStmt 中含有的 sql 转为 StatementBase 实例列表
  • 遍历 StatementBase 实例列表,将其逐一构建 StmtExecutor 实例并执行
  • StmtExecutor.execute() 函数中将 StatementBase 实例解析生成执行计划(详细参见: StmtExecutor.handleQueryStmt() 实现)
  • 根据 planner 等信息构建 Coordinator 实例并执行执行计划
  • Coordinator.exec() 函数中,可以看到,将planner 进行分布式拆解,并生成为 PlanFragment 然后通过调用 Coordinator.sendFragment() 将所有的 PlanFragment 任务进行下发到 BE 端,同时包含状态的维护过程。
  • Coordinator.sendFragment() 中对每个 PlanFragment 通过 toThrift() 转化为 TExecPlanFragmentParams 然后封装为 BackendExecState 实例,然后调用 BackendExecState.execRemoteFragmentAsync() 通过 BackendServiceProxy.execPlanFragmentAsyncBRpcService 后端服务发起 execPlanFragment 请求(后端使用 PInternalServiceImpl 来接受处理请求内容)

BackendServiceProxy 中主要是与 Be 端进行交互部分,接下拉是 Be 端处理过程,具体如下:

  • PInternalServiceImpl 实现中_exec_env->fragment_mgr()->exec_plan_fragment(t_request) 可以看到将所有请求直接交付给 FragmentMgr 实例来完成对应的操作
  • FragmentMgr 中,先判断是否为事务操作, 并将任务传递给下游继续处理 (接下来,以非事务逻辑为例)
1. 事务操作转换为 `StreamLoadContext` 键后交由 `exec_env->load_stream_mgr()->put(stream_load_cxt->id, pipe)` 进行管理,然后由 `_exec_env->stream_load_executor()->execute_plan_fragment(stream_load_cxt, pipe)` 进行执行
2. 非事务操作,直接调用 `exec_plan_fragment()` 函数执行即可
  • exec_plan_fragmentTExecPlanFragmentParams 数据封装为 FragmentExecState 实例,将其转化为 PlanFragmentExecutor 参数(在
    PlanFragmentExecutor::prepare() 函数中2通过 ExecNode::create_tree() 创建执行计划树 ExecNode ),然后将其放入到线程池中进行执行即可:
// 从代码可看出执行过程由 FragmentMgr::_exec_actual() 函数来完成
// 其中主要是构建 PlanFragmentExecutor 实例并管理所有的 PlanFragment 执行状态
_thread_pool->submit_func(
            std::bind<void>(&FragmentMgr::_exec_actual, this, exec_state, cb))
  • FragmentMgr::_exec_actual() 函数中直接会调用 FragmentExecState.execute()PlanFragmentExecutor.open() 函数进行调用, 最终调用 ExecNode.get_next() 函数来获取执行计划结束之后的数据内容

Be 端的执行计划树节点如下(参见 ExecNode::create_node() 函数实现):

Status ExecNode::create_node(RuntimeState* state, ObjectPool* pool, const TPlanNode& tnode,
                             const DescriptorTbl& descs, ExecNode** node) {
    std::stringstream error_msg;

    VLOG_CRITICAL << "tnode:\n" << apache::thrift::ThriftDebugString(tnode);
    switch (tnode.node_type) {
    case TPlanNodeType::CSV_SCAN_NODE:
        *node = pool->add(new CsvScanNode(pool, tnode, descs));
        return Status::OK();

    case TPlanNodeType::MYSQL_SCAN_NODE:
#ifdef DORIS_WITH_MYSQL
        *node = pool->add(new MysqlScanNode(pool, tnode, descs));
        return Status::OK();
#else
        return Status::InternalError(
                "Don't support MySQL table, you should rebuild Doris with WITH_MYSQL option ON");
#endif
    case TPlanNodeType::ODBC_SCAN_NODE:
        *node = pool->add(new OdbcScanNode(pool, tnode, descs));
        return Status::OK();

    case TPlanNodeType::ES_SCAN_NODE:
        *node = pool->add(new EsScanNode(pool, tnode, descs));
        return Status::OK();

    case TPlanNodeType::ES_HTTP_SCAN_NODE:
        *node = pool->add(new EsHttpScanNode(pool, tnode, descs));
        return Status::OK();

    case TPlanNodeType::SCHEMA_SCAN_NODE:
        *node = pool->add(new SchemaScanNode(pool, tnode, descs));
        return Status::OK();

    case TPlanNodeType::OLAP_SCAN_NODE:
        if (state->enable_vectorized_exec()) {
        } else {
            *node = pool->add(new OlapScanNode(pool, tnode, descs));
        }
        return Status::OK();

    case TPlanNodeType::AGGREGATION_NODE:
        if (state->enable_vectorized_exec()) {
        } else {
            if (config::enable_partitioned_aggregation) {
                *node = pool->add(new PartitionedAggregationNode(pool, tnode, descs));
            } else {
                *node = pool->add(new AggregationNode(pool, tnode, descs));
            }
        }
        return Status::OK();

    case TPlanNodeType::HASH_JOIN_NODE:
        *node = pool->add(new HashJoinNode(pool, tnode, descs));
        return Status::OK();

    case TPlanNodeType::CROSS_JOIN_NODE:
        if (state->enable_vectorized_exec()) {
        } else {
            *node = pool->add(new CrossJoinNode(pool, tnode, descs));
        }
        return Status::OK();

    case TPlanNodeType::MERGE_JOIN_NODE:
        *node = pool->add(new MergeJoinNode(pool, tnode, descs));
        return Status::OK();

    case TPlanNodeType::EMPTY_SET_NODE:
        *node = pool->add(new EmptySetNode(pool, tnode, descs));
        return Status::OK();

    case TPlanNodeType::EXCHANGE_NODE:
        if (state->enable_vectorized_exec()) {
        } else {
            *node = pool->add(new ExchangeNode(pool, tnode, descs));
        }
        return Status::OK();

    case TPlanNodeType::SELECT_NODE:
        *node = pool->add(new SelectNode(pool, tnode, descs));
        return Status::OK();

    case TPlanNodeType::OLAP_REWRITE_NODE:
        *node = pool->add(new OlapRewriteNode(pool, tnode, descs));
        return Status::OK();

    case TPlanNodeType::SORT_NODE:
        if (state->enable_vectorized_exec()) {
        } else {
            if (tnode.sort_node.use_top_n) {
                *node = pool->add(new TopNNode(pool, tnode, descs));
            } else {
                *node = pool->add(new SpillSortNode(pool, tnode, descs));
            }
        }
        return Status::OK();
    case TPlanNodeType::ANALYTIC_EVAL_NODE:
        *node = pool->add(new AnalyticEvalNode(pool, tnode, descs));
        break;

    case TPlanNodeType::MERGE_NODE:
        *node = pool->add(new MergeNode(pool, tnode, descs));
        return Status::OK();

    case TPlanNodeType::UNION_NODE:
        if (state->enable_vectorized_exec()) {
        } else {
            *node = pool->add(new UnionNode(pool, tnode, descs));
        }
        return Status::OK();

    case TPlanNodeType::INTERSECT_NODE:
        *node = pool->add(new IntersectNode(pool, tnode, descs));
        return Status::OK();

    case TPlanNodeType::EXCEPT_NODE:
        *node = pool->add(new ExceptNode(pool, tnode, descs));
        return Status::OK();

    case TPlanNodeType::BROKER_SCAN_NODE:
        *node = pool->add(new BrokerScanNode(pool, tnode, descs));
        return Status::OK();

    case TPlanNodeType::REPEAT_NODE:
        *node = pool->add(new RepeatNode(pool, tnode, descs));
        return Status::OK();

    case TPlanNodeType::ASSERT_NUM_ROWS_NODE:
        *node = pool->add(new AssertNumRowsNode(pool, tnode, descs));
        return Status::OK();

    default:
        map<int, const char*>::const_iterator i =
                _TPlanNodeType_VALUES_TO_NAMES.find(tnode.node_type);
        const char* str = "unknown node type";

        if (i != _TPlanNodeType_VALUES_TO_NAMES.end()) {
            str = i->second;
        }

        error_msg << str << " not implemented";
        return Status::InternalError(error_msg.str());
    }

    return Status::OK();
}

相关文章

  • Doris 源码分析 (五) gRpc 与 thrift 接口

    FE 与 BE 交互 在 FE 端与 BE 端均存在一个任务 Queue,如上图所示,从当前版本来看 thrift...

  • Thrift学习

    Thrift源码剖析 Thrift源码分析及一个完整的例子 CSDN Thrift源码分析 Thrift二进制序列...

  • RPC Architecture

    The Architecture Thrift GRPC

  • 微服务开发

    微服务业务分析 Thrift安装和验证 安装Thrift 验证安装:验证安装 vi demo.thrift定义接口...

  • Fabric源码基础-grpc的使用03

    Fabric的节点通过grpc向内部或外部提供接口,在学习源码之前,需要对grpc的基本使用有所了解,并了解如何在...

  • Fabric源码基础-grpc的使用01

    Fabric的节点通过grpc向内部或外部提供接口,在学习源码之前,需要对grpc的基本使用有所了解,并了解如何在...

  • Fabric源码基础-grpc的使用02

    Fabric的节点通过grpc向内部或外部提供接口,在学习源码之前,需要对grpc的基本使用有所了解,并了解如何在...

  • thrift源码分析-架构设计

    已迁移至掘金社区thrift源码分析-架构设计 前言 thrift是一个轻量级、跨语言、提供代码生成机制的rp...

  • thrift入门教程

    概述 本文是入门教程,想要了解thrift的源码实现可以移步我的CSDN专栏thrift源码解析 Thrift最初...

  • Thrift源码分析-Processor

    [TOC] TProcessor TProcessor定义了一个接口,负责从输入中获取请求信息,调用用户自己实现的...

网友评论

      本文标题:Doris 源码分析 (五) gRpc 与 thrift 接口

      本文链接:https://www.haomeiwen.com/subject/lnnyoltx.html