[Improvement](pipeline) Improve shared scan performance (#20785)

This commit is contained in:
Gabriel
2023-06-21 14:36:05 +08:00
committed by GitHub
parent 5f0bb49d46
commit 81abdeffbc
6 changed files with 166 additions and 76 deletions

View File

@ -17,6 +17,7 @@
#pragma once
#include "concurrentqueue.h"
#include "runtime/descriptors.h"
#include "scanner_context.h"
@ -32,9 +33,11 @@ public:
const TupleDescriptor* input_tuple_desc,
const TupleDescriptor* output_tuple_desc,
const std::list<vectorized::VScannerSPtr>& scanners, int64_t limit,
int64_t max_bytes_in_blocks_queue, const std::vector<int>& col_distribute_ids)
int64_t max_bytes_in_blocks_queue, const std::vector<int>& col_distribute_ids,
const int num_parallel_instances)
: vectorized::ScannerContext(state, parent, input_tuple_desc, output_tuple_desc,
scanners, limit, max_bytes_in_blocks_queue),
scanners, limit, max_bytes_in_blocks_queue,
num_parallel_instances),
_col_distribute_ids(col_distribute_ids),
_need_colocate_distribute(!_col_distribute_ids.empty()) {}
@ -52,11 +55,7 @@ public:
}
{
std::unique_lock<std::mutex> l(*_queue_mutexs[id]);
if (!_blocks_queues[id].empty()) {
*block = std::move(_blocks_queues[id].front());
_blocks_queues[id].pop_front();
} else {
if (!_blocks_queues[id].try_dequeue(*block)) {
*eos = _is_finished || _should_stop;
return Status::OK();
}
@ -69,7 +68,7 @@ public:
// Scanning is finished once the data is exhausted, a stop was requested,
// or an error status has been recorded.
bool done() override { return _is_finished || _should_stop || _status_error; }
void append_blocks_to_queue(std::vector<vectorized::BlockUPtr>& blocks) override {
const int queue_size = _queue_mutexs.size();
const int queue_size = _blocks_queues.size();
const int block_size = blocks.size();
int64_t local_bytes = 0;
@ -78,7 +77,7 @@ public:
for (const auto& block : blocks) {
// vectorized calculate hash
int rows = block->rows();
const auto element_size = _max_queue_size;
const auto element_size = _num_parallel_instances;
hash_vals.resize(rows);
std::fill(hash_vals.begin(), hash_vals.end(), 0);
auto* __restrict hashes = hash_vals.data();
@ -113,9 +112,8 @@ public:
for (int i = 0; i < queue_size && i < block_size; ++i) {
int queue = _next_queue_to_feed;
{
std::lock_guard<std::mutex> l(*_queue_mutexs[queue]);
for (int j = i; j < block_size; j += queue_size) {
_blocks_queues[queue].emplace_back(std::move(blocks[j]));
_blocks_queues[queue].enqueue(std::move(blocks[j]));
}
}
_next_queue_to_feed = queue + 1 < queue_size ? queue + 1 : 0;
@ -124,58 +122,108 @@ public:
_current_used_bytes += local_bytes;
}
bool empty_in_queue(int id) override {
std::unique_lock<std::mutex> l(*_queue_mutexs[id]);
return _blocks_queues[id].empty();
bool empty_in_queue(int id) override { return _blocks_queues[id].size_approx() == 0; }
// Creates one lock-free block queue per parallel pipeline instance, then runs
// the base ScannerContext::init(). Colocate output blocks are pre-allocated
// only when rows must be hash-distributed to specific instances
// (_need_colocate_distribute).
Status init() override {
for (int i = 0; i < _num_parallel_instances; ++i) {
_blocks_queues.emplace_back(moodycamel::ConcurrentQueue<vectorized::BlockUPtr>());
}
RETURN_IF_ERROR(ScannerContext::init());
if (_need_colocate_distribute) {
_init_colocate_block();
}
return Status::OK();
}
void set_max_queue_size(const int max_queue_size) override {
_max_queue_size = max_queue_size;
for (int i = 0; i < max_queue_size; ++i) {
_queue_mutexs.emplace_back(new std::mutex);
_blocks_queues.emplace_back(std::list<vectorized::BlockUPtr>());
}
if (_need_colocate_distribute) {
int real_block_size =
limit == -1 ? _batch_size : std::min(static_cast<int64_t>(_batch_size), limit);
int64_t free_blocks_memory_usage = 0;
for (int i = 0; i < _max_queue_size; ++i) {
auto block = vectorized::Block::create_unique(_output_tuple_desc->slots(),
real_block_size,
true /*ignore invalid slots*/);
free_blocks_memory_usage += block->allocated_bytes();
_colocate_mutable_blocks.emplace_back(
vectorized::MutableBlock::create_unique(block.get()));
_colocate_blocks.emplace_back(std::move(block));
_colocate_block_mutexs.emplace_back(new std::mutex);
}
_free_blocks_memory_usage->add(free_blocks_memory_usage);
// Pre-allocates one mutable colocate block (plus its mutex) per parallel
// instance. These accumulate hash-distributed rows before being flushed into
// the per-instance block queues; their memory is charged to the
// _free_blocks_memory_usage profile counter.
void _init_colocate_block() {
// Cap each block at `limit` rows when a limit is set, otherwise use batch size.
int real_block_size =
limit == -1 ? _batch_size : std::min(static_cast<int64_t>(_batch_size), limit);
int64_t free_blocks_memory_usage = 0;
for (int i = 0; i < _num_parallel_instances; ++i) {
auto block = vectorized::Block::create_unique(
_output_tuple_desc->slots(), real_block_size, true /*ignore invalid slots*/);
free_blocks_memory_usage += block->allocated_bytes();
// MutableBlock wraps the block for appending; ownership of the block itself
// stays in _colocate_blocks.
_colocate_mutable_blocks.emplace_back(
vectorized::MutableBlock::create_unique(block.get()));
_colocate_blocks.emplace_back(std::move(block));
_colocate_block_mutexs.emplace_back(new std::mutex);
}
_free_blocks_memory_usage->add(free_blocks_memory_usage);
}
bool has_enough_space_in_blocks_queue() const override {
return _current_used_bytes < _max_bytes_in_queue / 2 * _max_queue_size;
return _current_used_bytes < _max_bytes_in_queue / 2 * _num_parallel_instances;
}
void _dispose_coloate_blocks_not_in_queue() override {
if (_need_colocate_distribute) {
for (int i = 0; i < _max_queue_size; ++i) {
std::scoped_lock s(*_colocate_block_mutexs[i], *_queue_mutexs[i]);
for (int i = 0; i < _num_parallel_instances; ++i) {
if (_colocate_blocks[i] && !_colocate_blocks[i]->empty()) {
_current_used_bytes += _colocate_blocks[i]->allocated_bytes();
_blocks_queues[i].emplace_back(std::move(_colocate_blocks[i]));
_blocks_queues[i].enqueue(std::move(_colocate_blocks[i]));
_colocate_mutable_blocks[i]->clear();
}
}
}
}
// Pops a reusable Block from the lock-free free-block queue, or allocates a
// fresh one when none is available (or the dequeued one is unusable).
// @param has_free_block set to false only when a brand-new Block is created
// @param get_not_empty_block when true, a dequeued block is accepted only if
//        its columns can be reused (block->mem_reuse())
vectorized::BlockUPtr get_free_block(bool* has_free_block,
bool get_not_empty_block = false) override {
{
vectorized::BlockUPtr block;
if (_free_blocks_queues.try_dequeue(block)) {
if (!get_not_empty_block || block->mem_reuse()) {
_total_free_block_num--;
_free_blocks_memory_usage->add(-block->allocated_bytes());
return block;
}
// NOTE(review): when the dequeued block fails the mem_reuse() check it is
// destroyed here without decrementing _total_free_block_num or the memory
// usage counter -- confirm this counter drift is intentional.
}
}
*has_free_block = false;
COUNTER_UPDATE(_newly_create_free_blocks_num, 1);
// Fresh blocks are built from the scanner-side (_real_tuple_desc) slots.
return vectorized::Block::create_unique(_real_tuple_desc->slots(), _batch_size,
true /*ignore invalid slots*/);
}
// Returns a block to the lock-free free list after clearing its column data;
// the memory-usage and free-block counters are updated to include it again.
void return_free_block(std::unique_ptr<vectorized::Block> block) override {
block->clear_column_data();
_free_blocks_memory_usage->add(block->allocated_bytes());
_free_blocks_queues.enqueue(std::move(block));
_total_free_block_num++;
}
// Pipeline override of free-block pre-allocation: blocks go into the
// lock-free _free_blocks_queues instead of the base class's locked container.
// NOTE(review): as written, only a SINGLE block is enqueued here, yet
// _total_free_block_num is set to pre_alloc_block_count -- verify that a
// surrounding `for` loop was not lost and that the counter matches the number
// of blocks actually enqueued.
void _init_free_block(int pre_alloc_block_count, int real_block_size) override {
// The free blocks is used for final output block of scanners.
// So use _output_tuple_desc;
int64_t free_blocks_memory_usage = 0;
auto block = vectorized::Block::create_unique(_output_tuple_desc->slots(), real_block_size,
true /*ignore invalid slots*/);
free_blocks_memory_usage += block->allocated_bytes();
_free_blocks_queues.enqueue(std::move(block));
_total_free_block_num = pre_alloc_block_count;
_free_blocks_memory_usage->add(free_blocks_memory_usage);
}
// Estimates how many scanner threads may be scheduled, derived from the
// number of currently free blocks (lock-free, so only approximate).
int cal_thread_slot_num_by_free_block_num() override {
// For pipeline engine, we don't promise `thread_slot_num` is exact (e.g. scanners to
// schedule may need more free blocks than available free blocks).
// This is because we don't want a heavy lock for free block queues.
// Snapshot the atomic counter once so the two computations below agree.
int local_val = _total_free_block_num;
int thread_slot_num = local_val / _block_per_scanner;
// Round up: a partial block budget still admits one more scanner.
thread_slot_num += (local_val % _block_per_scanner != 0);
thread_slot_num = std::min(thread_slot_num, _max_thread_num - _num_running_scanners);
// Always grant at least one slot so scanning can make progress.
if (thread_slot_num <= 0) {
thread_slot_num = 1;
}
return thread_slot_num;
}
private:
int _max_queue_size = 1;
int _next_queue_to_feed = 0;
std::vector<std::unique_ptr<std::mutex>> _queue_mutexs;
std::vector<std::list<vectorized::BlockUPtr>> _blocks_queues;
std::vector<moodycamel::ConcurrentQueue<vectorized::BlockUPtr>> _blocks_queues;
std::atomic_int64_t _current_used_bytes = 0;
std::atomic_int32_t _total_free_block_num = 0;
const std::vector<int>& _col_distribute_ids;
const bool _need_colocate_distribute;
@ -183,6 +231,8 @@ private:
std::vector<std::unique_ptr<vectorized::MutableBlock>> _colocate_mutable_blocks;
std::vector<std::unique_ptr<std::mutex>> _colocate_block_mutexs;
moodycamel::ConcurrentQueue<vectorized::BlockUPtr> _free_blocks_queues;
void _add_rows_colocate_blocks(vectorized::Block* block, int loc,
const std::vector<int>& rows) {
int row_wait_add = rows.size();
@ -205,10 +255,7 @@ private:
if (row_add == max_add) {
_current_used_bytes += _colocate_blocks[loc]->allocated_bytes();
{
std::lock_guard<std::mutex> queue_l(*_queue_mutexs[loc]);
_blocks_queues[loc].emplace_back(std::move(_colocate_blocks[loc]));
}
{ _blocks_queues[loc].enqueue(std::move(_colocate_blocks[loc])); }
bool get_block_not_empty = true;
_colocate_blocks[loc] = get_free_block(&get_block_not_empty, get_block_not_empty);
_colocate_mutable_blocks[loc]->set_muatable_columns(

View File

@ -45,7 +45,7 @@ ScannerContext::ScannerContext(doris::RuntimeState* state_, doris::vectorized::V
const doris::TupleDescriptor* input_tuple_desc,
const doris::TupleDescriptor* output_tuple_desc,
const std::list<VScannerSPtr>& scanners_, int64_t limit_,
int64_t max_bytes_in_blocks_queue_)
int64_t max_bytes_in_blocks_queue_, const int num_parallel_instances)
: _state(state_),
_parent(parent),
_input_tuple_desc(input_tuple_desc),
@ -55,7 +55,8 @@ ScannerContext::ScannerContext(doris::RuntimeState* state_, doris::vectorized::V
limit(limit_),
_max_bytes_in_queue(max_bytes_in_blocks_queue_),
_scanner_scheduler(state_->exec_env()->scanner_scheduler()),
_scanners(scanners_) {
_scanners(scanners_),
_num_parallel_instances(num_parallel_instances) {
ctx_id = UniqueId::gen_uid().to_string();
if (_scanners.empty()) {
_is_finished = true;
@ -68,8 +69,11 @@ Status ScannerContext::init() {
// 1. Calculate max concurrency
// TODO: now the max thread num <= config::doris_scanner_thread_pool_thread_num / 4
// should find a more reasonable value.
_max_thread_num = _parent->_shared_scan_opt ? config::doris_scanner_thread_pool_thread_num
: config::doris_scanner_thread_pool_thread_num / 4;
_max_thread_num = config::doris_scanner_thread_pool_thread_num / 4;
if (_parent->_shared_scan_opt) {
DCHECK(_num_parallel_instances > 0);
_max_thread_num *= _num_parallel_instances;
}
_max_thread_num = _max_thread_num == 0 ? 1 : _max_thread_num;
DCHECK(_max_thread_num > 0);
_max_thread_num = std::min(_max_thread_num, (int32_t)_scanners.size());
@ -81,6 +85,7 @@ Status ScannerContext::init() {
_scanner_profile = _parent->_scanner_profile;
_scanner_sched_counter = _parent->_scanner_sched_counter;
_scanner_ctx_sched_counter = _parent->_scanner_ctx_sched_counter;
_scanner_ctx_sched_time = _parent->_scanner_ctx_sched_time;
_free_blocks_memory_usage = _parent->_free_blocks_memory_usage;
_newly_create_free_blocks_num = _parent->_newly_create_free_blocks_num;
_queued_blocks_memory_usage = _parent->_queued_blocks_memory_usage;
@ -98,6 +103,9 @@ Status ScannerContext::init() {
limit == -1 ? _batch_size : std::min(static_cast<int64_t>(_batch_size), limit);
_block_per_scanner = (doris_scanner_row_num + (real_block_size - 1)) / real_block_size;
_free_blocks_capacity = _max_thread_num * _block_per_scanner;
auto pre_alloc_block_count = _max_thread_num * _block_per_scanner;
_init_free_block(pre_alloc_block_count, real_block_size);
#ifndef BE_TEST
// 3. get thread token
@ -117,6 +125,19 @@ Status ScannerContext::init() {
return Status::OK();
}
// Pre-allocates `pre_alloc_block_count` output blocks so scanners avoid
// paying allocation cost on the hot path; the total bytes are recorded in the
// _free_blocks_memory_usage profile counter.
void ScannerContext::_init_free_block(int pre_alloc_block_count, int real_block_size) {
// The free blocks is used for final output block of scanners.
// So use _output_tuple_desc;
int64_t free_blocks_memory_usage = 0;
for (int i = 0; i < pre_alloc_block_count; ++i) {
auto block = vectorized::Block::create_unique(_output_tuple_desc->slots(), real_block_size,
true /*ignore invalid slots*/);
free_blocks_memory_usage += block->allocated_bytes();
_free_blocks.emplace_back(std::move(block));
}
_free_blocks_memory_usage->add(free_blocks_memory_usage);
}
vectorized::BlockUPtr ScannerContext::get_free_block(bool* has_free_block,
bool get_block_not_empty) {
{
@ -345,13 +366,8 @@ void ScannerContext::get_next_batch_of_scanners(std::list<VScannerSPtr>* current
int thread_slot_num = 0;
{
// If there are enough space in blocks queue,
// the scanner number depends on the _free_blocks_capacity
std::lock_guard f(_free_blocks_lock);
thread_slot_num = (_free_blocks_capacity + _block_per_scanner - 1) / _block_per_scanner;
thread_slot_num = std::min(thread_slot_num, _max_thread_num - _num_running_scanners);
if (thread_slot_num <= 0) {
thread_slot_num = 1;
}
// the scanner number depends on the _free_blocks numbers
thread_slot_num = cal_thread_slot_num_by_free_block_num();
}
// 2. get #thread_slot_num scanners from ctx->scanners

View File

@ -61,13 +61,14 @@ public:
ScannerContext(RuntimeState* state_, VScanNode* parent, const TupleDescriptor* input_tuple_desc,
const TupleDescriptor* output_tuple_desc,
const std::list<VScannerSPtr>& scanners_, int64_t limit_,
int64_t max_bytes_in_blocks_queue_);
int64_t max_bytes_in_blocks_queue_, const int num_parallel_instances = 0);
virtual ~ScannerContext() = default;
Status init();
virtual Status init();
vectorized::BlockUPtr get_free_block(bool* has_free_block, bool get_not_empty_block = false);
void return_free_block(std::unique_ptr<vectorized::Block> block);
virtual vectorized::BlockUPtr get_free_block(bool* has_free_block,
bool get_not_empty_block = false);
virtual void return_free_block(std::unique_ptr<vectorized::Block> block);
// Append blocks from scanners to the blocks queue.
virtual void append_blocks_to_queue(std::vector<vectorized::BlockUPtr>& blocks);
@ -126,19 +127,29 @@ public:
RuntimeState* state() { return _state; }
void incr_num_ctx_scheduling(int64_t num) { _scanner_ctx_sched_counter->update(num); }
void incr_ctx_scheduling_time(int64_t num) { _scanner_ctx_sched_time->update(num); }
void incr_num_scanner_scheduling(int64_t num) { _scanner_sched_counter->update(num); }
VScanNode* parent() { return _parent; }
virtual bool empty_in_queue(int id);
virtual void set_max_queue_size(int max_queue_size) {};
// todo(wb) rethinking how to calculate ```_max_bytes_in_queue``` when executing shared scan
virtual inline bool has_enough_space_in_blocks_queue() const {
return _cur_bytes_in_queue < _max_bytes_in_queue / 2;
}
// Base (non-pipeline) estimate of schedulable scanner threads, computed from
// _free_blocks_capacity under _free_blocks_lock (the lock presumably also
// guards capacity updates elsewhere -- confirm against the .cpp).
virtual int cal_thread_slot_num_by_free_block_num() {
int thread_slot_num = 0;
std::lock_guard f(_free_blocks_lock);
// Ceiling division: a partial block budget still admits one more scanner.
thread_slot_num = (_free_blocks_capacity + _block_per_scanner - 1) / _block_per_scanner;
thread_slot_num = std::min(thread_slot_num, _max_thread_num - _num_running_scanners);
// Always grant at least one slot so scanning can make progress.
if (thread_slot_num <= 0) {
thread_slot_num = 1;
}
return thread_slot_num;
}
void reschedule_scanner_ctx();
// the unique id of this context
@ -153,6 +164,8 @@ private:
protected:
virtual void _dispose_coloate_blocks_not_in_queue() {}
virtual void _init_free_block(int pre_alloc_block_count, int real_block_size);
RuntimeState* _state;
VScanNode* _parent;
@ -215,7 +228,7 @@ protected:
// Here we record the number of ctx in the scheduling queue to clean up at the end.
std::atomic_int32_t _num_scheduling_ctx = 0;
// Num of unfinished scanners. Should be set in init()
int32_t _num_unfinished_scanners = 0;
std::atomic_int32_t _num_unfinished_scanners = 0;
// Max number of scan thread for this scanner context.
int32_t _max_thread_num = 0;
// How many blocks a scanner can use in one task.
@ -224,7 +237,7 @@ protected:
// The current bytes of blocks in blocks queue
int64_t _cur_bytes_in_queue = 0;
// The max limit bytes of blocks in blocks queue
int64_t _max_bytes_in_queue;
const int64_t _max_bytes_in_queue;
doris::vectorized::ScannerScheduler* _scanner_scheduler;
// List "scanners" saves all "unfinished" scanners.
@ -236,9 +249,12 @@ protected:
std::vector<int64_t> _finished_scanner_runtime;
std::vector<int64_t> _finished_scanner_rows_read;
const int _num_parallel_instances;
std::shared_ptr<RuntimeProfile> _scanner_profile;
RuntimeProfile::Counter* _scanner_sched_counter = nullptr;
RuntimeProfile::Counter* _scanner_ctx_sched_counter = nullptr;
RuntimeProfile::Counter* _scanner_ctx_sched_time = nullptr;
RuntimeProfile::HighWaterMarkCounter* _free_blocks_memory_usage = nullptr;
RuntimeProfile::HighWaterMarkCounter* _queued_blocks_memory_usage = nullptr;
RuntimeProfile::Counter* _newly_create_free_blocks_num = nullptr;

View File

@ -153,6 +153,9 @@ void ScannerScheduler::_schedule_thread(int queue_id) {
}
void ScannerScheduler::_schedule_scanners(ScannerContext* ctx) {
MonotonicStopWatch watch;
watch.reset();
watch.start();
ctx->incr_num_ctx_scheduling(1);
if (ctx->done()) {
ctx->update_num_running(0, -1);
@ -257,6 +260,7 @@ void ScannerScheduler::_schedule_scanners(ScannerContext* ctx) {
}
}
#endif
ctx->incr_ctx_scheduling_time(watch.elapsed_time());
}
void ScannerScheduler::_scanner_scan(ScannerScheduler* scheduler, ScannerContext* ctx,

View File

@ -173,11 +173,11 @@ Status VScanNode::alloc_resource(RuntimeState* state) {
if (_is_pipeline_scan) {
if (_should_create_scanner) {
auto status = !_eos ? _prepare_scanners() : Status::OK();
auto status =
!_eos ? _prepare_scanners(state->query_parallel_instance_num()) : Status::OK();
if (_scanner_ctx) {
DCHECK(!_eos && _num_scanners->value() > 0);
_scanner_ctx->set_max_queue_size(
_shared_scan_opt ? std::max(state->query_parallel_instance_num(), 1) : 1);
RETURN_IF_ERROR(_scanner_ctx->init());
RETURN_IF_ERROR(
_state->exec_env()->scanner_scheduler()->submit(_scanner_ctx.get()));
}
@ -195,8 +195,10 @@ Status VScanNode::alloc_resource(RuntimeState* state) {
return Status::WaitForScannerContext("Need wait for scanner context create");
}
} else {
RETURN_IF_ERROR(!_eos ? _prepare_scanners() : Status::OK());
RETURN_IF_ERROR(!_eos ? _prepare_scanners(state->query_parallel_instance_num())
: Status::OK());
if (_scanner_ctx) {
RETURN_IF_ERROR(_scanner_ctx->init());
RETURN_IF_ERROR(_state->exec_env()->scanner_scheduler()->submit(_scanner_ctx.get()));
}
}
@ -280,6 +282,7 @@ Status VScanNode::_init_profile() {
_scanner_wait_batch_timer = ADD_TIMER(_scanner_profile, "ScannerBatchWaitTime");
_scanner_sched_counter = ADD_COUNTER(_scanner_profile, "ScannerSchedCount", TUnit::UNIT);
_scanner_ctx_sched_counter = ADD_COUNTER(_scanner_profile, "ScannerCtxSchedCount", TUnit::UNIT);
_scanner_ctx_sched_time = ADD_TIMER(_scanner_profile, "ScannerCtxSchedTime");
_scan_timer = ADD_TIMER(_scanner_profile, "ScannerGetBlockTime");
_scan_cpu_timer = ADD_TIMER(_scanner_profile, "ScannerCpuTime");
@ -295,17 +298,18 @@ Status VScanNode::_init_profile() {
return Status::OK();
}
Status VScanNode::_start_scanners(const std::list<VScannerSPtr>& scanners) {
Status VScanNode::_start_scanners(const std::list<VScannerSPtr>& scanners,
const int query_parallel_instance_num) {
if (_is_pipeline_scan) {
int max_queue_size = _shared_scan_opt ? std::max(query_parallel_instance_num, 1) : 1;
_scanner_ctx = pipeline::PipScannerContext::create_shared(
_state, this, _input_tuple_desc, _output_tuple_desc, scanners, limit(),
_state->scan_queue_mem_limit(), _col_distribute_ids);
_state->scan_queue_mem_limit(), _col_distribute_ids, max_queue_size);
} else {
_scanner_ctx =
ScannerContext::create_shared(_state, this, _input_tuple_desc, _output_tuple_desc,
scanners, limit(), _state->scan_queue_mem_limit());
}
RETURN_IF_ERROR(_scanner_ctx->init());
return Status::OK();
}
@ -1246,14 +1250,14 @@ VScanNode::PushDownType VScanNode::_should_push_down_in_predicate(VInPredicate*
return PushDownType::ACCEPTABLE;
}
Status VScanNode::_prepare_scanners() {
// Builds the scanner list via _init_scanners() and starts them. When no
// scanner is produced there is nothing to read, so _eos is set instead.
// @param query_parallel_instance_num forwarded to _start_scanners, which uses
//        it to size the shared-scan queue count
Status VScanNode::_prepare_scanners(const int query_parallel_instance_num) {
std::list<VScannerSPtr> scanners;
RETURN_IF_ERROR(_init_scanners(&scanners));
if (scanners.empty()) {
_eos = true;
} else {
COUNTER_SET(_num_scanners, static_cast<int64_t>(scanners.size()));
RETURN_IF_ERROR(_start_scanners(scanners, query_parallel_instance_num));
}
return Status::OK();
}

View File

@ -236,10 +236,11 @@ protected:
// Only predicate on key column can be pushed down.
virtual bool _is_key_column(const std::string& col_name) { return false; }
Status _prepare_scanners();
Status _prepare_scanners(const int query_parallel_instance_num);
bool _is_pipeline_scan = false;
bool _shared_scan_opt = false;
// For load scan node, there should be both input and output tuple descriptor.
// For query scan node, there is only output_tuple_desc.
TupleId _input_tuple_id = -1;
@ -334,6 +335,7 @@ protected:
RuntimeProfile::Counter* _scanner_sched_counter = nullptr;
RuntimeProfile::Counter* _scanner_ctx_sched_counter = nullptr;
RuntimeProfile::Counter* _scanner_ctx_sched_time = nullptr;
RuntimeProfile::Counter* _scanner_wait_batch_timer = nullptr;
RuntimeProfile::Counter* _scanner_wait_worker_timer = nullptr;
// Num of newly created free blocks when running query
@ -417,7 +419,8 @@ private:
const std::string& fn_name, int slot_ref_child = -1);
// Submit the scanner to the thread pool and start execution
Status _start_scanners(const std::list<VScannerSPtr>& scanners);
Status _start_scanners(const std::list<VScannerSPtr>& scanners,
const int query_parallel_instance_num);
};
} // namespace doris::vectorized