Fix direct load mgr leak after migration
This commit is contained in:
@ -912,6 +912,46 @@ int ObTenantDirectLoadMgr::get_tablet_mgr_no_lock(
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObTenantDirectLoadMgr::GetGcCandidateOp::operator() (common::hash::HashMapPair<ObTabletDirectLoadMgrKey, ObTabletDirectLoadMgr *> &kv)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
const ObTabletDirectLoadMgrKey &key = kv.first;
|
||||
ObTabletDirectLoadMgr *tablet_direct_load_mgr = kv.second;
|
||||
if (1 == tablet_direct_load_mgr->get_ref()) {
|
||||
if (OB_FAIL(candidate_mgrs_.push_back(key))) {
|
||||
LOG_WARN("failed to push back", K(ret));
|
||||
}
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObTenantDirectLoadMgr::gc_tablet_direct_load(ObLS &ls)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
if (!tablet_mgr_map_.empty()) {
|
||||
ObSEArray<ObTabletDirectLoadMgrKey, 8> candidate_mgrs;
|
||||
{
|
||||
ObBucketTryRLockAllGuard guard(bucket_lock_);
|
||||
if (OB_SUCC(guard.get_ret())) {
|
||||
GetGcCandidateOp op(candidate_mgrs);
|
||||
(void)tablet_mgr_map_.foreach_refactored(op);
|
||||
}
|
||||
}
|
||||
|
||||
for (int64_t i = 0; i < candidate_mgrs.count(); i++) {
|
||||
int tmp_ret = OB_SUCCESS;
|
||||
ObTabletDirectLoadMgrKey mgr_key = candidate_mgrs.at(i);
|
||||
ObTabletHandle tablet_handle;
|
||||
if (OB_TMP_FAIL(ls.get_tablet(mgr_key.tablet_id_, tablet_handle))) {
|
||||
LOG_WARN("failed to get tablet", K(ret), K(mgr_key));
|
||||
} else if (tablet_handle.get_obj()->get_major_table_count() > 0) {
|
||||
(void)remove_tablet_direct_load(mgr_key);
|
||||
}
|
||||
}
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObTenantDirectLoadMgr::remove_tablet_direct_load(const ObTabletDirectLoadMgrKey &mgr_key)
|
||||
{
|
||||
ObBucketHashWLockGuard guard(bucket_lock_, mgr_key.hash());
|
||||
@ -942,6 +982,8 @@ int ObTenantDirectLoadMgr::remove_tablet_direct_load_nolock(const ObTabletDirect
|
||||
if (0 == tablet_direct_load_mgr->dec_ref()) {
|
||||
tablet_direct_load_mgr->~ObTabletDirectLoadMgr();
|
||||
allocator_.free(tablet_direct_load_mgr);
|
||||
} else {
|
||||
// unreachable
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -2514,6 +2556,7 @@ int ObTabletFullDirectLoadMgr::set_commit_scn(const share::SCN &commit_scn)
|
||||
return ret;
|
||||
}
|
||||
|
||||
// return latest commit_scn iff tablet_meta is newer than the creation of ObTabletFullDirectLoadMgr
|
||||
share::SCN ObTabletFullDirectLoadMgr::get_commit_scn(const ObTabletMeta &tablet_meta)
|
||||
{
|
||||
share::SCN mgr_commit_scn = commit_scn_.atomic_load();
|
||||
|
@ -184,12 +184,23 @@ public:
|
||||
const share::ObLSID &ls_id,
|
||||
const common::ObTabletID &tablet_id,
|
||||
const bool is_full_direct_load);
|
||||
int gc_tablet_direct_load(ObLS &ls);
|
||||
// remove tablet direct load mgr from hashmap,
|
||||
// for full direct load, it will be called when physical major generates,
|
||||
// for incremental direct load, it will be called when all KVs dump.
|
||||
int remove_tablet_direct_load(const ObTabletDirectLoadMgrKey &mgr_key);
|
||||
ObIAllocator &get_allocator() { return allocator_; }
|
||||
private:
|
||||
struct GetGcCandidateOp final {
|
||||
public:
|
||||
GetGcCandidateOp(ObIArray<ObTabletDirectLoadMgrKey> &candidate_mgrs) : candidate_mgrs_(candidate_mgrs) {}
|
||||
~GetGcCandidateOp() {}
|
||||
int operator() (common::hash::HashMapPair<ObTabletDirectLoadMgrKey, ObTabletDirectLoadMgr *> &kv);
|
||||
private:
|
||||
DISALLOW_COPY_AND_ASSIGN(GetGcCandidateOp);
|
||||
ObIArray<ObTabletDirectLoadMgrKey> &candidate_mgrs_;
|
||||
};
|
||||
|
||||
int try_create_tablet_direct_load_mgr(
|
||||
const int64_t context_id,
|
||||
const int64_t execution_id,
|
||||
|
@ -410,6 +410,7 @@ int ObLSDDLLogHandler::flush(SCN &rec_scn)
|
||||
}
|
||||
}
|
||||
}
|
||||
(void)tenant_direct_load_mgr->gc_tablet_direct_load(*ls_);
|
||||
}
|
||||
}
|
||||
return OB_SUCCESS;
|
||||
|
Reference in New Issue
Block a user