Fix memory failure because of allocator reuse in count pushdown.

This commit is contained in:
XIAO-HOU 2024-09-30 09:46:23 +00:00 committed by ob-robot
parent c5d879f7ba
commit 3d555f8db1
11 changed files with 29 additions and 22 deletions

View File

@ -105,7 +105,7 @@ public:
blocksstable::ObIMicroBlockReader *reader,
const int32_t *row_ids,
const int64_t row_count,
const bool projected) = 0;
const bool reserve_memory) = 0;
virtual int can_use_index_info(const blocksstable::ObMicroIndexInfo &index_info, bool &can_agg) = 0;
virtual int fill_index_info(const blocksstable::ObMicroIndexInfo &index_info, const bool is_cg) = 0;
DECLARE_PURE_VIRTUAL_TO_STRING;

View File

@ -94,9 +94,9 @@ int ObCGAggCells::eval_batch(
blocksstable::ObIMicroBlockReader *reader,
const int32_t *row_ids,
const int64_t row_count,
const bool projected)
const bool reserve_memory)
{
UNUSED(projected);
UNUSED(reserve_memory);
int ret = OB_SUCCESS;
for (int64_t i = 0; OB_SUCC(ret) && i < agg_cells_.count(); ++i) {
if (agg_cells_.at(i)->finished()) {

View File

@ -49,7 +49,7 @@ public:
blocksstable::ObIMicroBlockReader *reader,
const int32_t *row_ids,
const int64_t row_count,
const bool projected) override;
const bool reserve_memory) override;
int eval(blocksstable::ObStorageDatum &datum, const int64_t row_count) override;
int fill_index_info(const blocksstable::ObMicroIndexInfo &index_info, const bool is_cg) override;
OB_INLINE bool is_vec() const override { return false; }

View File

@ -74,15 +74,16 @@ int ObAggGroupVec::eval_batch(
blocksstable::ObIMicroBlockReader *reader,
const int32_t *row_ids,
const int64_t row_count,
const bool projected)
const bool reserve_memory)
{
UNUSEDx(iter_param, context, col_idx);
int ret = OB_SUCCESS;
blocksstable::ObIMicroBlockReader *real_reader = nullptr;
if (nullptr != reader && reserve_memory) {
reader->reserve_reader_memory(true); // hold memory before aggregation finished
}
for (int64_t i = 0; OB_SUCC(ret) && i < agg_cells_.count(); ++i) {
ObAggCellVec *agg_cell = agg_cells_.at(i);
real_reader = PD_COUNT == agg_cell->get_type() && !agg_type_flag_.only_count() && projected ? nullptr : reader;
if (OB_FAIL(agg_cell->eval_batch(real_reader, col_offset_, row_ids, row_count))) {
if (OB_FAIL(agg_cell->eval_batch(reader, col_offset_, row_ids, row_count))) {
LOG_WARN("Failed to aggregate batch rows", K(ret), K(row_count));
}
}
@ -402,7 +403,11 @@ int ObAggregatedStoreVec::fill_rows(
bool need_get_row_ids = false;
int64_t micro_row_count = 0;
blocksstable::ObIMicroBlockReader *reader = scanner.get_reader();
if (OB_FAIL(reader->get_row_count(micro_row_count))) {
if (OB_ISNULL(reader)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("Unexpected null reader", K(ret), K(reader));
} else if (FALSE_IT(reader->reserve_reader_memory(false))) {
} else if (OB_FAIL(reader->get_row_count(micro_row_count))) {
LOG_WARN("Failed to get micro row count", K(ret));
} else if (!need_access_data_) {
if (need_get_row_ids_ || micro_row_count != covered_row_count) {
@ -424,7 +429,7 @@ int ObAggregatedStoreVec::fill_rows(
!filter_is_null()))) {
LOG_WARN("Failed to project rows in aggregate pushdown", K(ret), K(begin_index), K(end_index), K(res));
}
if (OB_SUCC(ret) && OB_FAIL(do_aggregate(reader))) {
if (OB_SUCC(ret) && OB_FAIL(do_aggregate(reader, need_access_data_))) {
LOG_WARN("Failed to aggregate rows", K(ret), KP(reader));
}
}
@ -457,14 +462,14 @@ int ObAggregatedStoreVec::fill_row(blocksstable::ObDatumRow &row)
} else {
count_++;
eval_ctx_.set_batch_idx(count_);
if (OB_FAIL(do_aggregate())) {
if (OB_FAIL(do_aggregate(nullptr/*reader*/, false/*reserve_memory*/))) {
LOG_WARN("Failed to aggregate rows", K(ret));
}
}
return ret;
}
int ObAggregatedStoreVec::do_aggregate(blocksstable::ObIMicroBlockReader *reader)
int ObAggregatedStoreVec::do_aggregate(blocksstable::ObIMicroBlockReader *reader, const bool reserve_memory)
{
int ret = OB_SUCCESS;
if (count_ > 0) {
@ -476,7 +481,7 @@ int ObAggregatedStoreVec::do_aggregate(blocksstable::ObIMicroBlockReader *reader
reader,
row_ids_,
count_,
true))) {
reserve_memory))) {
LOG_WARN("Failed to eval batch", K(ret), KPC(agg_group), K_(need_access_data), K_(need_get_row_ids));
}
}

View File

@ -69,7 +69,7 @@ public:
blocksstable::ObIMicroBlockReader *reader,
const int32_t *row_ids,
const int64_t row_count,
const bool projected) override;
const bool reserve_memory) override;
int can_use_index_info(const blocksstable::ObMicroIndexInfo &index_info, bool &can_agg) override;
int fill_index_info(const blocksstable::ObMicroIndexInfo &index_info, const bool is_cg) override;
int collect_result();
@ -146,7 +146,7 @@ private:
void release_agg_group();
int init_agg_groups(const ObTableAccessParam &param);
int check_agg_store_valid();
int do_aggregate(blocksstable::ObIMicroBlockReader *reader = nullptr);
int do_aggregate(blocksstable::ObIMicroBlockReader *reader, const bool reserve_memory);
OB_INLINE void reset_after_aggregate()
{
count_ = 0;

View File

@ -471,7 +471,7 @@ int ObCountAggCellVec::eval_batch(
int64_t &data = *reinterpret_cast<int64_t *>(agg_cell);
if (!exclude_null_) {
data += row_count;
} else if (nullptr == reader) { // row scan or group by pushdown or has other aggregate in one column
} else if (nullptr == reader) { // row scan or group by pushdown
if (OB_FAIL(ObAggCellVec::eval_batch(reader, col_offset, row_ids, row_count, row_offset, agg_row_idx))) {
LOG_WARN("Failed to aggregate batch rows", K(ret));
}

View File

@ -1750,6 +1750,7 @@ int ObMicroBlockCSDecoder::get_aggregate_result(
ObAggCellVec &agg_cell)
{
int ret = OB_SUCCESS;
decoder_allocator_.reuse();
ObColumnCSDecoder *column_decoder = nullptr;
if (OB_UNLIKELY(nullptr == row_ids || row_cap <= 0)) {
ret = OB_INVALID_ARGUMENT;

View File

@ -986,7 +986,7 @@ int ObIMicroBlockRowScanner::get_aggregate_result(
const int32_t col_idx,
const int32_t *row_ids,
const int64_t row_cap,
const bool projected,
const bool reserve_memory,
ObAggGroupBase &agg_group)
{
int ret = OB_SUCCESS;
@ -996,7 +996,7 @@ int ObIMicroBlockRowScanner::get_aggregate_result(
} else if (OB_UNLIKELY(nullptr == row_ids || row_cap < 0)) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("Invalid arguments", K(ret), KP(row_ids), K(row_cap));
} else if (OB_FAIL(agg_group.eval_batch(param_, context_, col_idx, reader_, row_ids, row_cap, projected))) {
} else if (OB_FAIL(agg_group.eval_batch(param_, context_, col_idx, reader_, row_ids, row_cap, reserve_memory))) {
LOG_WARN("Fail to eval batch rows", K(ret));
}
return ret;

View File

@ -98,7 +98,7 @@ public:
const int32_t col_idx,
const int32_t *row_ids,
const int64_t row_cap,
const bool projected,
const bool reserve_memory,
ObAggGroupBase &agg_group);
int advance_to_border(
const ObDatumRowkey &rowkey,

View File

@ -122,7 +122,7 @@ int ObCGAggregatedScanner::get_next_rows(uint64_t &count, const uint64_t capacit
nullptr/*reader*/,
nullptr/*row_ids*/,
cur_processed_row_count_,
false))) {
false/*reserve_memory*/))) {
LOG_WARN("Fail to eval batch rows", K(ret));
} else {
ret = OB_ITER_END;
@ -156,6 +156,7 @@ int ObCGAggregatedScanner::inner_fetch_rows(const int64_t row_cap, const int64_t
int ret = OB_SUCCESS;
bool projected = true;
if (access_ctx_->block_row_store_->is_vec2()) {
micro_scanner_->reserve_reader_memory(false);
ObAggGroupVec *agg_group_vec = static_cast<ObAggGroupVec *>(agg_group_);
projected = agg_group_vec->check_need_project(micro_scanner_->get_reader(), row_ids_, row_cap);
if (!projected) {
@ -174,7 +175,7 @@ int ObCGAggregatedScanner::inner_fetch_rows(const int64_t row_cap, const int64_t
}
if (OB_FAIL(ret)) {
} else if (OB_FAIL(micro_scanner_->get_aggregate_result(0/*col_idx*/, row_ids_, row_cap, projected, *agg_group_))) {
} else if (OB_FAIL(micro_scanner_->get_aggregate_result(0/*col_idx*/, row_ids_, row_cap, projected/*reserve_memory*/, *agg_group_))) {
LOG_WARN("Fail to get aggregate result", K(ret));
}
return ret;

View File

@ -168,7 +168,7 @@ int ObVirtualCGScanner::get_next_rows(uint64_t &count, const uint64_t capacity)
}
} else {
if (OB_FAIL(agg_group_->eval_batch(iter_param_, access_ctx_, 0/*col_idx*/, nullptr/*reader*/,
nullptr/*row_ids*/, current_group_size_, false))) {
nullptr/*row_ids*/, current_group_size_, false/*reserve_memory*/))) {
LOG_WARN("Fail to eval batch rows", K(ret));
} else {
count = current_group_size_;