From 4a58022b57a663f991390a5c8e6b79a1c58e78e9 Mon Sep 17 00:00:00 2001 From: handora Date: Wed, 11 Aug 2021 15:33:27 +0800 Subject: [PATCH] cherrypick to 3.1_opensource_release --- deps/oblib/src/lib/ob_define.h | 1 + src/clog/ob_log_sliding_window.cpp | 15 ----- src/clog/ob_log_sliding_window.h | 2 - src/clog/ob_log_state_mgr.cpp | 2 - src/storage/memtable/mvcc/ob_mvcc_ctx.cpp | 13 +++- src/storage/memtable/mvcc/ob_mvcc_ctx.h | 3 + .../memtable/mvcc/ob_mvcc_trans_ctx.cpp | 30 ++++++++- src/storage/memtable/mvcc/ob_mvcc_trans_ctx.h | 4 ++ src/storage/memtable/ob_memtable_array.h | 4 +- src/storage/memtable/ob_memtable_context.cpp | 54 +++++++++++++++- src/storage/memtable/ob_memtable_context.h | 1 + src/storage/ob_partition_group.cpp | 64 +++++++++++++++++-- src/storage/ob_partition_group_lock.cpp | 9 ++- src/storage/ob_pg_memtable_mgr.h | 2 - src/storage/transaction/ob_trans_ctx_mgr.cpp | 2 +- src/storage/transaction/ob_trans_functor.h | 8 ++- src/storage/transaction/ob_trans_part_ctx.cpp | 57 ++++++++++++++--- src/storage/transaction/ob_trans_part_ctx.h | 24 ++++++- 18 files changed, 249 insertions(+), 46 deletions(-) diff --git a/deps/oblib/src/lib/ob_define.h b/deps/oblib/src/lib/ob_define.h index 9d42de4fd..3e7b591dc 100644 --- a/deps/oblib/src/lib/ob_define.h +++ b/deps/oblib/src/lib/ob_define.h @@ -1527,6 +1527,7 @@ const int64_t MAX_SSTABLE_CNT_IN_STORAGE = 64; const int64_t RESERVED_STORE_CNT_IN_STORAGE = 8; // Avoid mistakenly triggering minor or major freeze to cause the problem of unsuccessful merge. 
const int64_t MAX_FROZEN_MEMSTORE_CNT_IN_STORAGE = 7; +const int64_t MAX_MEMSTORE_CNT = 16; // some frozen memstores and one active memstore // Only limited to minor freeze, major freeze is not subject to this restriction const int64_t MAX_MEMSTORE_CNT_IN_STORAGE = MAX_FROZEN_MEMSTORE_CNT_IN_STORAGE + 1; diff --git a/src/clog/ob_log_sliding_window.cpp b/src/clog/ob_log_sliding_window.cpp index eb479a81f..22798e804 100644 --- a/src/clog/ob_log_sliding_window.cpp +++ b/src/clog/ob_log_sliding_window.cpp @@ -211,21 +211,6 @@ void ObLogSlidingWindow::destroy_aggre_buffer() } } -int ObLogSlidingWindow::leader_takeover() -{ - int ret = OB_SUCCESS; - if (IS_NOT_INIT) { - ret = OB_NOT_INIT; - CLOG_LOG(ERROR, "ObPartitionLogService is not inited", K(ret), K(partition_key_)); - } else { - uint64_t max_log_id = OB_INVALID_ID; - int64_t max_log_ts = OB_INVALID_TIMESTAMP; - get_max_log_id_info(max_log_id, max_log_ts); - try_update_max_majority_log(max_log_id, max_log_ts); - } - return ret; -} - int ObLogSlidingWindow::leader_active() { int ret = OB_SUCCESS; diff --git a/src/clog/ob_log_sliding_window.h b/src/clog/ob_log_sliding_window.h index 48dcbf2f3..bc25fbb1b 100644 --- a/src/clog/ob_log_sliding_window.h +++ b/src/clog/ob_log_sliding_window.h @@ -100,7 +100,6 @@ public: virtual int get_next_replay_log_timestamp(int64_t& next_replay_log_timestamp) const = 0; virtual uint64_t get_next_index_log_id() const = 0; virtual int leader_active() = 0; - virtual int leader_takeover() = 0; virtual int leader_revoke() = 0; virtual void get_next_replay_log_id_info(uint64_t& next_log_id, int64_t& next_log_ts) const = 0; virtual bool is_fake_info_need_revoke(const uint64_t log_id, const int64_t current_time) = 0; @@ -490,7 +489,6 @@ public: return common::OB_SUCCESS; } int leader_active() override; - int leader_takeover() override; int leader_revoke() override; int get_replica_replay_type(ObReplicaReplayType& replay_type) const; // is_meta_log: log type that need been replayed by D replica 
and log replica diff --git a/src/clog/ob_log_state_mgr.cpp b/src/clog/ob_log_state_mgr.cpp index 89d8c071f..17c5a9bec 100644 --- a/src/clog/ob_log_state_mgr.cpp +++ b/src/clog/ob_log_state_mgr.cpp @@ -1678,8 +1678,6 @@ int ObLogStateMgr::reconfirm_to_taking_over_() revoke_leader_(revoke_type); } else if (OB_FAIL(on_leader_takeover_())) { CLOG_LOG(WARN, "on_leader_takeover_ failed, try again", K_(partition_key), K(ret)); - } else if (OB_FAIL(sw_->leader_takeover())) { - CLOG_LOG(ERROR, "sw leader_active failed", K(ret), K(partition_key_)); } else { reconfirm_->reset(); // role_ = LEADER; diff --git a/src/storage/memtable/mvcc/ob_mvcc_ctx.cpp b/src/storage/memtable/mvcc/ob_mvcc_ctx.cpp index 7d1b7ce2c..4a25026f9 100644 --- a/src/storage/memtable/mvcc/ob_mvcc_ctx.cpp +++ b/src/storage/memtable/mvcc/ob_mvcc_ctx.cpp @@ -14,6 +14,7 @@ #include "storage/memtable/ob_memtable_key.h" #include "storage/memtable/ob_memtable_data.h" #include "storage/memtable/ob_memtable.h" +#include "storage/transaction/ob_trans_part_ctx.h" namespace oceanbase { using namespace common; @@ -83,7 +84,8 @@ int ObIMvccCtx::register_row_commit_cb(const ObMemtableKey* key, ObMvccRow* valu TRANS_LOG(WARN, "alloc row callback failed", K(ret)); } else { tg.click(); - // count up memory size of current transaction node + (void)check_row_callback_registration_between_stmt_(); + tg.click(); add_trans_mem_total_size(data_size); node->set_mvcc_row_cb(cb); cb->set(key, node, data_size, old_row, is_replay, need_fill_redo, sql_no); @@ -374,5 +376,14 @@ int64_t ObIMvccCtx::get_query_abs_lock_wait_timeout(const int64_t lock_wait_star return abs_timeout; } +void ObIMvccCtx::check_row_callback_registration_between_stmt_() +{ + ObIMemtableCtx* i_mem_ctx = (ObIMemtableCtx*)(this); + transaction::ObPartTransCtx* trans_ctx = (transaction::ObPartTransCtx*)(i_mem_ctx->get_trans_ctx()); + if (NULL != trans_ctx && trans_ctx->is_task_match()) { + TRANS_LOG(ERROR, "register commit not match expection", K(*trans_ctx)); + } 
+} + } // namespace memtable } // namespace oceanbase diff --git a/src/storage/memtable/mvcc/ob_mvcc_ctx.h b/src/storage/memtable/mvcc/ob_mvcc_ctx.h index 302b34607..0d4d82e60 100644 --- a/src/storage/memtable/mvcc/ob_mvcc_ctx.h +++ b/src/storage/memtable/mvcc/ob_mvcc_ctx.h @@ -408,6 +408,9 @@ public: ObMvccTransNode* alloc_trans_node(); int append_callback(ObITransCallback* cb); +private: + void check_row_callback_registration_between_stmt_(); + protected: DISALLOW_COPY_AND_ASSIGN(ObIMvccCtx); int alloc_type_; diff --git a/src/storage/memtable/mvcc/ob_mvcc_trans_ctx.cpp b/src/storage/memtable/mvcc/ob_mvcc_trans_ctx.cpp index 6185705c6..4a6affb03 100644 --- a/src/storage/memtable/mvcc/ob_mvcc_trans_ctx.cpp +++ b/src/storage/memtable/mvcc/ob_mvcc_trans_ctx.cpp @@ -257,6 +257,15 @@ int ObTransCallbackMgr::remove_callback_for_uncommited_txn(ObMemtable* memtable, return ret; } +int ObTransCallbackMgr::clean_dirty_callbacks() +{ + int ret = OB_SUCCESS; + + UNUSED(fifo_callback(guard(), TCB_CLEAN_DIRTY_CB)); + + return ret; +} + int ObTransCallbackMgr::calc_checksum_before_log_ts(const int64_t log_ts) { int ret = OB_SUCCESS; @@ -524,7 +533,9 @@ int ObTransCallbackList::remove_callback_fifo_callback(const ObITransCallback* s } length_--; cnt++; - callback_mgr_.get_ctx().callback_free(iter); + if (!iter->is_savepoint()) { + callback_mgr_.get_ctx().callback_free(iter); + } iter = next; same_mem_cb_cnt++; } else { @@ -788,6 +799,8 @@ int ObMvccRowCallback::callback( } } else if (TCB_ELR_TRANS_PREPARING == type) { ret = elr_trans_preparing(); + } else if (TCB_CLEAN_DIRTY_CB == type) { + ret = clean_dirty_cb(); } else if (TCB_PRINT_CALLBACK == type) { ret = print_callback(); } else { @@ -910,6 +923,7 @@ int ObMvccRowCallback::row_pending() } else { if (NULL != tnode_) { if (INT64_MAX == ctx_.get_trans_version()) { + TRANS_LOG(ERROR, "It should never go here", K(*this), K_(ctx)); unlink_trans_node(); } else if (OB_FAIL(tnode_->fill_trans_version(ctx_.get_trans_version()))) { 
TRANS_LOG(WARN, "fill trans version failed", K(ret), K_(ctx)); @@ -1444,5 +1458,19 @@ uint32_t ObMvccRowCallback::get_ctx_descriptor() const return ctx_.get_ctx_descriptor(); } +int ObMvccRowCallback::clean_dirty_cb() +{ + int ret = OB_SUCCESS; + + if (marked_for_logging_) { + unlink_trans_node(); + dec_pending_cb_count(); + marked_for_logging_ = false; + need_fill_redo_ = false; + } + + return ret; +} + }; // namespace memtable }; // end namespace oceanbase diff --git a/src/storage/memtable/mvcc/ob_mvcc_trans_ctx.h b/src/storage/memtable/mvcc/ob_mvcc_trans_ctx.h index 7189b7c9b..bf8a0e8ed 100644 --- a/src/storage/memtable/mvcc/ob_mvcc_trans_ctx.h +++ b/src/storage/memtable/mvcc/ob_mvcc_trans_ctx.h @@ -46,6 +46,7 @@ enum TransCallbackType { TCB_REMOVE_CALLBACK = 17, TCB_SUB_STMT_ABORT = 18, TCB_CRC4 = 19, + TCB_CLEAN_DIRTY_CB = 20, // TEST_ONLY, used for print callback TCB_PRINT_CALLBACK = 100 @@ -322,6 +323,7 @@ public: int mark_frozen_data( const ObMemtable* const frozen_memtable, const ObMemtable* const active_memtable, bool& marked, int64_t& cb_cnt); int calc_checksum_before_log_ts(const ObITransCallback* start, const ObITransCallback* end, const int64_t log_ts); + int clean_dirty_callbacks(); int fetch_rollback_data_size(const ObITransCallback* start, const ObITransCallback* end, int64_t& rollback_size); private: @@ -507,6 +509,7 @@ private: public: bool is_rowlocks_released() const; int calc_checksum_before_log_ts(const int64_t log_ts); + int clean_dirty_callbacks(); int fetch_rollback_data_size(const ObITransCallback* point, int64_t& rollback_size); void inc_pending_log_size(const int64_t size); void inc_flushed_log_size(const int64_t size) @@ -767,6 +770,7 @@ private: } int dec_pending_cb_count(); void mark_tnode_overflow(const int64_t log_ts); + int clean_dirty_cb(); private: ObIMvccCtx& ctx_; diff --git a/src/storage/memtable/ob_memtable_array.h b/src/storage/memtable/ob_memtable_array.h index 90e23a90c..45100f240 100644 --- 
a/src/storage/memtable/ob_memtable_array.h +++ b/src/storage/memtable/ob_memtable_array.h @@ -40,8 +40,8 @@ public: }; bool is_reach_max_memtable_cnt() { - return get_count_() >= 16; - }; + return get_count_() >= common::MAX_MEMSTORE_CNT; + } bool is_contain_this_memtable(ObMemtable* memtable); int check_memtable_count(int64_t& count); diff --git a/src/storage/memtable/ob_memtable_context.cpp b/src/storage/memtable/ob_memtable_context.cpp index 6b2ad6663..5839028ae 100644 --- a/src/storage/memtable/ob_memtable_context.cpp +++ b/src/storage/memtable/ob_memtable_context.cpp @@ -844,8 +844,15 @@ int ObMemtableCtx::trans_replay_end(const bool commit, const int64_t trans_versi int ret = OB_SUCCESS; int cs_ret = OB_SUCCESS; - if (commit && 0 != checksum && !ObServerConfig::get_instance().ignore_replay_checksum_error) { - const uint64_t checksum4 = calc_checksum4(); + // We must calculate the checksum and generate the checksum_log_ts even when + // the checksum verification is unnecessary. This because the trans table + // merge may be triggered after clear state in which the callback has already + const uint64_t checksum4 = calc_checksum4(); + + if (commit + && 0 != checksum + && GET_MIN_CLUSTER_VERSION() >= CLUSTER_VERSION_3100 + && !ObServerConfig::get_instance().ignore_replay_checksum_error) { if (checksum != checksum4) { cs_ret = OB_CHECKSUM_ERROR; TRANS_LOG(ERROR, "MT_CTX: replay checksum error", K(ret), K(*this), K(commit), K(checksum), K(checksum4)); @@ -1572,6 +1579,49 @@ int ObMemtableCtx::remove_callback_for_uncommited_txn(ObMemtable* mt, int64_t& c return ret; } +// If leader switches to follower and marks some callbacks which have not been +// submmitted to sliding window. We need clean the dirty callbacks because +// follower can not submit the log and decrement the pending_cb_count_ and the +// mini merge will not be allowed before txn finishes. Which add the dependency +// from minor merge to txn termination. 
+// +// NB: Because we remove the trans node before txn termination, we must guarantee +// that the txn finally aborts, except in case 3.2 (TODO: handora.qc: generalize +// the process of cleaning dirty callbacks). +// +// We solve the problem in the following way: +// - When the leader is revoking, no non-2pc log of the txn has been +// submitted to the sliding window and there is no on-the-fly log: +// - Case 1: We roll back the txn immediately, so no dirty callbacks need to be +// removed +// - When the leader is revoking and some non-2pc logs of the txn have already been +// submitted to the sliding window: +// - Case 2.1: We only handle the case with no on-the-fly logs (because we have no idea +// whether an on-the-fly log is paxos-chosen or not) +// - If the state is not synced successfully (txn needs to abort), we remove all +// marked trans nodes +// - When replaying start working: +// - Case 3.1: If the txn has an on-the-fly log, it means some logs were not paxos-chosen +// successfully (txn needs to abort), so we remove all marked trans nodes +// - Case 3.2: If the txn has no on-the-fly log and no trans state was synced by the leader +// transfer (txn may need to abort, but we do not know whether the +// original leader successfully synced the log), we also remove all marked trans nodes. 
+// +int ObMemtableCtx::clean_dirty_callbacks() +{ + int ret = OB_SUCCESS; + + ObByteLockGuard guard(lock_); + + if (OB_FAIL(trans_mgr_.clean_dirty_callbacks())) { + TRANS_LOG(WARN, "fail to dirty callbacks", K(ret)); + } else { + TRANS_LOG(INFO, "clean dirty callbacks successfully", K(*this)); + } + + return ret; +} + int ObMemtableCtx::mark_frozen_data( const ObMemtable* const frozen_memtable, const ObMemtable* const active_memtable, bool& marked, int64_t& cb_cnt) { diff --git a/src/storage/memtable/ob_memtable_context.h b/src/storage/memtable/ob_memtable_context.h index c04a5878d..b9d13df2b 100644 --- a/src/storage/memtable/ob_memtable_context.h +++ b/src/storage/memtable/ob_memtable_context.h @@ -489,6 +489,7 @@ public: return trans_mgr_.count(); } void dec_pending_elr_count(); + int clean_dirty_callbacks(); public: void on_tsc_retry(const ObMemtableKey& key) override; diff --git a/src/storage/ob_partition_group.cpp b/src/storage/ob_partition_group.cpp index f412c18f9..17bee3ce0 100644 --- a/src/storage/ob_partition_group.cpp +++ b/src/storage/ob_partition_group.cpp @@ -3409,7 +3409,8 @@ int ObPartitionGroup::get_freeze_cut_(ObMemtable& frozen_memtable, const bool is K(*this)); } } else { - // 2. The freeze_id of follower is the right boundary of replay queue. + // 2. The freeze_id of follower is the the maximum log id of the right + // boundary of replay queue and the max majoritied log id // The follower will block the replay, wait it to be empty and then get the freeze_id. 
if (OB_FAIL(wait_follower_no_pending_task_())) { STORAGE_LOG(WARN, "wait follower no pending task failed", K(is_leader), K(freeze_id), K(*this)); @@ -3421,6 +3422,53 @@ int ObPartitionGroup::get_freeze_cut_(ObMemtable& frozen_memtable, const bool is K(freeze_id), K(freeze_ts), K(*this)); + } else { + // The logic below is sophistic: + // + // If you remember the semantic of end_log_ts and max_log_ts belong to + // the memstore, you will know that all data belong to the log before + // end_log_ts is within the memstore, and the data may or maynot exist + // in the memstore if the log creates the data is between end_log_ts and + // max_log_ts + // + // In terms of the minor freeze, follower needs to wait until replaying + // to a continuous log point and fetch the freeze point. While follower + // cannot use the min replayed log ts both as the end_log_ts and + // max_log_ts. + // + // To see why the more sophistic max_log_ts calculation is required, + // consider the following example: + // 1. Leader submits the log 5,6,7 and only log 7 is in quorum using + // paxos and its data is already filled in the memstore + // 2. Leader switches to the follower and the min replayed log ts is + // smaller than the log 5's log_ts + // 3. If we just use the min replayed log ts as both the end_log_ts and + // max_log_ts the semantic specified above is broken + // + // So we need maintain the max_log_ts using the log 7's timestamp, in + // terms of the implementation, we use the max_majority_log_ts which is + // updated after each log's synchronization of leader. + // + // What's more, we need mark all data whose log is between end_log_ts to + // max_log_ts as overflow(the requirement from the storage layer). while + // the data may already synced and we have no chance to mark the data + // except traversing all data in the memtable. So we choose to mark the + // end_log_ts as the max_majority_log_ts as well. 
The detailed issue can + // be found in https://work.aone.alibaba-inc.com/issue/33865988 + // + // NB: we never maintain the max_mjority_log_ts for follower, so we just + // use the variable for the corner case of leader transfer. + uint64_t max_majority_log_id = OB_INVALID_ID; + int64_t max_majority_log_ts = OB_INVALID_TIMESTAMP; + (void)pls_->get_max_majority_log(max_majority_log_id, max_majority_log_ts); + if (max_majority_log_ts > freeze_ts) { + TRANS_LOG(WARN, + "max majority log ts is larger than freeze timestamp", + K(max_majority_log_ts), + K(freeze_ts), + K(*this)); + ret = OB_EAGAIN; + } } } if (OB_FAIL(ret)) { @@ -3616,7 +3664,7 @@ int ObPartitionGroup::wait_follower_no_pending_task_() int64_t cnt = 0; int64_t task_cnt = replay_status_->get_pending_task_count(); - while (replay_status_->has_pending_task(pkey_) && OB_SUCC(ret)) { + while (replay_status_->has_pending_task(pkey_) && !replay_status_->has_encount_fatal_error() && OB_SUCC(ret)) { usleep(FREEZE_WAIT_RETRY_SLEEP_TS); cnt++; @@ -3633,6 +3681,11 @@ int ObPartitionGroup::wait_follower_no_pending_task_() } } + if (replay_status_->has_encount_fatal_error()) { + TRANS_LOG(ERROR, "encounter fatal error", K(*replay_status_), K(ret), K(pkey_)); + ret = OB_ERR_UNEXPECTED; + } + return ret; } @@ -3661,7 +3714,6 @@ int ObPartitionGroup::check_range_changed_(ObTableHandle& handle, const bool is_ base_version = mt->get_base_version(); if (tmp_freeze_ts < start_log_ts || tmp_snapshot_version < base_version) { - ret = OB_EAGAIN; STORAGE_LOG(INFO, "skip freeze, maybe in the process of restarting", K(ret), @@ -3818,7 +3870,7 @@ int ObPartitionGroup::freeze_log_and_data_v2_(const bool emergency, const bool f if (OB_STATE_NOT_MATCH == ret) { STORAGE_LOG(INFO, "skip freeze due to clog state", K(ret), K(pkey_)); ret = OB_SUCCESS; - } else if (OB_EAGAIN != ret) { + } else { STORAGE_LOG(WARN, "failed to check log_id or version range changed", K(ret), K(old_handle)); } } else if (!changed) { @@ -3837,6 +3889,10 @@ int 
ObPartitionGroup::freeze_log_and_data_v2_(const bool emergency, const bool f } } + if (OB_FAIL(ret) || !effected) { + freeze_record_.clear(); + } + return ret; } diff --git a/src/storage/ob_partition_group_lock.cpp b/src/storage/ob_partition_group_lock.cpp index 3afb80514..8db03b3a2 100644 --- a/src/storage/ob_partition_group_lock.cpp +++ b/src/storage/ob_partition_group_lock.cpp @@ -192,7 +192,7 @@ int ObPGLockWithPendingReplayGuard::wait_follower_no_pending_task_() int64_t cnt = 0; int64_t task_cnt = replay_status_.get_pending_task_count(); - while (replay_status_.has_pending_task(pkey_) && OB_SUCC(ret)) { + while (replay_status_.has_pending_task(pkey_) && !replay_status_.has_encount_fatal_error() && OB_SUCC(ret)) { usleep(SLEEP_FOR_PENDING_REPLAY); cnt++; @@ -209,6 +209,13 @@ int ObPGLockWithPendingReplayGuard::wait_follower_no_pending_task_() } } + if (replay_status_.has_encount_fatal_error()) { + // We just return the success because there will be no pending replay task. + // While we report the ERROR to notify the user + TRANS_LOG(ERROR, "encounter fatal error", K(replay_status_), K(ret), K(pkey_)); + ret = OB_SUCCESS; + } + return ret; } diff --git a/src/storage/ob_pg_memtable_mgr.h b/src/storage/ob_pg_memtable_mgr.h index 3d2995db1..815581042 100644 --- a/src/storage/ob_pg_memtable_mgr.h +++ b/src/storage/ob_pg_memtable_mgr.h @@ -111,8 +111,6 @@ private: private: static const int64_t PRINT_READABLE_INFO_DURATION_US = 1000 * 1000 * 60 * 10L; // 10min - static const int64_t MAX_MEMSTORE_CNT = 16; - private: int64_t memtable_head_; int64_t memtable_tail_; diff --git a/src/storage/transaction/ob_trans_ctx_mgr.cpp b/src/storage/transaction/ob_trans_ctx_mgr.cpp index 13bb7d942..dda9702d5 100644 --- a/src/storage/transaction/ob_trans_ctx_mgr.cpp +++ b/src/storage/transaction/ob_trans_ctx_mgr.cpp @@ -3750,7 +3750,7 @@ int ObPartTransCtxMgr::remove_partition(const ObPartitionKey& partition, const b if (0 == ctx_mgr->get_active_read_write_count() && 0 == 
ctx_mgr->get_read_only_count()) { if (0 != ctx_mgr->get_ctx_count()) { TRANS_LOG( - ERROR, "maybe some context memory not free, please attention", K(partition), K(*ctx_mgr)); + WARN, "maybe some context memory not free, please attention", K(partition), K(*ctx_mgr)); } need_retry = false; // OB_SUCCESS is not returned here. diff --git a/src/storage/transaction/ob_trans_functor.h b/src/storage/transaction/ob_trans_functor.h index b47781fb6..6e26798ed 100644 --- a/src/storage/transaction/ob_trans_functor.h +++ b/src/storage/transaction/ob_trans_functor.h @@ -885,8 +885,10 @@ public: if (OB_ISNULL(part_ctx = dynamic_cast(ctx_base))) { ret = OB_ERR_UNEXPECTED; - } else if (!part_ctx->is_dirty_trans()) { - // do nothing + } else if (!part_ctx->is_dirty_trans() || !part_ctx->has_synced_log()) { + if (part_ctx->is_dirty_trans() && !part_ctx->has_synced_log()) { + TRANS_LOG(INFO, "We donot dump the dirty trans with no synced log", K(*part_ctx)); + } clean_trx_cnt_++; } else if (OB_FAIL(part_ctx->get_trans_sstable_durable_ctx_info(end_log_ts_, ctx_info))) { TRANS_LOG(WARN, "failed to get trans table status info", K(ret)); @@ -897,7 +899,7 @@ public: allocator_.reuse(); if (OB_ISNULL(tmp_buf_ = static_cast(allocator_.alloc(serialize_size)))) { ret = OB_ALLOCATE_MEMORY_FAILED; - STORAGE_LOG(WARN, "failed to allocate memory", K(serialize_size)); + TRANS_LOG(WARN, "failed to allocate memory", K(serialize_size)); } } else { tmp_buf_ = buf_; diff --git a/src/storage/transaction/ob_trans_part_ctx.cpp b/src/storage/transaction/ob_trans_part_ctx.cpp index 141c6c8ad..a94adebe8 100644 --- a/src/storage/transaction/ob_trans_part_ctx.cpp +++ b/src/storage/transaction/ob_trans_part_ctx.cpp @@ -1097,6 +1097,8 @@ int ObPartTransCtx::end_stmt_( need_response = false; ret = OB_NOT_MASTER; TRANS_LOG(WARN, "transaction is preparing changing leader", KR(ret), "context", *this); + } else if (OB_FAIL(get_status_())) { + TRANS_LOG(WARN, "transaction is not healthy when end_stmt_", KR(ret), 
"context", *this); } else if (!can_rollback_stmt_) { need_response = true; if (Ob2PCState::INIT == get_state_()) { @@ -1530,6 +1532,9 @@ int ObPartTransCtx::kill(const KillTransArg& arg, ObEndTransCallbackArray& cb_ar } if (OB_SUCC(ret)) { trans_kill_(); + // Force kill cannot guarantee the consistency, so we just set end_log_ts + // to zero + end_log_ts_ = 0; (void)trans_clear_(); if (OB_FAIL(unregister_timeout_task_())) { TRANS_LOG(WARN, "unregister timer task error", KR(ret), "context", *this); @@ -1775,6 +1780,10 @@ int ObPartTransCtx::on_sync_log_success( if (0 == redo_log_no_++) { TRANS_LOG(DEBUG, "participant enter into 2pc", "context", *this, K(log_type), K(timestamp)); } + if (redo_log_no_ == 1) { + // The log is completed, we need verify the txn checksum + need_checksum_ = true; + } // need submit redo_prepare log when log_type equal OB_LOG_TRANS_REDO if (OB_LOG_TRANS_REDO == log_type) { start_us = ObTimeUtility::fast_current_time(); @@ -2528,6 +2537,7 @@ int ObPartTransCtx::leader_active(const LeaderActiveArg& arg) is_trans_state_sync_finished_ = false; is_changing_leader_ = false; prepare_changing_leader_state_ = CHANGING_LEADER_STATE::NO_CHANGING_LEADER; + update_max_submitted_log_timestamp_(max_durable_log_ts_); if (need_register_timer_task) { // The request_id_ should be initialized to prevent the 2pc cannot be // driven if all participants transferring the leader @@ -3065,6 +3075,8 @@ int ObPartTransCtx::leader_revoke(const bool first_check, bool& need_release, Ob (void)unregister_timeout_task_(); if (!has_logged_() && !is_in_2pc_() && 0 == submit_log_count_) { trans_kill_(); + // Because of no logs, we can free the dirty trans instantly + end_log_ts_ = 0; (void)trans_clear_(); set_exiting_(); if (!is_logging_()) { @@ -3084,6 +3096,12 @@ int ObPartTransCtx::leader_revoke(const bool first_check, bool& need_release, Ob if (!is_trans_state_sync_finished_) { TRANS_LOG(INFO, "transaction is killed", "context", *this); } + } else if (has_logged_() && 
!is_in_2pc_() && !is_trans_state_sync_finished_ && 0 == submit_log_count_) { + // - When leader is revoking and some non-2pc logs of txn has already been + // submitted to sliding window: + // - If no on-the-fly log and state log is not synced successfully, remove all + // marked_log_cnts + (void)mt_ctx_.clean_dirty_callbacks(); } else if (OB_FAIL(mt_ctx_.commit_to_replay())) { TRANS_LOG(WARN, "commit to replay error", KR(ret), "context", *this); } else { @@ -3899,7 +3917,10 @@ int ObPartTransCtx::replay_prepare_log(const ObTransPrepareLog& log, const int64 } else { batch_commit_trans_ = false; } - if (log.get_redo_log_ids().count() == 0) { + if (0 == log.get_redo_log_ids().count() && 0 == redo_log_no_) { + // We only enable the checksum check if prev_redo_log_ids' count is zero + // and redo_log_no is zero. The later check is used to filter the txn + // REDO_WITH_PREPARE log which donot include itself inth prev_redo_log_id. need_checksum_ = true; } /* @@ -4409,7 +4430,6 @@ int ObPartTransCtx::replay_trans_state_log(const ObTransStateLog& log, const int TRANS_LOG(WARN, "different can elr state", K(log), K(*this)); } can_elr_ = log.is_can_elr(); - update_durable_log_id_ts_(OB_LOG_TRANS_STATE, log_id, timestamp); log_type_ = log.get_log_type(); scheduler_ = log.get_scheduler(); is_readonly_ = log.is_readonly(); @@ -4438,9 +4458,12 @@ int ObPartTransCtx::replay_trans_state_log(const ObTransStateLog& log, const int has_trans_state_log_ = true; TRANS_LOG(INFO, "replay trans state log success", "context", *this, K(log), K(log_id)); } - if (OB_FAIL(ret)) { + if (OB_SUCC(ret)) { + update_durable_log_id_ts_(OB_LOG_TRANS_STATE, log_id, timestamp); + } else { TRANS_LOG(WARN, "replay trans state log error", KR(ret), "context", *this, K(log), K(log_id)); } + REC_TRANS_TRACE_EXT(tlog_, replay_trans_state, Y(ret), @@ -4668,11 +4691,11 @@ int ObPartTransCtx::replay_start_working_log(const int64_t timestamp, const uint CtxTransTableLockGuard guard(lock_, trans_table_seqlock_); - if 
(submit_log_count_ <= 0) { - // do nothing - } else if (IS_NOT_INIT) { + if (IS_NOT_INIT) { TRANS_LOG(WARN, "ObPartTransCtx not inited"); ret = OB_NOT_INIT; + } else if (is_exiting_) { + // do nothing } else if (OB_UNLIKELY(!for_replay_)) { ret = OB_ERR_UNEXPECTED; TRANS_LOG(ERROR, "invalid state, transaction is not replaying", KR(ret), "context", *this); @@ -4681,6 +4704,16 @@ int ObPartTransCtx::replay_start_working_log(const int64_t timestamp, const uint TRANS_LOG(WARN, "trans is not valid", K(*this), K(log_id), K(timestamp), K(log), K(timestamp)); ret = OB_TRANS_INVALID_STATE; need_print_trace_log_ = true; + } else if (0 == submit_log_count_) { + if (has_logged_() && !is_in_2pc_() && !is_trans_state_sync_finished_ && is_changing_leader_) { + // - When replaying start working: + // - Case 3.2: If txn has no on-the-fly log and no trans state is synced by the leader + // transfer(txn may need abort, while we donot have the information whether the + // original leader successfully synced the log), and we also remove all marked trans node. 
+ (void)mt_ctx_.clean_dirty_callbacks(); + + TRANS_LOG(INFO, "clean dirty callbacks when replay start working", K(*this)); + } } else { need_print_trace_log_ = true; if (!has_logged_() && !is_in_2pc_() && !is_hazardous_ctx_) { @@ -4695,11 +4728,17 @@ int ObPartTransCtx::replay_start_working_log(const int64_t timestamp, const uint // majority(TODO: need provement) is_dirty_ = false; set_exiting_(); - } else { - // because current log is not majoritied, and some logs have been + } else if (has_logged_() && !is_in_2pc_() && submit_log_count_ > 0) { + // - When replaying start working: + // - Case 3.1: If txn has a on-the-fly log, it means some logs are not paxos-choosen + // successfully(txn need abort), so we remove all marked trans node + (void)mt_ctx_.clean_dirty_callbacks(); + + // Because current log is not majoritied, and some logs have been // majoritied, we need wait for abort log by new leader TRANS_LOG(INFO, "no need to kill trans when replay start working log", K(*this)); } + submit_log_count_ = 0; TRANS_STAT_ABORT_TRANS_INC(tenant_id_); } @@ -11911,6 +11950,7 @@ int ObPartTransCtx::fake_kill_(const int64_t terminate_log_ts) // fake kill interface end_log_ts_ = terminate_log_ts; // TODO(): the interface is currently not necessary, remove it + set_state_(Ob2PCState::CLEAR); (void)trans_clear_(); set_exiting_(); } @@ -11931,6 +11971,7 @@ int ObPartTransCtx::kill_v2_(const int64_t terminate_log_ts) } else { end_log_ts_ = terminate_log_ts; // TODO(): the interface is currently not necessary, remove it + set_state_(Ob2PCState::CLEAR); (void)trans_clear_(); set_exiting_(); } diff --git a/src/storage/transaction/ob_trans_part_ctx.h b/src/storage/transaction/ob_trans_part_ctx.h index 042a592fc..87a0ee21b 100644 --- a/src/storage/transaction/ob_trans_part_ctx.h +++ b/src/storage/transaction/ob_trans_part_ctx.h @@ -311,6 +311,10 @@ public: { return is_dirty_; } + bool has_synced_log() const + { + return 0 != max_durable_log_ts_; + } int64_t get_forbidden_sql_no() 
const { return ATOMIC_LOAD(&forbidden_sql_no_); @@ -375,6 +379,10 @@ public: { return enable_new_1pc_; } + bool is_task_match() + { + return stmt_info_.is_task_match(); + } void remove_trans_table(); int clear_trans_after_restore( const int64_t restore_version, const uint64_t last_restore_log_id, const int64_t fake_terminate_log_ts); @@ -391,8 +399,8 @@ public: K_(is_dup_table_prepare), K_(dup_table_syncing_log_id), K_(is_prepare_leader_revoke), K_(is_local_trans), K_(forbidden_sql_no), K(is_dirty_), K_(undo_status), K_(max_durable_sql_no), K_(max_durable_log_ts), K(mt_ctx_.get_checksum_log_ts()), K_(is_changing_leader), K_(has_trans_state_log), - K_(same_leader_batch_partitions_count), K_(is_hazardous_ctx), K(mt_ctx_.get_callback_count()), - K_(in_xa_prepare_state), K_(is_listener), K_(last_replayed_redo_log_id), + K_(is_trans_state_sync_finished), K_(status), K_(same_leader_batch_partitions_count), K_(is_hazardous_ctx), + K(mt_ctx_.get_callback_count()), K_(in_xa_prepare_state), K_(is_listener), K_(last_replayed_redo_log_id), K_(is_xa_trans_prepared)); public: @@ -687,6 +695,18 @@ private: bool is_prepared_; bool is_gts_waiting_; bool batch_commit_trans_; + // Whether there exists a trans state log for the current leader transfer + // + // It is implemented as follow: + // - For the New Leader: + // - we set the value to true when we replay the trans state log + // if the new leader is me + // - we reset the value when leader is active + // - For the original Leader: + // - we reset the value before each leader transfer + // - we set the value to true when we synced the trans state log + // - we reset the value when leader is revoked if no on-the-fly log + // exist bool is_trans_state_sync_finished_; bool is_changing_leader_; bool can_rollback_stmt_;