diff --git a/src/share/deadlock/ob_deadlock_detector_common_define.cpp b/src/share/deadlock/ob_deadlock_detector_common_define.cpp index 040faf3cd..1595b60bc 100644 --- a/src/share/deadlock/ob_deadlock_detector_common_define.cpp +++ b/src/share/deadlock/ob_deadlock_detector_common_define.cpp @@ -211,8 +211,9 @@ ObDependencyResource& ObDependencyResource::operator=(const ObDependencyResource uint64_t ObDependencyResource::hash() const { uint64_t hash_val = 0; - hash_val = murmurhash(&addr_, sizeof(addr_), hash_val); - hash_val = murmurhash(&user_key_, sizeof(user_key_), hash_val); + hash_val = addr_.hash(); + uint64_t key_hash = user_key_.hash(); + hash_val = murmurhash(&key_hash, sizeof(key_hash), hash_val); return hash_val; } @@ -225,6 +226,8 @@ bool ObDependencyResource::operator<(const ObDependencyResource &rhs) const { if (addr_ < rhs.addr_) { return true; + } else if (addr_ > rhs.addr_) { + return false; } else { if (user_key_ < rhs.user_key_) { return true; diff --git a/src/share/deadlock/ob_deadlock_detector_common_define.h b/src/share/deadlock/ob_deadlock_detector_common_define.h index 52ac15bd0..f8e78f8a0 100644 --- a/src/share/deadlock/ob_deadlock_detector_common_define.h +++ b/src/share/deadlock/ob_deadlock_detector_common_define.h @@ -32,6 +32,11 @@ namespace share { namespace detector { +// if msg in map count below LCL_MSG_CACHE_LIMIT/2, all pending msg is accepted +// if msg in map count greater than LCL_MSG_CACHE_LIMIT/2, but less than LCL_MSG_CACHE_LIMIT, +// random drop appending msg, drop probability depends on how many msg keeping in map, +// if msg count in map reach LCL_MSG_CACHE_LIMIT, drop probability is 100%, no more msg is accepted. +constexpr int64_t LCL_MSG_CACHE_LIMIT = 4096; class ObLCLMessage; class ObDependencyResource; diff --git a/src/share/deadlock/ob_deadlock_detector_mgr.h b/src/share/deadlock/ob_deadlock_detector_mgr.h index 149b27042..97e956f5b 100644 --- a/src/share/deadlock/ob_deadlock_detector_mgr.h +++ b/src/share/deadlock/ob_deadlock_detector_mgr.h @@ -105,6 +105,8 @@ public: const KeyType2 &parent_key); template int set_timeout(const KeyType &key, const int64_t timeout); + template + int check_detector_exist(const KeyType &key, bool &exist); // ungister resource operation template int unregister_key(const KeyType &key); @@ -281,6 +283,28 @@ int ObDeadLockDetectorMgr::register_key(const KeyType &key, return ret; #undef PRINT_WRAPPER } +template +int ObDeadLockDetectorMgr::check_detector_exist(const KeyType &key, bool &exist) +{ + CHECK_INIT(); + CHECK_ARGS(key); + #define PRINT_WRAPPER KR(ret), K(key) + int ret = common::OB_SUCCESS; + UserBinaryKey user_key; + DetectorRefGuard ref_guard; + if (OB_FAIL(user_key.set_user_key(key))) { + DETECT_LOG(WARN, "user key serialization failed", PRINT_WRAPPER); + } else if (OB_FAIL(get_detector_(user_key, ref_guard))) { + if (OB_ENTRY_NOT_EXIST == ret) { + exist = false; + ret = OB_SUCCESS; + } + } else { + exist = true; + } + return ret; + #undef PRINT_WRAPPER +} // unregister a user specified key // unregister action means: // 1. the detector instance associated with user specified key will be released @@ -295,7 +319,6 @@ template int ObDeadLockDetectorMgr::unregister_key(const KeyType &key) { CHECK_INIT(); - CHECK_ENABLED(); CHECK_ARGS(key); #define PRINT_WRAPPER KR(ret), K(key) int ret = common::OB_SUCCESS; diff --git a/src/share/deadlock/ob_lcl_scheme/ob_lcl_batch_sender_thread.cpp b/src/share/deadlock/ob_lcl_scheme/ob_lcl_batch_sender_thread.cpp index 0788690ee..5af1b038f 100644 --- a/src/share/deadlock/ob_lcl_scheme/ob_lcl_batch_sender_thread.cpp +++ b/src/share/deadlock/ob_lcl_scheme/ob_lcl_batch_sender_thread.cpp @@ -10,6 +10,7 @@ * See the Mulan PubL v2 for more details. */ +#include "lib/ob_errno.h" #include "share/ob_occam_time_guard.h" #include "ob_lcl_batch_sender_thread.h" #include "lib/atomic/ob_atomic.h" @@ -19,6 +20,8 @@ #include "ob_lcl_parameters.h" #include "share/deadlock/ob_deadlock_arg_checker.h" #include "share/deadlock/ob_deadlock_detector_rpc.h" +#include +#include namespace oceanbase { @@ -38,10 +41,15 @@ bool ObLCLBatchSenderThread::RemoveIfOp::operator()(const ObDependencyResource & int temp_ret = OB_SUCCESS; DETECT_TIME_GUARD(100_ms); - if (OB_SUCCESS != (temp_ret = lcl_message_list_.push_back(lcl_msg))) { + if (lcl_message_list_.count() >= LCL_MSG_CACHE_LIMIT) { + temp_ret = OB_BUF_NOT_ENOUGH; + ret = false; + DETECT_LOG_RET(WARN, temp_ret, "LCL message fetch failed", + KR(temp_ret), K(lcl_msg)); + } else if (OB_SUCCESS != (temp_ret = lcl_message_list_.push_back(lcl_msg))) { ret = false; DETECT_LOG_RET(WARN, temp_ret, "push lcl message to lcl_message_list failed", - KR(temp_ret), K(lcl_msg)); + KR(temp_ret), K(lcl_msg)); } return ret; } @@ -64,7 +72,7 @@ int ObLCLBatchSenderThread::init() int ret = OB_SUCCESS; if (OB_FAIL(share::ObThreadPool::init())) { DETECT_LOG(WARN, "init thread failed", K(ret), KP(this), K(MTL_ID())); - } else if (OB_FAIL(lcl_msg_map_.init(MEMORY_LABEL, MTL_ID()))) { + } else if (OB_FAIL(lcl_msg_map_.init("LCLSender", MTL_ID()))) { DETECT_LOG(WARN, "init thread failed", K(ret), KP(this), K(MTL_ID())); } else { is_inited_ = true; @@ -85,28 +93,64 @@ int ObLCLBatchSenderThread::start() return ret; } -int ObLCLBatchSenderThread::cache_msg(const ObDependencyResource &key, - const ObLCLMessage &lcl_msg) +int ObLCLBatchSenderThread::cache_msg(const ObDependencyResource &key, const ObLCLMessage &lcl_msg) { - CHECK_INIT_AND_START(); - #define PRINT_WRAPPER KR(ret), K(key), K(lcl_msg) + #define PRINT_WRAPPER KR(ret), K(key), K(lcl_msg), K(can_insert), K(random_drop_percentage) int ret = OB_SUCCESS; + CHECK_INIT_AND_START(); ObLCLBatchSenderThread::MergeOp op(lcl_msg); - + int64_t random_drop_percentage = 0; DETECT_TIME_GUARD(100_ms); - do { + int64_t msg_count = lcl_msg_map_.count(); + bool can_insert = false; + if (msg_count < LCL_MSG_CACHE_LIMIT / 2) {// always keep + can_insert = true; + } else if (msg_count < LCL_MSG_CACHE_LIMIT) {// random drop + int64_t keep_threshold = LCL_MSG_CACHE_LIMIT / 2; + // more keeping messages means higher probability to drop new appended one + // if reach LCL_MSG_CACHE_LIMIT, definitely drop + random_drop_percentage = (msg_count - keep_threshold) * 100 / keep_threshold; + can_insert = distribution_(random_generator_) > random_drop_percentage; + } else {// always drop + can_insert = false; + random_drop_percentage = 100; + } + if (OB_FAIL(insert_or_merge_(key, lcl_msg, can_insert))) { + DETECT_LOG(WARN, "lcl message is droped", PRINT_WRAPPER); + } + return ret; + #undef PRINT_WRAPPER +} + +int ObLCLBatchSenderThread::insert_or_merge_(const ObDependencyResource &key, + const ObLCLMessage &lcl_message, + const bool can_insert) +{ + #define PRINT_WRAPPER KR(ret), K(key), K(lcl_message), K(can_insert), K(msg_count) + DETECT_TIME_GUARD(100_ms); + int ret = OB_SUCCESS; + ObLCLBatchSenderThread::MergeOp op(lcl_message); + int64_t msg_count = lcl_msg_map_.count(); + do {// there may be concurrent problem, so need retry until success or meet can't handle failure if (OB_SUCCESS != ret) { DETECT_LOG(INFO, "try again", PRINT_WRAPPER); } - if (OB_SUCC(lcl_msg_map_.insert(key, lcl_msg))) { - // do nothing - } else if (OB_ENTRY_EXIST != ret) { - DETECT_LOG(WARN, "this error can't handle", PRINT_WRAPPER); - break; - } else if (OB_SUCC(lcl_msg_map_.operate(key, op))) { - // do nothing - } else if (OB_ENTRY_NOT_EXIST != ret) { - DETECT_LOG(WARN, "this error can't handle", PRINT_WRAPPER); + if (can_insert) {// try insert first, if exist, try update merge then + if (OB_SUCC(lcl_msg_map_.insert(key, lcl_message))) { + } else if (OB_ENTRY_EXIST != ret) { + DETECT_LOG(WARN, "this error can't handle", PRINT_WRAPPER); + break; + } else if (OB_SUCC(lcl_msg_map_.operate(key, op))) { + } else if (OB_ENTRY_NOT_EXIST != ret) { + DETECT_LOG(WARN, "this error can't handle", PRINT_WRAPPER); + } + } else {// just try update merge + if (OB_FAIL(lcl_msg_map_.operate(key, op))) { + if (OB_ENTRY_NOT_EXIST == ret) { + ret = OB_BUF_NOT_ENOUGH; + } + } + break;// no matter success or not, no retry } } while (CLICK() && (OB_ENTRY_NOT_EXIST == ret) && ATOMIC_LOAD(&is_running_)); return ret; @@ -149,18 +193,16 @@ void ObLCLBatchSenderThread::record_summary_info_and_logout_when_necessary_(int6 total_busy_time_ += diff; if (total_record_time_ > 5L * 1000L * 1000L) {// 5s - double duty_ratio = double(total_busy_time_) / total_record_time_ * 100; + int duty_ratio_percentage = double(total_busy_time_) / total_record_time_ * 100; int64_t total_constructed_detector = ATOMIC_LOAD(&ObIDeadLockDetector::total_constructed_count); int64_t total_destructed_detector = ATOMIC_LOAD(&ObIDeadLockDetector::total_destructed_count); int64_t total_alived_detector = total_constructed_detector - total_destructed_detector; - DETECT_LOG(INFO, "ObLCLBatchSenderThread periodic report summary info", + DETECT_LOG(INFO, "ObLCLBatchSenderThread periodic report summary info", K(duty_ratio_percentage), K(total_constructed_detector), K(total_destructed_detector), - K(total_alived_detector), K(duty_ratio), - K(int64_t(ObServerConfig::get_instance()._lcl_op_interval)), K(*this)); + K(total_alived_detector), K(_lcl_op_interval), K(lcl_msg_map_.count()), K(*this)); total_record_time_ = 0; total_busy_time_ = 0; over_night_times_ = 0; - duty_ratio = 0; } } @@ -173,6 +215,7 @@ void ObLCLBatchSenderThread::run1() int64_t diff = 0; ObArray mock_lcl_message_list; + mock_lcl_message_list.set_label("LCLArray"); ObLCLBatchSenderThread::RemoveIfOp op(mock_lcl_message_list); lib::set_thread_name("LCLSender"); while(ATOMIC_LOAD(&is_running_)) { @@ -190,17 +233,21 @@ void ObLCLBatchSenderThread::run1() DETECT_TIME_GUARD(50_ms < _lcl_op_interval ? 50_ms : _lcl_op_interval); begin_ts = ObClockGenerator::getRealClock(); mock_lcl_message_list.reset(); - if (OB_FAIL(lcl_msg_map_.remove_if(op))) { - DETECT_LOG(WARN, "can't fill mock_lcl_message_list", KR(ret)); - } - CLICK(); - for (int64_t idx = 0; idx < mock_lcl_message_list.count(); ++idx) { - const ObLCLMessage &msg = mock_lcl_message_list.at(idx); - if (OB_FAIL(mgr_->get_rpc().post_lcl_message(msg.get_addr(), msg))) { - DETECT_LOG(WARN, "send LCL msg failed", KR(ret), K(msg)); - CLICK(); - } else { - DETECT_LOG(DEBUG, "send LCL msg success", K(msg)); + if (ATOMIC_LOAD(&allow_send_)) { + if (OB_FAIL(lcl_msg_map_.remove_if(op))) { + DETECT_LOG(WARN, "can't fill mock_lcl_message_list", KR(ret)); + lcl_msg_map_.reset();// if fetch failed, remove all + } + CLICK(); + for (int64_t idx = 0; idx < mock_lcl_message_list.count(); ++idx) { + const ObLCLMessage &msg = mock_lcl_message_list.at(idx); + if (OB_ISNULL(mgr_)) { + } else if (OB_FAIL(mgr_->get_rpc().post_lcl_message(msg.get_addr(), msg))) { + DETECT_LOG(WARN, "send LCL msg failed", KR(ret), K(msg)); + CLICK(); + } else { + DETECT_LOG(DEBUG, "send LCL msg success", K(msg)); + } } } } diff --git a/src/share/deadlock/ob_lcl_scheme/ob_lcl_batch_sender_thread.h b/src/share/deadlock/ob_lcl_scheme/ob_lcl_batch_sender_thread.h index 76f9fe058..f8e3c63e0 100644 --- a/src/share/deadlock/ob_lcl_scheme/ob_lcl_batch_sender_thread.h +++ b/src/share/deadlock/ob_lcl_scheme/ob_lcl_batch_sender_thread.h @@ -20,6 +20,8 @@ #include "lib/container/ob_array.h" #include "lib/hash/ob_linear_hash_map.h" #include "share/deadlock/ob_deadlock_detector_common_define.h" +#include +#include namespace oceanbase { @@ -31,14 +33,19 @@ class ObDeadLockDetectorMgr; class ObLCLBatchSenderThread : public share::ObThreadPool { + using RandomGenerator = std::mt19937;// high quanlity random generator advised by cppreference + using RandomDistribution = std::uniform_int_distribution<>;// random range public: ObLCLBatchSenderThread(ObDeadLockDetectorMgr *mgr) : is_inited_(false), is_running_(false), + allow_send_(true), total_record_time_(0), total_busy_time_(0), over_night_times_(0), - mgr_(mgr) {} + mgr_(mgr), + random_generator_(std::time(nullptr)), + distribution_(0, 100) {} ~ObLCLBatchSenderThread() { destroy(); } int init(); int start(); @@ -47,8 +54,7 @@ public: void destroy(); void run1(); public: - int cache_msg(const ObDependencyResource &key, - const ObLCLMessage &lcl_msg); + int cache_msg(const ObDependencyResource &key, const ObLCLMessage &lcl_msg); TO_STRING_KV(KP(this), K_(is_inited), K_(is_running), K_(total_record_time), K_(over_night_times)); private: class RemoveIfOp @@ -68,16 +74,22 @@ private: const ObLCLMessage &lcl_message_; }; private: + int insert_or_merge_(const ObDependencyResource &key, + const ObLCLMessage &lcl_message, + const bool can_insert); int64_t update_and_get_lcl_op_interval_(); void record_summary_info_and_logout_when_necessary_(int64_t, int64_t, int64_t); private: bool is_inited_; bool is_running_; + bool allow_send_; // for unittest mock used int64_t total_record_time_; int64_t total_busy_time_; int64_t over_night_times_; ObDeadLockDetectorMgr* mgr_; common::ObLinearHashMap lcl_msg_map_; + RandomGenerator random_generator_; + RandomDistribution distribution_; }; } diff --git a/src/share/deadlock/ob_lcl_scheme/ob_lcl_node.cpp b/src/share/deadlock/ob_lcl_scheme/ob_lcl_node.cpp index b280f0925..9c86114d5 100644 --- a/src/share/deadlock/ob_lcl_scheme/ob_lcl_node.cpp +++ b/src/share/deadlock/ob_lcl_scheme/ob_lcl_node.cpp @@ -496,9 +496,7 @@ int ObLCLNode::broadcast_(const BlockList &list, lclv, public_label, ObClockGenerator::getRealClock()); - if (CLICK() && OB_FAIL(MTL(ObDeadLockDetectorMgr*)->sender_thread_.cache_msg(list.at(idx), msg))) { - DETECT_LOG_(WARN, "cache message failed", KR(ret), K(msg), K(list), K(list), K(*this), K(lbt())); - } + MTL(ObDeadLockDetectorMgr*)->sender_thread_.cache_msg(list.at(idx), msg); } return ret; @@ -843,7 +841,7 @@ int ObLCLNode::push_state_to_downstreams_with_lock_() void ObLCLNode::update_lcl_period_if_necessary_with_lock_() { int ret = OB_SUCCESS; - DETECT_TIME_GUARD(100_us); + DETECT_TIME_GUARD(10_ms); int64_t current_ts = ObClockGenerator::getRealClock(); int64_t new_period_ = current_ts / PERIOD; int64_t timeout_ts = 0; @@ -869,7 +867,7 @@ bool ObLCLNode::if_phase_match_(const int64_t ts, int64_t my_phase = ts / PHASE_TIME; int64_t msg_phase = msg.get_send_ts() / PHASE_TIME; - DETECT_TIME_GUARD(100_us); + DETECT_TIME_GUARD(10_ms); if (my_phase != msg_phase) { ret = false; } diff --git a/src/sql/ob_sql_trans_control.cpp b/src/sql/ob_sql_trans_control.cpp index 1125fc9a4..73bebd352 100644 --- a/src/sql/ob_sql_trans_control.cpp +++ b/src/sql/ob_sql_trans_control.cpp @@ -407,13 +407,18 @@ int ObSqlTransControl::do_end_trans_(ObSQLSessionInfo *session, { int ret = OB_SUCCESS; transaction::ObTxDesc *&tx_ptr = session->get_tx_desc(); - if (session->is_registered_to_deadlock()) { - if (OB_SUCC(MTL(share::detector::ObDeadLockDetectorMgr*)->unregister_key(tx_ptr->tid()))) { - DETECT_LOG(INFO, "unregister deadlock detector in do end trans", KPC(tx_ptr)); - } else { - DETECT_LOG(WARN, "unregister deadlock detector in do end trans failed", KPC(tx_ptr)); - } - session->set_registered_to_deadlock(false); + bool is_detector_exist = false; + int tmp_ret = OB_SUCCESS; + if (OB_ISNULL(MTL(share::detector::ObDeadLockDetectorMgr*))) { + tmp_ret = OB_BAD_NULL_ERROR; + DETECT_LOG(WARN, "MTL ObDeadLockDetectorMgr is NULL", K(tmp_ret), K(tx_ptr->tid())); + } else if (OB_TMP_FAIL(MTL(share::detector::ObDeadLockDetectorMgr*)-> + check_detector_exist(tx_ptr->tid(), is_detector_exist))) { + DETECT_LOG(WARN, "fail to check detector exist, may causing detector leak", K(tmp_ret), + K(tx_ptr->tid())); + } else if (is_detector_exist) { + ObTransDeadlockDetectorAdapter::unregister_from_deadlock_detector(tx_ptr->tid(), + ObTransDeadlockDetectorAdapter::UnregisterPath::DO_END_TRANS); } if (session->associated_xa() && !is_explicit) { ret = OB_TRANS_XA_RMFAIL; diff --git a/src/sql/session/ob_sql_session_info.cpp b/src/sql/session/ob_sql_session_info.cpp index 73026c4c8..2d7d95fec 100644 --- a/src/sql/session/ob_sql_session_info.cpp +++ b/src/sql/session/ob_sql_session_info.cpp @@ -169,7 +169,6 @@ ObSQLSessionInfo::ObSQLSessionInfo() : is_table_name_hidden_(false), piece_cache_(NULL), is_load_data_exec_session_(false), - is_registered_to_deadlock_(false), pl_exact_err_msg_(), is_ps_prepare_stage_(false), got_conn_res_(false), @@ -197,7 +196,6 @@ int ObSQLSessionInfo::init(uint32_t sessid, uint64_t proxy_sessid, UNUSED(tenant_id); int ret = OB_SUCCESS; static const int64_t PS_BUCKET_NUM = 64; - set_registered_to_deadlock(false); if (OB_FAIL(ObBasicSessionInfo::init(sessid, proxy_sessid, bucket_allocator, tz_info))) { LOG_WARN("fail to init basic session info", K(ret)); } else if (!is_acquire_from_pool() && @@ -325,7 +323,6 @@ void ObSQLSessionInfo::reset(bool skip_sys_var) prelock_ = false; proxy_version_ = 0; min_proxy_version_ps_ = 0; - set_registered_to_deadlock(false); if (OB_NOT_NULL(mem_context_)) { destroy_contexts_map(contexts_map_, mem_context_->get_malloc_allocator()); DESTROY_CONTEXT(mem_context_); diff --git a/src/sql/session/ob_sql_session_info.h b/src/sql/session/ob_sql_session_info.h index 7e743d53c..1b9b94bc7 100644 --- a/src/sql/session/ob_sql_session_info.h +++ b/src/sql/session/ob_sql_session_info.h @@ -963,8 +963,6 @@ public: cached_tenant_config_info_.refresh(); return cached_tenant_config_info_.get_enable_sql_extension(); } - bool is_registered_to_deadlock() const { return ATOMIC_LOAD(&is_registered_to_deadlock_); } - void set_registered_to_deadlock(bool state) { ATOMIC_SET(&is_registered_to_deadlock_, state); } bool is_ps_prepare_stage() const { return is_ps_prepare_stage_; } void set_is_ps_prepare_stage(bool v) { is_ps_prepare_stage_ = v; } int get_tenant_audit_trail_type(ObAuditTrailType &at_type) @@ -1139,8 +1137,6 @@ private: bool is_table_name_hidden_; void *piece_cache_; bool is_load_data_exec_session_; - // 记录session是否注册过死锁检测的信息 - bool is_registered_to_deadlock_; ObSqlString pl_exact_err_msg_; bool is_ps_prepare_stage_; // Record whether this session has got connection resource, which means it increased connections count. diff --git a/src/storage/tx/ob_trans_deadlock_adapter.cpp b/src/storage/tx/ob_trans_deadlock_adapter.cpp index 32df8d084..4df58e2cb 100644 --- a/src/storage/tx/ob_trans_deadlock_adapter.cpp +++ b/src/storage/tx/ob_trans_deadlock_adapter.cpp @@ -358,7 +358,6 @@ int ObTransDeadlockDetectorAdapter::register_remote_execution_to_deadlock_detect DETECT_LOG(WARN, "fail to register deadlock", PRINT_WRAPPER); } else { MTL(ObDeadLockDetectorMgr*)->set_timeout(self_tx_id, query_timeout); - session_guard->set_registered_to_deadlock(true); if (OB_FAIL(MTL(ObDeadLockDetectorMgr*)->block(self_tx_id, blocked_resources))) { DETECT_LOG(WARN, "block on resource failed", PRINT_WRAPPER); } else if (self_tx_scheduler != GCTX.self_addr()) { @@ -408,8 +407,7 @@ int ObTransDeadlockDetectorAdapter::remote_execution_replace_conflict_trans_ids_ DETECT_LOG(WARN, "replace block list failed", PRINT_WRAPPER); } } else { - try_unregister_deadlock_detector_(session_guard.get_session(), - self_tx_id, + unregister_from_deadlock_detector(self_tx_id, UnregisterPath::REPLACE_MEET_TOTAL_DIFFERENT_LIST); DETECT_LOG(WARN, "unregister detector cause meet total different block list", PRINT_WRAPPER); } @@ -426,6 +424,7 @@ int ObTransDeadlockDetectorAdapter::register_remote_execution_or_replace_conflic CHECK_DEADLOCK_ENABLED(); int ret = OB_SUCCESS; SessionGuard session_guard; + bool is_detector_exist = false; if (self_session_id == 1) { DETECT_LOG(INFO, "inner session no need register to deadlock", PRINT_WRAPPER); } else if (self_session_id == 0) { @@ -437,7 +436,12 @@ int ObTransDeadlockDetectorAdapter::register_remote_execution_or_replace_conflic } else if (!session_guard.is_valid()) { ret = OB_ERR_UNEXPECTED; DETECT_LOG(ERROR, "fail to get session info", PRINT_WRAPPER); - } else if (!session_guard->is_registered_to_deadlock()) { + } else if (OB_ISNULL(MTL(ObDeadLockDetectorMgr*))) { + ret = OB_ERR_UNEXPECTED; + DETECT_LOG(ERROR, "MTL ObDeadLockDetectorMgr is NULL", PRINT_WRAPPER); + } else if (OB_FAIL(MTL(ObDeadLockDetectorMgr*)->check_detector_exist(self_tx_id, is_detector_exist))) { + DETECT_LOG(WARN, "fail to get detector exist status", PRINT_WRAPPER); + } else if (!is_detector_exist) { if (OB_FAIL(register_remote_execution_to_deadlock_detector_(self_tx_id, self_session_id, conflict_tx_ids, session_guard))) { DETECT_LOG(WARN, "register new detector in remote execution failed", PRINT_WRAPPER); } else { @@ -646,15 +650,15 @@ int ObTransDeadlockDetectorAdapter::maintain_deadlock_info_when_end_stmt(sql::Ob // no trans opened, for example:read-only trans } else if (is_rollback) {// statment is failed, maybe will try again, check if need register to deadlock detector if (session->get_query_timeout_ts() < ObClockGenerator::getCurrentTime()) { - try_unregister_deadlock_detector_(*session, desc->tid(), UnregisterPath::END_STMT_TIMEOUT); + unregister_from_deadlock_detector(desc->tid(), UnregisterPath::END_STMT_TIMEOUT); DETECT_LOG(INFO, "query timeout, no need register to deadlock", PRINT_WRAPPER); } else if (OB_FAIL(desc->fetch_conflict_txs(conflict_txs))) { DETECT_LOG(WARN, "fail to get conflict txs from desc", PRINT_WRAPPER); } else if (conflict_txs.empty()) { - try_unregister_deadlock_detector_(*session, desc->tid(), UnregisterPath::END_STMT_NO_CONFLICT); + unregister_from_deadlock_detector(desc->tid(), UnregisterPath::END_STMT_NO_CONFLICT); DETECT_LOG(INFO, "try unregister deadlock detecotr cause conflict array is empty", PRINT_WRAPPER); } else if (exec_ctx.get_errcode() != OB_TRY_LOCK_ROW_CONFLICT) { - try_unregister_deadlock_detector_(*session, desc->tid(), UnregisterPath::END_STMT_OTHER_ERR); + unregister_from_deadlock_detector(desc->tid(), UnregisterPath::END_STMT_OTHER_ERR); DETECT_LOG(INFO, "try unregister deadlock detecotr cause meet non-lock error", PRINT_WRAPPER); } else if (OB_FAIL(register_remote_execution_or_replace_conflict_trans_ids(desc->tid(), session->get_sessid(), @@ -662,10 +666,10 @@ int ObTransDeadlockDetectorAdapter::maintain_deadlock_info_when_end_stmt(sql::Ob DETECT_LOG(WARN, "register or replace list failed", PRINT_WRAPPER); } else { desc->reset_conflict_txs(); - DETECT_LOG(INFO, "maintain deadlock info when end_stmt", PRINT_WRAPPER); + DETECT_LOG(TRACE, "maintain deadlock info when end_stmt", PRINT_WRAPPER); } } else {// statment is done, will not try again, all related deadlock info should be resetted - try_unregister_deadlock_detector_(*session, desc->tid(), UnregisterPath::END_STMT_DONE); + unregister_from_deadlock_detector(desc->tid(), UnregisterPath::END_STMT_DONE); DETECT_LOG(TRACE, "unregister from deadlock detector", KR(ret), K(desc->tid())); } return ret; @@ -814,26 +818,17 @@ void ObTransDeadlockDetectorAdapter::unregister_from_deadlock_detector(const ObT { int ret = common::OB_SUCCESS; ObDeadLockDetectorMgr *mgr = nullptr; - if (ObDeadLockDetectorMgr::is_deadlock_enabled()) { - if (nullptr == (mgr = MTL(ObDeadLockDetectorMgr*))) { - ret = OB_ERR_UNEXPECTED; - DETECT_LOG(WARN, "fail to get ObDeadLockDetectorMgr", K(self_trans_id), K(to_string(path))); - } else if (OB_FAIL(mgr->unregister_key(self_trans_id))) { + if (nullptr == (mgr = MTL(ObDeadLockDetectorMgr*))) { + ret = OB_ERR_UNEXPECTED; + DETECT_LOG(WARN, "fail to get ObDeadLockDetectorMgr", K(self_trans_id), K(to_string(path))); + } else if (OB_FAIL(mgr->unregister_key(self_trans_id))) { + if (OB_ENTRY_NOT_EXIST != ret) { DETECT_LOG(WARN, "unregister from deadlock detector failed", K(self_trans_id), K(to_string(path))); } else { - DETECT_LOG(INFO, "unregister from deadlock detector success", K(self_trans_id), K(to_string(path))); + ret = OB_SUCCESS;// it's ok if detector not exist } - } -} - -void ObTransDeadlockDetectorAdapter::try_unregister_deadlock_detector_(sql::ObSQLSessionInfo &session, - const ObTransID &trans_id, - UnregisterPath path) -{ - if (OB_UNLIKELY(session.is_registered_to_deadlock())) { - unregister_from_deadlock_detector(trans_id, path); - session.set_registered_to_deadlock(false); - DETECT_LOG(INFO, "unregister from deadlock detector", K(trans_id), K(to_string(path))); + } else { + DETECT_LOG(INFO, "unregister from deadlock detector success", K(self_trans_id), K(to_string(path))); } } diff --git a/src/storage/tx/ob_trans_deadlock_adapter.h b/src/storage/tx/ob_trans_deadlock_adapter.h index 09a460b38..5378fd9fd 100644 --- a/src/storage/tx/ob_trans_deadlock_adapter.h +++ b/src/storage/tx/ob_trans_deadlock_adapter.h @@ -76,7 +76,8 @@ class ObTransDeadlockDetectorAdapter END_STMT_OTHER_ERR, END_STMT_NO_CONFLICT, END_STMT_TIMEOUT, - REPLACE_MEET_TOTAL_DIFFERENT_LIST + REPLACE_MEET_TOTAL_DIFFERENT_LIST, + DO_END_TRANS, }; static const char* to_string(const UnregisterPath path) { @@ -99,6 +100,8 @@ class ObTransDeadlockDetectorAdapter return "END_STMT_TIMEOUT"; case UnregisterPath::REPLACE_MEET_TOTAL_DIFFERENT_LIST: return "REPLACE_MEET_TOTAL_DIFFERENT_LIST"; + case UnregisterPath::DO_END_TRANS: + return "DO_END_TRANS"; default: return "UNKNOWN"; } diff --git a/unittest/share/deadlock/CMakeLists.txt b/unittest/share/deadlock/CMakeLists.txt index 3ac5b4cb6..1820b83a2 100644 --- a/unittest/share/deadlock/CMakeLists.txt +++ b/unittest/share/deadlock/CMakeLists.txt @@ -1,2 +1,3 @@ storage_unittest(test_key_wrapper) storage_unittest(test_deadlock_utility) +storage_unittest(test_lcl_message_drop) diff --git a/unittest/share/deadlock/test_lcl_message_drop.cpp b/unittest/share/deadlock/test_lcl_message_drop.cpp new file mode 100644 index 000000000..96d4d4f30 --- /dev/null +++ b/unittest/share/deadlock/test_lcl_message_drop.cpp @@ -0,0 +1,257 @@ +/** + * Copyright (c) 2021 OceanBase + * OceanBase CE is licensed under Mulan PubL v2. + * You can use this software according to the terms and conditions of the Mulan PubL v2. + * You may obtain a copy of Mulan PubL v2 at: + * http://license.coscl.org.cn/MulanPubL-2.0 + * THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, + * EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, + * MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE. + * See the Mulan PubL v2 for more details. + */ +#include "lib/ob_errno.h" +#include +#include +#define private public +#define protected public +#include "share/deadlock/ob_deadlock_key_wrapper.h" +#include "share/deadlock/ob_deadlock_detector_mgr.h" +#include "test_key.h" +#include +#include +#include +#include +#include +#include "share/deadlock/ob_lcl_scheme/ob_lcl_batch_sender_thread.h" +#include "share/deadlock/ob_lcl_scheme/ob_lcl_message.h" +#include "storage/tx/ob_trans_define_v4.h" + +namespace oceanbase { +namespace unittest { + +using namespace common; +using namespace share::detector; +using namespace std; + + +class TestLCLMsgDrop : public ::testing::Test { +public: + TestLCLMsgDrop() {} + ~TestLCLMsgDrop() {} + virtual void SetUp() { share::ObTenantEnv::get_tenant_local()->id_ = 1; } + virtual void TearDown() {} + static ObLCLBatchSenderThread batch_sender_; + int port; +}; +ObLCLBatchSenderThread TestLCLMsgDrop::batch_sender_(nullptr); + +TEST_F(TestLCLMsgDrop, always_keep) {// the first 2048 will always success + ASSERT_EQ(OB_SUCCESS, batch_sender_.init()); + ASSERT_EQ(OB_SUCCESS, batch_sender_.start()); + ATOMIC_STORE(&batch_sender_.allow_send_, false); + ObLCLMessage mock_message; + ObAddr mock_addr(ObAddr::VER::IPV4, "127.0.0.1", 0); + UserBinaryKey mock_self_key, mock_dest_key; + ObLCLLabel mock_lcl_label(1, ObDetectorPriority(1)); + mock_lcl_label.addr_ = ObAddr(ObAddr::VER::IPV4, "127.0.0.1", 1); + ASSERT_EQ(OB_SUCCESS, mock_self_key.set_user_key(transaction::ObTransID(1))); + ASSERT_EQ(OB_SUCCESS, mock_dest_key.set_user_key(transaction::ObTransID(1))); + + for (int i = 0; i < LCL_MSG_CACHE_LIMIT / 2; ++i) { + mock_addr.set_port(i + 1); + ASSERT_EQ(OB_SUCCESS, mock_message.set_args(mock_addr, + mock_dest_key, + ObAddr(ObAddr::VER::IPV4, "127.0.0.1", 1), + mock_self_key, + 0, + mock_lcl_label, + 1)); + ObDependencyResource mock_resource(mock_addr, mock_dest_key); + ObDependencyResource mock_resource2(mock_addr, mock_dest_key); + OB_ASSERT(mock_resource == mock_resource2); + ASSERT_EQ(OB_SUCCESS, batch_sender_.cache_msg(mock_resource, mock_message)); + ObLCLMessage read_message; + ASSERT_EQ(OB_SUCCESS, batch_sender_.lcl_msg_map_.get(mock_resource, read_message)); + DETECT_LOG(INFO, "print mock resource message", K(read_message)); + } +} + +TEST_F(TestLCLMsgDrop, random_drop_25_percentage) { + int ret = OB_SUCCESS; + ObLCLMessage mock_message; + ObAddr mock_addr(ObAddr::VER::IPV4, "127.0.0.1", 0); + UserBinaryKey mock_self_key, mock_dest_key; + ObLCLLabel mock_lcl_label(1, ObDetectorPriority(1)); + mock_lcl_label.addr_ = ObAddr(ObAddr::VER::IPV4, "127.0.0.1", 1); + ASSERT_EQ(OB_SUCCESS, mock_self_key.set_user_key(transaction::ObTransID(1))); + ASSERT_EQ(OB_SUCCESS, mock_dest_key.set_user_key(transaction::ObTransID(1))); + for (int i = LCL_MSG_CACHE_LIMIT / 2; i < LCL_MSG_CACHE_LIMIT / 2 + LCL_MSG_CACHE_LIMIT / 8; ++i) { + mock_addr.set_port(i + 1); + ASSERT_EQ(OB_SUCCESS, mock_message.set_args(mock_addr, + mock_dest_key, + ObAddr(ObAddr::VER::IPV4, "127.0.0.1", 1), + mock_self_key, + 0, + mock_lcl_label, + 1)); + ObDependencyResource mock_resource(mock_addr, mock_dest_key); + ASSERT_EQ(OB_SUCCESS, batch_sender_.lcl_msg_map_.insert(mock_resource, mock_message)); + } + + mock_addr.set_port(65535); + int fail_times = 0, succ_times = 0; + for (int i = 0; i < 100000; ++i) { + ASSERT_EQ(OB_SUCCESS, mock_message.set_args(mock_addr, + mock_dest_key, + ObAddr(ObAddr::VER::IPV4, "127.0.0.1", 1), + mock_self_key, + 0, + mock_lcl_label, + 1)); + ObDependencyResource mock_resource(mock_addr, mock_dest_key); + if (OB_SUCC(batch_sender_.cache_msg(mock_resource, mock_message))) { + ++succ_times; + batch_sender_.lcl_msg_map_.erase(mock_resource); + } else { + ++fail_times; + } + } + int fail_percentage = fail_times * 100 / (succ_times + fail_times); + ASSERT_GE(fail_percentage, 20); + ASSERT_LE(fail_percentage, 30); + DETECT_LOG(INFO, "print drop percentage", K(fail_percentage)); +} + +TEST_F(TestLCLMsgDrop, random_drop_75_percentage) { + int ret = OB_SUCCESS; + ObLCLMessage mock_message; + ObAddr mock_addr(ObAddr::VER::IPV4, "127.0.0.1", 0); + UserBinaryKey mock_self_key, mock_dest_key; + ObLCLLabel mock_lcl_label(1, ObDetectorPriority(1)); + mock_lcl_label.addr_ = ObAddr(ObAddr::VER::IPV4, "127.0.0.1", 1); + ASSERT_EQ(OB_SUCCESS, mock_self_key.set_user_key(transaction::ObTransID(1))); + ASSERT_EQ(OB_SUCCESS, mock_dest_key.set_user_key(transaction::ObTransID(1))); + for (int i = LCL_MSG_CACHE_LIMIT / 2 + LCL_MSG_CACHE_LIMIT / 8; i < LCL_MSG_CACHE_LIMIT / 2 + LCL_MSG_CACHE_LIMIT * 3 / 8; ++i) { + mock_addr.set_port(i + 1); + ASSERT_EQ(OB_SUCCESS, mock_message.set_args(mock_addr, + mock_dest_key, + ObAddr(ObAddr::VER::IPV4, "127.0.0.1", 1), + mock_self_key, + 0, + mock_lcl_label, + 1)); + ObDependencyResource mock_resource(mock_addr, mock_dest_key); + ASSERT_EQ(OB_SUCCESS, batch_sender_.lcl_msg_map_.insert(mock_resource, mock_message)); + } + + mock_addr.set_port(65535); + int fail_times = 0, succ_times = 0; + for (int i = 0; i < 100000; ++i) { + ASSERT_EQ(OB_SUCCESS, mock_message.set_args(mock_addr, + mock_dest_key, + ObAddr(ObAddr::VER::IPV4, "127.0.0.1", 1), + mock_self_key, + 0, + mock_lcl_label, + 1)); + ObDependencyResource mock_resource(mock_addr, mock_dest_key); + if (OB_SUCC(batch_sender_.cache_msg(mock_resource, mock_message))) { + ++succ_times; + batch_sender_.lcl_msg_map_.erase(mock_resource); + } else { + ++fail_times; + } + } + int fail_percentage = fail_times * 100 / (succ_times + fail_times); + ASSERT_GE(fail_percentage, 70); + ASSERT_LE(fail_percentage, 80); + DETECT_LOG(INFO, "print drop percentage", K(fail_percentage)); +} + +TEST_F(TestLCLMsgDrop, always_drop) { + int ret = OB_SUCCESS; + ObLCLMessage mock_message; + ObAddr mock_addr(ObAddr::VER::IPV4, "127.0.0.1", 0); + UserBinaryKey mock_self_key, mock_dest_key; + ObLCLLabel mock_lcl_label(1, ObDetectorPriority(1)); + mock_lcl_label.addr_ = ObAddr(ObAddr::VER::IPV4, "127.0.0.1", 1); + ASSERT_EQ(OB_SUCCESS, mock_self_key.set_user_key(transaction::ObTransID(1))); + ASSERT_EQ(OB_SUCCESS, mock_dest_key.set_user_key(transaction::ObTransID(1))); + for (int i = LCL_MSG_CACHE_LIMIT / 2 + LCL_MSG_CACHE_LIMIT * 3 / 8; i < LCL_MSG_CACHE_LIMIT; ++i) { + mock_addr.set_port(i + 1); + ASSERT_EQ(OB_SUCCESS, mock_message.set_args(mock_addr, + mock_dest_key, + ObAddr(ObAddr::VER::IPV4, "127.0.0.1", 1), + mock_self_key, + 0, + mock_lcl_label, + 1)); + ObDependencyResource mock_resource(mock_addr, mock_dest_key); + ASSERT_EQ(OB_SUCCESS, batch_sender_.lcl_msg_map_.insert(mock_resource, mock_message)); + } + + mock_addr.set_port(65535); + for (int i = 0; i < 100000; ++i) { + ASSERT_EQ(OB_SUCCESS, mock_message.set_args(mock_addr, + mock_dest_key, + ObAddr(ObAddr::VER::IPV4, "127.0.0.1", 1), + mock_self_key, + 0, + mock_lcl_label, + 1)); + ObDependencyResource mock_resource(mock_addr, mock_dest_key); + ASSERT_EQ(OB_BUF_NOT_ENOUGH, batch_sender_.cache_msg(mock_resource, mock_message)); + } +} + +TEST_F(TestLCLMsgDrop, message_merge_when_reach_limit) { + int ret = OB_SUCCESS; + ObLCLMessage mock_message; + ObLCLMessage read_message; + ObAddr mock_addr(ObAddr::VER::IPV4, "127.0.0.1", 1); + UserBinaryKey mock_self_key, mock_dest_key; + ObLCLLabel mock_lcl_label(0, ObDetectorPriority(0)); + mock_lcl_label.addr_ = ObAddr(ObAddr::VER::IPV4, "127.0.0.1", 1); + ASSERT_EQ(OB_SUCCESS, mock_self_key.set_user_key(transaction::ObTransID(1))); + ASSERT_EQ(OB_SUCCESS, mock_dest_key.set_user_key(transaction::ObTransID(1))); + + ASSERT_EQ(OB_SUCCESS, mock_message.set_args(mock_addr, + mock_dest_key, + ObAddr(ObAddr::VER::IPV4, "127.0.0.1", 1), + mock_self_key, + 0, + mock_lcl_label, + 1)); + ObDependencyResource mock_resource(mock_addr, mock_dest_key); + ASSERT_EQ(OB_SUCCESS, batch_sender_.lcl_msg_map_.get(mock_resource, read_message)); + DETECT_LOG(INFO, "print mock resource message", K(read_message)); + ASSERT_EQ(OB_SUCCESS, batch_sender_.cache_msg(mock_resource, mock_message)); + ASSERT_EQ(OB_SUCCESS, batch_sender_.lcl_msg_map_.get(mock_resource, read_message)); + ASSERT_EQ(0, read_message.label_.id_); + + new (&mock_lcl_label) ObLCLLabel(1, ObDetectorPriority(1)); + mock_lcl_label.addr_ = ObAddr(ObAddr::VER::IPV4, "127.0.0.1", 1); + ASSERT_EQ(OB_SUCCESS, mock_message.set_args(mock_addr, + mock_dest_key, + ObAddr(ObAddr::VER::IPV4, "127.0.0.1", 1), + mock_self_key, + 1, + mock_lcl_label, + 1)); + ASSERT_EQ(OB_SUCCESS, batch_sender_.cache_msg(mock_resource, mock_message)); + ASSERT_EQ(OB_SUCCESS, batch_sender_.lcl_msg_map_.get(mock_resource, read_message)); + ASSERT_EQ(1, read_message.label_.id_); +} + +}// namespace unittest +}// namespace oceanbase + +int main(int argc, char **argv) +{ + system("rm -rf test_lcl_message_drop.log"); + oceanbase::common::ObLogger &logger = oceanbase::common::ObLogger::get_logger(); + logger.set_file_name("test_lcl_message_drop.log", false); + logger.set_log_level(OB_LOG_LEVEL_DEBUG); + testing::InitGoogleTest(&argc, argv); + return RUN_ALL_TESTS(); +} \ No newline at end of file