From fbdebe2424e5f8f6f2157e087afa51cc2ea4fe75 Mon Sep 17 00:00:00 2001 From: Jibing-Li <64681310+Jibing-Li@users.noreply.github.com> Date: Wed, 21 Sep 2022 20:59:13 +0800 Subject: [PATCH] [feature-wip](new-scan)Add load counter for VFileScanner (#12812) The new scanner (VFileScanner) needs a counter to record two values in a load job: 1. The number of rows unselected by the pre-filter, and 2. The number of rows filtered out because of an unmatched schema or other errors. This PR implements the counter. --- be/src/vec/exec/scan/vfile_scanner.cpp | 51 +++++++++++++++----------- be/src/vec/exec/scan/vfile_scanner.h | 15 ++++++++ 2 files changed, 45 insertions(+), 21 deletions(-) diff --git a/be/src/vec/exec/scan/vfile_scanner.cpp b/be/src/vec/exec/scan/vfile_scanner.cpp index 0e571e6bb0..9adf4e594f 100644 --- a/be/src/vec/exec/scan/vfile_scanner.cpp +++ b/be/src/vec/exec/scan/vfile_scanner.cpp @@ -80,30 +80,40 @@ Status VFileScanner::open(RuntimeState* state) { } Status VFileScanner::_get_block_impl(RuntimeState* state, Block* block, bool* eof) { - if (_cur_reader == nullptr || _cur_reader_eof) { - RETURN_IF_ERROR(_get_next_reader()); - } - if (!_scanner_eof) { + do { + if (_cur_reader == nullptr || _cur_reader_eof) { + RETURN_IF_ERROR(_get_next_reader()); + } + + if (_scanner_eof) { + *eof = true; + return Status::OK(); + } + // Init src block for load job based on the data file schema (e.g. parquet) // For query job, simply set _src_block_ptr to block. RETURN_IF_ERROR(_init_src_block(block)); // Read next block. RETURN_IF_ERROR(_cur_reader->get_next_block(_src_block_ptr, &_cur_reader_eof)); - // Convert the src block columns type to string in place. - RETURN_IF_ERROR(_cast_to_input_block(block)); - } - if (_scanner_eof && _src_block_ptr->rows() == 0) { - *eof = true; - } + if (_src_block_ptr->rows() > 0) { + // Convert the src block columns type to string in place. + RETURN_IF_ERROR(_cast_to_input_block(block)); + // Fill rows in src block with partition columns from path. (e.g. 
Hive partition columns) + RETURN_IF_ERROR(_fill_columns_from_path()); + // Apply _pre_conjunct_ctx_ptr to filter src block. + RETURN_IF_ERROR(_pre_filter_src_block()); + // Convert src block to output block (dest block), string to dest data type and apply filters. + RETURN_IF_ERROR(_convert_to_output_block(block)); + break; + } + } while (true); - if (_src_block_ptr->rows() > 0) { - // Fill rows in src block with partition columns from path. (e.g. Hive partition columns) - RETURN_IF_ERROR(_fill_columns_from_path()); - // Apply _pre_conjunct_ctx_ptr to filter src block. - RETURN_IF_ERROR(_pre_filter_src_block()); - // Convert src block to output block (dest block), string to dest data type and apply filters. - RETURN_IF_ERROR(_convert_to_output_block(block)); + // Update filtered rows and unselected rows for load, reset counter. + { + state->update_num_rows_load_filtered(_counter.num_rows_filtered); + state->update_num_rows_load_unselected(_counter.num_rows_unselected); + _reset_counter(); } return Status::OK(); @@ -293,7 +303,7 @@ Status VFileScanner::_convert_to_output_block(Block* block) { std::make_shared(), "filter column")); RETURN_IF_ERROR(vectorized::Block::filter_block(block, dest_size, dest_size)); - // _counter->num_rows_filtered += rows - dest_block->rows(); + _counter.num_rows_filtered += rows - block->rows(); return Status::OK(); } @@ -301,11 +311,10 @@ Status VFileScanner::_convert_to_output_block(Block* block) { Status VFileScanner::_pre_filter_src_block() { if (_pre_conjunct_ctx_ptr) { auto origin_column_num = _src_block_ptr->columns(); - // filter block - // auto old_rows = _src_block_ptr->rows(); + auto old_rows = _src_block_ptr->rows(); RETURN_IF_ERROR(vectorized::VExprContext::filter_block(_pre_conjunct_ctx_ptr, _src_block_ptr, origin_column_num)); - // _counter->num_rows_unselected += old_rows - _src_block.rows(); + _counter.num_rows_unselected += old_rows - _src_block.rows(); } return Status::OK(); } diff --git 
a/be/src/vec/exec/scan/vfile_scanner.h b/be/src/vec/exec/scan/vfile_scanner.h index 3ac8b8bf26..aa6d203265 100644 --- a/be/src/vec/exec/scan/vfile_scanner.h +++ b/be/src/vec/exec/scan/vfile_scanner.h @@ -29,6 +29,16 @@ namespace doris::vectorized { class NewFileScanNode; +// The counter will be passed to each scanner. +// Note that this struct is not thread safe. +// So if we support concurrent scan in the future, we need to modify this struct. +struct ScannerCounter { + ScannerCounter() : num_rows_filtered(0), num_rows_unselected(0) {} + + int64_t num_rows_filtered; // unqualified rows (unmatched the dest schema, or no partition) + int64_t num_rows_unselected; // rows filtered by predicates +}; + class VFileScanner : public VScanner { public: VFileScanner(RuntimeState* state, NewFileScanNode* parent, int64_t limit, @@ -79,6 +89,7 @@ protected: RuntimeProfile* _profile; RuntimeProfile::Counter* _rows_read_counter; RuntimeProfile::Counter* _read_timer; + ScannerCounter _counter; bool _scanner_eof = false; int _rows = 0; @@ -107,5 +118,9 @@ private: Status _cast_to_input_block(Block* block); Status _pre_filter_src_block(); Status _convert_to_output_block(Block* block); + void _reset_counter() { + _counter.num_rows_unselected = 0; + _counter.num_rows_filtered = 0; + } }; } // namespace doris::vectorized