BUGFIX: add defensive code for ObSequence

This commit is contained in:
obdev
2024-06-17 14:20:34 +00:00
committed by ob-robot
parent b859aea8d6
commit fd886d1ef2
6 changed files with 77 additions and 29 deletions

View File

@ -13,6 +13,7 @@
#ifndef OB_SEQUENCE_H_
#define OB_SEQUENCE_H_
#include "lib/literals/ob_literals.h"
#include "lib/ob_define.h"
#include "lib/utility/ob_macro_utils.h"
#include "common/ob_clock_generator.h"
@ -23,14 +24,16 @@ namespace common
{
class ObSequence
{
static const int64_t MAX_STEP_US = 1_day;
public:
static int64_t get_max_seq_no();
static int64_t inc_and_get_max_seq_no();
static int64_t get_and_inc_max_seq_no();
static int64_t get_and_inc_max_seq_no(int64_t n);
static int get_and_inc_max_seq_no(const int64_t n, int64_t &seq);
static void inc();
static void update_max_seq_no(int64_t seq_no);
private:
// change us to ns
static int64_t max_seq_no_ CACHE_ALIGNED;
ObSequence() = delete;
};
@ -50,9 +53,18 @@ inline int64_t ObSequence::get_and_inc_max_seq_no()
return ATOMIC_FAA(&max_seq_no_, 1);
}
inline int64_t ObSequence::get_and_inc_max_seq_no(int64_t n)
inline int ObSequence::get_and_inc_max_seq_no(const int64_t n, int64_t &seq)
{
return ATOMIC_FAA(&max_seq_no_, n);
int ret = OB_SUCCESS;
if (n > MAX_STEP_US || n < 0) {
ret = OB_ERR_UNEXPECTED;
if (REACH_TIME_INTERVAL(10_s)) {
COMMON_LOG(ERROR, "seq no update encounter fatal error.", K(ret), K(n));
}
} else {
seq = ATOMIC_FAA(&max_seq_no_, n);
}
return ret;
}
inline void ObSequence::inc()
@ -62,9 +74,15 @@ inline void ObSequence::inc()
inline void ObSequence::update_max_seq_no(int64_t seq_no)
{
int ret = OB_SUCCESS;
int64_t now = ObClockGenerator::getClock();
int64_t new_seq_no = std::max(now, seq_no + 1);
if (new_seq_no - now > MAX_STEP_US) {
ret = OB_ERR_UNEXPECTED;
if (REACH_TIME_INTERVAL(10_s)) {
COMMON_LOG(WARN, "seq no is far from physical time.", K(ret), K(now), K(seq_no));
}
}
inc_update(&max_seq_no_, new_seq_no);
}

View File

@ -13,6 +13,7 @@
#include <gtest/gtest.h>
#include "lib/ob_errno.h"
#include "lib/oblog/ob_log.h"
#define private public
#include "common/storage/ob_sequence.h"
#include "lib/time/ob_time_utility.h"
@ -67,6 +68,29 @@ TEST_F(TestObSequence, update_max_seq_no)
seq_no = ObSequence::get_max_seq_no();
ObSequence::update_max_seq_no(seq_no + 1000000);
EXPECT_EQ(ObSequence::get_max_seq_no(), seq_no + 1000001);
// just print error log but update success
ObSequence::update_max_seq_no(seq_no + 1_day);
EXPECT_EQ(ObSequence::get_max_seq_no(), seq_no + 1_day + 1);
TRANS_LOG(INFO, "sequence", K(ObSequence::get_max_seq_no()));
}
TEST_F(TestObSequence, get_and_inc_max_seq_no)
{
TRANS_LOG(INFO, "called", "func", test_info_->name());
int64_t seq_no = 0;
EXPECT_EQ(OB_SUCCESS, ObSequence::get_and_inc_max_seq_no(1, seq_no));
EXPECT_EQ(ObSequence::get_max_seq_no(), seq_no + 1);
EXPECT_EQ(OB_SUCCESS, ObSequence::get_and_inc_max_seq_no(0, seq_no));
EXPECT_EQ(ObSequence::get_max_seq_no(), seq_no);
EXPECT_NE(OB_SUCCESS, ObSequence::get_and_inc_max_seq_no(-1, seq_no));
EXPECT_EQ(ObSequence::get_max_seq_no(), seq_no);
EXPECT_NE(OB_SUCCESS, ObSequence::get_and_inc_max_seq_no(1_day + 1, seq_no));
EXPECT_EQ(ObSequence::get_max_seq_no(), seq_no);
}
}//end of unittest

View File

@ -249,9 +249,12 @@ int ObExtInfoCbRegister::register_cb(
LOG_WARN("init_header_ fail", K(ret), K(ext_info_data));
} else if (OB_FAIL(build_data_iter(ext_info_data))) {
LOG_WARN("build data iter fail", K(ret));
} else if (FALSE_IT(seq_no_cnt_ = data_size_/OB_EXT_INFO_LOG_BLOCK_MAX_SIZE + 1)) {
} else if (OB_FAIL(tx_desc->get_and_inc_tx_seq(parent_seq_no.get_branch(),
seq_no_cnt_,
seq_no_st_))) {
LOG_WARN("get and inc tx seq failed", K(ret), K_(seq_no_cnt));
} else {
seq_no_cnt_ = data_size_/OB_EXT_INFO_LOG_BLOCK_MAX_SIZE + 1;
seq_no_st_ = tx_desc->get_and_inc_tx_seq(parent_seq_no.get_branch(), seq_no_cnt_);
transaction::ObTxSEQ seq_no_cur = seq_no_st_;
ObString data;
ObIAllocator &allocator = lob_mngr->get_ext_info_log_allocator();

View File

@ -1464,7 +1464,11 @@ int ObLobManager::init_out_row_ctx(
// use shema chunk size for full insert and default
N += ((len + param.update_len_) / (param.get_schema_chunk_size() / 2) + 2);
if (nullptr != param.tx_desc_) {
param.seq_no_st_ = param.tx_desc_->get_and_inc_tx_seq(param.parent_seq_no_.get_branch(), N);
if (OB_FAIL(param.tx_desc_->get_and_inc_tx_seq(param.parent_seq_no_.get_branch(),
N,
param.seq_no_st_))) {
LOG_WARN("get and inc tx seq failed", K(ret), K(N));
}
} else {
// do nothing, for direct load has no tx desc, do not use seq no
LOG_DEBUG("tx_desc is null", K(param));
@ -2467,11 +2471,14 @@ int ObLobManager::process_diff(ObLobAccessParam& param, ObLobLocatorV2& delta_lo
LOG_WARN("chunk size not match", K(ret), K(iter.get_chunk_size()), K(store_chunk_size), KPC(param.lob_common_), K(param));
} else {
int64_t seq_cnt = iter.get_modified_chunk_cnt();
param.seq_no_st_ = param.tx_desc_->get_and_inc_tx_seq(param.parent_seq_no_.get_branch(), seq_cnt);
param.used_seq_cnt_ = 0;
param.total_seq_cnt_ = seq_cnt;
param.op_type_ = ObLobDataOutRowCtx::OpType::DIFF;
if (OB_FAIL(init_out_row_ctx(param, 0, param.op_type_))) {
if (OB_FAIL(param.tx_desc_->get_and_inc_tx_seq(param.parent_seq_no_.get_branch(),
seq_cnt,
param.seq_no_st_))) {
LOG_WARN("get and inc tx seq failed", K(ret), K(seq_cnt));
} else if (OB_FAIL(init_out_row_ctx(param, 0, param.op_type_))) {
LOG_WARN("init lob data out row ctx failed", K(ret));
}

View File

@ -808,7 +808,7 @@ LST_DO(DEF_FREE_ROUTE_DECODE, (;), static, dynamic, parts, extra);
void clear_interrupt() { flags_.INTERRUPTED_ = false; }
void mark_part_abort(const ObTransID tx_id, const int abort_cause);
int64_t get_coord_epoch() const;
ObTxSEQ get_and_inc_tx_seq(int16_t branch, int N) const;
int get_and_inc_tx_seq(const int16_t branch, const int N, ObTxSEQ &tx_seq) const;
ObTxSEQ inc_and_get_tx_seq(int16_t branch) const;
ObTxSEQ get_tx_seq(int64_t seq_abs = 0) const;
ObTxSEQ get_min_tx_seq() const;
@ -1072,14 +1072,20 @@ inline ObTxSEQ ObTxDesc::get_min_tx_seq() const
}
}
inline ObTxSEQ ObTxDesc::get_and_inc_tx_seq(int16_t branch, int N) const
inline int ObTxDesc::get_and_inc_tx_seq(const int16_t branch,
const int N,
ObTxSEQ &tx_seq) const
{
int64_t seq = ObSequence::get_and_inc_max_seq_no(N);
if (OB_LIKELY(support_branch())) {
return ObTxSEQ(seq - seq_base_, branch);
int ret = OB_SUCCESS;
int64_t seq = 0;
if (OB_FAIL(ObSequence::get_and_inc_max_seq_no(N, seq))) {
TRANS_LOG(ERROR, "inc max seq no failed", K(ret), K(N));
} else if (OB_LIKELY(support_branch())) {
tx_seq = ObTxSEQ(seq - seq_base_, branch);
} else {
return ObTxSEQ::mk_v0(seq);
tx_seq = ObTxSEQ::mk_v0(seq);
}
return ret;
}
inline ObTxSEQ ObTxDesc::inc_and_get_tx_seq(int16_t branch) const

View File

@ -523,11 +523,9 @@ int ObTransService::update_logic_clock_(const int64_t logic_clock, const ObTxDes
{
// if logic clock drift too much, disconnect required
int ret = OB_SUCCESS;
int64_t one_day_us = 24L * 3600 * 1000 * 1000;
if (logic_clock - ObClockGenerator::getClock() > one_day_us) {
ret = OB_ERR_UNEXPECTED;
TRANS_LOG(WARN, "logic clock is fast more than 1 day", KR(ret), K(logic_clock));
} else if (check_fallback && (ObClockGenerator::getClock() - logic_clock > one_day_us)) {
if (logic_clock - ObClockGenerator::getClock() > 1_day ) {
TRANS_LOG(WARN, "logic clock is fast more than 1 day", K(logic_clock), KPC(tx));
} else if (check_fallback && (ObClockGenerator::getClock() - logic_clock > 1_day)) {
TRANS_LOG(WARN, "logic clock is slow more than 1 day", K(logic_clock), KPC(tx));
if (OB_NOT_NULL(tx)) { tx->print_trace_(); }
}
@ -756,15 +754,7 @@ int ObTransService::txn_free_route__update_extra_state(const uint32_t session_id
#define TXN_ENCODE_LOGIC_CLOCK \
if (OB_SUCC(ret)) { \
const int64_t logic_clock = ObSequence::inc_and_get_max_seq_no(); \
const int64_t one_day_us = 24L * 3600 * 1000 * 1000; \
const int64_t cur_us = ObClockGenerator::getClock(); \
if (cur_us - logic_clock > one_day_us) { \
ret = OB_ERR_UNEXPECTED; \
TRANS_LOG(ERROR, "logic-clock slow than one day", K(ret), K(logic_clock)); \
} else if (logic_clock - cur_us > one_day_us) { \
ret = OB_ERR_UNEXPECTED; \
TRANS_LOG(ERROR, "logic-clock fast than one day", K(ret), K(logic_clock)); \
} else if (OB_FAIL(encode_i64(buf, len, pos, logic_clock))) { \
if (OB_FAIL(encode_i64(buf, len, pos, logic_clock))) { \
TRANS_LOG(WARN, "encode logic clock fail", K(ret)); \
} \
}