Skip retain ctx when record min_start_scn in ctx

This commit is contained in:
ZenoWang
2024-02-10 07:53:18 +00:00
committed by ob-robot
parent 87f572c4b6
commit a7d0961542
6 changed files with 61 additions and 21 deletions

View File

@ -100,6 +100,14 @@ int ObCtxTxData::insert_into_tx_table()
return ret;
}
bool ObCtxTxData::is_decided() const
{
// ATTENTION! : decided means the callback function of commit_log/abort_log has been called and the tx_data has been
// inserted into TxDataTable. The read_only_ flag is set as true after inserting into tx data table.
RLockGuard guard(lock_);
return read_only_;
}
int ObCtxTxData::recover_tx_data(ObTxData *tmp_tx_data)
{
int ret = OB_SUCCESS;

View File

@ -38,6 +38,7 @@ public:
bool is_read_only() const { return read_only_; }
bool has_recovered_from_tx_table() const { return recovered_from_tx_table_; }
bool is_decided() const;
share::SCN get_max_replayed_rollback_scn() const { return max_replayed_rollback_scn_; }
void set_max_replayed_rollback_scn(const share::SCN &scn) { max_replayed_rollback_scn_ = scn; }
int insert_into_tx_table();

View File

@ -1143,14 +1143,15 @@ int ObLSTxCtxMgr::check_scheduler_status(SCN &min_start_scn, MinStartScnStatus &
IteratePartCtxAskSchedulerStatusFunctor functor;
if (OB_FAIL(ls_tx_ctx_map_.for_each(functor))) {
TRANS_LOG(WARN, "for each transaction context error", KR(ret), "manager", *this);
} else if (!min_start_scn.is_valid()) {
// The default min_start_scn must be valid, or skip writting HAS_CTX/NO_CTX CLOG
status = MinStartScnStatus::UNKOWN;
} else {
if (0 == ls_tx_ctx_map_.count()) {
min_start_scn.reset();
status = MinStartScnStatus::NO_CTX;
} else {
min_start_scn = functor.get_min_start_scn();
status = functor.get_min_start_status();
}
// use smaller one between max_decided_scn and min_start_scn of all tx ctx
TRANS_LOG(DEBUG, "set min start scn", K(min_start_scn), K(functor.get_min_start_scn()));
min_start_scn = std::min(min_start_scn, functor.get_min_start_scn());
status = functor.get_min_start_status();
}
return ret;

View File

@ -1378,6 +1378,8 @@ public:
IteratePartCtxAskSchedulerStatusFunctor()
{
SET_EXPIRED_LIMIT(100 * 1000 /*100ms*/, 3 * 1000 * 1000 /*3s*/);
first_err_code_ = OB_SUCCESS;
has_start_scn_ctx_cnt_ = 0;
min_start_scn_.set_max();
}
@ -1390,24 +1392,29 @@ public:
ret = OB_INVALID_ARGUMENT;
TRANS_LOG(WARN, "invalid argument", KR(ret), K(tx_id), "ctx", OB_P(tx_ctx));
} else {
share::SCN ctx_start_scn = tx_ctx->get_start_log_ts();
if (!ctx_start_scn.is_valid()) {
ctx_start_scn.set_max();
// logic for get min_start_scn
if (tx_ctx->is_decided()) {
TRANS_LOG(DEBUG, "skip record committed tx", KPC(tx_ctx));
} else if (tx_ctx->get_start_log_ts().is_valid()) {
has_start_scn_ctx_cnt_++;
min_start_scn_ = MIN(min_start_scn_, tx_ctx->get_start_log_ts());
}
if (OB_FALSE_IT(min_start_scn_ = MIN(min_start_scn_, ctx_start_scn))) {
// do nothing
} else if (OB_FAIL(tx_ctx->check_scheduler_status())) {
TRANS_LOG(WARN, "check scheduler status error", KR(ret), "ctx", *tx_ctx);
} else {
// do nothing
// logic for gc tx ctx
int tmp_ret = OB_SUCCESS;
if (OB_TMP_FAIL(tx_ctx->check_scheduler_status())) {
TRANS_LOG(WARN, "check scheduler status error", KR(tmp_ret), "ctx", *tx_ctx);
}
}
if (OB_FAIL(ret)) {
min_start_scn_.reset();
has_start_scn_ctx_cnt_ = 0;
if (OB_SUCCESS == first_err_code_) {
// record first error code if exist
first_err_code_ = ret;
}
}
return true;
}
@ -1417,15 +1424,29 @@ public:
{
MinStartScnStatus start_status = MinStartScnStatus::HAS_CTX;
if (!min_start_scn_.is_valid()) {
if (OB_SUCCESS != first_err_code_) {
start_status = MinStartScnStatus::UNKOWN;
} else if (min_start_scn_.is_max()) {
} else if (!min_start_scn_.is_valid()) {
start_status = MinStartScnStatus::UNKOWN;
} else if (0 == has_start_scn_ctx_cnt_ || min_start_scn_.is_max()) {
start_status = MinStartScnStatus::NO_CTX;
if ((0 == has_start_scn_ctx_cnt_) && (!min_start_scn_.is_max())) {
TRANS_LOG_RET(WARN, OB_ERR_UNEXPECTED, "unexpected values pair", K(has_start_scn_ctx_cnt_), K(min_start_scn_));
}
}
TRANS_LOG(DEBUG,
"get min start status",
K(first_err_code_),
K(has_start_scn_ctx_cnt_),
K(min_start_scn_),
K(start_status));
return start_status;
}
private:
int first_err_code_;
int64_t has_start_scn_ctx_cnt_;
share::SCN min_start_scn_;
};

View File

@ -352,6 +352,7 @@ public:
// ========================================================
// newly added for 4.0
bool is_decided() const { return ctx_tx_data_.is_decided(); }
int retry_dup_trx_before_prepare(
const share::SCN &before_prepare_version,
const ObDupTableBeforePrepareRequest::BeforePrepareScnSrc before_prepare_src);

View File

@ -156,8 +156,8 @@ int ObTxLoopWorker::scan_all_ls_(bool can_tx_gc, bool can_gc_retain_ctx)
iter_ret = OB_SUCCESS;
cur_ls_ptr = nullptr;
while (OB_SUCCESS == (iter_ret = iter_ptr->get_next(cur_ls_ptr))) {
SCN min_start_scn;
SCN min_start_scn = SCN::invalid_scn();
SCN max_decided_scn = SCN::invalid_scn();
MinStartScnStatus status = MinStartScnStatus::UNKOWN;
common::ObRole role;
int64_t base_proposal_id, proposal_id;
@ -173,6 +173,14 @@ int ObTxLoopWorker::scan_all_ls_(bool can_tx_gc, bool can_gc_retain_ctx)
if (can_tx_gc) {
// TODO shanyan.g close ctx gc temporarily because of logical bug
//
// ATTENTION : get_max_decided_scn must before iterating all trans ctx.
// set max_decided_scn as default value
if (OB_TMP_FAIL(cur_ls_ptr->get_log_handler()->get_max_decided_scn(max_decided_scn))) {
TRANS_LOG(WARN, "get max decided scn failed", KR(tmp_ret), K(min_start_scn));
max_decided_scn.set_invalid();
}
min_start_scn = max_decided_scn;
do_tx_gc_(cur_ls_ptr, min_start_scn, status);
}