[Bug](pipeline) adjust scanner scheduler.submit and _num_scheduling_ctx maintain (#21843)
Adjust how callers handle the result of ScannerScheduler::submit and how _num_scheduling_ctx is maintained.
This commit is contained in:
@ -42,7 +42,6 @@ void TabletSchemaCache::stop() {
|
||||
while (!_is_stopped) {
|
||||
std::this_thread::sleep_for(std::chrono::seconds(1));
|
||||
}
|
||||
LOG(INFO) << "xxx stopped";
|
||||
}
|
||||
|
||||
/**
|
||||
|
||||
@ -45,7 +45,7 @@ public:
|
||||
{
|
||||
std::unique_lock l(_transfer_lock);
|
||||
if (state->is_cancelled()) {
|
||||
_process_status = Status::Cancelled("cancelled");
|
||||
set_status_on_error(Status::Cancelled("cancelled"), false);
|
||||
}
|
||||
|
||||
if (!_process_status.ok()) {
|
||||
|
||||
@ -28,6 +28,7 @@
|
||||
#include <utility>
|
||||
|
||||
#include "common/config.h"
|
||||
#include "common/status.h"
|
||||
#include "runtime/descriptors.h"
|
||||
#include "runtime/exec_env.h"
|
||||
#include "runtime/query_context.h"
|
||||
@ -188,8 +189,12 @@ Status ScannerContext::get_block_from_queue(RuntimeState* state, vectorized::Blo
|
||||
// At this point, consumers are required to trigger new scheduling to ensure that
|
||||
// data can be continuously fetched.
|
||||
if (has_enough_space_in_blocks_queue() && _num_running_scanners == 0) {
|
||||
_num_scheduling_ctx++;
|
||||
_scanner_scheduler->submit(this);
|
||||
auto state = _scanner_scheduler->submit(this);
|
||||
if (state.ok()) {
|
||||
_num_scheduling_ctx++;
|
||||
} else {
|
||||
set_status_on_error(state, false);
|
||||
}
|
||||
}
|
||||
// Wait for block from queue
|
||||
if (wait) {
|
||||
@ -201,11 +206,11 @@ Status ScannerContext::get_block_from_queue(RuntimeState* state, vectorized::Blo
|
||||
}
|
||||
|
||||
if (state->is_cancelled()) {
|
||||
_process_status = Status::Cancelled("cancelled");
|
||||
set_status_on_error(Status::Cancelled("cancelled"), false);
|
||||
}
|
||||
|
||||
if (!_process_status.ok()) {
|
||||
return _process_status;
|
||||
if (!status().ok()) {
|
||||
return status();
|
||||
}
|
||||
|
||||
if (!_blocks_queue.empty()) {
|
||||
@ -221,12 +226,16 @@ Status ScannerContext::get_block_from_queue(RuntimeState* state, vectorized::Blo
|
||||
return Status::OK();
|
||||
}
|
||||
|
||||
bool ScannerContext::set_status_on_error(const Status& status) {
|
||||
std::lock_guard l(_transfer_lock);
|
||||
bool ScannerContext::set_status_on_error(const Status& status, bool need_lock) {
|
||||
std::unique_lock l(_transfer_lock, std::defer_lock);
|
||||
if (need_lock) {
|
||||
l.lock();
|
||||
}
|
||||
if (_process_status.ok()) {
|
||||
_process_status = status;
|
||||
_status_error = true;
|
||||
_blocks_queue_added_cv.notify_one();
|
||||
_should_stop = true;
|
||||
return true;
|
||||
}
|
||||
return false;
|
||||
@ -326,10 +335,12 @@ std::string ScannerContext::debug_string() {
|
||||
|
||||
void ScannerContext::reschedule_scanner_ctx() {
|
||||
std::lock_guard l(_transfer_lock);
|
||||
auto submit_st = _scanner_scheduler->submit(this);
|
||||
auto state = _scanner_scheduler->submit(this);
|
||||
//todo(wb) rethink: is it better to mark the current scan_context as failed when submit has failed many times?
|
||||
if (submit_st.ok()) {
|
||||
if (state.ok()) {
|
||||
_num_scheduling_ctx++;
|
||||
} else {
|
||||
set_status_on_error(state, false);
|
||||
}
|
||||
}
|
||||
|
||||
@ -340,10 +351,11 @@ void ScannerContext::push_back_scanner_and_reschedule(VScannerSPtr scanner) {
|
||||
}
|
||||
std::lock_guard l(_transfer_lock);
|
||||
if (has_enough_space_in_blocks_queue()) {
|
||||
_num_scheduling_ctx++;
|
||||
auto submit_st = _scanner_scheduler->submit(this);
|
||||
if (!submit_st.ok()) {
|
||||
_num_scheduling_ctx--;
|
||||
auto state = _scanner_scheduler->submit(this);
|
||||
if (state.ok()) {
|
||||
_num_scheduling_ctx++;
|
||||
} else {
|
||||
set_status_on_error(state, false);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@ -86,10 +86,12 @@ public:
|
||||
// to return the scanner to the list for next scheduling.
|
||||
void push_back_scanner_and_reschedule(VScannerSPtr scanner);
|
||||
|
||||
bool set_status_on_error(const Status& status);
|
||||
bool set_status_on_error(const Status& status, bool need_lock = true);
|
||||
|
||||
Status status() {
|
||||
std::lock_guard l(_transfer_lock);
|
||||
if (_process_status.is<ErrorCode::END_OF_FILE>()) {
|
||||
return Status::OK();
|
||||
}
|
||||
return _process_status;
|
||||
}
|
||||
|
||||
@ -102,10 +104,7 @@ public:
|
||||
}
|
||||
|
||||
// Return true if this ScannerContext need no more process
|
||||
virtual bool done() {
|
||||
std::unique_lock l(_transfer_lock);
|
||||
return _is_finished || _should_stop || !_process_status.ok();
|
||||
}
|
||||
virtual bool done() { return _is_finished || _should_stop; }
|
||||
|
||||
// Update the running num of scanners and contexts
|
||||
void update_num_running(int32_t scanner_inc, int32_t sched_inc) {
|
||||
|
||||
@ -53,7 +53,7 @@
|
||||
|
||||
namespace doris::vectorized {
|
||||
|
||||
ScannerScheduler::ScannerScheduler() {}
|
||||
ScannerScheduler::ScannerScheduler() = default;
|
||||
|
||||
ScannerScheduler::~ScannerScheduler() {
|
||||
if (!_is_init) {
|
||||
@ -135,6 +135,9 @@ Status ScannerScheduler::init(ExecEnv* env) {
|
||||
}
|
||||
|
||||
Status ScannerScheduler::submit(ScannerContext* ctx) {
|
||||
if (ctx->done()) {
|
||||
return Status::EndOfFile("ScannerContext is done");
|
||||
}
|
||||
if (ctx->queue_idx == -1) {
|
||||
ctx->queue_idx = (_queue_idx++ % QUEUE_NUM);
|
||||
}
|
||||
@ -163,7 +166,6 @@ void ScannerScheduler::_schedule_thread(int queue_id) {
|
||||
// If ctx is done, no need to schedule it again.
|
||||
// But note that there may still be scanners running in the scanner pool.
|
||||
}
|
||||
return;
|
||||
}
|
||||
|
||||
[[maybe_unused]] static void* run_scanner_bthread(void* arg) {
|
||||
|
||||
@ -67,7 +67,7 @@ public:
|
||||
|
||||
Status init(ExecEnv* env);
|
||||
|
||||
Status submit(ScannerContext* ctx);
|
||||
[[nodiscard]] Status submit(ScannerContext* ctx);
|
||||
|
||||
std::unique_ptr<ThreadPoolToken> new_limited_scan_pool_token(ThreadPool::ExecutionMode mode,
|
||||
int max_concurrency);
|
||||
@ -86,7 +86,6 @@ private:
|
||||
void _task_group_scanner_scan(ScannerScheduler* scheduler,
|
||||
taskgroup::ScanTaskTaskGroupQueue* scan_queue);
|
||||
|
||||
private:
|
||||
// Scheduling queue number.
|
||||
// TODO: make it configurable.
|
||||
static const int QUEUE_NUM = 4;
|
||||
|
||||
Reference in New Issue
Block a user