[Improvement](scan) Use scanner to do projection of scan node (#29124)

This commit is contained in:
Gabriel
2023-12-27 16:00:52 +08:00
committed by GitHub
parent cd1e109cc3
commit c75e63a2a5
14 changed files with 138 additions and 25 deletions

View File

@ -103,7 +103,7 @@ public:
// TODO: AggregationNode and HashJoinNode cannot be "re-opened" yet.
[[nodiscard]] virtual Status get_next(RuntimeState* state, vectorized::Block* block, bool* eos);
// new interface to compatible new optimizers in FE
[[nodiscard]] Status get_next_after_projects(
[[nodiscard]] virtual Status get_next_after_projects(
RuntimeState* state, vectorized::Block* block, bool* eos,
const std::function<Status(RuntimeState*, vectorized::Block*, bool*)>& fn,
bool clear_data = true);

View File

@ -1239,10 +1239,10 @@ template <typename Derived>
Status ScanLocalState<Derived>::_start_scanners(
const std::list<vectorized::VScannerSPtr>& scanners) {
auto& p = _parent->cast<typename Derived::Parent>();
_scanner_ctx = PipScannerContext::create_shared(state(), this, p._output_tuple_desc, scanners,
p.limit(), state()->scan_queue_mem_limit(),
p._col_distribute_ids, 1, _scan_dependency,
_finish_dependency);
_scanner_ctx = PipScannerContext::create_shared(
state(), this, p._output_tuple_desc, p.output_row_descriptor(), scanners, p.limit(),
state()->scan_queue_mem_limit(), p._col_distribute_ids, 1, _scan_dependency,
_finish_dependency);
return Status::OK();
}

View File

@ -427,6 +427,10 @@ public:
Status open(RuntimeState* state) override;
Status get_block(RuntimeState* state, vectorized::Block* block,
SourceState& source_state) override;
Status get_block_after_projects(RuntimeState* state, vectorized::Block* block,
SourceState& source_state) override {
return get_block(state, block, source_state);
}
[[nodiscard]] bool is_source() const override { return true; }
const std::vector<TRuntimeFilterDesc>& runtime_filter_descs() override {

View File

@ -193,8 +193,8 @@ Status OperatorXBase::do_projections(RuntimeState* state, vectorized::Block* ori
return Status::OK();
}
Status OperatorXBase::get_next_after_projects(RuntimeState* state, vectorized::Block* block,
SourceState& source_state) {
Status OperatorXBase::get_block_after_projects(RuntimeState* state, vectorized::Block* block,
SourceState& source_state) {
auto local_state = state->get_local_state(operator_id());
if (_output_row_descriptor) {
local_state->clear_origin_block();
@ -461,8 +461,8 @@ Status PipelineXSinkLocalState<DependencyType>::close(RuntimeState* state, Statu
template <typename LocalStateType>
Status StreamingOperatorX<LocalStateType>::get_block(RuntimeState* state, vectorized::Block* block,
SourceState& source_state) {
RETURN_IF_ERROR(OperatorX<LocalStateType>::_child_x->get_next_after_projects(state, block,
source_state));
RETURN_IF_ERROR(OperatorX<LocalStateType>::_child_x->get_block_after_projects(state, block,
source_state));
return pull(state, block, source_state);
}
@ -473,7 +473,7 @@ Status StatefulOperatorX<LocalStateType>::get_block(RuntimeState* state, vectori
->template cast<LocalStateType>();
if (need_more_input_data(state)) {
local_state._child_block->clear_column_data();
RETURN_IF_ERROR(OperatorX<LocalStateType>::_child_x->get_next_after_projects(
RETURN_IF_ERROR(OperatorX<LocalStateType>::_child_x->get_block_after_projects(
state, local_state._child_block.get(), local_state._child_source_state));
source_state = local_state._child_source_state;
if (local_state._child_block->rows() == 0 &&

View File

@ -271,10 +271,15 @@ public:
return _output_row_descriptor ? *_output_row_descriptor : _row_descriptor;
}
[[nodiscard]] const RowDescriptor* output_row_descriptor() {
return _output_row_descriptor.get();
}
[[nodiscard]] bool is_source() const override { return false; }
Status get_next_after_projects(RuntimeState* state, vectorized::Block* block,
SourceState& source_state);
[[nodiscard]] virtual Status get_block_after_projects(RuntimeState* state,
vectorized::Block* block,
SourceState& source_state);
/// Only use in vectorized exec engine try to do projections to trans _row_desc -> _output_row_desc
Status do_projections(RuntimeState* state, vectorized::Block* origin_block,
@ -286,6 +291,7 @@ protected:
template <typename Dependency>
friend class PipelineXLocalState;
friend class PipelineXLocalStateBase;
friend class VScanner;
const int _operator_id;
const int _node_id; // unique w/in single plan tree
TPlanNodeType::type _type;

View File

@ -272,7 +272,7 @@ Status PipelineXTask::execute(bool* eos) {
if (!_dry_run) {
SCOPED_TIMER(_get_block_timer);
_get_block_counter->update(1);
RETURN_IF_ERROR(_root->get_next_after_projects(_state, block, _data_state));
RETURN_IF_ERROR(_root->get_block_after_projects(_state, block, _data_state));
} else {
_data_state = SourceState::FINISHED;
}

View File

@ -31,23 +31,26 @@ class PipScannerContext : public vectorized::ScannerContext {
public:
PipScannerContext(RuntimeState* state, vectorized::VScanNode* parent,
const TupleDescriptor* output_tuple_desc,
const RowDescriptor* output_row_descriptor,
const std::list<vectorized::VScannerSPtr>& scanners, int64_t limit_,
int64_t max_bytes_in_blocks_queue, const std::vector<int>& col_distribute_ids,
const int num_parallel_instances)
: vectorized::ScannerContext(state, parent, output_tuple_desc, scanners, limit_,
max_bytes_in_blocks_queue, num_parallel_instances),
: vectorized::ScannerContext(state, parent, output_tuple_desc, output_row_descriptor,
scanners, limit_, max_bytes_in_blocks_queue,
num_parallel_instances),
_col_distribute_ids(col_distribute_ids),
_need_colocate_distribute(!_col_distribute_ids.empty()) {}
PipScannerContext(RuntimeState* state, ScanLocalStateBase* local_state,
const TupleDescriptor* output_tuple_desc,
const RowDescriptor* output_row_descriptor,
const std::list<vectorized::VScannerSPtr>& scanners, int64_t limit_,
int64_t max_bytes_in_blocks_queue, const std::vector<int>& col_distribute_ids,
const int num_parallel_instances,
std::shared_ptr<pipeline::ScanDependency> dependency,
std::shared_ptr<pipeline::Dependency> finish_dependency)
: vectorized::ScannerContext(state, output_tuple_desc, scanners, limit_,
max_bytes_in_blocks_queue, num_parallel_instances,
: vectorized::ScannerContext(state, output_tuple_desc, output_row_descriptor, scanners,
limit_, max_bytes_in_blocks_queue, num_parallel_instances,
local_state, dependency, finish_dependency),
_need_colocate_distribute(false) {}

View File

@ -46,6 +46,7 @@ namespace doris::vectorized {
using namespace std::chrono_literals;
ScannerContext::ScannerContext(RuntimeState* state, const TupleDescriptor* output_tuple_desc,
const RowDescriptor* output_row_descriptor,
const std::list<VScannerSPtr>& scanners, int64_t limit_,
int64_t max_bytes_in_blocks_queue, const int num_parallel_instances,
pipeline::ScanLocalStateBase* local_state,
@ -54,7 +55,10 @@ ScannerContext::ScannerContext(RuntimeState* state, const TupleDescriptor* outpu
: _state(state),
_parent(nullptr),
_local_state(local_state),
_output_tuple_desc(output_tuple_desc),
_output_tuple_desc(output_row_descriptor
? output_row_descriptor->tuple_descriptors().front()
: output_tuple_desc),
_output_row_descriptor(output_row_descriptor),
_process_status(Status::OK()),
_batch_size(state->batch_size()),
limit(limit_),
@ -66,6 +70,8 @@ ScannerContext::ScannerContext(RuntimeState* state, const TupleDescriptor* outpu
_num_parallel_instances(num_parallel_instances),
_dependency(dependency),
_finish_dependency(finish_dependency) {
DCHECK(_output_row_descriptor == nullptr ||
_output_row_descriptor->tuple_descriptors().size() == 1);
// Use the task exec context as a lock between scanner threads and fragment exection threads
_task_exec_ctx = _state->get_task_execution_context();
_query_id = _state->get_query_ctx()->query_id();
@ -92,13 +98,17 @@ ScannerContext::ScannerContext(RuntimeState* state, const TupleDescriptor* outpu
ScannerContext::ScannerContext(doris::RuntimeState* state, doris::vectorized::VScanNode* parent,
const doris::TupleDescriptor* output_tuple_desc,
const RowDescriptor* output_row_descriptor,
const std::list<VScannerSPtr>& scanners, int64_t limit_,
int64_t max_bytes_in_blocks_queue, const int num_parallel_instances,
pipeline::ScanLocalStateBase* local_state)
: _state(state),
_parent(parent),
_local_state(local_state),
_output_tuple_desc(output_tuple_desc),
_output_tuple_desc(output_row_descriptor
? output_row_descriptor->tuple_descriptors().front()
: output_tuple_desc),
_output_row_descriptor(output_row_descriptor),
_process_status(Status::OK()),
_batch_size(state->batch_size()),
limit(limit_),
@ -108,6 +118,8 @@ ScannerContext::ScannerContext(doris::RuntimeState* state, doris::vectorized::VS
_scanners(scanners),
_scanners_ref(scanners.begin(), scanners.end()),
_num_parallel_instances(num_parallel_instances) {
DCHECK(_output_row_descriptor == nullptr ||
_output_row_descriptor->tuple_descriptors().size() == 1);
// Use the task exec context as a lock between scanner threads and fragment exection threads
_task_exec_ctx = _state->get_task_execution_context();
_query_id = _state->get_query_ctx()->query_id();

View File

@ -70,6 +70,7 @@ class ScannerContext : public std::enable_shared_from_this<ScannerContext> {
public:
ScannerContext(RuntimeState* state, VScanNode* parent, const TupleDescriptor* output_tuple_desc,
const RowDescriptor* output_row_descriptor,
const std::list<VScannerSPtr>& scanners, int64_t limit_,
int64_t max_bytes_in_blocks_queue, const int num_parallel_instances = 1,
pipeline::ScanLocalStateBase* local_state = nullptr);
@ -187,6 +188,7 @@ private:
protected:
ScannerContext(RuntimeState* state_, const TupleDescriptor* output_tuple_desc,
const RowDescriptor* output_row_descriptor,
const std::list<VScannerSPtr>& scanners_, int64_t limit_,
int64_t max_bytes_in_blocks_queue_, const int num_parallel_instances,
pipeline::ScanLocalStateBase* local_state,
@ -203,6 +205,7 @@ protected:
// the comment of same fields in VScanNode
const TupleDescriptor* _output_tuple_desc = nullptr;
const RowDescriptor* _output_row_descriptor = nullptr;
// _transfer_lock is used to protect the critical section
// where the ScanNode and ScannerScheduler interact.

View File

@ -351,7 +351,7 @@ void ScannerScheduler::_scanner_scan(ScannerScheduler* scheduler,
BlockUPtr block = ctx->get_free_block();
status = scanner->get_block(state, block.get(), &eos);
status = scanner->get_block_after_projects(state, block.get(), &eos);
// The VFileScanner for external table may try to open not exist files,
// Because FE file cache for external table may out of date.
// So, NOT_FOUND for VFileScanner is not a fail case.

View File

@ -323,10 +323,11 @@ Status VScanNode::_start_scanners(const std::list<VScannerSPtr>& scanners,
if (_is_pipeline_scan) {
int max_queue_size = _shared_scan_opt ? std::max(query_parallel_instance_num, 1) : 1;
_scanner_ctx = pipeline::PipScannerContext::create_shared(
_state, this, _output_tuple_desc, scanners, limit(), _state->scan_queue_mem_limit(),
_col_distribute_ids, max_queue_size);
_state, this, _output_tuple_desc, _output_row_descriptor.get(), scanners, limit(),
_state->scan_queue_mem_limit(), _col_distribute_ids, max_queue_size);
} else {
_scanner_ctx = ScannerContext::create_shared(_state, this, _output_tuple_desc, scanners,
_scanner_ctx = ScannerContext::create_shared(_state, this, _output_tuple_desc,
_output_row_descriptor.get(), scanners,
limit(), _state->scan_queue_mem_limit());
}
return Status::OK();

View File

@ -177,6 +177,20 @@ public:
RuntimeProfile* scanner_profile() { return _scanner_profile.get(); }
Status get_next_after_projects(
RuntimeState* state, vectorized::Block* block, bool* eos,
const std::function<Status(RuntimeState*, vectorized::Block*, bool*)>& fn,
bool clear_data = true) override {
Defer defer([block, this]() {
if (block && !block->empty()) {
COUNTER_UPDATE(_output_bytes_counter, block->allocated_bytes());
COUNTER_UPDATE(_block_count_counter, 1);
}
});
_peak_memory_usage_counter->set(_mem_tracker->peak_consumption());
return get_next(state, block, eos);
}
protected:
// Different data sources register different profiles by implementing this method
virtual Status _init_profile();

View File

@ -35,7 +35,8 @@ VScanner::VScanner(RuntimeState* state, VScanNode* parent, int64_t limit, Runtim
_local_state(nullptr),
_limit(limit),
_profile(profile),
_output_tuple_desc(parent->output_tuple_desc()) {
_output_tuple_desc(parent->output_tuple_desc()),
_output_row_descriptor(_parent->_output_row_descriptor.get()) {
_total_rf_num = _parent->runtime_filter_num();
}
@ -46,7 +47,8 @@ VScanner::VScanner(RuntimeState* state, pipeline::ScanLocalStateBase* local_stat
_local_state(local_state),
_limit(limit),
_profile(profile),
_output_tuple_desc(_local_state->output_tuple_desc()) {
_output_tuple_desc(_local_state->output_tuple_desc()),
_output_row_descriptor(_local_state->_parent->output_row_descriptor()) {
_total_rf_num = _local_state->runtime_filter_num();
}
@ -58,9 +60,30 @@ Status VScanner::prepare(RuntimeState* state, const VExprContextSPtrs& conjuncts
}
}
const auto& projections = _parent ? _parent->_projections : _local_state->_projections;
if (!projections.empty()) {
_projections.resize(projections.size());
for (size_t i = 0; i != projections.size(); ++i) {
RETURN_IF_ERROR(projections[i]->clone(state, _projections[i]));
}
}
return Status::OK();
}
Status VScanner::get_block_after_projects(RuntimeState* state, vectorized::Block* block,
bool* eos) {
auto& row_descriptor =
_parent ? _parent->_row_descriptor : _local_state->_parent->row_descriptor();
if (_output_row_descriptor) {
_origin_block.clear_column_data(row_descriptor.num_materialized_slots());
auto status = get_block(state, &_origin_block, eos);
if (UNLIKELY(!status.ok())) return status;
return _do_projections(&_origin_block, block);
}
return get_block(state, block, eos);
}
Status VScanner::get_block(RuntimeState* state, Block* block, bool* eof) {
// only empty block should be here
DCHECK(block->rows() == 0);
@ -138,6 +161,47 @@ Status VScanner::_filter_output_block(Block* block) {
return st;
}
Status VScanner::_do_projections(vectorized::Block* origin_block, vectorized::Block* output_block) {
auto projection_timer = _parent ? _parent->_projection_timer : _local_state->_projection_timer;
auto exec_timer = _parent ? _parent->_exec_timer : _local_state->_exec_timer;
SCOPED_TIMER(exec_timer);
SCOPED_TIMER(projection_timer);
MutableBlock mutable_block =
VectorizedUtils::build_mutable_mem_reuse_block(output_block, *_output_row_descriptor);
auto rows = origin_block->rows();
if (rows != 0) {
auto& mutable_columns = mutable_block.mutable_columns();
if (mutable_columns.size() != _projections.size()) {
return Status::InternalError(
"Logical error in scanner, output of projections {} mismatches with "
"scanner output {}",
_projections.size(), mutable_columns.size());
}
for (int i = 0; i < mutable_columns.size(); ++i) {
auto result_column_id = -1;
RETURN_IF_ERROR(_projections[i]->execute(origin_block, &result_column_id));
auto column_ptr = origin_block->get_by_position(result_column_id)
.column->convert_to_full_column_if_const();
//TODO: this is a quick fix, we need a new function like "change_to_nullable" to do it
if (mutable_columns[i]->is_nullable() xor column_ptr->is_nullable()) {
DCHECK(mutable_columns[i]->is_nullable() && !column_ptr->is_nullable());
reinterpret_cast<ColumnNullable*>(mutable_columns[i].get())
->insert_range_from_not_nullable(*column_ptr, 0, rows);
} else {
mutable_columns[i]->insert_range_from(*column_ptr, 0, rows);
}
}
DCHECK(mutable_block.rows() == rows);
output_block->set_columns(std::move(mutable_columns));
}
return Status::OK();
}
Status VScanner::try_append_late_arrival_runtime_filter() {
if (_applied_rf_num == _total_rf_num) {
return Status::OK();

View File

@ -67,6 +67,7 @@ public:
virtual Status open(RuntimeState* state) { return Status::OK(); }
Status get_block(RuntimeState* state, Block* block, bool* eos);
Status get_block_after_projects(RuntimeState* state, vectorized::Block* block, bool* eos);
virtual Status close(RuntimeState* state);
@ -89,6 +90,8 @@ protected:
// Filter the output block finally.
Status _filter_output_block(Block* block);
Status _do_projections(vectorized::Block* origin_block, vectorized::Block* output_block);
// Not virtual, all child will call this method explictly
Status prepare(RuntimeState* state, const VExprContextSPtrs& conjuncts);
@ -172,6 +175,7 @@ protected:
RuntimeProfile* _profile = nullptr;
const TupleDescriptor* _output_tuple_desc = nullptr;
const RowDescriptor* _output_row_descriptor = nullptr;
// If _input_tuple_desc is set, the scanner will read data into
// this _input_block first, then convert to the output block.
@ -189,6 +193,8 @@ protected:
// Cloned from _conjuncts of scan node.
// It includes predicate in SQL and runtime filters.
VExprContextSPtrs _conjuncts;
VExprContextSPtrs _projections;
vectorized::Block _origin_block;
VExprContextSPtrs _common_expr_ctxs_push_down;
// Late arriving runtime filters will update _conjuncts.