[BUG] remove callback when on failure
This commit is contained in:
parent
a184912a69
commit
a920d19e0a
@ -223,7 +223,6 @@ void ObTransCallbackMgr::reset()
|
||||
callback_lists_ = NULL;
|
||||
}
|
||||
parallel_stat_ = 0;
|
||||
leader_changed_ = false;
|
||||
callback_main_list_append_count_ = 0;
|
||||
callback_slave_list_append_count_ = 0;
|
||||
callback_slave_list_merge_count_ = 0;
|
||||
@ -445,6 +444,24 @@ int ObTransCallbackMgr::calc_checksum_before_scn(const SCN scn,
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObTransCallbackMgr::sync_log_fail(const ObCallbackScope &callbacks,
|
||||
int64_t &removed_cnt)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
removed_cnt = 0;
|
||||
|
||||
// TODO(handora.qc): remove it in the future
|
||||
RDLockGuard guard(rwlock_);
|
||||
|
||||
if (callbacks.is_empty()) {
|
||||
// pass empty callbacks
|
||||
} else if (OB_FAIL(callback_list_.sync_log_fail(callbacks, removed_cnt))) {
|
||||
TRANS_LOG(ERROR, "sync log fail", K(ret));
|
||||
}
|
||||
|
||||
return ret;
|
||||
}
|
||||
|
||||
void ObTransCallbackMgr::update_checksum(const uint64_t checksum,
|
||||
const SCN checksum_scn)
|
||||
{
|
||||
|
@ -34,7 +34,8 @@ namespace memtable
|
||||
class ObMemtableCtxCbAllocator;
|
||||
class ObIMemtable;
|
||||
class ObMemtable;
|
||||
enum class MutatorType;
|
||||
class ObCallbackScope;
|
||||
enum class MutatorType;
|
||||
|
||||
class ObITransCallback;
|
||||
struct RedoDataNode
|
||||
@ -182,7 +183,6 @@ public:
|
||||
rwlock_(ObLatchIds::MEMTABLE_CALLBACK_LIST_MGR_LOCK),
|
||||
parallel_stat_(0),
|
||||
for_replay_(false),
|
||||
leader_changed_(false),
|
||||
callback_main_list_append_count_(0),
|
||||
callback_slave_list_append_count_(0),
|
||||
callback_slave_list_merge_count_(0),
|
||||
@ -224,6 +224,8 @@ public:
|
||||
private:
|
||||
void wakeup_waiting_txns_();
|
||||
public:
|
||||
int sync_log_fail(const ObCallbackScope &callbacks,
|
||||
int64_t &removed_cnt);
|
||||
int calc_checksum_before_scn(const share::SCN scn,
|
||||
uint64_t &checksum,
|
||||
share::SCN &checksum_scn);
|
||||
@ -308,7 +310,6 @@ private:
|
||||
int64_t parallel_stat_;
|
||||
};
|
||||
bool for_replay_;
|
||||
bool leader_changed_;
|
||||
// statistics for callback remove
|
||||
int64_t callback_main_list_append_count_;
|
||||
int64_t callback_slave_list_append_count_;
|
||||
|
@ -497,6 +497,42 @@ public:
|
||||
VIRTUAL_TO_STRING_KV("CleanUnlogCallback", "CleanUnlogCallback");
|
||||
};
|
||||
|
||||
class ObSyncLogFailFunctor : public ObITxCallbackFunctor
|
||||
{
|
||||
public:
|
||||
ObSyncLogFailFunctor() {}
|
||||
|
||||
virtual int operator()(ObITransCallback *callback) override
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
|
||||
if (NULL == callback) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
TRANS_LOG(ERROR, "unexpected callback", KP(callback));
|
||||
} else if (callback->need_fill_redo() && callback->need_submit_log()) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
TRANS_LOG(ERROR, "sync log fail will only touch submitted log", KPC(callback));
|
||||
} else if (!callback->need_fill_redo() && !callback->need_submit_log()) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
TRANS_LOG(ERROR, "sync log fail will only touch unsynced log", KPC(callback));
|
||||
} else if (!callback->need_fill_redo() && callback->need_submit_log()) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
TRANS_LOG(ERROR, "It will never on success before submit log", KPC(callback));
|
||||
} else if (callback->need_fill_redo() && !callback->need_submit_log()) {
|
||||
if (OB_FAIL(callback->log_sync_fail_cb())) {
|
||||
// log_sync_fail_cb will never report error
|
||||
TRANS_LOG(ERROR, "log sync fail cb report error", K(ret));
|
||||
} else {
|
||||
need_remove_callback_ = true;
|
||||
}
|
||||
}
|
||||
|
||||
return ret;
|
||||
}
|
||||
|
||||
VIRTUAL_TO_STRING_KV("ObSyncLogFailFunctor", "ObSyncLogFailFunctor");
|
||||
};
|
||||
|
||||
class ObSearchCallbackWCondFunctor : public ObITxCallbackFunctor
|
||||
{
|
||||
public:
|
||||
|
@ -87,6 +87,18 @@ int ObTxCallbackList::callback_(ObITxCallbackFunctor &functor)
|
||||
return callback_(functor, get_guard(), get_guard());
|
||||
}
|
||||
|
||||
int ObTxCallbackList::callback_(ObITxCallbackFunctor &functor,
|
||||
const ObCallbackScope &callbacks)
|
||||
{
|
||||
ObITransCallback *start = (ObITransCallback *)*(callbacks.start_);
|
||||
ObITransCallback *end = (ObITransCallback *)*(callbacks.end_);
|
||||
if (functor.is_reverse()) {
|
||||
return callback_(functor, start->get_next(), end->get_prev());
|
||||
} else {
|
||||
return callback_(functor, start->get_prev(), end->get_next());
|
||||
}
|
||||
}
|
||||
|
||||
int ObTxCallbackList::callback_(ObITxCallbackFunctor &functor,
|
||||
ObITransCallback *start,
|
||||
ObITransCallback *end)
|
||||
@ -253,6 +265,23 @@ int ObTxCallbackList::reverse_search_callback_by_seq_no(const int64_t seq_no,
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObTxCallbackList::sync_log_fail(const ObCallbackScope &callbacks,
|
||||
int64_t &removed_cnt)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
ObSyncLogFailFunctor functor;
|
||||
|
||||
SpinLockGuard guard(latch_);
|
||||
|
||||
if (OB_FAIL(callback_(functor, callbacks))) {
|
||||
TRANS_LOG(WARN, "clean unlog callbacks failed", K(ret), K(functor));
|
||||
} else {
|
||||
TRANS_LOG(INFO, "sync failed log successfully", K(functor), K(*this));
|
||||
}
|
||||
removed_cnt = functor.get_remove_cnt();
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObTxCallbackList::clean_unlog_callbacks(int64_t &removed_cnt)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
|
@ -22,6 +22,7 @@ namespace memtable
|
||||
{
|
||||
|
||||
class ObTransCallbackMgr;
|
||||
class ObCallbackScope;
|
||||
|
||||
class ObTxCallbackList
|
||||
{
|
||||
@ -82,6 +83,10 @@ public:
|
||||
// when switch to follower forcely.
|
||||
int clean_unlog_callbacks(int64_t &removed_cnt);
|
||||
|
||||
// sync_log_fail will remove all callbacks that not sync successfully. Which
|
||||
// is called when callback is on failure.
|
||||
int sync_log_fail(const ObCallbackScope &callbacks, int64_t &removed_cnt);
|
||||
|
||||
// tx_calc_checksum_before_scn will calculate checksum during execution. It will
|
||||
// remember the intermediate results for final result.
|
||||
int tx_calc_checksum_before_scn(const share::SCN scn);
|
||||
@ -113,6 +118,8 @@ public:
|
||||
|
||||
private:
|
||||
int callback_(ObITxCallbackFunctor &func);
|
||||
int callback_(ObITxCallbackFunctor &functor,
|
||||
const ObCallbackScope &callbacks);
|
||||
int callback_(ObITxCallbackFunctor &func,
|
||||
ObITransCallback *start,
|
||||
ObITransCallback *end);
|
||||
|
@ -288,16 +288,6 @@ int ObMemtableCtx::write_done()
|
||||
return rwlock_.unlock();
|
||||
}
|
||||
|
||||
void ObMemtableCtx::replay_auth()
|
||||
{
|
||||
lock_.lock();
|
||||
}
|
||||
|
||||
void ObMemtableCtx::replay_done()
|
||||
{
|
||||
lock_.unlock();
|
||||
}
|
||||
|
||||
int ObMemtableCtx::write_lock_yield()
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
|
@ -432,8 +432,6 @@ public:
|
||||
virtual void dec_unsubmitted_cnt() override;
|
||||
virtual void inc_unsynced_cnt() override;
|
||||
virtual void dec_unsynced_cnt() override;
|
||||
void replay_auth();
|
||||
void replay_done();
|
||||
int64_t get_checksum() const { return trans_mgr_.get_checksum(); }
|
||||
int64_t get_tmp_checksum() const { return trans_mgr_.get_tmp_checksum(); }
|
||||
share::SCN get_checksum_scn() const { return trans_mgr_.get_checksum_scn(); }
|
||||
|
@ -251,29 +251,16 @@ int ObRedoLogGenerator::sync_log_succ(const SCN scn, const ObCallbackScope &call
|
||||
void ObRedoLogGenerator::sync_log_fail(const ObCallbackScope &callbacks)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
int tmp_ret = OB_SUCCESS;
|
||||
int64_t removed_cnt = 0;
|
||||
|
||||
if (IS_NOT_INIT) {
|
||||
ret = OB_NOT_INIT;
|
||||
TRANS_LOG(ERROR, "not init", K(ret));
|
||||
} else if (!callbacks.is_empty()) {
|
||||
ObTransCallbackMgr::RDLockGuard guard(callback_mgr_->get_rwlock());
|
||||
ObITransCallbackIterator cursor = callbacks.start_;
|
||||
do {
|
||||
ObITransCallback *iter = (ObITransCallback *)*cursor;
|
||||
if (iter->need_fill_redo()) {
|
||||
if (OB_TMP_FAIL(iter->log_sync_fail_cb())) {
|
||||
if (OB_SUCC(ret)) {
|
||||
ret = tmp_ret;
|
||||
}
|
||||
TRANS_LOG(WARN, "failed to set sync log info for callback ", K(tmp_ret), K(*iter));
|
||||
} else {
|
||||
redo_sync_fail_cnt_ += 1;
|
||||
}
|
||||
} else {
|
||||
TRANS_LOG(ERROR, "sync_log_fail error", K(ret), K(iter), K(iter->need_fill_redo()));
|
||||
}
|
||||
} while (cursor != callbacks.end_ && !FALSE_IT(cursor++));
|
||||
if (OB_FAIL(callback_mgr_->sync_log_fail(callbacks, removed_cnt))) {
|
||||
TRANS_LOG(ERROR, "sync log failed", K(ret));
|
||||
}
|
||||
redo_sync_fail_cnt_ += removed_cnt;
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -149,17 +149,18 @@ public:
|
||||
return cb;
|
||||
}
|
||||
|
||||
void create_and_append_callback(ObMemtable *mt,
|
||||
bool need_submit_log = true,
|
||||
bool need_fill_redo = true,
|
||||
share::SCN scn = share::SCN::max_scn())
|
||||
{
|
||||
ObITransCallback *create_and_append_callback(ObMemtable *mt,
|
||||
bool need_submit_log = true,
|
||||
bool need_fill_redo = true,
|
||||
share::SCN scn = share::SCN::max_scn())
|
||||
{
|
||||
ObMockTxCallback *cb = create_callback(mt,
|
||||
need_submit_log,
|
||||
need_fill_redo,
|
||||
scn);
|
||||
EXPECT_NE(NULL, (long)cb);
|
||||
EXPECT_EQ(OB_SUCCESS, callback_list_.append_callback(cb));
|
||||
return cb;
|
||||
}
|
||||
|
||||
ObMemtable *create_memtable()
|
||||
@ -227,6 +228,38 @@ int ObMockTxCallback::calc_checksum(const share::SCN checksum_scn,
|
||||
return OB_SUCCESS;
|
||||
}
|
||||
|
||||
TEST_F(TestTxCallbackList, remove_callback_on_failure)
|
||||
{
|
||||
ObMemtable *memtable = create_memtable();
|
||||
share::SCN scn_1;
|
||||
scn_1.convert_for_logservice(1);
|
||||
|
||||
create_and_append_callback(memtable,
|
||||
false, /*need_submit_log*/
|
||||
false, /*need_fill_redo*/
|
||||
scn_1);
|
||||
auto cb1 = create_and_append_callback(memtable,
|
||||
false, /*need_submit_log*/
|
||||
true /*need_fill_redo*/);
|
||||
auto cb2 = create_and_append_callback(memtable,
|
||||
false, /*need_submit_log*/
|
||||
true /*need_fill_redo*/);
|
||||
create_and_append_callback(memtable,
|
||||
false, /*need_submit_log*/
|
||||
true /*need_fill_redo*/);
|
||||
|
||||
ObCallbackScope scope;
|
||||
int64_t removed_cnt = 0;
|
||||
scope.start_ = ObITransCallbackIterator(cb1);
|
||||
scope.end_ = ObITransCallbackIterator(cb2);
|
||||
|
||||
EXPECT_EQ(false, scope.is_empty());
|
||||
EXPECT_EQ(OB_SUCCESS, callback_list_.sync_log_fail(scope, removed_cnt));
|
||||
|
||||
EXPECT_EQ(2, removed_cnt);
|
||||
EXPECT_EQ(2, callback_list_.get_length());
|
||||
}
|
||||
|
||||
TEST_F(TestTxCallbackList, remove_callback_by_tx_commit)
|
||||
{
|
||||
ObMemtable *memtable = create_memtable();
|
||||
|
Loading…
x
Reference in New Issue
Block a user