diff --git a/src/storage/tx/ob_trans_service_v4.cpp b/src/storage/tx/ob_trans_service_v4.cpp index 02b6c340f4..a8057c0199 100644 --- a/src/storage/tx/ob_trans_service_v4.cpp +++ b/src/storage/tx/ob_trans_service_v4.cpp @@ -127,19 +127,22 @@ int ObTransService::remove_ls(const share::ObLSID &ls_id, const bool graceful) #ifdef CHECK_TX_PARTS_CONTAIN_ #error "redefine CHECK_TX_PARTS_CONTAIN_" #else -#define CHECK_TX_PARTS_CONTAIN_(parts, id, epoch, ls_id, exist) \ - if (OB_SUCC(ret)) { \ - exist = false; \ - ARRAY_FOREACH_NORET(parts, idx) { \ - if (parts.at(idx).id == ls_id) { \ - if (ObTxPart::is_without_ctx(parts.at(idx).epoch)) { \ - /* target LS was dropped */ \ - /* can not accept access any more */ \ - ret = OB_PARTITION_IS_BLOCKED; \ - } else { exist = true; } \ - break; \ - } \ - } \ +#define CHECK_TX_PARTS_CONTAIN_(parts, id, epoch, ls_id, ret_epoch, exist) \ + if (OB_SUCC(ret)) { \ + exist = false; \ + ARRAY_FOREACH_NORET(parts, idx) { \ + if (parts.at(idx).id == ls_id) { \ + exist = true; \ + if (ObTxPart::is_without_ctx(parts.at(idx).epoch)) { \ + /* target LS was dropped */ \ + /* can not accept access any more */ \ + ret = OB_PARTITION_IS_BLOCKED; \ + } else { \ + ret_epoch = parts.at(idx).epoch; \ + } \ + break; \ + } \ + } \ } #endif @@ -999,7 +1002,8 @@ int ObTransService::get_read_store_ctx(const ObTxReadSnapshot &snapshot, if (OB_SUCC(ret) && snap_tx_id.is_valid()) { // inner tx read, we verify txCtx's status bool exist = false; - CHECK_TX_PARTS_CONTAIN_(snapshot.parts_, left_, right_, ls_id, exist); + int64_t part_epoch = 0; + CHECK_TX_PARTS_CONTAIN_(snapshot.parts_, left_, right_, ls_id, part_epoch, exist); if (OB_SUCC(ret) && (exist || read_latest)) { if (OB_FAIL(get_tx_ctx_(ls_id, store_ctx.ls_, snap_tx_id, tx_ctx))) { if (OB_TRANS_CTX_NOT_EXIST == ret && !exist) { @@ -1011,6 +1015,10 @@ int ObTransService::get_read_store_ctx(const ObTxReadSnapshot &snapshot, TRANS_LOG(WARN, "get tx ctx fail", K(ret), K(store_ctx), K(snapshot), K(ls_id), K(exist), K(read_latest)); } + } else if (exist && tx_ctx->epoch_ != part_epoch) { + ret = OB_TRANS_CTX_NOT_EXIST; + TRANS_LOG(WARN, "exist txCtx epoch mismatch within snapshot", K(ret), + K(part_epoch), K(tx_ctx->epoch_), K(ls_id), KPC(tx_ctx), K(snapshot)); } else if (OB_FAIL(tx_ctx->check_status())) { TRANS_LOG(WARN, "check status fail", K(ret), K(store_ctx), KPC(tx_ctx)); } else { @@ -1196,7 +1204,8 @@ int ObTransService::acquire_tx_ctx(const share::ObLSID &ls_id, const ObTxDesc &t { int ret = OB_SUCCESS; bool exist = false; - CHECK_TX_PARTS_CONTAIN_(tx.parts_, id_, epoch_, ls_id, exist); + int64_t part_epoch = 0; + CHECK_TX_PARTS_CONTAIN_(tx.parts_, id_, epoch_, ls_id, part_epoch, exist); if (OB_FAIL(ret)) { } else if (exist) { if (OB_FAIL(get_tx_ctx_(ls_id, ls, tx.tx_id_, ctx))) { @@ -1204,6 +1213,12 @@ int ObTransService::acquire_tx_ctx(const share::ObLSID &ls_id, const ObTxDesc &t if (ret == OB_TRANS_CTX_NOT_EXIST) { TRANS_LOG(WARN, "participant lost update", K(ls_id), K_(tx.tx_id)); } + } else if (ctx->epoch_ != part_epoch) { + ret = OB_TRANS_CTX_NOT_EXIST; + TRANS_LOG(WARN, "exist txCtx epoch mismatch within txDesc", K(ret), + K(part_epoch), K(ctx->epoch_), K(ls_id), K(ctx), K(tx)); + revert_tx_ctx_(ls, ctx); + ctx = NULL; } } else if (OB_FAIL(create_tx_ctx_(ls_id, ls, tx, ctx, special))) { TRANS_LOG(WARN, "create tx ctx fail", K(ret), K(ls_id), K(tx), K(special)); @@ -2139,32 +2154,19 @@ int ObTransService::check_ls_status(const share::ObLSID &ls_id){ int ObTransService::check_ls_status_(const share::ObLSID &ls_id, bool &leader) { int ret = OB_SUCCESS; - ObLSService *ls_svr = MTL(ObLSService *); - common::ObRole role = common::ObRole::INVALID_ROLE; - storage::ObLSHandle handle; - ObLS *ls = nullptr; - int64_t UNUSED = 0; - - if (OB_ISNULL(ls_svr)) { - ret = OB_ERR_UNEXPECTED; - TRANS_LOG(WARN, "log stream service is NULL", K(ret)); - } else if (OB_FAIL(ls_svr->get_ls(ls_id, handle, ObLSGetMod::TRANS_MOD))) { + int64_t epoch = 0; + ObLSTxCtxMgr *ls_tx_ctx_mgr = NULL; + if (OB_FAIL(tx_ctx_mgr_.get_ls_tx_ctx_mgr(ls_id, ls_tx_ctx_mgr))) { TRANS_LOG(WARN, "get id service log stream failed"); - } else if (OB_ISNULL(ls = handle.get_ls())) { + } else if (OB_ISNULL(ls_tx_ctx_mgr)) { ret = OB_ERR_UNEXPECTED; - TRANS_LOG(WARN, "id service log stream not exist"); - } else if (OB_FAIL(ls->get_log_handler()->get_role(role, UNUSED))) { - if (OB_NOT_RUNNING == ret) { - ret = OB_LS_NOT_EXIST; - } else { - TRANS_LOG(WARN, "get ls role fail", K(ret)); - } - } else if (common::ObRole::LEADER == role) { - leader = true; - } else { - leader = false; + TRANS_LOG(WARN, "ls ctx mgr is null", K(ls_id), KPC(this)); + } else if (OB_FAIL(ls_tx_ctx_mgr->get_ls_log_adapter()->get_role(leader, epoch))) { + TRANS_LOG(WARN, "get ls role fail", K(ret)); + } + if (ls_tx_ctx_mgr) { + tx_ctx_mgr_.revert_ls_tx_ctx_mgr(ls_tx_ctx_mgr); } - return ret; } diff --git a/src/storage/tx/ob_tx_api.cpp b/src/storage/tx/ob_tx_api.cpp index 81eaad30fb..c12086155b 100644 --- a/src/storage/tx/ob_tx_api.cpp +++ b/src/storage/tx/ob_tx_api.cpp @@ -1585,7 +1585,13 @@ int ObTransService::ls_rollback_to_savepoint_(const ObTransID &tx_id, int ret = OB_SUCCESS; int64_t retry_cnt = 0; ObPartTransCtx *ctx = NULL; - if (OB_FAIL(get_tx_ctx_(ls, tx_id, ctx))) { + bool is_leader = false; + // check ls is leader, to fast fail when lease expired but role change has not armed + if (OB_FAIL(check_ls_status_(ls, is_leader))) { + TRANS_LOG(WARN, "check ls leader failed", K(ret), K(ls)); + } else if (!is_leader) { + ret = OB_NOT_MASTER; + } else if (OB_FAIL(get_tx_ctx_(ls, tx_id, ctx))) { if (OB_NOT_MASTER == ret) { } else if (OB_TRANS_CTX_NOT_EXIST == ret && verify_epoch <= 0 && !for_transfer) { int tx_state = ObTxData::RUNNING;