diff --git a/src/storage/tx/ob_ctx_tx_data.cpp b/src/storage/tx/ob_ctx_tx_data.cpp index 962b054139..076b2aae3c 100644 --- a/src/storage/tx/ob_ctx_tx_data.cpp +++ b/src/storage/tx/ob_ctx_tx_data.cpp @@ -100,6 +100,14 @@ int ObCtxTxData::insert_into_tx_table() return ret; } +bool ObCtxTxData::is_decided() const +{ + // ATTENTION! : decided means the callback function of commit_log/abort_log has been called and the tx_data has been + // inserted into TxDataTable. The read_only_ flag is set as true after inserting into tx data table. + RLockGuard guard(lock_); + return read_only_; +} + int ObCtxTxData::recover_tx_data(ObTxData *tmp_tx_data) { int ret = OB_SUCCESS; diff --git a/src/storage/tx/ob_ctx_tx_data.h b/src/storage/tx/ob_ctx_tx_data.h index 89b7c437b4..6e158b523b 100644 --- a/src/storage/tx/ob_ctx_tx_data.h +++ b/src/storage/tx/ob_ctx_tx_data.h @@ -38,6 +38,7 @@ public: bool is_read_only() const { return read_only_; } bool has_recovered_from_tx_table() const { return recovered_from_tx_table_; } + bool is_decided() const; share::SCN get_max_replayed_rollback_scn() const { return max_replayed_rollback_scn_; } void set_max_replayed_rollback_scn(const share::SCN &scn) { max_replayed_rollback_scn_ = scn; } int insert_into_tx_table(); diff --git a/src/storage/tx/ob_trans_ctx_mgr_v4.cpp b/src/storage/tx/ob_trans_ctx_mgr_v4.cpp index 6fe39eb2e2..3d76ee5b01 100644 --- a/src/storage/tx/ob_trans_ctx_mgr_v4.cpp +++ b/src/storage/tx/ob_trans_ctx_mgr_v4.cpp @@ -1143,14 +1143,15 @@ int ObLSTxCtxMgr::check_scheduler_status(SCN &min_start_scn, MinStartScnStatus & IteratePartCtxAskSchedulerStatusFunctor functor; if (OB_FAIL(ls_tx_ctx_map_.for_each(functor))) { TRANS_LOG(WARN, "for each transaction context error", KR(ret), "manager", *this); + } else if (!min_start_scn.is_valid()) { + // The default min_start_scn must be valid, or skip writting HAS_CTX/NO_CTX CLOG + status = MinStartScnStatus::UNKOWN; } else { - if (0 == ls_tx_ctx_map_.count()) { - min_start_scn.reset(); - status = MinStartScnStatus::NO_CTX; - } else { - min_start_scn = functor.get_min_start_scn(); - status = functor.get_min_start_status(); - } + // use smaller one between max_decided_scn and min_start_scn of all tx ctx + TRANS_LOG(DEBUG, "set min start scn", K(min_start_scn), K(functor.get_min_start_scn())); + min_start_scn = std::min(min_start_scn, functor.get_min_start_scn()); + + status = functor.get_min_start_status(); } return ret; diff --git a/src/storage/tx/ob_trans_functor.h b/src/storage/tx/ob_trans_functor.h index 7d135885dd..3928a21899 100644 --- a/src/storage/tx/ob_trans_functor.h +++ b/src/storage/tx/ob_trans_functor.h @@ -1378,6 +1378,8 @@ public: IteratePartCtxAskSchedulerStatusFunctor() { SET_EXPIRED_LIMIT(100 * 1000 /*100ms*/, 3 * 1000 * 1000 /*3s*/); + first_err_code_ = OB_SUCCESS; + has_start_scn_ctx_cnt_ = 0; min_start_scn_.set_max(); } @@ -1390,24 +1392,29 @@ public: ret = OB_INVALID_ARGUMENT; TRANS_LOG(WARN, "invalid argument", KR(ret), K(tx_id), "ctx", OB_P(tx_ctx)); } else { - share::SCN ctx_start_scn = tx_ctx->get_start_log_ts(); - if (!ctx_start_scn.is_valid()) { - ctx_start_scn.set_max(); + // logic for get min_start_scn + if (tx_ctx->is_decided()) { + TRANS_LOG(DEBUG, "skip record committed tx", KPC(tx_ctx)); + } else if (tx_ctx->get_start_log_ts().is_valid()) { + has_start_scn_ctx_cnt_++; + min_start_scn_ = MIN(min_start_scn_, tx_ctx->get_start_log_ts()); } - if (OB_FALSE_IT(min_start_scn_ = MIN(min_start_scn_, ctx_start_scn))) { - // do nothing - } else if (OB_FAIL(tx_ctx->check_scheduler_status())) { - TRANS_LOG(WARN, "check scheduler status error", KR(ret), "ctx", *tx_ctx); - } else { - // do nothing + // logic for gc tx ctx + int tmp_ret = OB_SUCCESS; + if (OB_TMP_FAIL(tx_ctx->check_scheduler_status())) { + TRANS_LOG(WARN, "check scheduler status error", KR(tmp_ret), "ctx", *tx_ctx); } } if (OB_FAIL(ret)) { min_start_scn_.reset(); + has_start_scn_ctx_cnt_ = 0; + if (OB_SUCCESS == first_err_code_) { + // record first error code if exist + first_err_code_ = ret; + } } - return true; } @@ -1417,15 +1424,29 @@ public: { MinStartScnStatus start_status = MinStartScnStatus::HAS_CTX; - if (!min_start_scn_.is_valid()) { + if (OB_SUCCESS != first_err_code_) { start_status = MinStartScnStatus::UNKOWN; - } else if (min_start_scn_.is_max()) { + } else if (!min_start_scn_.is_valid()) { + start_status = MinStartScnStatus::UNKOWN; + } else if (0 == has_start_scn_ctx_cnt_ || min_start_scn_.is_max()) { start_status = MinStartScnStatus::NO_CTX; + if ((0 == has_start_scn_ctx_cnt_) && (!min_start_scn_.is_max())) { + TRANS_LOG_RET(WARN, OB_ERR_UNEXPECTED, "unexpected values pair", K(has_start_scn_ctx_cnt_), K(min_start_scn_)); + } } + + TRANS_LOG(DEBUG, + "get min start status", + K(first_err_code_), + K(has_start_scn_ctx_cnt_), + K(min_start_scn_), + K(start_status)); return start_status; } private: + int first_err_code_; + int64_t has_start_scn_ctx_cnt_; share::SCN min_start_scn_; }; diff --git a/src/storage/tx/ob_trans_part_ctx.h b/src/storage/tx/ob_trans_part_ctx.h index c1c0bc15ce..f578cae1cf 100644 --- a/src/storage/tx/ob_trans_part_ctx.h +++ b/src/storage/tx/ob_trans_part_ctx.h @@ -352,6 +352,7 @@ public: // ======================================================== // newly added for 4.0 + bool is_decided() const { return ctx_tx_data_.is_decided(); } int retry_dup_trx_before_prepare( const share::SCN &before_prepare_version, const ObDupTableBeforePrepareRequest::BeforePrepareScnSrc before_prepare_src); diff --git a/src/storage/tx/ob_tx_loop_worker.cpp b/src/storage/tx/ob_tx_loop_worker.cpp index c439922170..a266746497 100644 --- a/src/storage/tx/ob_tx_loop_worker.cpp +++ b/src/storage/tx/ob_tx_loop_worker.cpp @@ -156,8 +156,8 @@ int ObTxLoopWorker::scan_all_ls_(bool can_tx_gc, bool can_gc_retain_ctx) iter_ret = OB_SUCCESS; cur_ls_ptr = nullptr; while (OB_SUCCESS == (iter_ret = iter_ptr->get_next(cur_ls_ptr))) { - - SCN min_start_scn; + SCN min_start_scn = SCN::invalid_scn(); + SCN max_decided_scn = SCN::invalid_scn(); MinStartScnStatus status = MinStartScnStatus::UNKOWN; common::ObRole role; int64_t base_proposal_id, proposal_id; @@ -173,6 +173,14 @@ int ObTxLoopWorker::scan_all_ls_(bool can_tx_gc, bool can_gc_retain_ctx) if (can_tx_gc) { // TODO shanyan.g close ctx gc temporarily because of logical bug // + + // ATTENTION : get_max_decided_scn must before iterating all trans ctx. + // set max_decided_scn as default value + if (OB_TMP_FAIL(cur_ls_ptr->get_log_handler()->get_max_decided_scn(max_decided_scn))) { + TRANS_LOG(WARN, "get max decided scn failed", KR(tmp_ret), K(min_start_scn)); + max_decided_scn.set_invalid(); + } + min_start_scn = max_decided_scn; do_tx_gc_(cur_ls_ptr, min_start_scn, status); }