cherry pick from #40495
This commit is contained in:
@ -185,7 +185,7 @@ Status ScannerContext::init() {
|
||||
for (int i = 0; i < _max_thread_num; ++i) {
|
||||
std::weak_ptr<ScannerDelegate> next_scanner;
|
||||
if (_scanners.try_dequeue(next_scanner)) {
|
||||
submit_scan_task(std::make_shared<ScanTask>(next_scanner));
|
||||
RETURN_IF_ERROR(submit_scan_task(std::make_shared<ScanTask>(next_scanner)));
|
||||
_num_running_scanners++;
|
||||
}
|
||||
}
|
||||
@ -231,10 +231,10 @@ bool ScannerContext::empty_in_queue(int id) {
|
||||
return _blocks_queue.empty();
|
||||
}
|
||||
|
||||
void ScannerContext::submit_scan_task(std::shared_ptr<ScanTask> scan_task) {
|
||||
Status ScannerContext::submit_scan_task(std::shared_ptr<ScanTask> scan_task) {
|
||||
_scanner_sched_counter->update(1);
|
||||
_num_scheduled_scanners++;
|
||||
_scanner_scheduler->submit(shared_from_this(), scan_task);
|
||||
return _scanner_scheduler->submit(shared_from_this(), scan_task);
|
||||
}
|
||||
|
||||
void ScannerContext::append_block_to_queue(std::shared_ptr<ScanTask> scan_task) {
|
||||
@ -291,10 +291,15 @@ Status ScannerContext::get_block_from_queue(RuntimeState* state, vectorized::Blo
|
||||
}
|
||||
|
||||
if (scan_task) {
|
||||
// The abnormal status of scanner may come from the execution of the scanner itself,
|
||||
// or come from the scanner scheduler, such as TooManyTasks.
|
||||
if (!scan_task->status_ok()) {
|
||||
// TODO: If the scanner status is TooManyTasks, maybe we can retry the scanner after a while.
|
||||
_process_status = scan_task->get_status();
|
||||
_set_scanner_done();
|
||||
return scan_task->get_status();
|
||||
return _process_status;
|
||||
}
|
||||
|
||||
if (!scan_task->cached_blocks.empty()) {
|
||||
vectorized::BlockUPtr current_block = std::move(scan_task->cached_blocks.front());
|
||||
scan_task->cached_blocks.pop_front();
|
||||
@ -309,13 +314,20 @@ Status ScannerContext::get_block_from_queue(RuntimeState* state, vectorized::Blo
|
||||
return_free_block(std::move(current_block));
|
||||
}
|
||||
if (scan_task->cached_blocks.empty()) {
|
||||
// This scan task do not have any cached blocks.
|
||||
_blocks_queue.pop_front();
|
||||
if (scan_task->is_eos()) { // current scanner is finished, and no more data to read
|
||||
// current scanner is finished, and no more data to read
|
||||
if (scan_task->is_eos()) {
|
||||
_num_finished_scanners++;
|
||||
std::weak_ptr<ScannerDelegate> next_scanner;
|
||||
// submit one of the remaining scanners
|
||||
if (_scanners.try_dequeue(next_scanner)) {
|
||||
submit_scan_task(std::make_shared<ScanTask>(next_scanner));
|
||||
auto submit_status = submit_scan_task(std::make_shared<ScanTask>(next_scanner));
|
||||
if (!submit_status.ok()) {
|
||||
_process_status = submit_status;
|
||||
_set_scanner_done();
|
||||
return _process_status;
|
||||
}
|
||||
} else {
|
||||
// no more scanner to be scheduled
|
||||
// `_free_blocks` serve all running scanners, maybe it's too large for the remaining scanners
|
||||
@ -330,11 +342,16 @@ Status ScannerContext::get_block_from_queue(RuntimeState* state, vectorized::Blo
|
||||
}
|
||||
} else {
|
||||
// resubmit current running scanner to read the next block
|
||||
submit_scan_task(scan_task);
|
||||
Status submit_status = submit_scan_task(scan_task);
|
||||
if (!submit_status.ok()) {
|
||||
_process_status = submit_status;
|
||||
_set_scanner_done();
|
||||
return _process_status;
|
||||
}
|
||||
}
|
||||
}
|
||||
// scale up
|
||||
_try_to_scale_up();
|
||||
RETURN_IF_ERROR(_try_to_scale_up());
|
||||
}
|
||||
|
||||
if (_num_finished_scanners == _all_scanners.size() && _blocks_queue.empty()) {
|
||||
@ -345,7 +362,7 @@ Status ScannerContext::get_block_from_queue(RuntimeState* state, vectorized::Blo
|
||||
return Status::OK();
|
||||
}
|
||||
|
||||
void ScannerContext::_try_to_scale_up() {
|
||||
Status ScannerContext::_try_to_scale_up() {
|
||||
// Four criteria to determine whether to increase the parallelism of the scanners
|
||||
// 1. It ran for at least `SCALE_UP_DURATION` ms after last scale up
|
||||
// 2. Half(`WAIT_BLOCK_DURATION_RATIO`) of the duration is waiting to get blocks
|
||||
@ -362,7 +379,7 @@ void ScannerContext::_try_to_scale_up() {
|
||||
// when _last_wait_duration_ratio > 0, it has scaled up before.
|
||||
// we need to determine if the scale-up is effective:
|
||||
// the wait duration ratio after last scaling up should less than 80% of `_last_wait_duration_ratio`
|
||||
return;
|
||||
return Status::OK();
|
||||
}
|
||||
|
||||
bool is_scale_up = false;
|
||||
@ -377,7 +394,10 @@ void ScannerContext::_try_to_scale_up() {
|
||||
// get enough memory to launch one more scanner.
|
||||
std::weak_ptr<ScannerDelegate> scale_up_scanner;
|
||||
if (_scanners.try_dequeue(scale_up_scanner)) {
|
||||
submit_scan_task(std::make_shared<ScanTask>(scale_up_scanner));
|
||||
// Just return error to caller.
|
||||
// Because _try_to_scale_up is called under _transfer_lock locked, if we add the scanner
|
||||
// to the block queue, we will get a deadlock.
|
||||
RETURN_IF_ERROR(submit_scan_task(std::make_shared<ScanTask>(scale_up_scanner)));
|
||||
_num_running_scanners++;
|
||||
_scale_up_scanners_counter->update(1);
|
||||
is_scale_up = true;
|
||||
@ -392,6 +412,8 @@ void ScannerContext::_try_to_scale_up() {
|
||||
_total_wait_block_time = 0;
|
||||
}
|
||||
}
|
||||
|
||||
return Status::OK();
|
||||
}
|
||||
|
||||
Status ScannerContext::validate_block_schema(Block* block) {
|
||||
|
||||
@ -139,7 +139,7 @@ public:
|
||||
// set the next scanned block to `ScanTask::current_block`
|
||||
// set the error state to `ScanTask::status`
|
||||
// set the `eos` to `ScanTask::eos` if there is no more data in current scanner
|
||||
void submit_scan_task(std::shared_ptr<ScanTask> scan_task);
|
||||
Status submit_scan_task(std::shared_ptr<ScanTask> scan_task);
|
||||
|
||||
// append the running scanner and its cached block to `_blocks_queue`
|
||||
virtual void append_block_to_queue(std::shared_ptr<ScanTask> scan_task);
|
||||
@ -192,8 +192,7 @@ protected:
|
||||
/// 3. `_free_blocks_memory_usage` < `_max_bytes_in_queue`, remains enough memory to scale up
|
||||
/// 4. At most scale up `MAX_SCALE_UP_RATIO` times to `_max_thread_num`
|
||||
virtual void _set_scanner_done() {};
|
||||
|
||||
void _try_to_scale_up();
|
||||
Status _try_to_scale_up();
|
||||
|
||||
RuntimeState* _state = nullptr;
|
||||
VScanNode* _parent = nullptr;
|
||||
|
||||
@ -119,23 +119,23 @@ Status ScannerScheduler::init(ExecEnv* env) {
|
||||
return Status::OK();
|
||||
}
|
||||
|
||||
void ScannerScheduler::submit(std::shared_ptr<ScannerContext> ctx,
|
||||
std::shared_ptr<ScanTask> scan_task) {
|
||||
Status ScannerScheduler::submit(std::shared_ptr<ScannerContext> ctx,
|
||||
std::shared_ptr<ScanTask> scan_task) {
|
||||
scan_task->last_submit_time = GetCurrentTimeNanos();
|
||||
if (ctx->done()) {
|
||||
return;
|
||||
return Status::OK();
|
||||
}
|
||||
auto task_lock = ctx->task_exec_ctx();
|
||||
if (task_lock == nullptr) {
|
||||
LOG(INFO) << "could not lock task execution context, query " << ctx->debug_string()
|
||||
<< " maybe finished";
|
||||
return;
|
||||
return Status::OK();
|
||||
}
|
||||
|
||||
if (ctx->thread_token != nullptr) {
|
||||
std::shared_ptr<ScannerDelegate> scanner_delegate = scan_task->scanner.lock();
|
||||
if (scanner_delegate == nullptr) {
|
||||
return;
|
||||
return Status::OK();
|
||||
}
|
||||
|
||||
scanner_delegate->_scanner->start_wait_worker_timer();
|
||||
@ -152,13 +152,12 @@ void ScannerScheduler::submit(std::shared_ptr<ScannerContext> ctx,
|
||||
});
|
||||
if (!s.ok()) {
|
||||
scan_task->set_status(s);
|
||||
ctx->append_block_to_queue(scan_task);
|
||||
return;
|
||||
return s;
|
||||
}
|
||||
} else {
|
||||
std::shared_ptr<ScannerDelegate> scanner_delegate = scan_task->scanner.lock();
|
||||
if (scanner_delegate == nullptr) {
|
||||
return;
|
||||
return Status::OK();
|
||||
}
|
||||
|
||||
scanner_delegate->_scanner->start_wait_worker_timer();
|
||||
@ -186,14 +185,18 @@ void ScannerScheduler::submit(std::shared_ptr<ScannerContext> ctx,
|
||||
return scan_sched->submit_scan_task(simple_scan_task);
|
||||
};
|
||||
|
||||
if (auto ret = sumbit_task(); !ret) {
|
||||
scan_task->set_status(Status::InternalError(
|
||||
"Failed to submit scanner to scanner pool reason:" + std::string(ret.msg()) +
|
||||
"|type:" + std::to_string(type)));
|
||||
ctx->append_block_to_queue(scan_task);
|
||||
return;
|
||||
Status submit_status = sumbit_task();
|
||||
if (!submit_status.ok()) {
|
||||
// User will see TooManyTasks error. It looks like a more reasonable error.
|
||||
Status scan_task_status = Status::TooManyTasks(
|
||||
"Failed to submit scanner to scanner pool reason:" +
|
||||
std::string(submit_status.msg()) + "|type:" + std::to_string(type));
|
||||
scan_task->set_status(scan_task_status);
|
||||
return scan_task_status;
|
||||
}
|
||||
}
|
||||
|
||||
return Status::OK();
|
||||
}
|
||||
|
||||
std::unique_ptr<ThreadPoolToken> ScannerScheduler::new_limited_scan_pool_token(
|
||||
|
||||
@ -57,7 +57,7 @@ public:
|
||||
|
||||
[[nodiscard]] Status init(ExecEnv* env);
|
||||
|
||||
void submit(std::shared_ptr<ScannerContext> ctx, std::shared_ptr<ScanTask> scan_task);
|
||||
Status submit(std::shared_ptr<ScannerContext> ctx, std::shared_ptr<ScanTask> scan_task);
|
||||
|
||||
void stop();
|
||||
|
||||
|
||||
Reference in New Issue
Block a user