[improvement] dynamically calculate max rows to read in a batch to avoid oom (#10972)
This commit is contained in:
@ -169,6 +169,8 @@ CONF_mInt64(thrift_client_retry_interval_ms, "1000");
|
||||
CONF_mInt32(doris_scan_range_row_count, "524288");
|
||||
// max bytes number for single scan range, used in segmentv2
|
||||
CONF_mInt32(doris_scan_range_max_mb, "1024");
|
||||
// max bytes number for single scan block, used in segmentv2
|
||||
CONF_mInt32(doris_scan_block_max_mb, "67108864");
|
||||
// size of scanner queue between scanner thread and compute thread
|
||||
CONF_mInt32(doris_scanner_queue_size, "1024");
|
||||
// single read execute fragment row number
|
||||
|
||||
@ -110,7 +110,8 @@ SegmentIterator::SegmentIterator(std::shared_ptr<Segment> segment, const Schema&
|
||||
_bitmap_index_iterators(_schema.num_columns(), nullptr),
|
||||
_cur_rowid(0),
|
||||
_lazy_materialization_read(false),
|
||||
_inited(false) {}
|
||||
_inited(false),
|
||||
_estimate_row_size(true) {}
|
||||
|
||||
SegmentIterator::~SegmentIterator() {
|
||||
for (auto iter : _column_iterators) {
|
||||
@ -999,6 +1000,10 @@ Status SegmentIterator::next_batch(vectorized::Block* block) {
|
||||
|
||||
uint32_t nrows_read = 0;
|
||||
uint32_t nrows_read_limit = _opts.block_row_max;
|
||||
if (UNLIKELY(_estimate_row_size)) {
|
||||
// read 100 rows to estimate average row size
|
||||
nrows_read_limit = 100;
|
||||
}
|
||||
_read_columns_by_index(nrows_read_limit, nrows_read, _lazy_materialization_read);
|
||||
|
||||
_opts.stats->blocks_load += 1;
|
||||
@ -1040,6 +1045,10 @@ Status SegmentIterator::next_batch(vectorized::Block* block) {
|
||||
}
|
||||
// shrink char_type suffix zero data
|
||||
block->shrink_char_type_column_suffix_zero(_char_type_idx);
|
||||
|
||||
if (UNLIKELY(_estimate_row_size) && block->rows() > 0) {
|
||||
_update_max_row(block);
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
@ -1063,8 +1072,19 @@ Status SegmentIterator::next_batch(vectorized::Block* block) {
|
||||
// shrink char_type suffix zero data
|
||||
block->shrink_char_type_column_suffix_zero(_char_type_idx);
|
||||
|
||||
if (UNLIKELY(_estimate_row_size) && block->rows() > 0) {
|
||||
_update_max_row(block);
|
||||
}
|
||||
return Status::OK();
|
||||
}
|
||||
|
||||
void SegmentIterator::_update_max_row(const vectorized::Block* block) {
|
||||
_estimate_row_size = false;
|
||||
auto avg_row_size = block->bytes() / block->rows();
|
||||
|
||||
int block_row_max = config::doris_scan_block_max_mb / avg_row_size;
|
||||
_opts.block_row_max = std::min(block_row_max, _opts.block_row_max);
|
||||
}
|
||||
|
||||
} // namespace segment_v2
|
||||
} // namespace doris
|
||||
|
||||
@ -146,6 +146,8 @@ private:
|
||||
}
|
||||
}
|
||||
|
||||
void _update_max_row(const vectorized::Block* block);
|
||||
|
||||
private:
|
||||
class BitmapRangeIterator;
|
||||
|
||||
@ -194,6 +196,7 @@ private:
|
||||
|
||||
// the actual init process is delayed to the first call to next_batch()
|
||||
bool _inited;
|
||||
bool _estimate_row_size;
|
||||
|
||||
StorageReadOptions _opts;
|
||||
// make a copy of `_opts.column_predicates` in order to make local changes
|
||||
|
||||
Reference in New Issue
Block a user