// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

#include "exec/exec_node.h"

#include <unistd.h>

#include <list>
#include <mutex>
#include <sstream>
#include <string>
#include <vector>

#include <thrift/protocol/TDebugProtocol.h>

#include "common/object_pool.h"
#include "common/status.h"
#include "exprs/expr_context.h"
#include "exec/aggregation_node.h"
#include "exec/partitioned_aggregation_node.h"
#include "exec/new_partitioned_aggregation_node.h"
#include "exec/csv_scan_node.h"
#include "exec/es_scan_node.h"
#include "exec/es_http_scan_node.h"
#include "exec/hash_join_node.h"
#include "exec/broker_scan_node.h"
#include "exec/cross_join_node.h"
#include "exec/empty_set_node.h"
#include "exec/mysql_scan_node.h"
#include "exec/schema_scan_node.h"
#include "exec/exchange_node.h"
#include "exec/merge_join_node.h"
#include "exec/merge_node.h"
#include "exec/olap_rewrite_node.h"
#include "exec/olap_scan_node.h"
#include "exec/topn_node.h"
#include "exec/sort_node.h"
#include "exec/spill_sort_node.h"
#include "exec/analytic_eval_node.h"
#include "exec/select_node.h"
#include "exec/union_node.h"
#include "exec/intersect_node.h"
#include "exec/except_node.h"
#include "exec/repeat_node.h"
#include "exec/assert_num_rows_node.h"
#include "runtime/exec_env.h"
#include "runtime/descriptors.h"
#include "runtime/initial_reservations.h"
#include "runtime/mem_pool.h"
#include "runtime/mem_tracker.h"
#include "runtime/row_batch.h"
#include "runtime/runtime_state.h"
#include "util/debug_util.h"
#include "util/runtime_profile.h"

namespace doris {

// Name of the derived counter that prepare() registers on the node profile.
const std::string ExecNode::ROW_THROUGHPUT_COUNTER = "RowsReturnedRate";

// Returns the metadata value stored on `p`. init_runtime_profile() stores the
// node id there, so for a node profile this is the plan node id.
int ExecNode::get_node_id_from_profile(RuntimeProfile* p) {
    return p->metadata();
}

// Creates a queue whose capacity is forwarded to the BlockingQueue base.
ExecNode::RowBatchQueue::RowBatchQueue(int max_batches)
        : BlockingQueue<RowBatch*>(max_batches) {}

// Cleanup() must have drained the orphaned batches before destruction.
ExecNode::RowBatchQueue::~RowBatchQueue() {
    DCHECK(cleanup_queue_.empty());
}

// Enqueues `batch`. When blocking_put() reports failure, the batch is parked
// on cleanup_queue_ (under lock_) so that Cleanup() can delete it later.
void ExecNode::RowBatchQueue::AddBatch(RowBatch* batch) {
    if (!blocking_put(batch)) {
        std::lock_guard<decltype(lock_)> guard(lock_);
        cleanup_queue_.push_back(batch);
    }
}

// Enqueues `batch` and returns the result of blocking_put().
// NOTE: `timeout_micros` is currently ignored; the timed put is disabled below.
bool ExecNode::RowBatchQueue::AddBatchWithTimeout(RowBatch* batch, int64_t timeout_micros) {
    // return blocking_put_with_timeout(batch, timeout_micros);
    return blocking_put(batch);
}

// Returns the next batch, or nullptr when blocking_get() reports failure.
RowBatch* ExecNode::RowBatchQueue::GetBatch() {
    RowBatch* result = nullptr;
    if (blocking_get(&result)) {
        return result;
    }
    return nullptr;
}

// Deletes every batch parked on cleanup_queue_ and empties that list.
// Returns the number of io buffers released, which is always 0 while the
// io-buffer accounting below stays disabled.
int ExecNode::RowBatchQueue::Cleanup() {
    int num_io_buffers = 0;

    // RowBatch* batch = NULL;
    // while ((batch = GetBatch()) != NULL) {
    //     num_io_buffers += batch->num_io_buffers();
    //     delete batch;
    // }

    std::lock_guard<decltype(lock_)> guard(lock_);
    for (RowBatch* orphan : cleanup_queue_) {
        // num_io_buffers += orphan->num_io_buffers();
        delete orphan;
    }
    cleanup_queue_.clear();
    return num_io_buffers;
}

// Initializes the node from its thrift description and creates its runtime
// profile. The debug phase starts as INVALID, so exec_debug_action() is a
// no-op until set_debug_options() selects a phase.
ExecNode::ExecNode(ObjectPool* pool, const TPlanNode& tnode, const DescriptorTbl& descs)
        : _id(tnode.node_id),
          _type(tnode.node_type),
          _pool(pool),
          _tuple_ids(tnode.row_tuples),
          _row_descriptor(descs, tnode.row_tuples, tnode.nullable_tuples),
          _resource_profile(tnode.resource_profile),
          _debug_phase(TExecNodePhase::INVALID),
          _debug_action(TDebugAction::WAIT),
          _limit(tnode.limit),
          _num_rows_returned(0),
          _rows_returned_counter(nullptr),
          _rows_returned_rate(nullptr),
          _memory_used_counter(nullptr),
          _is_closed(false) {
    init_runtime_profile(print_plan_node_type(tnode.node_type));
}

ExecNode::~ExecNode() {}

// Offers the predicates in `expr_ctxs` to the subtree rooted at this node.
// Children are visited first (except below an aggregation node). Every
// predicate whose root expression is bound by this node's tuple ids is then
// prepared, opened, appended to _conjunct_ctxs and removed from `expr_ctxs`.
// Predicates that remain in the list were not accepted by this subtree.
void ExecNode::push_down_predicate(RuntimeState* state, std::list<ExprContext*>* expr_ctxs) {
    if (_type != TPlanNodeType::AGGREGATION_NODE) {
        for (ExecNode* child : _children) {
            child->push_down_predicate(state, expr_ctxs);
            if (expr_ctxs->empty()) {
                return;
            }
        }
    }

    auto iter = expr_ctxs->begin();
    while (iter != expr_ctxs->end()) {
        if ((*iter)->root()->is_bound(&_tuple_ids)) {
            // LOG(INFO) << "push down success expr is " << (*iter)->debug_string()
            //          << " and node is " << debug_string();
            // NOTE(review): the results of prepare()/open() are not checked here
            // and this function has no way to report them; confirm that is intended.
            (*iter)->prepare(state, row_desc(), _expr_mem_tracker.get());
            (*iter)->open(state);
            _conjunct_ctxs.push_back(*iter);
            iter = expr_ctxs->erase(iter);
        } else {
            ++iter;
        }
    }
}

// Builds the conjunct expression trees of `tnode` into _conjunct_ctxs.
Status ExecNode::init(const TPlanNode& tnode, RuntimeState* state) {
    RETURN_IF_ERROR(Expr::create_expr_trees(_pool, tnode.conjuncts, &_conjunct_ctxs));
    return Status::OK();
}

// Registers the profile counters, creates the node/expr memory trackers and
// the expr memory pool, prepares the conjuncts and then prepares all children.
// Returns the first error encountered.
Status ExecNode::prepare(RuntimeState* state) {
    RETURN_IF_ERROR(exec_debug_action(TExecNodePhase::PREPARE));
    DCHECK(_runtime_profile.get() != nullptr);

    _rows_returned_counter = ADD_COUNTER(_runtime_profile, "RowsReturned", TUnit::UNIT);
    _memory_used_counter = ADD_COUNTER(_runtime_profile, "MemoryUsed", TUnit::BYTES);
    _rows_returned_rate = runtime_profile()->add_derived_counter(
            ROW_THROUGHPUT_COUNTER, TUnit::UNIT_PER_SECOND,
            boost::bind<int64_t>(&RuntimeProfile::units_per_second, _rows_returned_counter,
                                 runtime_profile()->total_time_counter()),
            "");

    _mem_tracker.reset(
            new MemTracker(-1, _runtime_profile->name(), state->instance_mem_tracker()));
    _expr_mem_tracker.reset(new MemTracker(-1, "Exprs", _mem_tracker.get()));
    _expr_mem_pool.reset(new MemPool(_expr_mem_tracker.get()));

    // TODO chenhao
    RETURN_IF_ERROR(Expr::prepare(_conjunct_ctxs, state, row_desc(), expr_mem_tracker()));
    // TODO(zc):
    // AddExprCtxsToFree(_conjunct_ctxs);

    for (ExecNode* child : _children) {
        RETURN_IF_ERROR(child->prepare(state));
    }
    return Status::OK();
}

// Runs the OPEN debug action (if any) and opens the conjuncts.
Status ExecNode::open(RuntimeState* state) {
    RETURN_IF_ERROR(exec_debug_action(TExecNodePhase::OPEN));
    return Expr::open(_conjunct_ctxs, state);
}

// Resets the returned-row count of this node and resets every child.
Status ExecNode::reset(RuntimeState* state) {
    _num_rows_returned = 0;
    for (ExecNode* child : _children) {
        RETURN_IF_ERROR(child->reset(state));
    }
    return Status::OK();
}

// Lets every child add its statistics to `statistics`.
// All children are visited even if one fails; the first failure is returned
// (same policy as close()) instead of being silently dropped.
Status ExecNode::collect_query_statistics(QueryStatistics* statistics) {
    DCHECK(statistics != nullptr);
    Status result;
    for (ExecNode* child : _children) {
        Status st = child->collect_query_statistics(statistics);
        if (result.ok() && !st.ok()) {
            result = st;
        }
    }
    return result;
}

// Closes this node once; later calls return OK immediately.
// Closes all children (remembering the first failure), closes the conjuncts,
// frees the expr memory pool, returns the buffer reservation if a buffer pool
// client was registered, and closes the memory trackers.
Status ExecNode::close(RuntimeState* state) {
    if (_is_closed) {
        return Status::OK();
    }
    _is_closed = true;
    RETURN_IF_ERROR(exec_debug_action(TExecNodePhase::CLOSE));

    if (_rows_returned_counter != nullptr) {
        COUNTER_SET(_rows_returned_counter, _num_rows_returned);
    }

    // Keep closing the remaining children after a failure; report the first one.
    Status result;
    for (ExecNode* child : _children) {
        auto st = child->close(state);
        if (result.ok() && !st.ok()) {
            result = st;
        }
    }

    Expr::close(_conjunct_ctxs, state);

    if (expr_mem_pool() != nullptr) {
        _expr_mem_pool->free_all();
    }

    if (_buffer_pool_client.is_registered()) {
        VLOG_FILE << _id << " returning reservation " << _resource_profile.min_reservation;
        state->initial_reservations()->Return(
                &_buffer_pool_client, _resource_profile.min_reservation);
        state->exec_env()->buffer_pool()->DeregisterClient(&_buffer_pool_client);
    }

    if (_expr_mem_tracker != nullptr) {
        _expr_mem_tracker->close();
    }

    if (_mem_tracker != nullptr) {
        _mem_tracker->close();
    }

    return result;
}

// Appends `str` to the comma-separated list of runtime exec options and
// republishes the list as the "ExecOption" info string of the profile.
void ExecNode::add_runtime_exec_option(const std::string& str) {
    std::lock_guard<decltype(_exec_options_lock)> guard(_exec_options_lock);
    if (!_runtime_exec_options.empty()) {
        _runtime_exec_options.append(", ");
    }
    _runtime_exec_options.append(str);
    runtime_profile()->add_info_string("ExecOption", _runtime_exec_options);
}

// Rebuilds the exec node tree from the flattened node list in `plan`.
// On success `*root` is the root node, or nullptr for an empty plan. Fails if
// the node list is not consumed exactly.
Status ExecNode::create_tree(RuntimeState* state, ObjectPool* pool, const TPlan& plan,
                             const DescriptorTbl& descs, ExecNode** root) {
    if (plan.nodes.empty()) {
        *root = nullptr;
        return Status::OK();
    }

    int node_idx = 0;
    RETURN_IF_ERROR(
            create_tree_helper(state, pool, plan.nodes, descs, nullptr, &node_idx, root));

    if (static_cast<size_t>(node_idx) + 1 != plan.nodes.size()) {
        // TODO: print thrift msg for diagnostic purposes.
        return Status::InternalError(
                "Plan tree only partially reconstructed. Not all thrift nodes were used.");
    }

    return Status::OK();
}

// Creates the node at tnodes[*node_idx] and, recursively, its children, which
// follow it in `tnodes`. `*node_idx` is advanced to the last node consumed.
// The new node is appended to `parent` when one is given, otherwise it is
// stored in `*root`.
Status ExecNode::create_tree_helper(RuntimeState* state, ObjectPool* pool,
                                    const std::vector<TPlanNode>& tnodes,
                                    const DescriptorTbl& descs, ExecNode* parent,
                                    int* node_idx, ExecNode** root) {
    // propagate error case
    if (static_cast<size_t>(*node_idx) >= tnodes.size()) {
        // TODO: print thrift msg
        return Status::InternalError("Failed to reconstruct plan tree from thrift.");
    }

    // Bind the description now: *node_idx moves on while the children are built.
    const TPlanNode& tnode = tnodes[*node_idx];
    const int num_children = tnode.num_children;

    ExecNode* node = nullptr;
    RETURN_IF_ERROR(create_node(state, pool, tnode, descs, &node));

    // assert(parent != NULL || (node_idx == 0 && root_expr != NULL));
    if (parent != nullptr) {
        parent->_children.push_back(node);
    } else {
        *root = node;
    }

    for (int i = 0; i < num_children; ++i) {
        ++*node_idx;
        RETURN_IF_ERROR(
                create_tree_helper(state, pool, tnodes, descs, node, node_idx, nullptr));

        // we are expecting a child, but have used all nodes
        // this means we have been given a bad tree and must fail
        if (static_cast<size_t>(*node_idx) >= tnodes.size()) {
            // TODO: print thrift msg
            return Status::InternalError("Failed to reconstruct plan tree from thrift.");
        }
    }

    RETURN_IF_ERROR(node->init(tnode, state));

    // build up tree of profiles; add children >0 first, so that when we print
    // the profile, child 0 is printed last (makes the output more readable)
    for (size_t i = 1; i < node->_children.size(); ++i) {
        node->runtime_profile()->add_child(node->_children[i]->runtime_profile(), true, nullptr);
    }

    if (!node->_children.empty()) {
        node->runtime_profile()->add_child(node->_children[0]->runtime_profile(), false, nullptr);
    }

    return Status::OK();
}

// Instantiates the ExecNode subclass matching tnode.node_type. The object is
// added to `pool` and returned through `*node`. Returns InternalError for node
// types that are not implemented (or, for MySQL, not compiled in).
Status ExecNode::create_node(RuntimeState* state, ObjectPool* pool, const TPlanNode& tnode,
                             const DescriptorTbl& descs, ExecNode** node) {
    VLOG(2) << "tnode:\n" << apache::thrift::ThriftDebugString(tnode);

    switch (tnode.node_type) {
    case TPlanNodeType::CSV_SCAN_NODE:
        *node = pool->add(new CsvScanNode(pool, tnode, descs));
        break;

    case TPlanNodeType::MYSQL_SCAN_NODE:
#ifdef DORIS_WITH_MYSQL
        *node = pool->add(new MysqlScanNode(pool, tnode, descs));
        break;
#else
        return Status::InternalError("Don't support MySQL table, you should rebuild Doris with WITH_MYSQL option ON");
#endif

    case TPlanNodeType::ES_SCAN_NODE:
        *node = pool->add(new EsScanNode(pool, tnode, descs));
        break;

    case TPlanNodeType::ES_HTTP_SCAN_NODE:
        *node = pool->add(new EsHttpScanNode(pool, tnode, descs));
        break;

    case TPlanNodeType::SCHEMA_SCAN_NODE:
        *node = pool->add(new SchemaScanNode(pool, tnode, descs));
        break;

    case TPlanNodeType::OLAP_SCAN_NODE:
        *node = pool->add(new OlapScanNode(pool, tnode, descs));
        break;

    case TPlanNodeType::AGGREGATION_NODE:
        // The partitioned implementation wins when both config flags are set.
        if (config::enable_partitioned_aggregation) {
            *node = pool->add(new PartitionedAggregationNode(pool, tnode, descs));
        } else if (config::enable_new_partitioned_aggregation) {
            *node = pool->add(new NewPartitionedAggregationNode(pool, tnode, descs));
        } else {
            *node = pool->add(new AggregationNode(pool, tnode, descs));
        }
        break;

    case TPlanNodeType::HASH_JOIN_NODE:
        *node = pool->add(new HashJoinNode(pool, tnode, descs));
        break;

    case TPlanNodeType::CROSS_JOIN_NODE:
        *node = pool->add(new CrossJoinNode(pool, tnode, descs));
        break;

    case TPlanNodeType::MERGE_JOIN_NODE:
        *node = pool->add(new MergeJoinNode(pool, tnode, descs));
        break;

    case TPlanNodeType::EMPTY_SET_NODE:
        *node = pool->add(new EmptySetNode(pool, tnode, descs));
        break;

    case TPlanNodeType::EXCHANGE_NODE:
        *node = pool->add(new ExchangeNode(pool, tnode, descs));
        break;

    case TPlanNodeType::SELECT_NODE:
        *node = pool->add(new SelectNode(pool, tnode, descs));
        break;

    case TPlanNodeType::OLAP_REWRITE_NODE:
        *node = pool->add(new OlapRewriteNode(pool, tnode, descs));
        break;

    case TPlanNodeType::SORT_NODE:
        if (tnode.sort_node.use_top_n) {
            *node = pool->add(new TopNNode(pool, tnode, descs));
        } else {
            *node = pool->add(new SpillSortNode(pool, tnode, descs));
        }
        break;

    case TPlanNodeType::ANALYTIC_EVAL_NODE:
        *node = pool->add(new AnalyticEvalNode(pool, tnode, descs));
        break;

    case TPlanNodeType::MERGE_NODE:
        *node = pool->add(new MergeNode(pool, tnode, descs));
        break;

    case TPlanNodeType::UNION_NODE:
        *node = pool->add(new UnionNode(pool, tnode, descs));
        break;

    case TPlanNodeType::INTERSECT_NODE:
        *node = pool->add(new IntersectNode(pool, tnode, descs));
        break;

    case TPlanNodeType::EXCEPT_NODE:
        *node = pool->add(new ExceptNode(pool, tnode, descs));
        break;

    case TPlanNodeType::BROKER_SCAN_NODE:
        *node = pool->add(new BrokerScanNode(pool, tnode, descs));
        break;

    case TPlanNodeType::REPEAT_NODE:
        *node = pool->add(new RepeatNode(pool, tnode, descs));
        break;

    case TPlanNodeType::ASSERT_NUM_ROWS_NODE:
        *node = pool->add(new AssertNumRowsNode(pool, tnode, descs));
        break;

    default: {
        // Report the thrift name of the node type when it is known.
        const char* type_name = "unknown node type";
        auto name_it = _TPlanNodeType_VALUES_TO_NAMES.find(tnode.node_type);
        if (name_it != _TPlanNodeType_VALUES_TO_NAMES.end()) {
            type_name = name_it->second;
        }

        std::stringstream error_msg;
        error_msg << type_name << " not implemented";
        return Status::InternalError(error_msg.str());
    }
    }

    return Status::OK();
}

// Sets the debug phase/action on the first node with id `node_id` found on
// each path below `root`; the search does not descend past a matching node.
void ExecNode::set_debug_options(int node_id, TExecNodePhase::type phase,
                                 TDebugAction::type action, ExecNode* root) {
    if (root->_id == node_id) {
        root->_debug_phase = phase;
        root->_debug_action = action;
        return;
    }

    for (ExecNode* child : root->_children) {
        set_debug_options(node_id, phase, action, child);
    }
}

// Returns the debug description of the subtree rooted at this node.
std::string ExecNode::debug_string() const {
    std::stringstream out;
    this->debug_string(0, &out);
    return out.str();
}

// Appends the description of this node to `out`, followed by one line per
// child. `indentation_level` is only forwarded (incremented) to the children;
// this base implementation does not indent its own output.
// NOTE(review): this prints `_conjuncts`, while the rest of this file works
// with `_conjunct_ctxs` -- confirm which member is intended.
void ExecNode::debug_string(int indentation_level, std::stringstream* out) const {
    *out << " conjuncts=" << Expr::debug_string(_conjuncts);
    *out << " id=" << _id;
    *out << " type=" << print_plan_node_type(_type);
    *out << " tuple_ids=[";
    for (auto id : _tuple_ids) {
        *out << id << ", ";
    }
    *out << "]";

    for (const ExecNode* child : _children) {
        *out << "\n";
        child->debug_string(indentation_level + 1, out);
    }
}

// Returns true iff every one of the `num_ctxs` conjuncts evaluates to a
// non-null true value for `row`. Evaluation stops at the first conjunct that
// is null or false.
bool ExecNode::eval_conjuncts(ExprContext* const* ctxs, int num_ctxs, TupleRow* row) {
    for (int i = 0; i < num_ctxs; ++i) {
        BooleanVal v = ctxs[i]->get_boolean_val(row);
        if (v.is_null || !v.val) {
            return false;
        }
    }
    return true;
}

// Appends every node of type `node_type` in this subtree to `nodes`
// (pre-order: this node first, then each child's subtree).
void ExecNode::collect_nodes(TPlanNodeType::type node_type, std::vector<ExecNode*>* nodes) {
    if (_type == node_type) {
        nodes->push_back(this);
    }
    for (ExecNode* child : _children) {
        child->collect_nodes(node_type, nodes);
    }
}

// Appends the OLAP, broker, ES and ES-HTTP scan nodes of this subtree to
// `nodes`, grouped by type in that order.
void ExecNode::collect_scan_nodes(std::vector<ExecNode*>* nodes) {
    collect_nodes(TPlanNodeType::OLAP_SCAN_NODE, nodes);
    collect_nodes(TPlanNodeType::BROKER_SCAN_NODE, nodes);
    collect_nodes(TPlanNodeType::ES_SCAN_NODE, nodes);
    collect_nodes(TPlanNodeType::ES_HTTP_SCAN_NODE, nodes);
}

// If this subtree contains exactly one aggregation node and that node's only
// child is an OLAP scan node, calls set_no_agg_finalize() on the scan node.
// Does nothing otherwise.
void ExecNode::try_do_aggregate_serde_improve() {
    std::vector<ExecNode*> agg_nodes;
    collect_nodes(TPlanNodeType::AGGREGATION_NODE, &agg_nodes);
    if (agg_nodes.size() != 1) {
        return;
    }

    ExecNode* agg = agg_nodes[0];
    if (agg->_children.size() != 1) {
        return;
    }

    ExecNode* only_child = agg->_children[0];
    if (only_child->type() != TPlanNodeType::OLAP_SCAN_NODE) {
        return;
    }

    // The type tag was checked above, so the downcast is safe.
    OlapScanNode* scan_node = static_cast<OlapScanNode*>(only_child);
    scan_node->set_no_agg_finalize();
}

// Creates the runtime profile, named "<name> (id=<node id>)", and stores the
// node id as the profile's metadata.
void ExecNode::init_runtime_profile(const std::string& name) {
    std::stringstream ss;
    ss << name << " (id=" << _id << ")";
    _runtime_profile.reset(new RuntimeProfile(_pool, ss.str()));
    _runtime_profile->set_metadata(_id);
}

// Executes the configured debug action when `phase` is the configured debug
// phase; otherwise returns OK. FAIL returns an InternalError.
// WARNING: WAIT sleeps in an endless loop and never returns.
Status ExecNode::exec_debug_action(TExecNodePhase::type phase) {
    DCHECK(phase != TExecNodePhase::INVALID);

    if (_debug_phase != phase) {
        return Status::OK();
    }

    if (_debug_action == TDebugAction::FAIL) {
        return Status::InternalError("Debug Action: FAIL");
    }

    if (_debug_action == TDebugAction::WAIT) {
        while (true) {
            sleep(1);
        }
    }

    return Status::OK();
}

// Registers this node as a buffer pool client and claims its minimum
// reservation from the initial reservations of `state`. Must not be called
// when the client is already registered. Fails if the spillable buffer size in
// the resource profile is smaller than the buffer pool's minimum buffer length.
Status ExecNode::claim_buffer_reservation(RuntimeState* state) {
    DCHECK(!_buffer_pool_client.is_registered());
    BufferPool* buffer_pool = ExecEnv::GetInstance()->buffer_pool();

    // Check the minimum buffer size in case the minimum buffer size used by the planner
    // doesn't match this backend's.
    if (_resource_profile.__isset.spillable_buffer_size &&
        _resource_profile.spillable_buffer_size < buffer_pool->min_buffer_len()) {
        std::stringstream error_msg;
        error_msg << "Spillable buffer size for node " << _id << " of "
                  << _resource_profile.spillable_buffer_size
                  << " bytes is less than the minimum buffer pool buffer size of "
                  << buffer_pool->min_buffer_len() << " bytes";
        return Status::InternalError(error_msg.str());
    }

    // Client name used for registration: "<node type> id=<id> ptr=<address>".
    std::stringstream client_name;
    client_name << print_plan_node_type(_type) << " id=" << _id << " ptr=" << this;

    RETURN_IF_ERROR(buffer_pool->RegisterClient(client_name.str(),
                                                state->instance_buffer_reservation(),
                                                mem_tracker(),
                                                _resource_profile.max_reservation,
                                                runtime_profile(),
                                                &_buffer_pool_client));

    state->initial_reservations()->Claim(&_buffer_pool_client, _resource_profile.min_reservation);
/*
    if (debug_action_ == TDebugAction::SET_DENY_RESERVATION_PROBABILITY &&
        (debug_phase_ == TExecNodePhase::PREPARE || debug_phase_ == TExecNodePhase::OPEN)) {
       // We may not have been able to enable the debug action at the start of Prepare() or
       // Open() because the client is not registered then. Do it now to be sure that it is
       // effective.
       RETURN_IF_ERROR(EnableDenyReservationDebugAction());
    }
*/
    return Status::OK();
}

// Shrinks the buffer pool client's reservation to the minimum reservation of
// the resource profile.
Status ExecNode::release_unused_reservation() {
    return _buffer_pool_client.DecreaseReservationTo(_resource_profile.min_reservation);
}

/*
Status ExecNode::enable_deny_reservation_debug_action() {
  DCHECK_EQ(debug_action_, TDebugAction::SET_DENY_RESERVATION_PROBABILITY);
  DCHECK(_buffer_pool_client.is_registered());
  // Parse [0.0, 1.0] probability.
  StringParser::ParseResult parse_result;
  double probability = StringParser::StringToFloat<double>(
      debug_action_param_.c_str(), debug_action_param_.size(), &parse_result);
  if (parse_result != StringParser::PARSE_SUCCESS || probability < 0.0
      || probability > 1.0) {
    return Status::InternalError(Substitute(
        "Invalid SET_DENY_RESERVATION_PROBABILITY param: '$0'", debug_action_param_));
  }
  _buffer_pool_client.SetDebugDenyIncreaseReservation(probability);
  return Status::OK()();
}
*/

// Periodic maintenance hook; currently only forwards to
// state->check_query_state(msg).
Status ExecNode::QueryMaintenance(RuntimeState* state, const std::string& msg) {
    // TODO chenhao , when introduce latest AnalyticEvalNode open it
    // ScalarExprEvaluator::FreeLocalAllocations(evals_to_free_);
    return state->check_query_state(msg);
}

} // namespace doris