[OPTIMIZATION] performance is slow down for submit redo with big txn
This commit is contained in:
@ -442,11 +442,13 @@ void ObTransCallbackMgr::reset_pdml_stat()
|
||||
force_merge_multi_callback_lists();
|
||||
}
|
||||
|
||||
int ObTransCallbackMgr::remove_callbacks_for_fast_commit(bool &has_remove)
|
||||
int ObTransCallbackMgr::remove_callbacks_for_fast_commit(const ObITransCallback *generate_cursor,
|
||||
bool &meet_generate_cursor)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
|
||||
if (OB_FAIL(callback_list_.remove_callbacks_for_fast_commit(has_remove))) {
|
||||
if (OB_FAIL(callback_list_.remove_callbacks_for_fast_commit(generate_cursor,
|
||||
meet_generate_cursor))) {
|
||||
TRANS_LOG(WARN, "remove callbacks for fast commit fail", K(ret));
|
||||
}
|
||||
|
||||
|
@ -217,7 +217,8 @@ public:
|
||||
const int64_t from_seq_no);
|
||||
void set_for_replay(const bool for_replay);
|
||||
bool is_for_replay() const { return ATOMIC_LOAD(&for_replay_); }
|
||||
int remove_callbacks_for_fast_commit(bool &has_remove);
|
||||
int remove_callbacks_for_fast_commit(const ObITransCallback *generate_cursor,
|
||||
bool &meet_generate_cursor);
|
||||
int remove_callback_for_uncommited_txn(
|
||||
const memtable::ObMemtableSet *memtable_set,
|
||||
const share::SCN max_applied_scn);
|
||||
|
@ -72,11 +72,14 @@ protected:
|
||||
class ObRemoveCallbacksForFastCommitFunctor : public ObITxCallbackFunctor
|
||||
{
|
||||
public:
|
||||
ObRemoveCallbacksForFastCommitFunctor(const int64_t need_remove_count)
|
||||
: need_remove_count_(need_remove_count),
|
||||
ObRemoveCallbacksForFastCommitFunctor(const ObITransCallback *generate_cursor,
|
||||
const int64_t need_remove_count)
|
||||
: generate_cursor_(generate_cursor),
|
||||
need_remove_count_(need_remove_count),
|
||||
last_scn_for_remove_(share::SCN::min_scn()),
|
||||
checksum_scn_(share::SCN::min_scn()),
|
||||
checksumer_(NULL) {}
|
||||
checksumer_(NULL),
|
||||
meet_generate_cursor_(false) {}
|
||||
|
||||
virtual bool is_iter_end(ObITransCallback *callback) const override
|
||||
{
|
||||
@ -136,6 +139,12 @@ public:
|
||||
need_remove_callback_ = true;
|
||||
need_remove_count_--;
|
||||
|
||||
// If we are removing callback pointed by generate_cursor, we need reset
|
||||
// the generate_cursor. Otherwise the dangling pointer may coredump.
|
||||
if (generate_cursor_ == callback) {
|
||||
meet_generate_cursor_ = true;
|
||||
}
|
||||
|
||||
// if we satisfy the removing count of fast commit, we still need remember
|
||||
// the last log ts we have already removed and then continue to remove the
|
||||
// callbacks until all callbacks with the same log ts has been removed in
|
||||
@ -156,15 +165,26 @@ public:
|
||||
return ret;
|
||||
}
|
||||
|
||||
VIRTUAL_TO_STRING_KV(K_(need_remove_count),
|
||||
// return whether we are removing callbacks that pointed by generate cursor
|
||||
bool meet_generate_cursor() const
|
||||
{
|
||||
return meet_generate_cursor_;
|
||||
}
|
||||
|
||||
VIRTUAL_TO_STRING_KV(KP_(generate_cursor),
|
||||
K_(need_remove_count),
|
||||
K_(checksum_scn),
|
||||
K_(last_scn_for_remove));
|
||||
K_(last_scn_for_remove),
|
||||
K_(meet_generate_cursor));
|
||||
|
||||
private:
|
||||
const ObITransCallback *generate_cursor_;
|
||||
int64_t need_remove_count_;
|
||||
share::SCN last_scn_for_remove_;
|
||||
share::SCN checksum_scn_;
|
||||
ObBatchChecksum *checksumer_;
|
||||
|
||||
bool meet_generate_cursor_;
|
||||
};
|
||||
|
||||
class ObNeverStopForCallbackTraverseFunctor
|
||||
|
@ -172,13 +172,15 @@ int64_t ObTxCallbackList::calc_need_remove_count_for_fast_commit_()
|
||||
return need_remove_count;
|
||||
}
|
||||
|
||||
int ObTxCallbackList::remove_callbacks_for_fast_commit(bool &has_remove)
|
||||
int ObTxCallbackList::remove_callbacks_for_fast_commit(const ObITransCallback *generate_cursor,
|
||||
bool &meet_generate_cursor)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
has_remove = false;
|
||||
meet_generate_cursor = false;
|
||||
SpinLockGuard guard(latch_);
|
||||
|
||||
ObRemoveCallbacksForFastCommitFunctor functor(calc_need_remove_count_for_fast_commit_());
|
||||
ObRemoveCallbacksForFastCommitFunctor functor(generate_cursor,
|
||||
calc_need_remove_count_for_fast_commit_());
|
||||
functor.set_checksumer(checksum_scn_, &batch_checksum_);
|
||||
|
||||
if (OB_FAIL(callback_(functor))) {
|
||||
@ -186,7 +188,7 @@ int ObTxCallbackList::remove_callbacks_for_fast_commit(bool &has_remove)
|
||||
} else {
|
||||
callback_mgr_.add_fast_commit_callback_remove_cnt(functor.get_remove_cnt());
|
||||
ensure_checksum_(functor.get_checksum_last_scn());
|
||||
has_remove = SCN::min_scn() != functor.get_checksum_last_scn();
|
||||
meet_generate_cursor = functor.meet_generate_cursor();
|
||||
}
|
||||
|
||||
return ret;
|
||||
|
@ -53,9 +53,10 @@ public:
|
||||
// parameter _fast_commit_callback_count. It will only remove callbacks
|
||||
// without removing data by calling checkpoint_callback. So user need
|
||||
// implement lazy callback for the correctness. What's more, it will calculate
|
||||
// checksum when removing. Finally it returns has_remove if you really remove
|
||||
// the callbacks.
|
||||
int remove_callbacks_for_fast_commit(bool &has_remove);
|
||||
// checksum when removing. Finally it returns meet_generate_cursor if you remove
|
||||
// the callbacks that generate_cursor is pointing to.
|
||||
int remove_callbacks_for_fast_commit(const ObITransCallback *generate_cursor,
|
||||
bool &meet_generate_cursor);
|
||||
|
||||
// remove_callbacks_for_remove_memtable will remove all callbacks that is
|
||||
// belonged to the specified memtable sets. It will only remove callbacks
|
||||
|
@ -857,13 +857,15 @@ bool ObMemtableCtx::is_all_redo_submitted()
|
||||
int ObMemtableCtx::remove_callbacks_for_fast_commit()
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
bool has_remove = false;
|
||||
bool meet_generate_cursor = false;
|
||||
common::ObTimeGuard timeguard("remove callbacks for fast commit", 10 * 1000);
|
||||
ObByteLockGuard guard(lock_);
|
||||
|
||||
if (OB_FAIL(trans_mgr_.remove_callbacks_for_fast_commit(has_remove))) {
|
||||
if (OB_FAIL(trans_mgr_.remove_callbacks_for_fast_commit(
|
||||
log_gen_.get_generate_cursor(),
|
||||
meet_generate_cursor))) {
|
||||
TRANS_LOG(WARN, "fail to remove callback for uncommitted txn", K(ret), KPC(this));
|
||||
} else if (has_remove && OB_FAIL(reuse_log_generator_())) {
|
||||
} else if (meet_generate_cursor && OB_FAIL(reuse_log_generator_())) {
|
||||
TRANS_LOG(ERROR, "fail to reset log generator", K(ret), KPC(this));
|
||||
}
|
||||
|
||||
|
@ -428,25 +428,25 @@ TEST_F(TestTxCallbackList, remove_callback_by_fast_commit)
|
||||
|
||||
fast_commit_reserve_cnt_ = 16;
|
||||
bool has_remove = false;
|
||||
EXPECT_EQ(OB_SUCCESS, callback_list_.remove_callbacks_for_fast_commit(has_remove));
|
||||
EXPECT_EQ(OB_SUCCESS, callback_list_.remove_callbacks_for_fast_commit(nullptr, has_remove));
|
||||
EXPECT_EQ(8, callback_list_.get_length());
|
||||
EXPECT_EQ(1, mgr_.get_callback_remove_for_fast_commit_count());
|
||||
EXPECT_EQ(true, has_remove);
|
||||
|
||||
fast_commit_reserve_cnt_ = 14;
|
||||
EXPECT_EQ(OB_SUCCESS, callback_list_.remove_callbacks_for_fast_commit(has_remove));
|
||||
EXPECT_EQ(OB_SUCCESS, callback_list_.remove_callbacks_for_fast_commit(nullptr, has_remove));
|
||||
EXPECT_EQ(6, callback_list_.get_length());
|
||||
EXPECT_EQ(3, mgr_.get_callback_remove_for_fast_commit_count());
|
||||
EXPECT_EQ(true, has_remove);
|
||||
|
||||
fast_commit_reserve_cnt_ = 1;
|
||||
EXPECT_EQ(OB_SUCCESS, callback_list_.remove_callbacks_for_fast_commit(has_remove));
|
||||
EXPECT_EQ(OB_SUCCESS, callback_list_.remove_callbacks_for_fast_commit(nullptr, has_remove));
|
||||
EXPECT_EQ(4, callback_list_.get_length());
|
||||
EXPECT_EQ(5, mgr_.get_callback_remove_for_fast_commit_count());
|
||||
EXPECT_EQ(true, has_remove);
|
||||
|
||||
fast_commit_reserve_cnt_ = 1;
|
||||
EXPECT_EQ(OB_SUCCESS, callback_list_.remove_callbacks_for_fast_commit(has_remove));
|
||||
EXPECT_EQ(OB_SUCCESS, callback_list_.remove_callbacks_for_fast_commit(nullptr, has_remove));
|
||||
EXPECT_EQ(4, callback_list_.get_length());
|
||||
EXPECT_EQ(5, mgr_.get_callback_remove_for_fast_commit_count());
|
||||
EXPECT_EQ(false, has_remove);
|
||||
@ -884,22 +884,22 @@ TEST_F(TestTxCallbackList, checksum_fast_commit_and_tx_end)
|
||||
|
||||
fast_commit_reserve_cnt_ = 16;
|
||||
bool has_remove = false;
|
||||
EXPECT_EQ(OB_SUCCESS, callback_list_.remove_callbacks_for_fast_commit(has_remove));
|
||||
EXPECT_EQ(OB_SUCCESS, callback_list_.remove_callbacks_for_fast_commit(nullptr, has_remove));
|
||||
EXPECT_EQ(true, is_checksum_equal(1, checksum_));
|
||||
EXPECT_EQ(scn_2, callback_list_.checksum_scn_);
|
||||
|
||||
fast_commit_reserve_cnt_ = 14;
|
||||
EXPECT_EQ(OB_SUCCESS, callback_list_.remove_callbacks_for_fast_commit(has_remove));
|
||||
EXPECT_EQ(OB_SUCCESS, callback_list_.remove_callbacks_for_fast_commit(nullptr, has_remove));
|
||||
EXPECT_EQ(true, is_checksum_equal(3, checksum_));
|
||||
EXPECT_EQ(scn_3, callback_list_.checksum_scn_);
|
||||
|
||||
fast_commit_reserve_cnt_ = 1;
|
||||
EXPECT_EQ(OB_SUCCESS, callback_list_.remove_callbacks_for_fast_commit(has_remove));
|
||||
EXPECT_EQ(OB_SUCCESS, callback_list_.remove_callbacks_for_fast_commit(nullptr, has_remove));
|
||||
EXPECT_EQ(true, is_checksum_equal(5, checksum_));
|
||||
EXPECT_EQ(scn_4, callback_list_.checksum_scn_);
|
||||
|
||||
fast_commit_reserve_cnt_ = 1;
|
||||
EXPECT_EQ(OB_SUCCESS, callback_list_.remove_callbacks_for_fast_commit(has_remove));
|
||||
EXPECT_EQ(OB_SUCCESS, callback_list_.remove_callbacks_for_fast_commit(nullptr, has_remove));
|
||||
EXPECT_EQ(true, is_checksum_equal(5, checksum_));
|
||||
EXPECT_EQ(scn_4, callback_list_.checksum_scn_);
|
||||
|
||||
@ -1074,7 +1074,7 @@ TEST_F(TestTxCallbackList, checksum_all_and_tx_end_test) {
|
||||
bool has_remove = false;
|
||||
if (enable) {
|
||||
EXPECT_EQ(OB_SUCCESS,
|
||||
callback_list_.remove_callbacks_for_fast_commit(has_remove));
|
||||
callback_list_.remove_callbacks_for_fast_commit(nullptr, has_remove));
|
||||
EXPECT_EQ(true, has_remove);
|
||||
}
|
||||
|
||||
@ -1157,7 +1157,8 @@ void ObMemtableCtx::callback_free(ObITransCallback *cb)
|
||||
}
|
||||
}
|
||||
|
||||
int ObTxCallbackList::remove_callbacks_for_fast_commit(bool &has_remove)
|
||||
int ObTxCallbackList::remove_callbacks_for_fast_commit(const ObITransCallback *callback,
|
||||
bool &has_remove)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
has_remove = false;
|
||||
@ -1167,7 +1168,7 @@ int ObTxCallbackList::remove_callbacks_for_fast_commit(bool &has_remove)
|
||||
const int64_t recommand_reserve_count = (fast_commit_callback_count + 1) / 2;
|
||||
const int64_t need_remove_count = length_ - recommand_reserve_count;
|
||||
|
||||
ObRemoveCallbacksForFastCommitFunctor functor(need_remove_count);
|
||||
ObRemoveCallbacksForFastCommitFunctor functor(callback, need_remove_count);
|
||||
functor.set_checksumer(checksum_scn_, &batch_checksum_);
|
||||
|
||||
if (OB_FAIL(callback_(functor))) {
|
||||
|
Reference in New Issue
Block a user