From e348b9464eef18c834014bdcbdd62092a1e0ee9f Mon Sep 17 00:00:00 2001
From: Gabriel
Date: Wed, 28 Jun 2023 15:10:07 +0800
Subject: [PATCH] [scan](freeblocks) use ConcurrentQueue to replace vector for free blocks (#21241)

---
 be/src/vec/exec/scan/pip_scanner_context.h | 56 ----------------------
 be/src/vec/exec/scan/scanner_context.cpp   | 33 +++++--------
 be/src/vec/exec/scan/scanner_context.h     | 18 ++++---
 3 files changed, 19 insertions(+), 88 deletions(-)

diff --git a/be/src/vec/exec/scan/pip_scanner_context.h b/be/src/vec/exec/scan/pip_scanner_context.h
index 7edf634c17..731c3bb926 100644
--- a/be/src/vec/exec/scan/pip_scanner_context.h
+++ b/be/src/vec/exec/scan/pip_scanner_context.h
@@ -17,7 +17,6 @@
 
 #pragma once
 
-#include "concurrentqueue.h"
 #include "runtime/descriptors.h"
 #include "scanner_context.h"
 
@@ -167,63 +166,10 @@ public:
         }
     }
 
-    vectorized::BlockUPtr get_free_block(bool* has_free_block,
-                                         bool get_not_empty_block = false) override {
-        {
-            vectorized::BlockUPtr block;
-            if (_free_blocks_queues.try_dequeue(block)) {
-                if (!get_not_empty_block || block->mem_reuse()) {
-                    _total_free_block_num--;
-                    _free_blocks_memory_usage->add(-block->allocated_bytes());
-                    return block;
-                }
-            }
-        }
-        *has_free_block = false;
-
-        COUNTER_UPDATE(_newly_create_free_blocks_num, 1);
-        return vectorized::Block::create_unique(_real_tuple_desc->slots(), _batch_size,
-                                                true /*ignore invalid slots*/);
-    }
-
-    void return_free_block(std::unique_ptr<vectorized::Block> block) override {
-        block->clear_column_data();
-        _free_blocks_memory_usage->add(block->allocated_bytes());
-        _free_blocks_queues.enqueue(std::move(block));
-        _total_free_block_num++;
-    }
-
-    void _init_free_block(int pre_alloc_block_count, int real_block_size) override {
-        // The free blocks is used for final output block of scanners.
-        // So use _output_tuple_desc;
-        int64_t free_blocks_memory_usage = 0;
-        auto block = vectorized::Block::create_unique(_output_tuple_desc->slots(), real_block_size,
-                                                      true /*ignore invalid slots*/);
-        free_blocks_memory_usage += block->allocated_bytes();
-        _free_blocks_queues.enqueue(std::move(block));
-        _total_free_block_num = pre_alloc_block_count;
-        _free_blocks_memory_usage->add(free_blocks_memory_usage);
-    }
-
-    int cal_thread_slot_num_by_free_block_num() override {
-        // For pipeline engine, we don't promise `thread_slot_num` is exact (e.g. scanners to
-        // schedule may need more free blocks than available free blocks).
-        // This is because we don't want a heavy lock for free block queues.
-        int local_val = _total_free_block_num;
-        int thread_slot_num = local_val / _block_per_scanner;
-        thread_slot_num += (local_val % _block_per_scanner != 0);
-        thread_slot_num = std::min(thread_slot_num, _max_thread_num - _num_running_scanners);
-        if (thread_slot_num <= 0) {
-            thread_slot_num = 1;
-        }
-        return thread_slot_num;
-    }
-
 private:
     int _next_queue_to_feed = 0;
     std::vector> _blocks_queues;
     std::atomic_int64_t _current_used_bytes = 0;
-    std::atomic_int32_t _total_free_block_num = 0;
 
     const std::vector& _col_distribute_ids;
     const bool _need_colocate_distribute;
@@ -231,8 +177,6 @@ private:
     std::vector> _colocate_mutable_blocks;
     std::vector> _colocate_block_mutexs;
 
-    moodycamel::ConcurrentQueue<vectorized::BlockUPtr> _free_blocks_queues;
-
     void _add_rows_colocate_blocks(vectorized::Block* block, int loc,
                                    const std::vector& rows) {
         int row_wait_add = rows.size();
diff --git a/be/src/vec/exec/scan/scanner_context.cpp b/be/src/vec/exec/scan/scanner_context.cpp
index fed8215ad2..b8e5847a15 100644
--- a/be/src/vec/exec/scan/scanner_context.cpp
+++ b/be/src/vec/exec/scan/scanner_context.cpp
@@ -133,28 +133,19 @@ void ScannerContext::_init_free_block(int pre_alloc_block_count, int real_block_
         auto block = vectorized::Block::create_unique(_output_tuple_desc->slots(), real_block_size,
                                                       true /*ignore invalid slots*/);
         free_blocks_memory_usage += block->allocated_bytes();
-        _free_blocks.emplace_back(std::move(block));
+        _free_blocks.enqueue(std::move(block));
     }
     _free_blocks_memory_usage->add(free_blocks_memory_usage);
 }
 
 vectorized::BlockUPtr ScannerContext::get_free_block(bool* has_free_block,
                                                      bool get_block_not_empty) {
-    {
-        std::lock_guard l(_free_blocks_lock);
-        *has_free_block = _free_blocks_capacity > 0;
-        // Always reduce _free_blocks_capacity by one since we always return a block
-        if (_free_blocks_capacity > 0) {
-            --_free_blocks_capacity;
-        }
-
-        if (!_free_blocks.empty()) {
-            if (!get_block_not_empty || _free_blocks.back()->mem_reuse()) {
-                auto block = std::move(_free_blocks.back());
-                _free_blocks.pop_back();
-                _free_blocks_memory_usage->add(-block->allocated_bytes());
-                return block;
-            }
+    vectorized::BlockUPtr block;
+    if (_free_blocks.try_dequeue(block)) {
+        if (!get_block_not_empty || block->mem_reuse()) {
+            _free_blocks_capacity--;
+            _free_blocks_memory_usage->add(-block->allocated_bytes());
+            return block;
         }
     }
 
@@ -166,8 +157,7 @@ vectorized::BlockUPtr ScannerContext::get_free_block(bool* has_free_block,
 
 void ScannerContext::return_free_block(std::unique_ptr<vectorized::Block> block) {
     block->clear_column_data();
     _free_blocks_memory_usage->add(block->allocated_bytes());
-    std::lock_guard l(_free_blocks_lock);
-    _free_blocks.emplace_back(std::move(block));
+    _free_blocks.enqueue(std::move(block));
     ++_free_blocks_capacity;
 }
 
@@ -315,8 +305,6 @@ void ScannerContext::clear_and_join(VScanNode* node, RuntimeState* state) {
     _close_and_clear_scanners(node, state);
 
     _blocks_queue.clear();
-    std::unique_lock lock(_free_blocks_lock);
-    _free_blocks.clear();
 }
 
 bool ScannerContext::no_schedule() {
@@ -331,8 +319,9 @@ std::string ScannerContext::debug_string() {
             " limit: {}, _num_running_scanners: {}, _num_scheduling_ctx: {}, _max_thread_num: {},"
             " _block_per_scanner: {}, _cur_bytes_in_queue: {}, MAX_BYTE_OF_QUEUE: {}",
             ctx_id, _scanners.size(), _blocks_queue.size(), _process_status.ok(), _should_stop,
-            _is_finished, _free_blocks.size(), limit, _num_running_scanners, _num_scheduling_ctx,
-            _max_thread_num, _block_per_scanner, _cur_bytes_in_queue, _max_bytes_in_queue);
+            _is_finished, _free_blocks.size_approx(), limit, _num_running_scanners,
+            _num_scheduling_ctx, _max_thread_num, _block_per_scanner, _cur_bytes_in_queue,
+            _max_bytes_in_queue);
 }
 
 void ScannerContext::reschedule_scanner_ctx() {
diff --git a/be/src/vec/exec/scan/scanner_context.h b/be/src/vec/exec/scan/scanner_context.h
index 5986384edd..fa264c756a 100644
--- a/be/src/vec/exec/scan/scanner_context.h
+++ b/be/src/vec/exec/scan/scanner_context.h
@@ -29,6 +29,7 @@
 
 #include "common/factory_creator.h"
 #include "common/status.h"
+#include "concurrentqueue.h"
 #include "util/lock.h"
 #include "util/runtime_profile.h"
 #include "vec/core/block.h"
@@ -70,9 +71,8 @@ public:
     virtual ~ScannerContext() = default;
 
     virtual Status init();
-    virtual vectorized::BlockUPtr get_free_block(bool* has_free_block,
-                                                 bool get_not_empty_block = false);
-    virtual void return_free_block(std::unique_ptr<vectorized::Block> block);
+    vectorized::BlockUPtr get_free_block(bool* has_free_block, bool get_not_empty_block = false);
+    void return_free_block(std::unique_ptr<vectorized::Block> block);
 
     // Append blocks from scanners to the blocks queue.
     virtual void append_blocks_to_queue(std::vector<vectorized::BlockUPtr>& blocks);
@@ -124,7 +124,7 @@ public:
 
     void clear_and_join(VScanNode* node, RuntimeState* state);
 
-    virtual bool no_schedule();
+    bool no_schedule();
 
     std::string debug_string();
 
@@ -143,9 +143,8 @@ public:
         return _cur_bytes_in_queue < _max_bytes_in_queue / 2;
     }
 
-    virtual int cal_thread_slot_num_by_free_block_num() {
+    int cal_thread_slot_num_by_free_block_num() {
         int thread_slot_num = 0;
-        std::lock_guard f(_free_blocks_lock);
         thread_slot_num = (_free_blocks_capacity + _block_per_scanner - 1) / _block_per_scanner;
         thread_slot_num = std::min(thread_slot_num, _max_thread_num - _num_running_scanners);
         if (thread_slot_num <= 0) {
             thread_slot_num = 1;
         }
@@ -169,7 +168,7 @@ private:
 
 protected:
     virtual void _dispose_coloate_blocks_not_in_queue() {}
-    virtual void _init_free_block(int pre_alloc_block_count, int real_block_size);
+    void _init_free_block(int pre_alloc_block_count, int real_block_size);
 
     RuntimeState* _state;
     VScanNode* _parent;
@@ -213,12 +212,11 @@ protected:
     std::atomic_bool _is_finished = false;
 
     // Lazy-allocated blocks for all scanners to share, for memory reuse.
-    doris::Mutex _free_blocks_lock;
-    std::vector<vectorized::BlockUPtr> _free_blocks;
+    moodycamel::ConcurrentQueue<vectorized::BlockUPtr> _free_blocks;
     // The current number of free blocks available to the scanners.
     // Used to limit the memory usage of the scanner.
     // NOTE: this is NOT the size of `_free_blocks`.
-    int32_t _free_blocks_capacity = 0;
+    std::atomic_int32_t _free_blocks_capacity = 0;
 
     int _batch_size;
     // The limit from SQL's limit clause
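
The pattern this patch converges on, stripped of Doris specifics, is a lock-free pool of reusable blocks: a moodycamel::ConcurrentQueue holds the idle blocks and a relaxed atomic counter tracks the approximate pool capacity, replacing the previous mutex-guarded std::vector. The sketch below is only an illustration of that pattern, not the Doris implementation: Block, FreeBlockPool, and the counter are stand-ins, and it assumes the single-header concurrentqueue.h is on the include path.

    // Minimal sketch of a lock-free free-block pool (illustrative, not Doris code).
    #include <atomic>
    #include <cstdio>
    #include <memory>
    #include <vector>

    #include "concurrentqueue.h" // moodycamel::ConcurrentQueue, single-header library

    struct Block {
        std::vector<int> data; // stand-in for column data
        void clear_column_data() { data.clear(); }
    };

    class FreeBlockPool {
    public:
        // Reuse a pooled block if one is available, otherwise allocate a new one.
        std::unique_ptr<Block> get_free_block() {
            std::unique_ptr<Block> block;
            if (_free_blocks.try_dequeue(block)) {
                _capacity.fetch_sub(1, std::memory_order_relaxed);
                return block;
            }
            return std::make_unique<Block>(); // newly created block
        }

        // Clear a block and hand it back to the pool for reuse.
        void return_free_block(std::unique_ptr<Block> block) {
            block->clear_column_data();
            _free_blocks.enqueue(std::move(block));
            _capacity.fetch_add(1, std::memory_order_relaxed);
        }

        // Best-effort view of the pool size; like size_approx(), it can be
        // slightly stale under concurrency.
        int approx_capacity() const { return _capacity.load(std::memory_order_relaxed); }

    private:
        moodycamel::ConcurrentQueue<std::unique_ptr<Block>> _free_blocks;
        std::atomic<int> _capacity {0};
    };

    int main() {
        FreeBlockPool pool;
        pool.return_free_block(std::make_unique<Block>());
        auto block = pool.get_free_block(); // reuses the pooled block
        std::printf("approx capacity: %d\n", pool.approx_capacity()); // prints 0
        pool.return_free_block(std::move(block));
        return 0;
    }

Compared with the removed code path, the only synchronization left on the hot path is the queue's internal lock-free machinery plus one relaxed atomic update, which is what lets get_free_block()/return_free_block() drop _free_blocks_lock entirely.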
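
With the lock gone, cal_thread_slot_num_by_free_block_num() reads the atomic _free_blocks_capacity and reduces to a ceiling division capped by the remaining thread budget, with a floor of one slot. The standalone function below reproduces that arithmetic so the rounding and clamping are easy to check; the name and parameters are illustrative, not the Doris signature.

    #include <algorithm>
    #include <cstdio>

    // Ceiling-divide the approximate number of free blocks by blocks-per-scanner,
    // cap by the remaining thread budget, and never return fewer than one slot.
    int cal_thread_slots(int free_blocks_capacity, int block_per_scanner,
                         int max_thread_num, int num_running_scanners) {
        int slots = (free_blocks_capacity + block_per_scanner - 1) / block_per_scanner;
        slots = std::min(slots, max_thread_num - num_running_scanners);
        return std::max(slots, 1);
    }

    int main() {
        // 10 free blocks with 4 blocks per scanner -> ceil(10 / 4) = 3 slots,
        // then capped by the 48 - 46 = 2 threads still available.
        std::printf("%d\n", cal_thread_slots(10, 4, 48, 46)); // prints 2
        return 0;
    }

Because _free_blocks_capacity is only approximately in sync with the queue, the result is best-effort: scanners may occasionally be scheduled with fewer free blocks than they need, a trade-off the removed pipeline-specific override already accepted to avoid a heavy lock on the free-block queue.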