[fix](broker load) sync the workflow of BrokerScanner to other Scanner to avoid oom (#9173)
This commit is contained in:
@ -155,7 +155,18 @@ Status BaseScanner::init_expr_ctxes() {
|
||||
return Status::OK();
|
||||
}
|
||||
|
||||
Status BaseScanner::fill_dest_tuple(Tuple* dest_tuple, MemPool* mem_pool) {
|
||||
Status BaseScanner::fill_dest_tuple(Tuple* dest_tuple, MemPool* mem_pool, bool* fill_tuple) {
|
||||
RETURN_IF_ERROR(_fill_dest_tuple(dest_tuple, mem_pool));
|
||||
if (_success) {
|
||||
free_expr_local_allocations();
|
||||
*fill_tuple = true;
|
||||
} else {
|
||||
*fill_tuple = false;
|
||||
}
|
||||
return Status::OK();
|
||||
}
|
||||
|
||||
Status BaseScanner::_fill_dest_tuple(Tuple* dest_tuple, MemPool* mem_pool) {
|
||||
// filter src tuple by preceding filter first
|
||||
if (!ExecNode::eval_conjuncts(&_pre_filter_ctxs[0], _pre_filter_ctxs.size(), _src_tuple_row)) {
|
||||
_counter->num_rows_unselected++;
|
||||
|
||||
@ -58,12 +58,13 @@ public:
|
||||
|
||||
// Close this scanner
|
||||
virtual void close() = 0;
|
||||
Status fill_dest_tuple(Tuple* dest_tuple, MemPool* mem_pool);
|
||||
Status fill_dest_tuple(Tuple* dest_tuple, MemPool* mem_pool, bool* fill_tuple);
|
||||
|
||||
void fill_slots_of_columns_from_path(int start,
|
||||
const std::vector<std::string>& columns_from_path);
|
||||
|
||||
void free_expr_local_allocations();
|
||||
|
||||
protected:
|
||||
RuntimeState* _state;
|
||||
const TBrokerScanRangeParams& _params;
|
||||
@ -106,6 +107,9 @@ protected:
|
||||
// Used to record whether a row of data is successfully read.
|
||||
bool _success = false;
|
||||
bool _scanner_eof = false;
|
||||
|
||||
private:
|
||||
Status _fill_dest_tuple(Tuple* dest_tuple, MemPool* mem_pool);
|
||||
};
|
||||
|
||||
} /* namespace doris */
|
||||
|
||||
@ -114,13 +114,7 @@ Status BrokerScanner::get_next(Tuple* tuple, MemPool* tuple_pool, bool* eof, boo
|
||||
{
|
||||
COUNTER_UPDATE(_rows_read_counter, 1);
|
||||
SCOPED_TIMER(_materialize_timer);
|
||||
RETURN_IF_ERROR(_convert_one_row(Slice(ptr, size), tuple, tuple_pool));
|
||||
if (_success) {
|
||||
free_expr_local_allocations();
|
||||
*fill_tuple = true;
|
||||
} else {
|
||||
*fill_tuple = false;
|
||||
}
|
||||
RETURN_IF_ERROR(_convert_one_row(Slice(ptr, size), tuple, tuple_pool, fill_tuple));
|
||||
break; // break always
|
||||
}
|
||||
}
|
||||
@ -469,14 +463,14 @@ bool is_null(const Slice& slice) {
|
||||
}
|
||||
|
||||
// Convert one row to this tuple
|
||||
Status BrokerScanner::_convert_one_row(const Slice& line, Tuple* tuple, MemPool* tuple_pool) {
|
||||
Status BrokerScanner::_convert_one_row(const Slice& line, Tuple* tuple, MemPool* tuple_pool, bool* fill_tuple) {
|
||||
RETURN_IF_ERROR(_line_to_src_tuple(line));
|
||||
if (!_success) {
|
||||
// If not success, which means we met an invalid row, return.
|
||||
return Status::OK();
|
||||
}
|
||||
|
||||
return fill_dest_tuple(tuple, tuple_pool);
|
||||
return fill_dest_tuple(tuple, tuple_pool, fill_tuple);
|
||||
}
|
||||
|
||||
// Convert one row to this tuple
|
||||
|
||||
@ -86,7 +86,7 @@ private:
|
||||
// Convert one row to one tuple
|
||||
// 'ptr' and 'len' is csv text line
|
||||
// output is tuple
|
||||
Status _convert_one_row(const Slice& line, Tuple* tuple, MemPool* tuple_pool);
|
||||
Status _convert_one_row(const Slice& line, Tuple* tuple, MemPool* tuple_pool, bool* fill_tuple);
|
||||
|
||||
Status _line_to_src_tuple(const Slice& line);
|
||||
|
||||
|
||||
@ -95,8 +95,7 @@ Status JsonScanner::get_next(Tuple* tuple, MemPool* tuple_pool, bool* eof, bool
|
||||
}
|
||||
COUNTER_UPDATE(_rows_read_counter, 1);
|
||||
SCOPED_TIMER(_materialize_timer);
|
||||
RETURN_IF_ERROR(fill_dest_tuple(tuple, tuple_pool));
|
||||
*fill_tuple = _success;
|
||||
RETURN_IF_ERROR(fill_dest_tuple(tuple, tuple_pool, fill_tuple));
|
||||
break; // break always
|
||||
}
|
||||
if (_scanner_eof) {
|
||||
|
||||
@ -356,8 +356,7 @@ Status ORCScanner::get_next(Tuple* tuple, MemPool* tuple_pool, bool* eof, bool*
|
||||
}
|
||||
COUNTER_UPDATE(_rows_read_counter, 1);
|
||||
SCOPED_TIMER(_materialize_timer);
|
||||
RETURN_IF_ERROR(fill_dest_tuple(tuple, tuple_pool));
|
||||
*fill_tuple = _success;
|
||||
RETURN_IF_ERROR(fill_dest_tuple(tuple, tuple_pool, fill_tuple));
|
||||
break;
|
||||
}
|
||||
if (_scanner_eof) {
|
||||
|
||||
@ -80,8 +80,7 @@ Status ParquetScanner::get_next(Tuple* tuple, MemPool* tuple_pool, bool* eof, bo
|
||||
|
||||
COUNTER_UPDATE(_rows_read_counter, 1);
|
||||
SCOPED_TIMER(_materialize_timer);
|
||||
RETURN_IF_ERROR(fill_dest_tuple(tuple, tuple_pool));
|
||||
*fill_tuple = _success;
|
||||
RETURN_IF_ERROR(fill_dest_tuple(tuple, tuple_pool, fill_tuple));
|
||||
break; // break always
|
||||
}
|
||||
if (_scanner_eof) {
|
||||
|
||||
Reference in New Issue
Block a user