use max_applied_scn in remove_callbacks_for_remove_memtable
Co-authored-by: Handora <qcdsr970209@gmail.com>
This commit is contained in:
@ -390,14 +390,14 @@ int ObTransCallbackMgr::remove_callbacks_for_fast_commit(bool &has_remove)
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObTransCallbackMgr::remove_callback_for_uncommited_txn(ObIMemtable *memtable)
|
||||
int ObTransCallbackMgr::remove_callback_for_uncommited_txn(ObIMemtable *memtable, const share::SCN max_applied_scn)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
|
||||
if (OB_ISNULL(memtable)) {
|
||||
ret = OB_INVALID_ARGUMENT;
|
||||
TRANS_LOG(WARN, "memtable is null", K(ret));
|
||||
} else if (OB_FAIL(callback_list_.remove_callbacks_for_remove_memtable(memtable))) {
|
||||
} else if (OB_FAIL(callback_list_.remove_callbacks_for_remove_memtable(memtable, max_applied_scn))) {
|
||||
TRANS_LOG(WARN, "fifo remove callback fail", K(ret), K(*memtable));
|
||||
}
|
||||
|
||||
|
@ -209,7 +209,7 @@ public:
|
||||
void set_for_replay(const bool for_replay);
|
||||
bool is_for_replay() const { return ATOMIC_LOAD(&for_replay_); }
|
||||
int remove_callbacks_for_fast_commit(bool &has_remove);
|
||||
int remove_callback_for_uncommited_txn(memtable::ObIMemtable *memtable);
|
||||
int remove_callback_for_uncommited_txn(memtable::ObIMemtable *memtable, const share::SCN max_applied_scn);
|
||||
int get_memtable_key_arr(transaction::ObMemtableKeyArray &memtable_key_arr);
|
||||
void acquire_callback_list();
|
||||
void revert_callback_list();
|
||||
|
@ -163,7 +163,7 @@ int ObTxCallbackList::remove_callbacks_for_fast_commit(bool &has_remove)
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObTxCallbackList::remove_callbacks_for_remove_memtable(ObIMemtable *memtable_for_remove)
|
||||
int ObTxCallbackList::remove_callbacks_for_remove_memtable(ObIMemtable *memtable_for_remove, const share::SCN max_applied_scn)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
SpinLockGuard guard(latch_);
|
||||
@ -177,8 +177,8 @@ int ObTxCallbackList::remove_callbacks_for_remove_memtable(ObIMemtable *memtable
|
||||
return false;
|
||||
}
|
||||
}, // condition for stop
|
||||
[memtable_for_remove](ObITransCallback *callback) -> bool {
|
||||
if (callback->get_scn() > memtable_for_remove->get_key().get_end_scn()) {
|
||||
[max_applied_scn](ObITransCallback *callback) -> bool {
|
||||
if (callback->get_scn() > max_applied_scn) {
|
||||
return true;
|
||||
} else {
|
||||
return false;
|
||||
|
@ -62,7 +62,8 @@ public:
|
||||
// callback for the correctness. And user need guarantee all callbacks
|
||||
// belonged to the memtable must be synced before removing. What's more, it
|
||||
// will calculate checksum when removing.
|
||||
int remove_callbacks_for_remove_memtable(ObIMemtable *memtable_for_remove);
|
||||
int remove_callbacks_for_remove_memtable(ObIMemtable *memtable_for_remove,
|
||||
const share::SCN max_applied_scn);
|
||||
|
||||
// remove_callbacks_for_rollback_to will remove callbacks from back to front
|
||||
// until callbacks smaller or equal than the seq_no. It will remove both
|
||||
|
@ -203,7 +203,6 @@ int ObMemtable::remove_unused_callback_for_uncommited_txn_()
|
||||
transaction::ObTransService *txs_svr = MTL(transaction::ObTransService *);
|
||||
|
||||
if (NULL != txs_svr
|
||||
&& share::ObScnRange::MAX_SCN != get_end_scn()
|
||||
&& OB_FAIL(txs_svr->remove_callback_for_uncommited_txn(this))) {
|
||||
TRANS_LOG(WARN, "remove callback for uncommited txn failed", K(ret), K(*this));
|
||||
}
|
||||
|
@ -927,7 +927,7 @@ int ObMemtableCtx::remove_callbacks_for_fast_commit()
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObMemtableCtx::remove_callback_for_uncommited_txn(ObMemtable *mt)
|
||||
int ObMemtableCtx::remove_callback_for_uncommited_txn(ObMemtable *mt, const share::SCN max_applied_scn)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
ObByteLockGuard guard(lock_);
|
||||
@ -937,7 +937,7 @@ int ObMemtableCtx::remove_callback_for_uncommited_txn(ObMemtable *mt)
|
||||
TRANS_LOG(WARN, "memtable is NULL", K(mt));
|
||||
} else if (OB_FAIL(reuse_log_generator_())) {
|
||||
TRANS_LOG(ERROR, "fail to reset log generator", K(ret));
|
||||
} else if (OB_FAIL(trans_mgr_.remove_callback_for_uncommited_txn(mt))) {
|
||||
} else if (OB_FAIL(trans_mgr_.remove_callback_for_uncommited_txn(mt, max_applied_scn))) {
|
||||
TRANS_LOG(WARN, "fail to remove callback for uncommitted txn", K(ret), K(mt));
|
||||
}
|
||||
|
||||
|
@ -399,7 +399,7 @@ public:
|
||||
uint64_t get_tenant_id() const;
|
||||
inline bool has_read_elr_data() const { return read_elr_data_; }
|
||||
int remove_callbacks_for_fast_commit();
|
||||
int remove_callback_for_uncommited_txn(memtable::ObMemtable* mt);
|
||||
int remove_callback_for_uncommited_txn(memtable::ObMemtable* mt, const share::SCN max_applied_scn);
|
||||
int rollback(const int64_t seq_no, const int64_t from_seq_no);
|
||||
bool is_all_redo_submitted();
|
||||
bool is_for_replay() const { return trans_mgr_.is_for_replay(); }
|
||||
|
@ -1504,8 +1504,8 @@ int ObPartTransCtx::remove_callback_for_uncommited_txn(ObMemtable *mt)
|
||||
ret = OB_INVALID_ARGUMENT;
|
||||
TRANS_LOG(WARN, "memtable is NULL", K(mt));
|
||||
} else if (OB_UNLIKELY(is_exiting_)) {
|
||||
} else if (OB_FAIL(mt_ctx_.remove_callback_for_uncommited_txn(mt))) {
|
||||
TRANS_LOG(WARN, "fail to remove callback for uncommitted txn", K(ret), K(mt_ctx_));
|
||||
} else if (OB_FAIL(mt_ctx_.remove_callback_for_uncommited_txn(mt, exec_info_.max_applied_log_ts_))) {
|
||||
TRANS_LOG(WARN, "fail to remove callback for uncommitted txn", K(ret), K(mt_ctx_), K(exec_info_.max_applied_log_ts_));
|
||||
}
|
||||
|
||||
return ret;
|
||||
|
@ -327,19 +327,19 @@ TEST_F(TestTxCallbackList, remove_callback_by_release_memtable)
|
||||
EXPECT_EQ(9, callback_list_.get_length());
|
||||
|
||||
EXPECT_EQ(OB_SUCCESS,
|
||||
callback_list_.remove_callbacks_for_remove_memtable(memtable2));
|
||||
callback_list_.remove_callbacks_for_remove_memtable(memtable2, scn_1/*not used*/));
|
||||
|
||||
EXPECT_EQ(7, callback_list_.get_length());
|
||||
EXPECT_EQ(2, mgr_.get_callback_remove_for_remove_memtable_count());
|
||||
|
||||
EXPECT_EQ(OB_SUCCESS,
|
||||
callback_list_.remove_callbacks_for_remove_memtable(memtable1));
|
||||
callback_list_.remove_callbacks_for_remove_memtable(memtable1, scn_1/*not used*/));
|
||||
|
||||
EXPECT_EQ(5, callback_list_.get_length());
|
||||
EXPECT_EQ(4, mgr_.get_callback_remove_for_remove_memtable_count());
|
||||
|
||||
EXPECT_EQ(OB_SUCCESS,
|
||||
callback_list_.remove_callbacks_for_remove_memtable(memtable3));
|
||||
callback_list_.remove_callbacks_for_remove_memtable(memtable3, scn_1/*not used*/));
|
||||
|
||||
EXPECT_EQ(4, callback_list_.get_length());
|
||||
EXPECT_EQ(5, mgr_.get_callback_remove_for_remove_memtable_count());
|
||||
@ -787,16 +787,16 @@ TEST_F(TestTxCallbackList, checksum_remove_memtable_and_tx_end)
|
||||
|
||||
EXPECT_EQ(9, callback_list_.get_length());
|
||||
|
||||
EXPECT_EQ(OB_SUCCESS, callback_list_.remove_callbacks_for_remove_memtable(memtable2));
|
||||
EXPECT_EQ(OB_SUCCESS, callback_list_.remove_callbacks_for_remove_memtable(memtable2, scn_1/*not used*/));
|
||||
EXPECT_EQ(true, is_checksum_equal(5, checksum_));
|
||||
EXPECT_EQ(scn_3, callback_list_.checksum_scn_);
|
||||
|
||||
EXPECT_EQ(OB_SUCCESS, callback_list_.remove_callbacks_for_remove_memtable(memtable3));
|
||||
EXPECT_EQ(OB_SUCCESS, callback_list_.remove_callbacks_for_remove_memtable(memtable3, scn_1/*not used*/));
|
||||
EXPECT_EQ(true, is_checksum_equal(5, checksum_));
|
||||
EXPECT_EQ(scn_3, callback_list_.checksum_scn_);
|
||||
|
||||
|
||||
EXPECT_EQ(OB_SUCCESS, callback_list_.remove_callbacks_for_remove_memtable(memtable1));
|
||||
EXPECT_EQ(OB_SUCCESS, callback_list_.remove_callbacks_for_remove_memtable(memtable1, scn_1/*not used*/));
|
||||
EXPECT_EQ(true, is_checksum_equal(5, checksum_));
|
||||
EXPECT_EQ(scn_3, callback_list_.checksum_scn_);
|
||||
|
||||
@ -1010,6 +1010,8 @@ TEST_F(TestTxCallbackList, checksum_all_and_tx_end_test) {
|
||||
ObFunction<bool()> remove_memtable_op =
|
||||
[&]() -> bool {
|
||||
ObMemtable *mt = mts[ObRandom::rand(0, mt_cnt-1)];
|
||||
share::SCN scn_1;
|
||||
scn_1.convert_for_logservice(1);
|
||||
bool enable = false;
|
||||
for (ObITransCallback* it = need_submit_head->next_;
|
||||
it != &(callback_list_.head_);
|
||||
@ -1024,7 +1026,7 @@ TEST_F(TestTxCallbackList, checksum_all_and_tx_end_test) {
|
||||
|
||||
if (enable) {
|
||||
EXPECT_EQ(OB_SUCCESS,
|
||||
callback_list_.remove_callbacks_for_remove_memtable(mt));
|
||||
callback_list_.remove_callbacks_for_remove_memtable(mt, scn_1/*not used*/));
|
||||
}
|
||||
|
||||
return enable;
|
||||
@ -1139,7 +1141,8 @@ int ObTxCallbackList::remove_callbacks_for_fast_commit(bool &has_remove)
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObTxCallbackList::remove_callbacks_for_remove_memtable(ObIMemtable *memtable_for_remove)
|
||||
int ObTxCallbackList::remove_callbacks_for_remove_memtable(ObIMemtable *memtable_for_remove,
|
||||
const share::SCN)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
SpinLockGuard guard(latch_);
|
||||
|
Reference in New Issue
Block a user