|
|
|
@ -56,7 +56,7 @@ int ObTxCallbackList::append_callback(ObITransCallback *callback,
|
|
|
|
|
// It is important that we should put the before_append_cb and after_append_cb
|
|
|
|
|
// into the latch guard otherwise the callback may already paxosed and released
|
|
|
|
|
// before callback it.
|
|
|
|
|
SpinLockGuard lock(latch_);
|
|
|
|
|
ObByteLockGuard guard(latch_);
|
|
|
|
|
|
|
|
|
|
if (OB_ISNULL(callback)) {
|
|
|
|
|
ret = OB_ERR_UNEXPECTED;
|
|
|
|
@ -83,8 +83,8 @@ int64_t ObTxCallbackList::concat_callbacks(ObTxCallbackList &that)
|
|
|
|
|
if (that.empty()) {
|
|
|
|
|
// do nothing
|
|
|
|
|
} else {
|
|
|
|
|
SpinLockGuard this_lock(latch_);
|
|
|
|
|
SpinLockGuard that_lock(that.latch_);
|
|
|
|
|
ObByteLockGuard this_guard(latch_);
|
|
|
|
|
ObByteLockGuard that_guard(that.latch_);
|
|
|
|
|
ObITransCallback *that_head = that.head_.get_next();
|
|
|
|
|
ObITransCallback *that_tail = that.get_tail();
|
|
|
|
|
that_head->set_prev(get_tail());
|
|
|
|
@ -177,7 +177,7 @@ int ObTxCallbackList::remove_callbacks_for_fast_commit(const ObITransCallback *g
|
|
|
|
|
{
|
|
|
|
|
int ret = OB_SUCCESS;
|
|
|
|
|
meet_generate_cursor = false;
|
|
|
|
|
SpinLockGuard guard(latch_);
|
|
|
|
|
ObByteLockGuard guard(latch_);
|
|
|
|
|
|
|
|
|
|
ObRemoveCallbacksForFastCommitFunctor functor(generate_cursor,
|
|
|
|
|
calc_need_remove_count_for_fast_commit_());
|
|
|
|
@ -199,7 +199,7 @@ int ObTxCallbackList::remove_callbacks_for_remove_memtable(
|
|
|
|
|
const share::SCN max_applied_scn)
|
|
|
|
|
{
|
|
|
|
|
int ret = OB_SUCCESS;
|
|
|
|
|
SpinLockGuard guard(latch_);
|
|
|
|
|
ObByteLockGuard guard(latch_);
|
|
|
|
|
|
|
|
|
|
ObRemoveSyncCallbacksWCondFunctor functor(
|
|
|
|
|
// condition for remove
|
|
|
|
@ -249,7 +249,7 @@ int ObTxCallbackList::remove_callbacks_for_remove_memtable(
|
|
|
|
|
int ObTxCallbackList::remove_callbacks_for_rollback_to(const transaction::ObTxSEQ to_seq_no)
|
|
|
|
|
{
|
|
|
|
|
int ret = OB_SUCCESS;
|
|
|
|
|
SpinLockGuard guard(latch_);
|
|
|
|
|
ObByteLockGuard guard(latch_);
|
|
|
|
|
|
|
|
|
|
ObRemoveCallbacksWCondFunctor functor(
|
|
|
|
|
[to_seq_no](ObITransCallback *callback) -> bool {
|
|
|
|
@ -269,7 +269,6 @@ int ObTxCallbackList::remove_callbacks_for_rollback_to(const transaction::ObTxSE
|
|
|
|
|
TRANS_LOG(DEBUG, "remove callbacks for rollback to", K(to_seq_no), K(functor), K(*this));
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
return ret;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
@ -287,7 +286,7 @@ int ObTxCallbackList::reverse_search_callback_by_seq_no(const transaction::ObTxS
|
|
|
|
|
}
|
|
|
|
|
}, true/*is_reverse*/);
|
|
|
|
|
|
|
|
|
|
SpinLockGuard guard(latch_);
|
|
|
|
|
ObByteLockGuard guard(latch_);
|
|
|
|
|
|
|
|
|
|
if (OB_FAIL(callback_(functor))) {
|
|
|
|
|
TRANS_LOG(ERROR, "search callbacks wont report error", K(ret), K(functor));
|
|
|
|
@ -304,7 +303,7 @@ int ObTxCallbackList::sync_log_fail(const ObCallbackScope &callbacks,
|
|
|
|
|
int ret = OB_SUCCESS;
|
|
|
|
|
ObSyncLogFailFunctor functor;
|
|
|
|
|
|
|
|
|
|
SpinLockGuard guard(latch_);
|
|
|
|
|
ObByteLockGuard guard(latch_);
|
|
|
|
|
|
|
|
|
|
if (OB_FAIL(callback_(functor, callbacks))) {
|
|
|
|
|
TRANS_LOG(WARN, "clean unlog callbacks failed", K(ret), K(functor));
|
|
|
|
@ -320,7 +319,7 @@ int ObTxCallbackList::clean_unlog_callbacks(int64_t &removed_cnt)
|
|
|
|
|
int ret = OB_SUCCESS;
|
|
|
|
|
ObCleanUnlogCallbackFunctor functor;
|
|
|
|
|
|
|
|
|
|
SpinLockGuard guard(latch_);
|
|
|
|
|
ObByteLockGuard guard(latch_);
|
|
|
|
|
|
|
|
|
|
if (OB_FAIL(callback_(functor))) {
|
|
|
|
|
TRANS_LOG(WARN, "clean unlog callbacks failed", K(ret), K(functor));
|
|
|
|
@ -336,7 +335,7 @@ int ObTxCallbackList::get_memtable_key_arr_w_timeout(transaction::ObMemtableKeyA
|
|
|
|
|
int ret = OB_SUCCESS;
|
|
|
|
|
ObGetMemtableKeyWTimeoutFunctor functor(memtable_key_arr);
|
|
|
|
|
|
|
|
|
|
SpinLockGuard guard(latch_);
|
|
|
|
|
ObByteLockGuard guard(latch_);
|
|
|
|
|
|
|
|
|
|
if (OB_FAIL(callback_(functor))) {
|
|
|
|
|
TRANS_LOG(WARN, "get memtable key arr failed", K(ret), K(functor));
|
|
|
|
@ -348,7 +347,7 @@ int ObTxCallbackList::get_memtable_key_arr_w_timeout(transaction::ObMemtableKeyA
|
|
|
|
|
int ObTxCallbackList::tx_calc_checksum_before_scn(const SCN scn)
|
|
|
|
|
{
|
|
|
|
|
int ret = OB_SUCCESS;
|
|
|
|
|
SpinLockGuard guard(latch_);
|
|
|
|
|
ObByteLockGuard guard(latch_);
|
|
|
|
|
|
|
|
|
|
ObCalcChecksumFunctor functor(scn);
|
|
|
|
|
functor.set_checksumer(checksum_scn_, &batch_checksum_);
|
|
|
|
@ -367,7 +366,7 @@ int ObTxCallbackList::tx_calc_checksum_before_scn(const SCN scn)
|
|
|
|
|
int ObTxCallbackList::tx_calc_checksum_all()
|
|
|
|
|
{
|
|
|
|
|
int ret = OB_SUCCESS;
|
|
|
|
|
SpinLockGuard guard(latch_);
|
|
|
|
|
ObByteLockGuard guard(latch_);
|
|
|
|
|
|
|
|
|
|
ObCalcChecksumFunctor functor;
|
|
|
|
|
functor.set_checksumer(checksum_scn_, &batch_checksum_);
|
|
|
|
@ -386,7 +385,7 @@ int ObTxCallbackList::tx_commit()
|
|
|
|
|
int ret = OB_SUCCESS;
|
|
|
|
|
ObTxEndFunctor functor(true/*is_commit*/);
|
|
|
|
|
|
|
|
|
|
SpinLockGuard guard(latch_);
|
|
|
|
|
ObByteLockGuard guard(latch_);
|
|
|
|
|
|
|
|
|
|
if (OB_FAIL(callback_(functor))) {
|
|
|
|
|
TRANS_LOG(WARN, "trans commit failed", K(ret), K(functor));
|
|
|
|
@ -402,7 +401,7 @@ int ObTxCallbackList::tx_abort()
|
|
|
|
|
int ret = OB_SUCCESS;
|
|
|
|
|
ObTxEndFunctor functor(false/*is_commit*/);
|
|
|
|
|
|
|
|
|
|
SpinLockGuard guard(latch_);
|
|
|
|
|
ObByteLockGuard guard(latch_);
|
|
|
|
|
|
|
|
|
|
if (OB_FAIL(callback_(functor))) {
|
|
|
|
|
TRANS_LOG(WARN, "trans abort failed", K(ret), K(functor));
|
|
|
|
@ -421,7 +420,7 @@ int ObTxCallbackList::tx_elr_preparing()
|
|
|
|
|
return callback->elr_trans_preparing();
|
|
|
|
|
});
|
|
|
|
|
|
|
|
|
|
SpinLockGuard guard(latch_);
|
|
|
|
|
ObByteLockGuard guard(latch_);
|
|
|
|
|
|
|
|
|
|
if (OB_FAIL(callback_(functor))) {
|
|
|
|
|
TRANS_LOG(WARN, "trans elr preparing failed", K(ret), K(functor));
|
|
|
|
@ -438,7 +437,7 @@ int ObTxCallbackList::tx_print_callback()
|
|
|
|
|
return callback->print_callback();
|
|
|
|
|
});
|
|
|
|
|
|
|
|
|
|
SpinLockGuard guard(latch_);
|
|
|
|
|
ObByteLockGuard guard(latch_);
|
|
|
|
|
|
|
|
|
|
if (OB_FAIL(callback_(functor))) {
|
|
|
|
|
TRANS_LOG(WARN, "trans commit failed", K(ret), K(functor));
|
|
|
|
@ -469,7 +468,7 @@ int ObTxCallbackList::replay_fail(const SCN scn)
|
|
|
|
|
true, /*need_remove_data*/
|
|
|
|
|
true /*is_reverse*/);
|
|
|
|
|
|
|
|
|
|
SpinLockGuard guard(latch_);
|
|
|
|
|
ObByteLockGuard guard(latch_);
|
|
|
|
|
|
|
|
|
|
if (OB_FAIL(callback_(functor))) {
|
|
|
|
|
TRANS_LOG(ERROR, "replay fail failed", K(ret), K(functor));
|
|
|
|
@ -483,7 +482,7 @@ int ObTxCallbackList::replay_fail(const SCN scn)
|
|
|
|
|
|
|
|
|
|
void ObTxCallbackList::get_checksum_and_scn(uint64_t &checksum, SCN &checksum_scn)
|
|
|
|
|
{
|
|
|
|
|
SpinLockGuard guard(latch_);
|
|
|
|
|
ObByteLockGuard guard(latch_);
|
|
|
|
|
checksum = batch_checksum_.calc();
|
|
|
|
|
checksum_scn = checksum_scn_;
|
|
|
|
|
TRANS_LOG(INFO, "get checksum and checksum_scn", KPC(this), K(checksum), K(checksum_scn));
|
|
|
|
@ -491,7 +490,7 @@ void ObTxCallbackList::get_checksum_and_scn(uint64_t &checksum, SCN &checksum_sc
|
|
|
|
|
|
|
|
|
|
void ObTxCallbackList::update_checksum(const uint64_t checksum, const SCN checksum_scn)
|
|
|
|
|
{
|
|
|
|
|
SpinLockGuard guard(latch_);
|
|
|
|
|
ObByteLockGuard guard(latch_);
|
|
|
|
|
batch_checksum_.set_base(checksum);
|
|
|
|
|
checksum_scn_.atomic_set(checksum_scn);
|
|
|
|
|
TRANS_LOG(INFO, "update checksum and checksum_scn", KPC(this), K(checksum), K(checksum_scn));
|
|
|
|
@ -530,22 +529,5 @@ DEF_TO_STRING(ObTxCallbackList)
|
|
|
|
|
return pos;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
ObTxCallbackList::SpinLockGuard::SpinLockGuard(ObLatch& lock)
|
|
|
|
|
: time_guard_(5 * 1000 * 1000),
|
|
|
|
|
lock_(lock)
|
|
|
|
|
{
|
|
|
|
|
int ret = OB_SUCCESS;
|
|
|
|
|
// print log and lbt if get lock use too much time.
|
|
|
|
|
if (OB_FAIL(lock_.wrlock(ObLatchIds::MEMTABLE_CALLBACK_LIST_LOCK))) {
|
|
|
|
|
TRANS_LOG(ERROR, "Lock Failed");
|
|
|
|
|
}
|
|
|
|
|
time_guard_.click();
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
ObTxCallbackList::SpinLockGuard::~SpinLockGuard()
|
|
|
|
|
{
|
|
|
|
|
lock_.unlock();
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
} // memtable
|
|
|
|
|
} // oceanbase
|
|
|
|
|