diff --git a/src/sql/engine/expr/ob_expr_to_outfile_row.cpp b/src/sql/engine/expr/ob_expr_to_outfile_row.cpp
index 35fc325b8d..2bc0444350 100644
--- a/src/sql/engine/expr/ob_expr_to_outfile_row.cpp
+++ b/src/sql/engine/expr/ob_expr_to_outfile_row.cpp
@@ -55,9 +55,9 @@ int ObExprToOutfileRow::calc_result_typeN(ObExprResType &type,
   } else {
     // deduce type and length
     type.set_varbinary();
-    // the result is only used to select into outile, so we don't care the accurate length
-    type.set_length(OB_MAX_ROW_LENGTH);
-    // field_str
+    // the result is only used to select into outile, so we don't care the accurate length
+    type.set_length(OB_MAX_ROW_LENGTH);
+    // field_str
     types[PARAM_FIELD].set_calc_type(ObVarcharType);
     types[PARAM_FIELD].set_calc_collation_type(types[PARAM_FIELD].get_collation_type());
     // line_str
@@ -78,9 +78,9 @@ int ObExprToOutfileRow::calc_result_typeN(ObExprResType &type,
 int ObExprToOutfileRow::cg_expr(ObExprCGCtx &, const ObRawExpr &, ObExpr &expr) const
 {
   int ret = OB_SUCCESS;
-  CK(expr.arg_cnt_ > PARAM_SELECT_ITEM);
-  if (OB_SUCC(ret)) {
-    for (int i = PARAM_FIELD; i < PARAM_SELECT_ITEM; i++) {
+  CK(expr.arg_cnt_ > PARAM_SELECT_ITEM);
+  if (OB_SUCC(ret)) {
+    for (int i = PARAM_FIELD; i < PARAM_SELECT_ITEM; i++) {
       if (!expr.args_[i]->is_static_const_) {
         ret = OB_ERR_UNEXPECTED;
         LOG_WARN("non-const format not supported", K(ret));
@@ -172,9 +172,9 @@ int ObExprToOutfileRow::to_outfile_str(const ObExpr &expr, ObEvalCtx &ctx, ObDat
   }
   if (OB_SUCC(ret)) {
     do {
-      int64_t pos = 0;
-      char *buf = out_info->buf_;
-      int64_t buf_len = out_info->buf_len_;
+      int64_t pos = 0;
+      char *buf = out_info->buf_;
+      int64_t buf_len = out_info->buf_len_;
       for (int64_t i = PARAM_SELECT_ITEM; OB_SUCC(ret) && i < expr.arg_cnt_; i++) {
         ObDatum &v = expr.locate_param_datum(ctx, i);
         const ObObjMeta &obj_meta = expr.args_[i]->obj_meta_;
diff --git a/src/storage/ls/ob_ls_tx_service.cpp b/src/storage/ls/ob_ls_tx_service.cpp
index 4c60c8a893..1e238771a9 100644
--- a/src/storage/ls/ob_ls_tx_service.cpp
+++ b/src/storage/ls/ob_ls_tx_service.cpp
@@ -176,14 +176,15 @@ int ObLSTxService::revert_store_ctx(storage::ObStoreCtx &store_ctx) const
   return ret;
 }
 
-int ObLSTxService::check_scheduler_status(share::ObLSID ls_id)
+int ObLSTxService::check_scheduler_status(int64_t &min_start_scn,
+                                          transaction::MinStartScnStatus &status)
 {
   int ret = OB_SUCCESS;
   if (OB_ISNULL(trans_service_)) {
     ret = OB_NOT_INIT;
     TRANS_LOG(WARN, "not init", K(ret));
   } else {
-    ret = trans_service_->check_scheduler_status(ls_id);
+    ret = mgr_->check_scheduler_status(min_start_scn, status);
   }
   return ret;
 }
diff --git a/src/storage/ls/ob_ls_tx_service.h b/src/storage/ls/ob_ls_tx_service.h
index 8ea16a2bd9..23f908f7c0 100644
--- a/src/storage/ls/ob_ls_tx_service.h
+++ b/src/storage/ls/ob_ls_tx_service.h
@@ -21,6 +21,7 @@
 #include "storage/tablelock/ob_table_lock_common.h"
 #include "logservice/ob_log_base_type.h"
 #include "logservice/rcservice/ob_role_change_handler.h"
+#include "storage/tx/ob_keep_alive_ls_handler.h"
 
 namespace oceanbase
 {
@@ -91,7 +92,7 @@ public:
   // submit next log when all trx in frozen memtable have submitted log
   int traverse_trans_to_submit_next_log();
   // check schduler status for gc
-  int check_scheduler_status(share::ObLSID ls_id);
+  int check_scheduler_status(int64_t &min_start_scn, transaction::MinStartScnStatus &status);
 
   // for ls gc
   // @return OB_SUCCESS, all the tx of this ls cleaned up
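Note: the ObLSTxService changes above turn check_scheduler_status from a per-LS probe into a collector that returns the minimum start SCN of live tx contexts plus a MinStartScnStatus. The sketch below is an illustrative, standalone model of that out-parameter contract as the later ob_tx_loop_worker.cpp hunks consume it; publishable_min_start_scn and kInvalidTs are hypothetical names, not part of the patch.

```cpp
#include <cstdint>
#include <cstdio>

enum class MinStartScnStatus { UNKOWN = 0 /* collect failed */, NO_CTX, HAS_CTX, MAX };

constexpr int64_t kInvalidTs = -1;  // stands in for OB_INVALID_TIMESTAMP in this sketch

// Hypothetical helper: what min_start_scn value may be handed to the keep-alive log.
int64_t publishable_min_start_scn(int64_t collected_min, MinStartScnStatus status)
{
  if (MinStartScnStatus::UNKOWN == status) {
    return kInvalidTs;      // collection failed or leadership unclear: publish nothing
  } else if (MinStartScnStatus::NO_CTX == status) {
    return 0;               // no active tx ctx on this LS
  }
  return collected_min;     // HAS_CTX: the collected minimum start SCN
}

int main()
{
  printf("%lld\n", (long long)publishable_min_start_scn(42, MinStartScnStatus::HAS_CTX));  // 42
  printf("%lld\n", (long long)publishable_min_start_scn(42, MinStartScnStatus::NO_CTX));   // 0
  printf("%lld\n", (long long)publishable_min_start_scn(42, MinStartScnStatus::UNKOWN));   // -1
  return 0;
}
```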
diff --git a/src/storage/tx/ob_keep_alive_ls_handler.cpp b/src/storage/tx/ob_keep_alive_ls_handler.cpp
index ee37871a3a..b7f8af3101 100644
--- a/src/storage/tx/ob_keep_alive_ls_handler.cpp
+++ b/src/storage/tx/ob_keep_alive_ls_handler.cpp
@@ -23,11 +23,11 @@ using namespace share;
 namespace transaction
 {
 
-OB_SERIALIZE_MEMBER(ObKeepAliveLogBody, compat_bit_);
+OB_SERIALIZE_MEMBER(ObKeepAliveLogBody, compat_bit_, min_start_scn_, min_start_status_);
 
 int64_t ObKeepAliveLogBody::get_max_serialize_size()
 {
-  ObKeepAliveLogBody max_log_body(INT64_MAX);
+  ObKeepAliveLogBody max_log_body(INT64_MAX, INT64_MAX, MinStartScnStatus::MAX);
   return max_log_body.get_serialize_size();
 }
 
@@ -35,8 +35,7 @@ int ObKeepAliveLSHandler::init(const ObLSID &ls_id, logservice::ObLogHandler *lo
 {
   int ret = OB_SUCCESS;
   logservice::ObLogBaseHeader base_header(ObLogBaseType::KEEP_ALIVE_LOG_BASE_TYPE,
-                                          ObReplayBarrierType::NO_NEED_BARRIER);
-  ObKeepAliveLogBody log_body;
+                                          ObReplayBarrierType::NO_NEED_BARRIER,INT64_MAX);
 
   submit_buf_len_ = base_header.get_serialize_size() + ObKeepAliveLogBody::get_max_serialize_size();
   submit_buf_pos_ = 0;
@@ -49,12 +48,6 @@ int ObKeepAliveLSHandler::init(const ObLSID &ls_id, logservice::ObLogHandler *lo
     ret = OB_ALLOCATE_MEMORY_FAILED;
     TRANS_LOG(WARN, "[Keep Alive] submit_buf alloc failed", K(ret), KP(submit_buf_), K(base_header));
-  } else if (OB_FAIL(base_header.serialize(submit_buf_, submit_buf_len_, submit_buf_pos_))) {
-    TRANS_LOG(WARN, "[Keep Alive] serialize base header error", K(ret),
-              K(base_header.get_serialize_size()), K(submit_buf_len_), K(submit_buf_pos_));
-  } else if (OB_FAIL(log_body.serialize(submit_buf_, submit_buf_len_, submit_buf_pos_))) {
-    TRANS_LOG(WARN, "[Keep Alive] serialize keep alive log body failed", K(ret),
-              K(log_body.get_serialize_size()), K(submit_buf_len_), K(submit_buf_pos_));
   } else {
     ls_id_ = ls_id;
     is_busy_ = false;
@@ -93,15 +86,19 @@ void ObKeepAliveLSHandler::reset()
   is_stopped_ = false;
   last_gts_ = 0;
   ls_id_.reset();
+  tmp_keep_alive_info_.reset();
+  durable_keep_alive_info_.reset();
   stat_info_.reset();
 }
 
-int ObKeepAliveLSHandler::try_submit_log()
+int ObKeepAliveLSHandler::try_submit_log(int64_t min_start_scn, MinStartScnStatus min_start_status)
 {
   int ret = OB_SUCCESS;
   palf::LSN lsn;
   int64_t ts_ns = 0;
 
+  SpinWLockGuard guard(lock_);
+
   if (OB_ISNULL(log_handler_ptr_)) {
     stat_info_.other_error_cnt += 1;
     ret = OB_NOT_INIT;
@@ -111,7 +108,7 @@ int ObKeepAliveLSHandler::try_submit_log()
   } else if (ATOMIC_LOAD(&is_busy_)) {
     stat_info_.cb_busy_cnt += 1;
     // ret = OB_TX_NOLOGCB;
-  } else if (!check_gts_()) {
+  } else if (!check_gts_() && min_start_status == MinStartScnStatus::UNKOWN) {
     stat_info_.near_to_gts_cnt += 1;
     // ret = OB_OP_NOT_ALLOW;
   } else {
@@ -119,6 +116,9 @@
     if (ATOMIC_LOAD(&is_stopped_)) {
       ATOMIC_STORE(&is_busy_, false);
       TRANS_LOG(INFO, "ls hash stopped", K(ret));
+    } else if (OB_FAIL(serialize_keep_alive_log_(min_start_scn, min_start_status))) {
+      ATOMIC_STORE(&is_busy_, false);
+      TRANS_LOG(WARN, "[Keep Alive] serialize keep alive log failed", K(ret), K(ls_id_));
     } else if (OB_FAIL(log_handler_ptr_->append(submit_buf_, submit_buf_pos_, last_gts_, true, this, lsn, ts_ns))) {
       stat_info_.other_error_cnt += 1;
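Note: with the hunks above, try_submit_log no longer reuses a buffer serialized once in init(); it takes the collected (min_start_scn, status) pair, grabs the new write lock, and serializes a fresh log body per submission. The gate also changes: a submit is skipped only when the callback is still busy, or when GTS has not advanced and there is no min-start information to persist. A minimal standalone sketch of that gate (names below are local to the sketch, not the patch):

```cpp
#include <cstdio>

enum class MinStartScnStatus { UNKOWN = 0, NO_CTX, HAS_CTX, MAX };

bool should_submit(bool is_busy, bool gts_advanced, MinStartScnStatus status)
{
  if (is_busy) {
    return false;  // previous keep-alive log still in flight
  }
  if (!gts_advanced && status == MinStartScnStatus::UNKOWN) {
    return false;  // nothing new to report: neither GTS progress nor min_start info
  }
  return true;
}

int main()
{
  // Carrying a usable min_start_scn forces a submit even if GTS barely moved.
  printf("%d\n", should_submit(false, false, MinStartScnStatus::HAS_CTX));  // 1
  printf("%d\n", should_submit(false, false, MinStartScnStatus::UNKOWN));   // 0
  printf("%d\n", should_submit(true, true, MinStartScnStatus::HAS_CTX));    // 0
  return 0;
}
```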
@@ -126,27 +126,99 @@
       TRANS_LOG(WARN, "[Keep Alive] submit keep alive log failed", K(ret), K(ls_id_));
     } else {
       stat_info_.submit_succ_cnt += 1;
-      stat_info_.last_log_ts_ = ts_ns;
-      stat_info_.last_lsn_ = lsn;
+      tmp_keep_alive_info_.log_ts_ = ts_ns;
+      tmp_keep_alive_info_.lsn_ = lsn;
+      tmp_keep_alive_info_.min_start_status_ = min_start_status;
+      tmp_keep_alive_info_.min_start_scn_ = min_start_scn;
+      TRANS_LOG(DEBUG, "[Keep Alive] submit keep alive log success", K(ret), K(ls_id_),
+                K(tmp_keep_alive_info_), K(min_start_scn), K(min_start_status));
     }
   }
 
   return ret;
 }
 
+int ObKeepAliveLSHandler::on_success()
+{
+  int ret = OB_SUCCESS;
+
+  SpinWLockGuard guard(lock_);
+
+  durable_keep_alive_info_ = tmp_keep_alive_info_;
+  stat_info_.stat_keepalive_info_ = durable_keep_alive_info_;
+
+  ATOMIC_STORE(&is_busy_,false);
+
+  return ret;
+}
+
+int ObKeepAliveLSHandler::on_failure()
+{
+  int ret = OB_SUCCESS;
+
+  ATOMIC_STORE(&is_busy_,false);
+
+  return ret;
+}
+
+int ObKeepAliveLSHandler::replay(const void *buffer,
+                                 const int64_t nbytes,
+                                 const palf::LSN &lsn,
+                                 const int64_t ts_ns)
+{
+  int ret = OB_SUCCESS;
+
+  logservice::ObLogBaseHeader base_header;
+  ObKeepAliveLogBody log_body;
+
+  int64_t pos = 0;
+  if (OB_FAIL(base_header.deserialize(static_cast<const char *>(buffer), nbytes, pos))) {
+    TRANS_LOG(WARN, "[Keep Alive] deserialize base header error", K(ret), K(nbytes), K(pos));
+  } else if (OB_FAIL(log_body.deserialize(static_cast<const char *>(buffer), nbytes, pos))) {
+    TRANS_LOG(WARN, "[Keep Alive] deserialize log body error", K(ret), K(nbytes), K(pos));
+  } else {
+    SpinWLockGuard guard(lock_);
+    durable_keep_alive_info_.log_ts_ = ts_ns;
+    durable_keep_alive_info_.lsn_ = lsn;
+    durable_keep_alive_info_.min_start_scn_ = log_body.get_min_start_scn();
+    durable_keep_alive_info_.min_start_status_ = log_body.get_min_start_status();
+    stat_info_.stat_keepalive_info_ = durable_keep_alive_info_;
+  }
+
+  if (OB_SUCC(ret)) {
+    TRANS_LOG(DEBUG, "[Keep Alive] replay keep alive log success", K(ret), K(base_header),
+              K(log_body));
+  }
+
+  return ret;
+}
+
 void ObKeepAliveLSHandler::print_stat_info()
 {
-
+  SpinRLockGuard guard(lock_);
   TRANS_LOG(INFO, "[Keep Alive Stat] LS Keep Alive Info",
             "tenant_id", MTL_ID(),
             "LS_ID", ls_id_,
             "Not_Master_Cnt", stat_info_.not_master_cnt,
             "Near_To_GTS_Cnt", stat_info_.near_to_gts_cnt,
             "Other_Error_Cnt", stat_info_.other_error_cnt,
             "Submit_Succ_Cnt", stat_info_.submit_succ_cnt,
-            "last_log_ts", stat_info_.last_log_ts_,
-            "last_lsn", stat_info_.last_lsn_,
-            "last_gts", last_gts_);
-  stat_info_.reset();
+            "last_log_ts", stat_info_.stat_keepalive_info_.log_ts_,
+            "last_lsn", stat_info_.stat_keepalive_info_.lsn_,
+            "last_gts", last_gts_,
+            "min_start_scn", stat_info_.stat_keepalive_info_.min_start_scn_,
+            "min_start_status", stat_info_.stat_keepalive_info_.min_start_status_);
+  stat_info_.clear_cnt();
+}
+
+void ObKeepAliveLSHandler::get_min_start_scn(int64_t &min_start_scn,
+                                             int64_t &keep_alive_scn,
+                                             MinStartScnStatus &status)
+{
+  SpinRLockGuard guard(lock_);
+
+  min_start_scn = durable_keep_alive_info_.min_start_scn_;
+  keep_alive_scn = durable_keep_alive_info_.log_ts_;
+  status = durable_keep_alive_info_.min_start_status_;
 }
 
 bool ObKeepAliveLSHandler::check_gts_()
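Note: the hunk above introduces a tmp/durable double-buffer guarded by the new lock: try_submit_log stages the info in tmp_keep_alive_info_, the append callback on_success promotes it to durable_keep_alive_info_, and readers such as get_min_start_scn only ever see promoted (persisted) values. A simplified standalone sketch of the pattern, with std::shared_mutex standing in for SpinRWLock:

```cpp
#include <cstdint>
#include <mutex>
#include <shared_mutex>

struct Info { int64_t log_ts = -1; int64_t min_start_scn = -1; };

class KeepAliveSketch {
public:
  void stage(const Info &info)           // analogous to filling tmp_keep_alive_info_
  {
    std::unique_lock<std::shared_mutex> guard(lock_);
    tmp_ = info;
  }
  void on_success()                      // append callback: promote tmp -> durable
  {
    std::unique_lock<std::shared_mutex> guard(lock_);
    durable_ = tmp_;
  }
  Info read() const                      // analogous to get_min_start_scn()
  {
    std::shared_lock<std::shared_mutex> guard(lock_);
    return durable_;
  }
private:
  mutable std::shared_mutex lock_;
  Info tmp_;
  Info durable_;
};

int main()
{
  KeepAliveSketch h;
  h.stage({100, 42});
  // Before the log callback fires, readers still see the old (empty) durable info.
  Info before = h.read();   // {-1, -1}
  h.on_success();
  Info after = h.read();    // {100, 42}
  (void)before; (void)after;
  return 0;
}
```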
@@ -176,5 +248,31 @@
   return need_submit;
 }
 
+int ObKeepAliveLSHandler::serialize_keep_alive_log_(int64_t min_start_scn, MinStartScnStatus status)
+{
+  int ret = OB_SUCCESS;
+
+  const int64_t replay_hint = ls_id_.hash();
+  logservice::ObLogBaseHeader base_header(ObLogBaseType::KEEP_ALIVE_LOG_BASE_TYPE,
+                                          ObReplayBarrierType::NO_NEED_BARRIER, replay_hint);
+  ObKeepAliveLogBody log_body(1, min_start_scn, status);
+
+  if (OB_ISNULL(submit_buf_)) {
+    ret = OB_INVALID_ARGUMENT;
+    TRANS_LOG(WARN, "[Keep Alive] invalid submit buf", K(ret), KP(submit_buf_), K(submit_buf_len_),
+              K(submit_buf_pos_));
+  } else if (OB_FALSE_IT(submit_buf_pos_ = 0)) {
+  } else if (OB_FAIL(base_header.serialize(submit_buf_, submit_buf_len_, submit_buf_pos_))) {
+    TRANS_LOG(WARN, "[Keep Alive] serialize base header error", K(ret),
+              K(base_header.get_serialize_size()), K(submit_buf_len_), K(submit_buf_pos_));
+  } else if (OB_FAIL(log_body.serialize(submit_buf_, submit_buf_len_, submit_buf_pos_))) {
+    TRANS_LOG(WARN, "[Keep Alive] serialize keep alive log body failed", K(ret),
+              K(log_body.get_serialize_size()), K(submit_buf_len_), K(submit_buf_pos_));
+  }
+
+  TRANS_LOG(DEBUG, "[Keep Alive] serialize keep alive log", K(ret), K(ls_id_), K(log_body));
+  return ret;
+}
+
 } // namespace transaction
 } // namespace oceanbase
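Note: OB_SERIALIZE_MEMBER now covers min_start_scn_ and min_start_status_, and replay() actually deserializes the body instead of ignoring it. Assuming the unis framework's usual length-prefixed, versioned encoding, a keep-alive log written before this change simply leaves the two new fields at their constructor defaults when replayed. The sketch below models that compatibility behaviour with fixed-width fields purely for brevity; it is not the real serialization code.

```cpp
#include <cstdint>
#include <cstring>
#include <cstdio>
#include <vector>

struct BodySketch {
  int64_t compat_bit = 1;
  int64_t min_start_scn = -1;     // default: invalid
  int64_t min_start_status = 0;   // default: UNKOWN

  // Decode fields in order, but never past the recorded payload length.
  void decode(const uint8_t *buf, size_t payload_len)
  {
    size_t pos = 0;
    auto read_i64 = [&](int64_t &out) {
      if (pos + sizeof(int64_t) <= payload_len) {
        std::memcpy(&out, buf + pos, sizeof(int64_t));
        pos += sizeof(int64_t);
      }  // else: field absent in an old log, keep the default
    };
    read_i64(compat_bit);
    read_i64(min_start_scn);
    read_i64(min_start_status);
  }
};

int main()
{
  // An "old" log payload: only compat_bit_ was serialized.
  int64_t compat = 1;
  std::vector<uint8_t> old_log(sizeof(int64_t));
  std::memcpy(old_log.data(), &compat, sizeof(int64_t));

  BodySketch body;
  body.decode(old_log.data(), old_log.size());
  printf("compat=%lld scn=%lld status=%lld\n",
         (long long)body.compat_bit, (long long)body.min_start_scn,
         (long long)body.min_start_status);  // compat=1 scn=-1 status=0
  return 0;
}
```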
diff --git a/src/storage/tx/ob_keep_alive_ls_handler.h b/src/storage/tx/ob_keep_alive_ls_handler.h
index 6579e65a40..45525169b3 100644
--- a/src/storage/tx/ob_keep_alive_ls_handler.h
+++ b/src/storage/tx/ob_keep_alive_ls_handler.h
@@ -29,20 +29,58 @@ class ObLogHandler;
 namespace transaction
 {
+
+enum class MinStartScnStatus
+{
+  UNKOWN = 0, // collect failed
+  NO_CTX,
+  HAS_CTX,
+
+  MAX
+};
+
 class ObKeepAliveLogBody
 {
 public:
   OB_UNIS_VERSION(1);
 
 public:
-  ObKeepAliveLogBody() : compat_bit_(1) {}
-  ObKeepAliveLogBody(int64_t compat_bit) : compat_bit_(compat_bit) {}
+  ObKeepAliveLogBody()
+      : compat_bit_(1), min_start_scn_(OB_INVALID_TIMESTAMP),
+        min_start_status_(MinStartScnStatus::UNKOWN)
+  {}
+  ObKeepAliveLogBody(int64_t compat_bit, int64_t min_start_scn, MinStartScnStatus min_status)
+      : compat_bit_(compat_bit), min_start_scn_(min_start_scn), min_start_status_(min_status)
+  {}
 
   static int64_t get_max_serialize_size();
 
-  TO_STRING_KV(K_(compat_bit));
+  int64_t get_min_start_scn() { return min_start_scn_; };
+  MinStartScnStatus get_min_start_status() { return min_start_status_; }
+
+  TO_STRING_KV(K_(compat_bit), K_(min_start_scn), K_(min_start_status));
 
 private:
   int64_t compat_bit_; // not used, only for compatibility
+  int64_t min_start_scn_;
+  MinStartScnStatus min_start_status_;
+};
+
+struct KeepAliveLsInfo
+{
+  int64_t log_ts_;
+  palf::LSN lsn_;
+  int64_t min_start_scn_;
+  MinStartScnStatus min_start_status_;
+
+  void reset()
+  {
+    log_ts_ = OB_INVALID_TIMESTAMP;
+    lsn_.reset();
+    min_start_scn_ = OB_INVALID_TIMESTAMP;
+    min_start_status_ = MinStartScnStatus::UNKOWN;
+  }
+
+  TO_STRING_KV(K(log_ts_), K(lsn_), K(min_start_scn_), K(min_start_status_));
 };
 
 class ObLSKeepAliveStatInfo
@@ -56,8 +94,16 @@ public:
     near_to_gts_cnt = 0;
     other_error_cnt = 0;
     submit_succ_cnt = 0;
-    last_log_ts_ = 0;
-    last_lsn_.reset();
+    stat_keepalive_info_.reset();
+  }
+
+  void clear_cnt()
+  {
+    cb_busy_cnt = 0;
+    not_master_cnt = 0;
+    near_to_gts_cnt = 0;
+    other_error_cnt = 0;
+    submit_succ_cnt = 0;
   }
 
   int64_t cb_busy_cnt;
@@ -65,8 +111,7 @@ public:
   int64_t near_to_gts_cnt;
   int64_t other_error_cnt;
   int64_t submit_succ_cnt;
-  int64_t last_log_ts_;
-  palf::LSN last_lsn_;
+  KeepAliveLsInfo stat_keepalive_info_;
 
 private:
   // none
@@ -91,22 +136,15 @@ public:
 
   void reset();
 
-  int try_submit_log();
+  int try_submit_log(int64_t min_start_scn, MinStartScnStatus status);
 
   void print_stat_info();
 
 public:
   bool is_busy() { return ATOMIC_LOAD(&is_busy_); }
 
-  int on_success() {ATOMIC_STORE(&is_busy_, false); return OB_SUCCESS;}
-  int on_failure() {ATOMIC_STORE(&is_busy_, false); return OB_SUCCESS;}
+  int on_success();
+  int on_failure();
 
-  int replay(const void *buffer, const int64_t nbytes, const palf::LSN &lsn, const int64_t ts_ns)
-  {
-    UNUSED(buffer);
-    UNUSED(nbytes);
-    UNUSED(lsn);
-    UNUSED(ts_ns);
-    return OB_SUCCESS;
-  }
+  int replay(const void *buffer, const int64_t nbytes, const palf::LSN &lsn, const int64_t ts_ns);
 
   void switch_to_follower_forcedly()
   {
     ATOMIC_STORE(&is_master_, false);
@@ -117,9 +155,13 @@ public:
   int64_t get_rec_log_ts() { return INT64_MAX; }
 
   int flush(int64_t rec_log_ts) { return OB_SUCCESS;}
 
+  void get_min_start_scn(int64_t &min_start_scn, int64_t &keep_alive_scn, MinStartScnStatus &status);
+
 private:
   bool check_gts_();
+  int serialize_keep_alive_log_(int64_t min_start_scn, MinStartScnStatus status);
 
 private :
+  SpinRWLock lock_;
+
   bool is_busy_;
   bool is_master_;
   bool is_stopped_;
@@ -133,6 +175,9 @@ private :
 
   int64_t last_gts_;
 
+  KeepAliveLsInfo tmp_keep_alive_info_;
+  KeepAliveLsInfo durable_keep_alive_info_;
+
   ObLSKeepAliveStatInfo stat_info_;
 };
diff --git a/src/storage/tx/ob_trans_ctx_mgr_v4.cpp b/src/storage/tx/ob_trans_ctx_mgr_v4.cpp
index de6642d811..7adbc32b3b 100644
--- a/src/storage/tx/ob_trans_ctx_mgr_v4.cpp
+++ b/src/storage/tx/ob_trans_ctx_mgr_v4.cpp
@@ -998,7 +998,7 @@ int ObLSTxCtxMgr::get_min_undecided_log_ts(int64_t &log_ts)
   return ret;
 }
 
-int ObLSTxCtxMgr::check_scheduler_status()
+int ObLSTxCtxMgr::check_scheduler_status(int64_t &min_start_scn, MinStartScnStatus &status)
 {
   int ret = OB_SUCCESS;
   ObTimeGuard tg("ObLSTxCtxMgr::check_scheduler_status", 100000);
@@ -1006,6 +1006,9 @@ int ObLSTxCtxMgr::check_scheduler_status()
   IteratePartCtxAskSchedulerStatusFunctor functor;
   if (OB_FAIL(ls_tx_ctx_map_.for_each(functor))) {
     TRANS_LOG(WARN, "for each transaction context error", KR(ret), "manager", *this);
+  } else {
+    min_start_scn = functor.get_min_start_scn();
+    status = functor.get_min_start_status();
   }
 
   return ret;
@@ -2162,6 +2165,8 @@ int ObTxCtxMgr::check_scheduler_status(share::ObLSID ls_id)
 {
   int ret = OB_SUCCESS;
   ObLSTxCtxMgr *ls_tx_ctx_mgr = NULL;
+  int64_t min_start_scn = OB_INVALID_TIMESTAMP;
+  MinStartScnStatus min_status = MinStartScnStatus::UNKOWN;
 
   if (IS_NOT_INIT) {
     TRANS_LOG(WARN, "ObTxCtxMgr not inited");
@@ -2173,7 +2178,7 @@ int ObTxCtxMgr::check_scheduler_status(share::ObLSID ls_id)
       TRANS_LOG(WARN, "get participant transaction context mgr error", K(ls_id));
       ret = OB_PARTITION_NOT_EXIST;
     } else {
-      if (OB_FAIL(ls_tx_ctx_mgr->check_scheduler_status())) {
+      if (OB_FAIL(ls_tx_ctx_mgr->check_scheduler_status(min_start_scn, min_status))) {
         TRANS_LOG(WARN, "check_scheduler_status failed", KR(ret), K(ls_id));
       } else {
         TRANS_LOG(DEBUG, "check_scheduler_status success", K(ls_id));
diff --git a/src/storage/tx/ob_trans_ctx_mgr_v4.h b/src/storage/tx/ob_trans_ctx_mgr_v4.h
index 5eae745c4c..ab5ded6ba7 100644
--- a/src/storage/tx/ob_trans_ctx_mgr_v4.h
+++ b/src/storage/tx/ob_trans_ctx_mgr_v4.h
@@ -21,6 +21,7 @@
 #include "storage/tx/ob_tx_ls_log_writer.h"
 #include "storage/tx/ob_tx_retain_ctx_mgr.h"
 #include "storage/tablelock/ob_lock_table.h"
+#include "storage/tx/ob_keep_alive_ls_handler.h"
 
 namespace oceanbase
 {
@@ -287,7 +288,7 @@ public:
                         ObTransID &block_tx_id);
 
   // check schduler status for tx gc
-  int check_scheduler_status();
+  int check_scheduler_status(int64_t &min_start_scn, MinStartScnStatus &status);
 
   // Get this ObLSTxCtxMgr's ls_id_
   const share::ObLSID &get_ls_id() const { return ls_id_; }
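Note: ObLSTxCtxMgr::check_scheduler_status now aggregates across ls_tx_ctx_map_ by running IteratePartCtxAskSchedulerStatusFunctor (shown in the next file) through for_each and reading the result back via accessors. A standalone sketch of that iterate-and-aggregate pattern over an ordinary map, not OceanBase's ls_tx_ctx_map_:

```cpp
#include <cstdint>
#include <cstdio>
#include <unordered_map>

struct TxCtxSketch { int64_t start_scn; };

class MinStartScnFunctor {
public:
  // Returning true keeps the iteration going, mirroring for_each functors.
  bool operator()(int64_t /*tx_id*/, const TxCtxSketch &ctx)
  {
    int64_t scn = ctx.start_scn < 0 ? INT64_MAX : ctx.start_scn;  // no log yet -> ignore
    min_start_scn_ = scn < min_start_scn_ ? scn : min_start_scn_;
    return true;
  }
  int64_t get_min_start_scn() const { return min_start_scn_; }
private:
  int64_t min_start_scn_ = INT64_MAX;  // INT64_MAX means "no ctx seen"
};

int main()
{
  std::unordered_map<int64_t, TxCtxSketch> ctx_map = {{1, {300}}, {2, {-1}}, {3, {120}}};
  MinStartScnFunctor functor;
  for (const auto &kv : ctx_map) {
    if (!functor(kv.first, kv.second)) break;
  }
  printf("min_start_scn=%lld\n", (long long)functor.get_min_start_scn());  // 120
  return 0;
}
```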
diff --git a/src/storage/tx/ob_trans_functor.h b/src/storage/tx/ob_trans_functor.h
index b7e8c177f9..a04e21e7bb 100644
--- a/src/storage/tx/ob_trans_functor.h
+++ b/src/storage/tx/ob_trans_functor.h
@@ -29,6 +29,7 @@
 #include "storage/tx_table/ob_tx_table_define.h"
 #include "storage/tx/ob_tx_stat.h"
 #include "storage/tx/ob_trans_service.h"
+#include "storage/tx/ob_keep_alive_ls_handler.h"
 
 namespace oceanbase
 {
@@ -1093,6 +1094,7 @@ public:
   IteratePartCtxAskSchedulerStatusFunctor()
   {
     SET_EXPIRED_LIMIT(100 * 1000 /*100ms*/, 3 * 1000 * 1000 /*3s*/);
+    min_start_scn_ = INT64_MAX;
   }
   ~IteratePartCtxAskSchedulerStatusFunctor() { PRINT_FUNC_STAT; }
 
@@ -1103,14 +1105,43 @@ public:
     if (OB_UNLIKELY(!tx_id.is_valid() || OB_ISNULL(tx_ctx))) {
       ret = OB_INVALID_ARGUMENT;
       TRANS_LOG(WARN, "invalid argument", KR(ret), K(tx_id), "ctx", OB_P(tx_ctx));
-    } else if (OB_FAIL(tx_ctx->check_scheduler_status())) {
-      TRANS_LOG(WARN, "check scheduler status error", KR(ret), "ctx", *tx_ctx);
     } else {
-      // do nothing
+      int64_t ctx_start_scn = tx_ctx->get_start_log_ts();
+      if (ctx_start_scn < 0) {
+        ctx_start_scn = INT64_MAX;
+      }
+      if (OB_FALSE_IT(min_start_scn_ = MIN(min_start_scn_, ctx_start_scn))) {
+        // do nothing
+      } else if (OB_FAIL(tx_ctx->check_scheduler_status())) {
+        TRANS_LOG(WARN, "check scheduler status error", KR(ret), "ctx", *tx_ctx);
+      } else {
+        // do nothing
+      }
+    }
+
+    if (OB_FAIL(ret)) {
+      min_start_scn_ = OB_INVALID_TIMESTAMP;
     }
 
     return true;
   }
+
+  int64_t get_min_start_scn() { return min_start_scn_; }
+
+  MinStartScnStatus get_min_start_status()
+  {
+    MinStartScnStatus start_status = MinStartScnStatus::HAS_CTX;
+
+    if (OB_INVALID_TIMESTAMP == min_start_scn_) {
+      start_status = MinStartScnStatus::UNKOWN;
+    } else if (INT64_MAX == min_start_scn_) {
+      start_status = MinStartScnStatus::NO_CTX;
+    }
+    return start_status;
+  }
+
+private:
+  int64_t min_start_scn_;
 };
 
 } // transaction
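Note: the functor above encodes its result in a single int64_t: INT64_MAX means it iterated but saw no context, OB_INVALID_TIMESTAMP means some context failed the check, and anything else is a genuine minimum. A compact standalone restatement of that sentinel convention (with -1 standing in for OB_INVALID_TIMESTAMP):

```cpp
#include <cstdint>
#include <cassert>

enum class MinStartScnStatus { UNKOWN = 0, NO_CTX, HAS_CTX, MAX };

MinStartScnStatus derive_status(int64_t min_start_scn)
{
  if (min_start_scn == -1) return MinStartScnStatus::UNKOWN;        // collection failed
  if (min_start_scn == INT64_MAX) return MinStartScnStatus::NO_CTX; // map was effectively empty
  return MinStartScnStatus::HAS_CTX;                                // a real minimum was found
}

int main()
{
  assert(derive_status(-1) == MinStartScnStatus::UNKOWN);
  assert(derive_status(INT64_MAX) == MinStartScnStatus::NO_CTX);
  assert(derive_status(98765) == MinStartScnStatus::HAS_CTX);
  return 0;
}
```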
diff --git a/src/storage/tx/ob_tx_loop_worker.cpp b/src/storage/tx/ob_tx_loop_worker.cpp
index fdcc138789..25138535d8 100644
--- a/src/storage/tx/ob_tx_loop_worker.cpp
+++ b/src/storage/tx/ob_tx_loop_worker.cpp
@@ -125,6 +125,7 @@ int ObTxLoopWorker::scan_all_ls_(bool can_tx_gc, bool can_gc_retain_ctx)
 {
   int ret = OB_SUCCESS;
   int iter_ret = OB_SUCCESS;
+  int tmp_ret = OB_SUCCESS;
 
   ObSharedGuard<ObLSIterator> ls_iter_guard;
   ObLSIterator *iter_ptr = nullptr;
@@ -146,16 +147,45 @@ int ObTxLoopWorker::scan_all_ls_(bool can_tx_gc, bool can_gc_retain_ctx)
     cur_ls_ptr = nullptr;
 
     while (OB_SUCCESS == (iter_ret = iter_ptr->get_next(cur_ls_ptr))) {
-      // keep alive, interval = 100ms
-      do_keep_alive_(cur_ls_ptr);
+      int64_t min_start_scn = OB_INVALID_TIMESTAMP;
+      MinStartScnStatus status = MinStartScnStatus::UNKOWN;
+      common::ObRole role;
+      int64_t base_proposal_id, proposal_id;
+
+      if (OB_TMP_FAIL(cur_ls_ptr->get_log_handler()->get_role(role, base_proposal_id))) {
+        TRANS_LOG(WARN, "get role failed", K(tmp_ret), K(cur_ls_ptr->get_ls_id()));
+        status = MinStartScnStatus::UNKOWN;
+      } else if (role == common::ObRole::FOLLOWER) {
+        status = MinStartScnStatus::UNKOWN;
+      }
 
       // tx gc, interval = 15s
       if (can_tx_gc) {
         // TODO shanyan.g close ctx gc temporarily because of logical bug
         // https://work.aone.alibaba-inc.com/issue/42892101
-        do_tx_gc_(cur_ls_ptr);
+        do_tx_gc_(cur_ls_ptr, min_start_scn, status);
       }
 
+      if(MinStartScnStatus::UNKOWN != status) {
+        // do nothing
+      } else if (OB_TMP_FAIL(cur_ls_ptr->get_log_handler()->get_role(role, proposal_id))) {
+        TRANS_LOG(WARN, "get role failed", K(tmp_ret), K(cur_ls_ptr->get_ls_id()));
+        status = MinStartScnStatus::UNKOWN;
+      } else if (role == common::ObRole::FOLLOWER) {
+        status = MinStartScnStatus::UNKOWN;
+      } else if (base_proposal_id != proposal_id) {
+        status = MinStartScnStatus::UNKOWN;
+      }
+
+      if (MinStartScnStatus::UNKOWN == status) {
+        min_start_scn = OB_INVALID_TIMESTAMP;
+      } else if (MinStartScnStatus::NO_CTX == status) {
+        min_start_scn = 0;
+      }
+
+      // keep alive, interval = 100ms
+      do_keep_alive_(cur_ls_ptr, min_start_scn, status);
+
       // TODO shanyan.g
       // 1) We use max(max_commit_ts, gts_cache) as read snapshot,
       //    but now we adopt updating max_commit_ts periodly to avoid getting gts cache cost
@@ -171,11 +201,11 @@ int ObTxLoopWorker::scan_all_ls_(bool can_tx_gc, bool can_gc_retain_ctx)
   return ret;
 }
 
-void ObTxLoopWorker::do_keep_alive_(ObLS *ls_ptr)
+void ObTxLoopWorker::do_keep_alive_(ObLS *ls_ptr, int64_t min_start_scn, MinStartScnStatus status)
 {
   int ret = OB_SUCCESS;
 
-  if (ls_ptr->get_keep_alive_ls_handler()->try_submit_log()) {
+  if (ls_ptr->get_keep_alive_ls_handler()->try_submit_log(min_start_scn, status)) {
     TRANS_LOG(WARN, "[Tx Loop Worker] try submit keep alive log failed", K(ret));
   } else if (REACH_TIME_INTERVAL(KEEP_ALIVE_PRINT_INFO_INTERVAL)) {
     ls_ptr->get_keep_alive_ls_handler()->print_stat_info();
@@ -186,11 +216,11 @@ void ObTxLoopWorker::do_keep_alive_(ObLS *ls_ptr)
   UNUSED(ret);
 }
 
-void ObTxLoopWorker::do_tx_gc_(ObLS *ls_ptr)
+void ObTxLoopWorker::do_tx_gc_(ObLS *ls_ptr, int64_t &min_start_scn, MinStartScnStatus &status)
 {
   int ret = OB_SUCCESS;
 
-  if (OB_FAIL(ls_ptr->get_tx_svr()->check_scheduler_status(ls_ptr->get_ls_id()))) {
+  if (OB_FAIL(ls_ptr->get_tx_svr()->check_scheduler_status(min_start_scn, status))) {
     TRANS_LOG(WARN, "[Tx Loop Worker] check tx scheduler failed", K(ret), K(MTL_ID()), K(*ls_ptr));
   } else {
     TRANS_LOG(INFO, "[Tx Loop Worker] check tx scheduler success", K(MTL_ID()), K(*ls_ptr));
diff --git a/src/storage/tx/ob_tx_loop_worker.h b/src/storage/tx/ob_tx_loop_worker.h
index e8840fd6d8..20a5e47eac 100644
--- a/src/storage/tx/ob_tx_loop_worker.h
+++ b/src/storage/tx/ob_tx_loop_worker.h
@@ -56,8 +56,8 @@ public:
 private:
   int scan_all_ls_(bool can_tx_gc, bool can_gc_retain_ctx);
 
-  void do_keep_alive_(ObLS *ls); // 100ms
-  void do_tx_gc_(ObLS *ls); // 15s
+  void do_keep_alive_(ObLS *ls, int64_t min_start_scn, MinStartScnStatus status); // 100ms
+  void do_tx_gc_(ObLS *ls, int64_t &min_start_scn, MinStartScnStatus &status); // 15s
 
   void update_max_commit_ts_(ObLS *ls);
   void do_retain_ctx_gc_(ObLS * ls); // 15s
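Note: scan_all_ls_ now snapshots (role, proposal_id) before the tx-gc collection and consults them again afterwards, with the apparent intent of only publishing a collected min_start_scn when the replica stayed leader under the same proposal id; otherwise the status is downgraded to UNKOWN and do_keep_alive_ logs an invalid value. A standalone sketch of that validation idea (the role/proposal_id source here is a stub, not ObLogHandler):

```cpp
#include <cstdint>
#include <cstdio>

enum class Role { LEADER, FOLLOWER };
enum class MinStartScnStatus { UNKOWN = 0, NO_CTX, HAS_CTX, MAX };

struct RoleSnapshot { Role role; int64_t proposal_id; };

MinStartScnStatus validate(const RoleSnapshot &before, const RoleSnapshot &after,
                           MinStartScnStatus collected)
{
  if (before.role == Role::FOLLOWER || after.role == Role::FOLLOWER ||
      before.proposal_id != after.proposal_id) {
    return MinStartScnStatus::UNKOWN;  // leadership changed mid-collection: discard
  }
  return collected;
}

int main()
{
  RoleSnapshot stable_before{Role::LEADER, 7}, stable_after{Role::LEADER, 7};
  RoleSnapshot switched_after{Role::LEADER, 8};  // re-elected with a new proposal id

  printf("%d\n", (int)validate(stable_before, stable_after, MinStartScnStatus::HAS_CTX));   // 2 (HAS_CTX)
  printf("%d\n", (int)validate(stable_before, switched_after, MinStartScnStatus::HAS_CTX)); // 0 (UNKOWN)
  return 0;
}
```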