diff --git a/src/storage/memtable/mvcc/ob_mvcc_trans_ctx.cpp b/src/storage/memtable/mvcc/ob_mvcc_trans_ctx.cpp index b35bb5da7..c3087b87a 100644 --- a/src/storage/memtable/mvcc/ob_mvcc_trans_ctx.cpp +++ b/src/storage/memtable/mvcc/ob_mvcc_trans_ctx.cpp @@ -200,8 +200,10 @@ void ObTransCallbackMgr::reset() int cnt = need_merge_ ? MAX_CALLBACK_LIST_COUNT : MAX_CALLBACK_LIST_COUNT - 1; for (int i = 0; i < cnt; ++i) { if (!callback_lists_[i].empty()) { - ob_abort(); TRANS_LOG_RET(ERROR, OB_ERR_UNEXPECTED, "txn callback list is broken", K(stat), K(i), K(this)); +#ifdef ENABLE_DEBUG_LOG + ob_abort(); +#endif } } } @@ -250,7 +252,9 @@ void ObTransCallbackMgr::free_mvcc_row_callback(ObITransCallback *cb) cb_allocators_[owner - 1].free(cb); } else { TRANS_LOG_RET(ERROR, OB_ERR_UNEXPECTED, "unexpected cb", KPC(cb)); +#ifdef ENABLE_DEBUG_LOG ob_abort(); +#endif } } @@ -368,24 +372,31 @@ int ObTransCallbackMgr::append(ObITransCallback *node) // this log has been accumulated, it will not be required in all calback-list if (parallel_replay_) { ret = OB_ERR_UNEXPECTED; - TRANS_LOG(ERROR, "parallel replay an serial log", K(ret), KPC(this)); + TRANS_LOG(ERROR, "parallel replay a serial log", K(ret), KPC(this)); +#ifdef ENABLE_DEBUG_LOG ob_abort(); +#endif } - if (OB_UNLIKELY(!has_branch_replayed_into_first_list_)) { + if (OB_SUCC(ret) && OB_UNLIKELY(!has_branch_replayed_into_first_list_)) { // sanity check: the serial_final_seq_no must be set // which will be used in replay `rollback branch savepoint` log if (OB_UNLIKELY(!serial_final_seq_no_.is_valid())) { ret = OB_ERR_UNEXPECTED; TRANS_LOG(ERROR, "serial_final_seq_no is invalid", K(ret), KPC(this)); +#ifdef ENABLE_DEBUG_LOG ob_abort(); +#endif + } else { + ATOMIC_STORE(&has_branch_replayed_into_first_list_, true); + TRANS_LOG(INFO, "replay log before serial final when reach serial final", + KPC(this), KPC(get_trans_ctx()), KPC(node)); } - ATOMIC_STORE(&has_branch_replayed_into_first_list_, true); - TRANS_LOG(INFO, "replay log before serial final when reach serial final", - KPC(this), KPC(get_trans_ctx()), KPC(node)); } slot = 0; } - if (slot == 0) { + + if (OB_FAIL(ret)) { + } else if (slot == 0) { // no parallel and no branch requirement ret = callback_list_.append_callback(node, for_replay_, parallel_replay_, is_serial_final_()); // try to extend callback_lists_ if required @@ -441,10 +452,12 @@ void ObTransCallbackMgr::after_append(ObITransCallback *node, const int ret_code int ObTransCallbackMgr::rollback_to(const ObTxSEQ to_seq_no, const ObTxSEQ from_seq_no, - const share::SCN replay_scn) + const share::SCN replay_scn, + int64_t &remove_cnt) { int ret = OB_SUCCESS; int slot = -1; + remove_cnt = callback_remove_for_rollback_to_count_; if (OB_LIKELY(to_seq_no.support_branch())) { // since 4.3 // it is a global savepoint, rollback on all list if (to_seq_no.get_branch() == 0) { @@ -480,6 +493,7 @@ int ObTransCallbackMgr::rollback_to(const ObTxSEQ to_seq_no, if (OB_FAIL(ret)) { TRANS_LOG(WARN, "rollback to fail", K(ret), K(slot), K(from_seq_no), K(to_seq_no)); } + remove_cnt = callback_remove_for_rollback_to_count_ - remove_cnt; return ret; } @@ -1144,7 +1158,9 @@ int ObTransCallbackMgr::log_submitted(const ObCallbackScopeArray &callbacks, sha OB_ASSERT(iter->need_submit_log()); if (OB_FAIL(iter->log_submitted_cb(scn, last_mt))) { TRANS_LOG(ERROR, "fail to log_submitted cb", K(ret), KPC(iter)); +#ifdef ENABLE_DEBUG_LOG ob_abort(); +#endif } // check dup table tx else if(check_dup_tablet_(iter)) { // mem_ctx_->get_trans_ctx()->set_dup_table_tx_(); @@ -1181,7 +1197,11 @@ int ObTransCallbackMgr::log_sync_succ(const ObCallbackScopeArray &callbacks, sync_cnt += scope.cnt_; } } else { + ret = OB_ERR_UNEXPECTED; + TRANS_LOG(ERROR, "callback scope is null", K(ret), K(scope), K(scn), KPC(this)); +#ifdef ENABLE_DEBUG_LOG ob_abort(); +#endif } } return ret; @@ -2201,7 +2221,10 @@ inline ObTxCallbackList *ObTransCallbackMgr::get_callback_list_(const int16_t in OB_ASSERT(index < MAX_CALLBACK_LIST_COUNT); return &callback_lists_[index - 1]; } else if (!nullable) { + TRANS_LOG_RET(ERROR, OB_ERR_UNEXPECTED, "callback list is null", K(index)); +#ifdef ENABLE_DEBUG_LOG ob_abort(); +#endif } return NULL; } @@ -2214,8 +2237,10 @@ void ObTransCallbackMgr::check_all_redo_flushed() ok &= list->check_all_redo_flushed(false/*quite*/); } if (!ok) { - usleep(1000000); + TRANS_LOG_RET(ERROR, OB_ERR_UNEXPECTED, "has redo not flushed", KPC(this)); +#ifdef ENABLE_DEBUG_LOG ob_abort(); +#endif } } __attribute__((noinline)) diff --git a/src/storage/memtable/mvcc/ob_mvcc_trans_ctx.h b/src/storage/memtable/mvcc/ob_mvcc_trans_ctx.h index 8f548595f..69a73beb9 100644 --- a/src/storage/memtable/mvcc/ob_mvcc_trans_ctx.h +++ b/src/storage/memtable/mvcc/ob_mvcc_trans_ctx.h @@ -245,7 +245,8 @@ public: int replay_succ(const int16_t callback_list_idx, const share::SCN scn); int rollback_to(const transaction::ObTxSEQ seq_no, const transaction::ObTxSEQ from_seq_no, - const share::SCN replay_scn); + const share::SCN replay_scn, + int64_t &remove_cnt); void set_for_replay(const bool for_replay); bool is_for_replay() const { return ATOMIC_LOAD(&for_replay_); } int remove_callbacks_for_fast_commit(const int16_t callback_list_idx, const share::SCN stop_scn); diff --git a/src/storage/memtable/mvcc/ob_tx_callback_functor.h b/src/storage/memtable/mvcc/ob_tx_callback_functor.h index 48982eb1f..1881c995c 100644 --- a/src/storage/memtable/mvcc/ob_tx_callback_functor.h +++ b/src/storage/memtable/mvcc/ob_tx_callback_functor.h @@ -97,8 +97,12 @@ public: // case3: the callback has not been sync successfully is_iter_end = true; } else if (callback->get_scn().is_min()) { + TRANS_LOG_RET(ERROR, OB_ERR_UNEXPECTED, "callback scn is min_scn", KPC(callback)); +#ifdef ENABLE_DEBUG_LOG usleep(5000); ob_abort(); +#endif + is_iter_end = true; } else if (0 >= need_remove_count_ && callback->get_scn() != last_scn_for_remove_) { // case4: the callback has exceeded the last log whose log ts need to be // removed @@ -400,8 +404,11 @@ public: /* ret = OB_ERR_UNEXPECTED; */ /* TRANS_LOG(ERROR, "unexpected callback", KP(callback)); */ } else if (is_commit_ && callback->get_scn().is_max()) { - TRANS_LOG(ERROR, "callback has not submitted log yet when commit callback", KP(callback)); + ret = OB_ERR_UNEXPECTED; + TRANS_LOG(ERROR, "callback has not submitted log yet when commit callback", K(ret), KP(callback)); +#ifdef ENABLE_DEBUG_LOG ob_abort(); +#endif } else if (is_commit_ && OB_FAIL(callback->trans_commit())) { TRANS_LOG(ERROR, "trans commit failed", KPC(callback)); diff --git a/src/storage/memtable/mvcc/ob_tx_callback_list.cpp b/src/storage/memtable/mvcc/ob_tx_callback_list.cpp index a6a442825..b9919001b 100644 --- a/src/storage/memtable/mvcc/ob_tx_callback_list.cpp +++ b/src/storage/memtable/mvcc/ob_tx_callback_list.cpp @@ -56,11 +56,15 @@ void ObTxCallbackList::reset() { if (length_ + removed_ != appended_) { TRANS_LOG_RET(ERROR, OB_ERR_UNEXPECTED, "BUG:list state insanity", KPC(this)); +#ifdef ENABLE_DEBUG_LOG ob_abort(); +#endif } if (length_ + removed_ != logged_ + unlog_removed_) { TRANS_LOG_RET(ERROR, OB_ERR_UNEXPECTED, "BUG:list state insanity", KPC(this)); +#ifdef ENABLE_DEBUG_LOG ob_abort(); +#endif } head_.set_prev(&head_); head_.set_next(&head_); @@ -112,29 +116,31 @@ int ObTxCallbackList::append_callback(ObITransCallback *callback, && append_pos->get_scn() > callback->get_scn()) { ret = OB_ERR_UNEXPECTED; TRANS_LOG(ERROR, "replay callback scn out of order", K(ret), KPC(callback), KPC(this)); +#ifdef ENABLE_DEBUG_LOG ob_abort(); +#endif + } else { + append_pos->append(callback); + // start parallel replay, remember the position + if (for_replay && parallel_replay && !serial_final && !parallel_start_pos_) { + ATOMIC_STORE(¶llel_start_pos_, get_tail()); + } + ++appended_; + ATOMIC_INC(&length_); + data_size_ += callback->get_data_size(); + if (repos_lc) { + log_cursor_ = get_tail(); + } + if (for_replay) { + ++logged_; + ++synced_; + } + // Once callback is appended into callback lists, we can not handle the + // error after it. So it should never report the error later. What's more, + // after_append also should never return the error. + (void)callback->after_append_cb(for_replay); } - append_pos->append(callback); - // start parallel replay, remember the position - if (for_replay && parallel_replay && !serial_final && !parallel_start_pos_) { - ATOMIC_STORE(¶llel_start_pos_, get_tail()); - } - ++appended_; - ATOMIC_INC(&length_); - data_size_ += callback->get_data_size(); - if (repos_lc) { - log_cursor_ = get_tail(); - } - if (for_replay) { - ++logged_; - ++synced_; - } - // Once callback is appended into callback lists, we can not handle the - // error after it. So it should never report the error later. What's more, - // after_append also should never return the error. - (void)callback->after_append_cb(for_replay); } - return ret; } @@ -203,76 +209,82 @@ int ObTxCallbackList::callback_(ObITxCallbackFunctor &functor, OB_SUCC(ret) && !iter_end && iter != NULL && iter != end; iter = next) { functor.refresh(); - if (iter->get_scn().is_min()) { + if (OB_UNLIKELY(iter->get_scn().is_min())) { + ret = OB_ERR_UNEXPECTED; + TRANS_LOG(ERROR, "callback with min_scn", K(ret), KPC(iter), KPC(this)); +#ifdef ENABLE_DEBUG_LOG usleep(5000); ob_abort(); - } - if (functor.is_iter_end(iter)) { +#endif + } else if (functor.is_iter_end(iter)) { iter_end = true; } else { next = (is_reverse ? iter->get_prev() : iter->get_next()); if (OB_FAIL(functor(iter))) { // don't print log, print it in functor } else if (functor.need_remove_callback()) { - if (removed_ && (removed_ % 10000 == 0)) { - uint64_t checksum_now = batch_checksum_.calc(); - TRANS_LOG(INFO, "[CallbackList] remove-callback", K(checksum_now), KPC(this)); - } const share::SCN iter_scn = iter->get_scn(); - // the del operation must serialize with append operation - // if it is operating on the list tail - bool deleted = false; - if (next == end) { - if (!lock_state.APPEND_LOCKED_) { - LockGuard guard(*this, LOCK_MODE::LOCK_APPEND); - ret = iter->del(); - deleted = true; - } else { - ret = iter->del(); - deleted = true; - } - } - if ((deleted && OB_FAIL(ret)) || (!deleted && OB_FAIL(iter->del()))) { - TRANS_LOG(ERROR, "remove callback failed", K(ret), KPC(iter), K(deleted)); + // sanity check before remove: + // should not remove parallel replayed callback before serial replay finished + if (parallel_start_pos_ + && !is_skip_checksum_() + && !callback_mgr_.is_serial_final() + && iter_scn >= parallel_start_pos_->get_scn() + && iter_scn <= sync_scn_) { + ret = OB_ERR_UNEXPECTED; + TRANS_LOG(ERROR, "remove parallel callback while serial part not all replayed", K(ret), KPC(iter), KPC(this)); +#ifdef ENABLE_DEBUG_LOG + usleep(5000); + ob_abort(); +#endif } else { - if (parallel_start_pos_ - && !is_skip_checksum_() - && !callback_mgr_.is_serial_final() - && iter_scn >= parallel_start_pos_->get_scn() - && iter_scn <= sync_scn_) { - TRANS_LOG(ERROR, "should not remove this callback", KPC(iter)); - usleep(5000); - ob_abort(); + if (removed_ && (removed_ % 10000 == 0)) { + uint64_t checksum_now = batch_checksum_.calc(); + TRANS_LOG(INFO, "[CallbackList] remove-callback", K(checksum_now), KPC(this)); } - if (log_cursor_ == iter) { - log_cursor_ = next; - } - if (parallel_start_pos_ == iter) { - parallel_start_pos_ = (next == &head_) ? NULL : next; - } - ++removed_; - if (iter->need_submit_log()) { - ++unlog_removed_; - } - - ++remove_count; - - if (iter->is_need_free()) { - if (iter->is_table_lock_callback()) { - callback_mgr_.get_ctx().free_table_lock_callback(iter); - } else if (MutatorType::MUTATOR_ROW_EXT_INFO == iter->get_mutator_type()) { - callback_mgr_.get_ctx().free_ext_info_callback(iter); + // the del operation must serialize with append operation + // if it is operating on the list tail + bool deleted = false; + if (next == end) { + if (!lock_state.APPEND_LOCKED_) { + LockGuard guard(*this, LOCK_MODE::LOCK_APPEND); + ret = iter->del(); + deleted = true; } else { - callback_mgr_.get_ctx().free_mvcc_row_callback(iter); + ret = iter->del(); + deleted = true; + } + } + if ((deleted && OB_FAIL(ret)) || (!deleted && OB_FAIL(iter->del()))) { + TRANS_LOG(ERROR, "remove callback failed", K(ret), KPC(iter), K(deleted)); + } else { + if (log_cursor_ == iter) { + log_cursor_ = next; + } + if (parallel_start_pos_ == iter) { + parallel_start_pos_ = (next == &head_) ? NULL : next; + } + ++removed_; + if (iter->need_submit_log()) { + ++unlog_removed_; + } + ++remove_count; + if (iter->is_need_free()) { + if (iter->is_table_lock_callback()) { + callback_mgr_.get_ctx().free_table_lock_callback(iter); + } else if (MutatorType::MUTATOR_ROW_EXT_INFO == iter->get_mutator_type()) { + callback_mgr_.get_ctx().free_ext_info_callback(iter); + } else { + callback_mgr_.get_ctx().free_mvcc_row_callback(iter); + } } } } } - - if ((++traverse_count & 0xFFFFF) == 0) { - TRANS_LOG(WARN, "memtable fifo callback too long", - K(traverse_count), K(remove_count), K(functor)); - } + } + if ((++traverse_count & 0xFFFFF) == 0) { + TRANS_LOG(WARN, "memtable fifo callback too long", + K(traverse_count), K(remove_count), K(functor)); } } functor.set_statistics(traverse_count, remove_count); @@ -630,11 +642,13 @@ int ObTxCallbackList::tx_commit() if (OB_FAIL(callback_(functor, guard.state_))) { TRANS_LOG(WARN, "trans commit failed", K(ret), K(functor)); } else if (length_ != 0) { + ret = OB_ERR_UNEXPECTED; + TRANS_LOG(ERROR, "callback list has not been cleaned after commit callback", K(ret), K(functor)); +#ifdef ENABLE_DEBUG_LOG ob_abort(); - } else { - callback_mgr_.add_tx_end_callback_remove_cnt(functor.get_remove_cnt()); +#endif } - + callback_mgr_.add_tx_end_callback_remove_cnt(functor.get_remove_cnt()); return ret; } @@ -647,10 +661,13 @@ int ObTxCallbackList::tx_abort() if (OB_FAIL(callback_(functor, guard.state_))) { TRANS_LOG(WARN, "trans abort failed", K(ret), K(functor)); } else if (length_ != 0) { + ret = OB_ERR_UNEXPECTED; + TRANS_LOG(ERROR, "callback list has not been cleaned after abort", K(ret), K(functor)); +#ifdef ENABLE_DEBUG_LOG ob_abort(); - } else { - callback_mgr_.add_tx_end_callback_remove_cnt(functor.get_remove_cnt()); +#endif } + callback_mgr_.add_tx_end_callback_remove_cnt(functor.get_remove_cnt()); return ret; } diff --git a/src/storage/memtable/ob_memtable_context.cpp b/src/storage/memtable/ob_memtable_context.cpp index 63f54a520..fdb7a42a5 100644 --- a/src/storage/memtable/ob_memtable_context.cpp +++ b/src/storage/memtable/ob_memtable_context.cpp @@ -129,10 +129,15 @@ void ObMemtableCtx::reset() } if (OB_UNLIKELY(callback_alloc_count_ != callback_free_count_)) { TRANS_LOG_RET(ERROR, OB_ERR_UNEXPECTED, "callback alloc and free count not match", K(*this)); +#ifdef ENABLE_DEBUG_LOG + ob_abort(); +#endif } if (OB_UNLIKELY(unsubmitted_cnt_ != 0)) { TRANS_LOG_RET(ERROR, OB_ERR_UNEXPECTED, "txn unsubmitted cnt not zero", K(*this), K(unsubmitted_cnt_)); +#ifdef ENABLE_DEBUG_LOG ob_abort(); +#endif } if (OB_TRANS_KILLED != end_code_) { // _NOTE_: skip when txn was forcedly killed @@ -144,7 +149,9 @@ void ObMemtableCtx::reset() if (OB_UNLIKELY(fill != sync_succ + sync_fail)) { TRANS_LOG_RET(ERROR, OB_ERR_UNEXPECTED, "redo filled_count != sync_succ + sync_fail", KPC(this), K(fill), K(sync_succ), K(sync_fail)); +#ifdef ENABLE_DEBUG_LOG ob_abort(); +#endif } } is_inited_ = false; @@ -514,8 +521,10 @@ int ObMemtableCtx::do_trans_end( // after a transaction finishes, callback memory should be released // and check memory leakage if (OB_UNLIKELY(ATOMIC_LOAD(&callback_alloc_count_) != ATOMIC_LOAD(&callback_free_count_))) { - TRANS_LOG(ERROR, "callback alloc and free count not match", K(*this)); - ob_abort(); // for easy debug, remove later + TRANS_LOG_RET(ERROR, OB_ERR_UNEXPECTED, "callback alloc and free count not match", KPC(this)); +#ifdef ENABLE_DEBUG_LOG + ob_abort(); +#endif } // release durable table lock if (OB_FAIL(ret)) { @@ -585,7 +594,10 @@ int ObMemtableCtx::trans_replay_end(const bool commit, "checksum_replayed", checksum_collapsed, "checksum_before_collapse", replay_checksum, K(checksum_signature), KPC(this)); +#ifdef ENABLE_DEBUG_LOG + ob_usleep(5000); ob_abort(); +#endif } } } @@ -818,9 +830,9 @@ int ObMemtableCtx::rollback(const transaction::ObTxSEQ to_seq_no, const share::SCN replay_scn) { int ret = OB_SUCCESS; - common::ObTimeGuard timeguard("remove callbacks for rollback to", 10 * 1000); + const int64_t start_ts = common::ObClockGenerator::getClock(); ObByteLockGuard guard(lock_); - + int64_t remove_cnt = 0; if (!to_seq_no.is_valid() || !from_seq_no.is_valid()) { ret = OB_INVALID_ARGUMENT; TRANS_LOG(WARN, "invalid argument", K(ret), K(from_seq_no), K(to_seq_no)); @@ -829,13 +841,14 @@ int ObMemtableCtx::rollback(const transaction::ObTxSEQ to_seq_no, TRANS_LOG(WARN, "ctx is NULL", K(ret)); } else if (OB_FAIL(reuse_log_generator_())) { TRANS_LOG(ERROR, "fail to reset log generator", K(ret)); - } else if (OB_FAIL(trans_mgr_.rollback_to(to_seq_no, from_seq_no, replay_scn))) { + } else if (OB_FAIL(trans_mgr_.rollback_to(to_seq_no, from_seq_no, replay_scn, remove_cnt))) { TRANS_LOG(WARN, "rollback to failed", K(ret), K(*this)); // rollback the table lock that with no tablelock callback } else if (OB_FAIL(rollback_table_lock_(to_seq_no, from_seq_no))) { TRANS_LOG(WARN, "rollback table lock failed", K(ret), K(*this), K(to_seq_no)); } else { - TRANS_LOG(INFO, "memtable handle rollback to successfuly", K(from_seq_no), K(to_seq_no), K(*this)); + const int64_t elapsed = common::ObClockGenerator::getClock() - start_ts; + TRANS_LOG(INFO, "memtable handle rollback to successfuly", K(from_seq_no), K(to_seq_no), K(remove_cnt), K(elapsed), KPC(this)); } return ret; } diff --git a/src/storage/memtable/ob_redo_log_generator.cpp b/src/storage/memtable/ob_redo_log_generator.cpp index 8992ee934..75457bea9 100644 --- a/src/storage/memtable/ob_redo_log_generator.cpp +++ b/src/storage/memtable/ob_redo_log_generator.cpp @@ -96,12 +96,19 @@ public: ctx_.next_epoch_ = iter->get_epoch(); } else if (iter->get_epoch() < ctx_.epoch_from_) { ret = OB_ERR_UNEXPECTED; + TRANS_LOG(ERROR, "found callback with epoch less than `from`", K(ret), KPC(iter), K(ctx_)); +#ifdef ENABLE_DEBUG_LOG ob_abort(); +#endif } else if (FALSE_IT(ctx_.cur_epoch_ = iter->get_epoch())) { } else if (!iter->need_submit_log()) { // this should not happend // because log_cursor is _strictly_ point to the right next to logging position + ret = OB_ERR_UNEXPECTED; + TRANS_LOG(ERROR, "found callback has been logged, maybe log_cursor value is insane", K(ret), KPC(iter), K(ctx_)); +#ifdef ENABLE_DEBUG_LOG ob_abort(); +#endif } else if (iter->is_logging_blocked()) { ret = OB_BLOCK_FROZEN; ctx_.last_log_blocked_memtable_ = static_cast(iter->get_memtable()); diff --git a/src/storage/tx/ob_trans_part_ctx.cpp b/src/storage/tx/ob_trans_part_ctx.cpp index ddcf935c2..4a9428ecd 100644 --- a/src/storage/tx/ob_trans_part_ctx.cpp +++ b/src/storage/tx/ob_trans_part_ctx.cpp @@ -1942,8 +1942,8 @@ int ObPartTransCtx::serial_submit_redo_after_write_() ret = submitter.serial_submit(should_switch); if (should_switch && submitter.get_submitted_cnt() > 0) { const share::SCN serial_final_scn = submitter.get_submitted_scn(); - switch_to_parallel_logging_(serial_final_scn, exec_info_.max_submitted_seq_no_); - TRANS_LOG(INFO, "**switch to parallel logging**", + int tmp_ret = switch_to_parallel_logging_(serial_final_scn, exec_info_.max_submitted_seq_no_); + TRANS_LOG(INFO, "**leader switch to parallel logging**", K(tmp_ret), K_(ls_id), K_(trans_id), K(serial_final_scn), "serial_final_seq_no", exec_info_.serial_final_seq_no_, @@ -2206,8 +2206,10 @@ int ObPartTransCtx::on_success(ObTxLogCb *log_cb) ret = OB_ERR_UNEXPECTED; TRANS_LOG(ERROR, "cb arg array is empty", K(ret), KPC(this)); print_trace_log_(); +#ifdef ENABLE_DEBUG_LOG usleep(5000); ob_abort(); +#endif } if (log_cb->is_callbacked()) { #ifndef NDEBUG @@ -2224,9 +2226,12 @@ int ObPartTransCtx::on_success(ObTxLogCb *log_cb) busy_cbs_.remove(log_cb); return_log_cb_(log_cb); } else { - TRANS_LOG_RET(ERROR, OB_ERR_UNEXPECTED, "callback was missed when tx ctx exiting", KPC(log_cb), KPC(this)); + ret = OB_ERR_UNEXPECTED; + TRANS_LOG(ERROR, "callback was missed when tx ctx exiting", K(ret), KPC(log_cb), KPC(this)); print_trace_log_(); +#ifdef ENABLE_DEBUG_LOG ob_abort(); +#endif } } else { // save the first error code @@ -2630,8 +2635,10 @@ int ObPartTransCtx::on_failure(ObTxLogCb *log_cb) int ret = OB_SUCCESS; share::SCN max_committed_scn; if (OB_FAIL(ls_tx_ctx_mgr_->get_ls_log_adapter()->get_palf_committed_max_scn(max_committed_scn))) { - TRANS_LOG(ERROR, "get palf max committed scn fail, need retry", K(ret)); - ob_abort(); // fast abort for easy debug, TODO(yunxing.cyx) change to return do retry + TRANS_LOG(ERROR, "get palf max committed scn fail, need retry", K(ret), KPC(this)); +#ifdef ENABLE_DEBUG_LOG + ob_abort(); // fast abort for easy debug +#endif } else { TRANS_LOG(INFO, "succ get palf max_commited_scn", K(max_committed_scn), KPC(log_cb)); } @@ -5124,10 +5131,12 @@ int ObPartTransCtx::replay_redo_in_ctx(const ObTxRedoLog &redo_log, if (!is_tx_log_queue) { ret = OB_ERR_UNEXPECTED; TRANS_LOG(ERROR, "serial final redo must be in tx_log_queue", K(ret), KPC(this), K(timestamp)); +#ifdef ENABLE_DEBUG_LOG usleep(50_ms); ob_abort(); +#endif } else if (!exec_info_.serial_final_scn_.is_valid()) { - switch_to_parallel_logging_(timestamp, max_seq_no); + ret = switch_to_parallel_logging_(timestamp, max_seq_no); } } } @@ -5230,8 +5239,10 @@ int ObPartTransCtx::replay_rollback_to(const ObTxRollbackToLog &log, && !pre_barrier) { ret = OB_ERR_UNEXPECTED; TRANS_LOG(ERROR, "missing pre barrier flag for parallel replay", KR(ret), K(*this)); +#ifdef ENABLE_DEBUG_LOG usleep(5000); ob_abort(); +#endif } else if (OB_FAIL(check_replay_avaliable_(offset, timestamp, part_log_no, need_replay))) { TRANS_LOG(WARN, "check replay available failed", KR(ret), K(offset), K(timestamp), K(*this)); } else if (!need_replay) { @@ -5285,7 +5296,9 @@ int ObPartTransCtx::replay_rollback_to(const ObTxRollbackToLog &log, } else { ret = OB_ERR_UNEXPECTED; TRANS_LOG(ERROR, "code should not go here", K(ret), K(timestamp), K_(trans_id), KPC(this)); +#ifdef ENABLE_DEBUG_LOG ob_abort(); +#endif } } if (OB_SUCC(ret) && @@ -5450,8 +5463,10 @@ int ObPartTransCtx::replay_commit_info(const ObTxCommitInfoLog &commit_info_log, if (is_parallel_logging() && !pre_barrier) { ret = OB_ERR_UNEXPECTED; TRANS_LOG(ERROR, "missing pre barrier flag for parallel replay", KR(ret), K(*this)); +#ifdef ENABLE_DEBUG_LOG usleep(5000); ob_abort(); +#endif } else if (OB_FAIL(check_replay_avaliable_(offset, timestamp, part_log_no, need_replay))) { TRANS_LOG(WARN, "check replay available failed", KR(ret), K(offset), K(timestamp), K(*this)); } else if (!need_replay) { @@ -5739,10 +5754,6 @@ int ObPartTransCtx::replay_commit(const ObTxCommitLog &commit_log, cluster_version_, checksum))) { TRANS_LOG(WARN, "trans replay commit failed", KR(ret), K(commit_log), KPC(this)); - if (OB_CHECKSUM_ERROR == ret) { - usleep(500000); - ob_abort(); - } } else if ((!ctx_tx_data_.is_read_only()) && OB_FAIL(ctx_tx_data_.insert_into_tx_table())) { TRANS_LOG(WARN, "insert to tx table failed", KR(ret), K(*this)); } else if (is_local_tx_()) { @@ -10105,30 +10116,37 @@ inline bool ObPartTransCtx::is_support_parallel_replay_() const return cluster_version_accurate_ && cluster_version_ >= CLUSTER_VERSION_4_3_0_0; } -inline void ObPartTransCtx::switch_to_parallel_logging_(const share::SCN serial_final_scn, - const ObTxSEQ max_seq_no) +inline int ObPartTransCtx::switch_to_parallel_logging_(const share::SCN serial_final_scn, + const ObTxSEQ max_seq_no) { - // when start replaying serial final redo log or submitted serial final redo log - // switch the Tx's logging mode to parallel logging - // this include mark serial final scn point in exec_info_ - // and notify callback_mgr to remember the serial_final_scn - // which used to for check whether the callback-list has replayed continuously - // or all of it's serial logs has been synced continously - // if reach these condition, the checksum calculations of callback-list can continues - // into the parallel logged part - exec_info_.serial_final_scn_.atomic_set(serial_final_scn); - // remember the max of seq_no of redos currently submitted - // if an rollback to savepoint before this point, which means - // replay of this rollback-savepoint-log must pre-berrier to - // wait serial replay parts finished + int ret = OB_SUCCESS; if (OB_UNLIKELY(!max_seq_no.is_valid())) { - TRANS_LOG_RET(ERROR, OB_ERR_UNEXPECTED, "max seq_no of serial final log is invalid", - K(serial_final_scn), K(max_seq_no), KPC(this)); + ret = OB_ERR_UNEXPECTED; + TRANS_LOG(ERROR, "max seq_no of serial final log is invalid", + K(ret), K(serial_final_scn), K(max_seq_no), KPC(this)); print_trace_log_(); +#ifdef ENABLE_DEBUG_LOG ob_abort(); +#endif } - exec_info_.serial_final_seq_no_ = max_seq_no; - mt_ctx_.set_parallel_logging(serial_final_scn, max_seq_no); + if (OB_SUCC(ret)) { + // when start replaying serial final redo log or submitted serial final redo log + // switch the Tx's logging mode to parallel logging + // this include mark serial final scn point in exec_info_ + // and notify callback_mgr to remember the serial_final_scn + // which used to for check whether the callback-list has replayed continuously + // or all of it's serial logs has been synced continously + // if reach these condition, the checksum calculations of callback-list can continues + // into the parallel logged part + exec_info_.serial_final_scn_.atomic_set(serial_final_scn); + // remember the max of seq_no of redos currently submitted + // if an rollback to savepoint before this point, which means + // replay of this rollback-savepoint-log must pre-berrier to + // wait serial replay parts finished + exec_info_.serial_final_seq_no_ = max_seq_no; + mt_ctx_.set_parallel_logging(serial_final_scn, max_seq_no); + } + return ret; } inline void ObPartTransCtx::recovery_parallel_logging_() diff --git a/src/storage/tx/ob_trans_part_ctx.h b/src/storage/tx/ob_trans_part_ctx.h index b23112c1f..2a8d00540 100644 --- a/src/storage/tx/ob_trans_part_ctx.h +++ b/src/storage/tx/ob_trans_part_ctx.h @@ -549,8 +549,8 @@ private: share::SCN &scn, bool need_free_extra_cb = false); bool should_switch_to_parallel_logging_(); - void switch_to_parallel_logging_(const share::SCN serial_final_scn, - const ObTxSEQ max_seq_no); + int switch_to_parallel_logging_(const share::SCN serial_final_scn, + const ObTxSEQ max_seq_no); bool has_replay_serial_final_() const; void recovery_parallel_logging_(); int check_can_submit_redo_(); diff --git a/src/storage/tx/ob_tx_redo_submitter.h b/src/storage/tx/ob_tx_redo_submitter.h index c73227c9f..4cc5f1a61 100644 --- a/src/storage/tx/ob_tx_redo_submitter.h +++ b/src/storage/tx/ob_tx_redo_submitter.h @@ -257,8 +257,11 @@ int ObTxRedoSubmitter::_submit_redo_pipeline_(const bool display_blocked_info) if (ctx.fill_count_ == 0) { // BIG_ROW has been handled in `fill_log_block_` ret = OB_ERR_UNEXPECTED; - TRANS_LOG(ERROR, "should not reach here", K(ret)); + TRANS_LOG(ERROR, "should not reach here", K(ret), K(ctx), KPC(this)); +#ifdef ENABLE_DEBUG_LOG ob_abort(); +#endif + break; } else { fill_ret = OB_SUCCESS; } @@ -277,9 +280,11 @@ int ObTxRedoSubmitter::_submit_redo_pipeline_(const bool display_blocked_info) } else { // serial logging, shouldn't reach here ret = OB_ERR_UNEXPECTED; - TRANS_LOG(ERROR, "oops! fatal panic", K(ret), KPC(this)); - usleep(1000); + TRANS_LOG(ERROR, "oops! fatal panic", K(ret), K(ctx), KPC(this)); +#ifdef ENABLE_DEBUG_LOG ob_abort(); +#endif + break; } } else if (OB_EAGAIN == fill_ret) { // this list is all filled, but others remains @@ -288,14 +293,18 @@ int ObTxRedoSubmitter::_submit_redo_pipeline_(const bool display_blocked_info) } else { // serial logging, shouldn't reach here ret = OB_ERR_UNEXPECTED; - TRANS_LOG(ERROR, "oops! fatal panic", K(ret), KPC(this)); + TRANS_LOG(ERROR, "oops! fatal panic", K(ret), K(ctx), KPC(this)); +#ifdef ENABLE_DEBUG_LOG usleep(1000); ob_abort(); +#endif + break; } } else { stop = true; ret = fill_ret; } + // start submit out filled log block bool submitted = false; int submit_ret = OB_SUCCESS; @@ -360,8 +369,10 @@ int ObTxRedoSubmitter::after_submit_redo_out_() int ret = OB_SUCCESS; if (OB_FAIL(mt_ctx_.log_submitted(*helper_))) { TRANS_LOG(WARN, "callback memctx fail", K(ret)); +#ifdef ENABLE_DEBUG_LOG usleep(1000); ob_abort(); +#endif } helper_->reset(); return ret; @@ -373,7 +384,9 @@ int ObTxRedoSubmitter::prepare_() int ret = OB_SUCCESS; if (OB_NOT_NULL(log_cb_)) { ret = OB_ERR_UNEXPECTED; +#ifdef ENABLE_DEBUG_LOG ob_abort(); +#endif } else { ret = tx_ctx_.prepare_for_submit_redo(log_cb_, *log_block_, serial_final_); } diff --git a/src/storage/tx/ob_tx_replay_executor.cpp b/src/storage/tx/ob_tx_replay_executor.cpp index f3b272794..8788add38 100644 --- a/src/storage/tx/ob_tx_replay_executor.cpp +++ b/src/storage/tx/ob_tx_replay_executor.cpp @@ -253,8 +253,6 @@ int ObTxReplayExecutor::replay_tx_log_(const ObTxLogType log_type) ret = OB_ERR_UNEXPECTED; TRANS_LOG(ERROR, "[Replay Tx] Unknown Log Type in replay buf", K(log_type), KPC(this)); - usleep(100000); - ob_abort(); } } return ret; @@ -656,7 +654,9 @@ int ObTxReplayExecutor::replay_redo_in_memtable_(ObTxRedoLog &redo, const bool s if (OB_UNLIKELY(!seq_no.is_valid())) { ret = OB_ERR_UNEXPECTED; TRANS_LOG(ERROR, "seq no is invalid in mutator row", K(seq_no), KPC(this)); +#ifdef ENABLE_DEBUG_LOG ob_abort(); +#endif } if (seq_no.get_seq() > max_seq_no.get_seq()) { max_seq_no = seq_no;