[Refactor](scan) refactor scan scheduler to improve performance (#27948)

* [Refactor](scan) refactor scan scheduler to improve performance

* fix pipeline x core
Author: HappenLee
Date: 2023-12-05 13:03:16 +08:00
Committed by: GitHub
Parent: da87fcb477
Commit: 54fe1a166b
7 changed files with 127 additions and 58 deletions

View File

@@ -107,6 +107,9 @@ public:
const DescriptorTbl& desc_tbl() const { return *_desc_tbl; }
void set_desc_tbl(const DescriptorTbl* desc_tbl) { _desc_tbl = desc_tbl; }
int batch_size() const { return _query_options.batch_size; }
int wait_full_block_schedule_times() const {
return _query_options.wait_full_block_schedule_times;
}
bool abort_on_error() const { return _query_options.abort_on_error; }
bool abort_on_default_limit_exceeded() const {
return _query_options.abort_on_default_limit_exceeded;
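
The accessor added above threads the new wait_full_block_schedule_times query option into the backend. As the rest of the diff shows, the value originates from a session variable of the same name on the FE side (SessionVariable), is copied into TQueryOptions by the Coordinator via setWaitFullBlockScheduleTimes, and is read by the scanner scheduler, which multiplies its per-schedule byte/row thresholds by this factor while the block being assembled is still smaller than batch_size.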

View File

@@ -63,6 +63,7 @@ public:
}
}
std::vector<vectorized::BlockUPtr> merge_blocks;
{
std::unique_lock<std::mutex> l(*_queue_mutexs[id]);
if (_blocks_queues[id].empty()) {
@@ -76,6 +77,18 @@ public:
*block = std::move(_blocks_queues[id].front());
_blocks_queues[id].pop_front();
auto rows = (*block)->rows();
while (!_blocks_queues[id].empty()) {
const auto add_rows = (*_blocks_queues[id].front()).rows();
if (rows + add_rows < state->batch_size()) {
rows += add_rows;
merge_blocks.emplace_back(std::move(_blocks_queues[id].front()));
_blocks_queues[id].pop_front();
} else {
break;
}
}
if (_blocks_queues[id].empty()) {
this->reschedule_scanner_ctx();
if (_dependency) {
@@ -83,7 +96,16 @@
}
}
}
_current_used_bytes -= (*block)->allocated_bytes();
if (!merge_blocks.empty()) {
vectorized::MutableBlock m(block->get());
for (auto& merge_block : merge_blocks) {
_current_used_bytes -= merge_block->allocated_bytes();
static_cast<void>(m.merge(*merge_block));
return_free_block(std::move(merge_block));
}
}
return Status::OK();
}
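
The loop added above drains additional queued blocks and folds them into the block being returned, as long as the combined row count stays below batch_size, so the consumer sees fewer but fuller blocks. A minimal standalone sketch of the same coalescing pattern, using plain std::vector<int> in place of Doris' vectorized::Block (all names here are illustrative, not the Doris API):

#include <deque>
#include <iostream>
#include <utility>
#include <vector>

// Pop the front block, then keep absorbing following blocks while the total
// row count stays under batch_size; mirrors the while-loop in the diff above.
std::vector<int> take_merged_block(std::deque<std::vector<int>>& queue, size_t batch_size) {
    std::vector<int> block = std::move(queue.front());
    queue.pop_front();
    size_t rows = block.size();
    while (!queue.empty()) {
        const size_t add_rows = queue.front().size();
        if (rows + add_rows < batch_size) {   // same "< batch_size" guard as the diff
            rows += add_rows;
            block.insert(block.end(), queue.front().begin(), queue.front().end());
            queue.pop_front();                // the real code also recycles the drained block
        } else {
            break;                            // the next block would overfill the batch
        }
    }
    return block;
}

int main() {
    std::deque<std::vector<int>> q{{1, 2}, {3, 4}, {5, 6, 7}};
    auto merged = take_merged_block(q, /*batch_size=*/6);
    std::cout << merged.size() << " rows merged, " << q.size() << " block(s) left\n"; // 4 rows, 1 left
    return 0;
}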

View File

@@ -209,65 +209,90 @@ bool ScannerContext::empty_in_queue(int id) {
Status ScannerContext::get_block_from_queue(RuntimeState* state, vectorized::BlockUPtr* block,
bool* eos, int id, bool wait) {
std::unique_lock l(_transfer_lock);
// Normally, the scanner scheduler will schedule ctx.
// But when the amount of data in the blocks queue exceeds the upper limit,
// the scheduler will stop scheduling.
// (if the scheduler continues to schedule, it will cause a lot of busy running).
// At this point, consumers are required to trigger new scheduling to ensure that
// data can be continuously fetched.
int64_t cur_bytes_in_queue = _cur_bytes_in_queue;
int32_t serving_blocks_num = _serving_blocks_num;
bool to_be_schedule = should_be_scheduled();
int num_running_scanners = _num_running_scanners;
std::vector<vectorized::BlockUPtr> merge_blocks;
{
std::unique_lock l(_transfer_lock);
// Normally, the scanner scheduler will schedule ctx.
// But when the amount of data in the blocks queue exceeds the upper limit,
// the scheduler will stop scheduling.
// (if the scheduler continues to schedule, it will cause a lot of busy running).
// At this point, consumers are required to trigger new scheduling to ensure that
// data can be continuously fetched.
int64_t cur_bytes_in_queue = _cur_bytes_in_queue;
int32_t serving_blocks_num = _serving_blocks_num;
bool to_be_schedule = should_be_scheduled();
int num_running_scanners = _num_running_scanners;
bool is_scheduled = false;
if (to_be_schedule && _num_running_scanners == 0) {
is_scheduled = true;
auto state = _scanner_scheduler->submit(this);
if (state.ok()) {
_num_scheduling_ctx++;
} else {
set_status_on_error(state, false);
}
}
// Wait for block from queue
if (wait) {
// scanner batch wait time
SCOPED_TIMER(_scanner_wait_batch_timer);
while (!(!_blocks_queue.empty() || _is_finished || !status().ok() ||
state->is_cancelled())) {
if (!is_scheduled && _num_running_scanners == 0 && should_be_scheduled()) {
LOG(INFO) << "fatal, cur_bytes_in_queue " << cur_bytes_in_queue
<< ", serving_blocks_num " << serving_blocks_num
<< ", num_running_scanners " << num_running_scanners
<< ", to_be_scheudle " << to_be_schedule << (void*)this;
bool is_scheduled = false;
if (to_be_schedule && _num_running_scanners == 0) {
is_scheduled = true;
auto state = _scanner_scheduler->submit(this);
if (state.ok()) {
_num_scheduling_ctx++;
} else {
set_status_on_error(state, false);
}
_blocks_queue_added_cv.wait_for(l, 1s);
}
// Wait for block from queue
if (wait) {
// scanner batch wait time
SCOPED_TIMER(_scanner_wait_batch_timer);
while (!(!_blocks_queue.empty() || _is_finished || !status().ok() ||
state->is_cancelled())) {
if (!is_scheduled && _num_running_scanners == 0 && should_be_scheduled()) {
LOG(INFO) << "fatal, cur_bytes_in_queue " << cur_bytes_in_queue
<< ", serving_blocks_num " << serving_blocks_num
<< ", num_running_scanners " << num_running_scanners
<< ", to_be_scheudle " << to_be_schedule << (void*)this;
}
_blocks_queue_added_cv.wait_for(l, 1s);
}
}
if (state->is_cancelled()) {
set_status_on_error(Status::Cancelled("cancelled"), false);
}
if (!status().ok()) {
return status();
}
if (!_blocks_queue.empty()) {
*block = std::move(_blocks_queue.front());
_blocks_queue.pop_front();
auto block_bytes = (*block)->allocated_bytes();
_cur_bytes_in_queue -= block_bytes;
_queued_blocks_memory_usage->add(-block_bytes);
auto rows = (*block)->rows();
while (!_blocks_queue.empty()) {
auto& add_block = _blocks_queue.front();
const auto add_rows = (*add_block).rows();
if (rows + add_rows < state->batch_size()) {
rows += add_rows;
block_bytes = (*add_block).allocated_bytes();
_cur_bytes_in_queue -= block_bytes;
_queued_blocks_memory_usage->add(-block_bytes);
merge_blocks.emplace_back(std::move(add_block));
_blocks_queue.pop_front();
} else {
break;
}
}
} else {
*eos = _is_finished;
}
}
if (state->is_cancelled()) {
set_status_on_error(Status::Cancelled("cancelled"), false);
if (!merge_blocks.empty()) {
vectorized::MutableBlock m(block->get());
for (auto& merge_block : merge_blocks) {
static_cast<void>(m.merge(*merge_block));
return_free_block(std::move(merge_block));
}
}
if (!status().ok()) {
return status();
}
if (!_blocks_queue.empty()) {
*block = std::move(_blocks_queue.front());
_blocks_queue.pop_front();
auto block_bytes = (*block)->allocated_bytes();
_cur_bytes_in_queue -= block_bytes;
_queued_blocks_memory_usage->add(-block_bytes);
return Status::OK();
} else {
*eos = _is_finished;
}
return Status::OK();
}
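
Two things change in the hunk above: the queue handling is wrapped in an inner scope that owns _transfer_lock, and eligible follow-up blocks are only collected into merge_blocks while the lock is held; the MutableBlock::merge calls and return_free_block run after that scope closes, so the relatively expensive merge work no longer sits inside the critical section. A generic sketch of this "collect under the lock, merge outside it" pattern, using std types only (names are illustrative):

#include <deque>
#include <mutex>
#include <string>
#include <vector>

std::mutex transfer_mutex;              // stands in for _transfer_lock
std::deque<std::string> blocks_queue;   // stands in for _blocks_queue

// Pop one block plus any small followers while holding the lock, but do the
// (potentially costly) merge only after the lock has been released.
std::string get_block_merged(size_t batch_size) {
    std::string block;
    std::vector<std::string> merge_blocks;
    {
        std::unique_lock<std::mutex> l(transfer_mutex);
        if (blocks_queue.empty()) return block;
        block = std::move(blocks_queue.front());
        blocks_queue.pop_front();
        size_t rows = block.size();
        while (!blocks_queue.empty() && rows + blocks_queue.front().size() < batch_size) {
            rows += blocks_queue.front().size();
            merge_blocks.emplace_back(std::move(blocks_queue.front()));
            blocks_queue.pop_front();
        }
    }   // lock released here; producers can push new blocks again
    for (auto& b : merge_blocks) {      // merge work happens outside the critical section
        block += b;
    }
    return block;
}

int main() {
    blocks_queue = {"ab", "cd", "efghij"};
    // "efghij" would overshoot the 6-byte batch, so only "ab" and "cd" are merged.
    return get_block_merged(/*batch_size=*/6) == "abcd" ? 0 : 1;
}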

View File

@@ -347,8 +347,18 @@ void ScannerScheduler::_scanner_scan(ScannerScheduler* scheduler, ScannerContext
bool should_stop = false;
// Has to wait for at least one full block, otherwise it creates a lot of scheduling tasks
// in the priority queue, which hurts query latency and query concurrency (for example, SSB 3.3).
while (!eos && raw_bytes_read < raw_bytes_threshold && raw_rows_read < raw_rows_threshold &&
num_rows_in_block < state->batch_size()) {
auto should_do_scan = [&, batch_size = state->batch_size(),
time = state->wait_full_block_schedule_times()]() {
if (raw_bytes_read < raw_bytes_threshold && raw_rows_read < raw_rows_threshold) {
return true;
} else if (num_rows_in_block < batch_size) {
return raw_bytes_read < raw_bytes_threshold * time &&
raw_rows_read < raw_rows_threshold * time;
}
return false;
};
while (!eos && should_do_scan()) {
// TODO llj task group should should_yield?
if (UNLIKELY(ctx->done())) {
// No need to set status on error here.
@@ -384,10 +394,7 @@ void ScannerScheduler::_scanner_scan(ScannerScheduler* scheduler, ScannerContext
ctx->return_free_block(std::move(block));
} else {
if (!blocks.empty() && blocks.back()->rows() + block->rows() <= state->batch_size()) {
status = vectorized::MutableBlock(blocks.back().get()).merge(*block);
if (!status.ok()) {
break;
}
static_cast<void>(vectorized::MutableBlock(blocks.back().get()).merge(*block));
ctx->return_free_block(std::move(block));
} else {
blocks.push_back(std::move(block));
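
The should_do_scan lambda in the first hunk of this file replaces the old single loop condition: a scanner keeps scanning while it is under the normal per-schedule byte/row thresholds, and once it crosses them it may overrun up to wait_full_block_schedule_times times those thresholds, but only while the block it is assembling is still smaller than batch_size. A standalone version of the predicate with plain integers (parameter names are illustrative):

#include <cassert>
#include <cstdint>

// Mirrors the should_do_scan lambda: normal thresholds first, then a relaxed
// limit (threshold * wait_times) that applies only while the block is not full.
bool should_do_scan(int64_t raw_bytes_read, int64_t raw_bytes_threshold,
                    int64_t raw_rows_read, int64_t raw_rows_threshold,
                    int64_t num_rows_in_block, int64_t batch_size, int wait_times) {
    if (raw_bytes_read < raw_bytes_threshold && raw_rows_read < raw_rows_threshold) {
        return true;                          // still under the normal limits
    }
    if (num_rows_in_block < batch_size) {     // block not full yet: allow a bounded overrun
        return raw_bytes_read < raw_bytes_threshold * wait_times &&
               raw_rows_read < raw_rows_threshold * wait_times;
    }
    return false;                             // block is full: stop and hand it over
}

int main() {
    assert(should_do_scan(10, 100, 10, 1000, 500, 4096, 2));    // under thresholds: keep scanning
    assert(should_do_scan(150, 100, 10, 1000, 500, 4096, 2));   // over, but block not full and within 2x
    assert(!should_do_scan(150, 100, 10, 1000, 5000, 4096, 2)); // over, block already full: stop
    return 0;
}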

View File

@@ -374,6 +374,7 @@ public class Coordinator implements CoordInterface {
this.queryOptions.setExecutionTimeout(context.getExecTimeout());
this.queryOptions.setEnableScanNodeRunSerial(context.getSessionVariable().isEnableScanRunSerial());
this.queryOptions.setFeProcessUuid(ExecuteEnv.getInstance().getProcessUUID());
this.queryOptions.setWaitFullBlockScheduleTimes(context.getSessionVariable().getWaitFullBlockScheduleTimes());
}
public ConnectContext getConnectContext() {

View File

@@ -468,6 +468,8 @@ public class SessionVariable implements Serializable, Writable {
// this session variable is set to true.
public static final String FALLBACK_OTHER_REPLICA_WHEN_FIXED_CORRUPT = "fallback_other_replica_when_fixed_corrupt";
public static final String WAIT_FULL_BLOCK_SCHEDULE_TIMES = "wait_full_block_schedule_times";
public static final List<String> DEBUG_VARIABLES = ImmutableList.of(
SKIP_DELETE_PREDICATE,
SKIP_DELETE_BITMAP,
@@ -834,6 +836,9 @@ public class SessionVariable implements Serializable, Writable {
@VariableMgr.VarAttr(name = USE_RF_DEFAULT)
public boolean useRuntimeFilterDefaultSize = false;
@VariableMgr.VarAttr(name = WAIT_FULL_BLOCK_SCHEDULE_TIMES)
public int waitFullBlockScheduleTimes = 2;
public int getBeNumberForTest() {
return beNumberForTest;
}
@@ -2168,6 +2173,10 @@ public class SessionVariable implements Serializable, Writable {
return sqlDialect;
}
public int getWaitFullBlockScheduleTimes() {
return waitFullBlockScheduleTimes;
}
public ParseDialect.Dialect getSqlParseDialect() {
return ParseDialect.Dialect.getByName(sqlDialect);
}
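
Two defaults are worth noting: the FE session variable waitFullBlockScheduleTimes defaults to 2, while the thrift field wait_full_block_schedule_times in TQueryOptions (last hunk below) defaults to 1, so a BE that receives query options without the field keeps the pre-change behavior. Since this is an ordinary session variable, it should be tunable per session (for example, set wait_full_block_schedule_times = 4, presumably via the usual SET statement); larger values let a scanner keep scanning longer before handing back a block that is not yet full.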

View File

@@ -257,6 +257,8 @@ struct TQueryOptions {
90: optional bool skip_missing_version = false;
91: optional bool runtime_filter_wait_infinitely = false;
92: optional i32 wait_full_block_schedule_times = 1;
}