[Improvement](segment iterator) remove range in first read to save time (#26689)
Currently, rowids may be significantly fragmented after `_get_row_ranges_by_column_conditions`, which can lead to high CPU cost when processing these scattered rowid ranges.
This PR enhances the `SegmentIterator` by eliminating the initial range read in the `BitmapRangeIterator` constructor and introducing a `read_batch_rowids` method on both the `BitmapRangeIterator` and `BackwardBitmapRangeIterator` classes. The aim is to boost performance by omitting redundant read operations, thereby reducing execution time.
Moreover, to avoid unnecessary per-rowid reads when the range is already mostly complete, we employ a simple `is_continuous` check to determine whether the block of rowids is continuous. If so, we call `next_batch` instead of `read_by_rowids`, streamlining the processing of consecutive rowids.
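For illustration, the whole-batch check reduces to the comparison below (a minimal standalone sketch, not the Doris code itself): rowids drained from a roaring bitmap are sorted and duplicate-free, so a batch is one unbroken run exactly when its spread equals its length minus one.

```cpp
#include <cstdint>

// Sketch: a sorted, duplicate-free rowid batch is continuous
// iff last - first == count - 1.
bool is_continuous(const uint32_t* rowids, uint32_t count) {
    return count > 1 && rowids[count - 1] - rowids[0] == count - 1;
}
```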
We selected three SQL scenarios to test the effect of the optimization:
1. ```select COUNT() from wc_httplogs_inverted_index where request match "images" and (size >= 10 and status = 200);```
2. ```select COUNT() from wc_httplogs_inverted_index where request match "HTTP" and (size >= 10 and status = 200);```
3. ```select COUNT() from wc_httplogs_inverted_index where request match "GET" and (size >= 10 and status = 200);```
- The first SQL statement represents the scenario this PR primarily optimizes: the first read matches a large number of rows, but they are highly fragmented.
- The second SQL statement represents a scenario where the first read fully hits a complete rowid range; it mainly verifies that the PR causes no performance degradation in that case.
- The third SQL statement represents a near-total hit with only occasional misses; it checks that the PR does not regress when the rowid range contains many continuous sub-ranges.
The results are as follows:

| SQL | Execution time (before → after) | FirstReadTime (before → after) |
|-----|---------------------------------|--------------------------------|
| 1   | 0.32 sec → 0.16 sec             | 6s628ms → 1s604ms              |
| 2   | 0.16 sec → 0.15 sec             | 682.816ms → 635.156ms          |
| 3   | 0.16 sec → 0.16 sec             | 787.904ms → 798.861ms          |
The main changes:
```diff
@@ -99,11 +99,12 @@ public:
     explicit BitmapRangeIterator(const roaring::Roaring& bitmap) {
         roaring_init_iterator(&bitmap.roaring, &_iter);
-        _read_next_batch();
     }
 
     bool has_more_range() const { return !_eof; }
 
+    [[nodiscard]] static uint32_t get_batch_size() { return kBatchSize; }
+
     // read next range into [*from, *to) whose size <= max_range_size.
     // return false when there is no more range.
     virtual bool next_range(const uint32_t max_range_size, uint32_t* from, uint32_t* to) {
```
```diff
@@ -142,6 +143,11 @@ public:
         return true;
     }
 
+    // read batch_size of rowids from roaring bitmap into buf array
+    virtual uint32_t read_batch_rowids(rowid_t* buf, uint32_t batch_size) {
+        return roaring::api::roaring_read_uint32_iterator(&_iter, buf, batch_size);
+    }
+
 private:
     void _read_next_batch() {
         _buf_pos = 0;
```
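The forward `read_batch_rowids` is a thin wrapper over CRoaring's batched iterator read, which the hunk above shows directly. Below is a minimal sketch of the same drain loop against the CRoaring C API (standalone, outside Doris; the bitmap contents are made up for the example):

```cpp
#include <cstdint>
#include <cstdio>
#include <roaring/roaring.h>

int main() {
    // A fragmented rowid set: two runs separated by gaps.
    roaring_bitmap_t* bm = roaring_bitmap_of(6, 0u, 1u, 4u, 5u, 10u, 11u);

    roaring_uint32_iterator_t iter;
    roaring_init_iterator(bm, &iter);

    uint32_t buf[4];
    uint32_t n;
    // Drain the bitmap in batches of up to 4 rowids, as read_batch_rowids does.
    while ((n = roaring_read_uint32_iterator(&iter, buf, 4)) > 0) {
        for (uint32_t i = 0; i < n; ++i) {
            printf("%u ", buf[i]);
        }
        printf("\n"); // prints "0 1 4 5", then "10 11"
    }
    roaring_bitmap_free(bm);
    return 0;
}
```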
```diff
@@ -166,6 +172,8 @@ class SegmentIterator::BackwardBitmapRangeIterator : public SegmentIterator::BitmapRangeIterator {
 public:
     explicit BackwardBitmapRangeIterator(const roaring::Roaring& bitmap) {
         roaring_init_iterator_last(&bitmap.roaring, &_riter);
+        _rowid_count = roaring_bitmap_get_cardinality(&bitmap.roaring);
+        _rowid_left = _rowid_count;
     }
 
     bool has_more_range() const { return !_riter.has_value; }
```
```diff
@@ -189,9 +197,51 @@ public:
 
         return true;
     }
+
+    /**
+     * Reads a batch of row IDs from a roaring bitmap, starting from the end and moving backwards.
+     * This function retrieves the last `batch_size` row IDs from the bitmap and stores them in the
+     * provided buffer. It updates the internal state to track how many row IDs are left to read in
+     * subsequent calls.
+     *
+     * The row IDs are read in reverse order, but stored in the buffer maintaining their original
+     * order in the bitmap.
+     *
+     * Example:
+     *   input bitmap: [0 1 4 5 6 7 10 15 16 17 18 19]
+     *   If the bitmap has 12 elements and batch_size is set to 5, the function will first read
+     *   [15, 16, 17, 18, 19] into the buffer, leaving 7 elements left. In the next call with
+     *   batch_size 5, it will read [4, 5, 6, 7, 10].
+     */
+    uint32_t read_batch_rowids(rowid_t* buf, uint32_t batch_size) override {
+        if (!_riter.has_value || _rowid_left == 0) {
+            return 0;
+        }
+
+        if (_rowid_count <= batch_size) {
+            roaring_bitmap_to_uint32_array(_riter.parent,
+                                           buf); // Fill 'buf' with '_rowid_count' elements.
+            uint32_t num_read = _rowid_left;     // Save the number of row IDs read.
+            _rowid_left = 0;                     // No row IDs left after this operation.
+            return num_read;                     // Return the number of row IDs read.
+        }
+
+        uint32_t read_size = std::min(batch_size, _rowid_left);
+        uint32_t num_read = 0; // Counter for the number of row IDs read.
+
+        // Read row IDs into the buffer in reverse order.
+        while (num_read < read_size && _riter.has_value) {
+            buf[read_size - num_read - 1] = _riter.current_value;
+            num_read++;
+            _rowid_left--; // Decrement the count of remaining row IDs.
+            roaring_previous_uint32_iterator(&_riter);
+        }
+
+        // Return the actual number of row IDs read.
+        return num_read;
+    }
 
 private:
     roaring::api::roaring_uint32_iterator_t _riter;
+    uint32_t _rowid_count;
+    uint32_t _rowid_left;
 };
 
 SegmentIterator::SegmentIterator(std::shared_ptr<Segment> segment, SchemaSPtr schema)
```
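The example in the doc comment can be replayed outside Doris with CRoaring's reverse iterator. The sketch below (assumptions: CRoaring C API and a hand-built bitmap) fills each buffer back-to-front so every batch comes out in ascending order, mirroring the override above:

```cpp
#include <algorithm>
#include <cstdint>
#include <cstdio>
#include <roaring/roaring.h>

int main() {
    uint32_t vals[] = {0, 1, 4, 5, 6, 7, 10, 15, 16, 17, 18, 19};
    roaring_bitmap_t* bm = roaring_bitmap_of_ptr(12, vals);

    roaring_uint32_iterator_t riter;
    roaring_init_iterator_last(bm, &riter);
    uint32_t left = (uint32_t)roaring_bitmap_get_cardinality(bm);

    uint32_t buf[5];
    while (left > 0 && riter.has_value) {
        uint32_t read_size = std::min(5u, left);
        // Fill back-to-front so the batch keeps the bitmap's ascending order.
        for (uint32_t n = 0; n < read_size && riter.has_value; ++n) {
            buf[read_size - n - 1] = riter.current_value;
            --left;
            roaring_previous_uint32_iterator(&riter);
        }
        for (uint32_t i = 0; i < read_size; ++i) {
            printf("%u ", buf[i]);
        }
        printf("\n"); // "15 16 17 18 19", then "4 5 6 7 10", then "0 1"
    }
    roaring_bitmap_free(bm);
    return 0;
}
```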
```diff
@@ -1610,56 +1660,86 @@ void SegmentIterator::_output_non_pred_columns(vectorized::Block* block) {
     }
 }
 
+/**
+ * Reads columns by their index, handling both continuous and discontinuous rowid scenarios.
+ *
+ * This function is designed to read a specified number of rows (up to nrows_read_limit)
+ * from the segment iterator, dealing with both continuous and discontinuous rowid arrays.
+ * It operates as follows:
+ *
+ * 1. Reads a batch of rowids (up to the specified limit), and checks if they are continuous.
+ *    Continuous here means that the rowids form an unbroken sequence (e.g., 1, 2, 3, 4...).
+ *
+ * 2. For each column that needs to be read (identified by _first_read_column_ids):
+ *    - If the rowids are continuous, the function uses seek_to_ordinal and next_batch
+ *      for efficient reading.
+ *    - If the rowids are not continuous, the function processes them in smaller batches
+ *      (each of size up to 256). Each batch is checked for internal continuity:
+ *      a. If a batch is continuous, uses seek_to_ordinal and next_batch for that batch.
+ *      b. If a batch is not continuous, uses read_by_rowids for individual rowids in the batch.
+ *
+ * This approach optimizes reading performance by leveraging batch processing for continuous
+ * rowid sequences and handling discontinuities gracefully in smaller chunks.
+ */
 Status SegmentIterator::_read_columns_by_index(uint32_t nrows_read_limit, uint32_t& nrows_read,
                                                bool set_block_rowid) {
     SCOPED_RAW_TIMER(&_opts.stats->first_read_ns);
 
-    do {
-        uint32_t range_from = 0;
-        uint32_t range_to = 0;
-        bool has_next_range =
-                _range_iter->next_range(nrows_read_limit - nrows_read, &range_from, &range_to);
-        if (!has_next_range) {
-            break;
-        }
-
-        size_t rows_to_read = range_to - range_from;
-        _cur_rowid = range_to;
-
-        if (set_block_rowid) {
-            // Here use std::iota is better performance than for-loop, maybe for-loop is not vectorized
-            auto start = _block_rowids.data() + nrows_read;
-            auto end = start + rows_to_read;
-            std::iota(start, end, range_from);
-            nrows_read += rows_to_read;
-        } else {
-            nrows_read += rows_to_read;
-        }
-
-        _split_row_ranges.emplace_back(std::pair {range_from, range_to});
-    } while (nrows_read < nrows_read_limit && !_opts.read_orderby_key_reverse);
+    nrows_read = _range_iter->read_batch_rowids(_block_rowids.data(), nrows_read_limit);
+    bool is_continuous = (nrows_read > 1) &&
+                         (_block_rowids[nrows_read - 1] - _block_rowids[0] == nrows_read - 1);
 
     for (auto cid : _first_read_column_ids) {
         auto& column = _current_return_columns[cid];
         if (_prune_column(cid, column, true, nrows_read)) {
             continue;
         }
-        for (auto& range : _split_row_ranges) {
-            size_t nrows = range.second - range.first;
-            {
-                _opts.stats->block_first_read_seek_num += 1;
-                if (_opts.runtime_state && _opts.runtime_state->enable_profile()) {
-                    SCOPED_RAW_TIMER(&_opts.stats->block_first_read_seek_ns);
-                    RETURN_IF_ERROR(_column_iterators[cid]->seek_to_ordinal(range.first));
-                } else {
-                    RETURN_IF_ERROR(_column_iterators[cid]->seek_to_ordinal(range.first));
-                }
-            }
-            size_t rows_read = nrows;
-            RETURN_IF_ERROR(_column_iterators[cid]->next_batch(&rows_read, column));
-            if (rows_read != nrows) {
-                return Status::Error<ErrorCode::INTERNAL_ERROR>("nrows({}) != rows_read({})", nrows,
-                                                                rows_read);
+
+        if (is_continuous) {
+            size_t rows_read = nrows_read;
+            _opts.stats->block_first_read_seek_num += 1;
+            if (_opts.runtime_state && _opts.runtime_state->enable_profile()) {
+                SCOPED_RAW_TIMER(&_opts.stats->block_first_read_seek_ns);
+                RETURN_IF_ERROR(_column_iterators[cid]->seek_to_ordinal(_block_rowids[0]));
+            } else {
+                RETURN_IF_ERROR(_column_iterators[cid]->seek_to_ordinal(_block_rowids[0]));
+            }
+            RETURN_IF_ERROR(_column_iterators[cid]->next_batch(&rows_read, column));
+            if (rows_read != nrows_read) {
+                return Status::Error<ErrorCode::INTERNAL_ERROR>("nrows({}) != rows_read({})",
+                                                                nrows_read, rows_read);
+            }
+        } else {
+            const uint32_t batch_size = _range_iter->get_batch_size();
+            uint32_t processed = 0;
+            while (processed < nrows_read) {
+                uint32_t current_batch_size = std::min(batch_size, nrows_read - processed);
+                bool batch_continuous = (current_batch_size > 1) &&
+                                        (_block_rowids[processed + current_batch_size - 1] -
+                                                 _block_rowids[processed] ==
+                                         current_batch_size - 1);
+
+                if (batch_continuous) {
+                    size_t rows_read = current_batch_size;
+                    _opts.stats->block_first_read_seek_num += 1;
+                    if (_opts.runtime_state && _opts.runtime_state->enable_profile()) {
+                        SCOPED_RAW_TIMER(&_opts.stats->block_first_read_seek_ns);
+                        RETURN_IF_ERROR(
+                                _column_iterators[cid]->seek_to_ordinal(_block_rowids[processed]));
+                    } else {
+                        RETURN_IF_ERROR(
+                                _column_iterators[cid]->seek_to_ordinal(_block_rowids[processed]));
+                    }
+                    RETURN_IF_ERROR(_column_iterators[cid]->next_batch(&rows_read, column));
+                    if (rows_read != current_batch_size) {
+                        return Status::Error<ErrorCode::INTERNAL_ERROR>(
+                                "batch nrows({}) != rows_read({})", current_batch_size, rows_read);
+                    }
+                } else {
+                    RETURN_IF_ERROR(_column_iterators[cid]->read_by_rowids(
+                            &_block_rowids[processed], current_batch_size, column));
+                }
+                processed += current_batch_size;
            }
         }
     }
```
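To make the fallback path concrete: when the whole batch is not continuous, the loop above re-tests continuity per chunk of `get_batch_size()` rowids (kBatchSize, i.e. up to 256 per the doc comment) and picks the cheaper read per chunk. A freestanding sketch of that classification (the function name and printout are illustrative only):

```cpp
#include <algorithm>
#include <cstdint>
#include <cstdio>
#include <vector>

// Walk a sorted rowid array in fixed-size chunks and report, per chunk,
// which read path the loop above would take.
void classify_batches(const std::vector<uint32_t>& rowids, uint32_t batch_size) {
    uint32_t processed = 0;
    while (processed < rowids.size()) {
        uint32_t n = std::min(batch_size, (uint32_t)(rowids.size() - processed));
        // Same continuity test as the per-batch check in the diff.
        bool continuous =
                n > 1 && rowids[processed + n - 1] - rowids[processed] == n - 1;
        printf("[%u, %u): %s\n", processed, processed + n,
               continuous ? "seek_to_ordinal + next_batch" : "read_by_rowids");
        processed += n;
    }
}
```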
```diff
@@ -1858,8 +1938,6 @@ Status SegmentIterator::_next_batch_internal(vectorized::Block* block) {
             nrows_read_limit = std::min(nrows_read_limit, (uint32_t)100);
             _wait_times_estimate_row_size--;
         }
-        _split_row_ranges.clear();
-        _split_row_ranges.reserve(nrows_read_limit / 2);
         RETURN_IF_ERROR(_read_columns_by_index(
                 nrows_read_limit, _current_batch_rows_read,
                 _lazy_materialization_read || _opts.record_rowids || _is_need_expr_eval));
```
```diff
@@ -2127,35 +2205,28 @@ void SegmentIterator::_output_index_result_column(uint16_t* sel_rowid_idx, uint16_t select_size,
     }
 }
 
-void SegmentIterator::_build_index_result_column(uint16_t* sel_rowid_idx, uint16_t select_size,
-                                                 vectorized::Block* block,
+void SegmentIterator::_build_index_result_column(const uint16_t* sel_rowid_idx,
+                                                 uint16_t select_size, vectorized::Block* block,
                                                  const std::string& pred_result_sign,
                                                  const roaring::Roaring& index_result) {
     auto index_result_column = vectorized::ColumnUInt8::create();
     vectorized::ColumnUInt8::Container& vec_match_pred = index_result_column->get_data();
     vec_match_pred.resize(block->rows());
-    size_t idx_in_block = 0;
-    size_t idx_in_row_range = 0;
     size_t idx_in_selected = 0;
-    // _split_row_ranges store multiple ranges which split in function _read_columns_by_index(),
-    // index_result is a column predicate apply result in a whole segement,
-    // but a scanner thread one time can read max rows limit by block_row_max,
-    // so split _row_bitmap by one time scan range, in order to match size of one scanner thread read rows.
-    for (auto origin_row_range : _split_row_ranges) {
-        for (size_t rowid = origin_row_range.first; rowid < origin_row_range.second; ++rowid) {
-            if (sel_rowid_idx == nullptr || (idx_in_selected < select_size &&
-                                             idx_in_row_range == sel_rowid_idx[idx_in_selected])) {
-                if (index_result.contains(rowid)) {
-                    vec_match_pred[idx_in_block++] = true;
-                } else {
-                    vec_match_pred[idx_in_block++] = false;
-                }
-                idx_in_selected++;
-            }
-            idx_in_row_range++;
+
+    for (uint32_t i = 0; i < _current_batch_rows_read; i++) {
+        auto rowid = _block_rowids[i];
+        if (sel_rowid_idx == nullptr ||
+            (idx_in_selected < select_size && i == sel_rowid_idx[idx_in_selected])) {
+            if (index_result.contains(rowid)) {
+                vec_match_pred[idx_in_selected] = true;
+            } else {
+                vec_match_pred[idx_in_selected] = false;
+            }
+            idx_in_selected++;
         }
     }
-    assert(block->rows() == vec_match_pred.size());
+    DCHECK(block->rows() == vec_match_pred.size());
     auto index_result_position = block->get_position_by_name(pred_result_sign);
     block->replace_by_position(index_result_position, std::move(index_result_column));
 }
```
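The behavioral shift in `_build_index_result_column` is from range-based iteration over `_split_row_ranges` to direct iteration over the batch's rowids. A freestanding sketch of the new walk (the function name and return type are mine, not Doris's; `sel_rowid_idx`, when non-null, lists the batch positions kept by earlier predicates):

```cpp
#include <cstddef>
#include <cstdint>
#include <vector>
#include <roaring/roaring.hh>

std::vector<uint8_t> build_match_flags(const std::vector<uint32_t>& block_rowids,
                                       const uint16_t* sel_rowid_idx, uint16_t select_size,
                                       const roaring::Roaring& index_result) {
    std::vector<uint8_t> flags;
    size_t idx_in_selected = 0;
    for (uint32_t i = 0; i < block_rowids.size(); ++i) {
        // A row contributes a flag only if it survived earlier predicates.
        if (sel_rowid_idx == nullptr ||
            (idx_in_selected < select_size && i == sel_rowid_idx[idx_in_selected])) {
            flags.push_back(index_result.contains(block_rowids[i]) ? 1 : 0);
            ++idx_in_selected;
        }
    }
    return flags;
}
```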
```diff
@@ -259,7 +259,7 @@ private:
     std::string _gen_predicate_result_sign(ColumnPredicate* predicate);
     std::string _gen_predicate_result_sign(ColumnPredicateInfo* predicate_info);
 
-    void _build_index_result_column(uint16_t* sel_rowid_idx, uint16_t select_size,
+    void _build_index_result_column(const uint16_t* sel_rowid_idx, uint16_t select_size,
                                     vectorized::Block* block, const std::string& pred_result_sign,
                                     const roaring::Roaring& index_result);
     void _output_index_result_column(uint16_t* sel_rowid_idx, uint16_t select_size,
```
```diff
@@ -340,7 +340,6 @@ private:
     roaring::Roaring _row_bitmap;
     // "column_name+operator+value-> <in_compound_query, rowid_result>
     std::unordered_map<std::string, std::pair<bool, roaring::Roaring>> _rowid_result_for_index;
-    std::vector<std::pair<uint32_t, uint32_t>> _split_row_ranges;
     // an iterator for `_row_bitmap` that can be used to extract row range to scan
     std::unique_ptr<BitmapRangeIterator> _range_iter;
     // the next rowid to read
```