[master] query tx committed need ensure gts elapsed commit version
This commit is contained in:
@ -461,6 +461,8 @@ int ObLS::start()
|
||||
LOG_WARN("ls is not inited", K(ret));
|
||||
} else if (OB_FAIL(tx_table_.start())) {
|
||||
LOG_WARN("tx table start failed", K(ret), KPC(this));
|
||||
} else if (OB_FAIL(ls_tx_svr_.set_max_replay_commit_version(ls_meta_.get_clog_checkpoint_scn()))) {
|
||||
LOG_WARN("set max replay commit scn fail", K(ret), K(ls_meta_.get_clog_checkpoint_scn()));
|
||||
} else {
|
||||
checkpoint_executor_.start();
|
||||
LOG_INFO("start_ls finish", KR(ret), KPC(this));
|
||||
@ -851,6 +853,8 @@ int ObLS::online_tx_()
|
||||
int ret = OB_SUCCESS;
|
||||
if (OB_FAIL(ls_tx_svr_.online())) {
|
||||
LOG_WARN("ls tx service online failed", K(ret), K(ls_meta_));
|
||||
} else if (OB_FAIL(ls_tx_svr_.set_max_replay_commit_version(ls_meta_.get_clog_checkpoint_scn()))) {
|
||||
LOG_WARN("set max replay commit scn fail", K(ret), K(ls_meta_.get_clog_checkpoint_scn()));
|
||||
} else if (OB_FAIL(tx_table_.online())) {
|
||||
LOG_WARN("tx table online failed", K(ret), K(ls_meta_));
|
||||
}
|
||||
|
||||
@ -632,6 +632,18 @@ int ObLSTxService::online()
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObLSTxService::set_max_replay_commit_version(share::SCN commit_version)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
if (OB_ISNULL(mgr_)) {
|
||||
ret = OB_NOT_INIT;
|
||||
TRANS_LOG(WARN, "not init", KR(ret), K_(ls_id));
|
||||
} else {
|
||||
mgr_->update_max_replay_commit_version(commit_version);
|
||||
TRANS_LOG(INFO, "succ set max_replay_commit_version", K(commit_version));
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
} // transaction
|
||||
|
||||
} // oceanbase
|
||||
|
||||
@ -165,7 +165,7 @@ public:
|
||||
int traversal_flush();
|
||||
virtual share::SCN get_ls_weak_read_ts();
|
||||
int check_in_leader_serving_state(bool& bool_ret);
|
||||
|
||||
int set_max_replay_commit_version(share::SCN commit_version);
|
||||
transaction::ObTxRetainCtxMgr *get_retain_ctx_mgr();
|
||||
private:
|
||||
void reset_();
|
||||
|
||||
@ -1786,7 +1786,22 @@ int ObTransService::local_ls_commit_tx_(const ObTransID &tx_id,
|
||||
} else {
|
||||
switch (tx_state) {
|
||||
case ObTxData::COMMIT:
|
||||
ret = OB_TRANS_COMMITED;
|
||||
{
|
||||
ObLSTxCtxMgr *ls_tx_mgr = NULL;
|
||||
if (OB_FAIL(tx_ctx_mgr_.get_ls_tx_ctx_mgr(coord, ls_tx_mgr))) {
|
||||
TRANS_LOG(WARN, "can not get ls_tx_mgr", K(ret), "ls_id", coord);
|
||||
} else if (OB_ISNULL(ls_tx_mgr)) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
TRANS_LOG(WARN, "ls_tx_mgr is NULL", K(ret), "ls_id", coord);
|
||||
} else if (ls_tx_mgr->in_leader_serving_state()) {
|
||||
ret = OB_TRANS_COMMITED;
|
||||
} else {
|
||||
ret = OB_NOT_MASTER;
|
||||
}
|
||||
if (OB_NOT_NULL(ls_tx_mgr)) {
|
||||
tx_ctx_mgr_.revert_ls_tx_ctx_mgr(ls_tx_mgr);
|
||||
}
|
||||
}
|
||||
break;
|
||||
case ObTxData::ABORT:
|
||||
ret = OB_TRANS_KILLED;
|
||||
|
||||
Reference in New Issue
Block a user