[BUG] use tx status to kill the concurrent read and write

Handora 2022-11-11 07:39:57 +00:00 committed by wangzelin.wzl
parent 0c0b91bf39
commit e1e46e7e5d
6 changed files with 56 additions and 88 deletions
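The core of the change is a new tx_status_ field on ObMemtableCtx, kept separately from end_code_, so that a kill can be observed by concurrent readers and writers before the transaction formally ends. Below is a minimal standalone sketch of that state machine; the enum values mirror the diff, but the type names, atomic wrapper, and main() driver are illustrative only, not the OceanBase declarations.

#include <atomic>
#include <cstdint>
#include <cstdio>

enum ObTxStatusSketch : std::int64_t {
  PARTIAL_ROLLBACKED = -1,  // some redo was discarded (sync_log_fail / clean_unlog_callbacks)
  NORMAL = 0,
  ROLLBACKED = 1,           // the txn was killed or aborted (trans_kill_ / tx_end_ on abort)
};

struct TxStatusHolder {
  std::atomic<std::int64_t> tx_status_{NORMAL};

  std::int64_t get_tx_status() const { return tx_status_.load(); }
  bool is_tx_rollbacked() const { return get_tx_status() != NORMAL; }
  void set_partial_rollbacked() { tx_status_.store(PARTIAL_ROLLBACKED); }
  void set_tx_rollbacked() { tx_status_.store(ROLLBACKED); }
};

int main() {
  TxStatusHolder ctx;
  std::printf("rollbacked=%d\n", ctx.is_tx_rollbacked());  // 0: txn still alive
  ctx.set_tx_rollbacked();                                  // what the kill path does
  std::printf("rollbacked=%d\n", ctx.is_tx_rollbacked());  // 1: readers/writers must bail out
  return 0;
}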

View File

@@ -559,8 +559,10 @@ int ObTableScanIterator::check_txn_status_if_read_uncommitted_()
auto &acc_ctx = ctx_guard_.get_store_ctx().mvcc_acc_ctx_;
auto &snapshot = acc_ctx.snapshot_;
if (snapshot.tx_id_.is_valid() && acc_ctx.mem_ctx_) {
if (acc_ctx.mem_ctx_->is_trans_rollbacked()) {
ret = acc_ctx.mem_ctx_->get_trans_status_retcode();
if (acc_ctx.mem_ctx_->is_tx_rollbacked()) {
// The txn has been killed during normal processing. So we return
// OB_TRANS_KILLED to prompt this abnormal state.
ret = OB_TRANS_KILLED;
STORAGE_LOG(WARN, "txn terminated when table scan", K(ret), K(acc_ctx));
}
}
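As a rough illustration of the guard this hunk adds (the ObSingleRowGetter hunk further down applies the same pattern), here is a simplified, self-contained version of the check. SnapshotSketch, MemCtxSketch, and the placeholder return codes are invented for the sketch and stand in for the real snapshot, memtable context, and ob_errno.h values.

#include <cstdint>

static const int kSketchSuccess = 0;
static const int kSketchTransKilled = -1;  // placeholder for OB_TRANS_KILLED

struct SnapshotSketch {
  std::int64_t tx_id = 0;                       // 0 means "no txn attached to this read"
  bool tx_id_is_valid() const { return tx_id > 0; }
};

struct MemCtxSketch {
  bool rollbacked = false;
  bool is_tx_rollbacked() const { return rollbacked; }
};

// Only a read that carries its own txn (and may see its own uncommitted rows)
// needs the check: once that txn has been killed, returning rows would expose
// a half-rolled-back state, so the scan fails with "trans killed" instead.
int check_txn_status_for_read(const SnapshotSketch &snapshot, const MemCtxSketch *mem_ctx) {
  int ret = kSketchSuccess;
  if (snapshot.tx_id_is_valid() && nullptr != mem_ctx && mem_ctx->is_tx_rollbacked()) {
    ret = kSketchTransKilled;
  }
  return ret;
}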

View File

@@ -44,12 +44,12 @@ ObMemtableCtx::ObMemtableCtx()
rwlock_(),
lock_(),
end_code_(OB_SUCCESS),
tx_status_(ObTxStatus::NORMAL),
ref_(0),
query_allocator_(),
ctx_cb_allocator_(),
log_conflict_interval_(LOG_CONFLICT_INTERVAL),
ctx_(NULL),
mutator_iter_(NULL),
truncate_cnt_(0),
lock_for_read_retry_count_(0),
lock_for_read_elapse_(0),
@@ -141,10 +141,6 @@ void ObMemtableCtx::reset()
unsubmitted_cnt_ = 0;
partition_audit_info_cache_.reset();
lock_mem_ctx_.reset();
if (OB_NOT_NULL(mutator_iter_)) {
ctx_cb_allocator_.free(mutator_iter_);
mutator_iter_ = NULL;
}
//FIXME: ctx_ is not reset
log_conflict_interval_.reset();
mtstat_.reset();
@@ -154,6 +150,7 @@ void ObMemtableCtx::reset()
is_master_ = true;
is_read_only_ = false;
end_code_ = OB_SUCCESS;
tx_status_ = ObTxStatus::NORMAL;
// blocked_trans_ids_.reset();
tx_table_guard_.reset();
//FIXME: ObIMemtableCtx doesn't have a reset function,
@@ -173,7 +170,8 @@ int64_t ObMemtableCtx::to_string(char *buf, const int64_t buf_len) const
common::databuff_printf(buf, buf_len, pos, "{");
pos += ObIMvccCtx::to_string(buf + pos, buf_len);
common::databuff_printf(buf, buf_len, pos,
" end_code=%d is_readonly=%s ref=%ld trans_id=%s ls_id=%ld "
" end_code=%d tx_status=%ld is_readonly=%s "
"ref=%ld trans_id=%s ls_id=%ld "
"callback_alloc_count=%ld callback_free_count=%ld "
"checksum=%lu tmp_checksum=%lu checksum_log_ts=%lu "
"redo_filled_count=%ld redo_sync_succ_count=%ld "
@@ -182,7 +180,7 @@ int64_t ObMemtableCtx::to_string(char *buf, const int64_t buf_len) const
"cb_statistics:[main=%ld, slave=%ld, merge=%ld, "
"tx_end=%ld, rollback_to=%ld, "
"fast_commit=%ld, remove_memtable=%ld]",
end_code_, STR_BOOL(is_read_only_), ref_,
end_code_, tx_status_, STR_BOOL(is_read_only_), ref_,
NULL == ctx_ ? "" : S(ctx_->get_trans_id()),
NULL == ctx_ ? -1 : ctx_->get_ls_id().id(),
callback_alloc_count_, callback_free_count_,
@@ -250,7 +248,13 @@ int ObMemtableCtx::write_auth(const bool exclusive)
TRANS_LOG(ERROR, "WriteAuth: readonly trans not support update operation",
"trans_id", ctx_->get_trans_id(), "ls_id", ctx_->get_ls_id(), K(ret));
} else if (OB_SUCCESS != ATOMIC_LOAD(&end_code_)) {
ret = get_trans_status_retcode();
ret = ATOMIC_LOAD(&end_code_);
TRANS_LOG(WARN, "WriteAuth: trans is already end", K(ret),
"trans_id", ctx_->get_trans_id(), "ls_id", ctx_->get_ls_id(), K_(end_code));
} else if (is_tx_rollbacked()) {
// The txn has been killed during normal processing. So we return
// OB_TRANS_KILLED to prompt this abnormal state.
ret = OB_TRANS_KILLED;
TRANS_LOG(WARN, "WriteAuth: trans is already end", K(ret),
"trans_id", ctx_->get_trans_id(), "ls_id", ctx_->get_ls_id(), K_(end_code));
} else if (!ATOMIC_LOAD(&is_master_)) {
@@ -494,13 +498,7 @@ int ObMemtableCtx::do_trans_end(
int ret = OB_SUCCESS;
int tmp_ret = OB_SUCCESS;
WRLockGuard wrguard(rwlock_);
bool partial_rollbacked = is_partial_rollbacked_();
if (OB_UNLIKELY(partial_rollbacked) && commit) {
ret = OB_ERR_UNEXPECTED;
TRANS_LOG(ERROR, "txn has partially rollbacked", K(ret), K(end_code), KPC(this));
ob_abort();
}
if (partial_rollbacked || OB_SUCCESS == ATOMIC_LOAD(&end_code_)) {
if (OB_SUCCESS == ATOMIC_LOAD(&end_code_)) {
ATOMIC_STORE(&end_code_, end_code);
set_commit_version(trans_version);
if (OB_FAIL(trans_mgr_.trans_end(commit))) {
@@ -673,7 +671,7 @@ int ObMemtableCtx::sync_log_succ(const int64_t log_ts, const ObCallbackScope &ca
{
int ret = OB_SUCCESS;
if (is_partial_rollbacked_() || OB_SUCCESS == ATOMIC_LOAD(&end_code_)) {
if (OB_SUCCESS == ATOMIC_LOAD(&end_code_)) {
if (OB_FAIL(log_gen_.sync_log_succ(log_ts, callbacks))) {
TRANS_LOG(WARN, "sync log failed", K(ret));
}
@@ -690,9 +688,9 @@ int ObMemtableCtx::sync_log_succ(const int64_t log_ts, const ObCallbackScope &ca
void ObMemtableCtx::sync_log_fail(const ObCallbackScope &callbacks)
{
if (!callbacks.is_empty()) {
set_partial_rollbacked_();
set_partial_rollbacked();
}
if (is_partial_rollbacked_() || OB_SUCCESS == ATOMIC_LOAD(&end_code_)) {
if (OB_SUCCESS == ATOMIC_LOAD(&end_code_)) {
log_gen_.sync_log_fail(callbacks);
} else {
if (!callbacks.is_empty()) {
@@ -919,25 +917,6 @@ bool ObMemtableCtx::is_all_redo_submitted()
}
ObMemtableMutatorIterator *ObMemtableCtx::alloc_memtable_mutator_iter()
{
int ret = OB_SUCCESS;
ObMemtableMutatorIterator *mmi = NULL;
void *buf = ctx_cb_allocator_.alloc(sizeof(ObMemtableMutatorIterator));
if (OB_ISNULL(buf) || OB_ISNULL((mmi = new(buf) ObMemtableMutatorIterator()))) {
ret = OB_ALLOCATE_MEMORY_FAILED;
TRANS_LOG(WARN, "memtable mutator iter alloc fail", K(ret), KP(buf));
} else if (OB_ISNULL(ATOMIC_LOAD(&ctx_))) {
ret = OB_INVALID_ARGUMENT;
TRANS_LOG(WARN, "invalid argument", K(ret), KP(mmi));
} else {
ATOMIC_STORE(&mutator_iter_, mmi);
}
UNUSED(ret);
return ATOMIC_LOAD(&mutator_iter_);
}
int ObMemtableCtx::remove_callbacks_for_fast_commit()
{
int ret = OB_SUCCESS;
@@ -984,7 +963,7 @@ int ObMemtableCtx::clean_unlog_callbacks()
}
}
if (removed_cnt > 0) {
set_partial_rollbacked_();
set_partial_rollbacked();
}
return ret;
}
@@ -1373,15 +1352,5 @@ int ObMemtableCtx::check_tx_mem_size_overflow(bool &is_overflow)
return ret;
}
inline void ObMemtableCtx::set_partial_rollbacked_()
{
if (OB_SUCCESS == ATOMIC_LOAD(&end_code_)) {
WRLockGuard wrguard(rwlock_);
if (OB_SUCCESS == ATOMIC_LOAD(&end_code_)) {
ATOMIC_STORE(&end_code_, PARTIAL_ROLLBACKED);
}
}
}
}
}
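The write_auth() hunk above now distinguishes two failure modes: a transaction that has already ended (propagate the recorded end_code_) and a transaction that is still running but was killed concurrently (report OB_TRANS_KILLED via the new tx status). A hedged sketch of that gating order follows; the function name and error-code constants are placeholders, and the real function also checks read-only and leader state.

static const int kSketchSuccess = 0;
static const int kSketchTransKilled = -1;  // stands in for OB_TRANS_KILLED

// end_code: what do_trans_end() recorded once the txn finished.
// tx_rollbacked: the new tx_status_-based flag set when the txn is killed.
int write_auth_gate(int end_code, bool tx_rollbacked) {
  int ret = kSketchSuccess;
  if (kSketchSuccess != end_code) {
    ret = end_code;            // txn already ended: propagate its recorded end code
  } else if (tx_rollbacked) {
    ret = kSketchTransKilled;  // txn killed while still "running": report the kill explicitly
  }
  return ret;
}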

View File

@@ -369,21 +369,6 @@ public:
virtual int replay_to_commit();
//method called when leader revoke
virtual int commit_to_replay();
virtual int get_trans_status() const
{
return ATOMIC_LOAD(&end_code_);
}
int get_trans_status_retcode() const
{
auto s = get_trans_status();
if (PARTIAL_ROLLBACKED == s) { return OB_TRANS_KILLED; }
return s;
}
virtual bool is_trans_rollbacked() const
{
auto s = get_trans_status();
return s != OB_SUCCESS && s != OB_TRANS_COMMITED;
}
virtual int fill_redo_log(char *buf,
const int64_t buf_len,
int64_t &buf_pos,
@@ -413,8 +398,6 @@ public:
int64_t get_ref() const { return ATOMIC_LOAD(&ref_); }
uint64_t get_tenant_id() const;
bool is_can_elr() const;
ObMemtableMutatorIterator *get_memtable_mutator_iter() { return mutator_iter_; }
ObMemtableMutatorIterator *alloc_memtable_mutator_iter();
inline bool has_read_elr_data() const { return read_elr_data_; }
int remove_callbacks_for_fast_commit();
int remove_callback_for_uncommited_txn(memtable::ObMemtable* mt);
@@ -454,6 +437,17 @@ public:
int64_t get_checksum() const { return trans_mgr_.get_checksum(); }
int64_t get_tmp_checksum() const { return trans_mgr_.get_tmp_checksum(); }
int64_t get_checksum_log_ts() const { return trans_mgr_.get_checksum_log_ts(); }
public:
// tx_status
enum ObTxStatus {
PARTIAL_ROLLBACKED = -1,
NORMAL = 0,
ROLLBACKED = 1,
};
virtual int64_t get_tx_status() const { return ATOMIC_LOAD(&tx_status_); }
bool is_tx_rollbacked() const { return get_tx_status() != ObTxStatus::NORMAL; }
inline void set_partial_rollbacked() { ATOMIC_STORE(&tx_status_, ObTxStatus::PARTIAL_ROLLBACKED); }
inline void set_tx_rollbacked() { ATOMIC_STORE(&tx_status_, ObTxStatus::ROLLBACKED); }
public:
// table lock.
int enable_lock_table(storage::ObTableHandleV2 &handle);
@@ -511,16 +505,14 @@ private:
{
trans_mgr_.inc_flushed_log_size(size);
}
bool is_partial_rollbacked_() { return ATOMIC_LOAD(&end_code_) == PARTIAL_ROLLBACKED; }
void set_partial_rollbacked_();
public:
inline ObRedoLogGenerator &get_redo_generator() { return log_gen_; }
private:
static const int PARTIAL_ROLLBACKED = -1;
DISALLOW_COPY_AND_ASSIGN(ObMemtableCtx);
RWLock rwlock_;
common::ObByteLock lock_;
int end_code_;
int64_t tx_status_;
int64_t ref_;
// allocate memory for callback when query executing
ObQueryAllocator query_allocator_;
@@ -529,7 +521,6 @@ private:
MemtableCtxStat mtstat_;
ObTimeInterval log_conflict_interval_;
transaction::ObPartTransCtx *ctx_;
ObMemtableMutatorIterator *mutator_iter_;
transaction::ObPartitionAuditInfoCache partition_audit_info_cache_;
int64_t truncate_cnt_;
// the retry count of lock for read

View File

@@ -75,8 +75,6 @@ public:
virtual int replay_to_commit() = 0;
//method called when leader revoke
virtual int commit_to_replay() = 0;
virtual int get_trans_status() const = 0;
virtual bool is_trans_rollbacked() const = 0;
virtual void set_trans_ctx(transaction::ObPartTransCtx *ctx) = 0;
virtual void inc_truncate_cnt() = 0;
virtual uint64_t get_tenant_id() const = 0;

View File

@@ -292,7 +292,9 @@ int ObSingleRowGetter::get_next_row(ObNewRow *&row)
// check that the txn status is not aborted, since an aborted txn causes an incorrect read result
if (store_ctx_->mvcc_acc_ctx_.snapshot_.tx_id_.is_valid() &&
store_ctx_->mvcc_acc_ctx_.mem_ctx_ &&
store_ctx_->mvcc_acc_ctx_.mem_ctx_->is_trans_rollbacked()) {
store_ctx_->mvcc_acc_ctx_.mem_ctx_->is_tx_rollbacked()) {
// The txn has been killed during normal processing. So we return
// OB_TRANS_KILLED to prompt this abnormal state.
ret = OB_TRANS_KILLED;
STORAGE_LOG(WARN, "txn has terminated", K(ret));
}

View File

@@ -366,7 +366,7 @@ int ObPartTransCtx::trans_kill_()
int ret = OB_SUCCESS;
TRANS_LOG(INFO, "trans killed", K(trans_id_));
mt_ctx_.trans_kill();
mt_ctx_.set_tx_rollbacked();
if (ctx_tx_data_.get_state() == ObTxData::RUNNING) {
if (OB_FAIL(ctx_tx_data_.set_state(ObTxData::ABORT))) {
@@ -374,6 +374,8 @@ int ObPartTransCtx::trans_kill_()
}
}
mt_ctx_.trans_kill();
return ret;
}
@@ -1542,7 +1544,7 @@ int ObPartTransCtx::update_max_commit_version_()
}
// Unified interface for normal transaction end (both commit and abort). We
// want to integrate the following five things that all txn commits should do.
// want to integrate the following six things that all txn commits should do.
//
// 1.end_log_ts: We set end_log_ts during final log state is synced which must
// have been done, so we check the validation of end_log_ts here(Maybe set it in
@@ -1554,9 +1556,10 @@ int ObPartTransCtx::update_max_commit_version_()
// 3.mt_ctx.tx_end: We need callback all txn ops for all data in txn after final
// state is synced. It must be called for all txns to clean and release its data
// resource.
// 4.set_state: We need set state after final state is synced. It tells others
// 4.set_status: We need set status to kill the concurrent read and write.
// 5.set_state: We need set state after final state is synced. It tells others
// that all data for this txn is decided and visible.
// 5.insert_tx_data: We need insert into tx_data in order to cleanout data which
// 6.insert_tx_data: We need insert into tx_data in order to cleanout data which
// need be delay cleanout
//
// NB: You need pay much attention to the order of the following steps
@@ -1566,7 +1569,6 @@ int ObPartTransCtx::tx_end_(const bool commit)
int ret = OB_SUCCESS;
// NB: The order of the following steps is critical
// We need first set end_code_ for the s
int32_t state = commit ? ObTxData::COMMIT : ObTxData::ABORT;
int64_t commit_version = ctx_tx_data_.get_commit_version();
int64_t end_log_ts = ctx_tx_data_.get_end_log_ts();
@@ -1585,19 +1587,23 @@ int ObPartTransCtx::tx_end_(const bool commit)
} else if (commit && ObTransVersion::INVALID_TRANS_VERSION == commit_version) {
ret = OB_ERR_UNEXPECTED;
TRANS_LOG(ERROR, "commit version is invalid when tx end", K(ret), KPC(this));
// STEP3: We need invoke mt_ctx_.trans_end before state is filled in here
// because we rely on the end_code_ in mt_ctx_ to report the suicide before
// the tnode can be cleanout by concurrent read.
} else if (OB_FAIL(mt_ctx_.trans_end(commit, commit_version, end_log_ts))) {
TRANS_LOG(WARN, "trans end error", KR(ret), K(commit), "context", *this);
// STEP3: We need set status in order to kill concurrent read and write. What
// you need pay attention to is that we rely on the status to report the
// suicide before the tnode can be cleanout by concurrent read using state in
// ctx_tx_data.
} else if (!commit && FALSE_IT(mt_ctx_.set_tx_rollbacked())) {
// STEP4: We need set state in order to informing others of the final status
// of my txn. What you need pay attention to is that after this action, others
// can cleanout the unfinished txn state and see all your data.
// TODO: we can move set_state before mt_ctx_.trans_end for commit state in
// order to accelerate users to see the data state.
// can cleanout the unfinished txn state and see all your data. We currently
// move set_state before mt_ctx_.trans_end for the commit state in order to
// accelerate users to see the data state.
} else if (OB_FAIL(ctx_tx_data_.set_state(state))) {
TRANS_LOG(WARN, "set tx data state failed", K(ret), KPC(this));
// STEP5: We need insert into the tx_data after all states are filled
// STEP5: We need invoke mt_ctx_.trans_end before state is filled in here
// because we rely on the state in the ctx_tx_data_ to callback all txn ops.
} else if (OB_FAIL(mt_ctx_.trans_end(commit, commit_version, end_log_ts))) {
TRANS_LOG(WARN, "trans end error", KR(ret), K(commit), "context", *this);
// STEP6: We need insert into the tx_data after all states are filled
} else if (has_persisted_log_() && OB_FAIL(ctx_tx_data_.insert_into_tx_table())) {
TRANS_LOG(WARN, "insert to tx table failed", KR(ret), KPC(this));
}
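To recap the six-step ordering described in the comment block above, here is a compressed, standalone sketch of tx_end_() after this commit. All stub types, helper names, and error codes are stand-ins for the real OceanBase APIs, and logging/error details are dropped.

#include <cstdint>

static const int kSketchSuccess = 0;
static const int kSketchErrUnexpected = -1;       // placeholder error codes
static const std::int64_t kInvalidLogTs = -1;
static const std::int64_t kInvalidTransVersion = -1;
static const std::int32_t kStateCommit = 1;       // stand-ins for ObTxData states
static const std::int32_t kStateAbort = 2;

struct MtCtxStub {
  void set_tx_rollbacked() {}                                                  // STEP3
  int trans_end(bool, std::int64_t, std::int64_t) { return kSketchSuccess; }   // STEP5
};

struct TxDataStub {
  int set_state(std::int32_t) { return kSketchSuccess; }                       // STEP4
  int insert_into_tx_table() { return kSketchSuccess; }                        // STEP6
};

int tx_end_sketch(bool commit, std::int64_t end_log_ts, std::int64_t commit_version,
                  bool has_persisted_log, MtCtxStub &mt_ctx, TxDataStub &tx_data) {
  int ret = kSketchSuccess;
  const std::int32_t state = commit ? kStateCommit : kStateAbort;
  if (kInvalidLogTs == end_log_ts) {
    ret = kSketchErrUnexpected;            // STEP1: the final log must already be synced
  } else if (commit && kInvalidTransVersion == commit_version) {
    ret = kSketchErrUnexpected;            // STEP2: the commit version must be decided
  } else {
    if (!commit) {
      mt_ctx.set_tx_rollbacked();          // STEP3: kill concurrent read/write first
    }
    if (kSketchSuccess != (ret = tx_data.set_state(state))) {
      // STEP4 failed: the final txn state was not made visible
    } else if (kSketchSuccess != (ret = mt_ctx.trans_end(commit, commit_version, end_log_ts))) {
      // STEP5 failed: txn-op callbacks were not run
    } else if (has_persisted_log) {
      ret = tx_data.insert_into_tx_table();  // STEP6: support delayed cleanout
    }
  }
  return ret;
}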