[scan](freeblocks) use ConcurrentQueue to replace vector for free blocks (#21241)

This commit is contained in:
Gabriel
2023-06-28 15:10:07 +08:00
committed by GitHub
parent a4fdf7324a
commit e348b9464e
3 changed files with 19 additions and 88 deletions

View File

@@ -17,7 +17,6 @@
#pragma once
#include "concurrentqueue.h"
#include "runtime/descriptors.h"
#include "scanner_context.h"
@@ -167,63 +166,10 @@ public:
}
}
vectorized::BlockUPtr get_free_block(bool* has_free_block,
bool get_not_empty_block = false) override {
{
vectorized::BlockUPtr block;
if (_free_blocks_queues.try_dequeue(block)) {
if (!get_not_empty_block || block->mem_reuse()) {
_total_free_block_num--;
_free_blocks_memory_usage->add(-block->allocated_bytes());
return block;
}
}
}
*has_free_block = false;
COUNTER_UPDATE(_newly_create_free_blocks_num, 1);
return vectorized::Block::create_unique(_real_tuple_desc->slots(), _batch_size,
true /*ignore invalid slots*/);
}
void return_free_block(std::unique_ptr<vectorized::Block> block) override {
block->clear_column_data();
_free_blocks_memory_usage->add(block->allocated_bytes());
_free_blocks_queues.enqueue(std::move(block));
_total_free_block_num++;
}
void _init_free_block(int pre_alloc_block_count, int real_block_size) override {
// The free blocks is used for final output block of scanners.
// So use _output_tuple_desc;
int64_t free_blocks_memory_usage = 0;
auto block = vectorized::Block::create_unique(_output_tuple_desc->slots(), real_block_size,
true /*ignore invalid slots*/);
free_blocks_memory_usage += block->allocated_bytes();
_free_blocks_queues.enqueue(std::move(block));
_total_free_block_num = pre_alloc_block_count;
_free_blocks_memory_usage->add(free_blocks_memory_usage);
}
int cal_thread_slot_num_by_free_block_num() override {
// For pipeline engine, we don't promise `thread_slot_num` is exact (e.g. scanners to
// schedule may need more free blocks than available free blocks).
// This is because we don't want a heavy lock for free block queues.
int local_val = _total_free_block_num;
int thread_slot_num = local_val / _block_per_scanner;
thread_slot_num += (local_val % _block_per_scanner != 0);
thread_slot_num = std::min(thread_slot_num, _max_thread_num - _num_running_scanners);
if (thread_slot_num <= 0) {
thread_slot_num = 1;
}
return thread_slot_num;
}
private:
int _next_queue_to_feed = 0;
std::vector<moodycamel::ConcurrentQueue<vectorized::BlockUPtr>> _blocks_queues;
std::atomic_int64_t _current_used_bytes = 0;
std::atomic_int32_t _total_free_block_num = 0;
const std::vector<int>& _col_distribute_ids;
const bool _need_colocate_distribute;
@@ -231,8 +177,6 @@ private:
std::vector<std::unique_ptr<vectorized::MutableBlock>> _colocate_mutable_blocks;
std::vector<std::unique_ptr<std::mutex>> _colocate_block_mutexs;
moodycamel::ConcurrentQueue<vectorized::BlockUPtr> _free_blocks_queues;
void _add_rows_colocate_blocks(vectorized::Block* block, int loc,
const std::vector<int>& rows) {
int row_wait_add = rows.size();

View File

@@ -133,28 +133,19 @@ void ScannerContext::_init_free_block(int pre_alloc_block_count, int real_block_
auto block = vectorized::Block::create_unique(_output_tuple_desc->slots(), real_block_size,
true /*ignore invalid slots*/);
free_blocks_memory_usage += block->allocated_bytes();
_free_blocks.emplace_back(std::move(block));
_free_blocks.enqueue(std::move(block));
}
_free_blocks_memory_usage->add(free_blocks_memory_usage);
}
vectorized::BlockUPtr ScannerContext::get_free_block(bool* has_free_block,
bool get_block_not_empty) {
{
std::lock_guard l(_free_blocks_lock);
*has_free_block = _free_blocks_capacity > 0;
// Always reduce _free_blocks_capacity by one since we always return a block
if (_free_blocks_capacity > 0) {
--_free_blocks_capacity;
}
if (!_free_blocks.empty()) {
if (!get_block_not_empty || _free_blocks.back()->mem_reuse()) {
auto block = std::move(_free_blocks.back());
_free_blocks.pop_back();
_free_blocks_memory_usage->add(-block->allocated_bytes());
return block;
}
vectorized::BlockUPtr block;
if (_free_blocks.try_dequeue(block)) {
if (!get_block_not_empty || block->mem_reuse()) {
_free_blocks_capacity--;
_free_blocks_memory_usage->add(-block->allocated_bytes());
return block;
}
}
@@ -166,8 +157,7 @@ vectorized::BlockUPtr ScannerContext::get_free_block(bool* has_free_block,
void ScannerContext::return_free_block(std::unique_ptr<vectorized::Block> block) {
block->clear_column_data();
_free_blocks_memory_usage->add(block->allocated_bytes());
std::lock_guard l(_free_blocks_lock);
_free_blocks.emplace_back(std::move(block));
_free_blocks.enqueue(std::move(block));
++_free_blocks_capacity;
}
@@ -315,8 +305,6 @@ void ScannerContext::clear_and_join(VScanNode* node, RuntimeState* state) {
_close_and_clear_scanners(node, state);
_blocks_queue.clear();
std::unique_lock lock(_free_blocks_lock);
_free_blocks.clear();
}
bool ScannerContext::no_schedule() {
@@ -331,8 +319,9 @@ std::string ScannerContext::debug_string() {
" limit: {}, _num_running_scanners: {}, _num_scheduling_ctx: {}, _max_thread_num: {},"
" _block_per_scanner: {}, _cur_bytes_in_queue: {}, MAX_BYTE_OF_QUEUE: {}",
ctx_id, _scanners.size(), _blocks_queue.size(), _process_status.ok(), _should_stop,
_is_finished, _free_blocks.size(), limit, _num_running_scanners, _num_scheduling_ctx,
_max_thread_num, _block_per_scanner, _cur_bytes_in_queue, _max_bytes_in_queue);
_is_finished, _free_blocks.size_approx(), limit, _num_running_scanners,
_num_scheduling_ctx, _max_thread_num, _block_per_scanner, _cur_bytes_in_queue,
_max_bytes_in_queue);
}
void ScannerContext::reschedule_scanner_ctx() {

View File

@@ -29,6 +29,7 @@
#include "common/factory_creator.h"
#include "common/status.h"
#include "concurrentqueue.h"
#include "util/lock.h"
#include "util/runtime_profile.h"
#include "vec/core/block.h"
@@ -70,9 +71,8 @@ public:
virtual ~ScannerContext() = default;
virtual Status init();
virtual vectorized::BlockUPtr get_free_block(bool* has_free_block,
bool get_not_empty_block = false);
virtual void return_free_block(std::unique_ptr<vectorized::Block> block);
vectorized::BlockUPtr get_free_block(bool* has_free_block, bool get_not_empty_block = false);
void return_free_block(std::unique_ptr<vectorized::Block> block);
// Append blocks from scanners to the blocks queue.
virtual void append_blocks_to_queue(std::vector<vectorized::BlockUPtr>& blocks);
@@ -124,7 +124,7 @@ public:
void clear_and_join(VScanNode* node, RuntimeState* state);
virtual bool no_schedule();
bool no_schedule();
std::string debug_string();
@@ -143,9 +143,8 @@ public:
return _cur_bytes_in_queue < _max_bytes_in_queue / 2;
}
virtual int cal_thread_slot_num_by_free_block_num() {
int cal_thread_slot_num_by_free_block_num() {
int thread_slot_num = 0;
std::lock_guard f(_free_blocks_lock);
thread_slot_num = (_free_blocks_capacity + _block_per_scanner - 1) / _block_per_scanner;
thread_slot_num = std::min(thread_slot_num, _max_thread_num - _num_running_scanners);
if (thread_slot_num <= 0) {
@@ -169,7 +168,7 @@ private:
protected:
virtual void _dispose_coloate_blocks_not_in_queue() {}
virtual void _init_free_block(int pre_alloc_block_count, int real_block_size);
void _init_free_block(int pre_alloc_block_count, int real_block_size);
RuntimeState* _state;
VScanNode* _parent;
@@ -213,12 +212,11 @@ protected:
std::atomic_bool _is_finished = false;
// Lazy-allocated blocks for all scanners to share, for memory reuse.
doris::Mutex _free_blocks_lock;
std::vector<vectorized::BlockUPtr> _free_blocks;
moodycamel::ConcurrentQueue<vectorized::BlockUPtr> _free_blocks;
// The current number of free blocks available to the scanners.
// Used to limit the memory usage of the scanner.
// NOTE: this is NOT the size of `_free_blocks`.
int32_t _free_blocks_capacity = 0;
std::atomic_int32_t _free_blocks_capacity = 0;
int _batch_size;
// The limit from SQL's limit clause