[refine] writer thread force logging after finished submit log for freeze

This commit is contained in:
chinaxing
2024-05-14 06:13:46 +00:00
committed by ob-robot
parent 8dd33b177b
commit d5668ff4d4
8 changed files with 71 additions and 70 deletions

View File

@ -315,7 +315,7 @@ ObMvccWriteGuard::~ObMvccWriteGuard()
if (REACH_TIME_INTERVAL(100 * 1000)) { if (REACH_TIME_INTERVAL(100 * 1000)) {
TRANS_LOG(WARN, "failed to submit log if neccesary", K(ret), K(is_freeze), KPC(tx_ctx)); TRANS_LOG(WARN, "failed to submit log if neccesary", K(ret), K(is_freeze), KPC(tx_ctx));
} }
if (is_freeze) { if (is_freeze && OB_BLOCK_FROZEN != ret) {
memtable_->get_freezer()->set_need_resubmit_log(true); memtable_->get_freezer()->set_need_resubmit_log(true);
} }
} }

View File

@ -792,7 +792,11 @@ int ObTransCallbackMgr::prep_and_fill_from_list_(ObTxFillRedoCtx &ctx,
return ret; return ret;
} }
bool ObTransCallbackMgr::check_list_has_min_epoch_(const int my_idx, const int64_t my_epoch, int64_t &min_epoch, int &min_idx) bool ObTransCallbackMgr::check_list_has_min_epoch_(const int my_idx,
const int64_t my_epoch,
const bool require_min,
int64_t &min_epoch,
int &min_idx)
{ {
bool ret = true; bool ret = true;
int list_cnt = get_logging_list_count(); int list_cnt = get_logging_list_count();
@ -802,10 +806,16 @@ bool ObTransCallbackMgr::check_list_has_min_epoch_(const int my_idx, const int64
int64_t epoch_i = list->get_log_epoch(); int64_t epoch_i = list->get_log_epoch();
if (epoch_i < my_epoch) { if (epoch_i < my_epoch) {
ret = false; ret = false;
min_epoch = epoch_i; if (require_min) {
min_idx = i; if (min_epoch == 0 || epoch_i < min_epoch) {
TRANS_LOG(DEBUG, "hit", K(epoch_i), K(i)); min_epoch = epoch_i;
break; min_idx = i;
}
} else {
min_epoch = epoch_i;
min_idx = i;
break;
}
} }
} }
} }
@ -819,11 +829,11 @@ bool ObTransCallbackMgr::check_list_has_min_epoch_(const int my_idx, const int64
// - OB_BLOCK_FROZEN: next to logging callback's memtable was logging blocked // - OB_BLOCK_FROZEN: next to logging callback's memtable was logging blocked
int ObTransCallbackMgr::get_log_guard(const transaction::ObTxSEQ &write_seq, int ObTransCallbackMgr::get_log_guard(const transaction::ObTxSEQ &write_seq,
ObCallbackListLogGuard &lock_guard, ObCallbackListLogGuard &lock_guard,
int &list_idx) int &ret_list_idx)
{ {
int ret = OB_SUCCESS; int ret = OB_SUCCESS;
RDLockGuard guard(rwlock_); RDLockGuard guard(rwlock_);
list_idx = (write_seq.get_branch() % MAX_CALLBACK_LIST_COUNT); int list_idx = (write_seq.get_branch() % MAX_CALLBACK_LIST_COUNT);
ObTxCallbackList *list = get_callback_list_(list_idx, true); ObTxCallbackList *list = get_callback_list_(list_idx, true);
if (OB_ISNULL(list)) { if (OB_ISNULL(list)) {
ret = OB_ENTRY_NOT_EXIST; ret = OB_ENTRY_NOT_EXIST;
@ -831,7 +841,7 @@ int ObTransCallbackMgr::get_log_guard(const transaction::ObTxSEQ &write_seq,
int64_t my_epoch = list->get_log_epoch(); int64_t my_epoch = list->get_log_epoch();
int64_t min_epoch = 0; int64_t min_epoch = 0;
int min_epoch_idx =-1; int min_epoch_idx =-1;
bool flush_min_epoch_list = false; bool pending_too_large = false;
common::ObByteLock *log_lock = NULL; common::ObByteLock *log_lock = NULL;
if (my_epoch == INT64_MAX) { if (my_epoch == INT64_MAX) {
ret = OB_ENTRY_NOT_EXIST; ret = OB_ENTRY_NOT_EXIST;
@ -839,24 +849,23 @@ int ObTransCallbackMgr::get_log_guard(const transaction::ObTxSEQ &write_seq,
ret = OB_BLOCK_FROZEN; ret = OB_BLOCK_FROZEN;
} else if (OB_ISNULL(log_lock = list->try_lock_log())) { } else if (OB_ISNULL(log_lock = list->try_lock_log())) {
ret = OB_NEED_RETRY; ret = OB_NEED_RETRY;
} else if (!check_list_has_min_epoch_(list_idx, my_epoch, min_epoch, min_epoch_idx)) { // if current list pending size too large, try to submit the min_epoch list
} else if (FALSE_IT(pending_too_large = list->pending_log_too_large(GCONF._private_buffer_size * 10))) {
} else if (!check_list_has_min_epoch_(list_idx, my_epoch, pending_too_large, min_epoch, min_epoch_idx)) {
ret = OB_EAGAIN; ret = OB_EAGAIN;
ObIMemtable *to_log_memtable = list->get_log_cursor()->get_memtable(); ObIMemtable *to_log_memtable = list->get_log_cursor()->get_memtable();
if (TC_REACH_TIME_INTERVAL(1_s)) { if (TC_REACH_TIME_INTERVAL(1_s)) {
TRANS_LOG(WARN, "has smaller epoch unlogged", KPC(this), TRANS_LOG(WARN, "has smaller epoch unlogged", KPC(this),
K(list_idx), K(write_seq), K(my_epoch), K(min_epoch), K(min_epoch_idx), KP(to_log_memtable)); K(list_idx), K(write_seq), K(my_epoch), K(min_epoch), K(min_epoch_idx), KP(to_log_memtable));
} }
// if current list pending size too large, try to submit the min_epoch list
if (OB_UNLIKELY(list->pending_log_too_large(GCONF._private_buffer_size * 10))) {
flush_min_epoch_list = true;
}
} else { } else {
ret_list_idx = list_idx;
lock_guard.set(log_lock); lock_guard.set(log_lock);
} }
if (OB_FAIL(ret) && log_lock) { if (OB_FAIL(ret) && log_lock) {
log_lock->unlock(); log_lock->unlock();
} }
if (OB_UNLIKELY(flush_min_epoch_list)) { if (OB_EAGAIN == ret && OB_UNLIKELY(pending_too_large)) {
ObTxCallbackList *min_epoch_list = get_callback_list_(min_epoch_idx, false); ObTxCallbackList *min_epoch_list = get_callback_list_(min_epoch_idx, false);
if (OB_ISNULL(log_lock = min_epoch_list->try_lock_log())) { if (OB_ISNULL(log_lock = min_epoch_list->try_lock_log())) {
// lock conflict, acquired by others // lock conflict, acquired by others
@ -864,9 +873,8 @@ int ObTransCallbackMgr::get_log_guard(const transaction::ObTxSEQ &write_seq,
if (REACH_TIME_INTERVAL(1_s)) { if (REACH_TIME_INTERVAL(1_s)) {
TRANS_LOG(INFO, "decide to flush callback list with min_epoch", KPC(this), K(min_epoch), K(min_epoch_idx)); TRANS_LOG(INFO, "decide to flush callback list with min_epoch", KPC(this), K(min_epoch), K(min_epoch_idx));
} }
list_idx = min_epoch_idx; ret_list_idx = min_epoch_idx;
lock_guard.set(log_lock); lock_guard.set(log_lock);
ret = OB_SUCCESS;
} }
} }
} }
@ -936,7 +944,7 @@ int ObTransCallbackMgr::fill_from_one_list(ObTxFillRedoCtx &ctx,
// - OB_SUCCESS: all callbacks from all callback-list filled // - OB_SUCCESS: all callbacks from all callback-list filled
// - OB_EAGAIN: due to parallel logging, must return to flush this list and retry others // - OB_EAGAIN: due to parallel logging, must return to flush this list and retry others
// - OB_BLOCK_FROZEN: stopped due to can not logging waiting memtable frozen // - OB_BLOCK_FROZEN: stopped due to can not logging waiting memtable frozen
// - OB_ITER_END: stopped due to has smaller write_epoch whos log isn't submitted // - OB_ITER_END: stopped due to has smaller write_epoch whose log hasn't submitted
// - OB_BUF_NOT_ENOUGH: stopped due to buffer can not hold current node // - OB_BUF_NOT_ENOUGH: stopped due to buffer can not hold current node
// return policy: // return policy:
// - if parallel_logging, return if need switch to next list and has // - if parallel_logging, return if need switch to next list and has

View File

@ -365,7 +365,11 @@ private:
} }
int fill_from_one_list(ObTxFillRedoCtx &ctx, const int list_idx, ObITxFillRedoFunctor &func); int fill_from_one_list(ObTxFillRedoCtx &ctx, const int list_idx, ObITxFillRedoFunctor &func);
int fill_from_all_list(ObTxFillRedoCtx &ctx, ObITxFillRedoFunctor &func); int fill_from_all_list(ObTxFillRedoCtx &ctx, ObITxFillRedoFunctor &func);
bool check_list_has_min_epoch_(const int my_idx, const int64_t my_epoch, int64_t &min_epoch, int &min_idx); bool check_list_has_min_epoch_(const int my_idx,
const int64_t my_epoch,
const bool require_min,
int64_t &min_epoch,
int &min_idx);
void calc_list_fill_log_epoch_(const int list_idx, int64_t &epoch_from, int64_t &epoch_to); void calc_list_fill_log_epoch_(const int list_idx, int64_t &epoch_from, int64_t &epoch_to);
void calc_next_to_fill_log_info_(const ObIArray<RedoLogEpoch> &arr, void calc_next_to_fill_log_info_(const ObIArray<RedoLogEpoch> &arr,
int &index, int &index,

View File

@ -1905,69 +1905,50 @@ int ObPartTransCtx::submit_redo_after_write(const bool force, const ObTxSEQ &wri
#define LOAD_PARALLEL_LOGGING parallel_logging = exec_info_.serial_final_scn_.atomic_load().is_valid() #define LOAD_PARALLEL_LOGGING parallel_logging = exec_info_.serial_final_scn_.atomic_load().is_valid()
LOAD_PARALLEL_LOGGING; LOAD_PARALLEL_LOGGING;
if (!parallel_logging) { if (!parallel_logging) {
if (force) { int submitted_cnt = 0;
// For the frozen memtable, we must guarantee if (force || OB_SUCCESS == lock_.try_lock()) {
CtxLockGuard guard(lock_); CtxLockGuard guard(lock_, force /* need lock */);
// double check parallel_logging is on // double check parallel_logging is on
LOAD_PARALLEL_LOGGING; LOAD_PARALLEL_LOGGING;
if (!parallel_logging) { if (!parallel_logging) {
ret = serial_submit_redo_after_write_(); ret = serial_submit_redo_after_write_(submitted_cnt);
}
} else if (OB_SUCCESS == lock_.try_lock()) {
CtxLockGuard guard(lock_, false);
// double check parallel_logging is on
LOAD_PARALLEL_LOGGING;
if (!parallel_logging) {
ret = serial_submit_redo_after_write_();
} }
} }
if (OB_BUF_NOT_ENOUGH == ret) { if (submitted_cnt > 0 && OB_EAGAIN == ret) {
// bufer full, try fill after switch to parallel logging // has remains, try fill after switch to parallel logging
LOAD_PARALLEL_LOGGING; LOAD_PARALLEL_LOGGING;
} }
} }
#undef LOAD_PARALLEL_LOGGING #undef LOAD_PARALLEL_LOGGING
tg.click("serial_log"); tg.click("serial_log");
if (parallel_logging) { if (parallel_logging && OB_SUCC(lock_.try_rdlock_flush_redo())) {
if (force) { if (OB_SUCC(check_can_submit_redo_())) {
CtxLockGuard guard(lock_, CtxLockGuard::MODE::REDO_FLUSH_R); if (is_committing_()) {
if (OB_SUCC(check_can_submit_redo_()) && !is_committing_()) { ret = force ? OB_TRANS_HAS_DECIDED : OB_SUCCESS;
// TODO(handora.qc): Currently, parallel_submit might return } else {
// OB_SUCCESS without having fully submitted all necessary logs,
// resulting in an inability to mark need_resubmit on the freezer.
// Therefore, we rely on the fallback mechanism of freezer_task to
// retry submitting logs to meet the requirement of freezing that
// waiting for all logs to be submitted.
//
// However, we definitely need to ensure in the future that OB_SUCCESS
// is only returned only when the logs are fully submitted.
ObTxRedoSubmitter submitter(*this, mt_ctx_); ObTxRedoSubmitter submitter(*this, mt_ctx_);
if (OB_FAIL(submitter.parallel_submit(write_seq_no))) { if (OB_FAIL(submitter.parallel_submit(write_seq_no))) {
if (OB_ITER_END == ret || OB_EAGAIN == ret) { if (!force && (OB_ITER_END == ret // blocked by others, current remains
|| OB_NEED_RETRY == ret // acquire lock failed
)) {
ret = OB_SUCCESS; ret = OB_SUCCESS;
} }
} }
} }
} else if (OB_SUCC(lock_.try_rdlock_flush_redo())) {
if (OB_SUCC(check_can_submit_redo_()) && !is_committing_()) {
ObTxRedoSubmitter submitter(*this, mt_ctx_);
if (OB_FAIL(submitter.parallel_submit(write_seq_no))) {
if (OB_ITER_END == ret || OB_EAGAIN == ret) {
ret = OB_SUCCESS;
}
}
}
lock_.unlock_flush_redo();
} }
lock_.unlock_flush_redo();
}
if (!force && (OB_TRANS_HAS_DECIDED == ret // do committing
|| OB_BLOCK_FROZEN == ret // memtable logging blocked
|| OB_EAGAIN == ret // partial submitted or submit to log-service fail
)) {
ret = OB_SUCCESS;
} }
}
if (OB_TRANS_HAS_DECIDED == ret || OB_BLOCK_FROZEN == ret) {
ret = OB_SUCCESS;
} }
return ret; return ret;
} }
int ObPartTransCtx::serial_submit_redo_after_write_() int ObPartTransCtx::serial_submit_redo_after_write_(int &submitted_cnt)
{ {
int ret = OB_SUCCESS; int ret = OB_SUCCESS;
if (OB_SUCC(check_can_submit_redo_())) { if (OB_SUCC(check_can_submit_redo_())) {
@ -1975,7 +1956,8 @@ int ObPartTransCtx::serial_submit_redo_after_write_()
bool should_switch = should_switch_to_parallel_logging_(); bool should_switch = should_switch_to_parallel_logging_();
ObTxRedoSubmitter submitter(*this, mt_ctx_); ObTxRedoSubmitter submitter(*this, mt_ctx_);
ret = submitter.serial_submit(should_switch); ret = submitter.serial_submit(should_switch);
if (should_switch && submitter.get_submitted_cnt() > 0) { submitted_cnt = submitter.get_submitted_cnt();
if (should_switch && submitted_cnt > 0) {
const share::SCN serial_final_scn = submitter.get_submitted_scn(); const share::SCN serial_final_scn = submitter.get_submitted_scn();
int tmp_ret = switch_to_parallel_logging_(serial_final_scn, exec_info_.max_submitted_seq_no_); int tmp_ret = switch_to_parallel_logging_(serial_final_scn, exec_info_.max_submitted_seq_no_);
TRANS_LOG(INFO, "**leader switch to parallel logging**", K(tmp_ret), TRANS_LOG(INFO, "**leader switch to parallel logging**", K(tmp_ret),
@ -9776,7 +9758,9 @@ int ObPartTransCtx::move_tx_op(const ObTransferMoveTxParam &move_tx_param,
// promise redo log before move log // promise redo log before move log
bool submitted = false; bool submitted = false;
ObTxRedoSubmitter submitter(*this, mt_ctx_); ObTxRedoSubmitter submitter(*this, mt_ctx_);
if (OB_FAIL(submitter.serial_submit(false))) { int tmp_ret = OB_SUCCESS;
if (OB_TMP_FAIL(submitter.serial_submit(false)) && submitter.get_submitted_cnt() <= 0) {
ret = tmp_ret;
TRANS_LOG(WARN, "submit redo failed", K(ret), KPC(this)); TRANS_LOG(WARN, "submit redo failed", K(ret), KPC(this));
} else { } else {
sub_state_.set_transfer_blocking(); sub_state_.set_transfer_blocking();

View File

@ -561,7 +561,7 @@ private:
void recovery_parallel_logging_(); void recovery_parallel_logging_();
int check_can_submit_redo_(); int check_can_submit_redo_();
void force_no_need_replay_checksum_(const bool parallel_replay, const share::SCN &log_ts); void force_no_need_replay_checksum_(const bool parallel_replay, const share::SCN &log_ts);
int serial_submit_redo_after_write_(); int serial_submit_redo_after_write_(int &submitted_cnt);
int submit_big_segment_log_(); int submit_big_segment_log_();
int prepare_big_segment_submit_(ObTxLogCb *segment_cb, int prepare_big_segment_submit_(ObTxLogCb *segment_cb,
const share::SCN &base_scn, const share::SCN &base_scn,

View File

@ -1658,7 +1658,7 @@ int ObTransService::ls_rollback_to_savepoint_(const ObTransID &tx_id,
if (verify_epoch > 0 && ctx->epoch_ != verify_epoch) { if (verify_epoch > 0 && ctx->epoch_ != verify_epoch) {
ret = OB_TRANS_CTX_NOT_EXIST; ret = OB_TRANS_CTX_NOT_EXIST;
TRANS_LOG(WARN, "current ctx illegal, born epoch not match", K(ret), K(ls), K(tx_id), TRANS_LOG(WARN, "current ctx illegal, born epoch not match", K(ret), K(ls), K(tx_id),
K(verify_epoch), KPC(ctx)); K(verify_epoch), K(ctx_born_epoch), KPC(ctx));
} else if(OB_FAIL(ls_sync_rollback_savepoint__(ctx, } else if(OB_FAIL(ls_sync_rollback_savepoint__(ctx,
savepoint, savepoint,
op_sn, op_sn,

View File

@ -136,6 +136,7 @@ ObTxRedoSubmitter::~ObTxRedoSubmitter()
int ObTxRedoSubmitter::parallel_submit(const ObTxSEQ &write_seq_no) int ObTxRedoSubmitter::parallel_submit(const ObTxSEQ &write_seq_no)
{ {
int ret = OB_SUCCESS; int ret = OB_SUCCESS;
int save_ret = OB_SUCCESS;
from_all_list_ = false; from_all_list_ = false;
flush_all_ = false; flush_all_ = false;
write_seq_no_ = write_seq_no; write_seq_no_ = write_seq_no;
@ -146,20 +147,23 @@ int ObTxRedoSubmitter::parallel_submit(const ObTxSEQ &write_seq_no)
submit_if_not_full_ = true; submit_if_not_full_ = true;
bool do_submit = false; bool do_submit = false;
memtable::ObCallbackListLogGuard log_lock_guard; memtable::ObCallbackListLogGuard log_lock_guard;
submit_cb_list_idx_ = -1;
if (OB_FAIL(mt_ctx_.get_log_guard(write_seq_no, log_lock_guard, submit_cb_list_idx_))) { if (OB_FAIL(mt_ctx_.get_log_guard(write_seq_no, log_lock_guard, submit_cb_list_idx_))) {
if (OB_NEED_RETRY == ret) { if (OB_NEED_RETRY == ret) {
// give up, lock conflict // lock conflict
ret = OB_SUCCESS;
} else if (OB_BLOCK_FROZEN == ret) { } else if (OB_BLOCK_FROZEN == ret) {
// memtable is logging blocked // memtable is logging blocked
} else if (OB_EAGAIN == ret) { } else if (OB_EAGAIN == ret) {
// others need flush firstly // others need flush firstly
// TODO: try flush others out and retry
if (TC_REACH_TIME_INTERVAL(5_s)) { if (TC_REACH_TIME_INTERVAL(5_s)) {
TRANS_LOG(WARN, "blocked by other list has smaller wirte epoch unlogged", TRANS_LOG(WARN, "blocked by other list has smaller wirte epoch unlogged",
K(write_seq_no), K(tx_ctx_.get_trans_id())); K(write_seq_no), K(tx_ctx_.get_trans_id()));
} }
ret = OB_SUCCESS; if (submit_cb_list_idx_ >= 0) {
// try to submit blocking list
save_ret = OB_EAGAIN;
do_submit = true;
}
} else if (OB_ENTRY_NOT_EXIST == ret) { } else if (OB_ENTRY_NOT_EXIST == ret) {
// no callback to log // no callback to log
ret = OB_SUCCESS; ret = OB_SUCCESS;
@ -171,6 +175,7 @@ int ObTxRedoSubmitter::parallel_submit(const ObTxSEQ &write_seq_no)
} }
if (do_submit) { if (do_submit) {
ret = _submit_redo_pipeline_(false); ret = _submit_redo_pipeline_(false);
ret = OB_SUCCESS == ret ? save_ret : ret;
} }
return ret; return ret;
} }
@ -263,7 +268,7 @@ int ObTxRedoSubmitter::_submit_redo_pipeline_(const bool display_blocked_info)
#endif #endif
break; break;
} else { } else {
fill_ret = OB_SUCCESS; fill_ret = OB_EAGAIN;
} }
} else if (OB_BLOCK_FROZEN == fill_ret) { } else if (OB_BLOCK_FROZEN == fill_ret) {
if (is_parallel_logging) { if (is_parallel_logging) {

View File

@ -198,7 +198,7 @@ TEST_F(ObTestRedoSubmitter, serial_submit_by_writer_thread_BUF_NOT_ENOUGH)
.Times(1) .Times(1)
.WillOnce(Invoke(succ_submit_redo_log_out)); .WillOnce(Invoke(succ_submit_redo_log_out));
ObTxRedoSubmitter submitter(tx_ctx, mt_ctx); ObTxRedoSubmitter submitter(tx_ctx, mt_ctx);
EXPECT_EQ(OB_SUCCESS, submitter.serial_submit(false)); EXPECT_EQ(OB_EAGAIN, submitter.serial_submit(false));
} }
} }