[CP] Revert Fix ls offline fail for leak transaction context

This commit is contained in:
obdev 2024-02-10 06:22:42 +00:00 committed by ob-robot
parent f12942adec
commit c93cbf8c74
3 changed files with 49 additions and 88 deletions

View File

@ -774,15 +774,15 @@ int ObLSTxService::offline()
int ret = OB_SUCCESS;
const int64_t PRINT_LOG_INTERVAL = 1000 * 1000; // 1s
const bool graceful = false;
bool is_all_tx_clean_up = false;
bool unused_is_all_tx_clean_up = false;
if (OB_ISNULL(mgr_)) {
ret = OB_NOT_INIT;
TRANS_LOG(WARN, "not init", KR(ret), K_(ls_id));
} else if (OB_FAIL(mgr_->block_all(is_all_tx_clean_up))) {
} else if (OB_FAIL(mgr_->block_all(unused_is_all_tx_clean_up))) {
TRANS_LOG(WARN, "block all failed", K_(ls_id));
} else if (OB_FAIL(mgr_->kill_all_tx(graceful, is_all_tx_clean_up))) {
} else if (OB_FAIL(mgr_->kill_all_tx(graceful, unused_is_all_tx_clean_up))) {
TRANS_LOG(WARN, "kill_all_tx failed", K_(ls_id));
} else if (!is_all_tx_clean_up) {
} else if (mgr_->get_tx_ctx_count() > 0) {
ret = OB_EAGAIN;
if (REACH_TIME_INTERVAL(PRINT_LOG_INTERVAL)) {
TRANS_LOG(WARN, "transaction not empty, try again", K(ret), KP(mgr_), K_(ls_id), K(mgr_->get_tx_ctx_count()));

View File

@ -508,7 +508,7 @@ int ObLSTxCtxMgr::create_tx_ctx_(const ObTxCreateArg &arg,
ctx = static_cast<ObPartTransCtx *>(exist_ctx);
}
if (REACH_TIME_INTERVAL(OB_TRANS_STATISTICS_INTERVAL)) {
TRANS_LOG(INFO, "transaction statistics", K_(ls_id), KPC(this));
TRANS_LOG(INFO, "transaction statistics", K_(ls_id), "total_count", get_tx_ctx_count_());
}
return ret;
}
@ -584,7 +584,8 @@ int ObLSTxCtxMgr::get_tx_ctx_(const ObTransID &tx_id, const bool for_replay, ObP
TRANS_LOG(ERROR, "get transaction context error", KR(ret), K(tx_id));
}
if (REACH_TIME_INTERVAL(OB_TRANS_STATISTICS_INTERVAL)) {
TRANS_LOG(INFO, "transaction statistics", K_(ls_id), KPC(this));
TRANS_LOG(INFO, "transaction statistics", K_(ls_id),
"total_tx_ctx_count", get_tx_ctx_count_());
}
}
return ret;
@ -1017,7 +1018,7 @@ int ObLSTxCtxMgr::kill_all_tx(const bool graceful, bool &is_all_tx_cleaned_up)
} else if (OB_FAIL(ls_tx_ctx_map_.for_each(fn))) {
TRANS_LOG(WARN, "for each transaction context error", KR(ret), "manager", *this);
}
is_all_tx_cleaned_up = (active_tx_count_ == 0 && total_active_readonly_request_count_ == 0);
is_all_tx_cleaned_up = (get_tx_ctx_count_() == 0);
}
if (timeguard.get_diff() > 3 * 1000000) {
TRANS_LOG_RET(WARN, OB_ERR_TOO_MUCH_TIME, "kill_all_tx use too much time", K(timeguard), "manager", *this);
@ -1038,7 +1039,7 @@ int ObLSTxCtxMgr::block_tx(bool &is_all_tx_cleaned_up)
} else if (OB_FAIL(state_helper.switch_state(Ops::BLOCK_TX))) {
TRANS_LOG(WARN, "switch state error", KR(ret), "manager", *this);
} else {
is_all_tx_cleaned_up = (active_tx_count_ == 0 && total_active_readonly_request_count_ == 0);
is_all_tx_cleaned_up = (get_tx_ctx_count() == 0);
}
TRANS_LOG(INFO, "block ls", K(ret), "manager", *this);
return ret;
@ -1055,7 +1056,7 @@ int ObLSTxCtxMgr::block_all(bool &is_all_tx_cleaned_up)
} else if (OB_FAIL(state_helper.switch_state(Ops::BLOCK_ALL))) {
TRANS_LOG(WARN, "switch state error", KR(ret), "manager", *this);
} else {
is_all_tx_cleaned_up = (active_tx_count_ == 0 && total_active_readonly_request_count_ == 0);
is_all_tx_cleaned_up = (get_tx_ctx_count() == 0);
}
TRANS_LOG(INFO, "block ls", K(ret), "manager", *this);
return ret;
@ -1070,7 +1071,7 @@ int ObLSTxCtxMgr::block_normal(bool &is_all_tx_cleaned_up)
if (OB_FAIL(state_helper.switch_state(Ops::BLOCK_NORMAL))) {
TRANS_LOG(WARN, "switch state error", KR(ret), "manager", *this);
} else {
is_all_tx_cleaned_up = (active_tx_count_ == 0 && total_active_readonly_request_count_ == 0);
is_all_tx_cleaned_up = (get_tx_ctx_count() == 0);
}
TRANS_LOG(INFO, "block ls normally", K(ret), "manager", *this);
return ret;
@ -1305,12 +1306,6 @@ int ObLSTxCtxMgr::revert_tx_ctx_without_lock(ObTransCtx *ctx)
if (OB_ISNULL(ctx)) {
TRANS_LOG(WARN, "invalid argument", KP(ctx));
ret = OB_INVALID_ARGUMENT;
} else if (this != ctx->get_ls_tx_ctx_mgr()) {
TRANS_LOG(ERROR, "unexpected ls tx ctx mgr", KPC(this),
"trans_id", ctx->get_trans_id());
if (NULL != ctx->get_ls_tx_ctx_mgr()) {
ctx->get_ls_tx_ctx_mgr()->ls_tx_ctx_map_.revert(ctx);
}
} else {
ls_tx_ctx_map_.revert(ctx);
}
@ -1670,7 +1665,7 @@ int ObTxCtxMgr::stop_ls_(const ObLSID &ls_id, const bool graceful)
if (OB_FAIL(ls_tx_ctx_mgr->stop(graceful))) {
TRANS_LOG(WARN, "stop ls error", KR(ret), K(ls_id));
} else {
TRANS_LOG(INFO, "stop ls success", K(ls_id), KPC(ls_tx_ctx_mgr));
TRANS_LOG(INFO, "stop ls success", K(ls_id), "ctx_count", ls_tx_ctx_mgr->get_tx_ctx_count());
}
revert_ls_tx_ctx_mgr(ls_tx_ctx_mgr);
}
@ -1974,13 +1969,16 @@ int ObTxCtxMgr::revert_tx_ctx(ObPartTransCtx *ctx)
// and core dump may occur when printing ls_id
const ObTransID tx_id = ctx->get_trans_id();
const ObLSID ls_id = ctx->get_ls_id();
if (NULL == (ls_tx_ctx_mgr = ctx->get_ls_tx_ctx_mgr())) {
ret = OB_ERR_UNEXPECTED;
TRANS_LOG(ERROR, "unexpected erorr", K(ret), K(tx_id), K(ls_id));
} else if (OB_FAIL(ls_tx_ctx_mgr->revert_tx_ctx(ctx))) {
TRANS_LOG(WARN, "revert tx ctx error", KR(ret), K(tx_id), K(ls_id));
if (OB_FAIL(get_ls_tx_ctx_mgr(ls_id, ls_tx_ctx_mgr))) {
TRANS_LOG(WARN, "get ls_tx_ctx_mgr error", K(ls_id));
ret = OB_PARTITION_NOT_EXIST;
} else {
TRANS_LOG(DEBUG, "revert tx ctx success", K(ls_id), K(tx_id));
if (OB_FAIL(ls_tx_ctx_mgr->revert_tx_ctx(ctx))) {
TRANS_LOG(WARN, "revert tx ctx error", KR(ret), K(ls_id), "context", *ctx);
} else {
TRANS_LOG(DEBUG, "revert tx ctx success", K(ls_id), K(tx_id), K(ctx));
}
revert_ls_tx_ctx_mgr(ls_tx_ctx_mgr);
}
}
@ -2005,7 +2003,7 @@ int ObTxCtxMgr::block_tx(const ObLSID &ls_id, bool &is_all_tx_cleaned_up)
if (OB_FAIL(ls_tx_ctx_mgr->block_tx(is_all_tx_cleaned_up))) {
TRANS_LOG(WARN, "block ls error", KR(ret), K(ls_id));
} else {
TRANS_LOG(INFO, "block ls success", K(ls_id), KPC(this));
TRANS_LOG(INFO, "block ls success", K(ls_id), "ctx_count", ls_tx_ctx_mgr->get_tx_ctx_count());
}
revert_ls_tx_ctx_mgr(ls_tx_ctx_mgr);
}
@ -2030,7 +2028,7 @@ int ObTxCtxMgr::block_all(const ObLSID &ls_id, bool &is_all_tx_cleaned_up)
if (OB_FAIL(ls_tx_ctx_mgr->block_all(is_all_tx_cleaned_up))) {
TRANS_LOG(WARN, "block all error", KR(ret), K(ls_id));
} else {
TRANS_LOG(INFO, "block all success", K(ls_id), KPC(this));
TRANS_LOG(INFO, "block all success", K(ls_id), "ctx_count", ls_tx_ctx_mgr->get_tx_ctx_count());
}
revert_ls_tx_ctx_mgr(ls_tx_ctx_mgr);
}
@ -2297,58 +2295,31 @@ int ObTxCtxMgr::create_ls(const int64_t tenant_id,
} else if (OB_UNLIKELY(!ls_id.is_valid())) {
TRANS_LOG(WARN, "invalid argument", K(ls_id));
ret = OB_INVALID_ARGUMENT;
} else if (OB_SUCC(get_ls_tx_ctx_mgr(ls_id, ls_tx_ctx_mgr)) && NULL != ls_tx_ctx_mgr) {
// remove ls_id transaction context from map
ls_tx_ctx_mgr->get_ls_log_adapter()->reset();
if (OB_FAIL(ls_tx_ctx_mgr_map_.del(ls_id, ls_tx_ctx_mgr))) {
TRANS_LOG(WARN, "remove ls error", KR(ret), K(ls_id));
} else {
ATOMIC_INC(&ls_release_cnt_);
TRANS_LOG(INFO, "remove orphan ls success ", KPC(ls_tx_ctx_mgr), K(ls_id),
"total_alloc", ls_alloc_cnt_,
"total_release", ls_release_cnt_);
}
if (ls_tx_ctx_mgr->get_tx_ctx_count() > 0) {
TRANS_LOG(ERROR, "some dead transacton context already active, memory leak !",
K(ls_id), K(tenant_id));
} else {
revert_ls_tx_ctx_mgr(ls_tx_ctx_mgr);
}
} else if (OB_ENTRY_NOT_EXIST == ret) {
// overwrite retcode
ret = OB_SUCCESS;
} else if (OB_ISNULL(ls_tx_ctx_mgr = ObLSTxCtxMgrFactory::alloc(tenant_id))) {
TRANS_LOG(WARN, "alloc ls_id transaction context manager error", K(ls_id));
ret = OB_ALLOCATE_MEMORY_FAILED;
} else if (OB_FAIL(ls_tx_ctx_mgr->init(tenant_id,
ls_id, tx_table, lock_table,
ts_mgr_, txs_, param, log_adapter))) {
TRANS_LOG(WARN, "ls_tx_ctx_mgr inited error", KR(ret), K(ls_id));
ObLSTxCtxMgrFactory::release(ls_tx_ctx_mgr);
ls_tx_ctx_mgr = NULL;
} else if (OB_FAIL(ls_tx_svr.init(ls_id, ls_tx_ctx_mgr, txs_))) {
TRANS_LOG(WARN, "ls tx service init failed", K(ret), K(ls_id));
ObLSTxCtxMgrFactory::release(ls_tx_ctx_mgr);
ls_tx_ctx_mgr = NULL;
} else if (OB_FAIL(ls_tx_ctx_mgr_map_.insert_and_get(ls_id, ls_tx_ctx_mgr, NULL))) {
TRANS_LOG(WARN, "ls_tx_ctx_mgr_map_ insert error", KR(ret), K(ls_id));
ObLSTxCtxMgrFactory::release(ls_tx_ctx_mgr);
ls_tx_ctx_mgr = NULL;
} else {
ret = OB_ERR_UNEXPECTED;
TRANS_LOG(ERROR, "unexpected error", K(ret), K(tenant_id), K(ls_id));
}
if (OB_SUCC(ret)) {
if (OB_ISNULL(ls_tx_ctx_mgr = ObLSTxCtxMgrFactory::alloc(tenant_id))) {
TRANS_LOG(WARN, "alloc ls_id transaction context manager error", K(ls_id));
ret = OB_ALLOCATE_MEMORY_FAILED;
} else if (OB_FAIL(ls_tx_ctx_mgr->init(tenant_id,
ls_id, tx_table, lock_table,
ts_mgr_, txs_, param, log_adapter))) {
TRANS_LOG(WARN, "ls_tx_ctx_mgr inited error", KR(ret), K(ls_id));
ObLSTxCtxMgrFactory::release(ls_tx_ctx_mgr);
ls_tx_ctx_mgr = NULL;
} else if (OB_FAIL(ls_tx_svr.init(ls_id, ls_tx_ctx_mgr, txs_))) {
TRANS_LOG(WARN, "ls tx service init failed", K(ret), K(ls_id));
ObLSTxCtxMgrFactory::release(ls_tx_ctx_mgr);
ls_tx_ctx_mgr = NULL;
} else if (OB_FAIL(ls_tx_ctx_mgr_map_.insert_and_get(ls_id, ls_tx_ctx_mgr, NULL))) {
TRANS_LOG(WARN, "ls_tx_ctx_mgr_map_ insert error", KR(ret), K(ls_id));
ObLSTxCtxMgrFactory::release(ls_tx_ctx_mgr);
ls_tx_ctx_mgr = NULL;
} else {
ATOMIC_INC(&ls_alloc_cnt_);
// need to revert the trans ctx ref explicitly
ls_tx_ctx_mgr_map_.revert(ls_tx_ctx_mgr);
TRANS_LOG(INFO, "create ls success", K(tenant_id),
"total_alloc", ls_alloc_cnt_,
"total_release", ls_release_cnt_,
K(ls_id), KP(ls_tx_ctx_mgr));
}
ATOMIC_INC(&ls_alloc_cnt_);
// need to revert the trans ctx ref explicitly
ls_tx_ctx_mgr_map_.revert(ls_tx_ctx_mgr);
TRANS_LOG(INFO, "create ls success", K(tenant_id),
"total_alloc", ls_alloc_cnt_,
"total_release", ls_release_cnt_,
K(ls_id), KP(ls_tx_ctx_mgr));
}
return ret;
@ -2397,17 +2368,8 @@ int ObTxCtxMgr::remove_ls(const ObLSID &ls_id, const bool graceful)
if (OB_FAIL(ls_tx_ctx_mgr->kill_all_tx(arg.graceful_, is_all_trans_cleaned_up))) {
TRANS_LOG(WARN, "kill all transaction context error", KR(ret), K(arg));
} else if (!is_all_trans_cleaned_up) {
if (0 == ls_tx_ctx_mgr->get_total_active_readonly_request_count()
&& 0 == ls_tx_ctx_mgr->get_active_tx_count()) {
if (ls_tx_ctx_mgr->get_tx_ctx_count() != 0) {
TRANS_LOG(ERROR, "maybe some context memory not free, please attention", K(ls_id), KPC(ls_tx_ctx_mgr));
}
need_retry = false;
ret = OB_EAGAIN;
} else {
const bool verbose = true;
ls_tx_ctx_mgr->print_all_tx_ctx(ObLSTxCtxMgr::MAX_HASH_ITEM_PRINT, verbose);
}
const bool verbose = true;
ls_tx_ctx_mgr->print_all_tx_ctx(ObLSTxCtxMgr::MAX_HASH_ITEM_PRINT, verbose);
} else {
need_retry = false;
}

View File

@ -580,7 +580,6 @@ public:
State::state_str(state_),
K_(total_tx_ctx_count),
K_(active_tx_count),
K_(total_active_readonly_request_count),
K_(ls_retain_ctx_mgr),
K_(aggre_rec_scn),
K_(prev_aggre_rec_scn),