Fix common array memleak
This commit is contained in:
@ -377,12 +377,20 @@ int ObPxSubCoord::setup_op_input(ObExecContext &ctx,
|
|||||||
ctx, sqc.get_task_count(),
|
ctx, sqc.get_task_count(),
|
||||||
filter_spec->is_shuffle_? sqc.get_sqc_count() : 1))) {
|
filter_spec->is_shuffle_? sqc.get_sqc_count() : 1))) {
|
||||||
LOG_WARN("fail to init share info", K(ret));
|
LOG_WARN("fail to init share info", K(ret));
|
||||||
} else if (filter_spec->is_shuffle_) {
|
} else {
|
||||||
|
if (OB_FAIL(all_shared_rf_msgs_.push_back(filter_input->share_info_.shared_msgs_))) {
|
||||||
|
LOG_WARN("fail to push back rf msgs", K(ret));
|
||||||
|
}
|
||||||
|
if (OB_FAIL(ret) && filter_input->share_info_.shared_msgs_ != 0) {
|
||||||
ObArray<ObP2PDatahubMsgBase *> *array_ptr =
|
ObArray<ObP2PDatahubMsgBase *> *array_ptr =
|
||||||
reinterpret_cast<ObArray<ObP2PDatahubMsgBase *> *>(filter_input->share_info_.shared_msgs_);
|
reinterpret_cast<ObArray<ObP2PDatahubMsgBase *> *>(filter_input->share_info_.shared_msgs_);
|
||||||
for (int i = 0; OB_SUCC(ret) && i < array_ptr->count(); ++i) {
|
for (int j = 0; j < array_ptr->count(); ++j) {
|
||||||
if (OB_FAIL(rf_msgs_.push_back(array_ptr->at(i)))) {
|
if (OB_NOT_NULL(array_ptr->at(j))) {
|
||||||
LOG_WARN("fail to push back rf msgs", K(ret));
|
array_ptr->at(j)->destroy();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if (!array_ptr->empty()) {
|
||||||
|
array_ptr->reset();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -719,12 +727,15 @@ int ObPxSubCoord::end_process()
|
|||||||
LOG_WARN("fail check task finish status", K(ret));
|
LOG_WARN("fail check task finish status", K(ret));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
for (int i = 0; i < rf_msgs_.count(); ++i) {
|
for (int i = 0; i < all_shared_rf_msgs_.count(); ++i) {
|
||||||
rf_msgs_.at(i)->destroy();
|
ObArray<ObP2PDatahubMsgBase *> *array_ptr =
|
||||||
rf_msgs_.at(i) = nullptr;
|
reinterpret_cast<ObArray<ObP2PDatahubMsgBase *> *>(all_shared_rf_msgs_.at(i));
|
||||||
|
for (int j = 0; OB_NOT_NULL(array_ptr) && j < array_ptr->count(); ++j) {
|
||||||
|
array_ptr->at(j)->destroy();
|
||||||
|
}
|
||||||
|
if (OB_NOT_NULL(array_ptr) && !array_ptr->empty()) {
|
||||||
|
array_ptr->reset();
|
||||||
}
|
}
|
||||||
if (!rf_msgs_.empty()) {
|
|
||||||
rf_msgs_.reset();
|
|
||||||
}
|
}
|
||||||
|
|
||||||
NG_TRACE(tag3);
|
NG_TRACE(tag3);
|
||||||
|
@ -54,7 +54,7 @@ public:
|
|||||||
thread_worker_factory_(gctx, allocator_),
|
thread_worker_factory_(gctx, allocator_),
|
||||||
first_buffer_cache_(allocator_),
|
first_buffer_cache_(allocator_),
|
||||||
is_single_tsc_leaf_dfo_(false),
|
is_single_tsc_leaf_dfo_(false),
|
||||||
rf_msgs_()
|
all_shared_rf_msgs_()
|
||||||
{}
|
{}
|
||||||
virtual ~ObPxSubCoord() = default;
|
virtual ~ObPxSubCoord() = default;
|
||||||
int pre_process();
|
int pre_process();
|
||||||
@ -148,7 +148,7 @@ private:
|
|||||||
int64_t reserved_thread_count_;
|
int64_t reserved_thread_count_;
|
||||||
dtl::ObDtlLocalFirstBufferCache first_buffer_cache_;
|
dtl::ObDtlLocalFirstBufferCache first_buffer_cache_;
|
||||||
bool is_single_tsc_leaf_dfo_;
|
bool is_single_tsc_leaf_dfo_;
|
||||||
ObArray<ObP2PDatahubMsgBase *> rf_msgs_; // for clear
|
ObArray<int64_t> all_shared_rf_msgs_; // for clear
|
||||||
DISALLOW_COPY_AND_ASSIGN(ObPxSubCoord);
|
DISALLOW_COPY_AND_ASSIGN(ObPxSubCoord);
|
||||||
};
|
};
|
||||||
}
|
}
|
||||||
|
Reference in New Issue
Block a user