diff --git a/src/storage/access/ob_multiple_get_merge.cpp b/src/storage/access/ob_multiple_get_merge.cpp index 2b77bb1d8e..f8a1a9f2dd 100644 --- a/src/storage/access/ob_multiple_get_merge.cpp +++ b/src/storage/access/ob_multiple_get_merge.cpp @@ -233,18 +233,12 @@ int ObMultipleGetMerge::inner_get_next_row(ObDatumRow &row) // and if we find that it does not exist, there must be an anomaly if (GCONF.enable_defensive_check() && access_ctx_->query_flag_.is_lookup_for_4377()) { - ret = OB_ERR_DEFENSIVE_CHECK; - ObString func_name = ObString::make_string("[index lookup]ObMultipleGetMerge::inner_get_next_row"); - LOG_USER_ERROR(OB_ERR_DEFENSIVE_CHECK, func_name.length(), func_name.ptr()); - LOG_DBA_ERROR(OB_ERR_DEFENSIVE_CHECK, "msg", "Fatal Error!!! Catch a defensive error!", K(ret), - K(rowkeys_), - K(get_row_range_idx_ - 1), - K(rowkeys_->at(get_row_range_idx_ - 1)), - K(fuse_row), - KPC(access_ctx_->store_ctx_)); - concurrency_control::ObDataValidationService::set_delay_resource_recycle(access_ctx_->ls_id_); - dump_table_statistic_for_4377(); - dump_tx_statistic_for_4377(access_ctx_->store_ctx_); + ret = handle_4377("[index lookup]ObMultipleGetMerge::inner_get_next_row"); + STORAGE_LOG(WARN,"[index lookup] row not found", K(ret), + K(rowkeys_), + K(get_row_range_idx_ - 1), + K(rowkeys_->at(get_row_range_idx_ - 1)), + K(fuse_row)); } } } diff --git a/src/storage/access/ob_multiple_merge.cpp b/src/storage/access/ob_multiple_merge.cpp index c1a837173a..72c90bbc35 100644 --- a/src/storage/access/ob_multiple_merge.cpp +++ b/src/storage/access/ob_multiple_merge.cpp @@ -1581,6 +1581,26 @@ void ObMultipleMerge::reuse_lob_locator() lob_reader_.reuse(); } +int ObMultipleMerge::handle_4377(const char* func) +{ + int ret = OB_ERR_DEFENSIVE_CHECK; + // check whether txn is aborted + if (access_ctx_->store_ctx_->is_uncommitted_data_rollbacked()) { + STORAGE_LOG(WARN, "transaction has been aborted", KPC(access_ctx_->store_ctx_)); + ret = OB_TRANS_KILLED; + } else { + ObString func_name = ObString::make_string(func); + LOG_USER_ERROR(OB_ERR_DEFENSIVE_CHECK, func_name.length(), func_name.ptr()); + LOG_DBA_ERROR(OB_ERR_DEFENSIVE_CHECK, "msg", + "Fatal Error!!! Catch a defensive error! index lookup: row not found in data-table", + K(ret), KPC(access_ctx_->store_ctx_)); + concurrency_control::ObDataValidationService::set_delay_resource_recycle(access_ctx_->ls_id_); + dump_table_statistic_for_4377(); + dump_tx_statistic_for_4377(access_ctx_->store_ctx_); + } + return ret; +} + void ObMultipleMerge::dump_tx_statistic_for_4377(ObStoreCtx *store_ctx) { int ret = OB_SUCCESS; diff --git a/src/storage/access/ob_multiple_merge.h b/src/storage/access/ob_multiple_merge.h index 26617c32b5..8cf75b0545 100644 --- a/src/storage/access/ob_multiple_merge.h +++ b/src/storage/access/ob_multiple_merge.h @@ -91,6 +91,7 @@ protected: void reset_iter_array(const bool can_reuse = false); void reuse_iter_array(); void reclaim_iter_array(); + int handle_4377(const char* func); void dump_tx_statistic_for_4377(ObStoreCtx *store_ctx); void dump_table_statistic_for_4377(); int set_base_version() const; diff --git a/src/storage/access/ob_single_merge.cpp b/src/storage/access/ob_single_merge.cpp index f59b10e5d4..c3d357b935 100644 --- a/src/storage/access/ob_single_merge.cpp +++ b/src/storage/access/ob_single_merge.cpp @@ -373,21 +373,14 @@ int ObSingleMerge::inner_get_next_row(ObDatumRow &row) if (GCONF.enable_defensive_check() && access_ctx_->query_flag_.is_lookup_for_4377() && OB_ITER_END == ret) { - ret = OB_ERR_DEFENSIVE_CHECK; - ObString func_name = ObString::make_string("[index lookup]ObSingleMerge::inner_get_next_row"); - LOG_USER_ERROR(OB_ERR_DEFENSIVE_CHECK, func_name.length(), func_name.ptr()); - LOG_DBA_ERROR(OB_ERR_DEFENSIVE_CHECK, "msg", "Fatal Error!!! Catch a defensive error!", K(ret), - K(have_uncommited_row), - K(enable_fuse_row_cache), - K(read_snapshot_version), - KPC(read_info), - KPC(access_ctx_->store_ctx_), - K(tables_)); - concurrency_control::ObDataValidationService::set_delay_resource_recycle(access_ctx_->ls_id_); - dump_table_statistic_for_4377(); - dump_tx_statistic_for_4377(access_ctx_->store_ctx_); + ret = handle_4377("[index lookup]ObSingleMerge::inner_get_next_row"); + STORAGE_LOG(WARN, "[index lookup] row not found", K(ret), + K(have_uncommited_row), + K(enable_fuse_row_cache), + K(read_snapshot_version), + KPC(read_info), + K(tables_)); } - rowkey_ = NULL; } else { ret = OB_ITER_END; diff --git a/src/storage/memtable/mvcc/ob_mvcc_trans_ctx.cpp b/src/storage/memtable/mvcc/ob_mvcc_trans_ctx.cpp index c3087b87a6..8b135ee7df 100644 --- a/src/storage/memtable/mvcc/ob_mvcc_trans_ctx.cpp +++ b/src/storage/memtable/mvcc/ob_mvcc_trans_ctx.cpp @@ -640,17 +640,17 @@ int ObTransCallbackMgr::remove_callback_for_uncommited_txn(const memtable::ObMem // when leader revoked, writes has not been logged must be discarded // otherwise freeze memtable checkpoint will be blocked on waiting these. -int ObTransCallbackMgr::clean_unlog_callbacks(int64_t &removed_cnt) +int ObTransCallbackMgr::clean_unlog_callbacks(int64_t &removed_cnt, common::ObFunction &before_remove) { int ret = OB_SUCCESS; if (need_merge_) { // OLD (before 4.3) - if (OB_FAIL(callback_list_.clean_unlog_callbacks(removed_cnt))) { + if (OB_FAIL(callback_list_.clean_unlog_callbacks(removed_cnt, before_remove))) { TRANS_LOG(WARN, "clean unlog callbacks failed", K(ret)); } } else { // NEW (since 4.3) CALLBACK_LISTS_FOREACH(idx, list) { int64_t rm_cnt = 0; - if (OB_FAIL(list->clean_unlog_callbacks(rm_cnt))) { + if (OB_FAIL(list->clean_unlog_callbacks(rm_cnt, before_remove))) { TRANS_LOG(WARN, "clean unlog callbacks failed", K(ret), K(idx)); } else { removed_cnt += rm_cnt; diff --git a/src/storage/memtable/mvcc/ob_mvcc_trans_ctx.h b/src/storage/memtable/mvcc/ob_mvcc_trans_ctx.h index 69a73beb91..8a5589ca9b 100644 --- a/src/storage/memtable/mvcc/ob_mvcc_trans_ctx.h +++ b/src/storage/memtable/mvcc/ob_mvcc_trans_ctx.h @@ -272,7 +272,7 @@ public: ObIArray &checksum_scn); int update_checksum(const ObIArray &checksum, const ObIArray &checksum_scn); - int clean_unlog_callbacks(int64_t &removed_cnt); + int clean_unlog_callbacks(int64_t &removed_cnt, common::ObFunction &before_remove); // when not inc, return -1 int64_t inc_pending_log_size(const int64_t size); void try_merge_multi_callback_lists(const int64_t new_size, const int64_t size, const bool is_logging_blocked); diff --git a/src/storage/memtable/mvcc/ob_tx_callback_functor.h b/src/storage/memtable/mvcc/ob_tx_callback_functor.h index 1881c995c2..cee3252397 100644 --- a/src/storage/memtable/mvcc/ob_tx_callback_functor.h +++ b/src/storage/memtable/mvcc/ob_tx_callback_functor.h @@ -452,7 +452,8 @@ private: class ObCleanUnlogCallbackFunctor : public ObITxCallbackFunctor { public: - ObCleanUnlogCallbackFunctor() {} + ObCleanUnlogCallbackFunctor(common::ObFunction &before_remove) + : before_remove_(&before_remove) {} virtual int operator()(ObITransCallback *callback) override { @@ -462,13 +463,17 @@ public: ret = OB_ERR_UNEXPECTED; TRANS_LOG(ERROR, "unexpected callback", KP(callback)); } else if (callback->need_submit_log()) { + if (before_remove_) { + before_remove_->operator()(); + before_remove_ = NULL; + } callback->rollback_callback(); need_remove_callback_ = true; } return ret; } - + common::ObFunction *before_remove_; VIRTUAL_TO_STRING_KV("CleanUnlogCallback", "CleanUnlogCallback"); }; diff --git a/src/storage/memtable/mvcc/ob_tx_callback_list.cpp b/src/storage/memtable/mvcc/ob_tx_callback_list.cpp index b9919001b5..0034d42a9a 100644 --- a/src/storage/memtable/mvcc/ob_tx_callback_list.cpp +++ b/src/storage/memtable/mvcc/ob_tx_callback_list.cpp @@ -558,10 +558,10 @@ int ObTxCallbackList::sync_log_fail(const ObCallbackScope &callbacks, return ret; } -int ObTxCallbackList::clean_unlog_callbacks(int64_t &removed_cnt) +int ObTxCallbackList::clean_unlog_callbacks(int64_t &removed_cnt, common::ObFunction &before_remove) { int ret = OB_SUCCESS; - ObCleanUnlogCallbackFunctor functor; + ObCleanUnlogCallbackFunctor functor(before_remove); LockGuard guard(*this, LOCK_MODE::LOCK_ALL); if (log_cursor_ == &head_) { diff --git a/src/storage/memtable/mvcc/ob_tx_callback_list.h b/src/storage/memtable/mvcc/ob_tx_callback_list.h index 2b5f45a4ad..a13deb5601 100644 --- a/src/storage/memtable/mvcc/ob_tx_callback_list.h +++ b/src/storage/memtable/mvcc/ob_tx_callback_list.h @@ -75,7 +75,7 @@ public: // clean_unlog_callbacks will remove all unlogged callbacks. Which is called // when switch to follower forcely. - int clean_unlog_callbacks(int64_t &removed_cnt); + int clean_unlog_callbacks(int64_t &removed_cnt, common::ObFunction &before_remove); int fill_log(ObITransCallback* log_cursor, ObTxFillRedoCtx &ctx, ObITxFillRedoFunctor &functor); int submit_log_succ(const ObCallbackScope &callbacks); int sync_log_succ(const share::SCN scn, int64_t sync_cnt); diff --git a/src/storage/memtable/ob_memtable_context.cpp b/src/storage/memtable/ob_memtable_context.cpp index fdb7a42a57..e029415324 100644 --- a/src/storage/memtable/ob_memtable_context.cpp +++ b/src/storage/memtable/ob_memtable_context.cpp @@ -894,18 +894,23 @@ int ObMemtableCtx::remove_callback_for_uncommited_txn(const memtable::ObMemtable int ObMemtableCtx::clean_unlog_callbacks() { int ret = OB_SUCCESS; - int64_t removed_cnt = 0; { + int64_t removed_cnt = 0; + struct BeforeRemoveCallback { + memtable::ObMemtableCtx *mt_; + BeforeRemoveCallback(memtable::ObMemtableCtx *mt) : mt_(mt) {} + void operator()() { + mt_->set_partial_rollbacked(); + } + }; + ObFunction before_remove(BeforeRemoveCallback(this)); ObByteLockGuard guard(lock_); - if (OB_FAIL(trans_mgr_.clean_unlog_callbacks(removed_cnt))) { + if (OB_FAIL(trans_mgr_.clean_unlog_callbacks(removed_cnt, before_remove))) { TRANS_LOG(WARN, "clean unlog callbacks failed", KR(ret)); } else { trans_mgr_.clear_pending_log_size(); } } - if (removed_cnt > 0) { - set_partial_rollbacked(); - } return ret; } diff --git a/src/storage/ob_i_store.cpp b/src/storage/ob_i_store.cpp index 50719cf970..5539054b4b 100644 --- a/src/storage/ob_i_store.cpp +++ b/src/storage/ob_i_store.cpp @@ -131,6 +131,15 @@ void ObStoreCtx::force_print_trace_log() } } +bool ObStoreCtx::is_uncommitted_data_rollbacked() const +{ + bool bret = false; + if (NULL != mvcc_acc_ctx_.tx_ctx_) { + bret = mvcc_acc_ctx_.tx_ctx_->is_data_rollbacked(); + } + return bret; +} + void ObStoreRowLockState::reset() { is_locked_ = false; diff --git a/src/storage/ob_i_store.h b/src/storage/ob_i_store.h index 54662a18a4..12f4137d76 100644 --- a/src/storage/ob_i_store.h +++ b/src/storage/ob_i_store.h @@ -472,6 +472,7 @@ struct ObStoreCtx const int64_t timeout, const int64_t lock_timeout_us, const share::SCN &snapshot_version); + bool is_uncommitted_data_rollbacked() const; void force_print_trace_log(); TO_STRING_KV(KP(this), K_(ls_id), diff --git a/src/storage/tx/ob_trans_part_ctx.cpp b/src/storage/tx/ob_trans_part_ctx.cpp index 4a9428ecd4..c53cb2f29e 100644 --- a/src/storage/tx/ob_trans_part_ctx.cpp +++ b/src/storage/tx/ob_trans_part_ctx.cpp @@ -1535,7 +1535,7 @@ int ObPartTransCtx::gc_ctx_() if (OB_FAIL(prepare_mul_data_source_tx_end_(false))) { TRANS_LOG(WARN, "trans gc need retry", K(ret), K(trans_id_), K(ls_id_)); } else { - TRANS_LOG(INFO, "[TRANS GC] part ctx abort", "context", *this); + TRANS_LOG(INFO, "[TRANS GC] participant will **abort** itself due to scheduler has quit", KPC(this)); REC_TRANS_TRACE_EXT2(tlog_, tx_ctx_gc, OB_ID(ref), get_ref()); if (need_callback_scheduler_()) { TRANS_LOG(INFO, "[TRANS GC] scheduler has down, skip callback scheduler", KP(this), @@ -1563,7 +1563,7 @@ int ObPartTransCtx::check_scheduler_status() TRANS_LOG(WARN, "check rs scheduler is alive error", KR(ret), K(is_alive), KPC(this)); // scheduler已宕机 } else if (!is_alive) { - TRANS_LOG(WARN, "scheduler server is not alive, tx ctx will do gc", KPC(this)); + TRANS_LOG(WARN, "[TRANS GC] scheduler server is not alive, participant will GC", KPC(this)); if (OB_FAIL(gc_ctx_())) { TRANS_LOG(WARN, "force kill part_ctx error", KR(ret), KPC(this)); } @@ -1718,7 +1718,7 @@ int ObPartTransCtx::recover_tx_ctx_table_info(ObTxCtxTableInfo &ctx_info) ctx_source_ = PartCtxSource::TRANSFER_RECOVER; } } - TRANS_LOG(INFO, "recover tx ctx table info succeed", K(ret), KPC(this), K(ctx_info)); + TRANS_LOG(INFO, "[TRANS RECOVERY] recover tx ctx table info succeed", K(ret), KPC(this), K(ctx_info)); } REC_TRANS_TRACE_EXT2(tlog_, @@ -1790,7 +1790,7 @@ int ObPartTransCtx::serialize_tx_ctx_to_buffer(ObTxLocalBuffer &buffer, int64_t serialize_size = pos; } } - + TRANS_LOG(INFO, "[TRANS CHECKPOINT] checkpoint trans ctx", K(ret), K_(trans_id), K_(ls_id), KP(this)); return ret; } diff --git a/src/storage/tx/ob_trans_part_ctx.h b/src/storage/tx/ob_trans_part_ctx.h index 2a8d005407..1935b43f80 100644 --- a/src/storage/tx/ob_trans_part_ctx.h +++ b/src/storage/tx/ob_trans_part_ctx.h @@ -361,6 +361,9 @@ public: // newly added for 4.0 bool is_decided() const { return ctx_tx_data_.is_decided(); } + // check data is rollbacked either partially or totally + // in which case the reader should be failed + bool is_data_rollbacked() const { return mt_ctx_.is_tx_rollbacked(); } int retry_dup_trx_before_prepare( const share::SCN &before_prepare_version, const ObDupTableBeforePrepareRequest::BeforePrepareScnSrc before_prepare_src); diff --git a/unittest/storage/memtable/mvcc/test_mvcc_callback.cpp b/unittest/storage/memtable/mvcc/test_mvcc_callback.cpp index 6af5e3e2d4..d4cd11dd76 100644 --- a/unittest/storage/memtable/mvcc/test_mvcc_callback.cpp +++ b/unittest/storage/memtable/mvcc/test_mvcc_callback.cpp @@ -577,7 +577,8 @@ TEST_F(TestTxCallbackList, remove_callback_by_clean_unlog_callbacks) EXPECT_EQ(9, callback_list_.get_length()); int64_t removed_cnt = 0; - EXPECT_EQ(OB_SUCCESS, callback_list_.clean_unlog_callbacks(removed_cnt)); + ObFunction before_remove = []{}; + EXPECT_EQ(OB_SUCCESS, callback_list_.clean_unlog_callbacks(removed_cnt, before_remove)); EXPECT_EQ(5, callback_list_.get_length()); EXPECT_EQ(4, rollback_cnt_);