[Try_Fix](scan) try fix the scanner schedule logic to prevent excessive memory usage and timeout (#30515)
This commit is contained in:
@ -40,7 +40,7 @@ public:
|
||||
_need_colocate_distribute(!_col_distribute_ids.empty()) {}
|
||||
|
||||
Status get_block_from_queue(RuntimeState* state, vectorized::BlockUPtr* block, bool* eos,
|
||||
int id, bool wait = false) override {
|
||||
int id) override {
|
||||
{
|
||||
std::unique_lock l(_transfer_lock);
|
||||
if (state->is_cancelled()) {
|
||||
@ -97,6 +97,11 @@ public:
|
||||
(*block)->set_columns(std::move(m.mutable_columns()));
|
||||
}
|
||||
|
||||
// after return free blocks, should try to reschedule the scanner
|
||||
if (should_be_scheduled()) {
|
||||
this->reschedule_scanner_ctx();
|
||||
}
|
||||
|
||||
return Status::OK();
|
||||
}
|
||||
|
||||
@ -284,7 +289,7 @@ public:
|
||||
limit_, max_bytes_in_blocks_queue, 1, local_state,
|
||||
dependency) {}
|
||||
Status get_block_from_queue(RuntimeState* state, vectorized::BlockUPtr* block, bool* eos,
|
||||
int id, bool wait = false) override {
|
||||
int id) override {
|
||||
if (_blocks_queue_buffered.empty()) {
|
||||
std::unique_lock l(_transfer_lock);
|
||||
if (state->is_cancelled()) {
|
||||
@ -349,6 +354,11 @@ public:
|
||||
}
|
||||
return_free_blocks();
|
||||
|
||||
// after return free blocks, should try to reschedule the scanner
|
||||
if (should_be_scheduled()) {
|
||||
this->reschedule_scanner_ctx();
|
||||
}
|
||||
|
||||
return Status::OK();
|
||||
}
|
||||
|
||||
|
||||
@ -273,7 +273,7 @@ bool ScannerContext::empty_in_queue(int id) {
|
||||
}
|
||||
|
||||
Status ScannerContext::get_block_from_queue(RuntimeState* state, vectorized::BlockUPtr* block,
|
||||
bool* eos, int id, bool wait) {
|
||||
bool* eos, int id) {
|
||||
std::vector<vectorized::BlockUPtr> merge_blocks;
|
||||
{
|
||||
std::unique_lock l(_transfer_lock);
|
||||
@ -295,9 +295,9 @@ Status ScannerContext::get_block_from_queue(RuntimeState* state, vectorized::Blo
|
||||
}
|
||||
|
||||
// Wait for block from queue
|
||||
if (wait) {
|
||||
// scanner batch wait time
|
||||
{
|
||||
SCOPED_TIMER(_scanner_wait_batch_timer);
|
||||
// scanner batch wait time
|
||||
while (!(!_blocks_queue.empty() || done() || !status().ok() || state->is_cancelled())) {
|
||||
if (!is_scheduled && _num_running_scanners == 0 && should_be_scheduled()) {
|
||||
LOG(INFO) << debug_string();
|
||||
|
||||
@ -89,7 +89,7 @@ public:
|
||||
// Set eos to true if there is no more data to read.
|
||||
// And if eos is true, the block returned must be nullptr.
|
||||
virtual Status get_block_from_queue(RuntimeState* state, vectorized::BlockUPtr* block,
|
||||
bool* eos, int id, bool wait = true);
|
||||
bool* eos, int id);
|
||||
|
||||
[[nodiscard]] Status validate_block_schema(Block* block);
|
||||
|
||||
@ -134,7 +134,8 @@ public:
|
||||
|
||||
// todo(wb) rethinking how to calculate ```_max_bytes_in_queue``` when executing shared scan
|
||||
inline bool should_be_scheduled() const {
|
||||
return _cur_bytes_in_queue < _max_bytes_in_queue / 2;
|
||||
return (_cur_bytes_in_queue < _max_bytes_in_queue / 2) &&
|
||||
(_serving_blocks_num < allowed_blocks_num());
|
||||
}
|
||||
|
||||
int get_available_thread_slot_num() {
|
||||
|
||||
Reference in New Issue
Block a user