Resubmit log if submitting redo log for freeze fails
This commit is contained in:
parent
56064b2cd3
commit
38a93056a1
@ -204,6 +204,7 @@ ObFreezer::ObFreezer()
|
||||
empty_memtable_cnt_(0),
|
||||
high_priority_freeze_cnt_(0),
|
||||
low_priority_freeze_cnt_(0),
|
||||
need_resubmit_log_(false),
|
||||
is_inited_(false)
|
||||
{}
|
||||
|
||||
@ -226,6 +227,7 @@ ObFreezer::ObFreezer(ObLSWRSHandler *ls_loop_worker,
|
||||
empty_memtable_cnt_(0),
|
||||
high_priority_freeze_cnt_(0),
|
||||
low_priority_freeze_cnt_(0),
|
||||
need_resubmit_log_(false),
|
||||
is_inited_(false)
|
||||
{}
|
||||
|
||||
@ -253,6 +255,7 @@ void ObFreezer::set(ObLSWRSHandler *ls_loop_worker,
|
||||
ls_id_ = ls_id;
|
||||
stat_.reset();
|
||||
empty_memtable_cnt_ = 0;
|
||||
need_resubmit_log_ = false;
|
||||
}
|
||||
|
||||
void ObFreezer::reset()
|
||||
@ -270,6 +273,7 @@ void ObFreezer::reset()
|
||||
empty_memtable_cnt_ = 0;
|
||||
high_priority_freeze_cnt_ = 0;
|
||||
low_priority_freeze_cnt_ = 0;
|
||||
need_resubmit_log_ = false;
|
||||
is_inited_ = false;
|
||||
}
|
||||
|
||||
@ -324,6 +328,7 @@ int ObFreezer::logstream_freeze()
|
||||
FLOG_INFO("[Freezer] freeze is running", K(ret), K_(ls_id));
|
||||
} else if (FALSE_IT(max_decided_log_ts_ = max_decided_log_ts)) {
|
||||
} else if (FALSE_IT(freeze_snapshot_version_ = freeze_snapshot_version)) {
|
||||
} else if (FALSE_IT(set_need_resubmit_log(false))) {
|
||||
} else if (FALSE_IT(stat_.state_ = ObFreezeState::NOT_SUBMIT_LOG)) {
|
||||
} else if (OB_FAIL(inner_logstream_freeze())) {
|
||||
undo_freeze_();
|
||||
@ -360,8 +365,12 @@ int ObFreezer::inner_logstream_freeze()
|
||||
// this means that all memtables can be dumped
|
||||
while (!data_checkpoint_->ls_freeze_finished()) {
|
||||
const int64_t cost_time = ObTimeUtility::current_time() - start;
|
||||
if (cost_time > 10 * 1000 * 1000) {
|
||||
if (TC_REACH_TIME_INTERVAL(10 * 1000 * 1000)) {
|
||||
if (cost_time > 5 * 1000 * 1000) {
|
||||
if (TC_REACH_TIME_INTERVAL(5 * 1000 * 1000)) {
|
||||
if (need_resubmit_log()) {
|
||||
submit_log_for_freeze();
|
||||
TRANS_LOG(INFO, "[Freezer] resubmit log for ls_freeze", K_(ls_id), K(cost_time));
|
||||
}
|
||||
TRANS_LOG(WARN, "[Freezer] finish ls_freeze costs too much time",
|
||||
K_(ls_id), K(cost_time));
|
||||
stat_.add_diagnose_info("finish ls_freeze costs too much time");
|
||||
@ -410,6 +419,7 @@ int ObFreezer::tablet_freeze(const ObTabletID &tablet_id)
|
||||
ret = OB_NOT_INIT;
|
||||
LOG_WARN("[Freezer] weak read service not inited", K(ret), K_(ls_id));
|
||||
} else if (FALSE_IT(freeze_snapshot_version_ = freeze_snapshot_version)) {
|
||||
} else if (FALSE_IT(set_need_resubmit_log(false))) {
|
||||
} else if (OB_FAIL(ls_tablet_svr_->get_tablet(tablet_id,
|
||||
handle,
|
||||
ObTabletCommon::NO_CHECK_GET_TABLET_TIMEOUT_US))) {
|
||||
@ -479,6 +489,7 @@ int ObFreezer::force_tablet_freeze(const ObTabletID &tablet_id)
|
||||
ret = OB_NOT_INIT;
|
||||
LOG_WARN("[Freezer] weak read service not inited", K(ret), K_(ls_id));
|
||||
} else if (FALSE_IT(freeze_snapshot_version_ = freeze_snapshot_version)) {
|
||||
} else if (FALSE_IT(set_need_resubmit_log(false))) {
|
||||
} else if (OB_FAIL(ls_tablet_svr_->get_tablet(tablet_id,
|
||||
handle, ObTabletCommon::NO_CHECK_GET_TABLET_TIMEOUT_US))) {
|
||||
TRANS_LOG(WARN, "[Freezer] fail to get tablet for freeze", K(ret), K_(ls_id), K(tablet_id));
|
||||
@ -588,8 +599,12 @@ void ObFreezer::wait_memtable_ready_for_flush(memtable::ObMemtable *memtable)
|
||||
|
||||
while (!memtable->ready_for_flush()) {
|
||||
const int64_t cost_time = ObTimeUtility::current_time() - start;
|
||||
if (cost_time > 10 * 1000 * 1000) {
|
||||
if (TC_REACH_TIME_INTERVAL(10 * 1000 * 1000)) {
|
||||
if (cost_time > 5 * 1000 * 1000) {
|
||||
if (TC_REACH_TIME_INTERVAL(5 * 1000 * 1000)) {
|
||||
if (need_resubmit_log()) {
|
||||
submit_log_for_freeze();
|
||||
TRANS_LOG(INFO, "[Freezer] resubmit log for tablet_freeze", K_(ls_id), K(cost_time));
|
||||
}
|
||||
TRANS_LOG(WARN, "[Freezer] ready_for_flush costs too much time",
|
||||
K_(ls_id), K(cost_time), KPC(memtable));
|
||||
stat_.add_diagnose_info("ready_for_flush costs too much time");
|
||||
@ -737,7 +752,10 @@ void ObFreezer::unset_freeze_()
|
||||
// Step2: unset max_decided_log_ts to invalid value
|
||||
max_decided_log_ts_ = OB_INVALID_TIMESTAMP;
|
||||
|
||||
// Step3: unset freeze_flag to flag the end of freeze
|
||||
// Step3: unset need_resubmit_log_
|
||||
set_need_resubmit_log(false);
|
||||
|
||||
// Step4: unset freeze_flag to flag the end of freeze
|
||||
// set the first bit 0
|
||||
do {
|
||||
old_v = ATOMIC_LOAD(&freeze_flag_);
|
||||
@ -756,7 +774,10 @@ void ObFreezer::undo_freeze_()
|
||||
// Step2: unset max_decided_log_ts to invalid value
|
||||
max_decided_log_ts_ = OB_INVALID_TIMESTAMP;
|
||||
|
||||
// Step3: unset freeze_flag and dec freeze_clock
|
||||
// Step3: unset need_resubmit_log_
|
||||
set_need_resubmit_log(false);
|
||||
|
||||
// Step4: unset freeze_flag and dec freeze_clock
|
||||
// used when freeze fails
|
||||
do {
|
||||
old_v = ATOMIC_LOAD(&freeze_flag_);
|
||||
|
@ -247,6 +247,8 @@ public:
|
||||
int get_newest_snapshot_version(const ObTabletID &tablet_id,
|
||||
int64_t &snapshot_version);
|
||||
ObFreezerStat& get_stat() { return stat_; }
|
||||
bool need_resubmit_log() { return ATOMIC_LOAD(&need_resubmit_log_); }
|
||||
void set_need_resubmit_log(bool flag) { return ATOMIC_STORE(&need_resubmit_log_, flag); }
|
||||
|
||||
private:
|
||||
class ObLSFreezeGuard
|
||||
@ -315,6 +317,8 @@ private:
|
||||
int64_t high_priority_freeze_cnt_; // waiting and freeze cnt
|
||||
int64_t low_priority_freeze_cnt_; // freeze tablet cnt
|
||||
|
||||
bool need_resubmit_log_;
|
||||
|
||||
bool is_inited_;
|
||||
};
|
||||
|
||||
|
@ -18,6 +18,7 @@
|
||||
#include "storage/tx/ob_trans_part_ctx.h"
|
||||
#include "storage/memtable/ob_memtable_util.h"
|
||||
#include "storage/tablelock/ob_table_lock_callback.h"
|
||||
#include "storage/ls/ob_freezer.h"
|
||||
namespace oceanbase
|
||||
{
|
||||
using namespace common;
|
||||
@ -267,10 +268,14 @@ namespace memtable {
|
||||
ObMvccWriteGuard::~ObMvccWriteGuard()
|
||||
{
|
||||
if (NULL != ctx_) {
|
||||
int ret = OB_SUCCESS;
|
||||
auto tx_ctx = ctx_->get_trans_ctx();
|
||||
ctx_->write_done();
|
||||
if (is_freeze_) {
|
||||
(void)tx_ctx->submit_redo_log(is_freeze_);
|
||||
if (OB_NOT_NULL(memtable_) && memtable_->is_frozen_memtable()) {
|
||||
if (OB_FAIL(tx_ctx->submit_redo_log(true/*is_freeze*/))) {
|
||||
TRANS_LOG(WARN, "failed to submit freeze log", K(ret), KPC(tx_ctx));
|
||||
memtable_->get_freezer()->set_need_resubmit_log(true);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -44,6 +44,7 @@ class ObLockMemtable;
|
||||
namespace storage
|
||||
{
|
||||
class ObLsmtTransNode;
|
||||
class ObFreezer;
|
||||
}
|
||||
|
||||
using namespace transaction::tablelock;
|
||||
@ -261,10 +262,10 @@ public:
|
||||
ObMvccWriteGuard(const bool exclusive = false)
|
||||
: exclusive_(exclusive),
|
||||
ctx_(NULL),
|
||||
is_freeze_(false) {}
|
||||
memtable_(NULL) {}
|
||||
~ObMvccWriteGuard();
|
||||
void set_is_freeze(bool flag) {
|
||||
is_freeze_ = flag;
|
||||
void set_memtable(ObMemtable *memtable) {
|
||||
memtable_ = memtable;
|
||||
}
|
||||
/*
|
||||
* purpose of ensure replica writable
|
||||
@ -279,7 +280,7 @@ private:
|
||||
DISALLOW_COPY_AND_ASSIGN(ObMvccWriteGuard);
|
||||
const bool exclusive_; // if true multiple write_auth will be serialized
|
||||
ObMemtableCtx *ctx_;
|
||||
bool is_freeze_;
|
||||
ObMemtable *memtable_;
|
||||
};
|
||||
}
|
||||
}
|
||||
|
@ -311,7 +311,7 @@ int ObMemtable::set(
|
||||
row,
|
||||
NULL,
|
||||
NULL);
|
||||
guard.set_is_freeze(freezer_->is_freeze());
|
||||
guard.set_memtable(this);
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
@ -346,7 +346,7 @@ int ObMemtable::set(
|
||||
new_row,
|
||||
&old_row,
|
||||
&update_idx);
|
||||
guard.set_is_freeze(freezer_->is_freeze());
|
||||
guard.set_memtable(this);
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
@ -308,6 +308,7 @@ public:
|
||||
|
||||
// // TODO: ==================== Memtable Other Interface ==================
|
||||
int set_freezer(storage::ObFreezer *handler);
|
||||
storage::ObFreezer *get_freezer() { return freezer_; }
|
||||
int get_ls_id(share::ObLSID &ls_id);
|
||||
void set_memtable_mgr(storage::ObTabletMemtableMgr *mgr) { memtable_mgr_ = mgr; }
|
||||
void set_freeze_clock(const uint32_t freeze_clock) { ATOMIC_STORE(&freeze_clock_, freeze_clock); }
|
||||
|
Loading…
x
Reference in New Issue
Block a user