[feature-wip](new-scan)Add load counter for VFileScanner (#12812)
The new scanner (VFileScanner) needs a counter to record two values in a load job: 1. the number of rows unselected by the pre-filter, and 2. the number of rows filtered out because of an unmatched schema or other errors. This PR implements the counter.
This commit is contained in:
@ -80,30 +80,40 @@ Status VFileScanner::open(RuntimeState* state) {
|
||||
}
|
||||
|
||||
Status VFileScanner::_get_block_impl(RuntimeState* state, Block* block, bool* eof) {
|
||||
if (_cur_reader == nullptr || _cur_reader_eof) {
|
||||
RETURN_IF_ERROR(_get_next_reader());
|
||||
}
|
||||
if (!_scanner_eof) {
|
||||
do {
|
||||
if (_cur_reader == nullptr || _cur_reader_eof) {
|
||||
RETURN_IF_ERROR(_get_next_reader());
|
||||
}
|
||||
|
||||
if (_scanner_eof) {
|
||||
*eof = true;
|
||||
return Status::OK();
|
||||
}
|
||||
|
||||
// Init src block for load job based on the data file schema (e.g. parquet)
|
||||
// For query job, simply set _src_block_ptr to block.
|
||||
RETURN_IF_ERROR(_init_src_block(block));
|
||||
// Read next block.
|
||||
RETURN_IF_ERROR(_cur_reader->get_next_block(_src_block_ptr, &_cur_reader_eof));
|
||||
// Convert the src block columns type to string in place.
|
||||
RETURN_IF_ERROR(_cast_to_input_block(block));
|
||||
}
|
||||
|
||||
if (_scanner_eof && _src_block_ptr->rows() == 0) {
|
||||
*eof = true;
|
||||
}
|
||||
if (_src_block_ptr->rows() > 0) {
|
||||
// Convert the src block columns type to string in place.
|
||||
RETURN_IF_ERROR(_cast_to_input_block(block));
|
||||
// Fill rows in src block with partition columns from path. (e.g. Hive partition columns)
|
||||
RETURN_IF_ERROR(_fill_columns_from_path());
|
||||
// Apply _pre_conjunct_ctx_ptr to filter src block.
|
||||
RETURN_IF_ERROR(_pre_filter_src_block());
|
||||
// Convert src block to output block (dest block), string to dest data type and apply filters.
|
||||
RETURN_IF_ERROR(_convert_to_output_block(block));
|
||||
break;
|
||||
}
|
||||
} while (true);
|
||||
|
||||
if (_src_block_ptr->rows() > 0) {
|
||||
// Fill rows in src block with partition columns from path. (e.g. Hive partition columns)
|
||||
RETURN_IF_ERROR(_fill_columns_from_path());
|
||||
// Apply _pre_conjunct_ctx_ptr to filter src block.
|
||||
RETURN_IF_ERROR(_pre_filter_src_block());
|
||||
// Convert src block to output block (dest block), string to dest data type and apply filters.
|
||||
RETURN_IF_ERROR(_convert_to_output_block(block));
|
||||
// Update filtered rows and unselected rows for load, reset counter.
|
||||
{
|
||||
state->update_num_rows_load_filtered(_counter.num_rows_filtered);
|
||||
state->update_num_rows_load_unselected(_counter.num_rows_unselected);
|
||||
_reset_counter();
|
||||
}
|
||||
|
||||
return Status::OK();
|
||||
@ -293,7 +303,7 @@ Status VFileScanner::_convert_to_output_block(Block* block) {
|
||||
std::make_shared<vectorized::DataTypeUInt8>(),
|
||||
"filter column"));
|
||||
RETURN_IF_ERROR(vectorized::Block::filter_block(block, dest_size, dest_size));
|
||||
// _counter->num_rows_filtered += rows - dest_block->rows();
|
||||
_counter.num_rows_filtered += rows - block->rows();
|
||||
|
||||
return Status::OK();
|
||||
}
|
||||
@ -301,11 +311,10 @@ Status VFileScanner::_convert_to_output_block(Block* block) {
|
||||
// Applies _pre_conjunct_ctx_ptr (when set) to the source block and records how many
// rows it removed in _counter.num_rows_unselected.
//
// A failure from VExprContext::filter_block is handed back to the caller through
// RETURN_IF_ERROR; otherwise Status::OK() is returned. When no pre-conjunct context
// is set, the block and the counter are left untouched.
Status VFileScanner::_pre_filter_src_block() {
    if (_pre_conjunct_ctx_ptr) {
        auto origin_column_num = _src_block_ptr->columns();
        // Row count before filtering, so the number of dropped rows can be derived.
        auto old_rows = _src_block_ptr->rows();
        RETURN_IF_ERROR(vectorized::VExprContext::filter_block(_pre_conjunct_ctx_ptr,
                                                               _src_block_ptr, origin_column_num));
        // Both counts must come from the block that was actually filtered
        // (_src_block_ptr); reading a different block here would skew the delta.
        _counter.num_rows_unselected += old_rows - _src_block_ptr->rows();
    }
    return Status::OK();
}
|
||||
|
||||
@ -29,6 +29,16 @@ namespace doris::vectorized {
|
||||
|
||||
class NewFileScanNode;
|
||||
|
||||
// The counter will be passed to each scanner.
// Note that this struct is not thread safe.
// So if we support concurrent scans in the future, we need to modify this struct.
struct ScannerCounter {
    // Unqualified rows (rows that do not match the dest schema, or have no partition).
    int64_t num_rows_filtered = 0;
    // Rows filtered out by predicates.
    int64_t num_rows_unselected = 0;
};
|
||||
|
||||
class VFileScanner : public VScanner {
|
||||
public:
|
||||
VFileScanner(RuntimeState* state, NewFileScanNode* parent, int64_t limit,
|
||||
@ -79,6 +89,7 @@ protected:
|
||||
RuntimeProfile* _profile;
|
||||
RuntimeProfile::Counter* _rows_read_counter;
|
||||
RuntimeProfile::Counter* _read_timer;
|
||||
ScannerCounter _counter;
|
||||
|
||||
bool _scanner_eof = false;
|
||||
int _rows = 0;
|
||||
@ -107,5 +118,9 @@ private:
|
||||
Status _cast_to_input_block(Block* block);
|
||||
Status _pre_filter_src_block();
|
||||
Status _convert_to_output_block(Block* block);
|
||||
void _reset_counter() {
|
||||
_counter.num_rows_unselected = 0;
|
||||
_counter.num_rows_filtered = 0;
|
||||
}
|
||||
};
|
||||
} // namespace doris::vectorized
|
||||
|
||||
Reference in New Issue
Block a user