skip active memtable when flush redo for freeze

This commit is contained in:
chinaxing 2024-01-15 13:42:48 +00:00 committed by ob-robot
parent f044d7692f
commit b3ec417991
19 changed files with 199 additions and 97 deletions

View File

@ -89,14 +89,14 @@ namespace checkpoint
} // namespace storage
namespace transaction
{
int ObPartTransCtx::submit_redo_log_for_freeze()
int ObPartTransCtx::submit_redo_log_for_freeze(const uint32_t freeze_clock)
{
int ret = OB_SUCCESS;
int64_t sleep_time = rand() % SLEEP_TIME;
ob_usleep(sleep_time);
CtxLockGuard guard(lock_);
bool submitted = false;
ret = submit_redo_log_for_freeze_(submitted);
ret = submit_redo_log_for_freeze_(submitted, freeze_clock);
if (sleep_time > 50 && sleep_time < 90) {
ret = OB_TX_NOLOGCB;
} else if (sleep_time >= 90) {

View File

@ -219,13 +219,13 @@ int ObFreezerStat::get_diagnose_info(ObStringHolder &diagnose_info)
return ret;
}
void ObFreezerStat::set_freeze_clock(const int64_t freeze_clock)
void ObFreezerStat::set_freeze_clock(const uint32_t freeze_clock)
{
ObSpinLockGuard guard(lock_);
freeze_clock_ = freeze_clock;
}
int64_t ObFreezerStat::get_freeze_clock()
uint32_t ObFreezerStat::get_freeze_clock()
{
ObSpinLockGuard guard(lock_);
return freeze_clock_;
@ -335,7 +335,7 @@ int ObFreezerStat::deep_copy_to(ObFreezerStat &other)
return ret;
}
int ObFreezerStat::begin_set_freeze_stat(const int64_t freeze_clock,
int ObFreezerStat::begin_set_freeze_stat(const uint32_t freeze_clock,
const int64_t start_time,
const int state,
const share::SCN &freeze_snapshot_version,
@ -521,11 +521,11 @@ int ObFreezer::logstream_freeze(ObFuture<int> *result)
return ret;
}
void ObFreezer::try_submit_log_for_freeze_()
void ObFreezer::try_submit_log_for_freeze_(const bool is_tablet_freeze)
{
int ret = OB_SUCCESS;
if (OB_FAIL(submit_log_for_freeze(true/*try*/))) {
if (OB_FAIL(submit_log_for_freeze(is_tablet_freeze, true/*try*/))) {
TRANS_LOG(WARN, "fail to try submit log for freeze", K(ret));
set_need_resubmit_log(true);
}
@ -538,7 +538,7 @@ int ObFreezer::inner_logstream_freeze(ObFuture<int> *result)
ObTableHandleV2 handle;
if (FALSE_IT(submit_checkpoint_task())) {
} else if (FALSE_IT(try_submit_log_for_freeze_())) {
} else if (FALSE_IT(try_submit_log_for_freeze_(false/*tablet freeze*/))) {
} else if (OB_FAIL(submit_freeze_task(true/*is_ls_freeze*/, result, handle))) {
TRANS_LOG(ERROR, "failed to submit ls_freeze task", K(ret), K(ls_id));
stat_.add_diagnose_info("fail to submit ls_freeze_task");
@ -587,7 +587,7 @@ int ObFreezer::ls_freeze_task()
ObLSLockGuard lock_ls(ls_, ls_->lock_, read_lock, write_lock);
if (OB_FAIL(check_ls_state())) {
} else {
submit_log_for_freeze(false/*try*/);
submit_log_for_freeze(false/*tablet freeze*/, false/*try*/);
TRANS_LOG(INFO, "[Freezer] resubmit log for ls_freeze", K(ls_id));
}
}
@ -706,7 +706,7 @@ int ObFreezer::freeze_normal_tablet_(const ObTabletID &tablet_id, ObFuture<int>
TRANS_LOG(WARN, "[Freezer] fail to set is_tablet_freeze", K(ret), K(ls_id), K(tablet_id));
stat_.add_diagnose_info("fail to set is_tablet_freeze");
}
} else if (FALSE_IT(try_submit_log_for_freeze_())) {
} else if (FALSE_IT(try_submit_log_for_freeze_(true/*tablet freeze*/))) {
} else if (OB_FAIL(submit_freeze_task(false/*is_ls_freeze*/, result, frozen_memtable_handle))) {
TRANS_LOG(WARN, "[Freezer] fail to submit tablet_freeze_task", K(ret), K(ls_id), K(tablet_id));
stat_.add_diagnose_info("fail to submit tablet_freeze_task");
@ -846,7 +846,7 @@ int ObFreezer::tablet_freeze_with_rewrite_meta(const ObTabletID &tablet_id)
TRANS_LOG(WARN, "[Freezer] fail to set is_tablet_freeze", K(ret), K(ls_id), K(tablet_id));
stat_.add_diagnose_info("fail to set is_tablet_freeze");
}
} else if (FALSE_IT(try_submit_log_for_freeze_())) {
} else if (FALSE_IT(try_submit_log_for_freeze_(true/*tablet freeze*/))) {
} else if (OB_FAIL(submit_freeze_task(false/*is_ls_freeze*/, nullptr, frozen_memtable_handle))) {
TRANS_LOG(WARN, "[Freezer] fail to submit freeze_task", K(ret), K(ls_id), K(tablet_id));
stat_.add_diagnose_info("fail to submit freeze_task");
@ -932,7 +932,7 @@ int ObFreezer::try_wait_memtable_ready_for_flush_with_ls_lock(memtable::ObMemtab
} else if (!ready_for_flush) {
if (TC_REACH_TIME_INTERVAL(5 * 1000 * 1000)) {
if (need_resubmit_log()) {
submit_log_for_freeze(false/*try*/);
submit_log_for_freeze(true/*tablet freeze*/, false/*try*/);
TRANS_LOG(INFO, "[Freezer] resubmit log", K(ret));
}
const int64_t cost_time = ObTimeUtility::current_time() - start;
@ -1149,7 +1149,7 @@ int ObFreezer::batch_tablet_freeze_(const ObIArray<ObTabletID> &tablet_ids, ObFu
need_freeze = false;
TRANS_LOG(INFO, "[Freezer] no need to freeze batch tablets", K(ret), K(tablet_ids));
stat_.add_diagnose_info("no need to freeze batch tablets");
} else if (FALSE_IT(try_submit_log_for_freeze_())) {
} else if (FALSE_IT(try_submit_log_for_freeze_(true/*tablet freeze*/))) {
} else if (OB_FAIL(submit_batch_tablet_freeze_task(memtable_handles, result))) {
TRANS_LOG(WARN, "[Freezer] fail to submit batch_tablet_freeze task", K(ret));
} else {
@ -1265,7 +1265,7 @@ int ObFreezer::handle_memtable_for_tablet_freeze(memtable::ObIMemtable *imemtabl
ret = OB_ERR_UNEXPECTED;
} else {
memtable::ObMemtable *memtable = static_cast<memtable::ObMemtable*>(imemtable);
try_submit_log_for_freeze_();
try_submit_log_for_freeze_(true/*tablet freeze*/);
wait_memtable_ready_for_flush(memtable);
if (OB_FAIL(memtable->finish_freeze())) {
TRANS_LOG(ERROR, "[Freezer] memtable cannot be flushed",
@ -1294,7 +1294,7 @@ namespace {
};
}
int ObFreezer::submit_log_for_freeze(bool is_try)
int ObFreezer::submit_log_for_freeze(const bool is_tablet_freeze, const bool is_try)
{
int ret = OB_SUCCESS;
int tmp_ret = OB_SUCCESS;
@ -1309,8 +1309,9 @@ int ObFreezer::submit_log_for_freeze(bool is_try)
do {
ret = OB_SUCCESS;
transaction::ObTransID fail_tx_id;
if (OB_FAIL(get_ls_tx_svr()->traverse_trans_to_submit_redo_log(fail_tx_id))) {
// because tablet freeze does not increment the freeze clock, fake it as current clock + 1
const uint32_t freeze_clock = is_tablet_freeze ? get_freeze_clock() + 1 : get_freeze_clock();
if (OB_FAIL(get_ls_tx_svr()->traverse_trans_to_submit_redo_log(fail_tx_id, freeze_clock))) {
const int64_t cost_time = ObTimeUtility::current_time() - start;
if (cost_time > 1000 * 1000) {
if (TC_REACH_TIME_INTERVAL(5 * 1000 * 1000)) {
@ -1424,7 +1425,7 @@ void ObFreezer::wait_memtable_ready_for_flush(memtable::ObMemtable *memtable)
while (!memtable->ready_for_flush()) {
if (TC_REACH_TIME_INTERVAL(5 * 1000 * 1000)) {
if (need_resubmit_log()) {
submit_log_for_freeze(false/*try*/);
submit_log_for_freeze(true/*tablet freeze*/, false/*try*/);
TRANS_LOG(INFO, "[Freezer] resubmit log for tablet_freeze", K(ls_id));
}
const int64_t cost_time = ObTimeUtility::current_time() - start;

View File

@ -166,8 +166,8 @@ public:
bool need_rewrite_meta();
void set_state(int state);
int get_state();
void set_freeze_clock(const int64_t freeze_clock);
int64_t get_freeze_clock();
void set_freeze_clock(const uint32_t freeze_clock);
uint32_t get_freeze_clock();
void set_start_time(int64_t start_time);
int64_t get_start_time();
void set_end_time(int64_t end_time);
@ -177,7 +177,7 @@ public:
void set_freeze_snapshot_version(const share::SCN &freeze_snapshot_version);
share::SCN get_freeze_snapshot_version();
int deep_copy_to(ObFreezerStat &other);
int begin_set_freeze_stat(const int64_t freeze_clock,
int begin_set_freeze_stat(const uint32_t freeze_clock,
const int64_t start_time,
const int state,
const share::SCN &freeze_snapshot_version,
@ -191,7 +191,7 @@ private:
ObTabletID tablet_id_;
bool need_rewrite_meta_;
int state_;
int64_t freeze_clock_;
uint32_t freeze_clock_;
int64_t start_time_;
int64_t end_time_;
int ret_code_;
@ -298,8 +298,8 @@ private:
/* inner subfunctions for freeze process */
int inner_logstream_freeze(ObFuture<int> *result);
int submit_log_for_freeze(bool is_try);
void try_submit_log_for_freeze_();
int submit_log_for_freeze(const bool is_tablet_freeze, const bool is_try);
void try_submit_log_for_freeze_(const bool is_tablet_freeze);
int ls_freeze_task();
int tablet_freeze_task(ObTableHandleV2 handle);
int submit_freeze_task(const bool is_ls_freeze, ObFuture<int> *result, ObTableHandleV2 &handle);

View File

@ -429,9 +429,9 @@ int ObLSTxService::replay(const void *buffer,
return ret;
}
int ObLSTxService::traverse_trans_to_submit_redo_log(ObTransID &fail_tx_id)
int ObLSTxService::traverse_trans_to_submit_redo_log(ObTransID &fail_tx_id, const uint32_t freeze_clock)
{
return mgr_->traverse_tx_to_submit_redo_log(fail_tx_id);
return mgr_->traverse_tx_to_submit_redo_log(fail_tx_id, freeze_clock);
}
int ObLSTxService::traverse_trans_to_submit_next_log() { return mgr_->traverse_tx_to_submit_next_log(); }

View File

@ -111,7 +111,8 @@ public:
const transaction::ObTxSEQ &spec_seq_no = transaction::ObTxSEQ::INVL()) const;
int revert_store_ctx(storage::ObStoreCtx &store_ctx) const;
// Freeze process needs to traverse trans ctx to submit redo log
int traverse_trans_to_submit_redo_log(transaction::ObTransID &fail_tx_id);
int traverse_trans_to_submit_redo_log(transaction::ObTransID &fail_tx_id,
const uint32_t freeze_clock = UINT32_MAX);
// submit next log when all trx in frozen memtable have submitted log
int traverse_trans_to_submit_next_log();
// check schduler status for gc

View File

@ -59,6 +59,7 @@ public:
virtual bool on_memtable(const ObIMemtable * const memtable)
{ UNUSED(memtable); return false; }
virtual ObIMemtable* get_memtable() const { return nullptr; }
virtual uint32_t get_freeze_clock() const { return 0; }
virtual transaction::ObTxSEQ get_seq_no() const { return transaction::ObTxSEQ::INVL(); }
virtual int del() { return remove(); }
virtual bool is_need_free() const { return true; }
@ -71,6 +72,7 @@ public:
void after_append_cb(const bool is_replay);
bool need_submit_log() const { return need_submit_log_; }
virtual bool is_logging_blocked() const { return false; }
virtual bool on_frozen_memtable(ObIMemtable *&last_frozen_mt) const { return true; }
int log_submitted_cb(const share::SCN scn, ObIMemtable *&last_mt);
int log_sync_fail_cb(const share::SCN scn);
// interface should be implement by subclasses

View File

@ -1512,6 +1512,16 @@ bool ObMvccRowCallback::is_logging_blocked() const
return is_blocked;
}
// Returns the freeze clock of the memtable this row callback belongs to.
// Used by the redo-fill path to decide whether this callback's memtable is
// covered by the current freeze (callbacks on memtables with a clock >= the
// fill ctx's freeze_clock_ are skipped / treated as block-frozen).
// Falls back to 0 (same as the ObITransCallback base default) when the
// memtable pointer is unexpectedly NULL, after logging the anomaly.
uint32_t ObMvccRowCallback::get_freeze_clock() const
{
if (OB_ISNULL(memtable_)) {
TRANS_LOG_RET(ERROR, OB_ERR_UNEXPECTED, "mvcc row memtable is NULL", KPC(this));
return 0;
} else {
return memtable_->get_freeze_clock();
}
}
int ObMvccRowCallback::clean()
{
unlink_trans_node();

View File

@ -486,6 +486,7 @@ public:
const ObMemtableKey *get_key() { return &key_; }
int get_memtable_key(uint64_t &table_id, common::ObStoreRowkey &rowkey) const;
bool is_logging_blocked() const override;
uint32_t get_freeze_clock() const override;
transaction::ObTxSEQ get_seq_no() const { return seq_no_; }
int get_trans_id(transaction::ObTransID &trans_id) const;
int get_cluster_version(uint64_t &cluster_version) const override;

View File

@ -194,6 +194,7 @@ public:
{
return ATOMIC_LOAD(&data_size_) - ATOMIC_LOAD(&logged_data_size_) > limit;
}
// *NOTICE* this _only_ accounts for MvccRowCallbacks on the memtable
bool has_pending_log() const {
return ATOMIC_LOAD(&data_size_) - ATOMIC_LOAD(&logged_data_size_) > 0;
}

View File

@ -148,7 +148,6 @@ public:
storage::ObTableAccessContext &context,
const blocksstable::ObDatumRowkey &rowkey,
blocksstable::ObDatumRow &row) = 0;
virtual int64_t get_frozen_trans_version() { return 0; }
virtual int major_freeze(const common::ObVersion &version)
{ UNUSED(version); return common::OB_SUCCESS; }

View File

@ -105,6 +105,9 @@ public:
} else if (iter->is_logging_blocked()) {
ret = OB_BLOCK_FROZEN;
ctx_.last_log_blocked_memtable_ = static_cast<memtable::ObMemtable *>(iter->get_memtable());
} else if (OB_UNLIKELY(iter->get_freeze_clock() >= ctx_.freeze_clock_)) {
// when flushing redo for a frozen memtable, stop if the memtable is still active
ret = OB_BLOCK_FROZEN;
} else {
bool fake_fill = false;
if (MutatorType::MUTATOR_ROW == iter->get_mutator_type()) {

View File

@ -82,6 +82,7 @@ struct ObTxFillRedoCtx
write_seq_no_(),
skip_lock_node_(false),
all_list_(false),
freeze_clock_(UINT32_MAX),
list_log_epoch_arr_(),
cur_epoch_(0),
next_epoch_(0),
@ -105,6 +106,7 @@ struct ObTxFillRedoCtx
transaction::ObTxSEQ write_seq_no_; // to select callback list in parallel logging
bool skip_lock_node_; // whether skip fill lock node
bool all_list_; // whether to fill all callback-list
uint32_t freeze_clock_; // memtables with freeze clock less than or equal to this will be flushed
ObSEArray<RedoLogEpoch, 1> list_log_epoch_arr_; // record each list's next log epoch
int64_t cur_epoch_; // current filling epoch
int64_t next_epoch_; // next epoch of list, used to update list_log_epoch_arr_
@ -125,10 +127,24 @@ struct ObTxFillRedoCtx
public:
bool is_empty() const { return fill_count_ == 0; }
bool not_empty() const { return fill_count_ > 0; }
TO_STRING_KV(K_(tx_id), K_(write_seq_no), K_(all_list), K_(cur_epoch), K_(next_epoch),
K_(epoch_from), K_(epoch_to), K_(fill_count), K_(fill_time),
KPC_(callback_scope), K_(skip_lock_node), K_(is_all_filled), K_(list_idx),
K_(list_log_epoch_arr), KP_(last_log_blocked_memtable), K_(buf_len), K_(buf_pos));
TO_STRING_KV(K_(tx_id),
K_(write_seq_no),
K_(all_list),
K_(freeze_clock),
K_(cur_epoch),
K_(next_epoch),
K_(epoch_from),
K_(epoch_to),
K_(fill_count),
K_(fill_time),
KPC_(callback_scope),
K_(skip_lock_node),
K_(is_all_filled),
K_(list_idx),
K_(list_log_epoch_arr),
KP_(last_log_blocked_memtable),
K_(buf_len),
K_(buf_pos));
};
class ObCallbackListLogGuard

View File

@ -1335,11 +1335,11 @@ int ObLSTxCtxMgr::del_tx_ctx(ObTransCtx *ctx)
return ret;
}
int ObLSTxCtxMgr::traverse_tx_to_submit_redo_log(ObTransID &fail_tx_id)
int ObLSTxCtxMgr::traverse_tx_to_submit_redo_log(ObTransID &fail_tx_id, const uint32_t freeze_clock)
{
int ret = OB_SUCCESS;
RLockGuard guard(rwlock_);
ObTxSubmitLogFunctor fn(ObTxSubmitLogFunctor::SUBMIT_REDO_LOG);
ObTxSubmitLogFunctor fn(ObTxSubmitLogFunctor::SUBMIT_REDO_LOG, freeze_clock);
if (!is_follower_() && OB_FAIL(ls_tx_ctx_map_.for_each(fn))) {
if (OB_SUCCESS != fn.get_result()) {
// get real ret code

View File

@ -269,7 +269,8 @@ public:
int del_tx_ctx(ObTransCtx *ctx);
// Freeze process needs to traverse TxCtx to submit log
int traverse_tx_to_submit_redo_log(ObTransID &fail_tx_id);
// @param[in] freeze_clock: memtables with a freeze clock greater than this will not be traversed
int traverse_tx_to_submit_redo_log(ObTransID &fail_tx_id, const uint32_t freeze_clock = UINT32_MAX);
int traverse_tx_to_submit_next_log();
// Get the min prepare version of transaction module of current observer for slave read

View File

@ -1292,8 +1292,8 @@ private:
class ObTxSubmitLogFunctor
{
public:
explicit ObTxSubmitLogFunctor(const int action)
: action_(action), result_(common::OB_SUCCESS), fail_tx_id_()
explicit ObTxSubmitLogFunctor(const int action, const uint32_t freeze_clock = UINT32_MAX)
: action_(action), freeze_clock_(freeze_clock), result_(common::OB_SUCCESS), fail_tx_id_()
{
SET_EXPIRED_LIMIT(100 * 1000 /*100ms*/, 3 * 1000 * 1000 /*3s*/);
}
@ -1312,7 +1312,7 @@ public:
ret = OB_INVALID_ARGUMENT;
TRANS_LOG(WARN, "invalid argument", K(ret), K(tx_id), "ctx", OB_P(tx_ctx));
} else if (ObTxSubmitLogFunctor::SUBMIT_REDO_LOG == action_) {
if (OB_FAIL(tx_ctx->submit_redo_log_for_freeze())) {
if (OB_FAIL(tx_ctx->submit_redo_log_for_freeze(freeze_clock_))) {
TRANS_LOG(WARN, "failed to submit redo log", K(ret), K(tx_id));
}
} else if (ObTxSubmitLogFunctor::SUBMIT_NEXT_LOG == action_) {
@ -1337,6 +1337,7 @@ public:
private:
int action_;
uint32_t freeze_clock_;
int result_;
ObTransID fail_tx_id_;
};

View File

@ -1825,10 +1825,9 @@ int ObPartTransCtx::remove_callback_for_uncommited_txn(
}
// the semantic of submit redo for freeze is
// should flush all redos which can be flushed
// otherwise, need return some error to caller
// to indicate need retry
int ObPartTransCtx::submit_redo_log_for_freeze()
// should flush all redos below the specified freeze_clock (inclusive)
// otherwise, need return some error to caller to indicate need retry
int ObPartTransCtx::submit_redo_log_for_freeze(const uint32_t freeze_clock)
{
int ret = OB_SUCCESS;
TRANS_LOG(TRACE, "", K_(trans_id), K_(ls_id));
@ -1838,7 +1837,7 @@ int ObPartTransCtx::submit_redo_log_for_freeze()
if (need_submit) {
CtxLockGuard guard(lock_);
tg.click();
ret = submit_redo_log_for_freeze_(submitted);
ret = submit_redo_log_for_freeze_(submitted, freeze_clock);
tg.click();
if (submitted) {
REC_TRANS_TRACE_EXT2(tlog_, submit_log_for_freeze, OB_Y(ret),
@ -1905,7 +1904,7 @@ int ObPartTransCtx::serial_submit_redo_after_write_()
int64_t before_submit_pending_size = mt_ctx_.get_pending_log_size();
bool should_switch = should_switch_to_parallel_logging_();
ObTxRedoSubmitter submitter(*this, mt_ctx_);
ret = submitter.submit(false /*flush all log*/, should_switch, false /*display blocked info*/);
ret = submitter.serial_submit(should_switch);
if (should_switch && submitter.get_submitted_cnt() > 0) {
const share::SCN serial_final_scn = submitter.get_submitted_scn();
switch_to_parallel_logging_(serial_final_scn);
@ -1966,13 +1965,13 @@ int ObPartTransCtx::prepare_for_submit_redo(ObTxLogCb *&log_cb,
return ret;
}
int ObPartTransCtx::submit_redo_log_for_freeze_(bool &submitted)
int ObPartTransCtx::submit_redo_log_for_freeze_(bool &submitted, const uint32_t freeze_clock)
{
int ret = OB_SUCCESS;
ATOMIC_STORE(&is_submitting_redo_log_for_freeze_, true);
if (OB_SUCC(check_can_submit_redo_())) {
ObTxRedoSubmitter submitter(*this, mt_ctx_);
if (OB_FAIL(submitter.submit(true/*flush all*/, false/*final serial*/, true /*display blocked info*/))) {
if (OB_FAIL(submitter.submit_for_freeze(freeze_clock, true /*display blocked info*/))) {
if (OB_BLOCK_FROZEN != ret) {
TRANS_LOG(WARN, "fail to submit redo log for freeze", K(ret));
// for some error, txn will be aborted immediately
@ -2859,7 +2858,7 @@ int ObPartTransCtx::submit_redo_if_parallel_logging_()
int ret = OB_SUCCESS;
if (OB_UNLIKELY(is_parallel_logging())) {
ObTxRedoSubmitter submitter(*this, mt_ctx_);
if (OB_FAIL(submitter.submit(true/*flush all*/, false/*final serial*/, true /*display blocked info*/))) {
if (OB_FAIL(submitter.submit_all(true /*display blocked info*/))) {
TRANS_LOG(WARN, "submit redo log fail", K(ret));
}
}
@ -9237,8 +9236,9 @@ int ObPartTransCtx::move_tx_op(const ObTransferMoveTxParam &move_tx_param,
if (exec_info_.state_ == ObTxState::INIT) {
// promise redo log before move log
bool submitted = false;
if (OB_FAIL(submit_redo_log_for_freeze_(submitted))) {
TRANS_LOG(WARN, "submit log failed", KR(ret), KPC(this));
ObTxRedoSubmitter submitter(*this, mt_ctx_);
if (OB_FAIL(submitter.serial_submit(false))) {
TRANS_LOG(WARN, "submit redo failed", K(ret), KPC(this));
} else {
sub_state_.set_transfer_blocking();
}

View File

@ -361,7 +361,7 @@ public:
int try_submit_next_log();
// for instant logging and freezing
int submit_redo_after_write(const bool force, const ObTxSEQ &write_seq_no);
int submit_redo_log_for_freeze();
int submit_redo_log_for_freeze(const uint32_t freeze_clock);
int return_redo_log_cb(ObTxLogCb *log_cb);
int push_replaying_log_ts(const share::SCN log_ts_ns, const int64_t log_entry_no);
int push_replayed_log_ts(const share::SCN log_ts_ns,
@ -872,7 +872,7 @@ private:
const share::ObLSID &ori_ls_id, const ObAddr &ori_addr);
int check_ls_state_(const SCN &snapshot, const ObLSID &ls_id, const ObStandbyCheckInfo &check_info);
int get_ls_replica_readable_scn_(const ObLSID &ls_id, SCN &snapshot_version);
int submit_redo_log_for_freeze_(bool &try_submit);
int submit_redo_log_for_freeze_(bool &try_submit, const uint32_t freeze_clock);
void print_first_mvcc_callback_();
public:
int prepare_for_submit_redo(ObTxLogCb *&log_cb,

View File

@ -35,22 +35,33 @@ public:
helper_(NULL),
from_all_list_(false),
flush_all_(false),
write_seq_no_(),
serial_final_(false),
submit_if_not_full_(true),
flush_freeze_clock_(UINT32_MAX),
write_seq_no_(),
submit_cb_list_idx_(-1),
submit_out_cnt_(0),
submitted_scn_()
{}
~ObTxRedoSubmitter();
// submit, will traversal all callback-list
int submit(const bool flush_all, const bool is_final, const bool display_blocked_info = false);
int submit_for_freeze(const uint32_t freeze_clock = UINT32_MAX, const bool display_blocked_info = true) {
return submit_(true, freeze_clock, false, display_blocked_info);
}
int serial_submit(const bool is_final) {
return submit_(false, UINT32_MAX, is_final, false);
}
int submit_all(const bool display_blocked_info) {
return submit_(true, UINT32_MAX, false, display_blocked_info);
}
// fill log_block, and if full submit out and continue to fill
int fill(ObTxLogBlock &log_block, memtable::ObRedoLogSubmitHelper &helper, const bool display_blocked_info = true);
// parallel submit, only traverses the writer's callback-list
int parallel_submit(const ObTxSEQ &write_seq);
int get_submitted_cnt() const { return submit_out_cnt_; }
share::SCN get_submitted_scn() const { return submitted_scn_; }
private:
// general submit entry, will traversal all callback-list
int submit_(const bool flush_all, const uint32_t freeze_clock, const bool is_final, const bool display_blocked_info);
private:
// common submit redo pipeline
// prepare -> fill -> submit_out -> after_submit
@ -68,9 +79,16 @@ private:
int submit_log_block_out_(const int64_t replay_hint, bool &submitted);
int after_submit_redo_out_();
public:
TO_STRING_KV(K_(tx_id), K_(ls_id), K_(from_all_list), K_(flush_all),
K_(write_seq_no), K_(serial_final),
K_(submit_if_not_full), K_(submit_out_cnt), K_(submit_cb_list_idx));
TO_STRING_KV(K_(tx_id),
K_(ls_id),
K_(from_all_list),
K_(flush_all),
K_(flush_freeze_clock),
K_(write_seq_no),
K_(serial_final),
K_(submit_if_not_full),
K_(submit_out_cnt),
K_(submit_cb_list_idx));
private:
ObPartTransCtx &tx_ctx_;
memtable::ObMemtableCtx &mt_ctx_;
@ -81,15 +99,17 @@ private:
memtable::ObRedoLogSubmitHelper *helper_;
// for writer thread submit, only submit single list
// for freeze or switch leader or commit, submit from all list
bool from_all_list_;
bool from_all_list_ : 1;
// whether flush all logs before can return
bool flush_all_;
bool flush_all_ : 1;
// whether is submitting the final serial log
bool serial_final_ : 1;
// whether to submit out if log_block is not fully filled
bool submit_if_not_full_ : 1;
// flush memtables whose freeze clock is less than or equal to the specified freeze_clock
// NOTE(review): declared int64_t but assigned from / compared against uint32_t
// everywhere else (UINT32_MAX sentinel, ctx.freeze_clock_) — consider uint32_t
int64_t flush_freeze_clock_;
// writer seq_no, used to select logging callback-list
ObTxSEQ write_seq_no_;
// whether is submitting the final serial log
bool serial_final_;
// whether to submit out if log_block is not fully filled
bool submit_if_not_full_;
// submit from which list, used by write thread logging
int submit_cb_list_idx_;
// the count of logs this submitter submitted out
@ -163,11 +183,15 @@ int ObTxRedoSubmitter::parallel_submit(const ObTxSEQ &write_seq_no)
// the caller holds TransCtx's FlushRedo write lock,
// which ensures no writer thread is logging,
// and also holds TransCtx's CtxLock, so it is safe to operate in the flush pipeline
int ObTxRedoSubmitter::submit(const bool flush_all, const bool is_final, const bool display_blocked_info)
int ObTxRedoSubmitter::submit_(const bool flush_all,
const uint32_t freeze_clock,
const bool is_final,
const bool display_blocked_info)
{
int ret = OB_SUCCESS;
from_all_list_ = true;
flush_all_ = flush_all;
flush_freeze_clock_ = freeze_clock;
serial_final_ = is_final;
ObTxLogBlock log_block;
log_block_ = &log_block;
@ -205,7 +229,6 @@ int ObTxRedoSubmitter::_submit_redo_pipeline_(const bool display_blocked_info)
int ret = OB_SUCCESS;
memtable::ObTxFillRedoCtx ctx;
ctx.tx_id_ = tx_id_;
const bool parallel_logging = tx_ctx_.is_parallel_logging();
ctx.write_seq_no_ = write_seq_no_;
const bool is_parallel_logging = tx_ctx_.is_parallel_logging();
bool stop = false;
@ -277,7 +300,7 @@ int ObTxRedoSubmitter::_submit_redo_pipeline_(const bool display_blocked_info)
bool submitted = false;
int submit_ret = OB_SUCCESS;
if (ctx.fill_count_ > 0 && !skip_submit) {
const int64_t replay_hint = ctx.tx_id_.get_id() + (parallel_logging ? ctx.list_idx_ : 0);
const int64_t replay_hint = ctx.tx_id_.get_id() + (is_parallel_logging ? ctx.list_idx_ : 0);
submit_ret = submit_log_block_out_(replay_hint, submitted);
if (OB_SUCCESS == submit_ret) {
submit_ret = after_submit_redo_out_();
@ -371,6 +394,7 @@ int ObTxRedoSubmitter::fill_log_block_(memtable::ObTxFillRedoCtx &ctx)
ctx.helper_ = helper_;
ctx.skip_lock_node_ = false;
ctx.all_list_ = from_all_list_;
ctx.freeze_clock_ = flush_freeze_clock_;
ctx.fill_count_ = 0;
ctx.list_idx_ = submit_cb_list_idx_;
int64_t start_ts = ObTimeUtility::fast_current_time();

View File

@ -50,6 +50,7 @@ struct MockDelegate {
const bool has_hold_ctx_lock,
share::SCN &submitted) = 0;
virtual int fill_redo_log(memtable::ObTxFillRedoCtx &ctx) = 0;
};
struct MockImpl : MockDelegate {
int a_;
@ -110,10 +111,14 @@ int succ_submit_redo_log_out(ObTxLogBlock & b,
const bool has_hold_ctx_lock,
share::SCN &submitted_scn)
{
log_cb = NULL;
submitted_scn.convert_for_tx(123123123);
if (log_cb) {
((ObPartTransCtx*)(log_cb->ctx_))->return_log_cb_(log_cb);
log_cb = NULL;
}
return OB_SUCCESS;
}
bool ObPartTransCtx::is_parallel_logging() const
{
return mock_ptr->is_parallel_logging();
@ -170,7 +175,7 @@ TEST_F(ObTestRedoSubmitter, serial_submit_by_writer_thread_BLOCK_FROZEN)
.Times(1)
.WillOnce(Invoke(succ_submit_redo_log_out));
ObTxRedoSubmitter submitter(tx_ctx, mt_ctx);
EXPECT_EQ(OB_BLOCK_FROZEN, submitter.submit(false/*flush_all*/, false));
EXPECT_EQ(OB_BLOCK_FROZEN, submitter.serial_submit(false));
}
}
@ -193,7 +198,7 @@ TEST_F(ObTestRedoSubmitter, serial_submit_by_writer_thread_BUF_NOT_ENOUGH)
.Times(1)
.WillOnce(Invoke(succ_submit_redo_log_out));
ObTxRedoSubmitter submitter(tx_ctx, mt_ctx);
EXPECT_EQ(OB_SUCCESS, submitter.submit(false/*flush_all*/, false));
EXPECT_EQ(OB_SUCCESS, submitter.serial_submit(false));
}
}
@ -216,7 +221,7 @@ TEST_F(ObTestRedoSubmitter, serial_submit_by_writer_thread_ALL_FILLED)
.Times(1)
.WillOnce(Invoke(succ_submit_redo_log_out));
ObTxRedoSubmitter submitter(tx_ctx, mt_ctx);
EXPECT_EQ(OB_SUCCESS, submitter.submit(false/*flush_all*/, false));
EXPECT_EQ(OB_SUCCESS, submitter.serial_submit(false));
}
}
@ -239,7 +244,7 @@ TEST_F(ObTestRedoSubmitter, serial_submit_by_writer_thread_UNEXPECTED_ERROR)
.Times(1)
.WillOnce(Invoke(succ_submit_redo_log_out));
ObTxRedoSubmitter submitter(tx_ctx, mt_ctx);
EXPECT_EQ(OB_ALLOCATE_MEMORY_FAILED, submitter.submit(false/*flush_all*/, false));
EXPECT_EQ(OB_ALLOCATE_MEMORY_FAILED, submitter.serial_submit(false));
}
}
@ -262,7 +267,7 @@ TEST_F(ObTestRedoSubmitter, serial_submit_by_writer_thread_serial_final)
.Times(1)
.WillOnce(Invoke(succ_submit_redo_log_out));
ObTxRedoSubmitter submitter(tx_ctx, mt_ctx);
EXPECT_EQ(OB_SUCCESS, submitter.submit(false/*flush_all*/, true/*serial final*/));
EXPECT_EQ(OB_SUCCESS, submitter.serial_submit(true/*serial final*/));
}
}
@ -380,35 +385,19 @@ TEST_F(ObTestRedoSubmitter, submit_by_freeze_parallel_logging_BLOCK_FROZEN)
EXPECT_CALL(mdo_, submit_redo_log_out(_,_,_,_,_,_))
.Times(1)
.WillOnce(Invoke(succ_submit_redo_log_out));
ObTxRedoSubmitter submitter(tx_ctx, mt_ctx);
EXPECT_EQ(OB_BLOCK_FROZEN, submitter.submit(false/*flush_all*/, false));
}
}
TEST_F(ObTestRedoSubmitter, submit_by_freeze_parallel_logging_BLOCKED_BY_OTHERS)
{
mdo_.a_ = 3;
EXPECT_CALL(mdo_, is_parallel_logging())
.Times(AtLeast(1))
.WillRepeatedly(Return(true));
{
InSequence s1;
EXPECT_CALL(mdo_, fill_redo_log(_))
.Times(1)
.WillOnce(Invoke([](ObTxFillRedoCtx &ctx) {
ctx.fill_count_ = 100;
ctx.buf_pos_ = 200;
return OB_ITER_END;
ctx.fill_count_ = 0;
ctx.buf_pos_ = 0;
return OB_BLOCK_FROZEN;
}));
EXPECT_CALL(mdo_, submit_redo_log_out(_,_,_,_,_,_))
.Times(1)
.WillOnce(Invoke(succ_submit_redo_log_out));
ObTxRedoSubmitter submitter(tx_ctx, mt_ctx);
EXPECT_EQ(OB_ITER_END, submitter.submit(false/*flush_all*/, false));
EXPECT_EQ(OB_BLOCK_FROZEN, submitter.submit_for_freeze());
}
}
TEST_F(ObTestRedoSubmitter, submit_by_freeze_parallel_logging_CURRENT_FILLED_BUT_OTHERS_REMAINS)
TEST_F(ObTestRedoSubmitter, submit_by_freeze_parallel_logging_FROZEN_BLOCKED_BY_OTHERS)
{
mdo_.a_ = 3;
EXPECT_CALL(mdo_, is_parallel_logging())
@ -416,6 +405,7 @@ TEST_F(ObTestRedoSubmitter, submit_by_freeze_parallel_logging_CURRENT_FILLED_BUT
.WillRepeatedly(Return(true));
{
InSequence s1;
// first list flushed the flushable part
EXPECT_CALL(mdo_, fill_redo_log(_))
.Times(1)
.WillOnce(Invoke([](ObTxFillRedoCtx &ctx) {
@ -423,11 +413,32 @@ TEST_F(ObTestRedoSubmitter, submit_by_freeze_parallel_logging_CURRENT_FILLED_BUT
ctx.buf_pos_ = 200;
return OB_EAGAIN;
}));
// submit it out to log service layer
EXPECT_CALL(mdo_, submit_redo_log_out(_,_,_,_,_,_))
.Times(1)
.WillOnce(Invoke(succ_submit_redo_log_out));
// retry from other lists, the list with small epoch flushed hit a block frozen
EXPECT_CALL(mdo_, fill_redo_log(_))
.Times(1)
.WillOnce(Invoke([](ObTxFillRedoCtx &ctx) {
ctx.fill_count_ = 100;
ctx.buf_pos_ = 200;
return OB_BLOCK_FROZEN;
}));
// submit it out to log service layer
EXPECT_CALL(mdo_, submit_redo_log_out(_,_,_,_,_,_))
.Times(1)
.WillOnce(Invoke(succ_submit_redo_log_out));
// retry from other lists; cannot submit the others because the list with the min epoch is block-frozen
EXPECT_CALL(mdo_, fill_redo_log(_))
.Times(1)
.WillOnce(Invoke([](ObTxFillRedoCtx &ctx) {
ctx.fill_count_ = 0;
ctx.buf_pos_ = 0;
return OB_BLOCK_FROZEN;
}));
ObTxRedoSubmitter submitter(tx_ctx, mt_ctx);
EXPECT_EQ(OB_EAGAIN, submitter.submit(false/*flush_all*/, false));
EXPECT_EQ(OB_BLOCK_FROZEN, submitter.submit_for_freeze());
}
}
@ -439,6 +450,7 @@ TEST_F(ObTestRedoSubmitter, submit_by_freeze_parallel_logging_BUF_NOT_ENOUGH)
.WillRepeatedly(Return(true));
{
InSequence s1;
// the list with small epoch is large
EXPECT_CALL(mdo_, fill_redo_log(_))
.Times(1)
.WillOnce(Invoke([](ObTxFillRedoCtx &ctx) {
@ -449,8 +461,38 @@ TEST_F(ObTestRedoSubmitter, submit_by_freeze_parallel_logging_BUF_NOT_ENOUGH)
EXPECT_CALL(mdo_, submit_redo_log_out(_,_,_,_,_,_))
.Times(1)
.WillOnce(Invoke(succ_submit_redo_log_out));
// the list with small epoch is all flushed
EXPECT_CALL(mdo_, fill_redo_log(_))
.Times(1)
.WillOnce(Invoke([](ObTxFillRedoCtx &ctx) {
ctx.fill_count_ = 100;
ctx.buf_pos_ = 200;
return OB_EAGAIN;
}));
EXPECT_CALL(mdo_, submit_redo_log_out(_,_,_,_,_,_))
.Times(1)
.WillOnce(Invoke(succ_submit_redo_log_out));
// the second list with small epoch is all flushed
EXPECT_CALL(mdo_, fill_redo_log(_))
.Times(1)
.WillOnce(Invoke([](ObTxFillRedoCtx &ctx) {
ctx.fill_count_ = 100;
ctx.buf_pos_ = 200;
return OB_EAGAIN;
}));
EXPECT_CALL(mdo_, submit_redo_log_out(_,_,_,_,_,_))
.Times(1)
.WillOnce(Invoke(succ_submit_redo_log_out));
// all list filled, nothing to fill
EXPECT_CALL(mdo_, fill_redo_log(_))
.Times(1)
.WillOnce(Invoke([](ObTxFillRedoCtx &ctx) {
ctx.fill_count_ = 0;
ctx.buf_pos_ = 0;
return OB_SUCCESS;
}));
ObTxRedoSubmitter submitter(tx_ctx, mt_ctx);
EXPECT_EQ(OB_SUCCESS, submitter.submit(false/*flush_all*/, false));
EXPECT_EQ(OB_SUCCESS, submitter.submit_for_freeze());
}
}
@ -473,7 +515,7 @@ TEST_F(ObTestRedoSubmitter, submit_by_freeze_parallel_logging_ALL_FILLED)
.Times(1)
.WillOnce(Invoke(succ_submit_redo_log_out));
ObTxRedoSubmitter submitter(tx_ctx, mt_ctx);
EXPECT_EQ(OB_SUCCESS, submitter.submit(false/*flush_all*/, false));
EXPECT_EQ(OB_SUCCESS, submitter.submit_for_freeze());
}
}
@ -609,7 +651,7 @@ TEST_F(ObTestRedoSubmitter, submit_ROW_SIZE_TOO_LARGE)
}));
{
ObTxRedoSubmitter submitter(tx_ctx, mt_ctx);
EXPECT_EQ(OB_ERR_TOO_BIG_ROWSIZE, submitter.submit(true, false, true));
EXPECT_EQ(OB_ERR_TOO_BIG_ROWSIZE, submitter.submit_all(true));
EXPECT_EQ(submitter.get_submitted_cnt(), 0);
}
{