diff --git a/be/src/runtime/runtime_state.h b/be/src/runtime/runtime_state.h index b8ab49ff27..e37883abbe 100644 --- a/be/src/runtime/runtime_state.h +++ b/be/src/runtime/runtime_state.h @@ -107,6 +107,9 @@ public: const DescriptorTbl& desc_tbl() const { return *_desc_tbl; } void set_desc_tbl(const DescriptorTbl* desc_tbl) { _desc_tbl = desc_tbl; } int batch_size() const { return _query_options.batch_size; } + int wait_full_block_schedule_times() const { + return _query_options.wait_full_block_schedule_times; + } bool abort_on_error() const { return _query_options.abort_on_error; } bool abort_on_default_limit_exceeded() const { return _query_options.abort_on_default_limit_exceeded; diff --git a/be/src/vec/exec/scan/pip_scanner_context.h b/be/src/vec/exec/scan/pip_scanner_context.h index 681fc09739..57d632a03e 100644 --- a/be/src/vec/exec/scan/pip_scanner_context.h +++ b/be/src/vec/exec/scan/pip_scanner_context.h @@ -63,6 +63,7 @@ public: } } + std::vector<vectorized::BlockUPtr> merge_blocks; { std::unique_lock l(*_queue_mutexs[id]); if (_blocks_queues[id].empty()) { @@ -76,6 +77,18 @@ public: *block = std::move(_blocks_queues[id].front()); _blocks_queues[id].pop_front(); + auto rows = (*block)->rows(); + while (!_blocks_queues[id].empty()) { + const auto add_rows = (*_blocks_queues[id].front()).rows(); + if (rows + add_rows < state->batch_size()) { + rows += add_rows; + merge_blocks.emplace_back(std::move(_blocks_queues[id].front())); + _blocks_queues[id].pop_front(); + } else { + break; + } + } + if (_blocks_queues[id].empty()) { this->reschedule_scanner_ctx(); if (_dependency) { @@ -83,7 +96,16 @@ public: } } } + _current_used_bytes -= (*block)->allocated_bytes(); + if (!merge_blocks.empty()) { + vectorized::MutableBlock m(block->get()); + for (auto& merge_block : merge_blocks) { + _current_used_bytes -= merge_block->allocated_bytes(); + static_cast<void>(m.merge(*merge_block)); + return_free_block(std::move(merge_block)); + } + } return Status::OK(); } diff --git 
a/be/src/vec/exec/scan/scanner_context.cpp b/be/src/vec/exec/scan/scanner_context.cpp index 0d9b0351dd..7cad8242c1 100644 --- a/be/src/vec/exec/scan/scanner_context.cpp +++ b/be/src/vec/exec/scan/scanner_context.cpp @@ -209,65 +209,90 @@ bool ScannerContext::empty_in_queue(int id) { Status ScannerContext::get_block_from_queue(RuntimeState* state, vectorized::BlockUPtr* block, bool* eos, int id, bool wait) { - std::unique_lock l(_transfer_lock); - // Normally, the scanner scheduler will schedule ctx. - // But when the amount of data in the blocks queue exceeds the upper limit, - // the scheduler will stop scheduling. - // (if the scheduler continues to schedule, it will cause a lot of busy running). - // At this point, consumers are required to trigger new scheduling to ensure that - // data can be continuously fetched. - int64_t cur_bytes_in_queue = _cur_bytes_in_queue; - int32_t serving_blocks_num = _serving_blocks_num; - bool to_be_schedule = should_be_scheduled(); - int num_running_scanners = _num_running_scanners; + std::vector<vectorized::BlockUPtr> merge_blocks; + { + std::unique_lock l(_transfer_lock); + // Normally, the scanner scheduler will schedule ctx. + // But when the amount of data in the blocks queue exceeds the upper limit, + // the scheduler will stop scheduling. + // (if the scheduler continues to schedule, it will cause a lot of busy running). + // At this point, consumers are required to trigger new scheduling to ensure that + // data can be continuously fetched. 
+ int64_t cur_bytes_in_queue = _cur_bytes_in_queue; + int32_t serving_blocks_num = _serving_blocks_num; + bool to_be_schedule = should_be_scheduled(); + int num_running_scanners = _num_running_scanners; - bool is_scheduled = false; - if (to_be_schedule && _num_running_scanners == 0) { - is_scheduled = true; - auto state = _scanner_scheduler->submit(this); - if (state.ok()) { - _num_scheduling_ctx++; - } else { - set_status_on_error(state, false); - } - } - - // Wait for block from queue - if (wait) { - // scanner batch wait time - SCOPED_TIMER(_scanner_wait_batch_timer); - while (!(!_blocks_queue.empty() || _is_finished || !status().ok() || - state->is_cancelled())) { - if (!is_scheduled && _num_running_scanners == 0 && should_be_scheduled()) { - LOG(INFO) << "fatal, cur_bytes_in_queue " << cur_bytes_in_queue - << ", serving_blocks_num " << serving_blocks_num - << ", num_running_scanners " << num_running_scanners - << ", to_be_scheudle " << to_be_schedule << (void*)this; + bool is_scheduled = false; + if (to_be_schedule && _num_running_scanners == 0) { + is_scheduled = true; + auto state = _scanner_scheduler->submit(this); + if (state.ok()) { + _num_scheduling_ctx++; + } else { + set_status_on_error(state, false); } - _blocks_queue_added_cv.wait_for(l, 1s); + } + + // Wait for block from queue + if (wait) { + // scanner batch wait time + SCOPED_TIMER(_scanner_wait_batch_timer); + while (!(!_blocks_queue.empty() || _is_finished || !status().ok() || + state->is_cancelled())) { + if (!is_scheduled && _num_running_scanners == 0 && should_be_scheduled()) { + LOG(INFO) << "fatal, cur_bytes_in_queue " << cur_bytes_in_queue + << ", serving_blocks_num " << serving_blocks_num + << ", num_running_scanners " << num_running_scanners + << ", to_be_scheudle " << to_be_schedule << (void*)this; + } + _blocks_queue_added_cv.wait_for(l, 1s); + } + } + + if (state->is_cancelled()) { + set_status_on_error(Status::Cancelled("cancelled"), false); + } + + if (!status().ok()) { + return 
status(); + } + + if (!_blocks_queue.empty()) { + *block = std::move(_blocks_queue.front()); + _blocks_queue.pop_front(); + auto block_bytes = (*block)->allocated_bytes(); + _cur_bytes_in_queue -= block_bytes; + _queued_blocks_memory_usage->add(-block_bytes); + + auto rows = (*block)->rows(); + while (!_blocks_queue.empty()) { + auto& add_block = _blocks_queue.front(); + const auto add_rows = (*add_block).rows(); + if (rows + add_rows < state->batch_size()) { + rows += add_rows; + block_bytes = (*add_block).allocated_bytes(); + _cur_bytes_in_queue -= block_bytes; + _queued_blocks_memory_usage->add(-block_bytes); + merge_blocks.emplace_back(std::move(add_block)); + _blocks_queue.pop_front(); + } else { + break; + } + } + } else { + *eos = _is_finished; + } } - if (state->is_cancelled()) { - set_status_on_error(Status::Cancelled("cancelled"), false); + if (!merge_blocks.empty()) { + vectorized::MutableBlock m(block->get()); + for (auto& merge_block : merge_blocks) { + static_cast<void>(m.merge(*merge_block)); + return_free_block(std::move(merge_block)); + } } - if (!status().ok()) { - return status(); - } - - if (!_blocks_queue.empty()) { - *block = std::move(_blocks_queue.front()); - _blocks_queue.pop_front(); - - auto block_bytes = (*block)->allocated_bytes(); - _cur_bytes_in_queue -= block_bytes; - - _queued_blocks_memory_usage->add(-block_bytes); - return Status::OK(); - } else { - *eos = _is_finished; - } return Status::OK(); } diff --git a/be/src/vec/exec/scan/scanner_scheduler.cpp b/be/src/vec/exec/scan/scanner_scheduler.cpp index b3cfc2e48a..6b7bc232a2 100644 --- a/be/src/vec/exec/scan/scanner_scheduler.cpp +++ b/be/src/vec/exec/scan/scanner_scheduler.cpp @@ -347,8 +347,18 @@ void ScannerScheduler::_scanner_scan(ScannerScheduler* scheduler, ScannerContext bool should_stop = false; // Has to wait at least one full block, or it will cause a lot of schedule task in priority // queue, it will affect query latency and query concurrency for example ssb 3.3. 
- while (!eos && raw_bytes_read < raw_bytes_threshold && raw_rows_read < raw_rows_threshold && - num_rows_in_block < state->batch_size()) { + auto should_do_scan = [&, batch_size = state->batch_size(), + time = state->wait_full_block_schedule_times()]() { + if (raw_bytes_read < raw_bytes_threshold && raw_rows_read < raw_rows_threshold) { + return true; + } else if (num_rows_in_block < batch_size) { + return raw_bytes_read < raw_bytes_threshold * time && + raw_rows_read < raw_rows_threshold * time; + } + return false; + }; + + while (!eos && should_do_scan()) { // TODO llj task group should should_yield? if (UNLIKELY(ctx->done())) { // No need to set status on error here. @@ -384,10 +394,7 @@ void ScannerScheduler::_scanner_scan(ScannerScheduler* scheduler, ScannerContext ctx->return_free_block(std::move(block)); } else { if (!blocks.empty() && blocks.back()->rows() + block->rows() <= state->batch_size()) { - status = vectorized::MutableBlock(blocks.back().get()).merge(*block); - if (!status.ok()) { - break; - } + static_cast<void>(vectorized::MutableBlock(blocks.back().get()).merge(*block)); ctx->return_free_block(std::move(block)); } else { blocks.push_back(std::move(block)); diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java index f3622b62fb..1ca37d7e70 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java @@ -374,6 +374,7 @@ public class Coordinator implements CoordInterface { this.queryOptions.setExecutionTimeout(context.getExecTimeout()); this.queryOptions.setEnableScanNodeRunSerial(context.getSessionVariable().isEnableScanRunSerial()); this.queryOptions.setFeProcessUuid(ExecuteEnv.getInstance().getProcessUUID()); + this.queryOptions.setWaitFullBlockScheduleTimes(context.getSessionVariable().getWaitFullBlockScheduleTimes()); } public ConnectContext getConnectContext() { diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java index c8ba8f4943..ff957244c9 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java @@ -468,6 +468,8 @@ public class SessionVariable implements Serializable, Writable { // this session variable is set to true. public static final String FALLBACK_OTHER_REPLICA_WHEN_FIXED_CORRUPT = "fallback_other_replica_when_fixed_corrupt"; + public static final String WAIT_FULL_BLOCK_SCHEDULE_TIMES = "wait_full_block_schedule_times"; + public static final List<String> DEBUG_VARIABLES = ImmutableList.of( SKIP_DELETE_PREDICATE, SKIP_DELETE_BITMAP, @@ -834,6 +836,9 @@ public class SessionVariable implements Serializable, Writable { @VariableMgr.VarAttr(name = USE_RF_DEFAULT) public boolean useRuntimeFilterDefaultSize = false; + @VariableMgr.VarAttr(name = WAIT_FULL_BLOCK_SCHEDULE_TIMES) + public int waitFullBlockScheduleTimes = 2; + public int getBeNumberForTest() { return beNumberForTest; } @@ -2168,6 +2173,10 @@ public class SessionVariable implements Serializable, Writable { return sqlDialect; } + public int getWaitFullBlockScheduleTimes() { + return waitFullBlockScheduleTimes; + } + public ParseDialect.Dialect getSqlParseDialect() { return ParseDialect.Dialect.getByName(sqlDialect); } diff --git a/gensrc/thrift/PaloInternalService.thrift b/gensrc/thrift/PaloInternalService.thrift index d1a779e285..401eb548a0 100644 --- a/gensrc/thrift/PaloInternalService.thrift +++ b/gensrc/thrift/PaloInternalService.thrift @@ -257,6 +257,8 @@ struct TQueryOptions { 90: optional bool skip_missing_version = false; 91: optional bool runtime_filter_wait_infinitely = false; + + 92: optional i32 wait_full_block_schedule_times = 1; }