diff --git a/be/src/vec/exec/scan/pip_scanner_context.h b/be/src/vec/exec/scan/pip_scanner_context.h index 0d4509012e..7edf634c17 100644 --- a/be/src/vec/exec/scan/pip_scanner_context.h +++ b/be/src/vec/exec/scan/pip_scanner_context.h @@ -17,6 +17,7 @@ #pragma once +#include "concurrentqueue.h" #include "runtime/descriptors.h" #include "scanner_context.h" @@ -32,9 +33,11 @@ public: const TupleDescriptor* input_tuple_desc, const TupleDescriptor* output_tuple_desc, const std::list& scanners, int64_t limit, - int64_t max_bytes_in_blocks_queue, const std::vector& col_distribute_ids) + int64_t max_bytes_in_blocks_queue, const std::vector& col_distribute_ids, + const int num_parallel_instances) : vectorized::ScannerContext(state, parent, input_tuple_desc, output_tuple_desc, - scanners, limit, max_bytes_in_blocks_queue), + scanners, limit, max_bytes_in_blocks_queue, + num_parallel_instances), _col_distribute_ids(col_distribute_ids), _need_colocate_distribute(!_col_distribute_ids.empty()) {} @@ -52,11 +55,7 @@ public: } { - std::unique_lock l(*_queue_mutexs[id]); - if (!_blocks_queues[id].empty()) { - *block = std::move(_blocks_queues[id].front()); - _blocks_queues[id].pop_front(); - } else { + if (!_blocks_queues[id].try_dequeue(*block)) { *eos = _is_finished || _should_stop; return Status::OK(); } @@ -69,7 +68,7 @@ public: bool done() override { return _is_finished || _should_stop || _status_error; } void append_blocks_to_queue(std::vector& blocks) override { - const int queue_size = _queue_mutexs.size(); + const int queue_size = _blocks_queues.size(); const int block_size = blocks.size(); int64_t local_bytes = 0; @@ -78,7 +77,7 @@ public: for (const auto& block : blocks) { // vectorized calculate hash int rows = block->rows(); - const auto element_size = _max_queue_size; + const auto element_size = _num_parallel_instances; hash_vals.resize(rows); std::fill(hash_vals.begin(), hash_vals.end(), 0); auto* __restrict hashes = hash_vals.data(); @@ -113,9 +112,8 @@ public: for (int i = 0; i < queue_size && i < block_size; ++i) { int queue = _next_queue_to_feed; { - std::lock_guard l(*_queue_mutexs[queue]); for (int j = i; j < block_size; j += queue_size) { - _blocks_queues[queue].emplace_back(std::move(blocks[j])); + _blocks_queues[queue].enqueue(std::move(blocks[j])); } } _next_queue_to_feed = queue + 1 < queue_size ? queue + 1 : 0; @@ -124,58 +122,108 @@ public: _current_used_bytes += local_bytes; } - bool empty_in_queue(int id) override { - std::unique_lock l(*_queue_mutexs[id]); - return _blocks_queues[id].empty(); + bool empty_in_queue(int id) override { return _blocks_queues[id].size_approx() == 0; } + + Status init() override { + for (int i = 0; i < _num_parallel_instances; ++i) { + _blocks_queues.emplace_back(moodycamel::ConcurrentQueue()); + } + RETURN_IF_ERROR(ScannerContext::init()); + if (_need_colocate_distribute) { + _init_colocate_block(); + } + return Status::OK(); } - void set_max_queue_size(const int max_queue_size) override { - _max_queue_size = max_queue_size; - for (int i = 0; i < max_queue_size; ++i) { - _queue_mutexs.emplace_back(new std::mutex); - _blocks_queues.emplace_back(std::list()); - } - if (_need_colocate_distribute) { - int real_block_size = - limit == -1 ? _batch_size : std::min(static_cast(_batch_size), limit); - int64_t free_blocks_memory_usage = 0; - for (int i = 0; i < _max_queue_size; ++i) { - auto block = vectorized::Block::create_unique(_output_tuple_desc->slots(), - real_block_size, - true /*ignore invalid slots*/); - free_blocks_memory_usage += block->allocated_bytes(); - _colocate_mutable_blocks.emplace_back( - vectorized::MutableBlock::create_unique(block.get())); - _colocate_blocks.emplace_back(std::move(block)); - _colocate_block_mutexs.emplace_back(new std::mutex); - } - _free_blocks_memory_usage->add(free_blocks_memory_usage); + void _init_colocate_block() { + int real_block_size = + limit == -1 ? _batch_size : std::min(static_cast(_batch_size), limit); + int64_t free_blocks_memory_usage = 0; + for (int i = 0; i < _num_parallel_instances; ++i) { + auto block = vectorized::Block::create_unique( + _output_tuple_desc->slots(), real_block_size, true /*ignore invalid slots*/); + free_blocks_memory_usage += block->allocated_bytes(); + _colocate_mutable_blocks.emplace_back( + vectorized::MutableBlock::create_unique(block.get())); + _colocate_blocks.emplace_back(std::move(block)); + _colocate_block_mutexs.emplace_back(new std::mutex); } + _free_blocks_memory_usage->add(free_blocks_memory_usage); } bool has_enough_space_in_blocks_queue() const override { - return _current_used_bytes < _max_bytes_in_queue / 2 * _max_queue_size; + return _current_used_bytes < _max_bytes_in_queue / 2 * _num_parallel_instances; } void _dispose_coloate_blocks_not_in_queue() override { if (_need_colocate_distribute) { - for (int i = 0; i < _max_queue_size; ++i) { - std::scoped_lock s(*_colocate_block_mutexs[i], *_queue_mutexs[i]); + for (int i = 0; i < _num_parallel_instances; ++i) { if (_colocate_blocks[i] && !_colocate_blocks[i]->empty()) { _current_used_bytes += _colocate_blocks[i]->allocated_bytes(); - _blocks_queues[i].emplace_back(std::move(_colocate_blocks[i])); + _blocks_queues[i].enqueue(std::move(_colocate_blocks[i])); _colocate_mutable_blocks[i]->clear(); } } } } + vectorized::BlockUPtr get_free_block(bool* has_free_block, + bool get_not_empty_block = false) override { + { + vectorized::BlockUPtr block; + if (_free_blocks_queues.try_dequeue(block)) { + if (!get_not_empty_block || block->mem_reuse()) { + _total_free_block_num--; + _free_blocks_memory_usage->add(-block->allocated_bytes()); + return block; + } + } + } + *has_free_block = false; + + COUNTER_UPDATE(_newly_create_free_blocks_num, 1); + return vectorized::Block::create_unique(_real_tuple_desc->slots(), _batch_size, + true /*ignore invalid slots*/); + } + + void return_free_block(std::unique_ptr block) override { + block->clear_column_data(); + _free_blocks_memory_usage->add(block->allocated_bytes()); + _free_blocks_queues.enqueue(std::move(block)); + _total_free_block_num++; + } + + void _init_free_block(int pre_alloc_block_count, int real_block_size) override { + // The free blocks is used for final output block of scanners. + // So use _output_tuple_desc; + int64_t free_blocks_memory_usage = 0; + auto block = vectorized::Block::create_unique(_output_tuple_desc->slots(), real_block_size, + true /*ignore invalid slots*/); + free_blocks_memory_usage += block->allocated_bytes(); + _free_blocks_queues.enqueue(std::move(block)); + _total_free_block_num = pre_alloc_block_count; + _free_blocks_memory_usage->add(free_blocks_memory_usage); + } + + int cal_thread_slot_num_by_free_block_num() override { + // For pipeline engine, we don't promise `thread_slot_num` is exact (e.g. scanners to + // schedule may need more free blocks than available free blocks). + // This is because we don't want a heavy lock for free block queues. + int local_val = _total_free_block_num; + int thread_slot_num = local_val / _block_per_scanner; + thread_slot_num += (local_val % _block_per_scanner != 0); + thread_slot_num = std::min(thread_slot_num, _max_thread_num - _num_running_scanners); + if (thread_slot_num <= 0) { + thread_slot_num = 1; + } + return thread_slot_num; + } + private: - int _max_queue_size = 1; int _next_queue_to_feed = 0; - std::vector> _queue_mutexs; - std::vector> _blocks_queues; + std::vector> _blocks_queues; std::atomic_int64_t _current_used_bytes = 0; + std::atomic_int32_t _total_free_block_num = 0; const std::vector& _col_distribute_ids; const bool _need_colocate_distribute; @@ -183,6 +231,8 @@ private: std::vector> _colocate_mutable_blocks; std::vector> _colocate_block_mutexs; + moodycamel::ConcurrentQueue _free_blocks_queues; + void _add_rows_colocate_blocks(vectorized::Block* block, int loc, const std::vector& rows) { int row_wait_add = rows.size(); @@ -205,10 +255,7 @@ private: if (row_add == max_add) { _current_used_bytes += _colocate_blocks[loc]->allocated_bytes(); - { - std::lock_guard queue_l(*_queue_mutexs[loc]); - _blocks_queues[loc].emplace_back(std::move(_colocate_blocks[loc])); - } + { _blocks_queues[loc].enqueue(std::move(_colocate_blocks[loc])); } bool get_block_not_empty = true; _colocate_blocks[loc] = get_free_block(&get_block_not_empty, get_block_not_empty); _colocate_mutable_blocks[loc]->set_muatable_columns( diff --git a/be/src/vec/exec/scan/scanner_context.cpp b/be/src/vec/exec/scan/scanner_context.cpp index b50aa62629..6cdd33c551 100644 --- a/be/src/vec/exec/scan/scanner_context.cpp +++ b/be/src/vec/exec/scan/scanner_context.cpp @@ -45,7 +45,7 @@ ScannerContext::ScannerContext(doris::RuntimeState* state_, doris::vectorized::V const doris::TupleDescriptor* input_tuple_desc, const doris::TupleDescriptor* output_tuple_desc, const std::list& scanners_, int64_t limit_, - int64_t max_bytes_in_blocks_queue_) + int64_t max_bytes_in_blocks_queue_, const int num_parallel_instances) : _state(state_), _parent(parent), _input_tuple_desc(input_tuple_desc), @@ -55,7 +55,8 @@ ScannerContext::ScannerContext(doris::RuntimeState* state_, doris::vectorized::V limit(limit_), _max_bytes_in_queue(max_bytes_in_blocks_queue_), _scanner_scheduler(state_->exec_env()->scanner_scheduler()), - _scanners(scanners_) { + _scanners(scanners_), + _num_parallel_instances(num_parallel_instances) { ctx_id = UniqueId::gen_uid().to_string(); if (_scanners.empty()) { _is_finished = true; @@ -68,8 +69,11 @@ Status ScannerContext::init() { // 1. Calculate max concurrency // TODO: now the max thread num <= config::doris_scanner_thread_pool_thread_num / 4 // should find a more reasonable value. - _max_thread_num = _parent->_shared_scan_opt ? config::doris_scanner_thread_pool_thread_num - : config::doris_scanner_thread_pool_thread_num / 4; + _max_thread_num = config::doris_scanner_thread_pool_thread_num / 4; + if (_parent->_shared_scan_opt) { + DCHECK(_num_parallel_instances > 0); + _max_thread_num *= _num_parallel_instances; + } _max_thread_num = _max_thread_num == 0 ? 1 : _max_thread_num; DCHECK(_max_thread_num > 0); _max_thread_num = std::min(_max_thread_num, (int32_t)_scanners.size()); @@ -81,6 +85,7 @@ Status ScannerContext::init() { _scanner_profile = _parent->_scanner_profile; _scanner_sched_counter = _parent->_scanner_sched_counter; _scanner_ctx_sched_counter = _parent->_scanner_ctx_sched_counter; + _scanner_ctx_sched_time = _parent->_scanner_ctx_sched_time; _free_blocks_memory_usage = _parent->_free_blocks_memory_usage; _newly_create_free_blocks_num = _parent->_newly_create_free_blocks_num; _queued_blocks_memory_usage = _parent->_queued_blocks_memory_usage; @@ -98,6 +103,9 @@ Status ScannerContext::init() { limit == -1 ? _batch_size : std::min(static_cast(_batch_size), limit); _block_per_scanner = (doris_scanner_row_num + (real_block_size - 1)) / real_block_size; _free_blocks_capacity = _max_thread_num * _block_per_scanner; + auto pre_alloc_block_count = _max_thread_num * _block_per_scanner; + + _init_free_block(pre_alloc_block_count, real_block_size); #ifndef BE_TEST // 3. get thread token @@ -117,6 +125,19 @@ Status ScannerContext::init() { return Status::OK(); } +void ScannerContext::_init_free_block(int pre_alloc_block_count, int real_block_size) { + // The free blocks is used for final output block of scanners. + // So use _output_tuple_desc; + int64_t free_blocks_memory_usage = 0; + for (int i = 0; i < pre_alloc_block_count; ++i) { + auto block = vectorized::Block::create_unique(_output_tuple_desc->slots(), real_block_size, + true /*ignore invalid slots*/); + free_blocks_memory_usage += block->allocated_bytes(); + _free_blocks.emplace_back(std::move(block)); + } + _free_blocks_memory_usage->add(free_blocks_memory_usage); +} + vectorized::BlockUPtr ScannerContext::get_free_block(bool* has_free_block, bool get_block_not_empty) { { @@ -345,13 +366,8 @@ void ScannerContext::get_next_batch_of_scanners(std::list* current int thread_slot_num = 0; { // If there are enough space in blocks queue, - // the scanner number depends on the _free_blocks_capacity - std::lock_guard f(_free_blocks_lock); - thread_slot_num = (_free_blocks_capacity + _block_per_scanner - 1) / _block_per_scanner; - thread_slot_num = std::min(thread_slot_num, _max_thread_num - _num_running_scanners); - if (thread_slot_num <= 0) { - thread_slot_num = 1; - } + // the scanner number depends on the _free_blocks numbers + thread_slot_num = cal_thread_slot_num_by_free_block_num(); } // 2. get #thread_slot_num scanners from ctx->scanners diff --git a/be/src/vec/exec/scan/scanner_context.h b/be/src/vec/exec/scan/scanner_context.h index e0bb78908f..84d8839bfe 100644 --- a/be/src/vec/exec/scan/scanner_context.h +++ b/be/src/vec/exec/scan/scanner_context.h @@ -61,13 +61,14 @@ public: ScannerContext(RuntimeState* state_, VScanNode* parent, const TupleDescriptor* input_tuple_desc, const TupleDescriptor* output_tuple_desc, const std::list& scanners_, int64_t limit_, - int64_t max_bytes_in_blocks_queue_); + int64_t max_bytes_in_blocks_queue_, const int num_parallel_instances = 0); virtual ~ScannerContext() = default; - Status init(); + virtual Status init(); - vectorized::BlockUPtr get_free_block(bool* has_free_block, bool get_not_empty_block = false); - void return_free_block(std::unique_ptr block); + virtual vectorized::BlockUPtr get_free_block(bool* has_free_block, + bool get_not_empty_block = false); + virtual void return_free_block(std::unique_ptr block); // Append blocks from scanners to the blocks queue. virtual void append_blocks_to_queue(std::vector& blocks); @@ -126,19 +127,29 @@ public: RuntimeState* state() { return _state; } void incr_num_ctx_scheduling(int64_t num) { _scanner_ctx_sched_counter->update(num); } + void incr_ctx_scheduling_time(int64_t num) { _scanner_ctx_sched_time->update(num); } void incr_num_scanner_scheduling(int64_t num) { _scanner_sched_counter->update(num); } VScanNode* parent() { return _parent; } virtual bool empty_in_queue(int id); - virtual void set_max_queue_size(int max_queue_size) {}; - // todo(wb) rethinking how to calculate ```_max_bytes_in_queue``` when executing shared scan virtual inline bool has_enough_space_in_blocks_queue() const { return _cur_bytes_in_queue < _max_bytes_in_queue / 2; } + virtual int cal_thread_slot_num_by_free_block_num() { + int thread_slot_num = 0; + std::lock_guard f(_free_blocks_lock); + thread_slot_num = (_free_blocks_capacity + _block_per_scanner - 1) / _block_per_scanner; + thread_slot_num = std::min(thread_slot_num, _max_thread_num - _num_running_scanners); + if (thread_slot_num <= 0) { + thread_slot_num = 1; + } + return thread_slot_num; + } + void reschedule_scanner_ctx(); // the unique id of this context @@ -153,6 +164,8 @@ private: protected: virtual void _dispose_coloate_blocks_not_in_queue() {} + virtual void _init_free_block(int pre_alloc_block_count, int real_block_size); + RuntimeState* _state; VScanNode* _parent; @@ -215,7 +228,7 @@ protected: // Here we record the number of ctx in the scheduling queue to clean up at the end. std::atomic_int32_t _num_scheduling_ctx = 0; // Num of unfinished scanners. Should be set in init() - int32_t _num_unfinished_scanners = 0; + std::atomic_int32_t _num_unfinished_scanners = 0; // Max number of scan thread for this scanner context. int32_t _max_thread_num = 0; // How many blocks a scanner can use in one task. @@ -224,7 +237,7 @@ protected: // The current bytes of blocks in blocks queue int64_t _cur_bytes_in_queue = 0; // The max limit bytes of blocks in blocks queue - int64_t _max_bytes_in_queue; + const int64_t _max_bytes_in_queue; doris::vectorized::ScannerScheduler* _scanner_scheduler; // List "scanners" saves all "unfinished" scanners. @@ -236,9 +249,12 @@ protected: std::vector _finished_scanner_runtime; std::vector _finished_scanner_rows_read; + const int _num_parallel_instances; + std::shared_ptr _scanner_profile; RuntimeProfile::Counter* _scanner_sched_counter = nullptr; RuntimeProfile::Counter* _scanner_ctx_sched_counter = nullptr; + RuntimeProfile::Counter* _scanner_ctx_sched_time = nullptr; RuntimeProfile::HighWaterMarkCounter* _free_blocks_memory_usage = nullptr; RuntimeProfile::HighWaterMarkCounter* _queued_blocks_memory_usage = nullptr; RuntimeProfile::Counter* _newly_create_free_blocks_num = nullptr; diff --git a/be/src/vec/exec/scan/scanner_scheduler.cpp b/be/src/vec/exec/scan/scanner_scheduler.cpp index a01a9e8b9b..91bed3ee2e 100644 --- a/be/src/vec/exec/scan/scanner_scheduler.cpp +++ b/be/src/vec/exec/scan/scanner_scheduler.cpp @@ -153,6 +153,9 @@ void ScannerScheduler::_schedule_thread(int queue_id) { } void ScannerScheduler::_schedule_scanners(ScannerContext* ctx) { + MonotonicStopWatch watch; + watch.reset(); + watch.start(); ctx->incr_num_ctx_scheduling(1); if (ctx->done()) { ctx->update_num_running(0, -1); @@ -257,6 +260,7 @@ void ScannerScheduler::_schedule_scanners(ScannerContext* ctx) { } } #endif + ctx->incr_ctx_scheduling_time(watch.elapsed_time()); } void ScannerScheduler::_scanner_scan(ScannerScheduler* scheduler, ScannerContext* ctx, diff --git a/be/src/vec/exec/scan/vscan_node.cpp b/be/src/vec/exec/scan/vscan_node.cpp index bcd28ade46..2af6fc87c5 100644 --- a/be/src/vec/exec/scan/vscan_node.cpp +++ b/be/src/vec/exec/scan/vscan_node.cpp @@ -173,11 +173,11 @@ Status VScanNode::alloc_resource(RuntimeState* state) { if (_is_pipeline_scan) { if (_should_create_scanner) { - auto status = !_eos ? _prepare_scanners() : Status::OK(); + auto status = + !_eos ? _prepare_scanners(state->query_parallel_instance_num()) : Status::OK(); if (_scanner_ctx) { DCHECK(!_eos && _num_scanners->value() > 0); - _scanner_ctx->set_max_queue_size( - _shared_scan_opt ? std::max(state->query_parallel_instance_num(), 1) : 1); + RETURN_IF_ERROR(_scanner_ctx->init()); RETURN_IF_ERROR( _state->exec_env()->scanner_scheduler()->submit(_scanner_ctx.get())); } @@ -195,8 +195,10 @@ Status VScanNode::alloc_resource(RuntimeState* state) { return Status::WaitForScannerContext("Need wait for scanner context create"); } } else { - RETURN_IF_ERROR(!_eos ? _prepare_scanners() : Status::OK()); + RETURN_IF_ERROR(!_eos ? _prepare_scanners(state->query_parallel_instance_num()) + : Status::OK()); if (_scanner_ctx) { + RETURN_IF_ERROR(_scanner_ctx->init()); RETURN_IF_ERROR(_state->exec_env()->scanner_scheduler()->submit(_scanner_ctx.get())); } } @@ -280,6 +282,7 @@ Status VScanNode::_init_profile() { _scanner_wait_batch_timer = ADD_TIMER(_scanner_profile, "ScannerBatchWaitTime"); _scanner_sched_counter = ADD_COUNTER(_scanner_profile, "ScannerSchedCount", TUnit::UNIT); _scanner_ctx_sched_counter = ADD_COUNTER(_scanner_profile, "ScannerCtxSchedCount", TUnit::UNIT); + _scanner_ctx_sched_time = ADD_TIMER(_scanner_profile, "ScannerCtxSchedTime"); _scan_timer = ADD_TIMER(_scanner_profile, "ScannerGetBlockTime"); _scan_cpu_timer = ADD_TIMER(_scanner_profile, "ScannerCpuTime"); @@ -295,17 +298,18 @@ Status VScanNode::_init_profile() { return Status::OK(); } -Status VScanNode::_start_scanners(const std::list& scanners) { +Status VScanNode::_start_scanners(const std::list& scanners, + const int query_parallel_instance_num) { if (_is_pipeline_scan) { + int max_queue_size = _shared_scan_opt ? std::max(query_parallel_instance_num, 1) : 1; _scanner_ctx = pipeline::PipScannerContext::create_shared( _state, this, _input_tuple_desc, _output_tuple_desc, scanners, limit(), - _state->scan_queue_mem_limit(), _col_distribute_ids); + _state->scan_queue_mem_limit(), _col_distribute_ids, max_queue_size); } else { _scanner_ctx = ScannerContext::create_shared(_state, this, _input_tuple_desc, _output_tuple_desc, scanners, limit(), _state->scan_queue_mem_limit()); } - RETURN_IF_ERROR(_scanner_ctx->init()); return Status::OK(); } @@ -1246,14 +1250,14 @@ VScanNode::PushDownType VScanNode::_should_push_down_in_predicate(VInPredicate* return PushDownType::ACCEPTABLE; } -Status VScanNode::_prepare_scanners() { +Status VScanNode::_prepare_scanners(const int query_parallel_instance_num) { std::list scanners; RETURN_IF_ERROR(_init_scanners(&scanners)); if (scanners.empty()) { _eos = true; } else { COUNTER_SET(_num_scanners, static_cast(scanners.size())); - RETURN_IF_ERROR(_start_scanners(scanners)); + RETURN_IF_ERROR(_start_scanners(scanners, query_parallel_instance_num)); } return Status::OK(); } diff --git a/be/src/vec/exec/scan/vscan_node.h b/be/src/vec/exec/scan/vscan_node.h index c8b66d1267..112ca47b54 100644 --- a/be/src/vec/exec/scan/vscan_node.h +++ b/be/src/vec/exec/scan/vscan_node.h @@ -236,10 +236,11 @@ protected: // Only predicate on key column can be pushed down. virtual bool _is_key_column(const std::string& col_name) { return false; } - Status _prepare_scanners(); + Status _prepare_scanners(const int query_parallel_instance_num); bool _is_pipeline_scan = false; bool _shared_scan_opt = false; + // For load scan node, there should be both input and output tuple descriptor. // For query scan node, there is only output_tuple_desc. TupleId _input_tuple_id = -1; @@ -334,6 +335,7 @@ protected: RuntimeProfile::Counter* _scanner_sched_counter = nullptr; RuntimeProfile::Counter* _scanner_ctx_sched_counter = nullptr; + RuntimeProfile::Counter* _scanner_ctx_sched_time = nullptr; RuntimeProfile::Counter* _scanner_wait_batch_timer = nullptr; RuntimeProfile::Counter* _scanner_wait_worker_timer = nullptr; // Num of newly created free blocks when running query @@ -417,7 +419,8 @@ private: const std::string& fn_name, int slot_ref_child = -1); // Submit the scanner to the thread pool and start execution - Status _start_scanners(const std::list& scanners); + Status _start_scanners(const std::list& scanners, + const int query_parallel_instance_num); }; } // namespace doris::vectorized