Fix the memory leak issue in CleanDtlIntermRes

This commit is contained in:
obdev
2023-09-01 06:14:22 +00:00
committed by ob-robot
parent b2505d511e
commit f8162edc20
2 changed files with 30 additions and 15 deletions

View File

@ -595,6 +595,18 @@ int ObSerialDfoScheduler::do_schedule_dfo(ObExecContext &ctx, ObDfo &dfo) const
return ret;
}
bool ObSerialDfoScheduler::CleanDtlIntermRes::operator()(const ObAddr &attr,
ObPxCleanDtlIntermResArgs *arg)
{
int ret = OB_SUCCESS;
if (OB_FAIL(coord_info_.rpc_proxy_.to(attr).by(tenant_id_).clean_dtl_interm_result(*arg, NULL))) {
LOG_WARN("send clean dtl interm result rpc failed", K(ret), K(attr), KPC(arg));
}
LOG_TRACE("clean dtl res map", K(attr), K(*arg));
delete arg;
return true;
}
void ObSerialDfoScheduler::clean_dtl_interm_result(ObExecContext &exec_ctx)
{
@ -609,7 +621,7 @@ void ObSerialDfoScheduler::clean_dtl_interm_result(ObExecContext &exec_ctx)
const ObDfo *root = coord_info_.dfo_mgr_.get_root_dfo();
int64_t batch_size = coord_info_.get_rescan_param_count();
ObSEArray<ObDfo*, 8> dfos;
common::hash::ObHashMap<ObAddr, ObPxCleanDtlIntermResArgs *> map;
ObLinearHashMap<ObAddr, ObPxCleanDtlIntermResArgs *> map;
ObIAllocator &allocator = exec_ctx.get_allocator();
for (int64_t i = 0; i < all_dfos.count(); i++) {
ObDfo *dfo = all_dfos.at(i);
@ -627,9 +639,9 @@ void ObSerialDfoScheduler::clean_dtl_interm_result(ObExecContext &exec_ctx)
}
if (OB_LIKELY(msg_idx < sqc.get_serial_receive_channels().count())) {
ObPxCleanDtlIntermResArgs *arg = NULL;
if (!map.created() && OB_FAIL(map.create(8, "CleanDtlRes"))) {
LOG_WARN("create map failed", K(ret));
} else if (OB_FAIL(map.get_refactored(sqc.get_exec_addr(), arg))) {
if (!map.is_inited() && OB_FAIL(map.init("CleanDtlRes", OB_SYS_TENANT_ID))) {
LOG_WARN("init map failed", K(ret));
} else if (OB_FAIL(map.get(sqc.get_exec_addr(), arg))) {
if (OB_HASH_NOT_EXIST == ret) {
void *buf = NULL;
if (OB_ISNULL(buf = allocator.alloc(sizeof(ObPxCleanDtlIntermResArgs)))) {
@ -638,8 +650,8 @@ void ObSerialDfoScheduler::clean_dtl_interm_result(ObExecContext &exec_ctx)
} else {
arg = new(buf) ObPxCleanDtlIntermResArgs();
arg->batch_size_ = coord_info_.get_rescan_param_count();
if (OB_FAIL(map.set_refactored(sqc.get_exec_addr(), arg))) {
LOG_WARN("set refactored failed", K(ret));
if (OB_FAIL(map.insert(sqc.get_exec_addr(), arg))) {
LOG_WARN("insert failed", K(ret));
}
}
} else {
@ -659,21 +671,17 @@ void ObSerialDfoScheduler::clean_dtl_interm_result(ObExecContext &exec_ctx)
}
// ignore allocate, set_refactored and push_back failure.
// send rpc to addrs inserted into the map successfully.
if (OB_UNLIKELY(!map.empty())) {
LOG_TRACE("clean dtl res map", K(map.size()));
if (OB_UNLIKELY(map.count() != 0)) {
LOG_TRACE("clean dtl res map", K(map.count()));
ObSQLSessionInfo *session = exec_ctx.get_my_session();
uint64_t tenant_id = OB_NOT_NULL(session) ? session->get_effective_tenant_id() : OB_SYS_TENANT_ID;
auto iter = map.begin();
for (; iter != map.end(); iter++) {
if (OB_FAIL(coord_info_.rpc_proxy_.to(iter->first).by(tenant_id).clean_dtl_interm_result(*iter->second, NULL))) {
LOG_WARN("send clean dtl interm result rpc failed", K(ret), K(iter->first), KPC(iter->second));
}
LOG_TRACE("clean dtl res map", K(iter->first), K(*(iter->second)));
CleanDtlIntermRes clean_dtl_interm_res(coord_info_, tenant_id);
if (OB_FAIL(map.for_each(clean_dtl_interm_res))) {
LOG_WARN("map for each clean_dtl_interm_res fail", KR(ret));
}
}
}
}
// -------------分割线-----------
// 启动 DFO 的 SQC 线程

View File

@ -68,6 +68,13 @@ public:
virtual void clean_dtl_interm_result(ObExecContext &ctx) override;
private:
struct CleanDtlIntermRes
{
ObPxCoordInfo &coord_info_;
uint64_t tenant_id_;
CleanDtlIntermRes(ObPxCoordInfo &coord_info, const uint64_t &tenant_id) : coord_info_(coord_info), tenant_id_(tenant_id) {}
bool operator()(const ObAddr &attr, ObPxCleanDtlIntermResArgs *arg);
};
int build_transmit_recieve_channel(ObExecContext &ctx, ObDfo *dfo) const;
int init_dfo_channel(ObExecContext &ctx, ObDfo *child, ObDfo *parent) const;
int init_data_xchg_ch(ObExecContext &ctx, ObDfo *dfo) const;