[refactor](scanner) refactoring and optimizing scanner scheduling (#30746)

Ashin Gau
2024-02-07 18:08:24 +08:00
committed by yiguolei
parent 16cdab816a
commit 366a6792bf
14 changed files with 525 additions and 961 deletions

View File

@@ -38,8 +38,10 @@ Status FileScanLocalState::_init_scanners(std::list<vectorized::VScannerSPtr>* s
}
auto& p = _parent->cast<FileScanOperatorX>();
size_t shard_num =
std::min<size_t>(config::doris_scanner_thread_pool_thread_num, _scan_ranges.size());
size_t shard_num = std::min<size_t>(
config::doris_scanner_thread_pool_thread_num / state()->query_parallel_instance_num(),
_scan_ranges.size());
shard_num = std::max(shard_num, (size_t)1);
_kv_cache.reset(new vectorized::ShardedKVCache(shard_num));
for (auto& scan_range : _scan_ranges) {
std::unique_ptr<vectorized::VFileScanner> scanner = vectorized::VFileScanner::create_unique(
@@ -62,7 +64,7 @@ void FileScanLocalState::set_scan_ranges(RuntimeState* state,
const std::vector<TScanRangeParams>& scan_ranges) {
int max_scanners =
config::doris_scanner_thread_pool_thread_num / state->query_parallel_instance_num();
max_scanners = max_scanners == 0 ? 1 : max_scanners;
max_scanners = std::max(std::max(max_scanners, state->parallel_scan_max_scanners_count()), 1);
// For `select * from table limit 10;`, a single thread is enough.
if (should_run_serial()) {
max_scanners = 1;
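The old cap let every instance size itself against the whole scanner thread pool; both computations above now divide the pool by `query_parallel_instance_num()`, with `parallel_scan_max_scanners_count()` as a floor for the scanner cap. A minimal standalone sketch of the arithmetic, with illustrative numbers (48 pool threads, 8 instances, a hint of 12; all values assumed, not from this commit):

#include <algorithm>

// pool_threads       ~ config::doris_scanner_thread_pool_thread_num
// parallel_instances ~ state->query_parallel_instance_num(), assumed > 0
// parallel_scan_hint ~ state->parallel_scan_max_scanners_count()
int compute_max_scanners(int pool_threads, int parallel_instances, int parallel_scan_hint) {
    int max_scanners = pool_threads / parallel_instances; // 48 / 8 = 6
    // never below the parallel-scan hint, and always at least 1
    return std::max(std::max(max_scanners, parallel_scan_hint), 1); // -> 12
}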

View File

@@ -160,7 +160,6 @@ Status ScanLocalState<Derived>::open(RuntimeState* state) {
if (_scanner_ctx) {
DCHECK(!_eos && _num_scanners->value() > 0);
RETURN_IF_ERROR(_scanner_ctx->init());
RETURN_IF_ERROR(state->exec_env()->scanner_scheduler()->submit(_scanner_ctx));
}
_opened = true;
return status;
@@ -1288,16 +1287,14 @@ Status ScanLocalState<Derived>::_init_profile() {
profile()->add_child(_scanner_profile.get(), true, nullptr);
_memory_usage_counter = ADD_LABEL_COUNTER_WITH_LEVEL(_scanner_profile, "MemoryUsage", 1);
_queued_blocks_memory_usage = _scanner_profile->AddHighWaterMarkCounter(
"QueuedBlocks", TUnit::BYTES, "MemoryUsage", 1);
_free_blocks_memory_usage =
_scanner_profile->AddHighWaterMarkCounter("FreeBlocks", TUnit::BYTES, "MemoryUsage", 1);
_newly_create_free_blocks_num =
ADD_COUNTER(_scanner_profile, "NewlyCreateFreeBlocksNum", TUnit::UNIT);
_scale_up_scanners_counter = ADD_COUNTER(_scanner_profile, "NumScaleUpScanners", TUnit::UNIT);
// time the transfer thread spends waiting for blocks from the scan threads
_scanner_wait_batch_timer = ADD_TIMER(_scanner_profile, "ScannerBatchWaitTime");
_scanner_sched_counter = ADD_COUNTER(_scanner_profile, "ScannerSchedCount", TUnit::UNIT);
_scanner_ctx_sched_counter = ADD_COUNTER(_scanner_profile, "ScannerCtxSchedCount", TUnit::UNIT);
_scanner_ctx_sched_time = ADD_TIMER(_scanner_profile, "ScannerCtxSchedTime");
_scan_timer = ADD_TIMER(_scanner_profile, "ScannerGetBlockTime");
@@ -1456,14 +1453,10 @@ Status ScanOperatorX<LocalStateType>::get_block(RuntimeState* state, vectorized:
}};
if (state->is_cancelled()) {
// ISSUE: https://github.com/apache/doris/issues/16360
// _scanner_ctx may be null here, see: `VScanNode::alloc_resource` (_eos == null)
if (local_state._scanner_ctx) {
local_state._scanner_ctx->set_status_on_error(Status::Cancelled("query cancelled"));
return local_state._scanner_ctx->status();
} else {
return Status::Cancelled("query cancelled");
local_state._scanner_ctx->stop_scanners(state);
}
return Status::Cancelled("Query cancelled in ScanOperator");
}
if (local_state._eos) {
@@ -1471,21 +1464,11 @@ Status ScanOperatorX<LocalStateType>::get_block(RuntimeState* state, vectorized:
return Status::OK();
}
vectorized::BlockUPtr scan_block = nullptr;
bool eos = false;
RETURN_IF_ERROR(local_state._scanner_ctx->get_block_from_queue(state, &scan_block, &eos, 0));
if (eos) {
source_state = SourceState::FINISHED;
DCHECK(scan_block == nullptr);
return Status::OK();
}
// get scanner's block memory
block->swap(*scan_block);
local_state._scanner_ctx->return_free_block(std::move(scan_block));
RETURN_IF_ERROR(local_state._scanner_ctx->get_block_from_queue(state, block, &eos, 0));
local_state.reached_limit(block, source_state);
if (eos) {
if (eos || source_state == SourceState::FINISHED) {
source_state = SourceState::FINISHED;
// reached the limit; stop the scanners.
local_state._scanner_ctx->stop_scanners(state);
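Note the shape change in `get_block_from_queue`: the queue now swaps rows straight into the caller's `Block*` instead of handing out a `BlockUPtr` that the caller must swap and then hand back via `return_free_block`. A generic sketch of that in-place handoff (an assumed simplification, not Doris types):

#include <memory>
#include <vector>

using Buffer = std::vector<int>;

// The queue owns reusable buffers; the consumer only lends its output buffer.
void fill_caller(std::unique_ptr<Buffer>& queued, Buffer* caller_out) {
    caller_out->swap(*queued); // caller now holds the scanned data
    queued->clear();           // the queued buffer stays pooled for reuse
}

Since scanners now resubmit themselves until told otherwise, the `stop_scanners(state)` call on the limit/eos path is what actually stops the scan threads from producing rows that would only be discarded.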

View File

@@ -142,7 +142,6 @@ protected:
std::shared_ptr<RuntimeProfile> _scanner_profile;
RuntimeProfile::Counter* _scanner_sched_counter = nullptr;
RuntimeProfile::Counter* _scanner_ctx_sched_counter = nullptr;
RuntimeProfile::Counter* _scanner_ctx_sched_time = nullptr;
RuntimeProfile::Counter* _scanner_wait_batch_timer = nullptr;
RuntimeProfile::Counter* _scanner_wait_worker_timer = nullptr;
@@ -160,8 +159,8 @@ protected:
// time spent filtering output blocks from the scanner
RuntimeProfile::Counter* _filter_timer = nullptr;
RuntimeProfile::Counter* _memory_usage_counter = nullptr;
RuntimeProfile::HighWaterMarkCounter* _queued_blocks_memory_usage = nullptr;
RuntimeProfile::HighWaterMarkCounter* _free_blocks_memory_usage = nullptr;
RuntimeProfile::Counter* _scale_up_scanners_counter = nullptr;
// rows read from the scanner (including those discarded by (pre)filters)
RuntimeProfile::Counter* _rows_read_counter = nullptr;

View File

@@ -128,7 +128,13 @@ public:
: _query_options.query_timeout;
}
int max_io_buffers() const { return _query_options.max_io_buffers; }
int num_scanner_threads() const { return _query_options.num_scanner_threads; }
int num_scanner_threads() const {
return _query_options.__isset.num_scanner_threads ? _query_options.num_scanner_threads : 0;
}
double scanner_scale_up_ratio() const {
return _query_options.__isset.scanner_scale_up_ratio ? _query_options.scanner_scale_up_ratio
: 0;
}
TQueryType::type query_type() const { return _query_options.query_type; }
int64_t timestamp_ms() const { return _timestamp_ms; }
int32_t nano_seconds() const { return _nano_seconds; }
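Both new accessors follow the standard Thrift optional-field pattern: read the value only when `__isset` says the client actually sent it, and return 0 as a sentinel. Downstream, 0 from `num_scanner_threads()` means "fall back to config-based sizing" and 0 from `scanner_scale_up_ratio()` disables scale-up. A small sketch of the pattern against a mocked-up struct (the real `_query_options` is Thrift-generated; this struct is hypothetical):

// Hypothetical stand-in for a Thrift-generated options struct.
struct TOptions {
    struct Isset { bool scanner_scale_up_ratio = false; } __isset;
    double scanner_scale_up_ratio = 0.0;
};

double scale_up_ratio_or_default(const TOptions& opts) {
    // 0 is the sentinel: the BE treats it as "scale-up disabled / use config".
    return opts.__isset.scanner_scale_up_ratio ? opts.scanner_scale_up_ratio : 0;
}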

View File

@@ -62,7 +62,7 @@ void NewFileScanNode::set_scan_ranges(RuntimeState* state,
const std::vector<TScanRangeParams>& scan_ranges) {
int max_scanners =
config::doris_scanner_thread_pool_thread_num / state->query_parallel_instance_num();
max_scanners = max_scanners == 0 ? 1 : max_scanners;
max_scanners = std::max(std::max(max_scanners, state->parallel_scan_max_scanners_count()), 1);
// For `select * from table limit 10;`, a single thread is enough.
if (should_run_serial()) {
max_scanners = 1;
@@ -116,9 +116,10 @@ Status NewFileScanNode::_init_scanners(std::list<VScannerSPtr>* scanners) {
return Status::OK();
}
// TODO: determine kv cache shard num
size_t shard_num =
std::min<size_t>(config::doris_scanner_thread_pool_thread_num, _scan_ranges.size());
size_t shard_num = std::min<size_t>(
config::doris_scanner_thread_pool_thread_num / _state->query_parallel_instance_num(),
_scan_ranges.size());
shard_num = std::max(shard_num, (size_t)1);
_kv_cache.reset(new ShardedKVCache(shard_num));
for (auto& scan_range : _scan_ranges) {
std::unique_ptr<VFileScanner> scanner =

View File

@@ -36,129 +36,6 @@ public:
: vectorized::ScannerContext(state, parent, output_tuple_desc, output_row_descriptor,
scanners, limit_, max_bytes_in_blocks_queue,
num_parallel_instances) {}
Status get_block_from_queue(RuntimeState* state, vectorized::BlockUPtr* block, bool* eos,
int id) override {
{
std::unique_lock l(_transfer_lock);
if (state->is_cancelled()) {
set_status_on_error(Status::Cancelled("cancelled"), false);
}
if (!status().ok()) {
return _process_status;
}
}
std::vector<vectorized::BlockUPtr> merge_blocks;
{
std::unique_lock<std::mutex> l(*_queue_mutexs[id]);
// The pipeline may be woken up by scanner.done. If there is still data
// in the queue, read it first and only then check scanner.done;
// if done, eos is returned to indicate that the scan operator has finished.
if (_blocks_queues[id].empty()) {
*eos = done();
return Status::OK();
}
if (_process_status.is<ErrorCode::CANCELLED>()) {
*eos = true;
return Status::OK();
}
*block = std::move(_blocks_queues[id].front());
_blocks_queues[id].pop_front();
auto rows = (*block)->rows();
while (!_blocks_queues[id].empty()) {
const auto add_rows = (*_blocks_queues[id].front()).rows();
if (rows + add_rows < state->batch_size()) {
rows += add_rows;
merge_blocks.emplace_back(std::move(_blocks_queues[id].front()));
_blocks_queues[id].pop_front();
} else {
break;
}
}
if (_blocks_queues[id].empty()) {
this->reschedule_scanner_ctx();
}
}
_current_used_bytes -= (*block)->allocated_bytes();
if (!merge_blocks.empty()) {
vectorized::MutableBlock m(block->get());
for (auto& merge_block : merge_blocks) {
_current_used_bytes -= merge_block->allocated_bytes();
static_cast<void>(m.merge(*merge_block));
return_free_block(std::move(merge_block));
}
(*block)->set_columns(std::move(m.mutable_columns()));
}
// after returning free blocks, try to reschedule the scanner
if (should_be_scheduled()) {
this->reschedule_scanner_ctx();
}
return Status::OK();
}
void append_blocks_to_queue(std::vector<vectorized::BlockUPtr>& blocks) override {
const int queue_size = _blocks_queues.size();
const int block_size = blocks.size();
if (block_size == 0) {
return;
}
int64_t local_bytes = 0;
for (const auto& block : blocks) {
auto st = validate_block_schema(block.get());
if (!st.ok()) {
set_status_on_error(st, false);
}
local_bytes += block->allocated_bytes();
}
for (int i = 0; i < queue_size && i < block_size; ++i) {
int queue = _next_queue_to_feed;
{
std::lock_guard<std::mutex> l(*_queue_mutexs[queue]);
for (int j = i; j < block_size; j += queue_size) {
_blocks_queues[queue].emplace_back(std::move(blocks[j]));
}
}
_next_queue_to_feed = queue + 1 < queue_size ? queue + 1 : 0;
}
_current_used_bytes += local_bytes;
}
bool empty_in_queue(int id) override {
std::unique_lock<std::mutex> l(*_queue_mutexs[id]);
return _blocks_queues[id].empty();
}
Status init() override {
for (int i = 0; i < _num_parallel_instances; ++i) {
_queue_mutexs.emplace_back(std::make_unique<std::mutex>());
_blocks_queues.emplace_back(std::list<vectorized::BlockUPtr>());
}
return ScannerContext::init();
}
std::string debug_string() override {
auto res = ScannerContext::debug_string();
for (int i = 0; i < _blocks_queues.size(); ++i) {
res += " queue " + std::to_string(i) + ":size " +
std::to_string(_blocks_queues[i].size());
}
return res;
}
protected:
int _next_queue_to_feed = 0;
std::vector<std::unique_ptr<std::mutex>> _queue_mutexs;
std::vector<std::list<vectorized::BlockUPtr>> _blocks_queues;
std::atomic_int64_t _current_used_bytes = 0;
};
class PipXScannerContext final : public vectorized::ScannerContext {
@@ -172,117 +49,38 @@ public:
int64_t limit_, int64_t max_bytes_in_blocks_queue,
std::shared_ptr<pipeline::ScanDependency> dependency)
: vectorized::ScannerContext(state, output_tuple_desc, output_row_descriptor, scanners,
limit_, max_bytes_in_blocks_queue, 1, local_state,
dependency) {}
Status get_block_from_queue(RuntimeState* state, vectorized::BlockUPtr* block, bool* eos,
int id) override {
if (_blocks_queue_buffered.empty()) {
std::unique_lock l(_transfer_lock);
if (state->is_cancelled()) {
set_status_on_error(Status::Cancelled("cancelled"), false);
}
if (!status().ok()) {
return _process_status;
}
if (_blocks_queue.empty()) {
*eos = done();
return Status::OK();
}
if (_process_status.is<ErrorCode::CANCELLED>()) {
*eos = true;
return Status::OK();
}
_blocks_queue_buffered = std::move(_blocks_queue);
}
// `get_block_from_queue` should not be called concurrently from multiple threads,
// so no lock is needed here.
*block = std::move(_blocks_queue_buffered.front());
_blocks_queue_buffered.pop_front();
std::vector<vectorized::BlockUPtr> merge_blocks;
auto rows = (*block)->rows();
while (!_blocks_queue_buffered.empty()) {
const auto add_rows = (*_blocks_queue_buffered.front()).rows();
if (rows + add_rows < state->batch_size()) {
rows += add_rows;
merge_blocks.emplace_back(std::move(_blocks_queue_buffered.front()));
_blocks_queue_buffered.pop_front();
} else {
break;
}
}
if (_blocks_queue_buffered.empty()) {
std::unique_lock l(_transfer_lock);
if (_blocks_queue.empty()) {
this->reschedule_scanner_ctx();
_dependency->block();
} else {
_blocks_queue_buffered = std::move(_blocks_queue);
}
}
_cur_bytes_in_queue -= (*block)->allocated_bytes();
if (!merge_blocks.empty()) {
vectorized::MutableBlock m(block->get());
for (auto& merge_block : merge_blocks) {
_cur_bytes_in_queue -= merge_block->allocated_bytes();
static_cast<void>(m.merge(*merge_block));
if (merge_block->mem_reuse()) {
_free_blocks_buffered.emplace_back(std::move(merge_block));
}
}
(*block)->set_columns(std::move(m.mutable_columns()));
}
return_free_blocks();
// after returning free blocks, try to reschedule the scanner
if (should_be_scheduled()) {
this->reschedule_scanner_ctx();
}
return Status::OK();
limit_, max_bytes_in_blocks_queue, 1, local_state) {
_dependency = dependency;
}
void reschedule_scanner_ctx() override {
if (done()) {
return;
void append_block_to_queue(std::shared_ptr<vectorized::ScanTask> scan_task) override {
vectorized::ScannerContext::append_block_to_queue(scan_task);
if (_dependency) {
_dependency->set_ready();
}
auto state = _scanner_scheduler->submit(shared_from_this());
//todo(wb) rethink whether it is better to mark the current scan_context as failed when submit fails many times
if (state.ok()) {
_num_scheduling_ctx++;
} else {
set_status_on_error(state, false);
}
Status get_block_from_queue(RuntimeState* state, vectorized::Block* block, bool* eos, int id,
bool wait = true) override {
Status st = vectorized::ScannerContext::get_block_from_queue(state, block, eos, id, wait);
std::lock_guard<std::mutex> l(_transfer_lock);
if (_blocks_queue.empty()) {
if (_dependency) {
_dependency->block();
}
}
return st;
}
protected:
void _set_scanner_done() override {
if (_dependency) {
_dependency->set_scanner_done();
}
}
private:
void return_free_blocks() {
if (_free_blocks_buffered.empty()) {
return;
}
size_t total_bytes = 0;
for (auto& block : _free_blocks_buffered) {
const auto bytes = block->allocated_bytes();
block->clear_column_data();
_estimated_block_bytes = std::max(bytes, (size_t)16);
total_bytes += bytes;
}
_free_blocks_memory_usage->add(total_bytes);
const auto count = _free_blocks_buffered.size();
_free_blocks.enqueue_bulk(std::make_move_iterator(_free_blocks_buffered.begin()), count);
_free_blocks_buffered.clear();
_serving_blocks_num -= count;
}
std::vector<vectorized::BlockUPtr> _free_blocks_buffered;
std::list<vectorized::BlockUPtr> _blocks_queue_buffered;
std::shared_ptr<pipeline::ScanDependency> _dependency = nullptr;
};
} // namespace doris::pipeline
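The two overrides above are the whole pipeline integration: producing a block marks the ScanDependency ready so the pipeline task becomes runnable, and draining the queue blocks the dependency again so the task yields instead of spin-polling. A standalone toy model of that handshake (names are illustrative, not the real ScanDependency API):

#include <deque>
#include <mutex>

struct ToyDependency {
    bool ready = false;
    void set_ready() { ready = true; } // pipeline task may run
    void block() { ready = false; }    // pipeline task yields
};

struct ToyQueue {
    std::mutex lock;
    std::deque<int> blocks;
    ToyDependency* dep = nullptr;

    void append(int b) { // producer side, like append_block_to_queue()
        std::lock_guard<std::mutex> l(lock);
        blocks.push_back(b);
        dep->set_ready();
    }
    bool pop(int* out) { // consumer side, like get_block_from_queue()
        std::lock_guard<std::mutex> l(lock);
        if (blocks.empty()) return false;
        *out = blocks.front();
        blocks.pop_front();
        if (blocks.empty()) dep->block(); // nothing left: let the task yield
        return true;
    }
};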

View File

@@ -17,12 +17,10 @@
#include "scanner_context.h"
#include <bthread/bthread.h>
#include <fmt/format.h>
#include <gen_cpp/Metrics_types.h>
#include <glog/logging.h>
#include <algorithm>
#include <mutex>
#include <ostream>
#include <utility>
@@ -31,64 +29,57 @@
#include "common/status.h"
#include "pipeline/exec/scan_operator.h"
#include "runtime/descriptors.h"
#include "runtime/exec_env.h"
#include "runtime/query_context.h"
#include "runtime/runtime_state.h"
#include "util/pretty_printer.h"
#include "util/uid_util.h"
#include "vec/core/block.h"
#include "vec/exec/scan/scanner_scheduler.h"
#include "vec/exec/scan/vscan_node.h"
#include "vec/exec/scan/vscanner.h"
namespace doris::vectorized {
using namespace std::chrono_literals;
static bvar::Status<int64_t> g_bytes_in_scanner_queue("doris_bytes_in_scanner_queue", 0);
static bvar::Status<int64_t> g_num_running_scanners("doris_num_running_scanners", 0);
ScannerContext::ScannerContext(RuntimeState* state, const TupleDescriptor* output_tuple_desc,
const RowDescriptor* output_row_descriptor,
const std::list<std::shared_ptr<ScannerDelegate>>& scanners,
int64_t limit_, int64_t max_bytes_in_blocks_queue,
const int num_parallel_instances,
pipeline::ScanLocalStateBase* local_state,
std::shared_ptr<pipeline::ScanDependency> dependency)
pipeline::ScanLocalStateBase* local_state)
: HasTaskExecutionCtx(state),
_state(state),
_parent(nullptr),
_local_state(local_state),
_output_tuple_desc(output_row_descriptor
? output_row_descriptor->tuple_descriptors().front()
: output_tuple_desc),
_output_row_descriptor(output_row_descriptor),
_process_status(Status::OK()),
_batch_size(state->batch_size()),
limit(limit_),
_max_bytes_in_queue(std::max(max_bytes_in_blocks_queue, (int64_t)1024) *
num_parallel_instances),
_scanner_scheduler(state->exec_env()->scanner_scheduler()),
_scanners(scanners.begin(), scanners.end()),
_all_scanners(scanners.begin(), scanners.end()),
_num_parallel_instances(num_parallel_instances),
_dependency(dependency) {
_num_parallel_instances(num_parallel_instances) {
DCHECK(_output_row_descriptor == nullptr ||
_output_row_descriptor->tuple_descriptors().size() == 1);
_query_id = _state->get_query_ctx()->query_id();
ctx_id = UniqueId::gen_uid().to_string();
if (_scanners.empty()) {
// Provide more memory for wide tables: grow the limit by one extra multiple per 300 slots
_max_bytes_in_queue *= _output_tuple_desc->slots().size() / 300 + 1;
if (scanners.empty()) {
_is_finished = true;
_set_scanner_done();
}
_scanners.enqueue_bulk(scanners.begin(), scanners.size());
if (limit < 0) {
limit = -1;
}
_max_thread_num = config::doris_scanner_thread_pool_thread_num / 4;
MAX_SCALE_UP_RATIO = _state->scanner_scale_up_ratio();
_max_thread_num = _state->num_scanner_threads() > 0
? _state->num_scanner_threads()
: config::doris_scanner_thread_pool_thread_num /
state->query_parallel_instance_num();
_max_thread_num *= num_parallel_instances;
_max_thread_num = _max_thread_num == 0 ? 1 : _max_thread_num;
DCHECK(_max_thread_num > 0);
_max_thread_num = std::min(_max_thread_num, (int32_t)_scanners.size());
_max_thread_num = std::min(_max_thread_num, (int32_t)scanners.size());
// 1. Calculate max concurrency
// For `select * from table limit 10;`, a single thread is enough.
if ((_parent && _parent->should_run_serial()) ||
@@ -104,45 +95,9 @@ ScannerContext::ScannerContext(doris::RuntimeState* state, doris::vectorized::VS
int64_t limit_, int64_t max_bytes_in_blocks_queue,
const int num_parallel_instances,
pipeline::ScanLocalStateBase* local_state)
: HasTaskExecutionCtx(state),
_state(state),
_parent(parent),
_local_state(local_state),
_output_tuple_desc(output_row_descriptor
? output_row_descriptor->tuple_descriptors().front()
: output_tuple_desc),
_output_row_descriptor(output_row_descriptor),
_process_status(Status::OK()),
_batch_size(state->batch_size()),
limit(limit_),
_max_bytes_in_queue(std::max(max_bytes_in_blocks_queue, (int64_t)1024) *
num_parallel_instances),
_scanner_scheduler(state->exec_env()->scanner_scheduler()),
_scanners(scanners.begin(), scanners.end()),
_all_scanners(scanners.begin(), scanners.end()),
_num_parallel_instances(num_parallel_instances) {
DCHECK(_output_row_descriptor == nullptr ||
_output_row_descriptor->tuple_descriptors().size() == 1);
_query_id = _state->get_query_ctx()->query_id();
ctx_id = UniqueId::gen_uid().to_string();
if (_scanners.empty()) {
_is_finished = true;
_set_scanner_done();
}
if (limit < 0) {
limit = -1;
}
_max_thread_num = config::doris_scanner_thread_pool_thread_num / 4;
_max_thread_num *= num_parallel_instances;
_max_thread_num = _max_thread_num == 0 ? 1 : _max_thread_num;
DCHECK(_max_thread_num > 0);
_max_thread_num = std::min(_max_thread_num, (int32_t)_scanners.size());
// 1. Calculate max concurrency
// For `select * from table limit 10;`, a single thread is enough.
if ((_parent && _parent->should_run_serial()) ||
(_local_state && _local_state->should_run_serial())) {
_max_thread_num = 1;
}
: ScannerContext(state, output_tuple_desc, output_row_descriptor, scanners, limit_,
max_bytes_in_blocks_queue, num_parallel_instances, local_state) {
_parent = parent;
}
// After init() has been called, _parent should not be accessed
@@ -150,43 +105,21 @@ Status ScannerContext::init() {
if (_parent) {
_scanner_profile = _parent->_scanner_profile;
_scanner_sched_counter = _parent->_scanner_sched_counter;
_scanner_ctx_sched_counter = _parent->_scanner_ctx_sched_counter;
_scanner_ctx_sched_time = _parent->_scanner_ctx_sched_time;
_free_blocks_memory_usage = _parent->_free_blocks_memory_usage;
_newly_create_free_blocks_num = _parent->_newly_create_free_blocks_num;
_queued_blocks_memory_usage = _parent->_queued_blocks_memory_usage;
_scanner_wait_batch_timer = _parent->_scanner_wait_batch_timer;
_free_blocks_memory_usage_mark = _parent->_free_blocks_memory_usage;
_scanner_ctx_sched_time = _parent->_scanner_ctx_sched_time;
_scale_up_scanners_counter = _parent->_scale_up_scanners_counter;
} else {
_scanner_profile = _local_state->_scanner_profile;
_scanner_sched_counter = _local_state->_scanner_sched_counter;
_scanner_ctx_sched_counter = _local_state->_scanner_ctx_sched_counter;
_scanner_ctx_sched_time = _local_state->_scanner_ctx_sched_time;
_free_blocks_memory_usage = _local_state->_free_blocks_memory_usage;
_newly_create_free_blocks_num = _local_state->_newly_create_free_blocks_num;
_queued_blocks_memory_usage = _local_state->_queued_blocks_memory_usage;
_scanner_wait_batch_timer = _local_state->_scanner_wait_batch_timer;
_free_blocks_memory_usage_mark = _local_state->_free_blocks_memory_usage;
_scanner_ctx_sched_time = _local_state->_scanner_ctx_sched_time;
_scale_up_scanners_counter = _local_state->_scale_up_scanners_counter;
}
// 2. Calculate the number of free blocks that all scanners can use.
// The calculation logic is as follows:
// 1. Assuming that at most M rows can be scanned in one scan(config::doris_scanner_row_num),
// then figure out how many blocks are required for one scan(_block_per_scanner).
// 2. The maximum number of concurrency * the blocks required for one scan,
// that is, the number of blocks that all scanners can use.
auto doris_scanner_row_num =
limit == -1 ? config::doris_scanner_row_num
: std::min(static_cast<int64_t>(config::doris_scanner_row_num), limit);
int real_block_size =
limit == -1 ? _batch_size : std::min(static_cast<int64_t>(_batch_size), limit);
_block_per_scanner = (doris_scanner_row_num + (real_block_size - 1)) / real_block_size;
_free_blocks_capacity = _max_thread_num * _block_per_scanner;
auto block = get_free_block();
_estimated_block_bytes = std::max(block->allocated_bytes(), (size_t)16);
int min_blocks = (config::min_bytes_in_scanner_queue + _estimated_block_bytes - 1) /
_estimated_block_bytes;
_free_blocks_capacity = std::max(_free_blocks_capacity, min_blocks);
return_free_block(std::move(block));
#ifndef BE_TEST
// 3. get thread token
if (_state->get_query_ctx()) {
@@ -198,8 +131,6 @@ Status ScannerContext::init() {
}
#endif
_num_unfinished_scanners = _scanners.size();
if (_parent) {
COUNTER_SET(_parent->_max_scanner_thread_num, (int64_t)_max_thread_num);
_parent->_runtime_profile->add_info_string("UseSpecificThreadToken",
@@ -210,6 +141,17 @@ Status ScannerContext::init() {
thread_token == nullptr ? "False" : "True");
}
// submit `_max_thread_num` running scanners to `ScannerScheduler`
// When a running scanner finishes, it submits one of the remaining scanners.
for (int i = 0; i < _max_thread_num; ++i) {
std::weak_ptr<ScannerDelegate> next_scanner;
if (_scanners.try_dequeue(next_scanner)) {
vectorized::BlockUPtr block = get_free_block();
submit_scan_task(std::make_shared<ScanTask>(next_scanner, std::move(block)));
_num_running_scanners++;
}
}
return Status::OK();
}
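This seeding loop replaces the old scheduler-driven model: init() submits at most `_max_thread_num` ScanTasks up front, and each scanner that reaches eos later dequeues one of the remaining scanners (see `get_block_from_queue` below), so the in-flight count stays bounded without a dedicated scheduling thread. A toy model of the invariant (assumed simplification):

#include <queue>

struct ToyContext {
    std::queue<int> pending; // mirrors the _scanners queue
    int running = 0;         // mirrors _num_running_scanners

    void seed(int max_threads) { // like ScannerContext::init()
        for (int i = 0; i < max_threads && !pending.empty(); ++i) {
            pending.pop();       // submit_scan_task(...) in the real code
            ++running;
        }
    }
    void on_scanner_eos() {                  // eos path of get_block_from_queue()
        if (!pending.empty()) pending.pop(); // reuse the task for the next scanner
        else --running;                      // no scanners left: shrink
    }
};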
@@ -220,140 +162,223 @@ std::string ScannerContext::parent_name() {
vectorized::BlockUPtr ScannerContext::get_free_block() {
vectorized::BlockUPtr block;
if (_free_blocks.try_dequeue(block)) {
std::lock_guard<std::mutex> fl(_free_blocks_lock);
DCHECK(block->mem_reuse());
_free_blocks_memory_usage->add(-block->allocated_bytes());
_serving_blocks_num++;
_free_blocks_memory_usage -= block->allocated_bytes();
_free_blocks_memory_usage_mark->set(_free_blocks_memory_usage);
return block;
}
block = vectorized::Block::create_unique(_output_tuple_desc->slots(), _batch_size,
true /*ignore invalid slots*/);
COUNTER_UPDATE(_newly_create_free_blocks_num, 1);
_serving_blocks_num++;
return block;
_newly_create_free_blocks_num->update(1);
return vectorized::Block::create_unique(_output_tuple_desc->slots(), _batch_size,
true /*ignore invalid slots*/);
}
void ScannerContext::return_free_block(std::unique_ptr<vectorized::Block> block) {
_serving_blocks_num--;
if (block->mem_reuse()) {
// Only put blocks with schema to free blocks, because colocate blocks
// need schema.
_estimated_block_bytes = std::max(block->allocated_bytes(), (size_t)16);
void ScannerContext::return_free_block(vectorized::BlockUPtr block) {
std::lock_guard<std::mutex> fl(_free_blocks_lock);
if (block->mem_reuse() && _free_blocks_memory_usage < _max_bytes_in_queue) {
block->clear_column_data();
_free_blocks_memory_usage->add(block->allocated_bytes());
_free_blocks_memory_usage += block->allocated_bytes();
_free_blocks_memory_usage_mark->set(_free_blocks_memory_usage);
_free_blocks.enqueue(std::move(block));
}
}
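`get_free_block`/`return_free_block` now keep a byte-level account of the pool under `_free_blocks_lock`: handing a block out debits `_free_blocks_memory_usage`, returning one credits it, and returns are dropped once the pool already holds `_max_bytes_in_queue` worth of blocks. A standalone sketch of such a bounded pool (assumed simplification; `Block` reduced to a byte vector):

#include <cstddef>
#include <deque>
#include <memory>
#include <mutex>
#include <vector>

using ToyBlock = std::vector<char>;

struct BoundedBlockPool {
    std::mutex lock;
    std::deque<std::unique_ptr<ToyBlock>> free_blocks;
    long bytes_in_pool = 0;
    const long max_bytes;

    explicit BoundedBlockPool(long cap) : max_bytes(cap) {}

    std::unique_ptr<ToyBlock> get(size_t fallback_size) {
        std::lock_guard<std::mutex> l(lock);
        if (!free_blocks.empty()) {
            auto b = std::move(free_blocks.front());
            free_blocks.pop_front();
            bytes_in_pool -= (long)b->capacity(); // block leaves the pool
            return b;
        }
        // pool empty: create a new block (the "NewlyCreateFreeBlocksNum" path)
        return std::make_unique<ToyBlock>(fallback_size);
    }

    void put(std::unique_ptr<ToyBlock> b) {
        std::lock_guard<std::mutex> l(lock);
        if (bytes_in_pool >= max_bytes) return; // pool full: drop the block
        b->clear();                             // keep capacity, drop data
        bytes_in_pool += (long)b->capacity();
        free_blocks.push_back(std::move(b));
    }
};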
void ScannerContext::append_blocks_to_queue(std::vector<vectorized::BlockUPtr>& blocks) {
std::lock_guard l(_transfer_lock);
auto old_bytes_in_queue = _cur_bytes_in_queue;
for (auto& b : blocks) {
auto st = validate_block_schema(b.get());
if (!st.ok()) {
set_status_on_error(st, false);
}
_cur_bytes_in_queue += b->allocated_bytes();
_blocks_queue.push_back(std::move(b));
}
blocks.clear();
if (_dependency) {
_dependency->set_ready();
}
_blocks_queue_added_cv.notify_one();
_queued_blocks_memory_usage->add(_cur_bytes_in_queue - old_bytes_in_queue);
g_bytes_in_scanner_queue.set_value(_cur_bytes_in_queue);
}
bool ScannerContext::empty_in_queue(int id) {
std::unique_lock l(_transfer_lock);
std::lock_guard<std::mutex> l(_transfer_lock);
return _blocks_queue.empty();
}
Status ScannerContext::get_block_from_queue(RuntimeState* state, vectorized::BlockUPtr* block,
bool* eos, int id) {
std::vector<vectorized::BlockUPtr> merge_blocks;
{
std::unique_lock l(_transfer_lock);
// Normally, the scanner scheduler will schedule ctx.
// But when the amount of data in the blocks queue exceeds the upper limit,
// the scheduler will stop scheduling.
// (if the scheduler continued to schedule, it would cause a lot of busy spinning).
// At this point, consumers are required to trigger new scheduling to ensure that
// data can be continuously fetched.
bool to_be_schedule = should_be_scheduled();
void ScannerContext::submit_scan_task(std::shared_ptr<ScanTask> scan_task) {
_scanner_sched_counter->update(1);
_num_scheduled_scanners++;
_scanner_scheduler->submit(shared_from_this(), scan_task);
}
bool is_scheduled = false;
if (!done() && to_be_schedule && _num_running_scanners == 0) {
is_scheduled = true;
auto submit_status = _scanner_scheduler->submit(shared_from_this());
if (!submit_status.ok()) {
set_status_on_error(submit_status, false);
void ScannerContext::append_block_to_queue(std::shared_ptr<ScanTask> scan_task) {
if (scan_task->status_ok() && scan_task->current_block->rows() > 0) {
Status st = validate_block_schema(scan_task->current_block.get());
if (!st.ok()) {
scan_task->set_status(st);
}
}
std::lock_guard<std::mutex> l(_transfer_lock);
if (!scan_task->status_ok()) {
_process_status = scan_task->get_status();
}
if (_last_scale_up_time == 0) {
_last_scale_up_time = UnixMillis();
}
if (_blocks_queue.empty() && _last_fetch_time != 0) {
// the queue was empty before this block arrived, so the consumer has been waiting
_total_wait_block_time += UnixMillis() - _last_fetch_time;
}
_num_scheduled_scanners--;
_blocks_queue.emplace_back(scan_task);
_blocks_queue_added_cv.notify_one();
}
Status ScannerContext::get_block_from_queue(RuntimeState* state, vectorized::Block* block,
bool* eos, int id, bool wait) {
if (state->is_cancelled()) {
_set_scanner_done();
return Status::Cancelled("Query cancelled in ScannerContext");
}
std::unique_lock l(_transfer_lock);
// Wait for block from queue
if (wait) {
// scanner batch wait time
SCOPED_TIMER(_scanner_wait_batch_timer);
while (!done() && _blocks_queue.empty() && _process_status.ok()) {
_blocks_queue_added_cv.wait_for(l, 1s);
}
}
if (!_process_status.ok()) {
_set_scanner_done();
return _process_status;
}
std::shared_ptr<ScanTask> scan_task = nullptr;
if (!_blocks_queue.empty() && !done()) {
_last_fetch_time = UnixMillis();
scan_task = _blocks_queue.front();
_blocks_queue.pop_front();
}
if (scan_task) {
if (!scan_task->status_ok()) {
_set_scanner_done();
return scan_task->get_status();
}
// We can only know the block size after reading at least one block.
// Take the size of the first block as `_estimated_block_size`
if (scan_task->first_block) {
std::lock_guard<std::mutex> fl(_free_blocks_lock);
size_t block_size = scan_task->current_block->allocated_bytes();
_free_blocks_memory_usage += block_size;
_free_blocks_memory_usage_mark->set(_free_blocks_memory_usage);
scan_task->first_block = false;
if (block_size > _estimated_block_size) {
_estimated_block_size = block_size;
}
}
// Wait for block from queue
{
SCOPED_TIMER(_scanner_wait_batch_timer);
// scanner batch wait time
while (!(!_blocks_queue.empty() || done() || !status().ok() || state->is_cancelled())) {
if (!is_scheduled && _num_running_scanners == 0 && should_be_scheduled()) {
LOG(INFO) << debug_string();
}
_blocks_queue_added_cv.wait_for(l, 1s);
}
// consume current block
block->swap(*scan_task->current_block);
if (!scan_task->current_block->mem_reuse()) {
// whether a block can be reused depends on the memory strategy of ScanNode/ScanOperator,
// so double check `mem_reuse()` of `current_block` to make sure it can be reused
_newly_create_free_blocks_num->update(1);
scan_task->current_block = vectorized::Block::create_unique(_output_tuple_desc->slots(),
_batch_size, true);
}
if (state->is_cancelled()) {
set_status_on_error(Status::Cancelled("cancelled"), false);
}
if (!status().ok()) {
return status();
}
if (!_blocks_queue.empty()) {
*block = std::move(_blocks_queue.front());
_blocks_queue.pop_front();
auto block_bytes = (*block)->allocated_bytes();
_cur_bytes_in_queue -= block_bytes;
_queued_blocks_memory_usage->add(-block_bytes);
auto rows = (*block)->rows();
while (!_blocks_queue.empty()) {
auto& add_block = _blocks_queue.front();
const auto add_rows = (*add_block).rows();
if (rows + add_rows < state->batch_size()) {
rows += add_rows;
block_bytes = (*add_block).allocated_bytes();
_cur_bytes_in_queue -= block_bytes;
_queued_blocks_memory_usage->add(-block_bytes);
merge_blocks.emplace_back(std::move(add_block));
_blocks_queue.pop_front();
} else {
break;
if (scan_task->is_eos()) { // current scanner is finished, and no more data to read
_num_finished_scanners++;
std::weak_ptr<ScannerDelegate> next_scanner;
// submit one of the remaining scanners
if (_scanners.try_dequeue(next_scanner)) {
// reuse the current scan task for the next scanner; just reset some of its state.
scan_task->reuse_scanner(next_scanner);
submit_scan_task(scan_task);
} else {
// no more scanners to be scheduled
// `_free_blocks` serves all running scanners; it may now be larger than the remaining scanners need
int free_blocks_for_each = _free_blocks.size_approx() / _num_running_scanners;
_num_running_scanners--;
std::lock_guard<std::mutex> fl(_free_blocks_lock);
for (int i = 0; i < free_blocks_for_each; ++i) {
vectorized::BlockUPtr removed_block;
if (_free_blocks.try_dequeue(removed_block)) {
_free_blocks_memory_usage -= removed_block->allocated_bytes();
_free_blocks_memory_usage_mark->set(_free_blocks_memory_usage);
}
}
}
} else {
*eos = done();
// resubmit current running scanner to read the next block
submit_scan_task(scan_task);
}
// scale up
_try_to_scale_up();
}
g_bytes_in_scanner_queue.set_value(_cur_bytes_in_queue);
if (!merge_blocks.empty()) {
vectorized::MutableBlock m(block->get());
for (auto& merge_block : merge_blocks) {
static_cast<void>(m.merge(*merge_block));
return_free_block(std::move(merge_block));
}
(*block)->set_columns(std::move(m.mutable_columns()));
if (_num_finished_scanners == _all_scanners.size() && _blocks_queue.empty()) {
_set_scanner_done();
_is_finished = true;
}
*eos = done();
return Status::OK();
}
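Two pieces of bookkeeping in this function feed the scale-up heuristic: `_last_fetch_time` is stamped whenever the consumer takes a block, and `append_block_to_queue` adds the gap to `_total_wait_block_time` whenever a block arrives at an empty queue. A condensed sketch of that handshake (names mirror the diff; the millisecond clock stands in for UnixMillis()):

#include <cstdint>

struct WaitStats {
    int64_t last_fetch_time = 0;       // set on every consumer fetch
    int64_t total_wait_block_time = 0; // accumulated consumer idle time

    void on_append(bool queue_was_empty, int64_t now_ms) { // producer side
        if (queue_was_empty && last_fetch_time != 0) {
            // the consumer has been idle since its last fetch
            total_wait_block_time += now_ms - last_fetch_time;
        }
    }
    void on_fetch(int64_t now_ms) { last_fetch_time = now_ms; } // consumer side
};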
void ScannerContext::_try_to_scale_up() {
// Four criteria to determine whether to increase the parallelism of the scanners
// 1. It ran for at least `SCALE_UP_DURATION` ms after last scale up
// 2. Half(`WAIT_BLOCK_DURATION_RATIO`) of the duration is waiting to get blocks
// 3. `_free_blocks_memory_usage` < `_max_bytes_in_queue`, remains enough memory to scale up
// 4. At most scale up `MAX_SCALE_UP_RATIO` times to `_max_thread_num`
if (MAX_SCALE_UP_RATIO > 0 && _scanners.size_approx() > 0 &&
(_num_running_scanners < _max_thread_num * MAX_SCALE_UP_RATIO) &&
(_last_fetch_time - _last_scale_up_time > SCALE_UP_DURATION) && // duration > 5000ms
(_total_wait_block_time > (_last_fetch_time - _last_scale_up_time) *
WAIT_BLOCK_DURATION_RATIO)) { // too much time spent waiting for blocks
double wait_ratio =
(double)_total_wait_block_time / (_last_fetch_time - _last_scale_up_time);
if (_last_wait_duration_ratio > 0 && wait_ratio > _last_wait_duration_ratio * 0.8) {
// when _last_wait_duration_ratio > 0, it has scaled up before.
// we need to determine if that scale-up was effective:
// the wait duration ratio after the last scale-up should be less than 80% of `_last_wait_duration_ratio`
return;
}
std::lock_guard<std::mutex> fl(_free_blocks_lock);
bool is_scale_up = false;
// calculate the number of scanners that can be scheduled
int num_add = std::min(_num_running_scanners * SCALE_UP_RATIO,
_max_thread_num * MAX_SCALE_UP_RATIO - _num_running_scanners);
num_add = std::max(num_add, 1);
for (int i = 0; i < num_add; ++i) {
vectorized::BlockUPtr allocate_block = nullptr;
// prefer reusing a block from `_free_blocks`
if (!_free_blocks.try_dequeue(allocate_block)) {
if (_free_blocks_memory_usage < _max_bytes_in_queue) {
_newly_create_free_blocks_num->update(1);
allocate_block = vectorized::Block::create_unique(_output_tuple_desc->slots(),
_batch_size, true);
}
} else {
// the block came from `_free_blocks`: decrease the usage now; it is added back below.
_free_blocks_memory_usage -= allocate_block->allocated_bytes();
_free_blocks_memory_usage_mark->set(_free_blocks_memory_usage);
}
if (allocate_block) {
// we have enough memory to launch one more scanner.
std::weak_ptr<ScannerDelegate> scale_up_scanner;
if (_scanners.try_dequeue(scale_up_scanner)) {
std::shared_ptr<ScanTask> scale_up_task =
std::make_shared<ScanTask>(scale_up_scanner, std::move(allocate_block));
_free_blocks_memory_usage += _estimated_block_size;
_free_blocks_memory_usage_mark->set(_free_blocks_memory_usage);
// `first_block` is used to update `_free_blocks_memory_usage`;
// `_estimated_block_size` is already known, so no further updates are needed
scale_up_task->first_block = false;
submit_scan_task(scale_up_task);
_num_running_scanners++;
_scale_up_scanners_counter->update(1);
is_scale_up = true;
} else {
break;
}
} else {
break;
}
}
if (is_scale_up) {
_last_wait_duration_ratio = wait_ratio;
_last_scale_up_time = UnixMillis();
_total_wait_block_time = 0;
}
}
}
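With the constants in this commit (SCALE_UP_DURATION = 5000 ms, WAIT_BLOCK_DURATION_RATIO = 0.5, SCALE_UP_RATIO = 0.5), here is a worked example of the trigger using assumed measurements; the resulting wait ratio of 0.6 is remembered in `_last_wait_duration_ratio`, and the next round only scales again if its own ratio has dropped below 80% of that:

#include <algorithm>
#include <cassert>

int main() {
    // Assumed: 6000 ms since the last scale-up, 3600 ms of it spent waiting;
    // 8 scanners running, _max_thread_num = 12, session ratio 1.5.
    const long duration_ms = 6000, wait_ms = 3600;
    const int running = 8, max_threads = 12;
    const double SCALE_UP_RATIO = 0.5, MAX_SCALE_UP_RATIO = 1.5;

    assert(duration_ms > 5000);          // criterion 1: SCALE_UP_DURATION
    assert(wait_ms > duration_ms * 0.5); // criterion 2: WAIT_BLOCK_DURATION_RATIO
    int num_add = std::min(static_cast<int>(running * SCALE_UP_RATIO),
                           static_cast<int>(max_threads * MAX_SCALE_UP_RATIO) - running);
    num_add = std::max(num_add, 1);
    assert(num_add == 4);                // schedule up to 4 more scanners
    return 0;
}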
Status ScannerContext::validate_block_schema(Block* block) {
size_t index = 0;
for (auto& slot : _output_tuple_desc->slots()) {
@@ -380,29 +405,17 @@ Status ScannerContext::validate_block_schema(Block* block) {
return Status::OK();
}
void ScannerContext::inc_num_running_scanners(int32_t inc) {
std::lock_guard l(_transfer_lock);
_num_running_scanners += inc;
g_num_running_scanners.set_value(_num_running_scanners);
}
void ScannerContext::set_status_on_error(const Status& status, bool need_lock) {
std::unique_lock l(_transfer_lock, std::defer_lock);
if (need_lock) {
l.lock();
}
if (this->status().ok()) {
_process_status = status;
_blocks_queue_added_cv.notify_one();
_should_stop = true;
_set_scanner_done();
LOG(INFO) << "ctx is set status on error " << debug_string()
<< ", call stack is: " << Status::InternalError<true>("catch error status");
}
void ScannerContext::set_status_on_error(const Status& status) {
std::lock_guard<std::mutex> l(_transfer_lock);
_process_status = status;
_blocks_queue_added_cv.notify_one();
}
void ScannerContext::stop_scanners(RuntimeState* state) {
std::unique_lock l(_transfer_lock);
std::lock_guard<std::mutex> l(_transfer_lock);
if (_should_stop) {
return;
}
_should_stop = true;
_set_scanner_done();
for (const std::weak_ptr<ScannerDelegate>& scanner : _all_scanners) {
@@ -453,95 +466,15 @@ void ScannerContext::stop_scanners(RuntimeState* state) {
_blocks_queue_added_cv.notify_one();
}
void ScannerContext::_set_scanner_done() {
if (_dependency) {
_dependency->set_scanner_done();
}
}
std::string ScannerContext::debug_string() {
return fmt::format(
"id: {}, total scanners: {}, scanners: {}, blocks in queue: {},"
" status: {}, _should_stop: {}, _is_finished: {}, free blocks: {},"
"id: {}, total scanners: {}, blocks in queue: {},"
" _should_stop: {}, _is_finished: {}, free blocks: {},"
" limit: {}, _num_running_scanners: {}, _max_thread_num: {},"
" _block_per_scanner: {}, _cur_bytes_in_queue: {}, MAX_BYTE_OF_QUEUE: {}, "
"num_ctx_scheduled: {}, serving_blocks_num: {}, allowed_blocks_num: {}, query_id: {}",
ctx_id, _all_scanners.size(), _scanners.size(), _blocks_queue.size(),
_process_status.to_string(), _should_stop, _is_finished, _free_blocks.size_approx(),
limit, _num_running_scanners, _max_thread_num, _block_per_scanner, _cur_bytes_in_queue,
_max_bytes_in_queue, num_ctx_scheduled(), _serving_blocks_num, allowed_blocks_num(),
print_id(_query_id));
}
void ScannerContext::reschedule_scanner_ctx() {
std::lock_guard l(_transfer_lock);
if (done()) {
return;
}
auto submit_status = _scanner_scheduler->submit(shared_from_this());
//todo(wb) rethink whether it is better to mark the current scan_context as failed when submit fails many times
if (!submit_status.ok()) {
set_status_on_error(submit_status, false);
}
}
void ScannerContext::push_back_scanner_and_reschedule(std::shared_ptr<ScannerDelegate> scanner) {
std::lock_guard l(_transfer_lock);
// Use the transfer lock to avoid the scanner being scheduled concurrently. For example, after
// calling "_scanners.push_front(scanner)", another ctx in the scheduler
// may schedule that scanner right away, and in that run the scanner may be marked as closed
// before we reach the following if() block.
{
--_num_running_scanners;
g_num_running_scanners.set_value(_num_running_scanners);
if (scanner->_scanner->need_to_close()) {
--_num_unfinished_scanners;
if (_num_unfinished_scanners == 0) {
_is_finished = true;
_set_scanner_done();
_blocks_queue_added_cv.notify_one();
return;
}
} else {
_scanners.push_front(scanner);
}
}
if (should_be_scheduled()) {
auto submit_status = _scanner_scheduler->submit(shared_from_this());
if (!submit_status.ok()) {
set_status_on_error(submit_status, false);
}
}
}
// This method is called in the scanner scheduler while the task context is held
void ScannerContext::get_next_batch_of_scanners(
std::list<std::weak_ptr<ScannerDelegate>>* current_run) {
std::lock_guard l(_transfer_lock);
// Update the sched counter for profile
Defer defer {[&]() { _scanner_sched_counter->update(current_run->size()); }};
// 1. Calculate how many scanners should be scheduled in this run.
// If there is enough space in the blocks queue,
// the scanner count depends on the number of _free_blocks.
int thread_slot_num = get_available_thread_slot_num();
// 2. get #thread_slot_num scanners from ctx->scanners
// and put them into "this_run".
for (int i = 0; i < thread_slot_num && !_scanners.empty();) {
std::weak_ptr<ScannerDelegate> scanner_ref = _scanners.front();
std::shared_ptr<ScannerDelegate> scanner = scanner_ref.lock();
_scanners.pop_front();
if (scanner == nullptr) {
continue;
}
if (scanner->_scanner->need_to_close()) {
static_cast<void>(scanner->_scanner->close(_state));
} else {
current_run->push_back(scanner_ref);
i++;
}
}
" _max_bytes_in_queue: {}, query_id: {}",
ctx_id, _all_scanners.size(), _blocks_queue.size(), _should_stop, _is_finished,
_free_blocks.size_approx(), limit, _num_scheduled_scanners, _max_thread_num,
_max_bytes_in_queue, print_id(_query_id));
}
} // namespace doris::vectorized

View File

@@ -58,6 +58,47 @@ class VScanNode;
class ScannerScheduler;
class SimplifiedScanScheduler;
class ScanTask {
public:
ScanTask(std::weak_ptr<ScannerDelegate> delegate_scanner, vectorized::BlockUPtr free_block)
: scanner(delegate_scanner), current_block(std::move(free_block)) {}
private:
// whether current scanner is finished
bool eos = false;
Status status = Status::OK();
public:
std::weak_ptr<ScannerDelegate> scanner;
// cache the block of current loop
vectorized::BlockUPtr current_block;
// only take the size of the first block as estimated size
bool first_block = true;
uint64_t last_submit_time; // nanoseconds
void set_status(Status _status) {
if (_status.is<ErrorCode::END_OF_FILE>()) {
// set `eos` on `END_OF_FILE`; don't treat `END_OF_FILE` as an error
eos = true;
}
status = _status;
}
Status get_status() const { return status; }
bool status_ok() { return status.ok() || status.is<ErrorCode::END_OF_FILE>(); }
bool is_eos() const { return eos; }
void set_eos(bool _eos) { eos = _eos; }
// reuse the current running scan task for the next scanner:
// reset `eos` and `status`.
// `first_block` updates `_free_blocks_memory_usage` and records the first block size
// as `_estimated_block_size`; that has already happened, so don't reset it.
void reuse_scanner(std::weak_ptr<ScannerDelegate> next_scanner) {
scanner = next_scanner;
eos = false;
status = Status::OK();
}
};
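The class above is the unit the scheduler now passes around; a condensed walkthrough of one task's life against this API (illustrative, not literal code from this commit):

// 1. seeded in ScannerContext::init() with a scanner and a free block:
//      auto task = std::make_shared<ScanTask>(scanner, ctx->get_free_block());
//      ctx->submit_scan_task(task);
// 2. a scan thread fills task->current_block and calls task->set_status(...)
//    (END_OF_FILE flips `eos` instead of counting as an error), then hands the
//    task back via ctx->append_block_to_queue(task);
// 3. the consumer drains the block in get_block_from_queue():
//      !task->is_eos() -> ctx->submit_scan_task(task);   // same scanner again
//       task->is_eos() -> task->reuse_scanner(next);     // same task object,
//                         ctx->submit_scan_task(task);   // next scanner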
// ScannerContext is responsible for recording the execution status
// of a group of Scanners corresponding to a ScanNode.
// Including how many scanners are being scheduled, and maintaining
@@ -81,88 +122,49 @@ public:
virtual Status init();
vectorized::BlockUPtr get_free_block();
void return_free_block(std::unique_ptr<vectorized::Block> block);
void return_free_block(vectorized::BlockUPtr block);
// Append blocks from scanners to the blocks queue.
virtual void append_blocks_to_queue(std::vector<vectorized::BlockUPtr>& blocks);
// Get next block from blocks queue. Called by ScanNode
// Get next block from blocks queue. Called by ScanNode/ScanOperator
// Set eos to true if there is no more data to read.
// And if eos is true, no data is appended to the block.
virtual Status get_block_from_queue(RuntimeState* state, vectorized::BlockUPtr* block,
bool* eos, int id);
virtual Status get_block_from_queue(RuntimeState* state, vectorized::Block* block, bool* eos,
int id, bool wait = true);
[[nodiscard]] Status validate_block_schema(Block* block);
// When a scanner completes a scan, this method will be called
// to return the scanner to the list for the next scheduling.
void push_back_scanner_and_reschedule(std::shared_ptr<ScannerDelegate> scanner);
// submit the running scanner to the thread pool in `ScannerScheduler`;
// the scan thread sets the next scanned block into `ScanTask::current_block`,
// sets the error state into `ScanTask::status`,
// and sets `ScanTask::eos` if there is no more data in the current scanner
void submit_scan_task(std::shared_ptr<ScanTask> scan_task);
void set_status_on_error(const Status& status, bool need_lock = true);
// append the running scanner and its cached block to `_blocks_queue`
virtual void append_block_to_queue(std::shared_ptr<ScanTask> scan_task);
Status status() {
if (_process_status.is<ErrorCode::END_OF_FILE>()) {
return Status::OK();
}
return _process_status;
}
void set_status_on_error(const Status& status);
// Return true if this ScannerContext needs no further processing
bool done() const { return _is_finished || _should_stop; }
bool is_finished() { return _is_finished.load(); }
bool should_stop() { return _should_stop.load(); }
void inc_num_running_scanners(int32_t scanner_inc);
int get_num_running_scanners() const { return _num_running_scanners; }
int get_num_unfinished_scanners() const { return _num_unfinished_scanners; }
void get_next_batch_of_scanners(std::list<std::weak_ptr<ScannerDelegate>>* current_run);
virtual std::string debug_string();
RuntimeState* state() { return _state; }
void incr_num_ctx_scheduling(int64_t num) { _scanner_ctx_sched_counter->update(num); }
int64_t num_ctx_scheduled() { return _scanner_ctx_sched_counter->value(); }
void incr_ctx_scheduling_time(int64_t num) { _scanner_ctx_sched_time->update(num); }
std::string parent_name();
virtual bool empty_in_queue(int id);
// todo(wb) rethinking how to calculate ```_max_bytes_in_queue``` when executing shared scan
inline bool should_be_scheduled() const {
return (_cur_bytes_in_queue < _max_bytes_in_queue / 2) &&
(_serving_blocks_num < allowed_blocks_num());
}
int get_available_thread_slot_num() {
int thread_slot_num = 0;
thread_slot_num = (allowed_blocks_num() + _block_per_scanner - 1) / _block_per_scanner;
thread_slot_num = std::min(thread_slot_num, _max_thread_num - _num_running_scanners);
if (thread_slot_num <= 0) {
thread_slot_num = 1;
}
return thread_slot_num;
}
int32_t allowed_blocks_num() const {
int32_t blocks_num = std::min(_free_blocks_capacity,
int32_t((_max_bytes_in_queue + _estimated_block_bytes - 1) /
_estimated_block_bytes));
return blocks_num;
}
SimplifiedScanScheduler* get_simple_scan_scheduler() { return _simple_scan_scheduler; }
virtual void reschedule_scanner_ctx();
void stop_scanners(RuntimeState* state);
int32_t get_max_thread_num() const { return _max_thread_num; }
void set_max_thread_num(int32_t num) { _max_thread_num = num; }
int batch_size() const { return _batch_size; }
// the unique id of this context
std::string ctx_id;
TUniqueId _query_id;
@@ -176,10 +178,16 @@ protected:
const RowDescriptor* output_row_descriptor,
const std::list<std::shared_ptr<ScannerDelegate>>& scanners_, int64_t limit_,
int64_t max_bytes_in_blocks_queue_, const int num_parallel_instances,
pipeline::ScanLocalStateBase* local_state,
std::shared_ptr<pipeline::ScanDependency> dependency);
pipeline::ScanLocalStateBase* local_state);
void _set_scanner_done();
/// Four criteria to determine whether to increase the parallelism of the scanners
/// 1. It ran for at least `SCALE_UP_DURATION` ms after last scale up
/// 2. Half(`WAIT_BLOCK_DURATION_RATIO`) of the duration is waiting to get blocks
/// 3. `_free_blocks_memory_usage` < `_max_bytes_in_queue`, remains enough memory to scale up
/// 4. At most scale up `MAX_SCALE_UP_RATIO` times to `_max_thread_num`
virtual void _set_scanner_done() {};
void _try_to_scale_up();
RuntimeState* _state = nullptr;
VScanNode* _parent = nullptr;
@@ -189,97 +197,52 @@ protected:
const TupleDescriptor* _output_tuple_desc = nullptr;
const RowDescriptor* _output_row_descriptor = nullptr;
// _transfer_lock is used to protect the critical section
// where the ScanNode and ScannerScheduler interact.
// Including access to variables such as blocks_queue, _process_status, _is_finished, etc.
std::mutex _transfer_lock;
// Blocks obtained from the scanners are added to the "blocks_queue".
// The upper scan node acts as a consumer, fetching blocks from this queue.
// Should be protected by "_transfer_lock"
std::list<vectorized::BlockUPtr> _blocks_queue;
// Wait in get_block_from_queue(), by ScanNode.
std::condition_variable _blocks_queue_added_cv;
// Wait in clear_and_join(), by ScanNode.
std::condition_variable _ctx_finish_cv;
std::list<std::shared_ptr<ScanTask>> _blocks_queue;
// The following 3 variables control the process of the scanner scheduling.
// Use _transfer_lock to protect them.
// 1. _process_status
// indicates the global status of this scanner context.
// Set to non-ok if an error is encountered.
// And if it is non-ok, the scanner process should stop.
// Can be set by either ScanNode or ScannerScheduler.
// 2. _should_stop
// Always set by ScanNode.
// True means no more data needs to be read (limit reached or closed)
// 3. _is_finished
// Always set by ScannerScheduler.
// True means all scanners have finished scanning.
Status _process_status;
Status _process_status = Status::OK();
std::atomic_bool _should_stop = false;
std::atomic_bool _is_finished = false;
// Lazy-allocated blocks for all scanners to share, for memory reuse.
moodycamel::ConcurrentQueue<vectorized::BlockUPtr> _free_blocks;
std::atomic<int32_t> _serving_blocks_num = 0;
// The current number of free blocks available to the scanners.
// Used to limit the memory usage of the scanner.
// NOTE: this is NOT the size of `_free_blocks`.
int32_t _free_blocks_capacity = 0;
int64_t _estimated_block_bytes = 0;
int _batch_size;
// The limit from SQL's limit clause
int64_t limit;
// Current number of running scanners.
std::atomic_int32_t _num_running_scanners = 0;
// Current number of ctx being scheduled.
// After each Scanner finishes a task, it will put the corresponding ctx
// back into the scheduling queue.
// Therefore, there may be multiple pointers to the same ctx in the scheduling queue.
// Here we record the number of ctx in the scheduling queue to clean up at the end.
std::atomic_int32_t _num_scheduling_ctx = 0;
// Num of unfinished scanners. Should be set in init()
std::atomic_int32_t _num_unfinished_scanners = 0;
// Max number of scan thread for this scanner context.
int32_t _max_thread_num = 0;
// How many blocks a scanner can use in one task.
int32_t _block_per_scanner = 0;
// The current bytes of blocks in blocks queue
int64_t _cur_bytes_in_queue = 0;
// The max limit bytes of blocks in blocks queue
const int64_t _max_bytes_in_queue;
int64_t _max_bytes_in_queue;
doris::vectorized::ScannerScheduler* _scanner_scheduler;
SimplifiedScanScheduler* _simple_scan_scheduler = nullptr;
// List "scanners" saves all "unfinished" scanners.
// The scanner scheduler will pop scanners from this list, run them,
// and push scanners that are not yet finished back onto this list.
// No lock is needed, because only one scheduler thread will access it.
std::mutex _scanners_lock;
// A scanner's ownership belongs to the VScanNode or ScanOperator; the scanner context does not own it.
// ScannerContext has to check whether a scanner has been destructed before using it.
std::list<std::weak_ptr<ScannerDelegate>> _scanners;
moodycamel::ConcurrentQueue<std::weak_ptr<ScannerDelegate>> _scanners;
int32_t _num_scheduled_scanners = 0;
int32_t _num_finished_scanners = 0;
int32_t _num_running_scanners = 0;
// weak pointers mirroring _scanners, used in the stop function
std::vector<std::weak_ptr<ScannerDelegate>> _all_scanners;
std::vector<int64_t> _finished_scanner_runtime;
std::vector<int64_t> _finished_scanner_rows_read;
std::vector<int64_t> _finished_scanner_wait_worker_time;
const int _num_parallel_instances;
std::shared_ptr<RuntimeProfile> _scanner_profile;
RuntimeProfile::Counter* _scanner_sched_counter = nullptr;
RuntimeProfile::Counter* _scanner_ctx_sched_counter = nullptr;
RuntimeProfile::Counter* _scanner_ctx_sched_time = nullptr;
RuntimeProfile::HighWaterMarkCounter* _free_blocks_memory_usage = nullptr;
RuntimeProfile::HighWaterMarkCounter* _queued_blocks_memory_usage = nullptr;
RuntimeProfile::Counter* _newly_create_free_blocks_num = nullptr;
RuntimeProfile::Counter* _scanner_wait_batch_timer = nullptr;
RuntimeProfile::HighWaterMarkCounter* _free_blocks_memory_usage_mark = nullptr;
RuntimeProfile::Counter* _scanner_ctx_sched_time = nullptr;
RuntimeProfile::Counter* _scale_up_scanners_counter = nullptr;
std::shared_ptr<pipeline::ScanDependency> _dependency = nullptr;
// for scaling up the running scanners
std::mutex _free_blocks_lock;
size_t _estimated_block_size = 0;
int64_t _free_blocks_memory_usage = 0;
int64_t _last_scale_up_time = 0;
int64_t _last_fetch_time = 0;
int64_t _total_wait_block_time = 0;
double _last_wait_duration_ratio = 0;
const int64_t SCALE_UP_DURATION = 5000; // 5000ms
const float WAIT_BLOCK_DURATION_RATIO = 0.5;
const float SCALE_UP_RATIO = 0.5;
float MAX_SCALE_UP_RATIO;
};
} // namespace vectorized
} // namespace doris

View File

@@ -69,10 +69,6 @@ ScannerScheduler::~ScannerScheduler() {
return;
}
for (int i = 0; i < QUEUE_NUM; i++) {
delete _pending_queues[i];
}
delete[] _pending_queues;
_deregister_metrics();
}
@@ -81,18 +77,12 @@ void ScannerScheduler::stop() {
return;
}
for (int i = 0; i < QUEUE_NUM; i++) {
_pending_queues[i]->shutdown();
}
_is_closed = true;
_scheduler_pool->shutdown();
_local_scan_thread_pool->shutdown();
_remote_scan_thread_pool->shutdown();
_limited_scan_thread_pool->shutdown();
_scheduler_pool->wait();
_local_scan_thread_pool->join();
_remote_scan_thread_pool->join();
_limited_scan_thread_pool->wait();
@@ -101,24 +91,12 @@
}
Status ScannerScheduler::init(ExecEnv* env) {
// 1. scheduling thread pool and scheduling queues
static_cast<void>(ThreadPoolBuilder("SchedulingThreadPool")
.set_min_threads(QUEUE_NUM)
.set_max_threads(QUEUE_NUM)
.build(&_scheduler_pool));
_pending_queues = new BlockingQueue<std::shared_ptr<ScannerContext>>*[QUEUE_NUM];
for (int i = 0; i < QUEUE_NUM; i++) {
_pending_queues[i] = new BlockingQueue<std::shared_ptr<ScannerContext>>(INT32_MAX);
static_cast<void>(_scheduler_pool->submit_func([this, i] { this->_schedule_thread(i); }));
}
// 2. local scan thread pool
// 1. local scan thread pool
_local_scan_thread_pool = std::make_unique<PriorityThreadPool>(
config::doris_scanner_thread_pool_thread_num,
config::doris_scanner_thread_pool_queue_size, "local_scan");
// 3. remote scan thread pool
// 2. remote scan thread pool
_remote_thread_pool_max_size = config::doris_max_remote_scanner_thread_pool_thread_num != -1
? config::doris_max_remote_scanner_thread_pool_thread_num
: std::max(512, CpuInfo::num_cores() * 10);
@@ -128,7 +106,7 @@ Status ScannerScheduler::init(ExecEnv* env) {
_remote_thread_pool_max_size, config::doris_remote_scanner_thread_pool_queue_size,
"RemoteScanThreadPool");
// 4. limited scan thread pool
// 3. limited scan thread pool
static_cast<void>(ThreadPoolBuilder("LimitedScanThreadPool")
.set_min_threads(config::doris_scanner_thread_pool_thread_num)
.set_max_threads(config::doris_scanner_thread_pool_thread_num)
@@ -139,15 +117,75 @@ Status ScannerScheduler::init(ExecEnv* env) {
return Status::OK();
}
Status ScannerScheduler::submit(std::shared_ptr<ScannerContext> ctx) {
void ScannerScheduler::submit(std::shared_ptr<ScannerContext> ctx,
std::shared_ptr<ScanTask> scan_task) {
scan_task->last_submit_time = GetCurrentTimeNanos();
if (ctx->done()) {
return Status::EndOfFile("ScannerContext is done");
return;
}
ctx->queue_idx = (_queue_idx++ % QUEUE_NUM);
if (!_pending_queues[ctx->queue_idx]->blocking_put(ctx)) {
return Status::InternalError("failed to submit scanner context to scheduler");
auto task_lock = ctx->task_exec_ctx();
if (task_lock == nullptr) {
LOG(INFO) << "could not lock task execution context, query " << ctx->debug_string()
<< " maybe finished";
return;
}
// Submit scanners to thread pool
// TODO(cmy): How to handle this "nice"?
int nice = 1;
if (ctx->thread_token != nullptr) {
std::shared_ptr<ScannerDelegate> scanner_delegate = scan_task->scanner.lock();
if (scanner_delegate == nullptr) {
return;
}
scanner_delegate->_scanner->start_wait_worker_timer();
auto s = ctx->thread_token->submit_func(
[this, scanner_ref = scan_task, ctx]() { this->_scanner_scan(ctx, scanner_ref); });
if (!s.ok()) {
scan_task->set_status(s);
ctx->append_block_to_queue(scan_task);
return;
}
} else {
std::shared_ptr<ScannerDelegate> scanner_delegate = scan_task->scanner.lock();
if (scanner_delegate == nullptr) {
return;
}
scanner_delegate->_scanner->start_wait_worker_timer();
TabletStorageType type = scanner_delegate->_scanner->get_storage_type();
bool ret = false;
if (type == TabletStorageType::STORAGE_TYPE_LOCAL) {
if (auto* scan_sche = ctx->get_simple_scan_scheduler()) {
auto work_func = [this, scanner_ref = scan_task, ctx]() {
this->_scanner_scan(ctx, scanner_ref);
};
SimplifiedScanTask simple_scan_task = {work_func, ctx};
ret = scan_sche->get_scan_queue()->try_put(simple_scan_task);
} else {
PriorityThreadPool::Task task;
task.work_function = [this, scanner_ref = scan_task, ctx]() {
this->_scanner_scan(ctx, scanner_ref);
};
task.priority = nice;
ret = _local_scan_thread_pool->offer(task);
}
} else {
PriorityThreadPool::Task task;
task.work_function = [this, scanner_ref = scan_task, ctx]() {
this->_scanner_scan(ctx, scanner_ref);
};
task.priority = nice;
ret = _remote_scan_thread_pool->offer(task);
}
if (!ret) {
scan_task->set_status(
Status::InternalError("Failed to submit scanner to scanner pool"));
ctx->append_block_to_queue(scan_task);
return;
}
}
return Status::OK();
}
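submit() now routes one ScanTask at a time instead of draining a batch of scanners, and it picks among three destinations. A standalone sketch of the routing order (pools reduced to callables; the error path, which sets the task's status and pushes it through append_block_to_queue so the consumer observes the failure, is elided):

#include <functional>

enum class StorageType { LOCAL, REMOTE };

void dispatch(StorageType type, bool has_thread_token, bool has_simple_sched,
              const std::function<void(const char*)>& run_on) {
    if (has_thread_token) {
        run_on("limited scan pool via ThreadPoolToken::submit_func");
    } else if (type == StorageType::LOCAL) {
        run_on(has_simple_sched ? "workload-group SimplifiedScanScheduler queue"
                                : "_local_scan_thread_pool");
    } else {
        run_on("_remote_scan_thread_pool");
    }
}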
std::unique_ptr<ThreadPoolToken> ScannerScheduler::new_limited_scan_pool_token(
@@ -155,135 +193,21 @@ std::unique_ptr<ThreadPoolToken> ScannerScheduler::new_limited_scan_pool_token(
return _limited_scan_thread_pool->new_token(mode, max_concurrency);
}
void ScannerScheduler::_schedule_thread(int queue_id) {
BlockingQueue<std::shared_ptr<ScannerContext>>* queue = _pending_queues[queue_id];
while (!_is_closed) {
std::shared_ptr<ScannerContext> ctx;
bool ok = queue->blocking_get(&ctx);
if (!ok) {
// maybe closed
continue;
}
_schedule_scanners(ctx);
// If ctx is done, no need to schedule it again.
// But note that there may still be scanners running in the scanner pool.
}
}
void ScannerScheduler::_schedule_scanners(std::shared_ptr<ScannerContext> ctx) {
void ScannerScheduler::_scanner_scan(std::shared_ptr<ScannerContext> ctx,
std::shared_ptr<ScanTask> scan_task) {
// record the time from scanner submission to actual execution in nanoseconds
ctx->incr_ctx_scheduling_time(GetCurrentTimeNanos() - scan_task->last_submit_time);
auto task_lock = ctx->task_exec_ctx();
if (task_lock == nullptr) {
LOG(INFO) << "could not lock task execution context, query " << ctx->debug_string()
<< " may have finished";
return;
}
MonotonicStopWatch watch;
watch.reset();
watch.start();
ctx->incr_num_ctx_scheduling(1);
if (ctx->done()) {
return;
}
std::list<std::weak_ptr<ScannerDelegate>> this_run;
ctx->get_next_batch_of_scanners(&this_run);
if (this_run.empty()) {
// The running scanners will reschedule the ctx after they finish.
// So here we just return to stop scheduling the ctx.
return;
}
ctx->inc_num_running_scanners(this_run.size());
// Submit scanners to thread pool
// TODO(cmy): How to handle this "nice"?
int nice = 1;
auto iter = this_run.begin();
if (ctx->thread_token != nullptr) {
// TODO llj tg how to treat this?
while (iter != this_run.end()) {
std::shared_ptr<ScannerDelegate> scanner_delegate = (*iter).lock();
if (scanner_delegate == nullptr) {
// Must advance the iterator, or this loops forever
iter++;
continue;
}
scanner_delegate->_scanner->start_wait_worker_timer();
auto s = ctx->thread_token->submit_func([this, scanner_ref = *iter, ctx]() {
this->_scanner_scan(this, ctx, scanner_ref);
});
if (s.ok()) {
iter++;
} else {
ctx->set_status_on_error(s);
break;
}
}
} else {
while (iter != this_run.end()) {
std::shared_ptr<ScannerDelegate> scanner_delegate = (*iter).lock();
if (scanner_delegate == nullptr) {
// Must advance the iterator, or this loops forever
iter++;
continue;
}
scanner_delegate->_scanner->start_wait_worker_timer();
TabletStorageType type = scanner_delegate->_scanner->get_storage_type();
bool ret = false;
if (type == TabletStorageType::STORAGE_TYPE_LOCAL) {
if (auto* scan_sche = ctx->get_simple_scan_scheduler()) {
auto work_func = [this, scanner_ref = *iter, ctx]() {
this->_scanner_scan(this, ctx, scanner_ref);
};
SimplifiedScanTask simple_scan_task = {work_func, ctx};
ret = scan_sche->get_scan_queue()->try_put(simple_scan_task);
} else {
PriorityThreadPool::Task task;
task.work_function = [this, scanner_ref = *iter, ctx]() {
this->_scanner_scan(this, ctx, scanner_ref);
};
task.priority = nice;
ret = _local_scan_thread_pool->offer(task);
}
} else {
PriorityThreadPool::Task task;
task.work_function = [this, scanner_ref = *iter, ctx]() {
this->_scanner_scan(this, ctx, scanner_ref);
};
task.priority = nice;
ret = _remote_scan_thread_pool->offer(task);
}
if (ret) {
iter++;
} else {
ctx->set_status_on_error(
Status::InternalError("failed to submit scanner to scanner pool"));
break;
}
}
}
ctx->incr_ctx_scheduling_time(watch.elapsed_time());
}
void ScannerScheduler::_scanner_scan(ScannerScheduler* scheduler,
std::shared_ptr<ScannerContext> ctx,
std::weak_ptr<ScannerDelegate> scanner_ref) {
auto task_lock = ctx->task_exec_ctx();
if (task_lock == nullptr) {
// LOG(WARNING) << "could not lock task execution context, query " << print_id(_query_id)
// << " maybe finished";
return;
}
//LOG_EVERY_N(INFO, 100) << "start running scanner from ctx " << ctx->debug_string();
// Will release the scanner if this is the last reference; the task lock is held here to ensure
// that the scanner can call the scan node's methods during destruction.
std::shared_ptr<ScannerDelegate> scanner_delegate = scanner_ref.lock();
auto& scanner = scanner_delegate->_scanner;
std::shared_ptr<ScannerDelegate> scanner_delegate = scan_task->scanner.lock();
if (scanner_delegate == nullptr) {
return;
}
VScannerSPtr& scanner = scanner_delegate->_scanner;
SCOPED_ATTACH_TASK(scanner->runtime_state());
// for cpu hard limit, thread name should not be reset
if (ctx->_should_reset_thread_name) {
@ -306,14 +230,13 @@ void ScannerScheduler::_scanner_scan(ScannerScheduler* scheduler,
if (!scanner->is_init()) {
status = scanner->init();
if (!status.ok()) {
ctx->set_status_on_error(status);
eos = true;
}
}
if (!eos && !scanner->is_open()) {
status = scanner->open(state);
if (!status.ok()) {
ctx->set_status_on_error(status);
eos = true;
}
scanner->set_opened();
@ -321,46 +244,21 @@ void ScannerScheduler::_scanner_scan(ScannerScheduler* scheduler,
static_cast<void>(scanner->try_append_late_arrival_runtime_filter());
// Because we use a thread pool to scan data from storage, one scanner can't
// hold a thread for too long, or it will starve other queries' scanners. So we
// need to yield the thread once we have done enough work. However, OlapStorage reads
// data in pre-aggregate mode, so we can't use the returned data to judge
// whether we need to yield. Instead, we record all raw data read in this scan
// round; if it exceeds the row-count or byte threshold, we yield the thread.
std::vector<vectorized::BlockUPtr> blocks;
int64_t raw_bytes_read = 0;
int64_t raw_bytes_threshold = config::doris_scanner_row_bytes;
int num_rows_in_block = 0;
// Only set to true when ctx->done() returns true.
// Use this flag because we need to distinguish eos from `should_stop`.
// If eos is true, we still need to return blocks,
// but if should_stop is true, there is no need to return blocks.
bool should_stop = false;
// Has to wait for at least one full block, or it will flood the priority queue
// with scheduled tasks and hurt query latency and concurrency (e.g. SSB 3.3).
auto should_do_scan = [&, batch_size = state->batch_size(),
time = state->wait_full_block_schedule_times()]() {
if (raw_bytes_read < raw_bytes_threshold) {
return true;
} else if (num_rows_in_block < batch_size) {
return raw_bytes_read < raw_bytes_threshold * time;
}
return false;
};
while (!eos && should_do_scan()) {
// TODO llj: should the task group should_yield?
bool first_read = true;
while (!eos) {
if (UNLIKELY(ctx->done())) {
// No need to set status on error here,
// because done() may be caused by "should_stop".
should_stop = true;
eos = true;
break;
}
BlockUPtr free_block = nullptr;
if (first_read) {
status = scanner->get_block_after_projects(state, scan_task->current_block.get(), &eos);
first_read = false;
} else {
free_block = ctx->get_free_block();
status = scanner->get_block_after_projects(state, free_block.get(), &eos);
}
BlockUPtr block = ctx->get_free_block();
status = scanner->get_block_after_projects(state, block.get(), &eos);
// The VFileScanner for an external table may try to open files that do not exist,
// because the FE file cache for external tables may be out of date.
// So NOT_FOUND is not a failure case for VFileScanner.
@ -370,49 +268,36 @@ void ScannerScheduler::_scanner_scan(ScannerScheduler* scheduler,
!status.is<ErrorCode::NOT_FOUND>()))) {
LOG(WARNING) << "Scan thread read VScanner failed: " << status.to_string();
break;
}
VLOG_ROW << "VScanNode input rows: " << block->rows() << ", eos: " << eos;
if (status.is<ErrorCode::NOT_FOUND>()) {
} else if (status.is<ErrorCode::NOT_FOUND>()) {
// The only case in this "if" branch is that an external table file was deleted and the FE cache has not been updated yet.
// Set status to OK.
status = Status::OK();
eos = true;
break;
}
raw_bytes_read += block->allocated_bytes();
num_rows_in_block += block->rows();
if (UNLIKELY(block->rows() == 0)) {
ctx->return_free_block(std::move(block));
} else {
if (!blocks.empty() && blocks.back()->rows() + block->rows() <= state->batch_size()) {
vectorized::MutableBlock mutable_block(blocks.back().get());
static_cast<void>(mutable_block.merge(*block));
blocks.back().get()->set_columns(std::move(mutable_block.mutable_columns()));
ctx->return_free_block(std::move(block));
} else {
blocks.push_back(std::move(block));
}
if (!first_read && free_block) {
vectorized::MutableBlock mutable_block(scan_task->current_block.get());
static_cast<void>(mutable_block.merge(*free_block));
scan_task->current_block->set_columns(std::move(mutable_block.mutable_columns()));
ctx->return_free_block(std::move(free_block));
}
if (scan_task->current_block->rows() >= ctx->batch_size()) {
break;
}
} // end while
// if we failed, check status.
if (UNLIKELY(!status.ok())) {
// _transfer_done = true;
ctx->set_status_on_error(status);
scan_task->set_status(status);
eos = true;
blocks.clear();
} else if (should_stop) {
// No need to return blocks because of should_stop, just delete them
blocks.clear();
} else if (!blocks.empty()) {
ctx->append_blocks_to_queue(blocks);
}
scanner->update_scan_cpu_timer();
if (eos || should_stop) {
if (eos) {
scanner->mark_to_need_to_close();
}
ctx->push_back_scanner_and_reschedule(scanner_delegate);
scan_task->set_eos(eos);
ctx->append_block_to_queue(scan_task);
}
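// Distilled from the loop above (hypothetical helper, simplified): merge a
// freshly read block into the task's current block, then recycle the source
// block's memory, so the consumer receives fewer, fuller blocks.
void merge_into_current(vectorized::Block* dst, vectorized::BlockUPtr src, ScannerContext* ctx) {
vectorized::MutableBlock mutable_dst(dst);
static_cast<void>(mutable_dst.merge(*src)); // append src rows to dst columns
dst->set_columns(std::move(mutable_dst.mutable_columns()));
ctx->return_free_block(std::move(src)); // hand src back to the free-block pool
}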
void ScannerScheduler::_register_metrics() {

View File

@ -37,25 +37,18 @@ class BlockingQueue;
namespace doris::vectorized {
class ScannerDelegate;
class ScanTask;
class ScannerContext;
// Responsible for the scheduling and execution of all Scanners of a BE node.
// ScannerScheduler has two types of thread pools:
// 1. Scheduling thread pool
// Responsible for Scanner scheduling.
// A set of Scanners for a query will be encapsulated into a ScannerContext
// and submitted to the ScannerScheduler's scheduling queue.
// There are multiple scheduling queues in ScannerScheduler, and each scheduling queue
// is handled by a scheduling thread.
// The scheduling thread is scheduled in granularity of ScannerContext,
// that is, a group of Scanners in a ScannerContext are scheduled at a time.
//
//2. Execution thread pool
// The scheduling thread will submit the Scanners selected from the ScannerContext
// Execution thread pool
// When a ScannerContext is launched, it will submit the running scanners to this scheduler.
// The scheduling thread will submit the running scanner and its ScannerContext
// to the execution thread pool to do the actual scan task.
// Each Scanner will act as a producer, read a group of blocks and put them into
// Each Scanner will act as a producer, reading the next block and putting it into
// the corresponding block queue.
// The corresponding ScanNode will act as a consumer to consume blocks from the block queue.
// After the block is consumed, the unfinished scanner will be resubmitted to this scheduler.
class ScannerScheduler {
public:
ScannerScheduler();
@ -63,7 +56,7 @@ public:
[[nodiscard]] Status init(ExecEnv* env);
[[nodiscard]] Status submit(std::shared_ptr<ScannerContext> ctx);
void submit(std::shared_ptr<ScannerContext> ctx, std::shared_ptr<ScanTask> scan_task);
void stop();
@ -73,32 +66,13 @@ public:
int remote_thread_pool_max_size() const { return _remote_thread_pool_max_size; }
private:
// scheduling thread function
void _schedule_thread(int queue_id);
// schedule scanners in a certain ScannerContext
void _schedule_scanners(std::shared_ptr<ScannerContext> ctx);
// execution thread function
void _scanner_scan(ScannerScheduler* scheduler, std::shared_ptr<ScannerContext> ctx,
std::weak_ptr<ScannerDelegate> scanner);
static void _scanner_scan(std::shared_ptr<ScannerContext> ctx,
std::shared_ptr<ScanTask> scan_task);
void _register_metrics();
static void _deregister_metrics();
// Scheduling queue number.
// TODO: make it configurable.
static const int QUEUE_NUM = 4;
// The ScannerContext will be submitted to the pending queues round-robin.
// _queue_idx points to the current queue.
// Use std::atomic_uint so that numerical overflow cannot cause out-of-bounds access.
// The scheduler thread will take a ctx from the pending queue, schedule it,
// and put it into the _scheduling_map.
// If any scanner finishes, it will take the ctx and put it back into the pending queue.
std::atomic_uint _queue_idx = {0};
BlockingQueue<std::shared_ptr<ScannerContext>>** _pending_queues = nullptr;
// scheduling thread pool
std::unique_ptr<ThreadPool> _scheduler_pool;
// execution thread pool
// _local_scan_thread_pool is for local scan tasks (typically, olap scanners)
// _remote_scan_thread_pool is for remote scan tasks (cold data on S3, HDFS, etc.)
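// The producer/consumer flow described in the header comment above, as a
// self-contained sketch (generic std types only; none of Doris's actual
// queue classes):
#include <condition_variable>
#include <memory>
#include <mutex>
#include <queue>
struct Block {}; // stand-in for vectorized::Block
class BlockQueue {
public:
// Producer side: a scan task appends the block it just filled.
void push(std::unique_ptr<Block> block) {
std::lock_guard<std::mutex> lock(_mutex);
_queue.push(std::move(block));
_cv.notify_one();
}
// Consumer side: the scan operator waits until a block is ready; in the
// real scheduler, consuming a block from an unfinished scanner triggers
// its resubmission.
std::unique_ptr<Block> pop() {
std::unique_lock<std::mutex> lock(_mutex);
_cv.wait(lock, [this] { return !_queue.empty(); });
std::unique_ptr<Block> block = std::move(_queue.front());
_queue.pop();
return block;
}
private:
std::mutex _mutex;
std::condition_variable _cv;
std::queue<std::unique_ptr<Block>> _queue;
};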

View File

@ -197,7 +197,6 @@ Status VScanNode::alloc_resource(RuntimeState* state) {
if (_scanner_ctx) {
DCHECK(!_eos && _num_scanners->value() > 0);
RETURN_IF_ERROR(_scanner_ctx->init());
RETURN_IF_ERROR(_state->exec_env()->scanner_scheduler()->submit(_scanner_ctx));
}
if (_shared_scan_opt) {
//LOG(INFO) << "instance shared scan enabled"
@ -219,7 +218,6 @@ Status VScanNode::alloc_resource(RuntimeState* state) {
: Status::OK());
if (_scanner_ctx) {
RETURN_IF_ERROR(_scanner_ctx->init());
RETURN_IF_ERROR(_state->exec_env()->scanner_scheduler()->submit(_scanner_ctx));
}
}
@ -246,14 +244,10 @@ Status VScanNode::get_next(RuntimeState* state, vectorized::Block* block, bool*
}};
if (state->is_cancelled()) {
// ISSUE: https://github.com/apache/doris/issues/16360
// _scanner_ctx may be null here, see: `VScanNode::alloc_resource` (_eos == null)
if (_scanner_ctx) {
_scanner_ctx->set_status_on_error(Status::Cancelled("query cancelled"));
return _scanner_ctx->status();
} else {
return Status::Cancelled("query cancelled");
_scanner_ctx->stop_scanners(state);
}
return Status::Cancelled("Query cancelled in ScanNode");
}
if (_eos) {
@ -261,16 +255,7 @@ Status VScanNode::get_next(RuntimeState* state, vectorized::Block* block, bool*
return Status::OK();
}
vectorized::BlockUPtr scan_block = nullptr;
RETURN_IF_ERROR(_scanner_ctx->get_block_from_queue(state, &scan_block, eos, _context_queue_id));
if (*eos) {
DCHECK(scan_block == nullptr);
return Status::OK();
}
// get scanner's block memory
block->swap(*scan_block);
_scanner_ctx->return_free_block(std::move(scan_block));
RETURN_IF_ERROR(_scanner_ctx->get_block_from_queue(state, block, eos, _context_queue_id));
reached_limit(block, eos);
if (*eos) {
@ -294,16 +279,14 @@ Status VScanNode::_init_profile() {
runtime_profile()->add_child(_scanner_profile.get(), true, nullptr);
_memory_usage_counter = ADD_LABEL_COUNTER(_scanner_profile, "MemoryUsage");
_queued_blocks_memory_usage =
_scanner_profile->AddHighWaterMarkCounter("QueuedBlocks", TUnit::BYTES, "MemoryUsage");
_free_blocks_memory_usage =
_scanner_profile->AddHighWaterMarkCounter("FreeBlocks", TUnit::BYTES, "MemoryUsage");
_newly_create_free_blocks_num =
ADD_COUNTER(_scanner_profile, "NewlyCreateFreeBlocksNum", TUnit::UNIT);
_scale_up_scanners_counter = ADD_COUNTER(_scanner_profile, "NumScaleUpScanners", TUnit::UNIT);
// time of transfer thread to wait for block from scan thread
_scanner_wait_batch_timer = ADD_TIMER(_scanner_profile, "ScannerBatchWaitTime");
_scanner_sched_counter = ADD_COUNTER(_scanner_profile, "ScannerSchedCount", TUnit::UNIT);
_scanner_ctx_sched_counter = ADD_COUNTER(_scanner_profile, "ScannerCtxSchedCount", TUnit::UNIT);
_scanner_ctx_sched_time = ADD_TIMER(_scanner_profile, "ScannerCtxSchedTime");
_scan_timer = ADD_TIMER(_scanner_profile, "ScannerGetBlockTime");

View File

@ -377,7 +377,6 @@ protected:
RuntimeProfile::Counter* _filter_timer = nullptr;
RuntimeProfile::Counter* _scanner_sched_counter = nullptr;
RuntimeProfile::Counter* _scanner_ctx_sched_counter = nullptr;
RuntimeProfile::Counter* _scanner_ctx_sched_time = nullptr;
RuntimeProfile::Counter* _scanner_wait_batch_timer = nullptr;
RuntimeProfile::Counter* _scanner_wait_worker_timer = nullptr;
@ -387,8 +386,8 @@ protected:
RuntimeProfile::Counter* _max_scanner_thread_num = nullptr;
RuntimeProfile::Counter* _memory_usage_counter = nullptr;
RuntimeProfile::HighWaterMarkCounter* _queued_blocks_memory_usage = nullptr;
RuntimeProfile::HighWaterMarkCounter* _free_blocks_memory_usage = nullptr;
RuntimeProfile::Counter* _scale_up_scanners_counter = nullptr;
std::unordered_map<std::string, int> _colname_to_slot_id;

View File

@ -74,6 +74,8 @@ public class SessionVariable implements Serializable, Writable {
public static final String EXEC_MEM_LIMIT = "exec_mem_limit";
public static final String SCAN_QUEUE_MEM_LIMIT = "scan_queue_mem_limit";
public static final String NUM_SCANNER_THREADS = "num_scanner_threads";
public static final String SCANNER_SCALE_UP_RATIO = "scanner_scale_up_ratio";
public static final String QUERY_TIMEOUT = "query_timeout";
public static final String ANALYZE_TIMEOUT = "analyze_timeout";
@ -553,6 +555,20 @@ public class SessionVariable implements Serializable, Writable {
@VariableMgr.VarAttr(name = SCAN_QUEUE_MEM_LIMIT)
public long maxScanQueueMemByte = 2147483648L / 20;
@VariableMgr.VarAttr(name = NUM_SCANNER_THREADS, needForward = true, description = {
"ScanNode扫描数据的最大并发,默认为0,采用BE的doris_scanner_thread_pool_thread_num",
"The max threads to read data of ScanNode, "
+ "default 0, use doris_scanner_thread_pool_thread_num in be.conf"
})
public int numScannerThreads = 0;
@VariableMgr.VarAttr(name = SCANNER_SCALE_UP_RATIO, needForward = true, description = {
"ScanNode自适应的增加扫描并发,最大允许增长的并发倍率,默认为0,关闭该功能",
"The max multiple of increasing the concurrency of scanners adaptively, "
+ "default 0, turn off scaling up"
})
public double scannerScaleUpRatio = 0;
@VariableMgr.VarAttr(name = ENABLE_SPILLING)
public boolean enableSpilling = false;
@ -1790,6 +1806,14 @@ public class SessionVariable implements Serializable, Writable {
return maxScanQueueMemByte;
}
public int getNumScannerThreads() {
return numScannerThreads;
}
public double getScannerScaleUpRatio() {
return scannerScaleUpRatio;
}
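// Illustrative C++ sketch (formula and names are assumptions, not taken from
// this commit): scanner_scale_up_ratio caps how far the BE may grow scanner
// concurrency beyond the initial count; the default 0 disables scaling.
int effective_max_scanners(int initial_scanners, double scale_up_ratio) {
if (scale_up_ratio <= 1.0) {
return initial_scanners; // 0 (default) turns scale-up off
}
return static_cast<int>(initial_scanners * scale_up_ratio);
}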
public int getQueryTimeoutS() {
return queryTimeoutS;
}
@ -1962,7 +1986,15 @@ public class SessionVariable implements Serializable, Writable {
}
public void setMaxScanQueueMemByte(long scanQueueMemByte) {
this.maxScanQueueMemByte = Math.min(scanQueueMemByte, maxExecMemByte / 2);
this.maxScanQueueMemByte = scanQueueMemByte;
}
public void setNumScannerThreads(int numScannerThreads) {
this.numScannerThreads = numScannerThreads;
}
public void setScannerScaleUpRatio(double scannerScaleUpRatio) {
this.scannerScaleUpRatio = scannerScaleUpRatio;
}
public boolean isSqlQuoteShowCreate() {
@ -2771,7 +2803,9 @@ public class SessionVariable implements Serializable, Writable {
public TQueryOptions toThrift() {
TQueryOptions tResult = new TQueryOptions();
tResult.setMemLimit(maxExecMemByte);
tResult.setScanQueueMemLimit(Math.min(maxScanQueueMemByte, maxExecMemByte / 20));
tResult.setScanQueueMemLimit(maxScanQueueMemByte);
tResult.setNumScannerThreads(numScannerThreads);
tResult.setScannerScaleUpRatio(scannerScaleUpRatio);
// TODO chenhao, reservation will be calculated by cost
tResult.setMinReservation(0);
@ -3067,7 +3101,9 @@ public class SessionVariable implements Serializable, Writable {
public TQueryOptions getQueryOptionVariables() {
TQueryOptions queryOptions = new TQueryOptions();
queryOptions.setMemLimit(maxExecMemByte);
queryOptions.setScanQueueMemLimit(Math.min(maxScanQueueMemByte, maxExecMemByte / 20));
queryOptions.setScanQueueMemLimit(maxScanQueueMemByte);
queryOptions.setNumScannerThreads(numScannerThreads);
queryOptions.setScannerScaleUpRatio(scannerScaleUpRatio);
queryOptions.setQueryTimeout(queryTimeoutS);
queryOptions.setInsertTimeout(insertTimeoutS);
queryOptions.setAnalyzeTimeout(analyzeTimeoutS);

View File

@ -271,6 +271,8 @@ struct TQueryOptions {
97: optional i64 parallel_scan_min_rows_per_scanner = 0;
98: optional bool skip_bad_tablet = false;
// Increase the concurrency of scanners adaptively; the maximum multiple to scale up
99: optional double scanner_scale_up_ratio = 0;
// For cloud, to control if the content would be written into file cache
1000: optional bool disable_file_cache = false