[BUG]fix query engine not atomic flag

This commit is contained in:
Handora
2023-02-07 20:24:18 +08:00
committed by ob-robot
parent b34367a7ab
commit a00463ed37
2 changed files with 94 additions and 84 deletions

View File

@ -136,28 +136,6 @@ void ObMvccTransNode::set_snapshot_version_barrier(const SCN version)
snapshot_version_barrier_ = version; snapshot_version_barrier_ = version;
} }
void ObMvccTransNode::set_elr()
{
while (true) {
const uint8_t flag = ATOMIC_LOAD(&flag_);
const uint8_t tmp = (flag | F_ELR);
if (ATOMIC_BCAS(&flag_, flag, tmp)) {
break;
}
}
}
void ObMvccTransNode::set_committed()
{
while (true) {
const uint8_t flag = ATOMIC_LOAD(&flag_);
const uint8_t tmp = (flag | F_COMMITTED);
if (ATOMIC_BCAS(&flag_, flag, tmp)) {
break;
}
}
}
void ObMvccTransNode::get_trans_id_and_seq_no(ObTransID &tx_id, void ObMvccTransNode::get_trans_id_and_seq_no(ObTransID &tx_id,
int64_t &seq_no) int64_t &seq_no)
{ {
@ -165,47 +143,6 @@ void ObMvccTransNode::get_trans_id_and_seq_no(ObTransID &tx_id,
seq_no = seq_no_; seq_no = seq_no_;
} }
void ObMvccTransNode::clear_aborted()
{
const uint8_t consistent_flag = F_ABORTED;
while (true) {
const uint8_t flag = ATOMIC_LOAD(&flag_);
const uint8_t tmp = (flag & (~consistent_flag));
if (ATOMIC_BCAS(&flag_, flag, tmp)) {
break;
}
}
}
void ObMvccTransNode::set_aborted()
{
while (true) {
const uint8_t flag = ATOMIC_LOAD(&flag_);
const uint8_t tmp = (flag | F_ABORTED);
if (ATOMIC_BCAS(&flag_, flag, tmp)) {
break;
}
}
}
void ObMvccTransNode::set_delayed_cleanout(const bool delayed_cleanout)
{
while (true) {
const uint8_t flag = ATOMIC_LOAD(&flag_);
const uint8_t tmp = delayed_cleanout
? flag | F_DELAYED_CLEANOUT
: flag & ~F_DELAYED_CLEANOUT;
if (ATOMIC_BCAS(&flag_, flag, tmp)) {
break;
}
}
}
bool ObMvccTransNode::is_delayed_cleanout() const
{
return ATOMIC_LOAD(&flag_) & F_DELAYED_CLEANOUT;
}
int ObMvccTransNode::fill_trans_version(const SCN version) int ObMvccTransNode::fill_trans_version(const SCN version)
{ {
trans_version_.atomic_store(version); trans_version_.atomic_store(version);
@ -1185,6 +1122,5 @@ void ObMvccRow::print_row()
} }
} }
} }
}; // end namespace mvcc }; // end namespace mvcc
}; // end namespace oceanbase }; // end namespace oceanbase

View File

@ -37,6 +37,24 @@ class ObIMemtableCtx;
class ObMemtableKey; class ObMemtableKey;
class ObMvccRowCallback; class ObMvccRowCallback;
#define ATOMIC_ADD_TAG(tag) \
while (true) { \
const uint8_t old = ATOMIC_LOAD(&(flag_)); \
const uint8_t tmp = (old | (tag)); \
if (ATOMIC_BCAS(&(flag_), old, tmp)) { \
break; \
} \
}
#define ATOMIC_SUB_TAG(tag) \
while (true) { \
const uint8_t old = ATOMIC_LOAD(&(flag_)); \
const uint8_t tmp = (old & (~(tag))); \
if (ATOMIC_BCAS(&(flag_), old, tmp)) { \
break; \
} \
}
// ObMvccTransNode is the multi-version data used for mvcc and stored on // ObMvccTransNode is the multi-version data used for mvcc and stored on
// memtable. It only saves updated columns for write and write by aggregating tx // memtable. It only saves updated columns for write and write by aggregating tx
// nodes to data contains all columns. // nodes to data contains all columns.
@ -104,15 +122,46 @@ public:
void set_snapshot_version_barrier(const share::SCN version); void set_snapshot_version_barrier(const share::SCN version);
// ===================== ObMvccTransNode Flag Interface ===================== // ===================== ObMvccTransNode Flag Interface =====================
void set_committed(); OB_INLINE void set_committed()
bool is_committed() const { return ATOMIC_LOAD(&flag_) & F_COMMITTED; } {
void set_elr(); ATOMIC_ADD_TAG(F_COMMITTED);
bool is_elr() const { return ATOMIC_LOAD(&flag_) & F_ELR; } }
void set_aborted(); OB_INLINE bool is_committed() const
void clear_aborted(); {
bool is_aborted() const { return ATOMIC_LOAD(&flag_) & F_ABORTED; } return ATOMIC_LOAD(&flag_) & F_COMMITTED;
void set_delayed_cleanout(const bool delayed_cleanout); }
bool is_delayed_cleanout() const; OB_INLINE void set_elr()
{
ATOMIC_ADD_TAG(F_ELR);
}
OB_INLINE bool is_elr() const
{
return ATOMIC_LOAD(&flag_) & F_ELR;
}
OB_INLINE void set_aborted()
{
ATOMIC_ADD_TAG(F_ABORTED);
}
OB_INLINE void clear_aborted()
{
ATOMIC_SUB_TAG(F_ABORTED);
}
OB_INLINE bool is_aborted() const
{
return ATOMIC_LOAD(&flag_) & F_ABORTED;
}
OB_INLINE void set_delayed_cleanout(const bool delayed_cleanout)
{
if (OB_LIKELY(delayed_cleanout)) {
ATOMIC_ADD_TAG(F_DELAYED_CLEANOUT);
} else {
ATOMIC_SUB_TAG(F_DELAYED_CLEANOUT);
}
}
OB_INLINE bool is_delayed_cleanout() const
{
return ATOMIC_LOAD(&flag_) & F_DELAYED_CLEANOUT;
}
// ===================== ObMvccTransNode Setter/Getter ===================== // ===================== ObMvccTransNode Setter/Getter =====================
blocksstable::ObDmlFlag get_dml_flag() const; blocksstable::ObDmlFlag get_dml_flag() const;
@ -314,18 +363,43 @@ struct ObMvccRow
// ===================== ObMvccRow Event Statistic ===================== // ===================== ObMvccRow Event Statistic =====================
void lock_begin(ObIMemtableCtx &ctx) const; void lock_begin(ObIMemtableCtx &ctx) const;
void mvcc_write_end(ObIMemtableCtx &ctx, int64_t ret) const; void mvcc_write_end(ObIMemtableCtx &ctx, int64_t ret) const;
// ===================== ObMvccRow Flag Interface ===================== // ===================== ObMvccRow Flag Interface =====================
bool is_btree_indexed() { return flag_ & F_BTREE_INDEX; } OB_INLINE bool is_btree_indexed() const
void set_btree_indexed() { flag_ |= F_BTREE_INDEX; } {
void clear_btree_indexed() { flag_ &= static_cast<uint8_t>(~F_BTREE_INDEX); } return ATOMIC_LOAD(&flag_) & F_BTREE_INDEX;
bool is_btree_tag_del() { return flag_ & F_BTREE_TAG_DEL; } }
void set_btree_tag_del() { flag_ |= F_BTREE_TAG_DEL; } OB_INLINE void set_btree_indexed()
void clear_btree_tag_del() { flag_ &= static_cast<uint8_t>(~F_BTREE_TAG_DEL); } {
void set_hash_indexed() { flag_ |= F_HASH_INDEX; } ATOMIC_ADD_TAG(F_BTREE_INDEX);
bool is_lower_lock_scaned() const { return flag_ & F_LOWER_LOCK_SCANED; } }
void set_lower_lock_scaned() { flag_ |= F_LOWER_LOCK_SCANED; } OB_INLINE void clear_btree_indexed()
{
ATOMIC_SUB_TAG(F_BTREE_INDEX);
}
OB_INLINE bool is_btree_tag_del() const
{
return ATOMIC_LOAD(&flag_) & F_BTREE_TAG_DEL;
}
OB_INLINE void set_btree_tag_del()
{
ATOMIC_ADD_TAG(F_BTREE_TAG_DEL);
}
OB_INLINE void clear_btree_tag_del()
{
ATOMIC_SUB_TAG(F_BTREE_TAG_DEL);
}
OB_INLINE void set_hash_indexed()
{
ATOMIC_ADD_TAG(F_HASH_INDEX);
}
OB_INLINE bool is_lower_lock_scaned() const
{
return ATOMIC_LOAD(&flag_) & F_LOWER_LOCK_SCANED;
}
OB_INLINE void set_lower_lock_scaned()
{
ATOMIC_ADD_TAG(F_LOWER_LOCK_SCANED);
}
// ===================== ObMvccRow Helper Function ===================== // ===================== ObMvccRow Helper Function =====================
int64_t to_string(char *buf, const int64_t buf_len) const; int64_t to_string(char *buf, const int64_t buf_len) const;
int64_t to_string(char *buf, const int64_t buf_len, const bool verbose) const; int64_t to_string(char *buf, const int64_t buf_len, const bool verbose) const;