From e1e46e7e5df600d9cd368b62df90831e0b601cc0 Mon Sep 17 00:00:00 2001 From: Handora Date: Fri, 11 Nov 2022 07:39:57 +0000 Subject: [PATCH] [BUG] use tx status to kill the concurrent read and write --- src/storage/access/ob_table_scan_iterator.cpp | 6 +- src/storage/memtable/ob_memtable_context.cpp | 65 +++++-------------- src/storage/memtable/ob_memtable_context.h | 33 ++++------ src/storage/memtable/ob_memtable_interface.h | 2 - src/storage/ob_value_row_iterator.cpp | 4 +- src/storage/tx/ob_trans_part_ctx.cpp | 34 ++++++---- 6 files changed, 56 insertions(+), 88 deletions(-) diff --git a/src/storage/access/ob_table_scan_iterator.cpp b/src/storage/access/ob_table_scan_iterator.cpp index f5b5f8940..5b2a9cf56 100644 --- a/src/storage/access/ob_table_scan_iterator.cpp +++ b/src/storage/access/ob_table_scan_iterator.cpp @@ -559,8 +559,10 @@ int ObTableScanIterator::check_txn_status_if_read_uncommitted_() auto &acc_ctx = ctx_guard_.get_store_ctx().mvcc_acc_ctx_; auto &snapshot = acc_ctx.snapshot_; if (snapshot.tx_id_.is_valid() && acc_ctx.mem_ctx_) { - if (acc_ctx.mem_ctx_->is_trans_rollbacked()) { - ret = acc_ctx.mem_ctx_->get_trans_status_retcode(); + if (acc_ctx.mem_ctx_->is_tx_rollbacked()) { + // The txn has been killed during normal processing. So we return + // OB_TRANS_KILLED to prompt this abnormal state. 
+ ret = OB_TRANS_KILLED; STORAGE_LOG(WARN, "txn termianted when table scan", K(ret), K(acc_ctx)); } } diff --git a/src/storage/memtable/ob_memtable_context.cpp b/src/storage/memtable/ob_memtable_context.cpp index 9e203a2c5..c34395f08 100644 --- a/src/storage/memtable/ob_memtable_context.cpp +++ b/src/storage/memtable/ob_memtable_context.cpp @@ -44,12 +44,12 @@ ObMemtableCtx::ObMemtableCtx() rwlock_(), lock_(), end_code_(OB_SUCCESS), + tx_status_(ObTxStatus::NORMAL), ref_(0), query_allocator_(), ctx_cb_allocator_(), log_conflict_interval_(LOG_CONFLICT_INTERVAL), ctx_(NULL), - mutator_iter_(NULL), truncate_cnt_(0), lock_for_read_retry_count_(0), lock_for_read_elapse_(0), @@ -141,10 +141,6 @@ void ObMemtableCtx::reset() unsubmitted_cnt_ = 0; partition_audit_info_cache_.reset(); lock_mem_ctx_.reset(); - if (OB_NOT_NULL(mutator_iter_)) { - ctx_cb_allocator_.free(mutator_iter_); - mutator_iter_ = NULL; - } //FIXME: ctx_ is not reset log_conflict_interval_.reset(); mtstat_.reset(); @@ -154,6 +150,7 @@ void ObMemtableCtx::reset() is_master_ = true; is_read_only_ = false; end_code_ = OB_SUCCESS; + tx_status_ = ObTxStatus::NORMAL; // blocked_trans_ids_.reset(); tx_table_guard_.reset(); //FIXME: ObIMemtableCtx don't have resetfunction, @@ -173,7 +170,8 @@ int64_t ObMemtableCtx::to_string(char *buf, const int64_t buf_len) const common::databuff_printf(buf, buf_len, pos, "{"); pos += ObIMvccCtx::to_string(buf + pos, buf_len); common::databuff_printf(buf, buf_len, pos, - " end_code=%d is_readonly=%s ref=%ld trans_id=%s ls_id=%ld " + " end_code=%d tx_status=%ld is_readonly=%s " + "ref=%ld trans_id=%s ls_id=%ld " "callback_alloc_count=%ld callback_free_count=%ld " "checksum=%lu tmp_checksum=%lu checksum_log_ts=%lu " "redo_filled_count=%ld redo_sync_succ_count=%ld " @@ -182,7 +180,7 @@ int64_t ObMemtableCtx::to_string(char *buf, const int64_t buf_len) const "cb_statistics:[main=%ld, slave=%ld, merge=%ld, " "tx_end=%ld, rollback_to=%ld, " "fast_commit=%ld, remove_memtable=%ld]", - 
end_code_, STR_BOOL(is_read_only_), ref_, + end_code_, tx_status_, STR_BOOL(is_read_only_), ref_, NULL == ctx_ ? "" : S(ctx_->get_trans_id()), NULL == ctx_ ? -1 : ctx_->get_ls_id().id(), callback_alloc_count_, callback_free_count_, @@ -250,7 +248,13 @@ int ObMemtableCtx::write_auth(const bool exclusive) TRANS_LOG(ERROR, "WriteAuth: readonly trans not support update operation", "trans_id", ctx_->get_trans_id(), "ls_id", ctx_->get_ls_id(), K(ret)); } else if (OB_SUCCESS != ATOMIC_LOAD(&end_code_)) { - ret = get_trans_status_retcode(); + ret = ATOMIC_LOAD(&end_code_); + TRANS_LOG(WARN, "WriteAuth: trans is already end", K(ret), + "trans_id", ctx_->get_trans_id(), "ls_id", ctx_->get_ls_id(), K_(end_code)); + } else if (is_tx_rollbacked()) { + // The txn has been killed during normal processing. So we return + // OB_TRANS_KILLED to prompt this abnormal state. + ret = OB_TRANS_KILLED; TRANS_LOG(WARN, "WriteAuth: trans is already end", K(ret), "trans_id", ctx_->get_trans_id(), "ls_id", ctx_->get_ls_id(), K_(end_code)); } else if (!ATOMIC_LOAD(&is_master_)) { @@ -494,13 +498,7 @@ int ObMemtableCtx::do_trans_end( int ret = OB_SUCCESS; int tmp_ret = OB_SUCCESS; WRLockGuard wrguard(rwlock_); - bool partial_rollbacked = is_partial_rollbacked_(); - if (OB_UNLIKELY(partial_rollbacked) && commit) { - ret = OB_ERR_UNEXPECTED; - TRANS_LOG(ERROR, "txn has partially rollbacked", K(ret), K(end_code), KPC(this)); - ob_abort(); - } - if (partial_rollbacked || OB_SUCCESS == ATOMIC_LOAD(&end_code_)) { + if (OB_SUCCESS == ATOMIC_LOAD(&end_code_)) { ATOMIC_STORE(&end_code_, end_code); set_commit_version(trans_version); if (OB_FAIL(trans_mgr_.trans_end(commit))) { @@ -673,7 +671,7 @@ int ObMemtableCtx::sync_log_succ(const int64_t log_ts, const ObCallbackScope &ca { int ret = OB_SUCCESS; - if (is_partial_rollbacked_() || OB_SUCCESS == ATOMIC_LOAD(&end_code_)) { + if (OB_SUCCESS == ATOMIC_LOAD(&end_code_)) { if (OB_FAIL(log_gen_.sync_log_succ(log_ts, callbacks))) { TRANS_LOG(WARN, "sync log 
failed", K(ret)); } @@ -690,9 +688,9 @@ int ObMemtableCtx::sync_log_succ(const int64_t log_ts, const ObCallbackScope &ca void ObMemtableCtx::sync_log_fail(const ObCallbackScope &callbacks) { if (!callbacks.is_empty()) { - set_partial_rollbacked_(); + set_partial_rollbacked(); } - if (is_partial_rollbacked_() || OB_SUCCESS == ATOMIC_LOAD(&end_code_)) { + if (OB_SUCCESS == ATOMIC_LOAD(&end_code_)) { log_gen_.sync_log_fail(callbacks); } else { if (!callbacks.is_empty()) { @@ -919,25 +917,6 @@ bool ObMemtableCtx::is_all_redo_submitted() } -ObMemtableMutatorIterator *ObMemtableCtx::alloc_memtable_mutator_iter() -{ - int ret = OB_SUCCESS; - ObMemtableMutatorIterator *mmi = NULL; - - void *buf = ctx_cb_allocator_.alloc(sizeof(ObMemtableMutatorIterator)); - if (OB_ISNULL(buf) || OB_ISNULL((mmi = new(buf) ObMemtableMutatorIterator()))) { - ret = OB_ALLOCATE_MEMORY_FAILED; - TRANS_LOG(WARN, "memtable mutator iter alloc fail", K(ret), KP(buf)); - } else if (OB_ISNULL(ATOMIC_LOAD(&ctx_))) { - ret = OB_INVALID_ARGUMENT; - TRANS_LOG(WARN, "invalid argument", K(ret), KP(mmi)); - } else { - ATOMIC_STORE(&mutator_iter_, mmi); - } - UNUSED(ret); - return ATOMIC_LOAD(&mutator_iter_); -} - int ObMemtableCtx::remove_callbacks_for_fast_commit() { int ret = OB_SUCCESS; @@ -984,7 +963,7 @@ int ObMemtableCtx::clean_unlog_callbacks() } } if (removed_cnt > 0) { - set_partial_rollbacked_(); + set_partial_rollbacked(); } return ret; } @@ -1373,15 +1352,5 @@ int ObMemtableCtx::check_tx_mem_size_overflow(bool &is_overflow) return ret; } -inline void ObMemtableCtx::set_partial_rollbacked_() -{ - if (OB_SUCCESS == ATOMIC_LOAD(&end_code_)) { - WRLockGuard wrguard(rwlock_); - if (OB_SUCCESS == ATOMIC_LOAD(&end_code_)) { - ATOMIC_STORE(&end_code_, PARTIAL_ROLLBACKED); - } - } -} - } } diff --git a/src/storage/memtable/ob_memtable_context.h b/src/storage/memtable/ob_memtable_context.h index df1cb69c6..2ec9af7e5 100644 --- a/src/storage/memtable/ob_memtable_context.h +++ 
b/src/storage/memtable/ob_memtable_context.h @@ -369,21 +369,6 @@ public: virtual int replay_to_commit(); //method called when leader revoke virtual int commit_to_replay(); - virtual int get_trans_status() const - { - return ATOMIC_LOAD(&end_code_); - } - int get_trans_status_retcode() const - { - auto s = get_trans_status(); - if (PARTIAL_ROLLBACKED == s) { return OB_TRANS_KILLED; } - return s; - } - virtual bool is_trans_rollbacked() const - { - auto s = get_trans_status(); - return s != OB_SUCCESS && s != OB_TRANS_COMMITED; - } virtual int fill_redo_log(char *buf, const int64_t buf_len, int64_t &buf_pos, @@ -413,8 +398,6 @@ public: int64_t get_ref() const { return ATOMIC_LOAD(&ref_); } uint64_t get_tenant_id() const; bool is_can_elr() const; - ObMemtableMutatorIterator *get_memtable_mutator_iter() { return mutator_iter_; } - ObMemtableMutatorIterator *alloc_memtable_mutator_iter(); inline bool has_read_elr_data() const { return read_elr_data_; } int remove_callbacks_for_fast_commit(); int remove_callback_for_uncommited_txn(memtable::ObMemtable* mt); @@ -454,6 +437,17 @@ public: int64_t get_checksum() const { return trans_mgr_.get_checksum(); } int64_t get_tmp_checksum() const { return trans_mgr_.get_tmp_checksum(); } int64_t get_checksum_log_ts() const { return trans_mgr_.get_checksum_log_ts(); } +public: + // tx_status + enum ObTxStatus { + PARTIAL_ROLLBACKED = -1, + NORMAL = 0, + ROLLBACKED = 1, + }; + virtual int64_t get_tx_status() const { return ATOMIC_LOAD(&tx_status_); } + bool is_tx_rollbacked() const { return get_tx_status() != ObTxStatus::NORMAL; } + inline void set_partial_rollbacked() { ATOMIC_STORE(&tx_status_, ObTxStatus::PARTIAL_ROLLBACKED); } + inline void set_tx_rollbacked() { ATOMIC_STORE(&tx_status_, ObTxStatus::ROLLBACKED); } public: // table lock. 
int enable_lock_table(storage::ObTableHandleV2 &handle); @@ -511,16 +505,14 @@ private: { trans_mgr_.inc_flushed_log_size(size); } - bool is_partial_rollbacked_() { return ATOMIC_LOAD(&end_code_) == PARTIAL_ROLLBACKED; } - void set_partial_rollbacked_(); public: inline ObRedoLogGenerator &get_redo_generator() { return log_gen_; } private: - static const int PARTIAL_ROLLBACKED = -1; DISALLOW_COPY_AND_ASSIGN(ObMemtableCtx); RWLock rwlock_; common::ObByteLock lock_; int end_code_; + int64_t tx_status_; int64_t ref_; // allocate memory for callback when query executing ObQueryAllocator query_allocator_; @@ -529,7 +521,6 @@ private: MemtableCtxStat mtstat_; ObTimeInterval log_conflict_interval_; transaction::ObPartTransCtx *ctx_; - ObMemtableMutatorIterator *mutator_iter_; transaction::ObPartitionAuditInfoCache partition_audit_info_cache_; int64_t truncate_cnt_; // the retry count of lock for read diff --git a/src/storage/memtable/ob_memtable_interface.h b/src/storage/memtable/ob_memtable_interface.h index 89ba72009..67db11673 100644 --- a/src/storage/memtable/ob_memtable_interface.h +++ b/src/storage/memtable/ob_memtable_interface.h @@ -75,8 +75,6 @@ public: virtual int replay_to_commit() = 0; //method called when leader revoke virtual int commit_to_replay() = 0; - virtual int get_trans_status() const = 0; - virtual bool is_trans_rollbacked() const = 0; virtual void set_trans_ctx(transaction::ObPartTransCtx *ctx) = 0; virtual void inc_truncate_cnt() = 0; virtual uint64_t get_tenant_id() const = 0; diff --git a/src/storage/ob_value_row_iterator.cpp b/src/storage/ob_value_row_iterator.cpp index d1346f42c..fe4addbf3 100644 --- a/src/storage/ob_value_row_iterator.cpp +++ b/src/storage/ob_value_row_iterator.cpp @@ -292,7 +292,9 @@ int ObSingleRowGetter::get_next_row(ObNewRow *&row) // check txn status not aborted, which cause readout incorrect result if (store_ctx_->mvcc_acc_ctx_.snapshot_.tx_id_.is_valid() && store_ctx_->mvcc_acc_ctx_.mem_ctx_ && - 
store_ctx_->mvcc_acc_ctx_.mem_ctx_->is_trans_rollbacked()) { + store_ctx_->mvcc_acc_ctx_.mem_ctx_->is_tx_rollbacked()) { + // The txn has been killed during normal processing. So we return + // OB_TRANS_KILLED to prompt this abnormal state. ret = OB_TRANS_KILLED; STORAGE_LOG(WARN, "txn has terminated", K(ret)); } diff --git a/src/storage/tx/ob_trans_part_ctx.cpp b/src/storage/tx/ob_trans_part_ctx.cpp index b7c3a4979..32c17913f 100644 --- a/src/storage/tx/ob_trans_part_ctx.cpp +++ b/src/storage/tx/ob_trans_part_ctx.cpp @@ -366,7 +366,7 @@ int ObPartTransCtx::trans_kill_() int ret = OB_SUCCESS; TRANS_LOG(INFO, "trans killed", K(trans_id_)); - mt_ctx_.trans_kill(); + mt_ctx_.set_tx_rollbacked(); if (ctx_tx_data_.get_state() == ObTxData::RUNNING) { if (OB_FAIL(ctx_tx_data_.set_state(ObTxData::ABORT))) { @@ -374,6 +374,8 @@ int ObPartTransCtx::trans_kill_() } } + mt_ctx_.trans_kill(); + return ret; } @@ -1542,7 +1544,7 @@ int ObPartTransCtx::update_max_commit_version_() } // Unified interface for normal transaction end(both commit and abort). We We -// want to integrate the following five things that all txn commits should do. +// want to integrate the following six things that all txn commits should do. // // 1.end_log_ts: We set end_log_ts during final log state is synced which must // have been done, so we check the validation of end_log_ts here(Maybe set it in @@ -1554,9 +1556,10 @@ int ObPartTransCtx::update_max_commit_version_() // 3.mt_ctx.tx_end: We need callback all txn ops for all data in txn after final // state is synced. It must be called for all txns to clean and release its data // resource. -// 4.set_state: We need set state after final state is synced. It tells others +// 4.set_status: We need set status to kill the concurrent read and write. +// 5.set_state: We need set state after final state is synced. It tells others // that all data for this txn is decided and visible. 
-// 5.insert_tx_data: We need insert into tx_data in order to cleanot data which +// 6.insert_tx_data: We need insert into tx_data in order to cleanout data which // need be delay cleanout // // NB: You need pay much attention to the order of the following steps @@ -1566,7 +1569,6 @@ int ObPartTransCtx::tx_end_(const bool commit) int ret = OB_SUCCESS; // NB: The order of the following steps is critical - // We need first set end_code_ for the s int32_t state = commit ? ObTxData::COMMIT : ObTxData::ABORT; int64_t commit_version = ctx_tx_data_.get_commit_version(); int64_t end_log_ts = ctx_tx_data_.get_end_log_ts(); @@ -1585,19 +1587,23 @@ int ObPartTransCtx::tx_end_(const bool commit) } else if (commit && ObTransVersion::INVALID_TRANS_VERSION == commit_version) { ret = OB_ERR_UNEXPECTED; TRANS_LOG(ERROR, "commit version is invalid when tx end", K(ret), KPC(this)); - // STEP3: We need invoke mt_ctx_.trans_end before state is filled in here - // because we relay on the end_code_ in mt_ctx_ to report the suicide before - // the tnode can be cleanout by concurrent read. - } else if (OB_FAIL(mt_ctx_.trans_end(commit, commit_version, end_log_ts))) { - TRANS_LOG(WARN, "trans end error", KR(ret), K(commit), "context", *this); + // STEP3: We need set status in order to kill concurrent read and write. What + // you need pay attention to is that we rely on the status to report the + // suicide before the tnode can be cleanout by concurrent read using state in + // ctx_tx_data. + } else if (!commit && FALSE_IT(mt_ctx_.set_tx_rollbacked())) { // STEP4: We need set state in order to informing others of the final status // of my txn. What you need pay attention to is that after this action, others - // can cleanout the unfinished txn state and see all your data. - // TODO: we can move set_state before mt_ctx_.trans_end for commit state in - // order to accelerate users to see the data state. + // can cleanout the unfinished txn state and see all your data.
We currently + // move set_state before mt_ctx_.trans_end for the commit state in order to + // accelerate users to see the data state. } else if (OB_FAIL(ctx_tx_data_.set_state(state))) { TRANS_LOG(WARN, "set tx data state failed", K(ret), KPC(this)); - // STEP5: We need insert into the tx_data after all states are filled + // STEP5: We need invoke mt_ctx_.trans_end after state is filled in here + // because we rely on the state in the ctx_tx_data_ to callback all txn ops. + } else if (OB_FAIL(mt_ctx_.trans_end(commit, commit_version, end_log_ts))) { + TRANS_LOG(WARN, "trans end error", KR(ret), K(commit), "context", *this); + // STEP6: We need insert into the tx_data after all states are filled } else if (has_persisted_log_() && OB_FAIL(ctx_tx_data_.insert_into_tx_table())) { TRANS_LOG(WARN, "insert to tx table failed", KR(ret), KPC(this)); }