replace get_ls_ctx_mgr with get_ls in acquire_local_snapshot
This commit is contained in:
@ -2340,6 +2340,7 @@ int ObTxCtxMgr::remove_ls(const ObLSID &ls_id, const bool graceful)
|
|||||||
// if ls_id has been removed, OB_SUCCESS is returned.
|
// if ls_id has been removed, OB_SUCCESS is returned.
|
||||||
if (OB_SUCC(get_ls_tx_ctx_mgr(ls_id, ls_tx_ctx_mgr))) {
|
if (OB_SUCC(get_ls_tx_ctx_mgr(ls_id, ls_tx_ctx_mgr))) {
|
||||||
// remove ls_id transaction context from map
|
// remove ls_id transaction context from map
|
||||||
|
ls_tx_ctx_mgr->get_ls_log_adapter()->reset();
|
||||||
if (OB_FAIL(ls_tx_ctx_mgr_map_.del(ls_id, ls_tx_ctx_mgr))) {
|
if (OB_FAIL(ls_tx_ctx_mgr_map_.del(ls_id, ls_tx_ctx_mgr))) {
|
||||||
TRANS_LOG(WARN, "remove ls error", KR(ret), K(ls_id));
|
TRANS_LOG(WARN, "remove ls error", KR(ret), K(ls_id));
|
||||||
} else {
|
} else {
|
||||||
|
|||||||
@ -1576,7 +1576,7 @@ int ObTransService::abort_participants_(const ObTxDesc &tx_desc)
|
|||||||
return ret;
|
return ret;
|
||||||
}
|
}
|
||||||
|
|
||||||
int ObTransService::acquire_local_snapshot_(const share::ObLSID &ls_id,
|
OB_NOINLINE int ObTransService::acquire_local_snapshot_(const share::ObLSID &ls_id,
|
||||||
SCN &snapshot,
|
SCN &snapshot,
|
||||||
const bool is_read_only,
|
const bool is_read_only,
|
||||||
bool &acquire_from_follower)
|
bool &acquire_from_follower)
|
||||||
@ -1587,31 +1587,45 @@ int ObTransService::acquire_local_snapshot_(const share::ObLSID &ls_id,
|
|||||||
int64_t committing_dup_trx_cnt = 0;
|
int64_t committing_dup_trx_cnt = 0;
|
||||||
int dup_trx_status = OB_SUCCESS;
|
int dup_trx_status = OB_SUCCESS;
|
||||||
bool leader = false;
|
bool leader = false;
|
||||||
|
bool is_leader_serving = false;
|
||||||
SCN snapshot0;
|
SCN snapshot0;
|
||||||
SCN snapshot1;
|
SCN snapshot1;
|
||||||
ObLSTxCtxMgr *ls_tx_ctx_mgr = NULL;
|
ObLSTxCtxMgr *ls_tx_ctx_mgr = NULL;
|
||||||
const bool can_elr = MTL_IS_PRIMARY_TENANT() ? true : false;
|
const bool can_elr = MTL_IS_PRIMARY_TENANT() ? true : false;
|
||||||
if (OB_FAIL(tx_ctx_mgr_.get_ls_tx_ctx_mgr(ls_id, ls_tx_ctx_mgr))) {
|
ObLSHandle ls_handle;
|
||||||
TRANS_LOG(WARN, "get ls_tx_ctx_mgr fail", K(ret), K(ls_id));
|
if (OB_FAIL(MTL(ObLSService *)->get_ls(ls_id, ls_handle, ObLSGetMod::TRANS_MOD))) {
|
||||||
} else if (OB_FAIL(ls_tx_ctx_mgr->get_ls_log_adapter()->get_role(leader, epoch))) {
|
TRANS_LOG(WARN, "get ls fail", K(ret), K(ls_id));
|
||||||
|
} else if (!ls_handle.is_valid() || OB_ISNULL(ls_handle.get_ls())) {
|
||||||
|
ret = OB_NOT_MASTER;
|
||||||
|
TRANS_LOG(WARN, "invalid ls, acquire gts for snapshot", K(ret), K(ls_id), K(ls_handle));
|
||||||
|
} else if (OB_FAIL(ls_handle.get_ls()->get_tx_svr()->get_tx_ls_log_adapter()->get_role(leader,
|
||||||
|
epoch))) {
|
||||||
TRANS_LOG(WARN, "get replica role fail", K(ret), K(ls_id));
|
TRANS_LOG(WARN, "get replica role fail", K(ret), K(ls_id));
|
||||||
} else if (!leader) {
|
} else if (!leader) {
|
||||||
ret = OB_NOT_MASTER;
|
ret = OB_NOT_MASTER;
|
||||||
} else if (!ls_tx_ctx_mgr->in_leader_serving_state()) {
|
} else if (OB_FAIL(ls_handle.get_ls()->get_tx_svr()->check_in_leader_serving_state(
|
||||||
|
is_leader_serving))) {
|
||||||
ret = OB_NOT_MASTER;
|
ret = OB_NOT_MASTER;
|
||||||
// XXX In standby cluster mode, the failure to call acquire_local_snapshot_ is an
|
// XXX In standby cluster mode, the failure to call acquire_local_snapshot_ is an
|
||||||
// normal situation, no error log needs to be printed
|
// normal situation, no error log needs to be printed
|
||||||
// TRANS_LOG(WARN, "check ls tx service leader serving state fail", K(ret), K(ls_id), K(ret));
|
// TRANS_LOG(WARN, "check ls tx service leader serving state fail", K(ret), K(ls_id), K(ret));
|
||||||
|
} else if (!is_leader_serving) {
|
||||||
|
ret = OB_NOT_MASTER;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (OB_NOT_MASTER == ret && is_read_only) {
|
if (OB_NOT_MASTER == ret && is_read_only && ls_handle.is_valid()
|
||||||
|
&& OB_NOT_NULL(ls_handle.get_ls())) {
|
||||||
dup_trx_status =
|
dup_trx_status =
|
||||||
ls_tx_ctx_mgr->get_ls_log_adapter()->get_committing_dup_trx_cnt(committing_dup_trx_cnt);
|
ls_handle.get_ls()->get_tx_svr()->get_tx_ls_log_adapter()->get_committing_dup_trx_cnt(committing_dup_trx_cnt);
|
||||||
if (!MTL_IS_PRIMARY_TENANT()) {
|
if (!MTL_IS_PRIMARY_TENANT()) {
|
||||||
ret = OB_NOT_MASTER;
|
ret = OB_NOT_MASTER;
|
||||||
TRANS_LOG(DEBUG, "the max_commmit_ts can not be used as a snapshot in standby tenant ", K(ret),
|
TRANS_LOG(DEBUG, "the max_commmit_ts can not be used as a snapshot in standby tenant ",
|
||||||
K(ls_id), K(snapshot), K(MTL_IS_PRIMARY_TENANT()), K(committing_dup_trx_cnt));
|
K(ret), K(ls_id), K(snapshot), K(MTL_IS_PRIMARY_TENANT()),
|
||||||
} else if (!ls_tx_ctx_mgr->get_ls_log_adapter()->is_dup_table_lease_valid()) {
|
K(committing_dup_trx_cnt));
|
||||||
|
} else if (!ls_handle.get_ls()
|
||||||
|
->get_tx_svr()
|
||||||
|
->get_tx_ls_log_adapter()
|
||||||
|
->is_dup_table_lease_valid()) {
|
||||||
ret = OB_NOT_MASTER;
|
ret = OB_NOT_MASTER;
|
||||||
} else if (committing_dup_trx_cnt > 0 || OB_SUCCESS != dup_trx_status) {
|
} else if (committing_dup_trx_cnt > 0 || OB_SUCCESS != dup_trx_status) {
|
||||||
ret = OB_NOT_MASTER;
|
ret = OB_NOT_MASTER;
|
||||||
@ -1645,8 +1659,8 @@ int ObTransService::acquire_local_snapshot_(const share::ObLSID &ls_id,
|
|||||||
//
|
//
|
||||||
}
|
}
|
||||||
|
|
||||||
if(OB_FAIL(ret)) {
|
if (OB_FAIL(ret)) {
|
||||||
//do nothing
|
// do nothing
|
||||||
} else if (FALSE_IT(snapshot0 = tx_version_mgr_.get_max_commit_ts(can_elr))) {
|
} else if (FALSE_IT(snapshot0 = tx_version_mgr_.get_max_commit_ts(can_elr))) {
|
||||||
} else if (!snapshot0.is_valid_and_not_min()) {
|
} else if (!snapshot0.is_valid_and_not_min()) {
|
||||||
ret = OB_EAGAIN;
|
ret = OB_EAGAIN;
|
||||||
@ -1654,9 +1668,6 @@ int ObTransService::acquire_local_snapshot_(const share::ObLSID &ls_id,
|
|||||||
} else {
|
} else {
|
||||||
snapshot = SCN::max(snapshot0, snapshot1);
|
snapshot = SCN::max(snapshot0, snapshot1);
|
||||||
}
|
}
|
||||||
if (OB_NOT_NULL(ls_tx_ctx_mgr)) {
|
|
||||||
tx_ctx_mgr_.revert_ls_tx_ctx_mgr(ls_tx_ctx_mgr);
|
|
||||||
}
|
|
||||||
|
|
||||||
if (acquire_from_follower) {
|
if (acquire_from_follower) {
|
||||||
TRANS_LOG(INFO, "acquire local snapshot from a dup ls follower", K(ret), K(leader), K(epoch),
|
TRANS_LOG(INFO, "acquire local snapshot from a dup ls follower", K(ret), K(leader), K(epoch),
|
||||||
|
|||||||
@ -94,6 +94,13 @@ int ObITxLogAdapter::remove_commiting_dup_trx(const ObTransID &tx_id)
|
|||||||
return OB_SUCCESS;
|
return OB_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
void ObLSTxLogAdapter::reset()
|
||||||
|
{
|
||||||
|
log_handler_ = nullptr;
|
||||||
|
dup_table_ls_handler_ = nullptr;
|
||||||
|
tx_table_ = nullptr;
|
||||||
|
}
|
||||||
|
|
||||||
int ObLSTxLogAdapter::init(ObITxLogParam *param, ObTxTable *tx_table)
|
int ObLSTxLogAdapter::init(ObITxLogParam *param, ObTxTable *tx_table)
|
||||||
{
|
{
|
||||||
int ret = OB_SUCCESS;
|
int ret = OB_SUCCESS;
|
||||||
|
|||||||
@ -56,6 +56,9 @@ private:
|
|||||||
class ObITxLogAdapter
|
class ObITxLogAdapter
|
||||||
{
|
{
|
||||||
public:
|
public:
|
||||||
|
|
||||||
|
virtual void reset() {}
|
||||||
|
|
||||||
virtual int submit_log(const char *buf,
|
virtual int submit_log(const char *buf,
|
||||||
const int64_t size,
|
const int64_t size,
|
||||||
const share::SCN &base_ts,
|
const share::SCN &base_ts,
|
||||||
@ -98,6 +101,8 @@ class ObLSTxLogAdapter : public ObITxLogAdapter
|
|||||||
public:
|
public:
|
||||||
ObLSTxLogAdapter() : log_handler_(nullptr), dup_table_ls_handler_(nullptr), tx_table_(nullptr) {}
|
ObLSTxLogAdapter() : log_handler_(nullptr), dup_table_ls_handler_(nullptr), tx_table_(nullptr) {}
|
||||||
|
|
||||||
|
void reset();
|
||||||
|
|
||||||
int init(ObITxLogParam *param, ObTxTable *tx_table);
|
int init(ObITxLogParam *param, ObTxTable *tx_table);
|
||||||
int submit_log(const char *buf,
|
int submit_log(const char *buf,
|
||||||
const int64_t size,
|
const int64_t size,
|
||||||
|
|||||||
@ -48,6 +48,17 @@ private:
|
|||||||
ObTxNode *n_;
|
ObTxNode *n_;
|
||||||
};
|
};
|
||||||
|
|
||||||
|
|
||||||
|
OB_NOINLINE int ObTransService::acquire_local_snapshot_(const share::ObLSID &ls_id,
|
||||||
|
SCN &snapshot,
|
||||||
|
const bool is_read_only,
|
||||||
|
bool &acquire_from_follower)
|
||||||
|
{
|
||||||
|
int ret = OB_SUCCESS;
|
||||||
|
snapshot = tx_version_mgr_.get_max_commit_ts(false);
|
||||||
|
acquire_from_follower = false;
|
||||||
|
return ret;
|
||||||
|
}
|
||||||
class ObTestRegisterMDS : public ::testing::Test
|
class ObTestRegisterMDS : public ::testing::Test
|
||||||
{
|
{
|
||||||
public:
|
public:
|
||||||
|
|||||||
Reference in New Issue
Block a user