[BUG] fix the defensive code when memtable set start scn if backoffed

This commit is contained in:
Handora
2024-04-30 13:18:36 +00:00
committed by ob-robot
parent 3ca0506f99
commit 108e3f48fd
10 changed files with 190 additions and 77 deletions

View File

@ -113,7 +113,7 @@ int ObMemtable::flush(share::ObLSID ls_id)
int ret = OB_SUCCESS;
int64_t cur_time = ObTimeUtility::current_time();
if (is_flushed_) {
if (get_is_flushed()) {
ret = OB_NO_NEED_UPDATE;
} else {
if (mt_stat_.create_flush_dag_time_ == 0 &&

View File

@ -1511,7 +1511,7 @@ bool ObDDLKV::is_frozen_memtable()
K(logstream_freeze_clock),
KPC(this));
}
const bool bool_ret = logstream_freeze_clock > get_freeze_clock() || is_tablet_freeze_;
const bool bool_ret = logstream_freeze_clock > get_freeze_clock() || get_is_tablet_freeze();
if (bool_ret && 0 == get_frozen_time()) {
set_frozen_time(ObClockGenerator::getClock());
@ -1574,8 +1574,12 @@ void ObDDLKV::print_ready_for_flush()
void ObDDLKV::set_allow_freeze(const bool allow_freeze)
{
int ret = OB_SUCCESS;
if (allow_freeze_ != allow_freeze) {
ATOMIC_STORE(&allow_freeze_, allow_freeze);
if (get_allow_freeze_() != allow_freeze) {
if (allow_freeze) {
set_allow_freeze_();
} else {
clear_allow_freeze_();
}
}
}

View File

@ -1763,13 +1763,18 @@ int64_t ObMemtable::get_btree_alloc_memory() const
void ObMemtable::set_allow_freeze(const bool allow_freeze)
{
int ret = OB_SUCCESS;
if (allow_freeze_ != allow_freeze) {
if (get_allow_freeze_() != allow_freeze) {
const common::ObTabletID tablet_id = key_.tablet_id_;
const int64_t retire_clock = local_allocator_.get_retire_clock();
ObTenantFreezer *freezer = nullptr;
freezer = MTL(ObTenantFreezer *);
ATOMIC_STORE(&allow_freeze_, allow_freeze);
if (allow_freeze) {
set_allow_freeze_();
} else {
clear_allow_freeze_();
}
if (allow_freeze) {
if (OB_FAIL(freezer->unset_tenant_slow_freeze(tablet_id))) {
LOG_WARN("unset tenant slow freeze failed.", KPC(this));
@ -1871,7 +1876,7 @@ bool ObMemtable::ready_for_flush_()
} else if (OB_FAIL(handle.get_data_memtable(first_frozen_memtable))) {
TRANS_LOG(WARN, "fail to get memtable", K(ret));
} else if (first_frozen_memtable == this) {
(void)unset_logging_blocked();
(void)clear_logging_blocked();
TRANS_LOG(WARN, "unset logging_block in ready_for_flush", KPC(this));
}
}
@ -3115,7 +3120,7 @@ bool ObMemtable::is_frozen_memtable()
K(logstream_freeze_clock),
KPC(this));
}
const bool bool_ret = logstream_freeze_clock > get_freeze_clock() || is_tablet_freeze_;
const bool bool_ret = logstream_freeze_clock > get_freeze_clock() || get_is_tablet_freeze();
if (bool_ret && 0 == get_frozen_time()) {
set_frozen_time(ObClockGenerator::getClock());

View File

@ -131,7 +131,6 @@ public:
virtual int64_t dec_write_ref() { return OB_INVALID_COUNT; }
virtual int64_t get_write_ref() const { return OB_INVALID_COUNT; }
virtual uint32_t get_freeze_flag() { return 0; }
virtual bool is_tablet_freeze() { return false; }
virtual int64_t get_occupied_size() const { return 0; }
virtual int estimate_phy_size(const ObStoreRowkey *start_key,
const ObStoreRowkey *end_key,

View File

@ -30,7 +30,7 @@ int ObITabletMemtable::inc_unsubmitted_cnt()
int64_t unsubmitted_cnt = inc_unsubmitted_cnt_();
TRANS_LOG(DEBUG, "inc_unsubmitted_cnt", K(ls_id), KPC(this), K(lbt()));
if (ATOMIC_LOAD(&unset_active_memtable_logging_blocked_)) {
if (get_unset_active_memtable_logging_blocked()) {
TRANS_LOG(WARN, "cannot inc unsubmitted_cnt", K(unsubmitted_cnt), K(ls_id), KPC(this));
}
@ -198,8 +198,19 @@ int ObITabletMemtable::set_start_scn(const share::SCN start_scn)
} else if (ObScnRange::MAX_SCN == start_scn) {
ret = OB_INVALID_ARGUMENT;
TRANS_LOG(WARN, "invalid args", K(ret), K(start_scn));
} else if (start_scn >= get_end_scn()
|| (max_end_scn_ != SCN::min_scn() && start_scn >= max_end_scn_)
} else if (
// Case1: the start_scn should not greater than its end_scn. Because the
// boundary of the memtable is left-open and right-closed.
start_scn >= get_end_scn()
// Case2: the start_scn should not greater than its max_end_scn except is
// has been backoffed. Because the memtable might have failed to sync all
// the logs and its right boundary belongs to its previos memtable.
|| (max_end_scn_ != SCN::min_scn()
&& start_scn >= max_end_scn_
&& !get_has_backoffed())
// Case3: the start_scn should not greater than its rec_scn. Because the
// rec_scn is updated with real log and the start_scn should not be
// greater than the real log of its next memtable.
|| start_scn >= rec_scn_) {
ret = OB_SCN_OUT_OF_BOUND;
TRANS_LOG(ERROR, "cannot set start ts now", K(ret), K(start_scn), K(ls_id), KPC(this));
@ -252,11 +263,20 @@ int ObITabletMemtable::set_max_end_scn(const SCN scn, bool allow_backoff)
} else if (ObScnRange::MAX_SCN == scn) {
ret = OB_INVALID_ARGUMENT;
TRANS_LOG(WARN, "invalid args", K(ret), K(scn));
} else if (scn <= get_start_scn() || scn > get_end_scn()) {
} else if (
// Case1: the max_end_scn should not smaller than its start_scn except it
// is during backoff. Because the memtable might have failed to sync all
// the logs and the max_end_scn belongs to its previous memtable
(!allow_backoff &&
scn <= get_start_scn())
// Case2: the max_end_scn should not smaller than its end_scn. Because the
// end_scn is resolved only when the max_end_scn is decided.
|| scn > get_end_scn()) {
ret = OB_SCN_OUT_OF_BOUND;
TRANS_LOG(WARN, "cannot set max end log ts smaller to start log ts",
K(ret), K(scn), K(ls_id), KPC(this));
} else if (allow_backoff) {
set_has_backoffed();
TRANS_LOG(INFO, "set max_end_scn force", K(scn), K(max_end_scn_.atomic_get()), K(key_), KPC(this));
if (scn != max_end_scn_.atomic_get()) {
max_end_scn_.dec_update(scn);

View File

@ -22,6 +22,52 @@
namespace oceanbase {
namespace storage {
#define ATOMIC_ADD_TAG(tag) \
while (true) { \
const uint8_t old = ATOMIC_LOAD(&(flag_)); \
const uint8_t tmp = (old | (tag)); \
if (ATOMIC_BCAS(&(flag_), old, tmp)) { \
break; \
} \
}
#define ATOMIC_SUB_TAG(tag) \
while (true) { \
const uint8_t old = ATOMIC_LOAD(&(flag_)); \
const uint8_t tmp = (old & (~(tag))); \
if (ATOMIC_BCAS(&(flag_), old, tmp)) { \
break; \
} \
}
#define OB_MEMTABLE_DEFINE_FLAG_OPERATOR(cls, CLS) \
OB_INLINE void set_##cls() \
{ \
ATOMIC_ADD_TAG(MEMTABLE_##CLS##_MASK); \
} \
OB_INLINE void clear_##cls() \
{ \
ATOMIC_SUB_TAG(MEMTABLE_##CLS##_MASK); \
} \
OB_INLINE bool get_##cls() const \
{ \
return ATOMIC_LOAD(&flag_) & MEMTABLE_##CLS##_MASK; \
} \
#define OB_MEMTABLE_DEFINE_FLAG_OPERATOR_(cls, CLS) \
OB_INLINE void set_##cls##_() \
{ \
ATOMIC_ADD_TAG(MEMTABLE_##CLS##_MASK); \
} \
OB_INLINE void clear_##cls##_() \
{ \
ATOMIC_SUB_TAG(MEMTABLE_##CLS##_MASK); \
} \
OB_INLINE bool get_##cls##_() const \
{ \
return ATOMIC_LOAD(&flag_) & MEMTABLE_##CLS##_MASK; \
} \
class ObTabletMemtableMgr;
/*
@ -117,7 +163,7 @@ public:
{
int ret = OB_SUCCESS;
// logstream freeze
if (!is_tablet_freeze()) {
if (!get_is_tablet_freeze()) {
share::ObLSID ls_id;
if (OB_FAIL(get_ls_id(ls_id))) {
TRANS_LOG(WARN, "failed to get ls id", KPC(this));
@ -135,9 +181,7 @@ public:
public:
ObITabletMemtable()
: allow_freeze_(true),
is_tablet_freeze_(false),
freeze_clock_(0),
: freeze_clock_(0),
init_timestamp_(0),
max_schema_version_(0),
freezer_(nullptr),
@ -145,10 +189,13 @@ public:
freeze_scn_(),
max_end_scn_(),
rec_scn_(),
allow_freeze_(true),
is_tablet_freeze_(false),
is_flushed_(false),
logging_blocked_(false),
resolved_active_memtable_left_boundary_(true),
unset_active_memtable_logging_blocked_(false),
has_backoffed_(false),
read_barrier_(false),
unsubmitted_cnt_(0),
logging_blocked_start_time_(0),
@ -173,6 +220,7 @@ public:
logging_blocked_ = false;
resolved_active_memtable_left_boundary_ = true;
unset_active_memtable_logging_blocked_ = false;
has_backoffed_ = false;
read_barrier_ = false;
freeze_clock_ = 0;
freeze_state_ = TabletMemtableFreezeState::INVALID;
@ -243,25 +291,33 @@ public:
virtual int set_frozen() = 0;
// *************** pure virtual functions *****************
public:
// ************* memtable flag operator *************
OB_MEMTABLE_DEFINE_FLAG_OPERATOR(is_tablet_freeze, IS_TABLET_FREEZE);
OB_MEMTABLE_DEFINE_FLAG_OPERATOR(is_flushed, IS_FLUSHED);
OB_MEMTABLE_DEFINE_FLAG_OPERATOR(resolved_active_memtable_left_boundary,
RESOLVED_ACTIVE_MEMTABLE_LEFT_BOUNDARY);
OB_MEMTABLE_DEFINE_FLAG_OPERATOR(unset_active_memtable_logging_blocked,
UNSET_ACTIVE_MEMTABLE_LOGGING_BLOCKED);
OB_MEMTABLE_DEFINE_FLAG_OPERATOR(has_backoffed, HAS_BACKOFFED);
// ************* memtable flag operator *************
public:
// *************** setter *****************
void set_max_schema_version(const int64_t schema_version);
void set_is_tablet_freeze() { is_tablet_freeze_ = true; }
void set_freeze_clock(const uint32_t freeze_clock) { ATOMIC_STORE(&freeze_clock_, freeze_clock); }
void set_is_flushed() { is_flushed_ = true; }
void set_read_barrier() { read_barrier_ = true; }
void reset_mt_stat() { mt_stat_.reset(); }
void unset_active_memtable_logging_blocked() { ATOMIC_STORE(&unset_active_memtable_logging_blocked_, true); }
void set_frozen_time(const int64_t timestamp) const { mt_stat_.frozen_time_ = timestamp; }
void set_last_print_time(const int64_t timestamp) const { mt_stat_.last_print_time_ = timestamp; }
void set_ready_for_flush_time(const int64_t timestamp) { mt_stat_.ready_for_flush_time_= timestamp; }
void set_create_flush_dag_time(const int64_t timestamp) { mt_stat_.create_flush_dag_time_ = timestamp; }
void set_release_time(const int64_t timestamp) { mt_stat_.release_time_ = timestamp; }
void set_push_table_into_gc_queue_time(const int64_t timestamp) { mt_stat_.push_table_into_gc_queue_time_ = timestamp; }
void set_resolved_active_memtable_left_boundary(bool flag)
{
ATOMIC_STORE(&resolved_active_memtable_left_boundary_, flag);
}
void set_freeze_state(const TabletMemtableFreezeState state)
{
if (state >= TabletMemtableFreezeState::ACTIVE && state <= TabletMemtableFreezeState::RELEASED) {
@ -271,12 +327,12 @@ public:
void set_logging_blocked()
{
logging_blocked_start_time_ = ObClockGenerator::getClock();
ATOMIC_STORE(&logging_blocked_, true);
set_logging_blocked_();
}
void unset_logging_blocked()
void clear_logging_blocked()
{
if (get_logging_blocked()) {
ATOMIC_STORE(&logging_blocked_, false);
clear_logging_blocked_();
int64_t cost_time = ObClockGenerator::getClock() - logging_blocked_start_time_;
TRANS_LOG(INFO, "the cost time of logging blocked: ", K(cost_time), K(this), K(key_.tablet_id_));
}
@ -285,12 +341,9 @@ public:
public:
// *************** getter *****************
bool get_is_flushed() { return is_flushed_; }
bool is_tablet_freeze() { return is_tablet_freeze_; }
bool &get_read_barrier() { return read_barrier_; }
bool allow_freeze() const { return ATOMIC_LOAD(&allow_freeze_); }
bool get_logging_blocked() { return ATOMIC_LOAD(&logging_blocked_); }
bool get_resolved_active_memtable_left_boundary() { return ATOMIC_LOAD(&resolved_active_memtable_left_boundary_); }
bool allow_freeze() const { return get_allow_freeze_(); }
bool get_logging_blocked() { return get_logging_blocked_(); }
uint32_t get_freeze_clock() const { return ATOMIC_LOAD(&freeze_clock_); }
int64_t get_unsubmitted_cnt() const { return ATOMIC_LOAD(&unsubmitted_cnt_); }
int64_t get_max_schema_version() const ;
@ -315,6 +368,7 @@ public:
K(logging_blocked_),
K(resolved_active_memtable_left_boundary_),
K(unset_active_memtable_logging_blocked_),
K(has_backoffed_),
K(read_barrier_),
K(freeze_clock_),
K(freeze_state_),
@ -333,7 +387,14 @@ public:
K(mt_stat_.create_flush_dag_time_),
K(mt_stat_.release_time_),
K(mt_stat_.push_table_into_gc_queue_time_),
K(mt_stat_.last_print_time_))
K(mt_stat_.last_print_time_));
protected:
// ************* memtable flag inner operator *************
OB_MEMTABLE_DEFINE_FLAG_OPERATOR_(allow_freeze, ALLOW_FREEZE);
OB_MEMTABLE_DEFINE_FLAG_OPERATOR_(logging_blocked, LOGGING_BLOCKED);
// ************* memtable flag inner operator *************
protected:
void resolve_left_boundary_for_active_memtable_();
@ -346,8 +407,6 @@ protected:
void unset_logging_blocked_for_active_memtable_();
protected:
bool allow_freeze_;
bool is_tablet_freeze_;
mutable uint32_t freeze_clock_;
int64_t init_timestamp_;
int64_t max_schema_version_; // to record the max schema version of memtable & schema_change_clog
@ -358,10 +417,39 @@ protected:
share::SCN rec_scn_;
private:
bool is_flushed_;
bool logging_blocked_; // flag whether the memtable can submit log, cannot submit if true
bool resolved_active_memtable_left_boundary_;
bool unset_active_memtable_logging_blocked_;
static const uint8_t MEMTABLE_ALLOW_FREEZE_MASK = 1 << 0;
static const uint8_t MEMTABLE_IS_TABLET_FREEZE_MASK = 1 << 1;
static const uint8_t MEMTABLE_IS_FLUSHED_MASK = 1 << 2;
static const uint8_t MEMTABLE_LOGGING_BLOCKED_MASK = 1 << 3;
static const uint8_t MEMTABLE_RESOLVED_ACTIVE_MEMTABLE_LEFT_BOUNDARY_MASK = 1 << 4;
static const uint8_t MEMTABLE_UNSET_ACTIVE_MEMTABLE_LOGGING_BLOCKED_MASK = 1 << 5;
static const uint8_t MEMTABLE_HAS_BACKOFFED_MASK = 1 << 6;
union {
// NB: not allow to use it directly
uint8_t flag_; // extend it if necessary
struct {
// whether the memtable allow freeze
bool allow_freeze_ :1;
// whether the memtable is tabelt freezed
bool is_tablet_freeze_ :1;
// whether the memtable has finished the MINI
bool is_flushed_ :1;
// whether the memtable allow submit log
bool logging_blocked_ :1;
// whether the memtable has resolved the left
// boundary of the active memtable
bool resolved_active_memtable_left_boundary_ :1;
// whether the memtable has unset the logging
// block state of the active memtable
bool unset_active_memtable_logging_blocked_ :1;
// whether the memtable has backoffed its
// right boundary because of sync_log_fail
bool has_backoffed_ :1;
};
};
bool read_barrier_ CACHE_ALIGNED;
int64_t unsubmitted_cnt_;
int64_t logging_blocked_start_time_; // record the start time of logging blocked
@ -371,9 +459,6 @@ private:
ObMemtableMgrHandle memtable_mgr_handle_;
};
} // namespace storage
} // namespace oceanbase

View File

@ -233,7 +233,7 @@ void ObStorageTableGuard::reset()
void ObStorageTableGuard::double_check_inc_write_ref(
const uint32_t old_freeze_flag,
const bool is_tablet_freeze,
ObIMemtable *memtable,
ObMemtable *memtable,
bool &need_retry)
{
if (OB_ISNULL(memtable)) {
@ -241,7 +241,7 @@ void ObStorageTableGuard::double_check_inc_write_ref(
} else {
memtable->inc_write_ref();
const uint32 new_freeze_flag = memtable->get_freeze_flag();
const bool new_is_tablet_freeze = memtable->is_tablet_freeze();
const bool new_is_tablet_freeze = memtable->get_is_tablet_freeze();
// do double-check to prevent concurrency problems
if (old_freeze_flag != new_freeze_flag || is_tablet_freeze != new_is_tablet_freeze) {
memtable->dec_write_ref();
@ -284,7 +284,7 @@ int ObStorageTableGuard::check_freeze_to_inc_write_ref(ObMemtable *memtable, boo
if (OB_ISNULL(memtable)) {
LOG_INFO("table is null, need to refresh", K(need_retry), K(ls_id), K(tablet_id));
} else if (FALSE_IT(old_freeze_flag = memtable->get_freeze_flag())) {
} else if (FALSE_IT(is_tablet_freeze = memtable->is_tablet_freeze())) {
} else if (FALSE_IT(is_tablet_freeze = memtable->get_is_tablet_freeze())) {
} else if (memtable->is_active_memtable()) {
// the most recent memtable is active
// no need to create a new memtable
@ -305,7 +305,7 @@ int ObStorageTableGuard::check_freeze_to_inc_write_ref(ObMemtable *memtable, boo
LOG_WARN("fail to get memtable from ObTableHandle", K(ret), K(need_retry), K(ls_id), K(tablet_id));
} else {
if (memtable != old_memtable) {
is_tablet_freeze = memtable->is_tablet_freeze();
is_tablet_freeze = memtable->get_is_tablet_freeze();
}
double_check_inc_write_ref(old_freeze_flag, is_tablet_freeze, memtable, need_retry);
}

View File

@ -75,7 +75,7 @@ private:
void double_check_inc_write_ref(
const uint32_t old_freeze_flag,
const bool is_tablet_freeze,
ObIMemtable *memtable,
memtable::ObMemtable *memtable,
bool &bool_ret);
int check_freeze_to_inc_write_ref(memtable::ObMemtable *table, bool &bool_ret);
int create_data_memtable_(const share::ObLSID &ls_id, const common::ObTabletID &tablet_id, bool &no_need_create);

View File

@ -168,7 +168,7 @@ int ObTabletMemtableMgr::try_resolve_boundary_on_create_memtable_for_leader_(
}
}
if (!can_resolve && !double_check) {
last_frozen_tablet_memtable->set_resolved_active_memtable_left_boundary(false);
last_frozen_tablet_memtable->clear_resolved_active_memtable_left_boundary();
}
double_check = !double_check;
} while (!can_resolve && double_check);
@ -186,7 +186,7 @@ int ObTabletMemtableMgr::try_resolve_boundary_on_create_memtable_for_leader_(
} else if (can_resolve) {
SCN new_start_scn;
last_frozen_tablet_memtable->resolve_right_boundary();
last_frozen_tablet_memtable->set_resolved_active_memtable_left_boundary(true);
last_frozen_tablet_memtable->set_resolved_active_memtable_left_boundary();
if (new_tablet_memtable != last_frozen_tablet_memtable) {
new_start_scn = MAX(last_frozen_tablet_memtable->get_end_scn(),
last_frozen_tablet_memtable->get_migration_clog_checkpoint_scn());
@ -413,7 +413,7 @@ void ObTabletMemtableMgr::resolve_direct_load_memtable_boundary_(ObITabletMemtab
new_memtable_start_scn =
MAX(frozen_tablet_memtable->get_end_scn(), frozen_tablet_memtable->get_migration_clog_checkpoint_scn());
(void)active_tablet_memtable->resolve_left_boundary(new_memtable_start_scn);
(void)frozen_tablet_memtable->set_resolved_active_memtable_left_boundary(true);
(void)frozen_tablet_memtable->set_resolved_active_memtable_left_boundary();
}
}
@ -589,7 +589,7 @@ int ObTabletMemtableMgr::resolve_left_boundary_for_active_memtable(ObITabletMemt
ret = OB_SUCCESS;
}
if (OB_SUCC(ret)) {
tablet_memtable->set_resolved_active_memtable_left_boundary(true);
tablet_memtable->set_resolved_active_memtable_left_boundary();
}
return ret;
@ -612,13 +612,13 @@ int ObTabletMemtableMgr::unset_logging_blocked_for_active_memtable(ObITabletMemt
LOG_WARN("fail to get active memtable", K(ret));
} else {
// allow the new memtable to submit log
active_tablet_memtable->unset_logging_blocked();
active_tablet_memtable->clear_logging_blocked();
}
if (OB_ENTRY_NOT_EXIST== ret) {
ret = OB_SUCCESS;
}
if (OB_SUCC(ret)) {
tablet_memtable->unset_active_memtable_logging_blocked();
tablet_memtable->set_unset_active_memtable_logging_blocked();
}
return ret;

View File

@ -68,7 +68,7 @@ namespace storage
bool ObITabletMemtable::can_be_minor_merged()
{
return is_tablet_freeze_;
return get_is_tablet_freeze();
}
}
@ -368,8 +368,8 @@ int TestCompactionPolicy::mock_memtable(
memtable->snapshot_version_ = snapshot_scn;
memtable->write_ref_cnt_ = 0;
memtable->unsubmitted_cnt_ = 0;
memtable->is_tablet_freeze_ = true;
memtable->set_resolved_active_memtable_left_boundary(true);
memtable->set_is_tablet_freeze();
memtable->set_resolved_active_memtable_left_boundary();
memtable->set_frozen();
memtable->location_ = storage::checkpoint::ObFreezeCheckpointLocation::PREPARE;
}