[DeadLock] remove thread local execution flag to enabel register deadlock in PL

This commit is contained in:
fengdeyiji
2024-01-04 16:43:35 +00:00
committed by ob-robot
parent e5b7cbc593
commit 0608c45482
16 changed files with 100 additions and 120 deletions

View File

@ -44,7 +44,6 @@ public:
static void destroy();
static int64_t getClock();
static int64_t getRealClock();
static int64_t getCurrentTime();
static void msleep(const int64_t ms);
static void usleep(const int64_t us);
static void try_advance_cur_ts(const int64_t cur_ts);
@ -81,25 +80,6 @@ inline int64_t ObClockGenerator::getRealClock()
return clock_generator_.get_us();
}
inline int64_t ObClockGenerator::getCurrentTime()
{
int64_t ret_val = ATOMIC_LOAD(&clock_generator_.last_used_time_);
while (true) {
const int64_t last = ATOMIC_LOAD(&clock_generator_.last_used_time_);
const int64_t now = ObTimeUtility::current_time();
if (now < last) {
ret_val = last;
break;
} else if (ATOMIC_BCAS(&clock_generator_.last_used_time_, last, now)) {
ret_val = now;
break;
} else {
PAUSE();
}
}
return ret_val;
}
inline void ObClockGenerator::msleep(const int64_t ms)
{
if (ms > 0) {

View File

@ -144,7 +144,7 @@ struct ObLightSharedPtr// RAII used
if (!is_valid()) {
ret = OB_INVALID_DATA;
} else {
int64_t start_sync_time = ObClockGenerator::getCurrentTime();
int64_t start_sync_time = ObClockGenerator::getClock();
int64_t loop_times = 0;
int64_t ref_cnt;
while (1 != (ref_cnt = ctrl_ptr_->get_ref())) {

View File

@ -132,8 +132,8 @@ int ElectionAcceptor::start()
LockGuard lock_guard(p_election_->lock_);
// 周期性打印选举的状态
if (ObClockGenerator::getCurrentTime() > last_dump_acceptor_info_ts_ + 3_s) {
last_dump_acceptor_info_ts_ = ObClockGenerator::getCurrentTime();
if (ObClockGenerator::getClock() > last_dump_acceptor_info_ts_ + 3_s) {
last_dump_acceptor_info_ts_ = ObClockGenerator::getClock();
ELECT_LOG(INFO, "dump acceptor info", K(*this));
}
// 当acceptor的Lease有效状态发生变化时需要打印日志以及汇报事件
@ -163,7 +163,7 @@ int ElectionAcceptor::start()
if (last_record_lease_valid_state && !lease_valid_state) {// 这个定时任务可能是被延迟致lease到期时触发的,为了在lease到期的第一时间投票
can_vote = true;
LOG_ELECT_LEADER(INFO, "vote when lease expired");
} else if (ObClockGenerator::getCurrentTime() - last_time_window_open_ts_ >= CALCULATE_TIME_WINDOW_SPAN_TS()) {
} else if (ObClockGenerator::getClock() - last_time_window_open_ts_ >= CALCULATE_TIME_WINDOW_SPAN_TS()) {
can_vote = true;
} else {
LOG_ELECT_LEADER(INFO, "can't vote now", K(last_record_lease_valid_state),
@ -269,7 +269,7 @@ void ElectionAcceptor::on_prepare_request(const ElectionPrepareRequestMsg &prepa
LOG_PHASE(ERROR, phase, "open time window failed");
} else {
is_time_window_opened_ = true;// 定时任务注册成功,打开时间窗口
last_time_window_open_ts_ = ObClockGenerator::getCurrentTime();
last_time_window_open_ts_ = ObClockGenerator::getClock();
LOG_PHASE(INFO, phase, "open time window success", K(timewindow_span));
}
}

View File

@ -223,13 +223,13 @@ int ElectionProposer::register_renew_lease_task_()
int ret = OB_SUCCESS;
LockGuard lock_guard(p_election_->lock_);
// 周期性打印选举的状态
if (ObClockGenerator::getCurrentTime() > last_dump_proposer_info_ts_ + 3_s) {
last_dump_proposer_info_ts_ = ObClockGenerator::getCurrentTime();
if (ObClockGenerator::getClock() > last_dump_proposer_info_ts_ + 3_s) {
last_dump_proposer_info_ts_ = ObClockGenerator::getClock();
ELECT_LOG(INFO, "dump proposer info", K(*this));
}
// 周期性打印选举的消息收发统计信息
if (ObClockGenerator::getCurrentTime() > last_dump_election_msg_count_state_ts_ + 10_s) {
last_dump_election_msg_count_state_ts_ = ObClockGenerator::getCurrentTime();
if (ObClockGenerator::getClock() > last_dump_election_msg_count_state_ts_ + 10_s) {
last_dump_election_msg_count_state_ts_ = ObClockGenerator::getClock();
char ls_id_buffer[32] = {0};
auto pretend_to_be_ls_id = [ls_id_buffer](const int64_t id) mutable {
int64_t pos = 0;
@ -329,7 +329,7 @@ void ElectionProposer::prepare(const ObRole role)
ELECT_TIME_GUARD(500_ms);
#define PRINT_WRAPPER KR(ret), K(role), K(*this)
int ret = OB_SUCCESS;
int64_t cur_ts = ObClockGenerator::getCurrentTime();
int64_t cur_ts = ObClockGenerator::getClock();
LogPhase phase = role == ObRole::LEADER ? LogPhase::RENEW_LEASE : LogPhase::ELECT_LEADER;
if (memberlist_with_states_.get_member_list().get_addr_list().empty()) {
LOG_PHASE(INFO, phase, "memberlist is empty, give up do prepare this time");
@ -427,7 +427,7 @@ void ElectionProposer::on_prepare_request(const ElectionPrepareRequestMsg &prepa
.get_addr_list()))) {
LOG_ELECT_LEADER(ERROR, "broadcast prepare request failed");
} else {
last_do_prepare_ts_ = ObClockGenerator::getCurrentTime();
last_do_prepare_ts_ = ObClockGenerator::getClock();
if (role_ == ObRole::LEADER) {
LOG_ELECT_LEADER(INFO, "join elect leader phase as leader");
} else if (role_ == ObRole::FOLLOWER) {

View File

@ -271,13 +271,13 @@ int64_t MemberListWithStates::to_string(char *buf, const int64_t buf_len) const
}
}
if (is_synced && !v.empty()) {
int64_t map_wall_clock_ts = v[0] - get_monotonic_ts() + ObClockGenerator::getCurrentTime();
int64_t map_wall_clock_ts = v[0] - get_monotonic_ts() + ObClockGenerator::getClock();
common::databuff_printf(buf, buf_len, pos, "accept_ok_promised_ts:%s, ",
v[0] != 0 ? common::ObTime2Str::ob_timestamp_str_range<HOUR, MSECOND>(map_wall_clock_ts) : "invalid");
} else {
common::databuff_printf(buf, buf_len, pos, "accept_ok_promised_ts:[");
for (int64_t idx = 0; idx < v.count(); ++idx) {
int64_t map_wall_clock_ts = v[idx] - get_monotonic_ts() + ObClockGenerator::getCurrentTime();
int64_t map_wall_clock_ts = v[idx] - get_monotonic_ts() + ObClockGenerator::getClock();
if (idx == v.count() - 1) {
common::databuff_printf(buf, buf_len, pos, "%s]", v[idx] != 0 ?
common::ObTime2Str::ob_timestamp_str_range<HOUR, MSECOND>(map_wall_clock_ts) : "invalid");

View File

@ -355,8 +355,8 @@ class ElectionMsgCounter
struct Counter
{
Counter() : send_count_(0), receive_count_(0), last_send_ts_(0), last_received_ts_(0) {}
void add_send_count() { ++send_count_; last_send_ts_ = ObClockGenerator::getCurrentTime(); }
void add_received_count() { ++receive_count_; last_received_ts_ = ObClockGenerator::getCurrentTime(); }
void add_send_count() { ++send_count_; last_send_ts_ = ObClockGenerator::getClock(); }
void add_received_count() { ++receive_count_; last_received_ts_ = ObClockGenerator::getClock(); }
void reset() { new (this)Counter(); }
int64_t send_count_;
int64_t receive_count_;

View File

@ -38,10 +38,6 @@ using namespace oceanbase::rpc::frame;
namespace oceanbase
{
namespace memtable
{
extern TLOCAL(bool, TLOCAL_NEED_WAIT_IN_LOCK_WAIT_MGR);
}
namespace omt
{
@ -243,7 +239,6 @@ inline void ObThWorker::process_request(rpc::ObRequest &req)
reset_sql_throttle_current_priority();
set_req_flag(&req);
memtable::TLOCAL_NEED_WAIT_IN_LOCK_WAIT_MGR = false;
MTL(memtable::ObLockWaitMgr*)->setup(req.get_lock_wait_node(), req.get_receive_timestamp());
if (OB_FAIL(procor_.process(req))) {
LOG_WARN("process request fail", K(ret));

View File

@ -582,7 +582,7 @@ int ObAllTenantInfoCache::refresh_tenant_info(const uint64_t tenant_id,
int ret = OB_SUCCESS;
ObAllTenantInfo new_tenant_info;
int64_t ora_rowscn = 0;
const int64_t new_refresh_time_us = ObClockGenerator::getCurrentTime();
const int64_t new_refresh_time_us = ObClockGenerator::getClock();
content_changed = false;
if (OB_ISNULL(sql_proxy) || !is_user_tenant(tenant_id)) {
ret = OB_INVALID_ARGUMENT;

View File

@ -123,7 +123,7 @@ int ObLCLNode::register_timer_task()
CLICK();
revert_self_ref_count_();
} else {
DETECT_LOG_(INFO, "register first timer task successfully", K(*this));
DETECT_LOG_(TRACE, "register first timer task successfully", K(*this));
}
return ret;
@ -262,7 +262,7 @@ int ObLCLNode::block_(const ObDependencyResource &resource)
if (OB_FAIL(add_resource_to_list_(resource, block_list_))) {
DETECT_LOG_(WARN, "block_list_ push resource failed", PRINT_WRAPPER);
} else {
DETECT_LOG_(INFO, "block resource success", PRINT_WRAPPER);
DETECT_LOG_(TRACE, "block resource success", PRINT_WRAPPER);
}
}
@ -288,7 +288,7 @@ int ObLCLNode::block(const BlockCallBack &func)
K(func), K(*this));
block_callback_list_.pop_back();
} else {
DETECT_LOG_(INFO, "block callback success", K(*this));
DETECT_LOG_(TRACE, "block callback success", K(*this));
}
return ret;
@ -911,7 +911,7 @@ void ObLCLNode::PushStateTask::runTimerTask()
}
if (false == ATOMIC_LOAD(&lcl_node_.is_timer_task_canceled_)) {
if (expected_executed_ts > current_ts) {
DETECT_LOG(ERROR, "schedule error", K(current_ts), K(expected_executed_ts));
DETECT_LOG(WARN, "schedule error", K(current_ts), K(expected_executed_ts));
} else if (current_ts - expected_executed_ts > 100 * 1000) {// 100ms
if (REACH_TIME_INTERVAL(100 * 1000)) {// 100ms
DETECT_LOG(WARN, "task scheduled out of range", K(current_ts), K(expected_executed_ts));

View File

@ -577,7 +577,7 @@ public:
ATOMIC_STORE(&is_running_, false);
int64_t last_print_time = 0;
while (ATOMIC_LOAD(&total_running_task_count_) != 0) {
int64_t current_time = ObClockGenerator::getCurrentTime();
int64_t current_time = ObClockGenerator::getClock();
if (current_time - last_print_time > 500_ms) {// print log every 500ms
last_print_time = current_time;
OCCAM_LOG(INFO, "OccamTimr waiting running task finished",

View File

@ -58,9 +58,6 @@ using namespace memtable::tablelock;
namespace memtable
{
// this flag is used to indicate wether if need register to deadlock when end_stmt
// CAUTIONS: DO NOT use get_thread_node().neet_wait(), it MAY CORE!
TLOCAL(bool, TLOCAL_NEED_WAIT_IN_LOCK_WAIT_MGR) = false;
static const uint64_t TRANS_FLAG = 1L << 63L; // 10
static const uint64_t TABLE_LOCK_FLAG = 1L << 62L; // 01
@ -166,7 +163,7 @@ void RowHolderMapper::reset_hash_holder(const ObTabletID &tablet_id,
};
if (OB_FAIL(map_.erase_if(ObIntWarp(hash), remove_if_op))) {
if (ret != OB_ENTRY_NOT_EXIST) {
TRANS_LOG(WARN, "clear hash holder error", KR(ret), K(hash), K(tx_id));
TRANS_LOG(TRACE, "clear hash holder error", KR(ret), K(hash), K(tx_id));
}
}
}
@ -199,7 +196,7 @@ void ObLockWaitMgr::run1()
(void)repost(cur);
}
// dump debug info, and check deadlock enabdle, clear mapper if deadlock is disabled
now = ObClockGenerator::getCurrentTime();
now = ObClockGenerator::getClock();
if (now - last_dump_ts > 5_s) {
last_dump_ts = now;
row_holder_mapper_.dump_mapper_info();
@ -289,22 +286,22 @@ int ObLockWaitMgr::register_to_deadlock_detector_(const ObTransID &self_tx_id,
self_sess_id)));
if (LockHashHelper::is_rowkey_hash(node->hash())) {// waiting for row
DeadLockBlockCallBack deadlock_block_call_back(row_holder_mapper_, node->hash());
if (OB_FAIL(ObTransDeadlockDetectorAdapter::register_local_execution_to_deadlock_detector_waiting_for_row(on_collect_callback,
deadlock_block_call_back,
self_tx_id,
self_sess_id))) {
if (OB_FAIL(ObTransDeadlockDetectorAdapter::lock_wait_mgr_reconstruct_detector_waiting_for_row(on_collect_callback,
deadlock_block_call_back,
self_tx_id,
self_sess_id))) {
TRANS_LOG(WARN, "fail to regester to deadlock detector", K(ret), K(self_sess_id));
} else {
TRANS_LOG(INFO, "wait for row", K(node->hash()), K(self_tx_id), K(blocked_tx_id), K(self_sess_id));
TRANS_LOG(TRACE, "wait for row", K(node->hash()), K(self_tx_id), K(blocked_tx_id), K(self_sess_id));
}
} else {// waiting for other trans
if (OB_FAIL(ObTransDeadlockDetectorAdapter::register_local_execution_to_deadlock_detector_waiting_for_trans(on_collect_callback,
blocked_tx_id,
self_tx_id,
self_sess_id))) {
if (OB_FAIL(ObTransDeadlockDetectorAdapter::lock_wait_mgr_reconstruct_detector_waiting_for_trans(on_collect_callback,
blocked_tx_id,
self_tx_id,
self_sess_id))) {
TRANS_LOG(WARN, "fail to regester to deadlock detector", K(ret), K(self_sess_id));
} else {
TRANS_LOG(INFO, "wait for trans", K(node->hash()), K(self_tx_id), K(blocked_tx_id), K(self_sess_id));
TRANS_LOG(TRACE, "wait for trans", K(node->hash()), K(self_tx_id), K(blocked_tx_id), K(self_sess_id));
}
}
}
@ -577,7 +574,6 @@ int ObLockWaitMgr::post_lock(const int tmp_ret,
ObFunction<int(bool&, bool&)> &rechecker)
{
int ret = OB_SUCCESS;
TLOCAL_NEED_WAIT_IN_LOCK_WAIT_MGR = false;
Node *node = NULL;
if (OB_NOT_NULL(node = get_thread_node())) {
Key key(&row_key);
@ -609,7 +605,6 @@ int ObLockWaitMgr::post_lock(const int tmp_ret,
tx_id,
holder_tx_id);
node->set_need_wait();
TLOCAL_NEED_WAIT_IN_LOCK_WAIT_MGR = true;// to tell end_stmt() not register deadlock
}
}
}
@ -630,7 +625,6 @@ int ObLockWaitMgr::post_lock(const int tmp_ret,
ObFunction<int(bool&)> &check_need_wait)
{
int ret = OB_SUCCESS;
TLOCAL_NEED_WAIT_IN_LOCK_WAIT_MGR = false;
Node *node = NULL;
if (OB_UNLIKELY(!is_inited_)) {
@ -667,7 +661,6 @@ int ObLockWaitMgr::post_lock(const int tmp_ret,
holder_tx_id);
node->set_need_wait();
node->set_lock_mode(lock_mode);
TLOCAL_NEED_WAIT_IN_LOCK_WAIT_MGR = true;// to tell end_stmt() not register deadlock
}
}
return ret;

View File

@ -68,7 +68,7 @@ int MdsTableBase::init(const ObTabletID tablet_id,
mgr_handle_.set_mds_table_mgr(p_mgr);
debug_info_.do_init_tablet_pointer_ = pointer;
debug_info_.init_trace_id_ = *ObCurTraceId::get_trace_id();
debug_info_.init_ts_ = ObClockGenerator::getCurrentTime();
debug_info_.init_ts_ = ObClockGenerator::getClock();
if (MDS_FAIL(register_to_mds_table_mgr())) {
MDS_LOG(WARN, "fail to register mds table", KR(ret), K(*this), K(ls_id), K(tablet_id));
}
@ -102,7 +102,7 @@ void MdsTableBase::mark_removed_from_t3m(ObTabletPointer *pointer)
} else {
debug_info_.do_remove_tablet_pointer_ = pointer;
debug_info_.remove_trace_id_ = *ObCurTraceId::get_trace_id();
ATOMIC_STORE(&debug_info_.remove_ts_, ObClockGenerator::getCurrentTime());
ATOMIC_STORE(&debug_info_.remove_ts_, ObClockGenerator::getClock());
}
}
@ -111,7 +111,7 @@ void MdsTableBase::mark_switched_to_empty_shell()
if (ATOMIC_LOAD(&debug_info_.switch_to_empty_shell_ts_) != 0) {
MDS_LOG_RET(WARN, OB_ERR_UNEXPECTED, "this MdsTable has been marked switch to empty shell", K(*this));
} else {
ATOMIC_STORE(&debug_info_.switch_to_empty_shell_ts_, ObClockGenerator::getCurrentTime());
ATOMIC_STORE(&debug_info_.switch_to_empty_shell_ts_, ObClockGenerator::getClock());
}
}

View File

@ -1461,7 +1461,7 @@ int MdsTableImpl<MdsTableType>::forcely_reset_mds_table(const char *reason)
if (OB_FAIL(for_each_scan_row(FowEachRowAction::RESET, op))) {
MDS_LOG_GC(ERROR, "fail to do reset");
} else {
debug_info_.last_reset_ts_ = ObClockGenerator::getCurrentTime();
debug_info_.last_reset_ts_ = ObClockGenerator::getClock();
flushing_scn_.reset();
last_inner_recycled_scn_ = share::SCN::min_scn();
rec_scn_ = share::SCN::max_scn();

View File

@ -193,7 +193,7 @@ int ObStandbyTimestampService::switch_to_leader()
if (OB_FAIL(MTL(logservice::ObLogService *)->get_palf_role(share::GTS_LS, role, tmp_epoch))) {
TRANS_LOG(WARN, "get ObStandbyTimestampService role fail", KR(ret));
} else {
ATOMIC_STORE(&switch_to_leader_ts_, ObClockGenerator::getCurrentTime());
ATOMIC_STORE(&switch_to_leader_ts_, ObClockGenerator::getClock());
epoch_ = tmp_epoch;
int64_t type = MTL(ObTimestampAccess *)->get_service_type();
if (ObTimestampAccess::ServiceType::FOLLOWER == type) {

View File

@ -30,10 +30,6 @@
namespace oceanbase
{
namespace memtable
{
extern TLOCAL(bool, TLOCAL_NEED_WAIT_IN_LOCK_WAIT_MGR);
}
using namespace sql;
using namespace common;
using namespace share::detector;
@ -325,10 +321,10 @@ int ObTransDeadlockDetectorAdapter::gen_dependency_resource_array_(const ObIArra
return ret;
}
int ObTransDeadlockDetectorAdapter::register_remote_execution_to_deadlock_detector_(const ObTransID self_tx_id,
const uint32_t self_session_id,
const ObIArray<ObTransIDAndAddr> &conflict_tx_ids,
SessionGuard &session_guard)
int ObTransDeadlockDetectorAdapter::register_to_deadlock_detector_(const ObTransID self_tx_id,
const uint32_t self_session_id,
const ObIArray<ObTransIDAndAddr> &conflict_tx_ids,
SessionGuard &session_guard)
{
#define PRINT_WRAPPER KR(ret), K(self_tx_id), K(self_session_id), K(conflict_tx_ids), K(query_timeout), K(self_tx_scheduler)
int ret = OB_SUCCESS;
@ -375,9 +371,9 @@ int ObTransDeadlockDetectorAdapter::register_remote_execution_to_deadlock_detect
#undef PRINT_WRAPPER
}
int ObTransDeadlockDetectorAdapter::remote_execution_replace_conflict_trans_ids_(const ObTransID self_tx_id,
const ObIArray<ObTransIDAndAddr> &conflict_tx_ids,
SessionGuard &session_guard)
int ObTransDeadlockDetectorAdapter::replace_conflict_trans_ids_(const ObTransID self_tx_id,
const ObIArray<ObTransIDAndAddr> &conflict_tx_ids,
SessionGuard &session_guard)
{
#define PRINT_WRAPPER KR(ret), K(self_tx_id), K(conflict_tx_ids), K(current_blocked_resources)
int ret = OB_SUCCESS;
@ -418,7 +414,7 @@ int ObTransDeadlockDetectorAdapter::remote_execution_replace_conflict_trans_ids_
#undef PRINT_WRAPPER
}
int ObTransDeadlockDetectorAdapter::register_remote_execution_or_replace_conflict_trans_ids(const ObTransID self_tx_id,
int ObTransDeadlockDetectorAdapter::register_or_replace_conflict_trans_ids(const ObTransID self_tx_id,
const uint32_t self_session_id,
const ObArray<ObTransIDAndAddr> &conflict_tx_ids)
{
@ -444,13 +440,13 @@ int ObTransDeadlockDetectorAdapter::register_remote_execution_or_replace_conflic
} else if (OB_FAIL(MTL(ObDeadLockDetectorMgr*)->check_detector_exist(self_tx_id, is_detector_exist))) {
DETECT_LOG(WARN, "fail to get detector exist status", PRINT_WRAPPER);
} else if (!is_detector_exist) {
if (OB_FAIL(register_remote_execution_to_deadlock_detector_(self_tx_id, self_session_id, conflict_tx_ids, session_guard))) {
if (OB_FAIL(register_to_deadlock_detector_(self_tx_id, self_session_id, conflict_tx_ids, session_guard))) {
DETECT_LOG(WARN, "register new detector in remote execution failed", PRINT_WRAPPER);
} else {
DETECT_LOG(INFO, "register new detector in remote execution", PRINT_WRAPPER);
}
} else {
if (OB_FAIL(remote_execution_replace_conflict_trans_ids_(self_tx_id, conflict_tx_ids, session_guard))) {
if (OB_FAIL(replace_conflict_trans_ids_(self_tx_id, conflict_tx_ids, session_guard))) {
DETECT_LOG(INFO, "replace block list in remote execution", PRINT_WRAPPER);
}
}
@ -628,8 +624,6 @@ int ObTransDeadlockDetectorAdapter::maintain_deadlock_info_when_end_stmt(sql::Ob
DETECT_LOG(ERROR, "session is NULL", PRINT_WRAPPER);
} else if (++step && session->is_inner()) {
DETECT_LOG(TRACE, "inner session no need register to deadlock", PRINT_WRAPPER);
} else if (++step && memtable::TLOCAL_NEED_WAIT_IN_LOCK_WAIT_MGR) {
DETECT_LOG(TRACE, "will register deadlock in LockWaitMgr::post_process after end_stmt()", PRINT_WRAPPER);
} else if (++step && OB_ISNULL(desc = session->get_tx_desc())) {
ret = OB_BAD_NULL_ERROR;
DETECT_LOG(ERROR, "desc in session is NULL", PRINT_WRAPPER);
@ -638,7 +632,7 @@ int ObTransDeadlockDetectorAdapter::maintain_deadlock_info_when_end_stmt(sql::Ob
} else if (++step && !desc->is_valid()) {
DETECT_LOG(INFO, "invalid tx desc no need register to deadlock", PRINT_WRAPPER);
} else if (++step && is_rollback) {// statment is failed, maybe will try again, check if need register to deadlock detector
if (++step && session->get_query_timeout_ts() < ObClockGenerator::getCurrentTime()) {
if (++step && session->get_query_timeout_ts() < ObClockGenerator::getClock()) {
unregister_from_deadlock_detector(desc->tid(), UnregisterPath::END_STMT_TIMEOUT);
DETECT_LOG(INFO, "query timeout, no need register to deadlock", PRINT_WRAPPER);
} else if (++step && conflict_txs.empty()) {
@ -647,9 +641,7 @@ int ObTransDeadlockDetectorAdapter::maintain_deadlock_info_when_end_stmt(sql::Ob
} else if (++step && exec_ctx.get_errcode() != OB_TRY_LOCK_ROW_CONFLICT) {
unregister_from_deadlock_detector(desc->tid(), UnregisterPath::END_STMT_OTHER_ERR);
DETECT_LOG(INFO, "try unregister deadlock detecotr cause meet non-lock error", PRINT_WRAPPER);
} else if (++step && OB_FAIL(register_remote_execution_or_replace_conflict_trans_ids(desc->tid(),
session->get_sessid(),
conflict_txs))) {
} else if (++step && OB_FAIL(register_or_replace_conflict_trans_ids(desc->tid(), session->get_sessid(), conflict_txs))) {
DETECT_LOG(WARN, "register or replace list failed", PRINT_WRAPPER);
} else {
// do nothing, register success or keep retrying
@ -664,7 +656,7 @@ int ObTransDeadlockDetectorAdapter::maintain_deadlock_info_when_end_stmt(sql::Ob
int exec_ctx_err_code = exec_ctx.get_errcode();
if (OB_SUCC(ret)) {
if (OB_SUCCESS != exec_ctx_err_code) {
if ((OB_ITER_END != exec_ctx_err_code) && (2 != step)) {
if ((OB_ITER_END != exec_ctx_err_code)) {
if (session->get_retry_info().get_retry_cnt() <= 1 ||// first time lock conflict or other error
session->get_retry_info().get_retry_cnt() % 10 == 0) {// other wise, control log print frequency
DETECT_LOG(INFO, "maintain deadlock info", PRINT_WRAPPER);
@ -683,19 +675,29 @@ int ObTransDeadlockDetectorAdapter::maintain_deadlock_info_when_end_stmt(sql::Ob
// @param [in] self_trans_id who am i.
// @param [in] sess_id which session to kill if this node is killed.
// @return the error code.
int ObTransDeadlockDetectorAdapter::register_local_execution_to_deadlock_detector_waiting_for_row(CollectCallBack &on_collect_op,
const BlockCallBack &func,
const ObTransID &self_trans_id,
const uint32_t sess_id)
int ObTransDeadlockDetectorAdapter::lock_wait_mgr_reconstruct_detector_waiting_for_row(CollectCallBack &on_collect_op,
const BlockCallBack &func,
const ObTransID &self_trans_id,
const uint32_t sess_id)
{
#define PRINT_WRAPPER KR(ret), K(self_trans_id), K(sess_id)
#define PRINT_WRAPPER KR(ret), K(self_trans_id), K(sess_id), K(exist)
CHECK_DEADLOCK_ENABLED();
int ret = OB_SUCCESS;
bool exist = false;
if (sess_id == 0) {
DETECT_LOG(ERROR, "invalid session id", PRINT_WRAPPER);
} else if (nullptr == (MTL(ObDeadLockDetectorMgr*))) {
ret = OB_ERR_UNEXPECTED;
DETECT_LOG(WARN, "fail to get ObDeadLockDetectorMgr", PRINT_WRAPPER);
} else if (OB_FAIL(MTL(ObDeadLockDetectorMgr*)->check_detector_exist(self_trans_id, exist))) {
DETECT_LOG(WARN, "fail to check detector exist", PRINT_WRAPPER);
} else if (exist) {
if (MTL(ObDeadLockDetectorMgr*)->unregister_key(self_trans_id)) {
DETECT_LOG(WARN, "fail to unregister key", PRINT_WRAPPER);
}
}
if (OB_FAIL(ret)) {
DETECT_LOG(WARN, "local execution register to deadlock detector waiting for row failed", PRINT_WRAPPER);
} else if (OB_FAIL(create_detector_node_and_set_parent_if_needed_(on_collect_op, self_trans_id, sess_id))) {
DETECT_LOG(WARN, "fail to create detector node", PRINT_WRAPPER);
} else if (OB_FAIL(MTL(ObDeadLockDetectorMgr*)->block(self_trans_id, func))) {
@ -714,18 +716,28 @@ int ObTransDeadlockDetectorAdapter::register_local_execution_to_deadlock_detecto
// @param [in] self_trans_id who am i.
// @param [in] sess_id which session to kill if this node is killed.
// @return the error code.
int ObTransDeadlockDetectorAdapter::register_local_execution_to_deadlock_detector_waiting_for_trans(CollectCallBack &on_collect_op,
const ObTransID &conflict_trans_id,
const ObTransID &self_trans_id,
const uint32_t sess_id)
int ObTransDeadlockDetectorAdapter::lock_wait_mgr_reconstruct_detector_waiting_for_trans(CollectCallBack &on_collect_op,
const ObTransID &conflict_trans_id,
const ObTransID &self_trans_id,
const uint32_t sess_id)
{
#define PRINT_WRAPPER KR(ret), K(scheduler_addr), K(self_trans_id), K(sess_id)
#define PRINT_WRAPPER KR(ret), K(scheduler_addr), K(self_trans_id), K(sess_id), K(exist)
CHECK_DEADLOCK_ENABLED();
int ret = OB_SUCCESS;
ObAddr scheduler_addr;
bool exist = false;
if (nullptr == (MTL(ObDeadLockDetectorMgr*))) {
ret = OB_ERR_UNEXPECTED;
DETECT_LOG(WARN, "fail to get ObDeadLockDetectorMgr", PRINT_WRAPPER);
} else if (OB_FAIL(MTL(ObDeadLockDetectorMgr*)->check_detector_exist(self_trans_id, exist))) {
DETECT_LOG(WARN, "fail to check detector exist", PRINT_WRAPPER);
} else if (exist) {
if (MTL(ObDeadLockDetectorMgr*)->unregister_key(self_trans_id)) {
DETECT_LOG(WARN, "fail to unregister key", PRINT_WRAPPER);
}
}
if (OB_FAIL(ret)) {
DETECT_LOG(WARN, "local execution register to deadlock detector waiting for row failed", PRINT_WRAPPER);
} else if (OB_FAIL(get_conflict_trans_scheduler(conflict_trans_id, scheduler_addr))) {
DETECT_LOG(WARN, "fail to get conflict trans scheduler addr", PRINT_WRAPPER);
} else if (OB_FAIL(create_detector_node_and_set_parent_if_needed_(on_collect_op, self_trans_id, sess_id))) {
@ -733,7 +745,7 @@ int ObTransDeadlockDetectorAdapter::register_local_execution_to_deadlock_detecto
} else if (OB_FAIL(MTL(ObDeadLockDetectorMgr*)->block(self_trans_id, scheduler_addr, conflict_trans_id))) {
DETECT_LOG(WARN, "fail to block on conflict trans", PRINT_WRAPPER);
} else {
DETECT_LOG(INFO, "local execution register to deadlock detector waiting for trans success", PRINT_WRAPPER);
DETECT_LOG(TRACE, "local execution register to deadlock detector waiting for trans success", PRINT_WRAPPER);
}
return ret;
#undef PRINT_WRAPPER

View File

@ -111,14 +111,14 @@ class ObTransDeadlockDetectorAdapter
}
/**********MAIN INTERFACE**********/
// for local execution, call from lock wait mgr
static int register_local_execution_to_deadlock_detector_waiting_for_row(CollectCallBack &on_collect_op,
const BlockCallBack &call_back,
const ObTransID &self_trans_id,
const uint32_t sess_id);
static int register_local_execution_to_deadlock_detector_waiting_for_trans(CollectCallBack &on_collect_op,
const ObTransID &conflict_trans_id,
const ObTransID &self_trans_id,
const uint32_t sess_id);
static int lock_wait_mgr_reconstruct_detector_waiting_for_row(CollectCallBack &on_collect_op,
const BlockCallBack &call_back,
const ObTransID &self_trans_id,
const uint32_t sess_id);
static int lock_wait_mgr_reconstruct_detector_waiting_for_trans(CollectCallBack &on_collect_op,
const ObTransID &conflict_trans_id,
const ObTransID &self_trans_id,
const uint32_t sess_id);
// for remote execution, call from sql trans control
static int maintain_deadlock_info_when_end_stmt(sql::ObExecContext &exec_ctx, const bool is_rollback);
// for autonomous trans
@ -143,22 +143,22 @@ class ObTransDeadlockDetectorAdapter
static int get_trans_scheduler_info_on_participant(const ObTransID trans_id,
const share::ObLSID ls_id,
ObAddr &scheduler_addr);
static int register_remote_execution_or_replace_conflict_trans_ids(const ObTransID self_tx_id,
const uint32_t self_session_id,
const ObArray<ObTransIDAndAddr> &conflict_tx_ids);
static int register_or_replace_conflict_trans_ids(const ObTransID self_tx_id,
const uint32_t self_session_id,
const ObArray<ObTransIDAndAddr> &conflict_tx_ids);
static int kill_stmt(const uint32_t sess_id);
static void copy_str_and_translate_apostrophe(const char *src_ptr,
const int64_t src_len,
char *dest_ptr,// C-style str, contain '\0'
const int64_t dest_len);
private:
static int register_remote_execution_to_deadlock_detector_(const ObTransID self_tx_id,
const uint32_t self_session_id,
const ObIArray<ObTransIDAndAddr> &conflict_tx_ids,
SessionGuard &session_guard);
static int remote_execution_replace_conflict_trans_ids_(const ObTransID self_tx_id,
const ObIArray<ObTransIDAndAddr> &conflict_tx_ids,
SessionGuard &session_guard);
static int register_to_deadlock_detector_(const ObTransID self_tx_id,
const uint32_t self_session_id,
const ObIArray<ObTransIDAndAddr> &conflict_tx_ids,
SessionGuard &session_guard);
static int replace_conflict_trans_ids_(const ObTransID self_tx_id,
const ObIArray<ObTransIDAndAddr> &conflict_tx_ids,
SessionGuard &session_guard);
static int create_detector_node_and_set_parent_if_needed_(CollectCallBack &on_collect_op,
const ObTransID &self_trans_id,
const uint32_t sess_id);