From 3d555f8db133fd77f234e0f70237bee621d5ec87 Mon Sep 17 00:00:00 2001 From: XIAO-HOU <372060054@qq.com> Date: Mon, 30 Sep 2024 09:46:23 +0000 Subject: [PATCH] Fix memory failure because of allocator reuse in count pushdown. --- src/storage/access/ob_aggregate_base.h | 2 +- src/storage/access/ob_aggregated_store.cpp | 4 ++-- src/storage/access/ob_aggregated_store.h | 2 +- .../access/ob_aggregated_store_vec.cpp | 23 +++++++++++-------- src/storage/access/ob_aggregated_store_vec.h | 4 ++-- .../access/ob_pushdown_aggregate_vec.cpp | 2 +- .../cs_encoding/ob_micro_block_cs_decoder.cpp | 1 + .../ob_micro_block_row_scanner.cpp | 4 ++-- .../blocksstable/ob_micro_block_row_scanner.h | 2 +- .../column_store/ob_cg_aggregated_scanner.cpp | 5 ++-- .../column_store/ob_virtual_cg_scanner.cpp | 2 +- 11 files changed, 29 insertions(+), 22 deletions(-) diff --git a/src/storage/access/ob_aggregate_base.h b/src/storage/access/ob_aggregate_base.h index 4ef265768..efae77edc 100644 --- a/src/storage/access/ob_aggregate_base.h +++ b/src/storage/access/ob_aggregate_base.h @@ -105,7 +105,7 @@ public: blocksstable::ObIMicroBlockReader *reader, const int32_t *row_ids, const int64_t row_count, - const bool projected) = 0; + const bool reserve_memory) = 0; virtual int can_use_index_info(const blocksstable::ObMicroIndexInfo &index_info, bool &can_agg) = 0; virtual int fill_index_info(const blocksstable::ObMicroIndexInfo &index_info, const bool is_cg) = 0; DECLARE_PURE_VIRTUAL_TO_STRING; diff --git a/src/storage/access/ob_aggregated_store.cpp b/src/storage/access/ob_aggregated_store.cpp index 582a8b0a2..a2a62ca16 100644 --- a/src/storage/access/ob_aggregated_store.cpp +++ b/src/storage/access/ob_aggregated_store.cpp @@ -94,9 +94,9 @@ int ObCGAggCells::eval_batch( blocksstable::ObIMicroBlockReader *reader, const int32_t *row_ids, const int64_t row_count, - const bool projected) + const bool reserve_memory) { - UNUSED(projected); + UNUSED(reserve_memory); int ret = OB_SUCCESS; for (int64_t i = 0; 
OB_SUCC(ret) && i < agg_cells_.count(); ++i) { if (agg_cells_.at(i)->finished()) { diff --git a/src/storage/access/ob_aggregated_store.h b/src/storage/access/ob_aggregated_store.h index 0090be342..cd1f957f1 100644 --- a/src/storage/access/ob_aggregated_store.h +++ b/src/storage/access/ob_aggregated_store.h @@ -49,7 +49,7 @@ public: blocksstable::ObIMicroBlockReader *reader, const int32_t *row_ids, const int64_t row_count, - const bool projected) override; + const bool reserve_memory) override; int eval(blocksstable::ObStorageDatum &datum, const int64_t row_count) override; int fill_index_info(const blocksstable::ObMicroIndexInfo &index_info, const bool is_cg) override; OB_INLINE bool is_vec() const override { return false; } diff --git a/src/storage/access/ob_aggregated_store_vec.cpp b/src/storage/access/ob_aggregated_store_vec.cpp index 590cf1547..6910e1750 100644 --- a/src/storage/access/ob_aggregated_store_vec.cpp +++ b/src/storage/access/ob_aggregated_store_vec.cpp @@ -74,15 +74,16 @@ int ObAggGroupVec::eval_batch( blocksstable::ObIMicroBlockReader *reader, const int32_t *row_ids, const int64_t row_count, - const bool projected) + const bool reserve_memory) { UNUSEDx(iter_param, context, col_idx); int ret = OB_SUCCESS; - blocksstable::ObIMicroBlockReader *real_reader = nullptr; + if (nullptr != reader && reserve_memory) { + reader->reserve_reader_memory(true); // hold memory before aggregation finished + } for (int64_t i = 0; OB_SUCC(ret) && i < agg_cells_.count(); ++i) { ObAggCellVec *agg_cell = agg_cells_.at(i); - real_reader = PD_COUNT == agg_cell->get_type() && !agg_type_flag_.only_count() && projected ? 
nullptr : reader; - if (OB_FAIL(agg_cell->eval_batch(real_reader, col_offset_, row_ids, row_count))) { + if (OB_FAIL(agg_cell->eval_batch(reader, col_offset_, row_ids, row_count))) { LOG_WARN("Failed to aggregate batch rows", K(ret), K(row_count)); } } @@ -402,7 +403,11 @@ int ObAggregatedStoreVec::fill_rows( bool need_get_row_ids = false; int64_t micro_row_count = 0; blocksstable::ObIMicroBlockReader *reader = scanner.get_reader(); - if (OB_FAIL(reader->get_row_count(micro_row_count))) { + if (OB_ISNULL(reader)) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("Unexpected null reader", K(ret), K(reader)); + } else if (FALSE_IT(reader->reserve_reader_memory(false))) { + } else if (OB_FAIL(reader->get_row_count(micro_row_count))) { LOG_WARN("Failed to get micro row count", K(ret)); } else if (!need_access_data_) { if (need_get_row_ids_ || micro_row_count != covered_row_count) { @@ -424,7 +429,7 @@ int ObAggregatedStoreVec::fill_rows( !filter_is_null()))) { LOG_WARN("Failed to project rows in aggregate pushdown", K(ret), K(begin_index), K(end_index), K(res)); } - if (OB_SUCC(ret) && OB_FAIL(do_aggregate(reader))) { + if (OB_SUCC(ret) && OB_FAIL(do_aggregate(reader, need_access_data_))) { LOG_WARN("Failed to aggregate rows", K(ret), KP(reader)); } } @@ -457,14 +462,14 @@ int ObAggregatedStoreVec::fill_row(blocksstable::ObDatumRow &row) } else { count_++; eval_ctx_.set_batch_idx(count_); - if (OB_FAIL(do_aggregate())) { + if (OB_FAIL(do_aggregate(nullptr/*reader*/, false/*reserve_memory*/))) { LOG_WARN("Failed to aggregate rows", K(ret)); } } return ret; } -int ObAggregatedStoreVec::do_aggregate(blocksstable::ObIMicroBlockReader *reader) +int ObAggregatedStoreVec::do_aggregate(blocksstable::ObIMicroBlockReader *reader, const bool reserve_memory) { int ret = OB_SUCCESS; if (count_ > 0) { @@ -476,7 +481,7 @@ int ObAggregatedStoreVec::do_aggregate(blocksstable::ObIMicroBlockReader *reader reader, row_ids_, count_, - true))) { + reserve_memory))) { LOG_WARN("Failed to eval batch", 
K(ret), KPC(agg_group), K_(need_access_data), K_(need_get_row_ids)); } } diff --git a/src/storage/access/ob_aggregated_store_vec.h b/src/storage/access/ob_aggregated_store_vec.h index b1b2ea6fa..7e145209b 100644 --- a/src/storage/access/ob_aggregated_store_vec.h +++ b/src/storage/access/ob_aggregated_store_vec.h @@ -69,7 +69,7 @@ public: blocksstable::ObIMicroBlockReader *reader, const int32_t *row_ids, const int64_t row_count, - const bool projected) override; + const bool reserve_memory) override; int can_use_index_info(const blocksstable::ObMicroIndexInfo &index_info, bool &can_agg) override; int fill_index_info(const blocksstable::ObMicroIndexInfo &index_info, const bool is_cg) override; int collect_result(); @@ -146,7 +146,7 @@ private: void release_agg_group(); int init_agg_groups(const ObTableAccessParam &param); int check_agg_store_valid(); - int do_aggregate(blocksstable::ObIMicroBlockReader *reader = nullptr); + int do_aggregate(blocksstable::ObIMicroBlockReader *reader, const bool reserve_memory); OB_INLINE void reset_after_aggregate() { count_ = 0; diff --git a/src/storage/access/ob_pushdown_aggregate_vec.cpp b/src/storage/access/ob_pushdown_aggregate_vec.cpp index afecbdba4..51df34ee3 100644 --- a/src/storage/access/ob_pushdown_aggregate_vec.cpp +++ b/src/storage/access/ob_pushdown_aggregate_vec.cpp @@ -471,7 +471,7 @@ int ObCountAggCellVec::eval_batch( int64_t &data = *reinterpret_cast<int64_t *>(agg_cell); if (!exclude_null_) { data += row_count; - } else if (nullptr == reader) { // row scan or group by pushdown or has other aggregate in one column + } else if (nullptr == reader) { // row scan or group by pushdown if (OB_FAIL(ObAggCellVec::eval_batch(reader, col_offset, row_ids, row_count, row_offset, agg_row_idx))) { LOG_WARN("Failed to aggregate batch rows", K(ret)); } diff --git a/src/storage/blocksstable/cs_encoding/ob_micro_block_cs_decoder.cpp b/src/storage/blocksstable/cs_encoding/ob_micro_block_cs_decoder.cpp index e53559f3b..43fe45d04 100644 ---
a/src/storage/blocksstable/cs_encoding/ob_micro_block_cs_decoder.cpp +++ b/src/storage/blocksstable/cs_encoding/ob_micro_block_cs_decoder.cpp @@ -1750,6 +1750,7 @@ int ObMicroBlockCSDecoder::get_aggregate_result( ObAggCellVec &agg_cell) { int ret = OB_SUCCESS; + decoder_allocator_.reuse(); ObColumnCSDecoder *column_decoder = nullptr; if (OB_UNLIKELY(nullptr == row_ids || row_cap <= 0)) { ret = OB_INVALID_ARGUMENT; diff --git a/src/storage/blocksstable/ob_micro_block_row_scanner.cpp b/src/storage/blocksstable/ob_micro_block_row_scanner.cpp index af906efa7..c4692b8f9 100644 --- a/src/storage/blocksstable/ob_micro_block_row_scanner.cpp +++ b/src/storage/blocksstable/ob_micro_block_row_scanner.cpp @@ -986,7 +986,7 @@ int ObIMicroBlockRowScanner::get_aggregate_result( const int32_t col_idx, const int32_t *row_ids, const int64_t row_cap, - const bool projected, + const bool reserve_memory, ObAggGroupBase &agg_group) { int ret = OB_SUCCESS; @@ -996,7 +996,7 @@ int ObIMicroBlockRowScanner::get_aggregate_result( } else if (OB_UNLIKELY(nullptr == row_ids || row_cap < 0)) { ret = OB_INVALID_ARGUMENT; LOG_WARN("Invalid arguments", K(ret), KP(row_ids), K(row_cap)); - } else if (OB_FAIL(agg_group.eval_batch(param_, context_, col_idx, reader_, row_ids, row_cap, projected))) { + } else if (OB_FAIL(agg_group.eval_batch(param_, context_, col_idx, reader_, row_ids, row_cap, reserve_memory))) { LOG_WARN("Fail to eval batch rows", K(ret)); } return ret; diff --git a/src/storage/blocksstable/ob_micro_block_row_scanner.h b/src/storage/blocksstable/ob_micro_block_row_scanner.h index fa92e84d3..1db08e3a4 100644 --- a/src/storage/blocksstable/ob_micro_block_row_scanner.h +++ b/src/storage/blocksstable/ob_micro_block_row_scanner.h @@ -98,7 +98,7 @@ public: const int32_t col_idx, const int32_t *row_ids, const int64_t row_cap, - const bool projected, + const bool reserve_memory, ObAggGroupBase &agg_group); int advance_to_border( const ObDatumRowkey &rowkey, diff --git 
a/src/storage/column_store/ob_cg_aggregated_scanner.cpp b/src/storage/column_store/ob_cg_aggregated_scanner.cpp index 810213db7..fedb206e1 100644 --- a/src/storage/column_store/ob_cg_aggregated_scanner.cpp +++ b/src/storage/column_store/ob_cg_aggregated_scanner.cpp @@ -122,7 +122,7 @@ int ObCGAggregatedScanner::get_next_rows(uint64_t &count, const uint64_t capacit nullptr/*reader*/, nullptr/*row_ids*/, cur_processed_row_count_, - false))) { + false/*reserve_memory*/))) { LOG_WARN("Fail to eval batch rows", K(ret)); } else { ret = OB_ITER_END; @@ -156,6 +156,7 @@ int ObCGAggregatedScanner::inner_fetch_rows(const int64_t row_cap, const int64_t int ret = OB_SUCCESS; bool projected = true; if (access_ctx_->block_row_store_->is_vec2()) { + micro_scanner_->reserve_reader_memory(false); ObAggGroupVec *agg_group_vec = static_cast<ObAggGroupVec *>(agg_group_); projected = agg_group_vec->check_need_project(micro_scanner_->get_reader(), row_ids_, row_cap); if (!projected) { @@ -174,7 +175,7 @@ int ObCGAggregatedScanner::inner_fetch_rows(const int64_t row_cap, const int64_t } if (OB_FAIL(ret)) { - } else if (OB_FAIL(micro_scanner_->get_aggregate_result(0/*col_idx*/, row_ids_, row_cap, projected, *agg_group_))) { + } else if (OB_FAIL(micro_scanner_->get_aggregate_result(0/*col_idx*/, row_ids_, row_cap, projected/*reserve_memory*/, *agg_group_))) { LOG_WARN("Fail to get aggregate result", K(ret)); } return ret; diff --git a/src/storage/column_store/ob_virtual_cg_scanner.cpp b/src/storage/column_store/ob_virtual_cg_scanner.cpp index 7f73187ea..72224481d 100644 --- a/src/storage/column_store/ob_virtual_cg_scanner.cpp +++ b/src/storage/column_store/ob_virtual_cg_scanner.cpp @@ -168,7 +168,7 @@ int ObVirtualCGScanner::get_next_rows(uint64_t &count, const uint64_t capacity) } } else { if (OB_FAIL(agg_group_->eval_batch(iter_param_, access_ctx_, 0/*col_idx*/, nullptr/*reader*/, - nullptr/*row_ids*/, current_group_size_, false))) { + nullptr/*row_ids*/, current_group_size_, false/*reserve_memory*/))) {
LOG_WARN("Fail to eval batch rows", K(ret)); } else { count = current_group_size_;