check tx is aborted before report 4377 in index lookup

This commit is contained in:
chinaxing 2024-04-25 09:55:11 +00:00 committed by ob-robot
parent 79ee35a820
commit 1d8ebf3f45
15 changed files with 77 additions and 45 deletions

View File

@ -233,18 +233,12 @@ int ObMultipleGetMerge::inner_get_next_row(ObDatumRow &row)
// and if we find that it does not exist, there must be an anomaly
if (GCONF.enable_defensive_check()
&& access_ctx_->query_flag_.is_lookup_for_4377()) {
ret = OB_ERR_DEFENSIVE_CHECK;
ObString func_name = ObString::make_string("[index lookup]ObMultipleGetMerge::inner_get_next_row");
LOG_USER_ERROR(OB_ERR_DEFENSIVE_CHECK, func_name.length(), func_name.ptr());
LOG_DBA_ERROR(OB_ERR_DEFENSIVE_CHECK, "msg", "Fatal Error!!! Catch a defensive error!", K(ret),
K(rowkeys_),
K(get_row_range_idx_ - 1),
K(rowkeys_->at(get_row_range_idx_ - 1)),
K(fuse_row),
KPC(access_ctx_->store_ctx_));
concurrency_control::ObDataValidationService::set_delay_resource_recycle(access_ctx_->ls_id_);
dump_table_statistic_for_4377();
dump_tx_statistic_for_4377(access_ctx_->store_ctx_);
ret = handle_4377("[index lookup]ObMultipleGetMerge::inner_get_next_row");
STORAGE_LOG(WARN,"[index lookup] row not found", K(ret),
K(rowkeys_),
K(get_row_range_idx_ - 1),
K(rowkeys_->at(get_row_range_idx_ - 1)),
K(fuse_row));
}
}
}

View File

@ -1581,6 +1581,26 @@ void ObMultipleMerge::reuse_lob_locator()
lob_reader_.reuse();
}
int ObMultipleMerge::handle_4377(const char* func)
{
int ret = OB_ERR_DEFENSIVE_CHECK;
// check whether txn is aborted
if (access_ctx_->store_ctx_->is_uncommitted_data_rollbacked()) {
STORAGE_LOG(WARN, "transaction has been aborted", KPC(access_ctx_->store_ctx_));
ret = OB_TRANS_KILLED;
} else {
ObString func_name = ObString::make_string(func);
LOG_USER_ERROR(OB_ERR_DEFENSIVE_CHECK, func_name.length(), func_name.ptr());
LOG_DBA_ERROR(OB_ERR_DEFENSIVE_CHECK, "msg",
"Fatal Error!!! Catch a defensive error! index lookup: row not found in data-table",
K(ret), KPC(access_ctx_->store_ctx_));
concurrency_control::ObDataValidationService::set_delay_resource_recycle(access_ctx_->ls_id_);
dump_table_statistic_for_4377();
dump_tx_statistic_for_4377(access_ctx_->store_ctx_);
}
return ret;
}
void ObMultipleMerge::dump_tx_statistic_for_4377(ObStoreCtx *store_ctx)
{
int ret = OB_SUCCESS;

View File

@ -91,6 +91,7 @@ protected:
void reset_iter_array(const bool can_reuse = false);
void reuse_iter_array();
void reclaim_iter_array();
int handle_4377(const char* func);
void dump_tx_statistic_for_4377(ObStoreCtx *store_ctx);
void dump_table_statistic_for_4377();
int set_base_version() const;

View File

@ -373,21 +373,14 @@ int ObSingleMerge::inner_get_next_row(ObDatumRow &row)
if (GCONF.enable_defensive_check()
&& access_ctx_->query_flag_.is_lookup_for_4377()
&& OB_ITER_END == ret) {
ret = OB_ERR_DEFENSIVE_CHECK;
ObString func_name = ObString::make_string("[index lookup]ObSingleMerge::inner_get_next_row");
LOG_USER_ERROR(OB_ERR_DEFENSIVE_CHECK, func_name.length(), func_name.ptr());
LOG_DBA_ERROR(OB_ERR_DEFENSIVE_CHECK, "msg", "Fatal Error!!! Catch a defensive error!", K(ret),
K(have_uncommited_row),
K(enable_fuse_row_cache),
K(read_snapshot_version),
KPC(read_info),
KPC(access_ctx_->store_ctx_),
K(tables_));
concurrency_control::ObDataValidationService::set_delay_resource_recycle(access_ctx_->ls_id_);
dump_table_statistic_for_4377();
dump_tx_statistic_for_4377(access_ctx_->store_ctx_);
ret = handle_4377("[index lookup]ObSingleMerge::inner_get_next_row");
STORAGE_LOG(WARN, "[index lookup] row not found", K(ret),
K(have_uncommited_row),
K(enable_fuse_row_cache),
K(read_snapshot_version),
KPC(read_info),
K(tables_));
}
rowkey_ = NULL;
} else {
ret = OB_ITER_END;

View File

@ -640,17 +640,17 @@ int ObTransCallbackMgr::remove_callback_for_uncommited_txn(const memtable::ObMem
// when leader revoked, writes has not been logged must be discarded
// otherwise freeze memtable checkpoint will be blocked on waiting these.
int ObTransCallbackMgr::clean_unlog_callbacks(int64_t &removed_cnt)
int ObTransCallbackMgr::clean_unlog_callbacks(int64_t &removed_cnt, common::ObFunction<void()> &before_remove)
{
int ret = OB_SUCCESS;
if (need_merge_) { // OLD (before 4.3)
if (OB_FAIL(callback_list_.clean_unlog_callbacks(removed_cnt))) {
if (OB_FAIL(callback_list_.clean_unlog_callbacks(removed_cnt, before_remove))) {
TRANS_LOG(WARN, "clean unlog callbacks failed", K(ret));
}
} else { // NEW (since 4.3)
CALLBACK_LISTS_FOREACH(idx, list) {
int64_t rm_cnt = 0;
if (OB_FAIL(list->clean_unlog_callbacks(rm_cnt))) {
if (OB_FAIL(list->clean_unlog_callbacks(rm_cnt, before_remove))) {
TRANS_LOG(WARN, "clean unlog callbacks failed", K(ret), K(idx));
} else {
removed_cnt += rm_cnt;

View File

@ -272,7 +272,7 @@ public:
ObIArray<share::SCN> &checksum_scn);
int update_checksum(const ObIArray<uint64_t> &checksum,
const ObIArray<share::SCN> &checksum_scn);
int clean_unlog_callbacks(int64_t &removed_cnt);
int clean_unlog_callbacks(int64_t &removed_cnt, common::ObFunction<void()> &before_remove);
// when not inc, return -1
int64_t inc_pending_log_size(const int64_t size);
void try_merge_multi_callback_lists(const int64_t new_size, const int64_t size, const bool is_logging_blocked);

View File

@ -452,7 +452,8 @@ private:
class ObCleanUnlogCallbackFunctor : public ObITxCallbackFunctor
{
public:
ObCleanUnlogCallbackFunctor() {}
ObCleanUnlogCallbackFunctor(common::ObFunction<void()> &before_remove)
: before_remove_(&before_remove) {}
virtual int operator()(ObITransCallback *callback) override
{
@ -462,13 +463,17 @@ public:
ret = OB_ERR_UNEXPECTED;
TRANS_LOG(ERROR, "unexpected callback", KP(callback));
} else if (callback->need_submit_log()) {
if (before_remove_) {
before_remove_->operator()();
before_remove_ = NULL;
}
callback->rollback_callback();
need_remove_callback_ = true;
}
return ret;
}
common::ObFunction<void()> *before_remove_;
VIRTUAL_TO_STRING_KV("CleanUnlogCallback", "CleanUnlogCallback");
};

View File

@ -558,10 +558,10 @@ int ObTxCallbackList::sync_log_fail(const ObCallbackScope &callbacks,
return ret;
}
int ObTxCallbackList::clean_unlog_callbacks(int64_t &removed_cnt)
int ObTxCallbackList::clean_unlog_callbacks(int64_t &removed_cnt, common::ObFunction<void()> &before_remove)
{
int ret = OB_SUCCESS;
ObCleanUnlogCallbackFunctor functor;
ObCleanUnlogCallbackFunctor functor(before_remove);
LockGuard guard(*this, LOCK_MODE::LOCK_ALL);
if (log_cursor_ == &head_) {

View File

@ -75,7 +75,7 @@ public:
// clean_unlog_callbacks will remove all unlogged callbacks. Which is called
// when switch to follower forcely.
int clean_unlog_callbacks(int64_t &removed_cnt);
int clean_unlog_callbacks(int64_t &removed_cnt, common::ObFunction<void()> &before_remove);
int fill_log(ObITransCallback* log_cursor, ObTxFillRedoCtx &ctx, ObITxFillRedoFunctor &functor);
int submit_log_succ(const ObCallbackScope &callbacks);
int sync_log_succ(const share::SCN scn, int64_t sync_cnt);

View File

@ -894,18 +894,23 @@ int ObMemtableCtx::remove_callback_for_uncommited_txn(const memtable::ObMemtable
int ObMemtableCtx::clean_unlog_callbacks()
{
int ret = OB_SUCCESS;
int64_t removed_cnt = 0;
{
int64_t removed_cnt = 0;
struct BeforeRemoveCallback {
memtable::ObMemtableCtx *mt_;
BeforeRemoveCallback(memtable::ObMemtableCtx *mt) : mt_(mt) {}
void operator()() {
mt_->set_partial_rollbacked();
}
};
ObFunction<void()> before_remove(BeforeRemoveCallback(this));
ObByteLockGuard guard(lock_);
if (OB_FAIL(trans_mgr_.clean_unlog_callbacks(removed_cnt))) {
if (OB_FAIL(trans_mgr_.clean_unlog_callbacks(removed_cnt, before_remove))) {
TRANS_LOG(WARN, "clean unlog callbacks failed", KR(ret));
} else {
trans_mgr_.clear_pending_log_size();
}
}
if (removed_cnt > 0) {
set_partial_rollbacked();
}
return ret;
}

View File

@ -131,6 +131,15 @@ void ObStoreCtx::force_print_trace_log()
}
}
bool ObStoreCtx::is_uncommitted_data_rollbacked() const
{
bool bret = false;
if (NULL != mvcc_acc_ctx_.tx_ctx_) {
bret = mvcc_acc_ctx_.tx_ctx_->is_data_rollbacked();
}
return bret;
}
void ObStoreRowLockState::reset()
{
is_locked_ = false;

View File

@ -472,6 +472,7 @@ struct ObStoreCtx
const int64_t timeout,
const int64_t lock_timeout_us,
const share::SCN &snapshot_version);
bool is_uncommitted_data_rollbacked() const;
void force_print_trace_log();
TO_STRING_KV(KP(this),
K_(ls_id),

View File

@ -1535,7 +1535,7 @@ int ObPartTransCtx::gc_ctx_()
if (OB_FAIL(prepare_mul_data_source_tx_end_(false))) {
TRANS_LOG(WARN, "trans gc need retry", K(ret), K(trans_id_), K(ls_id_));
} else {
TRANS_LOG(INFO, "[TRANS GC] part ctx abort", "context", *this);
TRANS_LOG(INFO, "[TRANS GC] participant will **abort** itself due to scheduler has quit", KPC(this));
REC_TRANS_TRACE_EXT2(tlog_, tx_ctx_gc, OB_ID(ref), get_ref());
if (need_callback_scheduler_()) {
TRANS_LOG(INFO, "[TRANS GC] scheduler has down, skip callback scheduler", KP(this),
@ -1563,7 +1563,7 @@ int ObPartTransCtx::check_scheduler_status()
TRANS_LOG(WARN, "check rs scheduler is alive error", KR(ret), K(is_alive), KPC(this));
// scheduler已宕机
} else if (!is_alive) {
TRANS_LOG(WARN, "scheduler server is not alive, tx ctx will do gc", KPC(this));
TRANS_LOG(WARN, "[TRANS GC] scheduler server is not alive, participant will GC", KPC(this));
if (OB_FAIL(gc_ctx_())) {
TRANS_LOG(WARN, "force kill part_ctx error", KR(ret), KPC(this));
}
@ -1718,7 +1718,7 @@ int ObPartTransCtx::recover_tx_ctx_table_info(ObTxCtxTableInfo &ctx_info)
ctx_source_ = PartCtxSource::TRANSFER_RECOVER;
}
}
TRANS_LOG(INFO, "recover tx ctx table info succeed", K(ret), KPC(this), K(ctx_info));
TRANS_LOG(INFO, "[TRANS RECOVERY] recover tx ctx table info succeed", K(ret), KPC(this), K(ctx_info));
}
REC_TRANS_TRACE_EXT2(tlog_,
@ -1790,7 +1790,7 @@ int ObPartTransCtx::serialize_tx_ctx_to_buffer(ObTxLocalBuffer &buffer, int64_t
serialize_size = pos;
}
}
TRANS_LOG(INFO, "[TRANS CHECKPOINT] checkpoint trans ctx", K(ret), K_(trans_id), K_(ls_id), KP(this));
return ret;
}

View File

@ -361,6 +361,9 @@ public:
// newly added for 4.0
bool is_decided() const { return ctx_tx_data_.is_decided(); }
// check data is rollbacked either partially or totally
// in which case the reader should be failed
bool is_data_rollbacked() const { return mt_ctx_.is_tx_rollbacked(); }
int retry_dup_trx_before_prepare(
const share::SCN &before_prepare_version,
const ObDupTableBeforePrepareRequest::BeforePrepareScnSrc before_prepare_src);

View File

@ -577,7 +577,8 @@ TEST_F(TestTxCallbackList, remove_callback_by_clean_unlog_callbacks)
EXPECT_EQ(9, callback_list_.get_length());
int64_t removed_cnt = 0;
EXPECT_EQ(OB_SUCCESS, callback_list_.clean_unlog_callbacks(removed_cnt));
ObFunction<void()> before_remove = []{};
EXPECT_EQ(OB_SUCCESS, callback_list_.clean_unlog_callbacks(removed_cnt, before_remove));
EXPECT_EQ(5, callback_list_.get_length());
EXPECT_EQ(4, rollback_cnt_);