fix direct load mgr leak
This commit is contained in:
@ -155,7 +155,7 @@ void ObDDLStartClogCb::try_release()
|
|||||||
|
|
||||||
ObDDLMacroBlockClogCb::ObDDLMacroBlockClogCb()
|
ObDDLMacroBlockClogCb::ObDDLMacroBlockClogCb()
|
||||||
: is_inited_(false), status_(), ls_id_(), redo_info_(), macro_block_id_(),
|
: is_inited_(false), status_(), ls_id_(), redo_info_(), macro_block_id_(),
|
||||||
data_buffer_lock_(), is_data_buffer_freed_(false), direct_load_mgr_handle_()
|
data_buffer_lock_(), is_data_buffer_freed_(false)
|
||||||
{
|
{
|
||||||
|
|
||||||
}
|
}
|
||||||
@ -175,30 +175,12 @@ int ObDDLMacroBlockClogCb::init(const share::ObLSID &ls_id,
|
|||||||
ObTabletHandle &tablet_handle)
|
ObTabletHandle &tablet_handle)
|
||||||
{
|
{
|
||||||
int ret = OB_SUCCESS;
|
int ret = OB_SUCCESS;
|
||||||
ObTenantDirectLoadMgr *tenant_direct_load_mgr = MTL(ObTenantDirectLoadMgr *);
|
|
||||||
direct_load_mgr_handle_.reset();
|
|
||||||
bool is_major_sstable_exist = false;
|
|
||||||
if (OB_UNLIKELY(is_inited_)) {
|
if (OB_UNLIKELY(is_inited_)) {
|
||||||
ret = OB_INIT_TWICE;
|
ret = OB_INIT_TWICE;
|
||||||
LOG_WARN("init twice", K(ret));
|
LOG_WARN("init twice", K(ret));
|
||||||
} else if (OB_UNLIKELY(!ls_id.is_valid() || !redo_info.is_valid() || !macro_block_id.is_valid())) {
|
} else if (OB_UNLIKELY(!ls_id.is_valid() || !redo_info.is_valid() || !macro_block_id.is_valid())) {
|
||||||
ret = OB_INVALID_ARGUMENT;
|
ret = OB_INVALID_ARGUMENT;
|
||||||
LOG_WARN("invalid argument", K(ret), K(ls_id), K(redo_info), K(macro_block_id));
|
LOG_WARN("invalid argument", K(ret), K(ls_id), K(redo_info), K(macro_block_id));
|
||||||
} else if (OB_ISNULL(tenant_direct_load_mgr)) {
|
|
||||||
ret = OB_ERR_UNEXPECTED;
|
|
||||||
LOG_WARN("unexpected err", K(ret), K(MTL_ID()));
|
|
||||||
} else if (OB_FAIL(tenant_direct_load_mgr->get_tablet_mgr_and_check_major(
|
|
||||||
ls_id,
|
|
||||||
redo_info.table_key_.tablet_id_,
|
|
||||||
true/* is_full_direct_load */,
|
|
||||||
direct_load_mgr_handle_,
|
|
||||||
is_major_sstable_exist))) {
|
|
||||||
if (OB_ENTRY_NOT_EXIST == ret && is_major_sstable_exist) {
|
|
||||||
ret = OB_TASK_EXPIRED;
|
|
||||||
LOG_INFO("major sstable already exist", K(ret), K(redo_info_));
|
|
||||||
} else {
|
|
||||||
LOG_WARN("get tablet mgr failed", K(ret), K(redo_info_));
|
|
||||||
}
|
|
||||||
} else if (OB_FAIL(OB_SERVER_BLOCK_MGR.inc_ref(macro_block_id))) {
|
} else if (OB_FAIL(OB_SERVER_BLOCK_MGR.inc_ref(macro_block_id))) {
|
||||||
LOG_WARN("inc reference count failed", K(ret), K(macro_block_id));
|
LOG_WARN("inc reference count failed", K(ret), K(macro_block_id));
|
||||||
} else {
|
} else {
|
||||||
@ -225,11 +207,28 @@ void ObDDLMacroBlockClogCb::try_release()
|
|||||||
int ObDDLMacroBlockClogCb::on_success()
|
int ObDDLMacroBlockClogCb::on_success()
|
||||||
{
|
{
|
||||||
int ret = OB_SUCCESS;
|
int ret = OB_SUCCESS;
|
||||||
|
bool is_major_sstable_exist = false;
|
||||||
|
ObTabletDirectLoadMgrHandle direct_load_mgr_handle;
|
||||||
|
ObTenantDirectLoadMgr *tenant_direct_load_mgr = MTL(ObTenantDirectLoadMgr *);
|
||||||
|
if (OB_ISNULL(tenant_direct_load_mgr)) {
|
||||||
|
ret = OB_ERR_UNEXPECTED;
|
||||||
|
LOG_WARN("unexpected err", K(ret), K(MTL_ID()));
|
||||||
|
} else if (OB_FAIL(tenant_direct_load_mgr->get_tablet_mgr_and_check_major(
|
||||||
|
ls_id_, redo_info_.table_key_.tablet_id_, true/*is_full_direct_load*/, direct_load_mgr_handle, is_major_sstable_exist))) {
|
||||||
|
if (OB_ENTRY_NOT_EXIST == ret && is_major_sstable_exist) {
|
||||||
|
ret = OB_TASK_EXPIRED;
|
||||||
|
LOG_INFO("major sstable already exist", K(ret), K(redo_info_));
|
||||||
|
} else {
|
||||||
|
LOG_WARN("get tablet mgr failed", K(ret), K(redo_info_));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
ObDDLMacroBlock macro_block;
|
ObDDLMacroBlock macro_block;
|
||||||
{
|
{
|
||||||
ObSpinLockGuard data_buffer_guard(data_buffer_lock_);
|
ObSpinLockGuard data_buffer_guard(data_buffer_lock_);
|
||||||
if (is_data_buffer_freed_) {
|
if (is_data_buffer_freed_) {
|
||||||
LOG_INFO("data buffer is freed, do not need to callback");
|
LOG_INFO("data buffer is freed, do not need to callback");
|
||||||
|
} else if (OB_FAIL(ret)) {
|
||||||
} else if (OB_FAIL(macro_block.block_handle_.set_block_id(macro_block_id_))) {
|
} else if (OB_FAIL(macro_block.block_handle_.set_block_id(macro_block_id_))) {
|
||||||
LOG_WARN("set macro block id failed", K(ret), K(macro_block_id_));
|
LOG_WARN("set macro block id failed", K(ret), K(macro_block_id_));
|
||||||
} else {
|
} else {
|
||||||
@ -244,7 +243,7 @@ int ObDDLMacroBlockClogCb::on_success()
|
|||||||
const int64_t snapshot_version = redo_info_.table_key_.get_snapshot_version();
|
const int64_t snapshot_version = redo_info_.table_key_.get_snapshot_version();
|
||||||
const uint64_t data_format_version = redo_info_.data_format_version_;
|
const uint64_t data_format_version = redo_info_.data_format_version_;
|
||||||
if (OB_FAIL(ObDDLKVPendingGuard::set_macro_block(tablet_handle_.get_obj(), macro_block,
|
if (OB_FAIL(ObDDLKVPendingGuard::set_macro_block(tablet_handle_.get_obj(), macro_block,
|
||||||
snapshot_version, data_format_version, direct_load_mgr_handle_))) {
|
snapshot_version, data_format_version, direct_load_mgr_handle))) {
|
||||||
LOG_WARN("set macro block into ddl kv failed", K(ret), K(tablet_handle_), K(macro_block),
|
LOG_WARN("set macro block into ddl kv failed", K(ret), K(tablet_handle_), K(macro_block),
|
||||||
K(snapshot_version), K(data_format_version));
|
K(snapshot_version), K(data_format_version));
|
||||||
}
|
}
|
||||||
|
|||||||
@ -132,7 +132,6 @@ private:
|
|||||||
blocksstable::MacroBlockId macro_block_id_;
|
blocksstable::MacroBlockId macro_block_id_;
|
||||||
ObSpinLock data_buffer_lock_;
|
ObSpinLock data_buffer_lock_;
|
||||||
bool is_data_buffer_freed_;
|
bool is_data_buffer_freed_;
|
||||||
ObTabletDirectLoadMgrHandle direct_load_mgr_handle_;
|
|
||||||
ObTabletHandle tablet_handle_;
|
ObTabletHandle tablet_handle_;
|
||||||
};
|
};
|
||||||
|
|
||||||
|
|||||||
@ -147,9 +147,9 @@ int ObDDLKVHandle::set_obj(ObDDLKV *ddl_kv)
|
|||||||
ret = OB_INVALID_ARGUMENT;
|
ret = OB_INVALID_ARGUMENT;
|
||||||
LOG_WARN("invalid argument", K(ret), KP(ddl_kv));
|
LOG_WARN("invalid argument", K(ret), KP(ddl_kv));
|
||||||
} else {
|
} else {
|
||||||
|
ddl_kv->inc_ref();
|
||||||
reset();
|
reset();
|
||||||
ddl_kv_ = ddl_kv;
|
ddl_kv_ = ddl_kv;
|
||||||
ddl_kv_->inc_ref();
|
|
||||||
}
|
}
|
||||||
return ret;
|
return ret;
|
||||||
}
|
}
|
||||||
@ -264,12 +264,12 @@ ObTabletDirectLoadMgrHandle::~ObTabletDirectLoadMgrHandle()
|
|||||||
int ObTabletDirectLoadMgrHandle::set_obj(ObTabletDirectLoadMgr *mgr)
|
int ObTabletDirectLoadMgrHandle::set_obj(ObTabletDirectLoadMgr *mgr)
|
||||||
{
|
{
|
||||||
int ret = OB_SUCCESS;
|
int ret = OB_SUCCESS;
|
||||||
reset();
|
|
||||||
if (OB_ISNULL(mgr)) {
|
if (OB_ISNULL(mgr)) {
|
||||||
ret = OB_INVALID_ARGUMENT;
|
ret = OB_INVALID_ARGUMENT;
|
||||||
LOG_WARN("invalid arg", K(ret));
|
LOG_WARN("invalid arg", K(ret));
|
||||||
} else {
|
} else {
|
||||||
mgr->inc_ref();
|
mgr->inc_ref();
|
||||||
|
reset();
|
||||||
tablet_mgr_ = mgr;
|
tablet_mgr_ = mgr;
|
||||||
}
|
}
|
||||||
return ret;
|
return ret;
|
||||||
|
|||||||
Reference in New Issue
Block a user