diff --git a/deps/oblib/src/common/storage/ob_sequence.h b/deps/oblib/src/common/storage/ob_sequence.h index cce557a32d..b1a0d7b0b8 100644 --- a/deps/oblib/src/common/storage/ob_sequence.h +++ b/deps/oblib/src/common/storage/ob_sequence.h @@ -13,6 +13,7 @@ #ifndef OB_SEQUENCE_H_ #define OB_SEQUENCE_H_ +#include "lib/literals/ob_literals.h" #include "lib/ob_define.h" #include "lib/utility/ob_macro_utils.h" #include "common/ob_clock_generator.h" @@ -23,14 +24,16 @@ namespace common { class ObSequence { + static const int64_t MAX_STEP_US = 1_day; public: static int64_t get_max_seq_no(); static int64_t inc_and_get_max_seq_no(); static int64_t get_and_inc_max_seq_no(); - static int64_t get_and_inc_max_seq_no(int64_t n); + static int get_and_inc_max_seq_no(const int64_t n, int64_t &seq); static void inc(); static void update_max_seq_no(int64_t seq_no); private: + // change us to ns static int64_t max_seq_no_ CACHE_ALIGNED; ObSequence() = delete; }; @@ -50,9 +53,18 @@ inline int64_t ObSequence::get_and_inc_max_seq_no() return ATOMIC_FAA(&max_seq_no_, 1); } -inline int64_t ObSequence::get_and_inc_max_seq_no(int64_t n) +inline int ObSequence::get_and_inc_max_seq_no(const int64_t n, int64_t &seq) { - return ATOMIC_FAA(&max_seq_no_, n); + int ret = OB_SUCCESS; + if (n > MAX_STEP_US || n < 0) { + ret = OB_ERR_UNEXPECTED; + if (REACH_TIME_INTERVAL(10_s)) { + COMMON_LOG(ERROR, "seq no update encounter fatal error.", K(ret), K(n)); + } + } else { + seq = ATOMIC_FAA(&max_seq_no_, n); + } + return ret; } inline void ObSequence::inc() @@ -62,9 +74,15 @@ inline void ObSequence::inc() inline void ObSequence::update_max_seq_no(int64_t seq_no) { + int ret = OB_SUCCESS; int64_t now = ObClockGenerator::getClock(); int64_t new_seq_no = std::max(now, seq_no + 1); - + if (new_seq_no - now > MAX_STEP_US) { + ret = OB_ERR_UNEXPECTED; + if (REACH_TIME_INTERVAL(10_s)) { + COMMON_LOG(WARN, "seq no is far from physical time.", K(ret), K(now), K(seq_no)); + } + } inc_update(&max_seq_no_, new_seq_no); } diff --git a/deps/oblib/unittest/common/test_ob_sequence.cpp b/deps/oblib/unittest/common/test_ob_sequence.cpp index f234b519b8..69572ed2f9 100644 --- a/deps/oblib/unittest/common/test_ob_sequence.cpp +++ b/deps/oblib/unittest/common/test_ob_sequence.cpp @@ -13,6 +13,7 @@ #include #include "lib/ob_errno.h" #include "lib/oblog/ob_log.h" +#define private public #include "common/storage/ob_sequence.h" #include "lib/time/ob_time_utility.h" @@ -67,6 +68,29 @@ TEST_F(TestObSequence, update_max_seq_no) seq_no = ObSequence::get_max_seq_no(); ObSequence::update_max_seq_no(seq_no + 1000000); EXPECT_EQ(ObSequence::get_max_seq_no(), seq_no + 1000001); + + // just print error log but update success + ObSequence::update_max_seq_no(seq_no + 1_day); + EXPECT_EQ(ObSequence::get_max_seq_no(), seq_no + 1_day + 1); + TRANS_LOG(INFO, "sequence", K(ObSequence::get_max_seq_no())); +} + +TEST_F(TestObSequence, get_and_inc_max_seq_no) +{ + TRANS_LOG(INFO, "called", "func", test_info_->name()); + + int64_t seq_no = 0; + EXPECT_EQ(OB_SUCCESS, ObSequence::get_and_inc_max_seq_no(1, seq_no)); + EXPECT_EQ(ObSequence::get_max_seq_no(), seq_no + 1); + + EXPECT_EQ(OB_SUCCESS, ObSequence::get_and_inc_max_seq_no(0, seq_no)); + EXPECT_EQ(ObSequence::get_max_seq_no(), seq_no); + + EXPECT_NE(OB_SUCCESS, ObSequence::get_and_inc_max_seq_no(-1, seq_no)); + EXPECT_EQ(ObSequence::get_max_seq_no(), seq_no); + + EXPECT_NE(OB_SUCCESS, ObSequence::get_and_inc_max_seq_no(1_day + 1, seq_no)); + EXPECT_EQ(ObSequence::get_max_seq_no(), seq_no); } }//end of unittest diff --git a/src/storage/lob/ob_ext_info_callback.cpp b/src/storage/lob/ob_ext_info_callback.cpp index d18377a723..920176fc4b 100644 --- a/src/storage/lob/ob_ext_info_callback.cpp +++ b/src/storage/lob/ob_ext_info_callback.cpp @@ -249,9 +249,12 @@ int ObExtInfoCbRegister::register_cb( LOG_WARN("init_header_ fail", K(ret), K(ext_info_data)); } else if (OB_FAIL(build_data_iter(ext_info_data))) { LOG_WARN("build data iter fail", K(ret)); + } else if (FALSE_IT(seq_no_cnt_ = data_size_/OB_EXT_INFO_LOG_BLOCK_MAX_SIZE + 1)) { + } else if (OB_FAIL(tx_desc->get_and_inc_tx_seq(parent_seq_no.get_branch(), + seq_no_cnt_, + seq_no_st_))) { + LOG_WARN("get and inc tx seq failed", K(ret), K_(seq_no_cnt)); } else { - seq_no_cnt_ = data_size_/OB_EXT_INFO_LOG_BLOCK_MAX_SIZE + 1; - seq_no_st_ = tx_desc->get_and_inc_tx_seq(parent_seq_no.get_branch(), seq_no_cnt_); transaction::ObTxSEQ seq_no_cur = seq_no_st_; ObString data; ObIAllocator &allocator = lob_mngr->get_ext_info_log_allocator(); diff --git a/src/storage/lob/ob_lob_manager.cpp b/src/storage/lob/ob_lob_manager.cpp index 6d299cfdab..411768cd01 100644 --- a/src/storage/lob/ob_lob_manager.cpp +++ b/src/storage/lob/ob_lob_manager.cpp @@ -1464,7 +1464,11 @@ int ObLobManager::init_out_row_ctx( // use shema chunk size for full insert and default N += ((len + param.update_len_) / (param.get_schema_chunk_size() / 2) + 2); if (nullptr != param.tx_desc_) { - param.seq_no_st_ = param.tx_desc_->get_and_inc_tx_seq(param.parent_seq_no_.get_branch(), N); + if (OB_FAIL(param.tx_desc_->get_and_inc_tx_seq(param.parent_seq_no_.get_branch(), + N, + param.seq_no_st_))) { + LOG_WARN("get and inc tx seq failed", K(ret), K(N)); + } } else { // do nothing, for direct load has no tx desc, do not use seq no LOG_DEBUG("tx_desc is null", K(param)); @@ -2467,11 +2471,14 @@ int ObLobManager::process_diff(ObLobAccessParam& param, ObLobLocatorV2& delta_lo LOG_WARN("chunk size not match", K(ret), K(iter.get_chunk_size()), K(store_chunk_size), KPC(param.lob_common_), K(param)); } else { int64_t seq_cnt = iter.get_modified_chunk_cnt(); - param.seq_no_st_ = param.tx_desc_->get_and_inc_tx_seq(param.parent_seq_no_.get_branch(), seq_cnt); param.used_seq_cnt_ = 0; param.total_seq_cnt_ = seq_cnt; param.op_type_ = ObLobDataOutRowCtx::OpType::DIFF; - if (OB_FAIL(init_out_row_ctx(param, 0, param.op_type_))) { + if (OB_FAIL(param.tx_desc_->get_and_inc_tx_seq(param.parent_seq_no_.get_branch(), + seq_cnt, + param.seq_no_st_))) { + LOG_WARN("get and inc tx seq failed", K(ret), K(seq_cnt)); + } else if (OB_FAIL(init_out_row_ctx(param, 0, param.op_type_))) { LOG_WARN("init lob data out row ctx failed", K(ret)); } diff --git a/src/storage/tx/ob_trans_define_v4.h b/src/storage/tx/ob_trans_define_v4.h index 27ac0ec2c8..ab16b7dee2 100644 --- a/src/storage/tx/ob_trans_define_v4.h +++ b/src/storage/tx/ob_trans_define_v4.h @@ -808,7 +808,7 @@ LST_DO(DEF_FREE_ROUTE_DECODE, (;), static, dynamic, parts, extra); void clear_interrupt() { flags_.INTERRUPTED_ = false; } void mark_part_abort(const ObTransID tx_id, const int abort_cause); int64_t get_coord_epoch() const; - ObTxSEQ get_and_inc_tx_seq(int16_t branch, int N) const; + int get_and_inc_tx_seq(const int16_t branch, const int N, ObTxSEQ &tx_seq) const; ObTxSEQ inc_and_get_tx_seq(int16_t branch) const; ObTxSEQ get_tx_seq(int64_t seq_abs = 0) const; ObTxSEQ get_min_tx_seq() const; @@ -1072,14 +1072,20 @@ inline ObTxSEQ ObTxDesc::get_min_tx_seq() const } } -inline ObTxSEQ ObTxDesc::get_and_inc_tx_seq(int16_t branch, int N) const +inline int ObTxDesc::get_and_inc_tx_seq(const int16_t branch, + const int N, + ObTxSEQ &tx_seq) const { - int64_t seq = ObSequence::get_and_inc_max_seq_no(N); - if (OB_LIKELY(support_branch())) { - return ObTxSEQ(seq - seq_base_, branch); + int ret = OB_SUCCESS; + int64_t seq = 0; + if (OB_FAIL(ObSequence::get_and_inc_max_seq_no(N, seq))) { + TRANS_LOG(ERROR, "inc max seq no failed", K(ret), K(N)); + } else if (OB_LIKELY(support_branch())) { + tx_seq = ObTxSEQ(seq - seq_base_, branch); } else { - return ObTxSEQ::mk_v0(seq); + tx_seq = ObTxSEQ::mk_v0(seq); } + return ret; } inline ObTxSEQ ObTxDesc::inc_and_get_tx_seq(int16_t branch) const diff --git a/src/storage/tx/ob_tx_free_route.cpp b/src/storage/tx/ob_tx_free_route.cpp index 35c5854237..09f1d50f37 100644 --- a/src/storage/tx/ob_tx_free_route.cpp +++ b/src/storage/tx/ob_tx_free_route.cpp @@ -523,11 +523,9 @@ int ObTransService::update_logic_clock_(const int64_t logic_clock, const ObTxDes { // if logic clock drift too much, disconnect required int ret = OB_SUCCESS; - int64_t one_day_us = 24L * 3600 * 1000 * 1000; - if (logic_clock - ObClockGenerator::getClock() > one_day_us) { - ret = OB_ERR_UNEXPECTED; - TRANS_LOG(WARN, "logic clock is fast more than 1 day", KR(ret), K(logic_clock)); - } else if (check_fallback && (ObClockGenerator::getClock() - logic_clock > one_day_us)) { + if (logic_clock - ObClockGenerator::getClock() > 1_day ) { + TRANS_LOG(WARN, "logic clock is fast more than 1 day", K(logic_clock), KPC(tx)); + } else if (check_fallback && (ObClockGenerator::getClock() - logic_clock > 1_day)) { TRANS_LOG(WARN, "logic clock is slow more than 1 day", K(logic_clock), KPC(tx)); if (OB_NOT_NULL(tx)) { tx->print_trace_(); } } @@ -756,15 +754,7 @@ int ObTransService::txn_free_route__update_extra_state(const uint32_t session_id #define TXN_ENCODE_LOGIC_CLOCK \ if (OB_SUCC(ret)) { \ const int64_t logic_clock = ObSequence::inc_and_get_max_seq_no(); \ - const int64_t one_day_us = 24L * 3600 * 1000 * 1000; \ - const int64_t cur_us = ObClockGenerator::getClock(); \ - if (cur_us - logic_clock > one_day_us) { \ - ret = OB_ERR_UNEXPECTED; \ - TRANS_LOG(ERROR, "logic-clock slow than one day", K(ret), K(logic_clock)); \ - } else if (logic_clock - cur_us > one_day_us) { \ - ret = OB_ERR_UNEXPECTED; \ - TRANS_LOG(ERROR, "logic-clock fast than one day", K(ret), K(logic_clock)); \ - } else if (OB_FAIL(encode_i64(buf, len, pos, logic_clock))) { \ + if (OB_FAIL(encode_i64(buf, len, pos, logic_clock))) { \ TRANS_LOG(WARN, "encode logic clock fail", K(ret)); \ } \ }