[bugfix](scannode) 1. make rows_read correct 2. use single scanner if has limit clause (#16473)
Make rows_read correct so that the scheduler can use it correctly. Use a single scanner if the query has a limit clause, and move that logic from the fragment context to the scan node. --------- Co-authored-by: yiguolei <yiguolei@gmail.com> Co-authored-by: github-actions[bot] <41898282+github-actions[bot]@users.noreply.github.com>
This commit is contained in:
@ -36,6 +36,10 @@ Status ScannerContext::init() {
|
||||
// should find a more reasonable value.
|
||||
_max_thread_num =
|
||||
std::min(config::doris_scanner_thread_pool_thread_num / 4, (int32_t)_scanners.size());
|
||||
// For a query like `select * from table limit 10;`, we should just use one thread.
|
||||
if (_parent->should_run_serial()) {
|
||||
_max_thread_num = 1;
|
||||
}
|
||||
|
||||
// 2. Calculate how many blocks need to be preallocated.
|
||||
// The calculation logic is as follows:
|
||||
|
||||
@ -274,7 +274,7 @@ void ScannerScheduler::_scanner_scan(ScannerScheduler* scheduler, ScannerContext
|
||||
// judge if we need to yield. So we record all raw data read in this round
|
||||
// scan, if this exceeds row number or bytes threshold, we yield this thread.
|
||||
std::vector<vectorized::Block*> blocks;
|
||||
int64_t raw_rows_read = scanner->raw_rows_read();
|
||||
int64_t raw_rows_read = scanner->get_rows_read();
|
||||
int64_t raw_rows_threshold = raw_rows_read + config::doris_scanner_row_num;
|
||||
int64_t raw_bytes_read = 0;
|
||||
int64_t raw_bytes_threshold = config::doris_scanner_row_bytes;
|
||||
@ -332,7 +332,7 @@ void ScannerScheduler::_scanner_scan(ScannerScheduler* scheduler, ScannerContext
|
||||
blocks.push_back(block);
|
||||
}
|
||||
}
|
||||
raw_rows_read = scanner->raw_rows_read();
|
||||
raw_rows_read = scanner->get_rows_read();
|
||||
} // end for while
|
||||
|
||||
// if we failed, check status.
|
||||
|
||||
@ -49,7 +49,14 @@ struct FilterPredicates {
|
||||
class VScanNode : public ExecNode {
|
||||
public:
|
||||
VScanNode(ObjectPool* pool, const TPlanNode& tnode, const DescriptorTbl& descs)
|
||||
: ExecNode(pool, tnode, descs), _runtime_filter_descs(tnode.runtime_filters) {}
|
||||
: ExecNode(pool, tnode, descs), _runtime_filter_descs(tnode.runtime_filters) {
|
||||
if (!tnode.__isset.conjuncts || tnode.conjuncts.empty()) {
|
||||
// This means the request could be fulfilled in a single segment iterator request.
|
||||
if (tnode.limit > 0 && tnode.limit < 1024) {
|
||||
_should_run_serial = true;
|
||||
}
|
||||
}
|
||||
}
|
||||
virtual ~VScanNode() = default;
|
||||
|
||||
friend class VScanner;
|
||||
@ -95,6 +102,8 @@ public:
|
||||
|
||||
Status try_close();
|
||||
|
||||
// Returns the value of the _should_run_serial flag.
bool should_run_serial() const { return _should_run_serial; }
|
||||
|
||||
enum class PushDownType {
|
||||
// The predicate can not be pushed down to data source
|
||||
UNACCEPTABLE,
|
||||
@ -243,6 +252,9 @@ protected:
|
||||
|
||||
bool _need_agg_finalize = true;
|
||||
bool _blocked_by_rf = false;
|
||||
// If the query is like `select * from table limit 10;`, then the query should run in a
|
||||
// single scanner, to avoid starting too many scanners, which would cause lots of useless reads.
|
||||
bool _should_run_serial = false;
|
||||
|
||||
// Every time vconjunct_ctx_ptr is updated, the old ctx will be stored in this vector
|
||||
// so that it will be destroyed uniformly at the end of the query.
|
||||
|
||||
@ -37,7 +37,7 @@ Status VScanner::get_block(RuntimeState* state, Block* block, bool* eof) {
|
||||
// only empty block should be here
|
||||
DCHECK(block->rows() == 0);
|
||||
SCOPED_RAW_TIMER(&_per_scanner_timer);
|
||||
int64_t raw_rows_threshold = raw_rows_read() + config::doris_scanner_row_num;
|
||||
int64_t rows_read_threshold = _num_rows_read + config::doris_scanner_row_num;
|
||||
if (!block->mem_reuse()) {
|
||||
for (const auto slot_desc : _output_tuple_desc->slots()) {
|
||||
if (!slot_desc->need_materialize()) {
|
||||
@ -71,7 +71,7 @@ Status VScanner::get_block(RuntimeState* state, Block* block, bool* eof) {
|
||||
_num_rows_return += block->rows();
|
||||
}
|
||||
} while (!state->is_cancelled() && block->rows() == 0 && !(*eof) &&
|
||||
raw_rows_read() < raw_rows_threshold);
|
||||
_num_rows_read < rows_read_threshold);
|
||||
}
|
||||
|
||||
if (state->is_cancelled()) {
|
||||
|
||||
@ -47,9 +47,6 @@ public:
|
||||
|
||||
virtual Status close(RuntimeState* state);
|
||||
|
||||
// Subclass must implement this to return the current rows read
|
||||
virtual int64_t raw_rows_read() { return 0; }
|
||||
|
||||
protected:
|
||||
// Subclass should implement this to return data.
|
||||
virtual Status _get_block_impl(RuntimeState* state, Block* block, bool* eof) = 0;
|
||||
|
||||
Reference in New Issue
Block a user