add weak read semantics to snapshot_version_barrier

Author: Handora
Date: 2023-04-24 09:45:08 +00:00
Committed by: ob-robot
Parent: c6cffaa327
Commit: 57c3448d2d
10 changed files with 77 additions and 46 deletions


@@ -616,7 +616,6 @@ public:
share::SCN snapshot_scn;
snapshot_scn.convert_for_tx(snapshot_version);
EXPECT_EQ(OB_SUCCESS, row->row_compact(memtable,
for_replay,
snapshot_scn,
&allocator2_));
TRANS_LOG(INFO, "====================== end compact row =====================",


@@ -94,7 +94,6 @@ int ObMvccEngine::try_compact_row_when_mvcc_read_(const SCN &snapshot_version,
} else {
ObRowLatchGuard guard(row.latch_);
if (OB_FAIL(row.row_compact(memtable_,
true/*for_replay*/,
snapshot_version,
engine_allocator_))) {
TRANS_LOG(WARN, "row compact error", K(ret), K(snapshot_version));


@@ -95,11 +95,11 @@ int ObMvccValueIterator::lock_for_read_(const ObQueryFlag &flag)
if (NULL != version_iter_) {
if (ctx_->is_weak_read()) {
version_iter_->set_safe_read_barrier(true);
version_iter_->set_snapshot_version_barrier(ctx_->snapshot_.version_);
}
if (!flag.is_prewarm()
&& !version_iter_->is_elr()) {
version_iter_->set_snapshot_version_barrier(ctx_->snapshot_.version_);
version_iter_->set_snapshot_version_barrier(ctx_->snapshot_.version_,
ObMvccTransNode::WEAK_READ_BIT);
} else if (!flag.is_prewarm() && !version_iter_->is_elr()) {
version_iter_->set_snapshot_version_barrier(ctx_->snapshot_.version_,
ObMvccTransNode::NORMAL_READ_BIT);
}
}


@@ -131,9 +131,18 @@ bool ObMvccTransNode::is_safe_read_barrier() const
|| (flag & F_STRONG_CONSISTENT_READ_BARRIER));
}
void ObMvccTransNode::set_snapshot_version_barrier(const SCN version)
void ObMvccTransNode::set_snapshot_version_barrier(const SCN scn_version,
const int64_t flag)
{
snapshot_version_barrier_ = version;
ATOMIC_STORE(&snapshot_version_barrier_, scn_version.get_val_for_tx() | flag);
}
void ObMvccTransNode::get_snapshot_version_barrier(int64_t &version,
int64_t &flag)
{
int64_t flaged_version = ATOMIC_LOAD(&snapshot_version_barrier_);
version = flaged_version & (~SNAPSHOT_VERSION_BARRIER_BIT);
flag = flaged_version & SNAPSHOT_VERSION_BARRIER_BIT;
}
void ObMvccTransNode::get_trans_id_and_seq_no(ObTransID &tx_id,
@@ -205,7 +214,8 @@ int64_t ObMvccTransNode::to_string(char *buf, const int64_t buf_len) const
"version=%ld "
"type=%d "
"flag=%d "
"snapshot_version_barrier=%s "
"snapshot_barrier=%ld "
"snapshot_barrier_flag=%ld "
"mtd=%s "
"seq_no=%ld",
this,
@@ -219,7 +229,9 @@ int64_t ObMvccTransNode::to_string(char *buf, const int64_t buf_len) const
version_,
type_,
flag_,
to_cstring(snapshot_version_barrier_),
snapshot_version_barrier_
& (~SNAPSHOT_VERSION_BARRIER_BIT),
snapshot_version_barrier_ >> 62,
to_cstring(*mtd),
seq_no_);
return pos;
@@ -487,7 +499,6 @@ bool ObMvccRow::need_compact(const bool for_read, const bool for_replay)
}
int ObMvccRow::row_compact(ObMemtable *memtable,
const bool for_replay,
const SCN snapshot_version,
ObIAllocator *node_alloc)
{
@@ -498,9 +509,10 @@ int ObMvccRow::row_compact(ObMemtable *memtable,
KP(node_alloc), KP(memtable));
} else {
ObMemtableRowCompactor row_compactor;
if (OB_FAIL(row_compactor.init(this, memtable, node_alloc, for_replay))) {
if (OB_FAIL(row_compactor.init(this, memtable, node_alloc))) {
TRANS_LOG(WARN, "row compactor init error", K(ret));
} else if (OB_FAIL(row_compactor.compact(snapshot_version))) {
} else if (OB_FAIL(row_compactor.compact(snapshot_version,
ObMvccTransNode::COMPACT_READ_BIT))) {
TRANS_LOG(WARN, "row compact error", K(ret), K(snapshot_version));
} else {
// do nothing
@@ -732,13 +744,17 @@ int ObMvccRow::trans_commit(const SCN commit_version, ObMvccTransNode &node)
} else {
// Check safety condition for ELR
if (NULL != node.prev_ && node.prev_->is_safe_read_barrier()) {
if (commit_version <= node.prev_->snapshot_version_barrier_) {
if ((node.is_elr() || node.is_delayed_cleanout()) && node.prev_->type_ == NDT_COMPACT) {
int64_t snapshot_version_barrier = 0;
int64_t flag = 0;
(void)node.prev_->get_snapshot_version_barrier(snapshot_version_barrier, flag);
if (commit_version.get_val_for_tx() <= snapshot_version_barrier) {
if ((node.is_elr() || node.is_delayed_cleanout()) && node.prev_->type_ == NDT_COMPACT) {
// do nothing
} else {
// ignore ret
TRANS_LOG(ERROR, "unexpected commit version", K(commit_version), K(*this),
"cur_node", node, "prev_node", *(node.prev_));
TRANS_LOG(ERROR, "unexpected commit version", K(snapshot_version_barrier),
"cur_node", node, "prev_node", *(node.prev_), K(flag), K(*this),
K(commit_version));
}
}
}
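
The trans_commit change above is where the barrier is consumed: when the previous node carries a safe-read barrier and the incoming commit version is not greater than the snapshot recorded on that node, a reader that already used that snapshot could have missed the commit, so an ERROR is logged (except for the tolerated ELR / delayed-cleanout case on a compact previous node). Below is a minimal sketch of that check, with the flag mask written out locally; the names are illustrative, not taken from the OceanBase sources.

#include <cstdint>

// --- illustrative sketch, not part of the commit ---
// The top two bits of the packed barrier word hold the read flag (see the
// ObMvccTransNode constants later in this diff); the low 62 bits hold the
// snapshot version the last reader observed on the node.
static const int64_t kBarrierFlagMask = static_cast<int64_t>(0x3ULL << 62);

// Returns true when committing at commit_version would violate the snapshot
// barrier recorded on the previous node, i.e. a reader that already observed
// snapshot >= commit_version might have missed this commit.
inline bool violates_snapshot_barrier(const int64_t commit_version,
                                      const int64_t packed_barrier)
{
  const int64_t barrier_version = packed_barrier & ~kBarrierFlagMask;
  return commit_version <= barrier_version;
}
// --- end of sketch ---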


@@ -72,7 +72,7 @@ public:
modify_count_(0),
acc_checksum_(0),
version_(0),
snapshot_version_barrier_(share::SCN::min_scn()),
snapshot_version_barrier_(0),
type_(NDT_NORMAL),
flag_(0) {}
@@ -88,7 +88,7 @@ public:
uint32_t modify_count_;
uint32_t acc_checksum_;
int64_t version_;
share::SCN snapshot_version_barrier_;
int64_t snapshot_version_barrier_;
uint8_t type_;
uint8_t flag_;
char buf_[0];
@@ -112,14 +112,17 @@ public:
void remove_callback();
// ===================== ObMvccTransNode Tx Node Meta =====================
// ObMvccRow records safe_read_barrier and snapshot_version_barrier to detect
// unexpected behaviors. The safe_read_barrier means the type of the last read
// operation performed on the row. And the snapshot_version_barrier means the
// version of the read operation,
// ObMvccRow records snapshot_version_barrier to detect unexpected concurrency
// control behaviors. The snapshot_version_barrier means the snapshot of the
// latest read operation, and if a commit happens on the row afterwards with a
// commit version smaller than that snapshot version, we should report the
// unexpected behavior.
void set_safe_read_barrier(const bool is_weak_consistent_read);
void clear_safe_read_barrier();
bool is_safe_read_barrier() const;
void set_snapshot_version_barrier(const share::SCN version);
void set_snapshot_version_barrier(const share::SCN version,
const int64_t flag);
void get_snapshot_version_barrier(int64_t &version, int64_t &flag);
// ===================== ObMvccTransNode Flag Interface =====================
OB_INLINE void set_committed()
@@ -182,6 +185,7 @@ public:
share::SCN get_tx_end_scn() { return tx_end_scn_.atomic_load(); }
private:
// the row flag of the mvcc tx node
static const uint8_t F_INIT;
static const uint8_t F_WEAK_CONSISTENT_READ_BARRIER;
static const uint8_t F_STRONG_CONSISTENT_READ_BARRIER;
@@ -190,6 +194,13 @@ private:
static const uint8_t F_ABORTED;
static const uint8_t F_DELAYED_CLEANOUT;
static const uint8_t F_MUTEX;
public:
// the snapshot flag of the snapshot version barrier
static const int64_t NORMAL_READ_BIT = 0x0L;
static const int64_t WEAK_READ_BIT = 0x1L << 62;
static const int64_t COMPACT_READ_BIT = 0x2L << 62;
static const int64_t SNAPSHOT_VERSION_BARRIER_BIT = 0x3L << 62;
};
////////////////////////////////////////////////////////////////////////////////////////////////////
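
For readers unfamiliar with the packing, here is a minimal, self-contained sketch of the scheme implied by the constants above and by set_snapshot_version_barrier / get_snapshot_version_barrier: the 2-bit read flag lives in bits 62-63 of the int64_t and the transaction version in the low 62 bits, so packing is a bitwise OR and unpacking is masking. This is an illustration, not the OceanBase implementation; the shifts are written with unsigned arithmetic only to keep the example free of signed-overflow pitfalls, and the helper names are invented.

#include <cassert>
#include <cstdint>

// --- illustrative sketch, not part of the commit ---
// Same values as the constants above, spelled with unsigned shifts.
static const int64_t NORMAL_READ_BIT  = 0x0LL;
static const int64_t WEAK_READ_BIT    = static_cast<int64_t>(0x1ULL << 62);
static const int64_t COMPACT_READ_BIT = static_cast<int64_t>(0x2ULL << 62);
static const int64_t SNAPSHOT_VERSION_BARRIER_BIT = static_cast<int64_t>(0x3ULL << 62);

// Pack a (version, flag) pair into one word, mirroring set_snapshot_version_barrier.
// Assumes the version fits in the low 62 bits.
inline int64_t pack_barrier(const int64_t version, const int64_t flag)
{
  return version | flag;
}

// Split the word back into version and flag, mirroring get_snapshot_version_barrier.
inline void unpack_barrier(const int64_t barrier, int64_t &version, int64_t &flag)
{
  version = barrier & ~SNAPSHOT_VERSION_BARRIER_BIT;
  flag = barrier & SNAPSHOT_VERSION_BARRIER_BIT;
}

int main()
{
  int64_t version = 0;
  int64_t flag = 0;
  // The replay-compaction path later in this diff tags the barrier with both bits set.
  const int64_t packed = pack_barrier(1234567890, WEAK_READ_BIT | COMPACT_READ_BIT);
  unpack_barrier(packed, version, flag);
  assert(version == 1234567890);
  assert((flag & WEAK_READ_BIT) != 0 && (flag & COMPACT_READ_BIT) != 0);
  return 0;
}
// --- end of sketch ---

Since both flag bits fit inside the mask, combined flags such as WEAK_READ_BIT | COMPACT_READ_BIT round-trip through the same pack/unpack pair.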
@@ -303,7 +314,6 @@ struct ObMvccRow
// snapshot_version is the version for row compact
// node_alloc is the allocator for compact node allocation
int row_compact(ObMemtable *memtable,
const bool for_replay,
const share::SCN snapshot_version,
common::ObIAllocator *node_alloc);


@@ -977,12 +977,18 @@ int ObMvccRowCallback::trans_commit()
(void)ATOMIC_FAA(&value_.update_since_compact_, 1);
if (value_.need_compact(for_read, ctx_.is_for_replay())) {
if (ctx_.is_for_replay()) {
if (ctx_.get_replay_compact_version().is_valid_and_not_min() && SCN::max_scn() != ctx_.get_replay_compact_version()) {
memtable_->row_compact(&value_, ctx_.is_for_replay(), ctx_.get_replay_compact_version());
if (ctx_.get_replay_compact_version().is_valid_and_not_min()
&& SCN::max_scn() != ctx_.get_replay_compact_version()) {
memtable_->row_compact(&value_,
ctx_.get_replay_compact_version(),
ObMvccTransNode::WEAK_READ_BIT
| ObMvccTransNode::COMPACT_READ_BIT);
}
} else {
SCN snapshot_version_for_compact = SCN::minus(SCN::max_scn(), 100);
memtable_->row_compact(&value_, ctx_.is_for_replay(), snapshot_version_for_compact);
memtable_->row_compact(&value_,
snapshot_version_for_compact,
ObMvccTransNode::NORMAL_READ_BIT);
}
}
}


@@ -1375,17 +1375,17 @@ ObDatumRange &ObMemtable::m_get_real_range(ObDatumRange &real_range, const ObDat
}
int ObMemtable::row_compact(ObMvccRow *row,
const bool for_replay,
const SCN snapshot_version)
const SCN snapshot_version,
const int64_t flag)
{
int ret = OB_SUCCESS;
ObMemtableRowCompactor row_compactor;
if (OB_ISNULL(row)) {
ret = OB_INVALID_ARGUMENT;
TRANS_LOG(WARN, "row is NULL");
} else if (OB_FAIL(row_compactor.init(row, this, &local_allocator_, for_replay))) {
} else if (OB_FAIL(row_compactor.init(row, this, &local_allocator_))) {
TRANS_LOG(WARN, "row compactor init error", K(ret));
} else if (OB_FAIL(row_compactor.compact(snapshot_version))) {
} else if (OB_FAIL(row_compactor.compact(snapshot_version, flag))) {
TRANS_LOG(WARN, "row_compact fail", K(ret), K(*row), K(snapshot_version));
} else {
// do nothing


@@ -325,7 +325,9 @@ public:
inline bool not_empty() const { return INT64_MAX != get_protection_clock(); };
void set_max_schema_version(const int64_t schema_version);
virtual int64_t get_max_schema_version() const override;
int row_compact(ObMvccRow *value, const bool for_replay, const share::SCN snapshot_version);
int row_compact(ObMvccRow *value,
const share::SCN snapshot_version,
const int64_t flag);
int64_t get_hash_item_count() const;
int64_t get_hash_alloc_memory() const;
int64_t get_btree_item_count() const;


@@ -40,16 +40,14 @@ ObMemtableRowCompactor::ObMemtableRowCompactor()
: is_inited_(false),
row_(NULL),
memtable_(NULL),
node_alloc_(NULL),
for_replay_(false)
node_alloc_(NULL)
{}
ObMemtableRowCompactor::~ObMemtableRowCompactor() {}
int ObMemtableRowCompactor::init(ObMvccRow *row,
ObMemtable *mt,
ObIAllocator *node_alloc,
const bool for_replay)
ObIAllocator *node_alloc)
{
int ret = OB_SUCCESS;
if (is_inited_) {
@@ -63,7 +61,6 @@ int ObMemtableRowCompactor::init(ObMvccRow *row,
row_ = row;
memtable_ = mt;
node_alloc_ = node_alloc;
for_replay_ = for_replay;
}
return ret;
}
@@ -72,7 +69,8 @@ int ObMemtableRowCompactor::init(ObMvccRow *row,
// So a modification is guaranteed to be safe against another modification,
// while we need to pay attention to the concurrency between lock_for_read
// and modification (such as compact)
int ObMemtableRowCompactor::compact(const SCN snapshot_version)
int ObMemtableRowCompactor::compact(const SCN snapshot_version,
const int64_t flag)
{
int ret = OB_SUCCESS;
@@ -91,7 +89,7 @@ int ObMemtableRowCompactor::compact(const SCN snapshot_version)
find_start_pos_(snapshot_version, start);
tg.click();
ObMvccTransNode *compact_node = construct_compact_node_(snapshot_version, start);
ObMvccTransNode *compact_node = construct_compact_node_(snapshot_version, flag, start);
tg.click();
if (OB_NOT_NULL(compact_node)) {
@@ -198,6 +196,7 @@ int ObMemtableRowCompactor::try_cleanout_tx_node_during_compact_(ObTxTableGuard
}
ObMvccTransNode *ObMemtableRowCompactor::construct_compact_node_(const SCN snapshot_version,
const int64_t flag,
ObMvccTransNode *save)
{
int ret = OB_SUCCESS;
@@ -350,7 +349,7 @@ ObMvccTransNode *ObMemtableRowCompactor::construct_compact_node_(const SCN snaps
trans_node->type_ = NDT_COMPACT;
trans_node->flag_ = save->flag_;
trans_node->scn_ = save->scn_;
trans_node->set_snapshot_version_barrier(snapshot_version);
trans_node->set_snapshot_version_barrier(snapshot_version, flag);
TRANS_LOG(DEBUG, "success to compact row, ", K(trans_node->tx_id_), K(dml_flag), K(compact_row_cnt), KPC(save));
}
}


@@ -52,14 +52,15 @@ private:
public:
int init(ObMvccRow *row,
ObMemtable *mt,
common::ObIAllocator *node_alloc,
const bool for_replay);
common::ObIAllocator *node_alloc);
// compact and refresh the update counter by snapshot version
int compact(const share::SCN snapshot_version);
int compact(const share::SCN snapshot_version,
const int64_t flag);
private:
void find_start_pos_(const share::SCN snapshot_version,
ObMvccTransNode *&save);
ObMvccTransNode *construct_compact_node_(const share::SCN snapshot_version,
const int64_t flag,
ObMvccTransNode *save);
int try_cleanout_tx_node_during_compact_(storage::ObTxTableGuard &tx_table_guard,
ObMvccTransNode *tnode);
@@ -70,7 +71,6 @@ private:
ObMvccRow *row_;
ObMemtable *memtable_;
common::ObIAllocator *node_alloc_;
bool for_replay_;
};