[improvement](scan) avoid too many scanners for file scan node (#25727)

Previously, when using a file scan node (e.g., querying a Hive table), the max number of scanners for each scan node
was `doris_scanner_thread_pool_thread_num` (default is 48).
If the query parallelism is N, the total number of scanners would be 48 * N, which is too many.

This PR changes the logic so that the max number of scanners for each scan node
is `doris_scanner_thread_pool_thread_num / query parallelism` (with a minimum of 1 per scan node), so that the total number of scanners
will be up to `doris_scanner_thread_pool_thread_num`.

Reducing the number of scanners can significantly reduce the memory usage of a query.
This commit is contained in:
Mingyu Chen
2023-10-29 17:41:31 +08:00
committed by GitHub
parent 99b45e1938
commit e20cab64f4
35 changed files with 83 additions and 50 deletions

View File

@ -213,8 +213,6 @@ Status ExecNode::close(RuntimeState* state) {
<< " already closed";
return Status::OK();
}
LOG(INFO) << "query= " << print_id(state->query_id())
<< " fragment_instance_id=" << print_id(state->fragment_instance_id()) << " closed";
_is_closed = true;
Status result;
@ -228,6 +226,9 @@ Status ExecNode::close(RuntimeState* state) {
_peak_memory_usage_counter->set(_mem_tracker->peak_consumption());
}
release_resource(state);
LOG(INFO) << "query= " << print_id(state->query_id())
<< ", fragment_instance_id=" << print_id(state->fragment_instance_id())
<< ", id=" << _id << " type=" << print_plan_node_type(_type) << " closed";
return result;
}

View File

@ -83,7 +83,8 @@ public:
// Convert scan_ranges into node-specific scan restrictions. This should be
// called after prepare()
virtual Status set_scan_ranges(const std::vector<TScanRangeParams>& scan_ranges) = 0;
virtual Status set_scan_ranges(RuntimeState* state,
const std::vector<TScanRangeParams>& scan_ranges) = 0;
bool is_scan_node() const override { return true; }

View File

@ -216,7 +216,7 @@ Status MultiTablePipe::exec_plans(ExecEnv* exec_env, std::vector<ExecParam> para
if constexpr (std::is_same_v<ExecParam, TExecPlanFragmentParams>) {
RETURN_IF_ERROR(
putPipe(plan.params.fragment_instance_id, _planned_pipes[plan.table_name]));
LOG(INFO) << "fragment_instance_id=" << plan.params.fragment_instance_id
LOG(INFO) << "fragment_instance_id=" << print_id(plan.params.fragment_instance_id)
<< " table=" << plan.table_name;
} else if constexpr (std::is_same_v<ExecParam, TPipelineFragmentParams>) {
auto pipe_id = calculate_pipe_id(plan.query_id, plan.fragment_id);
@ -334,4 +334,4 @@ template Status MultiTablePipe::exec_plans(ExecEnv* exec_env,
std::vector<TPipelineFragmentParams> params);
} // namespace io
} // namespace doris
} // namespace doris

View File

@ -106,7 +106,8 @@ Status EsScanLocalState::_init_scanners(std::list<vectorized::VScannerSPtr>* sca
return Status::OK();
}
void EsScanLocalState::set_scan_ranges(const std::vector<TScanRangeParams>& scan_ranges) {
void EsScanLocalState::set_scan_ranges(RuntimeState* state,
const std::vector<TScanRangeParams>& scan_ranges) {
for (auto& es_scan_range : scan_ranges) {
DCHECK(es_scan_range.scan_range.__isset.es_scan_range);
_scan_ranges.emplace_back(new TEsScanRange(es_scan_range.scan_range.es_scan_range));

View File

@ -48,7 +48,8 @@ public:
private:
friend class vectorized::NewEsScanner;
void set_scan_ranges(const std::vector<TScanRangeParams>& scan_ranges) override;
void set_scan_ranges(RuntimeState* state,
const std::vector<TScanRangeParams>& scan_ranges) override;
Status _init_profile() override;
Status _process_conjuncts() override;
Status _init_scanners(std::list<vectorized::VScannerSPtr>* scanners) override;

View File

@ -52,8 +52,11 @@ Status FileScanLocalState::_init_scanners(std::list<vectorized::VScannerSPtr>* s
return Status::OK();
}
void FileScanLocalState::set_scan_ranges(const std::vector<TScanRangeParams>& scan_ranges) {
int max_scanners = config::doris_scanner_thread_pool_thread_num;
void FileScanLocalState::set_scan_ranges(RuntimeState* state,
const std::vector<TScanRangeParams>& scan_ranges) {
int max_scanners =
config::doris_scanner_thread_pool_thread_num / state->query_parallel_instance_num();
max_scanners = max_scanners == 0 ? 1 : max_scanners;
if (scan_ranges.size() <= max_scanners) {
_scan_ranges = scan_ranges;
} else {

View File

@ -51,7 +51,8 @@ public:
Status _process_conjuncts() override;
Status _init_scanners(std::list<vectorized::VScannerSPtr>* scanners) override;
void set_scan_ranges(const std::vector<TScanRangeParams>& scan_ranges) override;
void set_scan_ranges(RuntimeState* state,
const std::vector<TScanRangeParams>& scan_ranges) override;
int parent_id() { return _parent->node_id(); }
private:

View File

@ -39,7 +39,8 @@ Status MetaScanLocalState::_init_scanners(std::list<vectorized::VScannerSPtr>* s
return Status::OK();
}
void MetaScanLocalState::set_scan_ranges(const std::vector<TScanRangeParams>& scan_ranges) {
void MetaScanLocalState::set_scan_ranges(RuntimeState* state,
const std::vector<TScanRangeParams>& scan_ranges) {
_scan_ranges = scan_ranges;
}

View File

@ -48,7 +48,8 @@ public:
private:
friend class vectorized::NewOlapScanner;
void set_scan_ranges(const std::vector<TScanRangeParams>& scan_ranges) override;
void set_scan_ranges(RuntimeState* state,
const std::vector<TScanRangeParams>& scan_ranges) override;
Status _init_scanners(std::list<vectorized::VScannerSPtr>* scanners) override;
Status _process_conjuncts() override;

View File

@ -293,7 +293,8 @@ TOlapScanNode& OlapScanLocalState::olap_scan_node() {
return _parent->cast<OlapScanOperatorX>()._olap_scan_node;
}
void OlapScanLocalState::set_scan_ranges(const std::vector<TScanRangeParams>& scan_ranges) {
void OlapScanLocalState::set_scan_ranges(RuntimeState* state,
const std::vector<TScanRangeParams>& scan_ranges) {
for (auto& scan_range : scan_ranges) {
DCHECK(scan_range.scan_range.__isset.palo_scan_range);
_scan_ranges.emplace_back(new TPaloScanRange(scan_range.scan_range.palo_scan_range));

View File

@ -50,7 +50,8 @@ public:
private:
friend class vectorized::NewOlapScanner;
void set_scan_ranges(const std::vector<TScanRangeParams>& scan_ranges) override;
void set_scan_ranges(RuntimeState* state,
const std::vector<TScanRangeParams>& scan_ranges) override;
Status _init_profile() override;
Status _process_conjuncts() override;
bool _is_key_column(const std::string& col_name) override;

View File

@ -131,7 +131,7 @@ Status ScanLocalState<Derived>::init(RuntimeState* state, LocalStateInfo& info)
_filter_dependency->set_filter_blocked_by_fn(
[this]() { return this->runtime_filters_are_ready_or_timeout(); });
auto& p = _parent->cast<typename Derived::Parent>();
set_scan_ranges(info.scan_ranges);
set_scan_ranges(state, info.scan_ranges);
_common_expr_ctxs_push_down.resize(p._common_expr_ctxs_push_down.size());
for (size_t i = 0; i < _common_expr_ctxs_push_down.size(); i++) {
RETURN_IF_ERROR(

View File

@ -136,7 +136,8 @@ public:
[[nodiscard]] virtual int runtime_filter_num() const = 0;
virtual Status clone_conjunct_ctxs(vectorized::VExprContextSPtrs& conjuncts) = 0;
virtual void set_scan_ranges(const std::vector<TScanRangeParams>& scan_ranges) = 0;
virtual void set_scan_ranges(RuntimeState* state,
const std::vector<TScanRangeParams>& scan_ranges) = 0;
virtual TPushAggOp::type get_push_down_agg_type() = 0;
@ -219,7 +220,8 @@ class ScanLocalState : public ScanLocalStateBase {
}
Status clone_conjunct_ctxs(vectorized::VExprContextSPtrs& conjuncts) override;
virtual void set_scan_ranges(const std::vector<TScanRangeParams>& scan_ranges) override {}
virtual void set_scan_ranges(RuntimeState* state,
const std::vector<TScanRangeParams>& scan_ranges) override {}
TPushAggOp::type get_push_down_agg_type() override;

View File

@ -300,15 +300,15 @@ Status PipelineFragmentContext::prepare(const doris::TPipelineFragmentParams& re
no_scan_ranges);
const bool shared_scan =
find_with_default(local_params.per_node_shared_scans, scan_node->id(), false);
scan_node->set_scan_ranges(scan_ranges);
scan_node->set_scan_ranges(_runtime_state.get(), scan_ranges);
scan_node->set_shared_scan(_runtime_state.get(), shared_scan);
} else {
ScanNode* scan_node = static_cast<ScanNode*>(node);
auto scan_ranges = find_with_default(local_params.per_node_scan_ranges, scan_node->id(),
no_scan_ranges);
static_cast<void>(scan_node->set_scan_ranges(scan_ranges));
static_cast<void>(scan_node->set_scan_ranges(_runtime_state.get(), scan_ranges));
VLOG_CRITICAL << "query " << print_id(get_query_id())
<< "scan_node_Id=" << scan_node->id()
<< " scan_node_id=" << scan_node->id()
<< " size=" << scan_ranges.get().size();
}
}

View File

@ -464,7 +464,7 @@ Status GroupCommitMgr::group_commit_insert(int64_t table_id, const TPlan& plan,
RETURN_IF_ERROR(file_scan_node.prepare(runtime_state.get()));
std::vector<TScanRangeParams> params_vector;
params_vector.emplace_back(scan_range_params);
file_scan_node.set_scan_ranges(params_vector);
file_scan_node.set_scan_ranges(runtime_state.get(), params_vector);
RETURN_IF_ERROR(file_scan_node.open(runtime_state.get()));
// 3. Put the block into block queue.
@ -557,4 +557,4 @@ Status GroupCommitMgr::get_load_block_queue(int64_t table_id, const TUniqueId& i
}
return group_commit_table->get_load_block_queue(instance_id, load_block_queue);
}
} // namespace doris
} // namespace doris

View File

@ -211,12 +211,12 @@ Status PlanFragmentExecutor::prepare(const TExecPlanFragmentParams& request) {
vectorized::VScanNode* scan_node = static_cast<vectorized::VScanNode*>(scan_nodes[i]);
auto scan_ranges =
find_with_default(params.per_node_scan_ranges, scan_node->id(), no_scan_ranges);
scan_node->set_scan_ranges(scan_ranges);
scan_node->set_scan_ranges(runtime_state(), scan_ranges);
} else {
ScanNode* scan_node = static_cast<ScanNode*>(scan_nodes[i]);
auto scan_ranges =
find_with_default(params.per_node_scan_ranges, scan_node->id(), no_scan_ranges);
static_cast<void>(scan_node->set_scan_ranges(scan_ranges));
static_cast<void>(scan_node->set_scan_ranges(runtime_state(), scan_ranges));
VLOG_CRITICAL << "scan_node_Id=" << scan_node->id()
<< " size=" << scan_ranges.get().size();
}

View File

@ -108,7 +108,7 @@ Status BackendService::create_service(ExecEnv* exec_env, int port,
void BackendService::exec_plan_fragment(TExecPlanFragmentResult& return_val,
const TExecPlanFragmentParams& params) {
LOG(INFO) << "exec_plan_fragment() instance_id=" << params.params.fragment_instance_id
LOG(INFO) << "exec_plan_fragment() instance_id=" << print_id(params.params.fragment_instance_id)
<< " coord=" << params.coord << " backend#=" << params.backend_num;
start_plan_fragment_execution(params).set_t_status(&return_val);
}
@ -122,7 +122,7 @@ Status BackendService::start_plan_fragment_execution(const TExecPlanFragmentPara
void BackendService::cancel_plan_fragment(TCancelPlanFragmentResult& return_val,
const TCancelPlanFragmentParams& params) {
LOG(INFO) << "cancel_plan_fragment(): instance_id=" << params.fragment_instance_id;
LOG(INFO) << "cancel_plan_fragment(): instance_id=" << print_id(params.fragment_instance_id);
_exec_env->fragment_mgr()->cancel_instance(params.fragment_instance_id,
PPlanFragmentCancelReason::INTERNAL_ERROR);
}

View File

@ -50,7 +50,8 @@ Status GroupCommitScanNode::prepare(RuntimeState* state) {
return VScanNode::prepare(state);
}
void GroupCommitScanNode::set_scan_ranges(const std::vector<TScanRangeParams>& scan_ranges) {}
void GroupCommitScanNode::set_scan_ranges(RuntimeState* state,
const std::vector<TScanRangeParams>& scan_ranges) {}
Status GroupCommitScanNode::_init_profile() {
return VScanNode::_init_profile();

View File

@ -31,7 +31,8 @@ public:
Status prepare(RuntimeState* state) override;
void set_scan_ranges(const std::vector<TScanRangeParams>& scan_ranges) override;
void set_scan_ranges(RuntimeState* state,
const std::vector<TScanRangeParams>& scan_ranges) override;
std::string get_name() override;

View File

@ -109,7 +109,8 @@ Status NewEsScanNode::prepare(RuntimeState* state) {
return Status::OK();
}
void NewEsScanNode::set_scan_ranges(const std::vector<TScanRangeParams>& scan_ranges) {
void NewEsScanNode::set_scan_ranges(RuntimeState* state,
const std::vector<TScanRangeParams>& scan_ranges) {
for (auto& es_scan_range : scan_ranges) {
DCHECK(es_scan_range.scan_range.__isset.es_scan_range);
_scan_ranges.emplace_back(new TEsScanRange(es_scan_range.scan_range.es_scan_range));

View File

@ -55,7 +55,8 @@ public:
std::string get_name() override;
Status init(const TPlanNode& tnode, RuntimeState* state = nullptr) override;
Status prepare(RuntimeState* state) override;
void set_scan_ranges(const std::vector<TScanRangeParams>& scan_ranges) override;
void set_scan_ranges(RuntimeState* state,
const std::vector<TScanRangeParams>& scan_ranges) override;
protected:
Status _init_profile() override;

View File

@ -58,8 +58,11 @@ Status NewFileScanNode::prepare(RuntimeState* state) {
return Status::OK();
}
void NewFileScanNode::set_scan_ranges(const std::vector<TScanRangeParams>& scan_ranges) {
int max_scanners = config::doris_scanner_thread_pool_thread_num;
void NewFileScanNode::set_scan_ranges(RuntimeState* state,
const std::vector<TScanRangeParams>& scan_ranges) {
int max_scanners =
config::doris_scanner_thread_pool_thread_num / state->query_parallel_instance_num();
max_scanners = max_scanners == 0 ? 1 : max_scanners;
if (scan_ranges.size() <= max_scanners) {
_scan_ranges = scan_ranges;
} else {
@ -122,7 +125,6 @@ Status NewFileScanNode::_init_scanners(std::list<VScannerSPtr>* scanners) {
scanner->prepare(_conjuncts, &_colname_to_value_range, &_colname_to_slot_id));
scanners->push_back(std::move(scanner));
}
return Status::OK();
}

View File

@ -47,7 +47,8 @@ public:
Status prepare(RuntimeState* state) override;
void set_scan_ranges(const std::vector<TScanRangeParams>& scan_ranges) override;
void set_scan_ranges(RuntimeState* state,
const std::vector<TScanRangeParams>& scan_ranges) override;
std::string get_name() override;

View File

@ -46,7 +46,8 @@ public:
std::string get_name() override;
// no use
void set_scan_ranges(const std::vector<TScanRangeParams>& scan_ranges) override {}
void set_scan_ranges(RuntimeState* state,
const std::vector<TScanRangeParams>& scan_ranges) override {}
protected:
Status _init_profile() override;

View File

@ -402,7 +402,8 @@ Status NewOlapScanNode::_should_push_down_function_filter(VectorizedFnCall* fn_c
// 9: optional string table_name
//}
// every doris_scan_range is related with one tablet so that one olap scan node contains multiple tablet
void NewOlapScanNode::set_scan_ranges(const std::vector<TScanRangeParams>& scan_ranges) {
void NewOlapScanNode::set_scan_ranges(RuntimeState* state,
const std::vector<TScanRangeParams>& scan_ranges) {
for (auto& scan_range : scan_ranges) {
DCHECK(scan_range.scan_range.__isset.palo_scan_range);
_scan_ranges.emplace_back(new TPaloScanRange(scan_range.scan_range.palo_scan_range));

View File

@ -66,7 +66,8 @@ public:
Status collect_query_statistics(QueryStatistics* statistics) override;
Status collect_query_statistics(QueryStatistics* statistics, int sender_id) override;
void set_scan_ranges(const std::vector<TScanRangeParams>& scan_ranges) override;
void set_scan_ranges(RuntimeState* state,
const std::vector<TScanRangeParams>& scan_ranges) override;
std::string get_name() override;

View File

@ -52,7 +52,8 @@ Status VMetaScanNode::prepare(RuntimeState* state) {
return Status::OK();
}
void VMetaScanNode::set_scan_ranges(const std::vector<TScanRangeParams>& scan_ranges) {
void VMetaScanNode::set_scan_ranges(RuntimeState* state,
const std::vector<TScanRangeParams>& scan_ranges) {
_scan_ranges = scan_ranges;
}

View File

@ -47,7 +47,8 @@ public:
Status init(const TPlanNode& tnode, RuntimeState* state = nullptr) override;
Status prepare(RuntimeState* state) override;
void set_scan_ranges(const std::vector<TScanRangeParams>& scan_ranges) override;
void set_scan_ranges(RuntimeState* state,
const std::vector<TScanRangeParams>& scan_ranges) override;
const TMetaScanNode& scan_params() { return _scan_params; }
private:
@ -61,4 +62,4 @@ private:
std::vector<TScanRangeParams> _scan_ranges;
};
} // namespace doris::vectorized
} // namespace doris::vectorized

View File

@ -116,7 +116,8 @@ public:
Status open(RuntimeState* state) override;
virtual void set_scan_ranges(const std::vector<TScanRangeParams>& scan_ranges) {}
virtual void set_scan_ranges(RuntimeState* state,
const std::vector<TScanRangeParams>& scan_ranges) {}
void set_shared_scan(RuntimeState* state, bool shared_scan) {
_shared_scan_opt = shared_scan;

View File

@ -137,7 +137,8 @@ Status VDataGenFunctionScanNode::close(RuntimeState* state) {
return ExecNode::close(state);
}
Status VDataGenFunctionScanNode::set_scan_ranges(const std::vector<TScanRangeParams>& scan_ranges) {
Status VDataGenFunctionScanNode::set_scan_ranges(RuntimeState* state,
const std::vector<TScanRangeParams>& scan_ranges) {
return _table_func->set_scan_ranges(scan_ranges);
}

View File

@ -51,7 +51,8 @@ public:
Status close(RuntimeState* state) override;
// No use
Status set_scan_ranges(const std::vector<TScanRangeParams>& scan_ranges) override;
Status set_scan_ranges(RuntimeState* state,
const std::vector<TScanRangeParams>& scan_ranges) override;
protected:
std::shared_ptr<VDataGenFunctionInf> _table_func;

View File

@ -227,7 +227,8 @@ void VMysqlScanNode::debug_string(int indentation_level, std::stringstream* out)
}
}
Status VMysqlScanNode::set_scan_ranges(const std::vector<TScanRangeParams>& scan_ranges) {
Status VMysqlScanNode::set_scan_ranges(RuntimeState* state,
const std::vector<TScanRangeParams>& scan_ranges) {
return Status::OK();
}
} // namespace doris::vectorized
} // namespace doris::vectorized

View File

@ -49,7 +49,8 @@ public:
Status close(RuntimeState* state) override;
// No use
Status set_scan_ranges(const std::vector<TScanRangeParams>& scan_ranges) override;
Status set_scan_ranges(RuntimeState* state,
const std::vector<TScanRangeParams>& scan_ranges) override;
private:
// Write debug string of this into out.
@ -79,4 +80,4 @@ private:
DataTypeSerDe::FormatOptions _text_formatOptions;
};
} // namespace vectorized
} // namespace doris
} // namespace doris

View File

@ -302,7 +302,8 @@ void VSchemaScanNode::debug_string(int indentation_level, std::stringstream* out
}
}
Status VSchemaScanNode::set_scan_ranges(const std::vector<TScanRangeParams>& scan_ranges) {
Status VSchemaScanNode::set_scan_ranges(RuntimeState* state,
const std::vector<TScanRangeParams>& scan_ranges) {
return Status::OK();
}

View File

@ -56,7 +56,8 @@ public:
private:
// this is no use in this class
Status set_scan_ranges(const std::vector<TScanRangeParams>& scan_ranges) override;
Status set_scan_ranges(RuntimeState* state,
const std::vector<TScanRangeParams>& scan_ranges) override;
// Write debug string of this into out.
void debug_string(int indentation_level, std::stringstream* out) const override;
@ -77,4 +78,4 @@ private:
std::unique_ptr<SchemaScanner> _schema_scanner;
};
} // namespace vectorized
} // namespace doris
} // namespace doris