fast check tx-ctx can not submit redo for freeze
This commit is contained in:
@ -743,37 +743,11 @@ bool ObTransCallbackMgr::check_list_has_min_epoch_(const int my_idx, const int64
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObTransCallbackMgr::get_next_flush_log_guard(ObCallbackListLogGuard &lock_guard, int &list_idx)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
int list_cnt = get_logging_list_count();
|
||||
int64_t epoch = INT64_MAX;
|
||||
int idx = -1;
|
||||
ObTxCallbackList *l = NULL;
|
||||
for (int i =0; i< list_cnt; i++) {
|
||||
ObTxCallbackList *list = get_callback_list_(i, false);
|
||||
if (list->get_log_epoch() < epoch) {
|
||||
epoch = list->get_log_epoch();
|
||||
idx = i;
|
||||
l = list;
|
||||
}
|
||||
}
|
||||
common::ObByteLock *lock = NULL;
|
||||
if (idx == -1) {
|
||||
ret = OB_ENTRY_NOT_EXIST;
|
||||
} else if (OB_ISNULL(lock = l->try_lock_log())) {
|
||||
ret = OB_NEED_RETRY;
|
||||
} else {
|
||||
lock_guard.set(lock);
|
||||
list_idx = idx;
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
// retval:
|
||||
// - OB_EAGAIN: other list has small log_epoch
|
||||
// - OB_ENTRY_NOT_EXIST: no need log
|
||||
// - OB_NEED_RETRY: lock hold by other thread
|
||||
// - OB_BLOCK_FROZEN: next to logging callback's memtable was logging blocked
|
||||
int ObTransCallbackMgr::get_log_guard(const transaction::ObTxSEQ &write_seq,
|
||||
ObCallbackListLogGuard &lock_guard,
|
||||
int &list_idx)
|
||||
@ -792,6 +766,8 @@ int ObTransCallbackMgr::get_log_guard(const transaction::ObTxSEQ &write_seq,
|
||||
common::ObByteLock *log_lock = NULL;
|
||||
if (my_epoch == INT64_MAX) {
|
||||
ret = OB_ENTRY_NOT_EXIST;
|
||||
} else if (OB_UNLIKELY(list->is_logging_blocked())) {
|
||||
ret = OB_BLOCK_FROZEN;
|
||||
} else if (OB_ISNULL(log_lock = list->try_lock_log())) {
|
||||
ret = OB_NEED_RETRY;
|
||||
} else if (!check_list_has_min_epoch_(list_idx, my_epoch, min_epoch, min_epoch_idx)) {
|
||||
@ -1555,11 +1531,6 @@ int ObMvccRowCallback::del()
|
||||
ObIMemtable *last_mt = NULL;
|
||||
log_submitted(share::SCN(), last_mt);
|
||||
}
|
||||
// set block_frozen_memtable if the first callback is linked to a logging_blocked memtable
|
||||
// to prevent the case where the first callback is removed but the block_frozen_memtable pointer is still existed
|
||||
// clear block_frozen_memtable once a callback is deleted
|
||||
transaction::ObPartTransCtx *part_ctx = static_cast<transaction::ObPartTransCtx *>(get_trans_ctx());
|
||||
part_ctx->clear_block_frozen_memtable();
|
||||
|
||||
ret = remove();
|
||||
return ret;
|
||||
@ -2212,6 +2183,25 @@ void ObTransCallbackMgr::set_skip_checksum_calc()
|
||||
ATOMIC_STORE(&skip_checksum_, true);
|
||||
}
|
||||
|
||||
bool ObTransCallbackMgr::is_logging_blocked(bool &has_pending_log) const
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
bool all_blocked = false;
|
||||
RDLockGuard guard(rwlock_);
|
||||
CALLBACK_LISTS_FOREACH_CONST(idx, list) {
|
||||
if (list->has_pending_log()) {
|
||||
has_pending_log = true;
|
||||
if (list->is_logging_blocked()) {
|
||||
all_blocked = true;
|
||||
} else {
|
||||
all_blocked = false;
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
return all_blocked;
|
||||
}
|
||||
|
||||
}; // end namespace mvcc
|
||||
}; // end namespace oceanbase
|
||||
|
||||
|
||||
@ -247,6 +247,7 @@ private:
|
||||
void wakeup_waiting_txns_();
|
||||
int extend_callback_lists_(const int16_t cnt);
|
||||
public:
|
||||
bool is_logging_blocked(bool &has_pending_log) const;
|
||||
int fill_log(ObTxFillRedoCtx &ctx, ObITxFillRedoFunctor &func);
|
||||
int log_submitted(const ObCallbackScopeArray &callbacks, share::SCN scn, int &submitted);
|
||||
int log_sync_succ(const ObCallbackScopeArray &callbacks, const share::SCN scn, int64_t &sync_cnt);
|
||||
@ -273,7 +274,6 @@ public:
|
||||
int get_log_guard(const transaction::ObTxSEQ &write_seq,
|
||||
ObCallbackListLogGuard &log_guard,
|
||||
int &cb_list_idx);
|
||||
int get_next_flush_log_guard(ObCallbackListLogGuard &lock_guard, int &list_idx);
|
||||
void set_parallel_logging(const share::SCN serial_final_scn);
|
||||
void set_skip_checksum_calc();
|
||||
bool skip_checksum_calc() const { return ATOMIC_LOAD(&skip_checksum_); }
|
||||
|
||||
@ -824,6 +824,18 @@ inline bool ObTxCallbackList::is_append_only_() const
|
||||
return callback_mgr_.is_callback_list_append_only(id_);
|
||||
}
|
||||
|
||||
bool ObTxCallbackList::is_logging_blocked() const
|
||||
{
|
||||
bool blocked = false;
|
||||
if (log_latch_.try_lock()) {
|
||||
if (log_cursor_ != &head_ && log_cursor_->is_logging_blocked()) {
|
||||
blocked = true;
|
||||
}
|
||||
log_latch_.unlock();
|
||||
}
|
||||
return blocked;
|
||||
}
|
||||
|
||||
ObTxCallbackList::LockGuard::LockGuard(ObTxCallbackList &list,
|
||||
const ObTxCallbackList::LOCK_MODE mode,
|
||||
ObTimeGuard *tg)
|
||||
|
||||
@ -120,6 +120,9 @@ public:
|
||||
|
||||
// traversal to find and break
|
||||
bool find(ObITxCallbackFinder &func);
|
||||
|
||||
// is logging blocked: test current list can fill log
|
||||
bool is_logging_blocked() const;
|
||||
private:
|
||||
union LockState {
|
||||
LockState() : v_(0) {}
|
||||
@ -191,6 +194,9 @@ public:
|
||||
{
|
||||
return ATOMIC_LOAD(&data_size_) - ATOMIC_LOAD(&logged_data_size_) > limit;
|
||||
}
|
||||
bool has_pending_log() const {
|
||||
return ATOMIC_LOAD(&data_size_) - ATOMIC_LOAD(&logged_data_size_) > 0;
|
||||
}
|
||||
DECLARE_TO_STRING;
|
||||
private:
|
||||
const int16_t id_;
|
||||
@ -236,7 +242,7 @@ private:
|
||||
// used to serialize append callback to list tail
|
||||
common::ObByteLock append_latch_;
|
||||
// used to serialize fill and flush log of this list
|
||||
common::ObByteLock log_latch_;
|
||||
mutable common::ObByteLock log_latch_;
|
||||
// used to serialize operates on synced callbacks
|
||||
common::ObByteLock iter_synced_latch_;
|
||||
DISALLOW_COPY_AND_ASSIGN(ObTxCallbackList);
|
||||
|
||||
@ -1297,5 +1297,6 @@ void ObMemtableCtx::check_all_redo_flushed()
|
||||
{
|
||||
trans_mgr_.check_all_redo_flushed();
|
||||
}
|
||||
|
||||
}
|
||||
}
|
||||
|
||||
@ -386,6 +386,9 @@ public:
|
||||
//method called when leader revoke
|
||||
virtual int commit_to_replay();
|
||||
virtual int fill_redo_log(ObTxFillRedoCtx &ctx);
|
||||
bool is_logging_blocked(bool &has_pending_log) const {
|
||||
return trans_mgr_.is_logging_blocked(has_pending_log);
|
||||
}
|
||||
void check_all_redo_flushed();
|
||||
int get_log_guard(const transaction::ObTxSEQ &write_seq,
|
||||
ObCallbackListLogGuard &log_guard,
|
||||
|
||||
@ -105,25 +105,6 @@ public:
|
||||
} else if (iter->is_logging_blocked()) {
|
||||
ret = OB_BLOCK_FROZEN;
|
||||
ctx_.last_log_blocked_memtable_ = static_cast<memtable::ObMemtable *>(iter->get_memtable());
|
||||
// TODO:(yunxing.cyx) find a way let redo submitter can quickly check BLOCK_FROZEN
|
||||
// comment out the following piece of code, it not works under multiple CallbackList:
|
||||
/************************************************************************************
|
||||
if (data_node_count == 0) {
|
||||
// To prevent unnecessary submit_log actions for freeze
|
||||
// Becasue the first callback is linked to a logging_blocked memtable
|
||||
transaction::ObPartTransCtx *part_ctx = static_cast<transaction::ObPartTransCtx *>(mem_ctx_->get_trans_ctx());
|
||||
part_ctx->set_block_frozen_memtable(static_cast<memtable::ObMemtable *>(iter->get_memtable()));
|
||||
int64_t current_time = ObTimeUtility::current_time();
|
||||
if (last_logging_blocked_time_ == 0) {
|
||||
last_logging_blocked_time_ = current_time;
|
||||
} else if (current_time - last_logging_blocked_time_ > 5 * 1_min) {
|
||||
TRANS_LOG(WARN, "logging block cost too much time", KPC(part_ctx), KPC(iter));
|
||||
if (REACH_TENANT_TIME_INTERVAL(1_min)) {
|
||||
bug_detect_for_logging_blocked_();
|
||||
}
|
||||
}
|
||||
}
|
||||
***********************************************************************************/
|
||||
} else {
|
||||
bool fake_fill = false;
|
||||
if (MutatorType::MUTATOR_ROW == iter->get_mutator_type()) {
|
||||
|
||||
@ -1781,7 +1781,7 @@ int ObPartTransCtx::submit_redo_log_for_freeze()
|
||||
TRANS_LOG(TRACE, "", K_(trans_id), K_(ls_id));
|
||||
ObTimeGuard tg("submit_redo_for_freeze_log", 100000);
|
||||
bool submitted = false;
|
||||
bool need_submit = !is_logging_blocked();
|
||||
bool need_submit = fast_check_need_submit_redo_for_freeze_();
|
||||
if (need_submit) {
|
||||
CtxLockGuard guard(lock_);
|
||||
tg.click();
|
||||
@ -1791,10 +1791,6 @@ int ObPartTransCtx::submit_redo_log_for_freeze()
|
||||
REC_TRANS_TRACE_EXT2(tlog_, submit_log_for_freeze, OB_Y(ret),
|
||||
OB_ID(used), tg.get_diff(), OB_ID(ref), get_ref());
|
||||
}
|
||||
// TODO: mark frozen memtable for fast check need submit redo
|
||||
// if (OB_BLOCK_FROZEN != ret) {
|
||||
// clear_block_frozen_memtable();
|
||||
// }
|
||||
if (OB_TRANS_HAS_DECIDED == ret || OB_BLOCK_FROZEN == ret) {
|
||||
ret = OB_SUCCESS;
|
||||
}
|
||||
@ -1843,10 +1839,6 @@ int ObPartTransCtx::submit_redo_after_write(const bool force, const ObTxSEQ &wri
|
||||
}
|
||||
}
|
||||
}
|
||||
// TODO: mark frozen memtable for fast check need submit redo
|
||||
// if (OB_BLOCK_FROZEN != ret) {
|
||||
// clear_block_frozen_memtable();
|
||||
// }
|
||||
if (OB_TRANS_HAS_DECIDED == ret || OB_BLOCK_FROZEN == ret) {
|
||||
ret = OB_SUCCESS;
|
||||
}
|
||||
@ -1943,27 +1935,11 @@ int ObPartTransCtx::submit_redo_log_for_freeze_(bool &submitted)
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObPartTransCtx::set_block_frozen_memtable(memtable::ObMemtable *memtable)
|
||||
bool ObPartTransCtx::fast_check_need_submit_redo_for_freeze_() const
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
if (OB_ISNULL(memtable)) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
TRANS_LOG(WARN, "memtable cannot be null", K(ret), KPC(this));
|
||||
} else {
|
||||
ATOMIC_STORE(&block_frozen_memtable_, memtable);
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
void ObPartTransCtx::clear_block_frozen_memtable()
|
||||
{
|
||||
ATOMIC_STORE(&block_frozen_memtable_, nullptr);
|
||||
}
|
||||
|
||||
bool ObPartTransCtx::is_logging_blocked()
|
||||
{
|
||||
memtable::ObMemtable *memtable = ATOMIC_LOAD(&block_frozen_memtable_);
|
||||
return OB_NOT_NULL(memtable) && memtable->get_logging_blocked();
|
||||
bool has_pending_log = false;
|
||||
const bool blocked = mt_ctx_.is_logging_blocked(has_pending_log);
|
||||
return has_pending_log && !blocked;
|
||||
}
|
||||
|
||||
void ObPartTransCtx::get_audit_info(int64_t &lock_for_read_elapse) const
|
||||
|
||||
@ -848,13 +848,11 @@ public:
|
||||
*/
|
||||
int end_access();
|
||||
int rollback_to_savepoint(const int64_t op_sn, const ObTxSEQ from_scn, const ObTxSEQ to_scn, ObIArray<ObTxLSEpochPair> &downstream_parts);
|
||||
int set_block_frozen_memtable(memtable::ObMemtable *memtable);
|
||||
void clear_block_frozen_memtable();
|
||||
bool is_logging_blocked();
|
||||
bool is_xa_trans() const { return !exec_info_.xid_.empty(); }
|
||||
bool is_transfer_deleted() const { return transfer_deleted_; }
|
||||
int handle_tx_keepalive_response(const int64_t status);
|
||||
private:
|
||||
bool fast_check_need_submit_redo_for_freeze_() const;
|
||||
int check_status_();
|
||||
int tx_keepalive_response_(const int64_t status);
|
||||
void post_keepalive_msg_(const int status);
|
||||
|
||||
@ -130,6 +130,8 @@ int ObTxRedoSubmitter::parallel_submit(const ObTxSEQ &write_seq_no)
|
||||
if (OB_NEED_RETRY == ret) {
|
||||
// give up, lock conflict
|
||||
ret = OB_SUCCESS;
|
||||
} else if (OB_BLOCK_FROZEN == ret) {
|
||||
// memtable is logging blocked
|
||||
} else if (OB_EAGAIN == ret) {
|
||||
// others need flush firstly
|
||||
// TODO: try flush others out and retry
|
||||
|
||||
Reference in New Issue
Block a user