[OBKV] Fix multi-cf iterator memory leak when init failed

This commit is contained in:
shenyunlong.syl 2024-10-23 03:44:05 +00:00 committed by ob-robot
parent 634ab9cd35
commit 9dfa7802d5
5 changed files with 65 additions and 28 deletions

View File

@ -27,6 +27,7 @@ public:
ObBinaryHeapBase(CompareFunctor &cmp, common::ObIAllocator *allocator = NULL);
virtual ~ObBinaryHeapBase() { /* do nothing */}
int push(const T &element);
int push(const T &element, bool &in_heap);
int pop();
const T& top() const;
T& top();
@ -89,6 +90,20 @@ int ObBinaryHeapBase<T, CompareFunctor, LOCAL_ARRAY_SIZE>::push(const T &element
return ret;
}
template<typename T, typename CompareFunctor, int64_t LOCAL_ARRAY_SIZE>
int ObBinaryHeapBase<T, CompareFunctor, LOCAL_ARRAY_SIZE>::push(const T &element, bool &in_heap)
{
int ret = OB_SUCCESS;
in_heap = false;
if (OB_FAIL(array_.push_back(element))) {
LIB_LOG(WARN, "push element to array failed", K(ret));
} else {
in_heap = true;
ret = upheap(array_.count() - 1);
}
return ret;
}
template<typename T, typename CompareFunctor, int64_t LOCAL_ARRAY_SIZE>
int ObBinaryHeapBase<T, CompareFunctor, LOCAL_ARRAY_SIZE>::pop()
{

View File

@ -26,7 +26,7 @@ template <typename Row, typename Compare>
class ObMergeTableQueryResultIterator : public ObTableQueryResultIterator
{
public:
ObMergeTableQueryResultIterator(common::ObIAllocator* allocator, const ObTableQuery& query, table::ObTableQueryResult &one_result);
ObMergeTableQueryResultIterator(common::ObIAllocator& allocator, const ObTableQuery& query, table::ObTableQueryResult &one_result);
TO_STRING_KV(KP_(one_result), K_(binary_heap));
virtual ~ObMergeTableQueryResultIterator()
{
@ -35,9 +35,13 @@ public:
binary_heap_.at(i)->~ObRowCacheIterator();
}
}
for (int j = 0; j < inner_result_iters_.count(); j++) {
ObTableQueryUtils::destroy_result_iterator(inner_result_iters_.at(j));
}
}
int init(const ObArray<ObTableQueryResultIterator*>& result_iters, Compare* compare);
int init(Compare* compare);
virtual int get_next_result(ObTableQueryResult *&one_result) override;
@ -48,6 +52,7 @@ public:
virtual void set_one_result(ObTableQueryResult *result) { one_result_ = result; }
int seek(const ObString& key);
common::ObIArray<ObTableQueryResultIterator *>& get_inner_result_iterators() { return inner_result_iters_; }
private:
@ -60,7 +65,7 @@ private:
virtual hfilter::Filter *get_filter() const override { return nullptr; }
private:
int build_heap(const ObArray<ObTableQueryResultIterator*>& result_iters);
int build_heap(const common::ObIArray<ObTableQueryResultIterator*>& result_iters);
private:
struct ObRowCacheIterator
@ -76,9 +81,6 @@ private:
if (OB_NOT_NULL(iterable_result_)) {
iterable_result_->~ObTableQueryIterableResult();
}
if (OB_NOT_NULL(result_iter_)) {
result_iter_->~ObTableQueryResultIterator();
}
}
int init(ObTableQueryResultIterator *result_iter);
int get_next_row(Row &row);
@ -101,18 +103,19 @@ private:
int error_code_;
};
private:
common::ObIAllocator* allocator_;
common::ObIAllocator& allocator_;
HeapCompare compare_;
table::ObTableQueryResult* one_result_;
common::ObBinaryHeap<ObRowCacheIterator*, HeapCompare, 64> binary_heap_;
common::ObSEArray<ObTableQueryResultIterator *, 8> inner_result_iters_;
bool is_inited_;
};
template <typename Row, typename Compare>
ObMergeTableQueryResultIterator<Row, Compare>::ObMergeTableQueryResultIterator(common::ObIAllocator* allocator,
ObMergeTableQueryResultIterator<Row, Compare>::ObMergeTableQueryResultIterator(common::ObIAllocator& allocator,
const ObTableQuery& query,
table::ObTableQueryResult &one_result)
: ObTableQueryResultIterator(&query),
@ -121,6 +124,7 @@ ObMergeTableQueryResultIterator<Row, Compare>::ObMergeTableQueryResultIterator(c
binary_heap_(compare_),
is_inited_(false)
{
inner_result_iters_.set_attr(ObMemAttr(MTL_ID(), "MergeInnerIters"));
}
@ -145,7 +149,7 @@ bool ObMergeTableQueryResultIterator<Row, Compare>::HeapCompare::operator()(cons
}
template <typename Row, typename Compare>
int ObMergeTableQueryResultIterator<Row, Compare>::init(const ObArray<ObTableQueryResultIterator*>& result_iters, Compare* compare)
int ObMergeTableQueryResultIterator<Row, Compare>::init(Compare* compare)
{
int ret = common::OB_SUCCESS;
if (IS_INIT) {
@ -154,8 +158,8 @@ int ObMergeTableQueryResultIterator<Row, Compare>::init(const ObArray<ObTableQue
ret = common::OB_INVALID_ARGUMENT;
} else {
compare_.compare_ = compare;
if (result_iters.count() >= 1 && OB_FAIL(build_heap(result_iters))) {
SERVER_LOG(WARN, "fail to build heap", K(ret), K(result_iters.count()));
if (inner_result_iters_.count() >= 1 && OB_FAIL(build_heap(inner_result_iters_))) {
SERVER_LOG(WARN, "fail to build heap", K(ret), K(inner_result_iters_.count()));
} else {
is_inited_ = true;
}
@ -164,11 +168,12 @@ int ObMergeTableQueryResultIterator<Row, Compare>::init(const ObArray<ObTableQue
}
template <typename Row, typename Compare>
int ObMergeTableQueryResultIterator<Row, Compare>::build_heap(const ObArray<ObTableQueryResultIterator*>& result_iters)
int ObMergeTableQueryResultIterator<Row, Compare>::build_heap(const ObIArray<ObTableQueryResultIterator*>& result_iters)
{
int ret = common::OB_SUCCESS;
for (int64_t i = 0; OB_SUCC(ret) && i < result_iters.count(); ++i) {
ObRowCacheIterator* cache_iterator = OB_NEWx(ObRowCacheIterator, allocator_);
bool in_heap = false;
ObRowCacheIterator* cache_iterator = OB_NEWx(ObRowCacheIterator, &allocator_);
ObTableQueryResultIterator* iter = result_iters.at(i);
if (OB_ISNULL(cache_iterator)) {
ret = OB_ALLOCATE_MEMORY_FAILED;
@ -179,13 +184,16 @@ int ObMergeTableQueryResultIterator<Row, Compare>::build_heap(const ObArray<ObTa
} else {
SERVER_LOG(WARN, "fail to init cache_iter", K(ret));
}
cache_iterator->~ObRowCacheIterator();
} else if (OB_FAIL(binary_heap_.push(cache_iterator))) {
cache_iterator->~ObRowCacheIterator();
} else if (OB_FAIL(binary_heap_.push(cache_iterator, in_heap))) {
SERVER_LOG(WARN, "fail to push heap", K(ret));
} else if (OB_FAIL(compare_.get_error_code())) {
SERVER_LOG(WARN, "fail to compare items", K(ret));
}
if (in_heap == false && OB_NOT_NULL(cache_iterator)) {
cache_iterator->~ObRowCacheIterator();
allocator_.free(cache_iterator);
}
}
return ret;
}

View File

@ -643,26 +643,34 @@ int ObTableQueryAsyncP::init_tb_ctx(ObTableCtx &ctx)
return ret;
}
int ObTableQueryAsyncP::generate_merge_result_iterator(const ObArray<ObTableQueryResultIterator*>& array_result) {
int ObTableQueryAsyncP::generate_merge_result_iterator()
{
int ret = OB_SUCCESS;
// Merge Iterator: Holds multiple underlying iterators, stored in its own heap
ResultMergeIterator *merge_result_iter = nullptr;
ObTableHbaseRowKeyDefaultCompare *compare = nullptr;
if (OB_ISNULL(merge_result_iter = OB_NEWx(ResultMergeIterator,
&allocator_,
&allocator_,
allocator_,
query_session_->get_query(),
result_ /* Response serlize row*/))) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("fail to create merge_result_iter", K(ret));
} else if (OB_FAIL(generate_multi_result_iterator(merge_result_iter->get_inner_result_iterators()))) {
LOG_WARN("fail to generate multi result inner iterator", K(ret));
} else if (OB_ISNULL(compare = OB_NEWx(ObTableHbaseRowKeyDefaultCompare, &allocator_))) {
ret = OB_ALLOCATE_MEMORY_FAILED;
LOG_WARN("fail to create compare, alloc memory fail", K(ret));
} else if (OB_FAIL(merge_result_iter->init(array_result, compare))) {
} else if (OB_FAIL(merge_result_iter->init(compare))) {
LOG_WARN("fail to build merge_result_iter", K(ret));
} else {
query_session_->set_result_iterator(merge_result_iter);
}
if (OB_FAIL(ret) && OB_NOT_NULL(merge_result_iter)) {
merge_result_iter->~ResultMergeIterator();
allocator_.free(merge_result_iter);
}
return ret;
}
@ -749,7 +757,7 @@ int ObTableQueryAsyncP::process_table_info(ObTableSingleQueryInfo* table_info,
return ret;
}
int ObTableQueryAsyncP::generate_multi_result_iterator(ObArray<ObTableQueryResultIterator*>& inner_result_iterator_list) {
int ObTableQueryAsyncP::generate_multi_result_iterator(ObIArray<ObTableQueryResultIterator*>& inner_result_iterator_list) {
int ret = OB_SUCCESS;
ObIAllocator *allocator = query_session_->get_allocator();
ObTableQueryAsyncCtx &query_ctx = query_session_->get_query_ctx();
@ -817,6 +825,10 @@ int ObTableQueryAsyncP::generate_multi_result_iterator(ObArray<ObTableQueryResul
LOG_WARN(" fail to push back result_iter to array_result", K(ret));
}
}
if (OB_FAIL(ret) && OB_NOT_NULL(result_iter)) {
ObTableQueryUtils::destroy_result_iterator(result_iter);
}
}
}
return ret;
@ -1211,7 +1223,6 @@ int ObTableQueryAsyncP::query_scan_multi_cf_with_init() {
}
}
}
ObArray<ObTableQueryResultIterator*> inner_result_iterator_list;
if (OB_FAIL(ret)) {
} else if (OB_FAIL(query_session_->deep_copy_select_columns(query.get_select_columns(), query_infos.at(0)->tb_ctx_.get_query_col_names()))) {
LOG_WARN("fail to deep copy select columns from table ctx", K(ret));
@ -1222,9 +1233,7 @@ int ObTableQueryAsyncP::query_scan_multi_cf_with_init() {
query_infos.at(0)->tb_ctx_.need_dist_das(),
query_session_->get_trans_state()))) {
LOG_WARN("fail to start readonly transaction", K(ret), K(query_infos.at(0)->tb_ctx_));
} else if (OB_FAIL(generate_multi_result_iterator(inner_result_iterator_list))) {
LOG_WARN("fail to generate_multi_result_iterator", K(ret));
} else if (OB_FAIL(generate_merge_result_iterator(inner_result_iterator_list))) {
} else if (OB_FAIL(generate_merge_result_iterator())) {
LOG_WARN("fail to generate_merge_result_iterator", K(ret));
} else if (OB_FAIL(execute_multi_cf_query())) {
LOG_WARN("fail to execute query", K(ret));

View File

@ -316,9 +316,9 @@ private:
int create_result_iterator(ObTableQueryAsyncSession* query_session, table::ObTableQueryResultIterator*& result_iter, uint64_t table_id);
int generate_multi_result_iterator(ObArray<table::ObTableQueryResultIterator*>& array);
int generate_multi_result_iterator(common::ObIArray<table::ObTableQueryResultIterator*>& array);
int generate_merge_result_iterator(const ObArray<table::ObTableQueryResultIterator*>& array);
int generate_merge_result_iterator();
int execute_multi_cf_query();
int execute_query();

View File

@ -101,6 +101,9 @@ int ObTableQueryUtils::generate_htable_result_iterator(ObIAllocator &allocator,
if (OB_SUCC(ret)) {
result_iter = htable_result_iter;
} else if (OB_NOT_NULL(htable_result_iter)) {
htable_result_iter->~ObHTableFilterOperator();
allocator.free(htable_result_iter);
}
return ret;
@ -148,8 +151,8 @@ int ObTableQueryUtils::generate_query_result_iterator(ObIAllocator &allocator,
table_result_iter->init_aggregation();
table_result_iter->get_agg_calculator().set_projs(tb_ctx.get_agg_projs());
}
tmp_result_iter = table_result_iter;
}
tmp_result_iter = table_result_iter;
}
} else { // no filter
ObNormalTableQueryResultIterator *normal_result_iter = nullptr;
@ -170,12 +173,14 @@ int ObTableQueryUtils::generate_query_result_iterator(ObIAllocator &allocator,
normal_result_iter->init_aggregation();
normal_result_iter->get_agg_calculator().set_projs(tb_ctx.get_agg_projs());
}
tmp_result_iter = normal_result_iter;
}
tmp_result_iter = normal_result_iter;
}
if (OB_SUCC(ret)) {
result_iter = tmp_result_iter;
} else if (OB_NOT_NULL(tmp_result_iter)) {
destroy_result_iterator(tmp_result_iter);
}
return ret;