submit clear log with the larger scn than any commit log scn from pariticipants

This commit is contained in:
obdev
2023-02-09 14:39:44 +00:00
committed by ob-robot
parent 0ab76c186b
commit fc69016702
10 changed files with 142 additions and 21 deletions

View File

@ -1028,6 +1028,25 @@ int ObLSTxCtxMgr::check_scheduler_status(SCN &min_start_scn, MinStartScnStatus &
return ret; return ret;
} }
int ObLSTxCtxMgr::get_max_decided_scn(share::SCN &scn)
{
RLockGuard guard(rwlock_);
int ret = OB_SUCCESS;
if (IS_NOT_INIT) {
TRANS_LOG(WARN, "ObLSTxCtxMgr not inited");
ret = OB_NOT_INIT;
// There is no need to check whether it is master
// this interface is called by leader or follower
} else if (is_stopped_()) {
ret = OB_STATE_NOT_MATCH;
TRANS_LOG(WARN, "this ls has beend stopped", KPC(this));
} else if (OB_FAIL(tx_log_adapter_->get_max_decided_scn(scn))) {
TRANS_LOG(WARN, "get max decided scn failed", K(ret));
}
return ret;
}
int ObLSTxCtxMgr::check_modify_schema_elapsed(const ObTabletID &tablet_id, int ObLSTxCtxMgr::check_modify_schema_elapsed(const ObTabletID &tablet_id,
const int64_t schema_version, const int64_t schema_version,
ObTransID &block_tx_id) ObTransID &block_tx_id)

View File

@ -480,6 +480,8 @@ public:
// ObPartTransCtx -> ObLSTxCtxMgr, It will be a deadlock with normal order. // ObPartTransCtx -> ObLSTxCtxMgr, It will be a deadlock with normal order.
int update_aggre_log_ts_wo_lock(share::SCN rec_log_ts); int update_aggre_log_ts_wo_lock(share::SCN rec_log_ts);
int get_max_decided_scn(share::SCN & scn);
TO_STRING_KV(KP(this), TO_STRING_KV(KP(this),
K_(ls_id), K_(ls_id),
K_(tenant_id), K_(tenant_id),
@ -943,6 +945,8 @@ public:
// @param [in] ls_id: the specified ls_id // @param [in] ls_id: the specified ls_id
int check_scheduler_status(share::ObLSID ls_id); int check_scheduler_status(share::ObLSID ls_id);
int get_max_decided_scn(const share::ObLSID &ls_id, share::SCN & scn);
private: private:
int create_ls_(const int64_t tenant_id, int create_ls_(const int64_t tenant_id,
const share::ObLSID &ls_id, const share::ObLSID &ls_id,

View File

@ -298,6 +298,7 @@ void ObPartTransCtx::default_init_()
is_incomplete_replay_ctx_ = false; is_incomplete_replay_ctx_ = false;
is_submitting_redo_log_for_freeze_ = false; is_submitting_redo_log_for_freeze_ = false;
start_working_log_ts_ = SCN::min_scn(); start_working_log_ts_ = SCN::min_scn();
max_2pc_commit_scn_.reset();
coord_prepare_info_arr_.reset(); coord_prepare_info_arr_.reset();
reserve_allocator_.reset(); reserve_allocator_.reset();
elr_handler_.reset(); elr_handler_.reset();
@ -3126,8 +3127,7 @@ int ObPartTransCtx::submit_clear_log_()
ObTxClearLog clear_log(exec_info_.incremental_participants_); ObTxClearLog clear_log(exec_info_.incremental_participants_);
ObTxLogCb *log_cb = NULL; ObTxLogCb *log_cb = NULL;
const int64_t replay_hint = static_cast<int64_t>(trans_id_.get_id()); const int64_t replay_hint = static_cast<int64_t>(trans_id_.get_id());
ObTxLogBlockHeader ObTxLogBlockHeader log_block_header(cluster_id_, exec_info_.next_log_entry_no_, trans_id_);
log_block_header(cluster_id_, exec_info_.next_log_entry_no_, trans_id_);
if (OB_FAIL(log_block.init(replay_hint, log_block_header))) { if (OB_FAIL(log_block.init(replay_hint, log_block_header))) {
TRANS_LOG(WARN, "init log block failed", KR(ret), K(*this)); TRANS_LOG(WARN, "init log block failed", KR(ret), K(*this));
@ -3147,7 +3147,9 @@ int ObPartTransCtx::submit_clear_log_()
} else if (OB_FAIL(acquire_ctx_ref_())) { } else if (OB_FAIL(acquire_ctx_ref_())) {
TRANS_LOG(ERROR, "acquire ctx ref failed", KR(ret), K(*this)); TRANS_LOG(ERROR, "acquire ctx ref failed", KR(ret), K(*this));
} else if (OB_FAIL(ls_tx_ctx_mgr_->get_ls_log_adapter()->submit_log( } else if (OB_FAIL(ls_tx_ctx_mgr_->get_ls_log_adapter()->submit_log(
log_block.get_buf(), log_block.get_size(), SCN::min_scn(), log_cb, false))) { log_block.get_buf(), log_block.get_size(),
share::SCN::max(ctx_tx_data_.get_end_log_ts(), max_2pc_commit_scn_), log_cb,
false))) {
TRANS_LOG(WARN, "submit log to clog adapter failed", KR(ret), K(*this)); TRANS_LOG(WARN, "submit log to clog adapter failed", KR(ret), K(*this));
return_log_cb_(log_cb); return_log_cb_(log_cb);
log_cb = NULL; log_cb = NULL;

View File

@ -608,6 +608,7 @@ private:
const ObTxMsg &recv_msg, const ObTxMsg &recv_msg,
const common::ObAddr &self_addr, const common::ObAddr &self_addr,
ObITransRpc* rpc); ObITransRpc* rpc);
static int get_max_decided_scn_(const share::ObLSID &ls_id, share::SCN &scn);
int get_2pc_participants_copy(share::ObLSArray &copy_participants); int get_2pc_participants_copy(share::ObLSArray &copy_participants);
// for xa // for xa
int post_tx_sub_prepare_resp_(const int status); int post_tx_sub_prepare_resp_(const int status);
@ -800,6 +801,7 @@ private:
ObTxState upstream_state_; ObTxState upstream_state_;
const ObTxMsg * msg_2pc_cache_; const ObTxMsg * msg_2pc_cache_;
share::SCN max_2pc_commit_scn_;
ObLSLogInfoArray coord_prepare_info_arr_; ObLSLogInfoArray coord_prepare_info_arr_;
TransModulePageAllocator reserve_allocator_; TransModulePageAllocator reserve_allocator_;
// tmp scheduler addr is used to post response for the second phase of xa commit/rollback // tmp scheduler addr is used to post response for the second phase of xa commit/rollback

View File

@ -553,6 +553,31 @@ int ObTransService::get_min_undecided_log_ts(const ObLSID &ls_id, SCN &log_ts)
return ret; return ret;
} }
int ObTransService::get_max_decided_scn(const share::ObLSID &ls_id, share::SCN &scn)
{
int ret = OB_SUCCESS;
ObLSTxCtxMgr *ls_tx_mgr_ptr = nullptr;
if (IS_NOT_INIT) {
TRANS_LOG(WARN, "ObTransService not inited");
ret = OB_NOT_INIT;
} else if (OB_UNLIKELY(!is_running_)) {
TRANS_LOG(WARN, "ObTransService is not running");
ret = OB_NOT_RUNNING;
} else if (!ls_id.is_valid()) {
TRANS_LOG(WARN, "invalid argument", K(ls_id));
ret = OB_INVALID_ARGUMENT;
} else if (OB_FAIL(tx_ctx_mgr_.get_ls_tx_ctx_mgr(ls_id, ls_tx_mgr_ptr))) {
TRANS_LOG(WARN, "get ls tx ctx mgr failed", K(ret));
} else {
if (OB_FAIL(ls_tx_mgr_ptr->get_max_decided_scn(scn))) {
TRANS_LOG(WARN, "get max decided scn failed", K(ret));
}
tx_ctx_mgr_.revert_ls_tx_ctx_mgr(ls_tx_mgr_ptr);
}
return ret;
}
int ObTransService::handle_redo_sync_task_(ObDupTableRedoSyncTask *task, bool &need_release_task) int ObTransService::handle_redo_sync_task_(ObDupTableRedoSyncTask *task, bool &need_release_task)
{ {
UNUSED(task); UNUSED(task);

View File

@ -224,6 +224,7 @@ public:
const bool is_rollback, const bool is_rollback,
const int64_t expire_ts); const int64_t expire_ts);
int get_max_commit_version(share::SCN &commit_version) const; int get_max_commit_version(share::SCN &commit_version) const;
int get_max_decided_scn(const share::ObLSID &ls_id, share::SCN & scn);
#include "ob_trans_service_v4.h" #include "ob_trans_service_v4.h"
#include "ob_tx_free_route_api.h" #include "ob_tx_free_route_api.h"
private: private:

View File

@ -10,6 +10,7 @@
* See the Mulan PubL v2 for more details. * See the Mulan PubL v2 for more details.
*/ */
#include "storage/tx/ob_trans_service.h"
#include "storage/tx/ob_trans_part_ctx.h" #include "storage/tx/ob_trans_part_ctx.h"
namespace oceanbase namespace oceanbase
@ -147,6 +148,12 @@ int ObPartTransCtx::post_msg_(const ObTwoPhaseCommitMsgType& msg_type,
Ob2pcCommitRespMsg commit_resp; Ob2pcCommitRespMsg commit_resp;
build_tx_common_msg_(receiver, commit_resp); build_tx_common_msg_(receiver, commit_resp);
commit_resp.commit_version_ = ctx_tx_data_.get_commit_version(); commit_resp.commit_version_ = ctx_tx_data_.get_commit_version();
if (max_2pc_commit_scn_.is_valid()) {
commit_resp.commit_log_scn_ =
share::SCN::max(max_2pc_commit_scn_, ctx_tx_data_.get_end_log_ts());
} else {
commit_resp.commit_log_scn_ = ctx_tx_data_.get_end_log_ts();
}
if (OB_FAIL(post_msg_(receiver, commit_resp))) { if (OB_FAIL(post_msg_(receiver, commit_resp))) {
TRANS_LOG(WARN, "rpc post msg failed", K(ret), K(*this), K(receiver), K(msg_type)); TRANS_LOG(WARN, "rpc post msg failed", K(ret), K(*this), K(receiver), K(msg_type));
} }
@ -172,6 +179,7 @@ int ObPartTransCtx::post_msg_(const ObTwoPhaseCommitMsgType& msg_type,
case ObTwoPhaseCommitMsgType::OB_MSG_TX_CLEAR_REQ: { case ObTwoPhaseCommitMsgType::OB_MSG_TX_CLEAR_REQ: {
Ob2pcClearReqMsg clear_req; Ob2pcClearReqMsg clear_req;
build_tx_common_msg_(receiver, clear_req); build_tx_common_msg_(receiver, clear_req);
clear_req.max_commit_log_scn_ = share::SCN::max(max_2pc_commit_scn_, ctx_tx_data_.get_end_log_ts());
if (OB_FAIL(post_msg_(receiver, clear_req))) { if (OB_FAIL(post_msg_(receiver, clear_req))) {
TRANS_LOG(WARN, "rpc post msg failed", K(ret), K(*this), K(receiver), K(msg_type)); TRANS_LOG(WARN, "rpc post msg failed", K(ret), K(*this), K(receiver), K(msg_type));
} }
@ -283,7 +291,11 @@ int ObPartTransCtx::post_orphan_msg_(const ObTwoPhaseCommitMsgType &msg_type,
build_tx_common_msg_(recv_msg, build_tx_common_msg_(recv_msg,
self_addr, self_addr,
clear_req); clear_req);
ret = rpc->post_msg(recv_msg.get_sender_addr(), clear_req); if (OB_FAIL(MTL(ObTransService*)->get_max_decided_scn(clear_req.sender_, clear_req.max_commit_log_scn_))) {
TRANS_LOG(WARN, "get max get_max_decided_scn failed", K(ret), K(clear_req));
} else {
ret = rpc->post_msg(recv_msg.get_sender_addr(), clear_req);
}
break; break;
} }
@ -539,7 +551,7 @@ int ObPartTransCtx::apply_2pc_msg_(const ObTwoPhaseCommitMsgType msg_type)
case ObTwoPhaseCommitMsgType::OB_MSG_TX_COMMIT_RESP: { case ObTwoPhaseCommitMsgType::OB_MSG_TX_COMMIT_RESP: {
const Ob2pcCommitRespMsg &msg = *(static_cast<const Ob2pcCommitRespMsg *>(msg_2pc_cache_)); const Ob2pcCommitRespMsg &msg = *(static_cast<const Ob2pcCommitRespMsg *>(msg_2pc_cache_));
max_2pc_commit_scn_ = share::SCN::max(msg.commit_log_scn_, max_2pc_commit_scn_);
if (OB_FAIL(set_2pc_commit_version_(msg.commit_version_))) { if (OB_FAIL(set_2pc_commit_version_(msg.commit_version_))) {
TRANS_LOG(WARN, "set commit version failed", KR(ret), K(msg), K(*this)); TRANS_LOG(WARN, "set commit version failed", KR(ret), K(msg), K(*this));
} }
@ -562,7 +574,13 @@ int ObPartTransCtx::apply_2pc_msg_(const ObTwoPhaseCommitMsgType msg_type)
} }
case ObTwoPhaseCommitMsgType::OB_MSG_TX_CLEAR_REQ: { case ObTwoPhaseCommitMsgType::OB_MSG_TX_CLEAR_REQ: {
const Ob2pcClearReqMsg &msg = *(static_cast<const Ob2pcClearReqMsg *>(msg_2pc_cache_)); const Ob2pcClearReqMsg &msg = *(static_cast<const Ob2pcClearReqMsg *>(msg_2pc_cache_));
if (msg.max_commit_log_scn_ < max_2pc_commit_scn_
|| msg.max_commit_log_scn_ < ctx_tx_data_.get_end_log_ts()) {
ret = OB_ERR_UNEXPECTED;
TRANS_LOG(WARN, "unexpected max commit log scn in clear request", K(ret), KPC(this));
} else {
max_2pc_commit_scn_ = share::SCN::max(msg.max_commit_log_scn_, max_2pc_commit_scn_);
}
break; break;
} }
case ObTwoPhaseCommitMsgType::OB_MSG_TX_CLEAR_RESP: { case ObTwoPhaseCommitMsgType::OB_MSG_TX_CLEAR_RESP: {

View File

@ -44,10 +44,10 @@ OB_SERIALIZE_MEMBER_INHERIT(Ob2pcPrepareRespMsg, ObTxMsg, prepare_version_, prep
OB_SERIALIZE_MEMBER_INHERIT(Ob2pcPreCommitReqMsg, ObTxMsg, commit_version_); OB_SERIALIZE_MEMBER_INHERIT(Ob2pcPreCommitReqMsg, ObTxMsg, commit_version_);
OB_SERIALIZE_MEMBER_INHERIT(Ob2pcPreCommitRespMsg, ObTxMsg, commit_version_); OB_SERIALIZE_MEMBER_INHERIT(Ob2pcPreCommitRespMsg, ObTxMsg, commit_version_);
OB_SERIALIZE_MEMBER_INHERIT(Ob2pcCommitReqMsg, ObTxMsg, commit_version_, prepare_info_array_); OB_SERIALIZE_MEMBER_INHERIT(Ob2pcCommitReqMsg, ObTxMsg, commit_version_, prepare_info_array_);
OB_SERIALIZE_MEMBER_INHERIT(Ob2pcCommitRespMsg, ObTxMsg, commit_version_); OB_SERIALIZE_MEMBER_INHERIT(Ob2pcCommitRespMsg, ObTxMsg, commit_version_, commit_log_scn_);
OB_SERIALIZE_MEMBER_INHERIT(Ob2pcAbortReqMsg, ObTxMsg, upstream_); OB_SERIALIZE_MEMBER_INHERIT(Ob2pcAbortReqMsg, ObTxMsg, upstream_);
OB_SERIALIZE_MEMBER_INHERIT(Ob2pcAbortRespMsg, ObTxMsg); OB_SERIALIZE_MEMBER_INHERIT(Ob2pcAbortRespMsg, ObTxMsg);
OB_SERIALIZE_MEMBER_INHERIT(Ob2pcClearReqMsg, ObTxMsg); OB_SERIALIZE_MEMBER_INHERIT(Ob2pcClearReqMsg, ObTxMsg, max_commit_log_scn_);
OB_SERIALIZE_MEMBER_INHERIT(Ob2pcClearRespMsg, ObTxMsg); OB_SERIALIZE_MEMBER_INHERIT(Ob2pcClearRespMsg, ObTxMsg);
OB_SERIALIZE_MEMBER_INHERIT(Ob2pcPrepareRedoReqMsg, ObTxMsg, xid_, upstream_, app_trace_info_); OB_SERIALIZE_MEMBER_INHERIT(Ob2pcPrepareRedoReqMsg, ObTxMsg, xid_, upstream_, app_trace_info_);
OB_SERIALIZE_MEMBER_INHERIT(Ob2pcPrepareRedoRespMsg, ObTxMsg); OB_SERIALIZE_MEMBER_INHERIT(Ob2pcPrepareRedoRespMsg, ObTxMsg);
@ -288,13 +288,16 @@ bool Ob2pcCommitReqMsg::is_valid() const
return ret; return ret;
} }
bool Ob2pcCommitRespMsg::is_valid() const bool Ob2pcCommitRespMsg::is_valid() const {
{
bool ret = false; bool ret = false;
if (ObTxMsg::is_valid() && type_ == TX_2PC_COMMIT_RESP if (ObTxMsg::is_valid() && type_ == TX_2PC_COMMIT_RESP &&
&& commit_version_.is_valid()) { commit_version_.is_valid()) {
ret = true; ret = true;
} }
if (GET_MIN_CLUSTER_VERSION() >= CLUSTER_VERSION_4_1_0_0 &&
!commit_log_scn_.is_valid()) {
ret = false;
}
return ret; return ret;
} }
@ -316,12 +319,15 @@ bool Ob2pcAbortRespMsg::is_valid() const
return ret; return ret;
} }
bool Ob2pcClearReqMsg::is_valid() const bool Ob2pcClearReqMsg::is_valid() const {
{
bool ret = false; bool ret = false;
if (ObTxMsg::is_valid() && type_ == TX_2PC_CLEAR_REQ) { if (ObTxMsg::is_valid() && type_ == TX_2PC_CLEAR_REQ) {
ret = true; ret = true;
} }
if (GET_MIN_CLUSTER_VERSION() >= CLUSTER_VERSION_4_1_0_0 &&
!max_commit_log_scn_.is_valid()) {
ret = false;
}
return ret; return ret;
} }

View File

@ -358,13 +358,13 @@ namespace transaction
struct Ob2pcCommitRespMsg : public ObTxMsg struct Ob2pcCommitRespMsg : public ObTxMsg
{ {
public: public:
Ob2pcCommitRespMsg() : Ob2pcCommitRespMsg() : ObTxMsg(TX_2PC_COMMIT_RESP) {}
ObTxMsg(TX_2PC_COMMIT_RESP)
{}
public: public:
bool is_valid() const; bool is_valid() const;
share::SCN commit_version_; share::SCN commit_version_;
INHERIT_TO_STRING_KV("txMsg", ObTxMsg, K_(commit_version)); share::SCN commit_log_scn_;
INHERIT_TO_STRING_KV("txMsg", ObTxMsg, K_(commit_version), K_(commit_log_scn));
OB_UNIS_VERSION(1); OB_UNIS_VERSION(1);
}; };
@ -401,6 +401,8 @@ namespace transaction
{} {}
public: public:
bool is_valid() const; bool is_valid() const;
share::SCN max_commit_log_scn_;
INHERIT_TO_STRING_KV("txMsg", ObTxMsg, K_(max_commit_log_scn));
OB_UNIS_VERSION(1); OB_UNIS_VERSION(1);
}; };

View File

@ -389,6 +389,7 @@ public:
const static int64_t TASK_QUEUE_CNT = 128; const static int64_t TASK_QUEUE_CNT = 128;
ObSpScLinkQueue apply_task_queue_arr[TASK_QUEUE_CNT]; ObSpScLinkQueue apply_task_queue_arr[TASK_QUEUE_CNT];
ObSpScLinkQueue replay_task_queue_arr[TASK_QUEUE_CNT]; ObSpScLinkQueue replay_task_queue_arr[TASK_QUEUE_CNT];
share::SCN max_submit_scn_ = share::SCN::invalid_scn();
void run1() { void run1() {
while(true) { while(true) {
@ -466,6 +467,7 @@ public:
ApplyCbTask *apply_task = new ApplyCbTask(); ApplyCbTask *apply_task = new ApplyCbTask();
apply_task->replay_hint_ = replay_hint; apply_task->replay_hint_ = replay_hint;
apply_task->cb_ = cb; apply_task->cb_ = cb;
max_submit_scn_ = share::SCN::max(max_submit_scn_, scn);
apply_task_queue_arr[queue_idx].push(apply_task); apply_task_queue_arr[queue_idx].push(apply_task);
ATOMIC_INC(&inflight_cnt_); ATOMIC_INC(&inflight_cnt_);
@ -499,9 +501,49 @@ public:
return OB_SUCCESS; return OB_SUCCESS;
} }
int get_max_decided_scn(share::SCN &scn) int get_max_decided_scn(share::SCN &scn) {
{ int ret = OB_SUCCESS;
UNUSED(scn); share::SCN min_unreplayed_scn;
share::SCN min_unapplyed_scn;
min_unreplayed_scn.invalid_scn();
min_unapplyed_scn.invalid_scn();
for (int64_t i = 0; i < TASK_QUEUE_CNT; ++i) {
if (!replay_task_queue_arr[i].empty()) {
share::SCN tmp_scn;
tmp_scn.convert_for_gts(
static_cast<ReplayCbTask *>(replay_task_queue_arr[i].top())->log_ts_);
if (min_unreplayed_scn.is_valid() && tmp_scn.is_valid()) {
min_unreplayed_scn = share::SCN::min(tmp_scn, min_unreplayed_scn);
} else if (tmp_scn.is_valid()) {
min_unreplayed_scn = tmp_scn;
}
}
}
for (int64_t i = 0; i < TASK_QUEUE_CNT; ++i) {
if (!apply_task_queue_arr[i].empty()) {
share::SCN tmp_scn;
tmp_scn = (static_cast<ObTxBaseLogCb *>(
(static_cast<ApplyCbTask *>(apply_task_queue_arr[i].top()))
->cb_))
->get_log_ts();
if (min_unapplyed_scn.is_valid() && tmp_scn.is_valid()) {
min_unapplyed_scn = share::SCN::min(tmp_scn, min_unapplyed_scn);
} else if (tmp_scn.is_valid()) {
min_unapplyed_scn = tmp_scn;
}
}
}
if (min_unapplyed_scn.is_valid() && min_unapplyed_scn.is_valid()) {
scn = share::SCN::max(min_unapplyed_scn, min_unapplyed_scn);
} else {
scn = max_submit_scn_;
}
if (scn.is_valid()) {
share::SCN::minus(scn, 1);
}
return OB_SUCCESS; return OB_SUCCESS;
} }