From 09e03247ec6159e4d4d115ca0ee1fdb1b83879da Mon Sep 17 00:00:00 2001 From: zhiqqqq Date: Fri, 22 Sep 2023 08:54:57 +0800 Subject: [PATCH] [chore](readability) Better readability of ExecNode.cpp #24733 --- be/src/exec/exec_node.cpp | 84 +++++++++++---------------------- be/src/exec/exec_node.h | 10 ++-- be/src/vec/exec/vunion_node.cpp | 4 +- 3 files changed, 34 insertions(+), 64 deletions(-) diff --git a/be/src/exec/exec_node.cpp b/be/src/exec/exec_node.cpp index 21f4ce9121..c21ae2e5f5 100644 --- a/be/src/exec/exec_node.cpp +++ b/be/src/exec/exec_node.cpp @@ -248,49 +248,58 @@ Status ExecNode::create_tree(RuntimeState* state, ObjectPool* pool, const TPlan& } Status ExecNode::create_tree_helper(RuntimeState* state, ObjectPool* pool, - const std::vector& tnodes, + const std::vector& thrift_plan_nodes, const DescriptorTbl& descs, ExecNode* parent, int* node_idx, ExecNode** root) { // propagate error case - if (*node_idx >= tnodes.size()) { + if (*node_idx >= thrift_plan_nodes.size()) { // TODO: print thrift msg return Status::InternalError("Failed to reconstruct plan tree from thrift."); } - const TPlanNode& tnode = tnodes[*node_idx]; - int num_children = tnodes[*node_idx].num_children; - ExecNode* node = nullptr; - RETURN_IF_ERROR(create_node(state, pool, tnodes[*node_idx], descs, &node)); + const TPlanNode& cur_plan_node = thrift_plan_nodes[*node_idx]; + int num_children = cur_plan_node.num_children; - // assert(parent != nullptr || (node_idx == 0 && root_expr != nullptr)); + // Step 1 Create current ExecNode according to current thrift plan node. + ExecNode* cur_exec_node = nullptr; + RETURN_IF_ERROR(create_node(state, pool, cur_plan_node, descs, &cur_exec_node)); + + // Step 1.1 + // Record current node if we have parent or record myself as root node. if (parent != nullptr) { - parent->_children.push_back(node); + parent->_children.push_back(cur_exec_node); } else { - *root = node; + *root = cur_exec_node; } + // Step 2 + // Create child ExecNode tree of current node in a recursive manner. for (int i = 0; i < num_children; i++) { ++*node_idx; - RETURN_IF_ERROR(create_tree_helper(state, pool, tnodes, descs, node, node_idx, nullptr)); + RETURN_IF_ERROR(create_tree_helper(state, pool, thrift_plan_nodes, descs, cur_exec_node, + node_idx, nullptr)); // we are expecting a child, but have used all nodes // this means we have been given a bad tree and must fail - if (*node_idx >= tnodes.size()) { + if (*node_idx >= thrift_plan_nodes.size()) { // TODO: print thrift msg return Status::InternalError("Failed to reconstruct plan tree from thrift."); } } - RETURN_IF_ERROR(node->init(tnode, state)); + // Step 3 Init myself after sub ExecNode tree is created and initialized + RETURN_IF_ERROR(cur_exec_node->init(cur_plan_node, state)); // build up tree of profiles; add children >0 first, so that when we print // the profile, child 0 is printed last (makes the output more readable) - for (int i = 1; i < node->_children.size(); ++i) { - node->runtime_profile()->add_child(node->_children[i]->runtime_profile(), true, nullptr); + for (int i = 1; i < cur_exec_node->_children.size(); ++i) { + cur_exec_node->runtime_profile()->add_child(cur_exec_node->_children[i]->runtime_profile(), + true, nullptr); } - if (!node->_children.empty()) { - node->runtime_profile()->add_child(node->_children[0]->runtime_profile(), true, nullptr); + if (!cur_exec_node->_children.empty()) { + cur_exec_node->runtime_profile()->add_child(cur_exec_node->_children[0]->runtime_profile(), + true, nullptr); } return Status::OK(); @@ -298,48 +307,8 @@ Status ExecNode::create_tree_helper(RuntimeState* state, ObjectPool* pool, Status ExecNode::create_node(RuntimeState* state, ObjectPool* pool, const TPlanNode& tnode, const DescriptorTbl& descs, ExecNode** node) { - std::stringstream error_msg; - - switch (tnode.node_type) { - case TPlanNodeType::OLAP_SCAN_NODE: - case TPlanNodeType::ASSERT_NUM_ROWS_NODE: - case TPlanNodeType::HASH_JOIN_NODE: - case TPlanNodeType::AGGREGATION_NODE: - case TPlanNodeType::UNION_NODE: - case TPlanNodeType::CROSS_JOIN_NODE: - case TPlanNodeType::SORT_NODE: - case TPlanNodeType::EXCHANGE_NODE: - case TPlanNodeType::ODBC_SCAN_NODE: - case TPlanNodeType::MYSQL_SCAN_NODE: - case TPlanNodeType::INTERSECT_NODE: - case TPlanNodeType::EXCEPT_NODE: - case TPlanNodeType::ES_HTTP_SCAN_NODE: - case TPlanNodeType::EMPTY_SET_NODE: - case TPlanNodeType::SCHEMA_SCAN_NODE: - case TPlanNodeType::ANALYTIC_EVAL_NODE: - case TPlanNodeType::SELECT_NODE: - case TPlanNodeType::REPEAT_NODE: - case TPlanNodeType::TABLE_FUNCTION_NODE: - case TPlanNodeType::DATA_GEN_SCAN_NODE: - case TPlanNodeType::FILE_SCAN_NODE: - case TPlanNodeType::JDBC_SCAN_NODE: - case TPlanNodeType::META_SCAN_NODE: - case TPlanNodeType::PARTITION_SORT_NODE: - case TPlanNodeType::GROUP_COMMIT_SCAN_NODE: - break; - default: { - const auto& i = _TPlanNodeType_VALUES_TO_NAMES.find(tnode.node_type); - const char* str = "unknown node type"; - - if (i != _TPlanNodeType_VALUES_TO_NAMES.end()) { - str = i->second; - } - error_msg << "V" << str << " not implemented"; - return Status::InternalError(error_msg.str()); - } - } - VLOG_CRITICAL << "tnode:\n" << apache::thrift::ThriftDebugString(tnode); + switch (tnode.node_type) { case TPlanNodeType::MYSQL_SCAN_NODE: #ifdef DORIS_WITH_MYSQL @@ -467,6 +436,7 @@ Status ExecNode::create_node(RuntimeState* state, ObjectPool* pool, const TPlanN str = i->second; } + std::stringstream error_msg; error_msg << str << " not implemented"; return Status::InternalError(error_msg.str()); } diff --git a/be/src/exec/exec_node.h b/be/src/exec/exec_node.h index ae7b40dc20..0fd691e3d8 100644 --- a/be/src/exec/exec_node.h +++ b/be/src/exec/exec_node.h @@ -318,11 +318,6 @@ protected: static Status create_node(RuntimeState* state, ObjectPool* pool, const TPlanNode& tnode, const DescriptorTbl& descs, ExecNode** node); - static Status create_tree_helper(RuntimeState* state, ObjectPool* pool, - const std::vector& tnodes, - const DescriptorTbl& descs, ExecNode* parent, int* node_idx, - ExecNode** root); - virtual bool is_scan_node() const { return false; } void init_runtime_profile(const std::string& name); @@ -333,6 +328,11 @@ protected: std::atomic _can_read = false; private: + static Status create_tree_helper(RuntimeState* state, ObjectPool* pool, + const std::vector& tnodes, + const DescriptorTbl& descs, ExecNode* parent, int* node_idx, + ExecNode** root); + friend class pipeline::OperatorBase; bool _is_closed; bool _is_resource_released = false; diff --git a/be/src/vec/exec/vunion_node.cpp b/be/src/vec/exec/vunion_node.cpp index 90cbe5bb7a..10a3ca001a 100644 --- a/be/src/vec/exec/vunion_node.cpp +++ b/be/src/vec/exec/vunion_node.cpp @@ -61,8 +61,8 @@ Status VUnionNode::init(const TPlanNode& tnode, RuntimeState* state) { RETURN_IF_ERROR(ExecNode::init(tnode, state)); DCHECK(tnode.__isset.union_node); // Create const_expr_ctx_lists_ from thrift exprs. - auto& const_texpr_lists = tnode.union_node.const_expr_lists; - for (auto& texprs : const_texpr_lists) { + const auto& const_texpr_lists = tnode.union_node.const_expr_lists; + for (const auto& texprs : const_texpr_lists) { VExprContextSPtrs ctxs; RETURN_IF_ERROR(VExpr::create_expr_trees(texprs, ctxs)); _const_expr_lists.push_back(ctxs);