do not exit observer when replay log and submit log raise exception

This commit is contained in:
chinaxing 2024-04-24 15:24:55 +00:00 committed by ob-robot
parent 5d4aacc220
commit 7a77c1b7a8
10 changed files with 234 additions and 133 deletions

View File

@ -200,8 +200,10 @@ void ObTransCallbackMgr::reset()
int cnt = need_merge_ ? MAX_CALLBACK_LIST_COUNT : MAX_CALLBACK_LIST_COUNT - 1;
for (int i = 0; i < cnt; ++i) {
if (!callback_lists_[i].empty()) {
ob_abort();
TRANS_LOG_RET(ERROR, OB_ERR_UNEXPECTED, "txn callback list is broken", K(stat), K(i), K(this));
#ifdef ENABLE_DEBUG_LOG
ob_abort();
#endif
}
}
}
@ -250,7 +252,9 @@ void ObTransCallbackMgr::free_mvcc_row_callback(ObITransCallback *cb)
cb_allocators_[owner - 1].free(cb);
} else {
TRANS_LOG_RET(ERROR, OB_ERR_UNEXPECTED, "unexpected cb", KPC(cb));
#ifdef ENABLE_DEBUG_LOG
ob_abort();
#endif
}
}
@ -368,24 +372,31 @@ int ObTransCallbackMgr::append(ObITransCallback *node)
// this log has been accumulated, it will not be required in all calback-list
if (parallel_replay_) {
ret = OB_ERR_UNEXPECTED;
TRANS_LOG(ERROR, "parallel replay an serial log", K(ret), KPC(this));
TRANS_LOG(ERROR, "parallel replay a serial log", K(ret), KPC(this));
#ifdef ENABLE_DEBUG_LOG
ob_abort();
#endif
}
if (OB_UNLIKELY(!has_branch_replayed_into_first_list_)) {
if (OB_SUCC(ret) && OB_UNLIKELY(!has_branch_replayed_into_first_list_)) {
// sanity check: the serial_final_seq_no must be set
// which will be used in replay `rollback branch savepoint` log
if (OB_UNLIKELY(!serial_final_seq_no_.is_valid())) {
ret = OB_ERR_UNEXPECTED;
TRANS_LOG(ERROR, "serial_final_seq_no is invalid", K(ret), KPC(this));
#ifdef ENABLE_DEBUG_LOG
ob_abort();
#endif
} else {
ATOMIC_STORE(&has_branch_replayed_into_first_list_, true);
TRANS_LOG(INFO, "replay log before serial final when reach serial final",
KPC(this), KPC(get_trans_ctx()), KPC(node));
}
ATOMIC_STORE(&has_branch_replayed_into_first_list_, true);
TRANS_LOG(INFO, "replay log before serial final when reach serial final",
KPC(this), KPC(get_trans_ctx()), KPC(node));
}
slot = 0;
}
if (slot == 0) {
if (OB_FAIL(ret)) {
} else if (slot == 0) {
// no parallel and no branch requirement
ret = callback_list_.append_callback(node, for_replay_, parallel_replay_, is_serial_final_());
// try to extend callback_lists_ if required
@ -441,10 +452,12 @@ void ObTransCallbackMgr::after_append(ObITransCallback *node, const int ret_code
int ObTransCallbackMgr::rollback_to(const ObTxSEQ to_seq_no,
const ObTxSEQ from_seq_no,
const share::SCN replay_scn)
const share::SCN replay_scn,
int64_t &remove_cnt)
{
int ret = OB_SUCCESS;
int slot = -1;
remove_cnt = callback_remove_for_rollback_to_count_;
if (OB_LIKELY(to_seq_no.support_branch())) { // since 4.3
// it is a global savepoint, rollback on all list
if (to_seq_no.get_branch() == 0) {
@ -480,6 +493,7 @@ int ObTransCallbackMgr::rollback_to(const ObTxSEQ to_seq_no,
if (OB_FAIL(ret)) {
TRANS_LOG(WARN, "rollback to fail", K(ret), K(slot), K(from_seq_no), K(to_seq_no));
}
remove_cnt = callback_remove_for_rollback_to_count_ - remove_cnt;
return ret;
}
@ -1144,7 +1158,9 @@ int ObTransCallbackMgr::log_submitted(const ObCallbackScopeArray &callbacks, sha
OB_ASSERT(iter->need_submit_log());
if (OB_FAIL(iter->log_submitted_cb(scn, last_mt))) {
TRANS_LOG(ERROR, "fail to log_submitted cb", K(ret), KPC(iter));
#ifdef ENABLE_DEBUG_LOG
ob_abort();
#endif
} // check dup table tx
else if(check_dup_tablet_(iter)) {
// mem_ctx_->get_trans_ctx()->set_dup_table_tx_();
@ -1181,7 +1197,11 @@ int ObTransCallbackMgr::log_sync_succ(const ObCallbackScopeArray &callbacks,
sync_cnt += scope.cnt_;
}
} else {
ret = OB_ERR_UNEXPECTED;
TRANS_LOG(ERROR, "callback scope is null", K(ret), K(scope), K(scn), KPC(this));
#ifdef ENABLE_DEBUG_LOG
ob_abort();
#endif
}
}
return ret;
@ -2201,7 +2221,10 @@ inline ObTxCallbackList *ObTransCallbackMgr::get_callback_list_(const int16_t in
OB_ASSERT(index < MAX_CALLBACK_LIST_COUNT);
return &callback_lists_[index - 1];
} else if (!nullable) {
TRANS_LOG_RET(ERROR, OB_ERR_UNEXPECTED, "callback list is null", K(index));
#ifdef ENABLE_DEBUG_LOG
ob_abort();
#endif
}
return NULL;
}
@ -2214,8 +2237,10 @@ void ObTransCallbackMgr::check_all_redo_flushed()
ok &= list->check_all_redo_flushed(false/*quite*/);
}
if (!ok) {
usleep(1000000);
TRANS_LOG_RET(ERROR, OB_ERR_UNEXPECTED, "has redo not flushed", KPC(this));
#ifdef ENABLE_DEBUG_LOG
ob_abort();
#endif
}
}
__attribute__((noinline))

View File

@ -245,7 +245,8 @@ public:
int replay_succ(const int16_t callback_list_idx, const share::SCN scn);
int rollback_to(const transaction::ObTxSEQ seq_no,
const transaction::ObTxSEQ from_seq_no,
const share::SCN replay_scn);
const share::SCN replay_scn,
int64_t &remove_cnt);
void set_for_replay(const bool for_replay);
bool is_for_replay() const { return ATOMIC_LOAD(&for_replay_); }
int remove_callbacks_for_fast_commit(const int16_t callback_list_idx, const share::SCN stop_scn);

View File

@ -97,8 +97,12 @@ public:
// case3: the callback has not been sync successfully
is_iter_end = true;
} else if (callback->get_scn().is_min()) {
TRANS_LOG_RET(ERROR, OB_ERR_UNEXPECTED, "callback scn is min_scn", KPC(callback));
#ifdef ENABLE_DEBUG_LOG
usleep(5000);
ob_abort();
#endif
is_iter_end = true;
} else if (0 >= need_remove_count_ && callback->get_scn() != last_scn_for_remove_) {
// case4: the callback has exceeded the last log whose log ts need to be
// removed
@ -400,8 +404,11 @@ public:
/* ret = OB_ERR_UNEXPECTED; */
/* TRANS_LOG(ERROR, "unexpected callback", KP(callback)); */
} else if (is_commit_ && callback->get_scn().is_max()) {
TRANS_LOG(ERROR, "callback has not submitted log yet when commit callback", KP(callback));
ret = OB_ERR_UNEXPECTED;
TRANS_LOG(ERROR, "callback has not submitted log yet when commit callback", K(ret), KP(callback));
#ifdef ENABLE_DEBUG_LOG
ob_abort();
#endif
} else if (is_commit_
&& OB_FAIL(callback->trans_commit())) {
TRANS_LOG(ERROR, "trans commit failed", KPC(callback));

View File

@ -56,11 +56,15 @@ void ObTxCallbackList::reset()
{
if (length_ + removed_ != appended_) {
TRANS_LOG_RET(ERROR, OB_ERR_UNEXPECTED, "BUG:list state insanity", KPC(this));
#ifdef ENABLE_DEBUG_LOG
ob_abort();
#endif
}
if (length_ + removed_ != logged_ + unlog_removed_) {
TRANS_LOG_RET(ERROR, OB_ERR_UNEXPECTED, "BUG:list state insanity", KPC(this));
#ifdef ENABLE_DEBUG_LOG
ob_abort();
#endif
}
head_.set_prev(&head_);
head_.set_next(&head_);
@ -112,29 +116,31 @@ int ObTxCallbackList::append_callback(ObITransCallback *callback,
&& append_pos->get_scn() > callback->get_scn()) {
ret = OB_ERR_UNEXPECTED;
TRANS_LOG(ERROR, "replay callback scn out of order", K(ret), KPC(callback), KPC(this));
#ifdef ENABLE_DEBUG_LOG
ob_abort();
#endif
} else {
append_pos->append(callback);
// start parallel replay, remember the position
if (for_replay && parallel_replay && !serial_final && !parallel_start_pos_) {
ATOMIC_STORE(&parallel_start_pos_, get_tail());
}
++appended_;
ATOMIC_INC(&length_);
data_size_ += callback->get_data_size();
if (repos_lc) {
log_cursor_ = get_tail();
}
if (for_replay) {
++logged_;
++synced_;
}
// Once callback is appended into callback lists, we can not handle the
// error after it. So it should never report the error later. What's more,
// after_append also should never return the error.
(void)callback->after_append_cb(for_replay);
}
append_pos->append(callback);
// start parallel replay, remember the position
if (for_replay && parallel_replay && !serial_final && !parallel_start_pos_) {
ATOMIC_STORE(&parallel_start_pos_, get_tail());
}
++appended_;
ATOMIC_INC(&length_);
data_size_ += callback->get_data_size();
if (repos_lc) {
log_cursor_ = get_tail();
}
if (for_replay) {
++logged_;
++synced_;
}
// Once callback is appended into callback lists, we can not handle the
// error after it. So it should never report the error later. What's more,
// after_append also should never return the error.
(void)callback->after_append_cb(for_replay);
}
return ret;
}
@ -203,76 +209,82 @@ int ObTxCallbackList::callback_(ObITxCallbackFunctor &functor,
OB_SUCC(ret) && !iter_end && iter != NULL && iter != end;
iter = next) {
functor.refresh();
if (iter->get_scn().is_min()) {
if (OB_UNLIKELY(iter->get_scn().is_min())) {
ret = OB_ERR_UNEXPECTED;
TRANS_LOG(ERROR, "callback with min_scn", K(ret), KPC(iter), KPC(this));
#ifdef ENABLE_DEBUG_LOG
usleep(5000);
ob_abort();
}
if (functor.is_iter_end(iter)) {
#endif
} else if (functor.is_iter_end(iter)) {
iter_end = true;
} else {
next = (is_reverse ? iter->get_prev() : iter->get_next());
if (OB_FAIL(functor(iter))) {
// don't print log, print it in functor
} else if (functor.need_remove_callback()) {
if (removed_ && (removed_ % 10000 == 0)) {
uint64_t checksum_now = batch_checksum_.calc();
TRANS_LOG(INFO, "[CallbackList] remove-callback", K(checksum_now), KPC(this));
}
const share::SCN iter_scn = iter->get_scn();
// the del operation must serialize with append operation
// if it is operating on the list tail
bool deleted = false;
if (next == end) {
if (!lock_state.APPEND_LOCKED_) {
LockGuard guard(*this, LOCK_MODE::LOCK_APPEND);
ret = iter->del();
deleted = true;
} else {
ret = iter->del();
deleted = true;
}
}
if ((deleted && OB_FAIL(ret)) || (!deleted && OB_FAIL(iter->del()))) {
TRANS_LOG(ERROR, "remove callback failed", K(ret), KPC(iter), K(deleted));
// sanity check before remove:
// should not remove parallel replayed callback before serial replay finished
if (parallel_start_pos_
&& !is_skip_checksum_()
&& !callback_mgr_.is_serial_final()
&& iter_scn >= parallel_start_pos_->get_scn()
&& iter_scn <= sync_scn_) {
ret = OB_ERR_UNEXPECTED;
TRANS_LOG(ERROR, "remove parallel callback while serial part not all replayed", K(ret), KPC(iter), KPC(this));
#ifdef ENABLE_DEBUG_LOG
usleep(5000);
ob_abort();
#endif
} else {
if (parallel_start_pos_
&& !is_skip_checksum_()
&& !callback_mgr_.is_serial_final()
&& iter_scn >= parallel_start_pos_->get_scn()
&& iter_scn <= sync_scn_) {
TRANS_LOG(ERROR, "should not remove this callback", KPC(iter));
usleep(5000);
ob_abort();
if (removed_ && (removed_ % 10000 == 0)) {
uint64_t checksum_now = batch_checksum_.calc();
TRANS_LOG(INFO, "[CallbackList] remove-callback", K(checksum_now), KPC(this));
}
if (log_cursor_ == iter) {
log_cursor_ = next;
}
if (parallel_start_pos_ == iter) {
parallel_start_pos_ = (next == &head_) ? NULL : next;
}
++removed_;
if (iter->need_submit_log()) {
++unlog_removed_;
}
++remove_count;
if (iter->is_need_free()) {
if (iter->is_table_lock_callback()) {
callback_mgr_.get_ctx().free_table_lock_callback(iter);
} else if (MutatorType::MUTATOR_ROW_EXT_INFO == iter->get_mutator_type()) {
callback_mgr_.get_ctx().free_ext_info_callback(iter);
// the del operation must serialize with append operation
// if it is operating on the list tail
bool deleted = false;
if (next == end) {
if (!lock_state.APPEND_LOCKED_) {
LockGuard guard(*this, LOCK_MODE::LOCK_APPEND);
ret = iter->del();
deleted = true;
} else {
callback_mgr_.get_ctx().free_mvcc_row_callback(iter);
ret = iter->del();
deleted = true;
}
}
if ((deleted && OB_FAIL(ret)) || (!deleted && OB_FAIL(iter->del()))) {
TRANS_LOG(ERROR, "remove callback failed", K(ret), KPC(iter), K(deleted));
} else {
if (log_cursor_ == iter) {
log_cursor_ = next;
}
if (parallel_start_pos_ == iter) {
parallel_start_pos_ = (next == &head_) ? NULL : next;
}
++removed_;
if (iter->need_submit_log()) {
++unlog_removed_;
}
++remove_count;
if (iter->is_need_free()) {
if (iter->is_table_lock_callback()) {
callback_mgr_.get_ctx().free_table_lock_callback(iter);
} else if (MutatorType::MUTATOR_ROW_EXT_INFO == iter->get_mutator_type()) {
callback_mgr_.get_ctx().free_ext_info_callback(iter);
} else {
callback_mgr_.get_ctx().free_mvcc_row_callback(iter);
}
}
}
}
}
if ((++traverse_count & 0xFFFFF) == 0) {
TRANS_LOG(WARN, "memtable fifo callback too long",
K(traverse_count), K(remove_count), K(functor));
}
}
if ((++traverse_count & 0xFFFFF) == 0) {
TRANS_LOG(WARN, "memtable fifo callback too long",
K(traverse_count), K(remove_count), K(functor));
}
}
functor.set_statistics(traverse_count, remove_count);
@ -630,11 +642,13 @@ int ObTxCallbackList::tx_commit()
if (OB_FAIL(callback_(functor, guard.state_))) {
TRANS_LOG(WARN, "trans commit failed", K(ret), K(functor));
} else if (length_ != 0) {
ret = OB_ERR_UNEXPECTED;
TRANS_LOG(ERROR, "callback list has not been cleaned after commit callback", K(ret), K(functor));
#ifdef ENABLE_DEBUG_LOG
ob_abort();
} else {
callback_mgr_.add_tx_end_callback_remove_cnt(functor.get_remove_cnt());
#endif
}
callback_mgr_.add_tx_end_callback_remove_cnt(functor.get_remove_cnt());
return ret;
}
@ -647,10 +661,13 @@ int ObTxCallbackList::tx_abort()
if (OB_FAIL(callback_(functor, guard.state_))) {
TRANS_LOG(WARN, "trans abort failed", K(ret), K(functor));
} else if (length_ != 0) {
ret = OB_ERR_UNEXPECTED;
TRANS_LOG(ERROR, "callback list has not been cleaned after abort", K(ret), K(functor));
#ifdef ENABLE_DEBUG_LOG
ob_abort();
} else {
callback_mgr_.add_tx_end_callback_remove_cnt(functor.get_remove_cnt());
#endif
}
callback_mgr_.add_tx_end_callback_remove_cnt(functor.get_remove_cnt());
return ret;
}

View File

@ -129,10 +129,15 @@ void ObMemtableCtx::reset()
}
if (OB_UNLIKELY(callback_alloc_count_ != callback_free_count_)) {
TRANS_LOG_RET(ERROR, OB_ERR_UNEXPECTED, "callback alloc and free count not match", K(*this));
#ifdef ENABLE_DEBUG_LOG
ob_abort();
#endif
}
if (OB_UNLIKELY(unsubmitted_cnt_ != 0)) {
TRANS_LOG_RET(ERROR, OB_ERR_UNEXPECTED, "txn unsubmitted cnt not zero", K(*this), K(unsubmitted_cnt_));
#ifdef ENABLE_DEBUG_LOG
ob_abort();
#endif
}
if (OB_TRANS_KILLED != end_code_) {
// _NOTE_: skip when txn was forcedly killed
@ -144,7 +149,9 @@ void ObMemtableCtx::reset()
if (OB_UNLIKELY(fill != sync_succ + sync_fail)) {
TRANS_LOG_RET(ERROR, OB_ERR_UNEXPECTED, "redo filled_count != sync_succ + sync_fail", KPC(this),
K(fill), K(sync_succ), K(sync_fail));
#ifdef ENABLE_DEBUG_LOG
ob_abort();
#endif
}
}
is_inited_ = false;
@ -514,8 +521,10 @@ int ObMemtableCtx::do_trans_end(
// after a transaction finishes, callback memory should be released
// and check memory leakage
if (OB_UNLIKELY(ATOMIC_LOAD(&callback_alloc_count_) != ATOMIC_LOAD(&callback_free_count_))) {
TRANS_LOG(ERROR, "callback alloc and free count not match", K(*this));
ob_abort(); // for easy debug, remove later
TRANS_LOG_RET(ERROR, OB_ERR_UNEXPECTED, "callback alloc and free count not match", KPC(this));
#ifdef ENABLE_DEBUG_LOG
ob_abort();
#endif
}
// release durable table lock
if (OB_FAIL(ret)) {
@ -585,7 +594,10 @@ int ObMemtableCtx::trans_replay_end(const bool commit,
"checksum_replayed", checksum_collapsed,
"checksum_before_collapse", replay_checksum,
K(checksum_signature), KPC(this));
#ifdef ENABLE_DEBUG_LOG
ob_usleep(5000);
ob_abort();
#endif
}
}
}
@ -818,9 +830,9 @@ int ObMemtableCtx::rollback(const transaction::ObTxSEQ to_seq_no,
const share::SCN replay_scn)
{
int ret = OB_SUCCESS;
common::ObTimeGuard timeguard("remove callbacks for rollback to", 10 * 1000);
const int64_t start_ts = common::ObClockGenerator::getClock();
ObByteLockGuard guard(lock_);
int64_t remove_cnt = 0;
if (!to_seq_no.is_valid() || !from_seq_no.is_valid()) {
ret = OB_INVALID_ARGUMENT;
TRANS_LOG(WARN, "invalid argument", K(ret), K(from_seq_no), K(to_seq_no));
@ -829,13 +841,14 @@ int ObMemtableCtx::rollback(const transaction::ObTxSEQ to_seq_no,
TRANS_LOG(WARN, "ctx is NULL", K(ret));
} else if (OB_FAIL(reuse_log_generator_())) {
TRANS_LOG(ERROR, "fail to reset log generator", K(ret));
} else if (OB_FAIL(trans_mgr_.rollback_to(to_seq_no, from_seq_no, replay_scn))) {
} else if (OB_FAIL(trans_mgr_.rollback_to(to_seq_no, from_seq_no, replay_scn, remove_cnt))) {
TRANS_LOG(WARN, "rollback to failed", K(ret), K(*this));
// rollback the table lock that with no tablelock callback
} else if (OB_FAIL(rollback_table_lock_(to_seq_no, from_seq_no))) {
TRANS_LOG(WARN, "rollback table lock failed", K(ret), K(*this), K(to_seq_no));
} else {
TRANS_LOG(INFO, "memtable handle rollback to successfuly", K(from_seq_no), K(to_seq_no), K(*this));
const int64_t elapsed = common::ObClockGenerator::getClock() - start_ts;
TRANS_LOG(INFO, "memtable handle rollback to successfuly", K(from_seq_no), K(to_seq_no), K(remove_cnt), K(elapsed), KPC(this));
}
return ret;
}

View File

@ -96,12 +96,19 @@ public:
ctx_.next_epoch_ = iter->get_epoch();
} else if (iter->get_epoch() < ctx_.epoch_from_) {
ret = OB_ERR_UNEXPECTED;
TRANS_LOG(ERROR, "found callback with epoch less than `from`", K(ret), KPC(iter), K(ctx_));
#ifdef ENABLE_DEBUG_LOG
ob_abort();
#endif
} else if (FALSE_IT(ctx_.cur_epoch_ = iter->get_epoch())) {
} else if (!iter->need_submit_log()) {
// this should not happend
// because log_cursor is _strictly_ point to the right next to logging position
ret = OB_ERR_UNEXPECTED;
TRANS_LOG(ERROR, "found callback has been logged, maybe log_cursor value is insane", K(ret), KPC(iter), K(ctx_));
#ifdef ENABLE_DEBUG_LOG
ob_abort();
#endif
} else if (iter->is_logging_blocked()) {
ret = OB_BLOCK_FROZEN;
ctx_.last_log_blocked_memtable_ = static_cast<memtable::ObMemtable *>(iter->get_memtable());

View File

@ -1942,8 +1942,8 @@ int ObPartTransCtx::serial_submit_redo_after_write_()
ret = submitter.serial_submit(should_switch);
if (should_switch && submitter.get_submitted_cnt() > 0) {
const share::SCN serial_final_scn = submitter.get_submitted_scn();
switch_to_parallel_logging_(serial_final_scn, exec_info_.max_submitted_seq_no_);
TRANS_LOG(INFO, "**switch to parallel logging**",
int tmp_ret = switch_to_parallel_logging_(serial_final_scn, exec_info_.max_submitted_seq_no_);
TRANS_LOG(INFO, "**leader switch to parallel logging**", K(tmp_ret),
K_(ls_id), K_(trans_id),
K(serial_final_scn),
"serial_final_seq_no", exec_info_.serial_final_seq_no_,
@ -2206,8 +2206,10 @@ int ObPartTransCtx::on_success(ObTxLogCb *log_cb)
ret = OB_ERR_UNEXPECTED;
TRANS_LOG(ERROR, "cb arg array is empty", K(ret), KPC(this));
print_trace_log_();
#ifdef ENABLE_DEBUG_LOG
usleep(5000);
ob_abort();
#endif
}
if (log_cb->is_callbacked()) {
#ifndef NDEBUG
@ -2224,9 +2226,12 @@ int ObPartTransCtx::on_success(ObTxLogCb *log_cb)
busy_cbs_.remove(log_cb);
return_log_cb_(log_cb);
} else {
TRANS_LOG_RET(ERROR, OB_ERR_UNEXPECTED, "callback was missed when tx ctx exiting", KPC(log_cb), KPC(this));
ret = OB_ERR_UNEXPECTED;
TRANS_LOG(ERROR, "callback was missed when tx ctx exiting", K(ret), KPC(log_cb), KPC(this));
print_trace_log_();
#ifdef ENABLE_DEBUG_LOG
ob_abort();
#endif
}
} else {
// save the first error code
@ -2630,8 +2635,10 @@ int ObPartTransCtx::on_failure(ObTxLogCb *log_cb)
int ret = OB_SUCCESS;
share::SCN max_committed_scn;
if (OB_FAIL(ls_tx_ctx_mgr_->get_ls_log_adapter()->get_palf_committed_max_scn(max_committed_scn))) {
TRANS_LOG(ERROR, "get palf max committed scn fail, need retry", K(ret));
ob_abort(); // fast abort for easy debug, TODO(yunxing.cyx) change to return do retry
TRANS_LOG(ERROR, "get palf max committed scn fail, need retry", K(ret), KPC(this));
#ifdef ENABLE_DEBUG_LOG
ob_abort(); // fast abort for easy debug
#endif
} else {
TRANS_LOG(INFO, "succ get palf max_commited_scn", K(max_committed_scn), KPC(log_cb));
}
@ -5124,10 +5131,12 @@ int ObPartTransCtx::replay_redo_in_ctx(const ObTxRedoLog &redo_log,
if (!is_tx_log_queue) {
ret = OB_ERR_UNEXPECTED;
TRANS_LOG(ERROR, "serial final redo must be in tx_log_queue", K(ret), KPC(this), K(timestamp));
#ifdef ENABLE_DEBUG_LOG
usleep(50_ms);
ob_abort();
#endif
} else if (!exec_info_.serial_final_scn_.is_valid()) {
switch_to_parallel_logging_(timestamp, max_seq_no);
ret = switch_to_parallel_logging_(timestamp, max_seq_no);
}
}
}
@ -5230,8 +5239,10 @@ int ObPartTransCtx::replay_rollback_to(const ObTxRollbackToLog &log,
&& !pre_barrier) {
ret = OB_ERR_UNEXPECTED;
TRANS_LOG(ERROR, "missing pre barrier flag for parallel replay", KR(ret), K(*this));
#ifdef ENABLE_DEBUG_LOG
usleep(5000);
ob_abort();
#endif
} else if (OB_FAIL(check_replay_avaliable_(offset, timestamp, part_log_no, need_replay))) {
TRANS_LOG(WARN, "check replay available failed", KR(ret), K(offset), K(timestamp), K(*this));
} else if (!need_replay) {
@ -5285,7 +5296,9 @@ int ObPartTransCtx::replay_rollback_to(const ObTxRollbackToLog &log,
} else {
ret = OB_ERR_UNEXPECTED;
TRANS_LOG(ERROR, "code should not go here", K(ret), K(timestamp), K_(trans_id), KPC(this));
#ifdef ENABLE_DEBUG_LOG
ob_abort();
#endif
}
}
if (OB_SUCC(ret) &&
@ -5450,8 +5463,10 @@ int ObPartTransCtx::replay_commit_info(const ObTxCommitInfoLog &commit_info_log,
if (is_parallel_logging() && !pre_barrier) {
ret = OB_ERR_UNEXPECTED;
TRANS_LOG(ERROR, "missing pre barrier flag for parallel replay", KR(ret), K(*this));
#ifdef ENABLE_DEBUG_LOG
usleep(5000);
ob_abort();
#endif
} else if (OB_FAIL(check_replay_avaliable_(offset, timestamp, part_log_no, need_replay))) {
TRANS_LOG(WARN, "check replay available failed", KR(ret), K(offset), K(timestamp), K(*this));
} else if (!need_replay) {
@ -5739,10 +5754,6 @@ int ObPartTransCtx::replay_commit(const ObTxCommitLog &commit_log,
cluster_version_,
checksum))) {
TRANS_LOG(WARN, "trans replay commit failed", KR(ret), K(commit_log), KPC(this));
if (OB_CHECKSUM_ERROR == ret) {
usleep(500000);
ob_abort();
}
} else if ((!ctx_tx_data_.is_read_only()) && OB_FAIL(ctx_tx_data_.insert_into_tx_table())) {
TRANS_LOG(WARN, "insert to tx table failed", KR(ret), K(*this));
} else if (is_local_tx_()) {
@ -10105,30 +10116,37 @@ inline bool ObPartTransCtx::is_support_parallel_replay_() const
return cluster_version_accurate_ && cluster_version_ >= CLUSTER_VERSION_4_3_0_0;
}
inline void ObPartTransCtx::switch_to_parallel_logging_(const share::SCN serial_final_scn,
const ObTxSEQ max_seq_no)
inline int ObPartTransCtx::switch_to_parallel_logging_(const share::SCN serial_final_scn,
const ObTxSEQ max_seq_no)
{
// when start replaying serial final redo log or submitted serial final redo log
// switch the Tx's logging mode to parallel logging
// this include mark serial final scn point in exec_info_
// and notify callback_mgr to remember the serial_final_scn
// which used to for check whether the callback-list has replayed continuously
// or all of it's serial logs has been synced continously
// if reach these condition, the checksum calculations of callback-list can continues
// into the parallel logged part
exec_info_.serial_final_scn_.atomic_set(serial_final_scn);
// remember the max of seq_no of redos currently submitted
// if an rollback to savepoint before this point, which means
// replay of this rollback-savepoint-log must pre-berrier to
// wait serial replay parts finished
int ret = OB_SUCCESS;
if (OB_UNLIKELY(!max_seq_no.is_valid())) {
TRANS_LOG_RET(ERROR, OB_ERR_UNEXPECTED, "max seq_no of serial final log is invalid",
K(serial_final_scn), K(max_seq_no), KPC(this));
ret = OB_ERR_UNEXPECTED;
TRANS_LOG(ERROR, "max seq_no of serial final log is invalid",
K(ret), K(serial_final_scn), K(max_seq_no), KPC(this));
print_trace_log_();
#ifdef ENABLE_DEBUG_LOG
ob_abort();
#endif
}
exec_info_.serial_final_seq_no_ = max_seq_no;
mt_ctx_.set_parallel_logging(serial_final_scn, max_seq_no);
if (OB_SUCC(ret)) {
// when start replaying serial final redo log or submitted serial final redo log
// switch the Tx's logging mode to parallel logging
// this include mark serial final scn point in exec_info_
// and notify callback_mgr to remember the serial_final_scn
// which used to for check whether the callback-list has replayed continuously
// or all of it's serial logs has been synced continously
// if reach these condition, the checksum calculations of callback-list can continues
// into the parallel logged part
exec_info_.serial_final_scn_.atomic_set(serial_final_scn);
// remember the max of seq_no of redos currently submitted
// if an rollback to savepoint before this point, which means
// replay of this rollback-savepoint-log must pre-berrier to
// wait serial replay parts finished
exec_info_.serial_final_seq_no_ = max_seq_no;
mt_ctx_.set_parallel_logging(serial_final_scn, max_seq_no);
}
return ret;
}
inline void ObPartTransCtx::recovery_parallel_logging_()

View File

@ -549,8 +549,8 @@ private:
share::SCN &scn,
bool need_free_extra_cb = false);
bool should_switch_to_parallel_logging_();
void switch_to_parallel_logging_(const share::SCN serial_final_scn,
const ObTxSEQ max_seq_no);
int switch_to_parallel_logging_(const share::SCN serial_final_scn,
const ObTxSEQ max_seq_no);
bool has_replay_serial_final_() const;
void recovery_parallel_logging_();
int check_can_submit_redo_();

View File

@ -257,8 +257,11 @@ int ObTxRedoSubmitter::_submit_redo_pipeline_(const bool display_blocked_info)
if (ctx.fill_count_ == 0) {
// BIG_ROW has been handled in `fill_log_block_`
ret = OB_ERR_UNEXPECTED;
TRANS_LOG(ERROR, "should not reach here", K(ret));
TRANS_LOG(ERROR, "should not reach here", K(ret), K(ctx), KPC(this));
#ifdef ENABLE_DEBUG_LOG
ob_abort();
#endif
break;
} else {
fill_ret = OB_SUCCESS;
}
@ -277,9 +280,11 @@ int ObTxRedoSubmitter::_submit_redo_pipeline_(const bool display_blocked_info)
} else {
// serial logging, shouldn't reach here
ret = OB_ERR_UNEXPECTED;
TRANS_LOG(ERROR, "oops! fatal panic", K(ret), KPC(this));
usleep(1000);
TRANS_LOG(ERROR, "oops! fatal panic", K(ret), K(ctx), KPC(this));
#ifdef ENABLE_DEBUG_LOG
ob_abort();
#endif
break;
}
} else if (OB_EAGAIN == fill_ret) {
// this list is all filled, but others remains
@ -288,14 +293,18 @@ int ObTxRedoSubmitter::_submit_redo_pipeline_(const bool display_blocked_info)
} else {
// serial logging, shouldn't reach here
ret = OB_ERR_UNEXPECTED;
TRANS_LOG(ERROR, "oops! fatal panic", K(ret), KPC(this));
TRANS_LOG(ERROR, "oops! fatal panic", K(ret), K(ctx), KPC(this));
#ifdef ENABLE_DEBUG_LOG
usleep(1000);
ob_abort();
#endif
break;
}
} else {
stop = true;
ret = fill_ret;
}
// start submit out filled log block
bool submitted = false;
int submit_ret = OB_SUCCESS;
@ -360,8 +369,10 @@ int ObTxRedoSubmitter::after_submit_redo_out_()
int ret = OB_SUCCESS;
if (OB_FAIL(mt_ctx_.log_submitted(*helper_))) {
TRANS_LOG(WARN, "callback memctx fail", K(ret));
#ifdef ENABLE_DEBUG_LOG
usleep(1000);
ob_abort();
#endif
}
helper_->reset();
return ret;
@ -373,7 +384,9 @@ int ObTxRedoSubmitter::prepare_()
int ret = OB_SUCCESS;
if (OB_NOT_NULL(log_cb_)) {
ret = OB_ERR_UNEXPECTED;
#ifdef ENABLE_DEBUG_LOG
ob_abort();
#endif
} else {
ret = tx_ctx_.prepare_for_submit_redo(log_cb_, *log_block_, serial_final_);
}

View File

@ -253,8 +253,6 @@ int ObTxReplayExecutor::replay_tx_log_(const ObTxLogType log_type)
ret = OB_ERR_UNEXPECTED;
TRANS_LOG(ERROR, "[Replay Tx] Unknown Log Type in replay buf",
K(log_type), KPC(this));
usleep(100000);
ob_abort();
}
}
return ret;
@ -656,7 +654,9 @@ int ObTxReplayExecutor::replay_redo_in_memtable_(ObTxRedoLog &redo, const bool s
if (OB_UNLIKELY(!seq_no.is_valid())) {
ret = OB_ERR_UNEXPECTED;
TRANS_LOG(ERROR, "seq no is invalid in mutator row", K(seq_no), KPC(this));
#ifdef ENABLE_DEBUG_LOG
ob_abort();
#endif
}
if (seq_no.get_seq() > max_seq_no.get_seq()) {
max_seq_no = seq_no;