diff --git a/src/storage/tx/ob_trans_service_v4.cpp b/src/storage/tx/ob_trans_service_v4.cpp index a42cf6b935..6e41cd64f5 100644 --- a/src/storage/tx/ob_trans_service_v4.cpp +++ b/src/storage/tx/ob_trans_service_v4.cpp @@ -1108,7 +1108,6 @@ int ObTransService::create_tx_ctx_(const share::ObLSID &ls_id, { int ret = OB_SUCCESS; bool existed = false; - int64_t epoch = 0; ObTxCreateArg arg(tx.can_elr_, /* can_elr */ false, /* for_replay */ tx.tenant_id_, @@ -1379,7 +1378,6 @@ int ObTransService::acquire_local_snapshot_(const share::ObLSID &ls_id, int64_t &snapshot) { int ret = OB_SUCCESS; - int64_t epoch = 0; bool leader = false; int64_t snapshot0 = 0; ObLSTxCtxMgr *ls_tx_ctx_mgr = NULL; @@ -1485,19 +1483,14 @@ int ObTransService::batch_post_tx_msg_(ObTxRollbackSPMsg &msg, { int ret = OB_SUCCESS; int last_ret = OB_SUCCESS; - const ObTxDesc *tx_ptr = msg.tx_ptr_; ARRAY_FOREACH_NORET(list, idx) { auto &p = list.at(idx); msg.receiver_ = p.left_; msg.epoch_ = p.right_; - if (msg.epoch_ > 0) { - msg.tx_ptr_ = NULL; - } if (OB_FAIL(rpc_->post_msg(p.left_, msg))) { TRANS_LOG(WARN, "post msg falied", K(ret), K(msg), K(p)); last_ret = ret; } - msg.tx_ptr_ = tx_ptr; } return last_ret; } @@ -1682,22 +1675,45 @@ int ObTransService::handle_trans_abort_request(ObTxAbortMsg &abort_req, ObTransR return ret; } +int ObTransService::create_tx_ctx_(ObTxRollbackSPMsg &msg, ObPartTransCtx *&ctx) +{ + int ret = OB_SUCCESS; + bool existed = false; + ObTxCreateArg arg(msg.can_elr_, /* can_elr */ + false, /* for_replay */ + msg.tenant_id_, + msg.tx_id_, + msg.receiver_, + msg.cluster_id_, + msg.cluster_version_, + msg.session_id_, /*session_id*/ + msg.tx_addr_, + msg.tx_expire_ts_, + this); + ret = tx_ctx_mgr_.create_tx_ctx(arg, existed, ctx); + if (OB_FAIL(ret)) { + TRANS_LOG(WARN, "create tx ctx fail", K(ret), K(msg), K(arg)); + ctx = NULL; + } + TRANS_LOG(TRACE, "create tx ctx for savepoint rollback", K(ret), K(msg), K(arg)); + return ret; +} + int ObTransService::handle_sp_rollback_request(ObTxRollbackSPMsg &msg, obrpc::ObTxRpcRollbackSPResult &result) { int ret = OB_SUCCESS; int64_t ctx_born_epoch = -1; + ObFunction create_tx_ctx_func = [this, &msg](ObPartTransCtx *&ctx) -> int { + return this->create_tx_ctx_(msg, ctx); + }; ret = ls_rollback_to_savepoint_(msg.tx_id_, msg.receiver_, msg.epoch_, msg.op_sn_, msg.savepoint_, ctx_born_epoch, - msg.tx_ptr_); - if (OB_NOT_NULL(msg.tx_ptr_)) { - ob_free((void*)msg.tx_ptr_); - msg.tx_ptr_ = NULL; - } + create_tx_ctx_func); result.status_ = ret; result.addr_ = self_; result.born_epoch_ = ctx_born_epoch; diff --git a/src/storage/tx/ob_trans_service_v4.h b/src/storage/tx/ob_trans_service_v4.h index 9a42d47283..e3a598431e 100644 --- a/src/storage/tx/ob_trans_service_v4.h +++ b/src/storage/tx/ob_trans_service_v4.h @@ -173,7 +173,7 @@ int rollback_savepoint_slowpath_(ObTxDesc &tx, int create_tx_ctx_(const share::ObLSID &ls_id, const ObTxDesc &tx, ObPartTransCtx *&ctx); - +int create_tx_ctx_(ObTxRollbackSPMsg &msg, ObPartTransCtx *&ctx); int create_tx_ctx_(const share::ObLSID &ls_id, ObLS *ls, const ObTxDesc &tx, @@ -281,7 +281,7 @@ int ls_rollback_to_savepoint_(const ObTransID &tx_id, const int64_t op_sn, const int64_t savepoint, int64_t &ctx_born_epoch, - const ObTxDesc *tx, + ObFunction &func, int64_t expire_ts = -1); int sync_rollback_savepoint__(ObTxDesc &tx, ObTxRollbackSPMsg &msg, diff --git a/src/storage/tx/ob_tx_api.cpp b/src/storage/tx/ob_tx_api.cpp index 44fa54231b..e153420db6 100644 --- a/src/storage/tx/ob_tx_api.cpp +++ b/src/storage/tx/ob_tx_api.cpp @@ -1277,13 +1277,16 @@ int ObTransService::rollback_savepoint_(ObTxDesc &tx, slowpath = false; ObTxPart &p = parts[0]; int64_t born_epoch = 0; + ObFunction create_tx_ctx_func = [this, &p, &tx](ObPartTransCtx *&ctx) -> int { + return this->create_tx_ctx_(p.id_, tx, ctx); + }; if (OB_FAIL(ls_rollback_to_savepoint_(tx.tx_id_, p.id_, p.epoch_, tx.op_sn_, savepoint, born_epoch, - &tx, + create_tx_ctx_func, expire_ts))) { if (OB_NOT_MASTER == ret) { slowpath = true; @@ -1337,7 +1340,7 @@ int ObTransService::ls_rollback_to_savepoint_(const ObTransID &tx_id, const int64_t op_sn, const int64_t savepoint, int64_t &ctx_born_epoch, - const ObTxDesc *tx, + ObFunction &func, int64_t expire_ts) { int ret = OB_SUCCESS; @@ -1346,8 +1349,8 @@ int ObTransService::ls_rollback_to_savepoint_(const ObTransID &tx_id, if (OB_FAIL(get_tx_ctx_(ls, tx_id, ctx))) { if (OB_NOT_MASTER == ret) { } else if (OB_TRANS_CTX_NOT_EXIST == ret && verify_epoch <= 0) { - if (OB_FAIL(create_tx_ctx_(ls, *tx, ctx))) { - TRANS_LOG(WARN, "create tx ctx fail", K(ret), K(ls), KPC(tx)); + if (OB_FAIL(func(ctx))) { + TRANS_LOG(WARN, "create tx ctx fail", K(ret), K(ls), K(tx_id)); } } else { TRANS_LOG(WARN, "get transaction context error", K(ret), K(tx_id), K(ls)); @@ -1398,32 +1401,12 @@ inline int ObTransService::rollback_savepoint_slowpath_(ObTxDesc &tx, msg.tx_id_ = tx.tx_id_; msg.savepoint_ = savepoint; msg.op_sn_ = tx.op_sn_; + msg.can_elr_ = tx.can_elr_; + msg.session_id_ = tx.sess_id_; + msg.tx_addr_ = tx.addr_; + msg.tx_expire_ts_ = tx.get_expire_ts(); msg.epoch_ = -1; msg.request_id_ = tx.op_sn_; - // prepare msg.tx_ptr_ if required - // TODO(yunxing.cyx) : in 4.1 rework here, won't serialize txDesc - ObTxDesc *tmp_tx_desc = NULL; - ARRAY_FOREACH_NORET(parts, i) { - if (parts[i].epoch_ <= 0) { - int64_t len = tx.get_serialize_size() + sizeof(ObTxDesc); - char *buf = (char*)ob_malloc(len); - int64_t pos = sizeof(ObTxDesc); - if (OB_FAIL(tx.serialize(buf, len, pos))) { - TRANS_LOG(WARN, "serialize tx fail", KR(ret), K(tx)); - ob_free(buf); - } else { - tmp_tx_desc = new(buf)ObTxDesc(); - pos = sizeof(ObTxDesc); - if (OB_FAIL(tmp_tx_desc->deserialize(buf, len, pos))) { - TRANS_LOG(WARN, "deserialize tx fail", KR(ret)); - } else { - tmp_tx_desc->parts_.reset(); - msg.tx_ptr_ = tmp_tx_desc; - } - } - break; - } - } int64_t start_ts = ObTimeUtility::current_time(); int retries = 0; if (OB_SUCC(ret)) { @@ -1444,11 +1427,6 @@ inline int ObTransService::rollback_savepoint_slowpath_(ObTxDesc &tx, // clear interrupt flag tx.flags_.INTERRUPTED_ = false; } - if (OB_NOT_NULL(tmp_tx_desc)) { - msg.tx_ptr_ = NULL; - tmp_tx_desc->~ObTxDesc(); - ob_free(tmp_tx_desc); - } auto elapsed_us = ObTimeUtility::current_time() - start_ts; TRANS_LOG(INFO, "rollback savepoint slowpath", K(ret), K_(tx.tx_id), K(start_ts), K(retries), diff --git a/src/storage/tx/ob_tx_msg.cpp b/src/storage/tx/ob_tx_msg.cpp index 67f6522a6b..b31dd17125 100644 --- a/src/storage/tx/ob_tx_msg.cpp +++ b/src/storage/tx/ob_tx_msg.cpp @@ -53,56 +53,7 @@ OB_SERIALIZE_MEMBER_INHERIT(Ob2pcPrepareRedoReqMsg, ObTxMsg, xid_, upstream_, ap OB_SERIALIZE_MEMBER_INHERIT(Ob2pcPrepareRedoRespMsg, ObTxMsg); OB_SERIALIZE_MEMBER_INHERIT(Ob2pcPrepareVersionReqMsg, ObTxMsg); OB_SERIALIZE_MEMBER_INHERIT(Ob2pcPrepareVersionRespMsg, ObTxMsg, prepare_version_, prepare_info_array_); - -OB_DEF_SERIALIZE_SIZE(ObTxRollbackSPMsg) -{ - int len = 0; - len += ObTxMsg::get_serialize_size(); - LST_DO_CODE(OB_UNIS_ADD_LEN, savepoint_, op_sn_, branch_id_); - if (OB_NOT_NULL(tx_ptr_)) { - OB_UNIS_ADD_LEN(true); - OB_UNIS_ADD_LEN(*tx_ptr_); - } else { - OB_UNIS_ADD_LEN(false); - } - return len; -} - -OB_DEF_SERIALIZE(ObTxRollbackSPMsg) -{ - int ret = ObTxMsg::serialize(buf, buf_len, pos); - if (OB_SUCC(ret)) { - LST_DO_CODE(OB_UNIS_ENCODE, savepoint_, op_sn_, branch_id_); - if (OB_NOT_NULL(tx_ptr_)) { - OB_UNIS_ENCODE(true); - OB_UNIS_ENCODE(*tx_ptr_); - } else { - OB_UNIS_ENCODE(false); - } - } - return ret; -} - -OB_DEF_DESERIALIZE(ObTxRollbackSPMsg) -{ - int ret = ObTxMsg::deserialize(buf, data_len, pos); - if (OB_SUCC(ret)) { - LST_DO_CODE(OB_UNIS_DECODE, savepoint_, op_sn_, branch_id_); - bool has_tx_ptr = false; - OB_UNIS_DECODE(has_tx_ptr); - if (has_tx_ptr) { - void *buffer = ob_malloc(sizeof(ObTxDesc)); - if (OB_ISNULL(buffer)) { - ret = OB_ALLOCATE_MEMORY_FAILED; - } else { - ObTxDesc *tmp = new(buffer)ObTxDesc(); - OB_UNIS_DECODE(*tmp); - tx_ptr_ = tmp; - } - } - } - return ret; -} +OB_SERIALIZE_MEMBER_INHERIT(ObTxRollbackSPMsg, ObTxMsg, savepoint_, op_sn_, can_elr_, session_id_, tx_addr_, tx_expire_ts_); bool ObTxMsg::is_valid() const { @@ -208,7 +159,9 @@ bool ObTxRollbackSPMsg::is_valid() const { bool ret = false; if (ObTxMsg::is_valid() && type_ == ROLLBACK_SAVEPOINT - && savepoint_ > -1 && op_sn_ > -1) { + && savepoint_ > -1 && op_sn_ > -1 + && session_id_ > 0 && tx_addr_.is_valid() + && tx_expire_ts_ > 0) { ret = true; } return ret; diff --git a/src/storage/tx/ob_tx_msg.h b/src/storage/tx/ob_tx_msg.h index 62b31f29d6..b72a7d282f 100644 --- a/src/storage/tx/ob_tx_msg.h +++ b/src/storage/tx/ob_tx_msg.h @@ -229,26 +229,21 @@ namespace transaction ObTxMsg(ROLLBACK_SAVEPOINT), savepoint_(-1), op_sn_(-1), - //todo:后续branch_id使用方式确定后,需要相应修改 - branch_id_(-1), - tx_ptr_(NULL) + can_elr_(false), + session_id_(0), + tx_addr_(), + tx_expire_ts_(-1) {} - ~ObTxRollbackSPMsg() { - if (OB_NOT_NULL(tx_ptr_)) { - tx_ptr_->~ObTxDesc(); - ob_free((void*)tx_ptr_); - tx_ptr_ = NULL; - } - } + ~ObTxRollbackSPMsg() {} int64_t savepoint_; int64_t op_sn_; - //todo:后期设计中操作编号是否等于branch_id - int64_t branch_id_; - const ObTxDesc *tx_ptr_; + bool can_elr_; + uint32_t session_id_; + ObAddr tx_addr_; + int64_t tx_expire_ts_; bool is_valid() const; INHERIT_TO_STRING_KV("txMsg", ObTxMsg, - K_(savepoint), K_(op_sn), K_(branch_id), - KP_(tx_ptr)); + K_(savepoint), K_(op_sn), K_(can_elr), K_(session_id), K_(tx_addr), K_(tx_expire_ts)); OB_UNIS_VERSION(1); }; diff --git a/unittest/storage/tx/test_ob_tx_msg.cpp b/unittest/storage/tx/test_ob_tx_msg.cpp index cac8446385..1c7644fc73 100644 --- a/unittest/storage/tx/test_ob_tx_msg.cpp +++ b/unittest/storage/tx/test_ob_tx_msg.cpp @@ -121,8 +121,10 @@ public: msg.request_id_ = op_sn_; msg.savepoint_ = 1; msg.op_sn_ = op_sn_; - msg.branch_id_ = 1; - msg.tx_ptr_ = tx; + msg.can_elr_ = true; + msg.session_id_ = 202; + msg.tx_addr_ = ObAddr(ObAddr::VER::IPV4, "127.1.1.2", 8919); + msg.tx_expire_ts_ = 120000; } void build_tx_keepalive_msg(ObTxKeepaliveMsg &msg) { @@ -432,11 +434,10 @@ TEST_F(TestObTxMsg, trans_rollback_sp_msg) EXPECT_EQ(msg.cluster_id_, msg1.cluster_id_); EXPECT_EQ(msg.savepoint_, msg1.savepoint_); EXPECT_EQ(msg.op_sn_, msg1.op_sn_); - EXPECT_EQ(msg.branch_id_, msg1.branch_id_); - EXPECT_EQ(msg.tx_ptr_->parts_[0].id_, msg1.tx_ptr_->parts_[0].id_); - if (OB_NOT_NULL(msg.tx_ptr_)) { - msg.tx_ptr_ = NULL; - } + EXPECT_EQ(msg.can_elr_, msg1.can_elr_); + EXPECT_EQ(msg.session_id_, msg1.session_id_); + EXPECT_EQ(msg.tx_addr_, msg1.tx_addr_); + EXPECT_EQ(msg.tx_expire_ts_, msg1.tx_expire_ts_); } TEST_F(TestObTxMsg, trans_keepalive_msg)