[DeadLock] fix detector bug

This commit is contained in:
obdev 2023-02-20 12:47:38 +00:00 committed by ob-robot
parent 2e954397ec
commit b1faf451eb
13 changed files with 428 additions and 86 deletions

View File

@ -211,8 +211,9 @@ ObDependencyResource& ObDependencyResource::operator=(const ObDependencyResource
// Computes the hash value of this resource from its two members, addr_ and user_key_.
// Each member is hashed through its own hash() method; the key hash is then folded into
// the address hash with murmurhash, using the address hash as the seed.
// @return combined 64-bit hash of (addr_, user_key_)
uint64_t ObDependencyResource::hash() const
{
  // NOTE(review): the members are hashed via their hash() methods instead of hashing the raw
  // object bytes -- presumably so bytes that do not take part in equality cannot influence
  // the result; confirm against ObAddr / UserBinaryKey.
  uint64_t hash_val = addr_.hash();
  const uint64_t key_hash = user_key_.hash();
  hash_val = murmurhash(&key_hash, sizeof(key_hash), hash_val);
  return hash_val;
}
@ -225,6 +226,8 @@ bool ObDependencyResource::operator<(const ObDependencyResource &rhs) const
{
if (addr_ < rhs.addr_) {
return true;
} else if (addr_ > rhs.addr_) {
return false;
} else {
if (user_key_ < rhs.user_key_) {
return true;

View File

@ -32,6 +32,11 @@ namespace share
{
namespace detector
{
// Admission policy of the LCL message cache, based on the number of messages already in the map:
// - fewer than LCL_MSG_CACHE_LIMIT / 2 messages: every new message is accepted;
// - at least LCL_MSG_CACHE_LIMIT / 2 but fewer than LCL_MSG_CACHE_LIMIT messages: new messages are
//   dropped at random, and the drop probability grows with the number of messages kept in the map;
// - LCL_MSG_CACHE_LIMIT messages or more: the drop probability is 100%, no new message is accepted.
constexpr int64_t LCL_MSG_CACHE_LIMIT = 4096;
class ObLCLMessage;
class ObDependencyResource;

View File

@ -105,6 +105,8 @@ public:
const KeyType2 &parent_key);
template<typename KeyType>
int set_timeout(const KeyType &key, const int64_t timeout);
template<typename KeyType>
int check_detector_exist(const KeyType &key, bool &exist);
// unregister resource operation
template<typename KeyType>
int unregister_key(const KeyType &key);
@ -281,6 +283,28 @@ int ObDeadLockDetectorMgr::register_key(const KeyType &key,
return ret;
#undef PRINT_WRAPPER
}
template<typename KeyType>
int ObDeadLockDetectorMgr::check_detector_exist(const KeyType &key, bool &exist)
{
CHECK_INIT();
CHECK_ARGS(key);
#define PRINT_WRAPPER KR(ret), K(key)
int ret = common::OB_SUCCESS;
UserBinaryKey user_key;
DetectorRefGuard ref_guard;
if (OB_FAIL(user_key.set_user_key(key))) {
DETECT_LOG(WARN, "user key serialization failed", PRINT_WRAPPER);
} else if (OB_FAIL(get_detector_(user_key, ref_guard))) {
if (OB_ENTRY_NOT_EXIST == ret) {
exist = false;
ret = OB_SUCCESS;
}
} else {
exist = true;
}
return ret;
#undef PRINT_WRAPPER
}
// unregister a user specified key
// unregister action means:
// 1. the detector instance associated with user specified key will be released
@ -295,7 +319,6 @@ template<typename KeyType>
int ObDeadLockDetectorMgr::unregister_key(const KeyType &key)
{
CHECK_INIT();
CHECK_ENABLED();
CHECK_ARGS(key);
#define PRINT_WRAPPER KR(ret), K(key)
int ret = common::OB_SUCCESS;

View File

@ -10,6 +10,7 @@
* See the Mulan PubL v2 for more details.
*/
#include "lib/ob_errno.h"
#include "share/ob_occam_time_guard.h"
#include "ob_lcl_batch_sender_thread.h"
#include "lib/atomic/ob_atomic.h"
@ -19,6 +20,8 @@
#include "ob_lcl_parameters.h"
#include "share/deadlock/ob_deadlock_arg_checker.h"
#include "share/deadlock/ob_deadlock_detector_rpc.h"
#include <cstdlib>
#include <exception>
namespace oceanbase
{
@ -38,10 +41,15 @@ bool ObLCLBatchSenderThread::RemoveIfOp::operator()(const ObDependencyResource &
int temp_ret = OB_SUCCESS;
DETECT_TIME_GUARD(100_ms);
if (OB_SUCCESS != (temp_ret = lcl_message_list_.push_back(lcl_msg))) {
if (lcl_message_list_.count() >= LCL_MSG_CACHE_LIMIT) {
temp_ret = OB_BUF_NOT_ENOUGH;
ret = false;
DETECT_LOG_RET(WARN, temp_ret, "LCL message fetch failed",
KR(temp_ret), K(lcl_msg));
} else if (OB_SUCCESS != (temp_ret = lcl_message_list_.push_back(lcl_msg))) {
ret = false;
DETECT_LOG_RET(WARN, temp_ret, "push lcl message to lcl_message_list failed",
KR(temp_ret), K(lcl_msg));
KR(temp_ret), K(lcl_msg));
}
return ret;
}
@ -64,7 +72,7 @@ int ObLCLBatchSenderThread::init()
int ret = OB_SUCCESS;
if (OB_FAIL(share::ObThreadPool::init())) {
DETECT_LOG(WARN, "init thread failed", K(ret), KP(this), K(MTL_ID()));
} else if (OB_FAIL(lcl_msg_map_.init(MEMORY_LABEL, MTL_ID()))) {
} else if (OB_FAIL(lcl_msg_map_.init("LCLSender", MTL_ID()))) {
DETECT_LOG(WARN, "init thread failed", K(ret), KP(this), K(MTL_ID()));
} else {
is_inited_ = true;
@ -85,28 +93,64 @@ int ObLCLBatchSenderThread::start()
return ret;
}
int ObLCLBatchSenderThread::cache_msg(const ObDependencyResource &key,
const ObLCLMessage &lcl_msg)
int ObLCLBatchSenderThread::cache_msg(const ObDependencyResource &key, const ObLCLMessage &lcl_msg)
{
CHECK_INIT_AND_START();
#define PRINT_WRAPPER KR(ret), K(key), K(lcl_msg)
#define PRINT_WRAPPER KR(ret), K(key), K(lcl_msg), K(can_insert), K(random_drop_percentage)
int ret = OB_SUCCESS;
CHECK_INIT_AND_START();
ObLCLBatchSenderThread::MergeOp op(lcl_msg);
int64_t random_drop_percentage = 0;
DETECT_TIME_GUARD(100_ms);
do {
int64_t msg_count = lcl_msg_map_.count();
bool can_insert = false;
if (msg_count < LCL_MSG_CACHE_LIMIT / 2) {// always keep
can_insert = true;
} else if (msg_count < LCL_MSG_CACHE_LIMIT) {// random drop
int64_t keep_threshold = LCL_MSG_CACHE_LIMIT / 2;
// more keeping messages means higher probability to drop new appended one
// if reach LCL_MSG_CACHE_LIMIT, definitely drop
random_drop_percentage = (msg_count - keep_threshold) * 100 / keep_threshold;
can_insert = distribution_(random_generator_) > random_drop_percentage;
} else {// always drop
can_insert = false;
random_drop_percentage = 100;
}
if (OB_FAIL(insert_or_merge_(key, lcl_msg, can_insert))) {
DETECT_LOG(WARN, "lcl message is droped", PRINT_WRAPPER);
}
return ret;
#undef PRINT_WRAPPER
}
int ObLCLBatchSenderThread::insert_or_merge_(const ObDependencyResource &key,
const ObLCLMessage &lcl_message,
const bool can_insert)
{
#define PRINT_WRAPPER KR(ret), K(key), K(lcl_message), K(can_insert), K(msg_count)
DETECT_TIME_GUARD(100_ms);
int ret = OB_SUCCESS;
ObLCLBatchSenderThread::MergeOp op(lcl_message);
int64_t msg_count = lcl_msg_map_.count();
do {// there may be concurrent problem, so need retry until success or meet can't handle failure
if (OB_SUCCESS != ret) {
DETECT_LOG(INFO, "try again", PRINT_WRAPPER);
}
if (OB_SUCC(lcl_msg_map_.insert(key, lcl_msg))) {
// do nothing
} else if (OB_ENTRY_EXIST != ret) {
DETECT_LOG(WARN, "this error can't handle", PRINT_WRAPPER);
break;
} else if (OB_SUCC(lcl_msg_map_.operate(key, op))) {
// do nothing
} else if (OB_ENTRY_NOT_EXIST != ret) {
DETECT_LOG(WARN, "this error can't handle", PRINT_WRAPPER);
if (can_insert) {// try insert first, if exist, try update merge then
if (OB_SUCC(lcl_msg_map_.insert(key, lcl_message))) {
} else if (OB_ENTRY_EXIST != ret) {
DETECT_LOG(WARN, "this error can't handle", PRINT_WRAPPER);
break;
} else if (OB_SUCC(lcl_msg_map_.operate(key, op))) {
} else if (OB_ENTRY_NOT_EXIST != ret) {
DETECT_LOG(WARN, "this error can't handle", PRINT_WRAPPER);
}
} else {// just try update merge
if (OB_FAIL(lcl_msg_map_.operate(key, op))) {
if (OB_ENTRY_NOT_EXIST == ret) {
ret = OB_BUF_NOT_ENOUGH;
}
}
break;// no matter success or not, no retry
}
} while (CLICK() && (OB_ENTRY_NOT_EXIST == ret) && ATOMIC_LOAD(&is_running_));
return ret;
@ -149,18 +193,16 @@ void ObLCLBatchSenderThread::record_summary_info_and_logout_when_necessary_(int6
total_busy_time_ += diff;
if (total_record_time_ > 5L * 1000L * 1000L) {// 5s
double duty_ratio = double(total_busy_time_) / total_record_time_ * 100;
int duty_ratio_percentage = double(total_busy_time_) / total_record_time_ * 100;
int64_t total_constructed_detector = ATOMIC_LOAD(&ObIDeadLockDetector::total_constructed_count);
int64_t total_destructed_detector = ATOMIC_LOAD(&ObIDeadLockDetector::total_destructed_count);
int64_t total_alived_detector = total_constructed_detector - total_destructed_detector;
DETECT_LOG(INFO, "ObLCLBatchSenderThread periodic report summary info",
DETECT_LOG(INFO, "ObLCLBatchSenderThread periodic report summary info", K(duty_ratio_percentage),
K(total_constructed_detector), K(total_destructed_detector),
K(total_alived_detector), K(duty_ratio),
K(int64_t(ObServerConfig::get_instance()._lcl_op_interval)), K(*this));
K(total_alived_detector), K(_lcl_op_interval), K(lcl_msg_map_.count()), K(*this));
total_record_time_ = 0;
total_busy_time_ = 0;
over_night_times_ = 0;
duty_ratio = 0;
}
}
@ -173,6 +215,7 @@ void ObLCLBatchSenderThread::run1()
int64_t diff = 0;
ObArray<ObLCLMessage> mock_lcl_message_list;
mock_lcl_message_list.set_label("LCLArray");
ObLCLBatchSenderThread::RemoveIfOp op(mock_lcl_message_list);
lib::set_thread_name("LCLSender");
while(ATOMIC_LOAD(&is_running_)) {
@ -190,17 +233,21 @@ void ObLCLBatchSenderThread::run1()
DETECT_TIME_GUARD(50_ms < _lcl_op_interval ? 50_ms : _lcl_op_interval);
begin_ts = ObClockGenerator::getRealClock();
mock_lcl_message_list.reset();
if (OB_FAIL(lcl_msg_map_.remove_if(op))) {
DETECT_LOG(WARN, "can't fill mock_lcl_message_list", KR(ret));
}
CLICK();
for (int64_t idx = 0; idx < mock_lcl_message_list.count(); ++idx) {
const ObLCLMessage &msg = mock_lcl_message_list.at(idx);
if (OB_FAIL(mgr_->get_rpc().post_lcl_message(msg.get_addr(), msg))) {
DETECT_LOG(WARN, "send LCL msg failed", KR(ret), K(msg));
CLICK();
} else {
DETECT_LOG(DEBUG, "send LCL msg success", K(msg));
if (ATOMIC_LOAD(&allow_send_)) {
if (OB_FAIL(lcl_msg_map_.remove_if(op))) {
DETECT_LOG(WARN, "can't fill mock_lcl_message_list", KR(ret));
lcl_msg_map_.reset();// if fetch failed, remove all
}
CLICK();
for (int64_t idx = 0; idx < mock_lcl_message_list.count(); ++idx) {
const ObLCLMessage &msg = mock_lcl_message_list.at(idx);
if (OB_ISNULL(mgr_)) {
} else if (OB_FAIL(mgr_->get_rpc().post_lcl_message(msg.get_addr(), msg))) {
DETECT_LOG(WARN, "send LCL msg failed", KR(ret), K(msg));
CLICK();
} else {
DETECT_LOG(DEBUG, "send LCL msg success", K(msg));
}
}
}
}

View File

@ -20,6 +20,8 @@
#include "lib/container/ob_array.h"
#include "lib/hash/ob_linear_hash_map.h"
#include "share/deadlock/ob_deadlock_detector_common_define.h"
#include <random>
#include <ctime>
namespace oceanbase
{
@ -31,14 +33,19 @@ class ObDeadLockDetectorMgr;
class ObLCLBatchSenderThread : public share::ObThreadPool
{
using RandomGenerator = std::mt19937;// high-quality random generator recommended by cppreference
using RandomDistribution = std::uniform_int_distribution<>;// uniform integer distribution over a closed range [a, b]
public:
ObLCLBatchSenderThread(ObDeadLockDetectorMgr *mgr) :
is_inited_(false),
is_running_(false),
allow_send_(true),
total_record_time_(0),
total_busy_time_(0),
over_night_times_(0),
mgr_(mgr) {}
mgr_(mgr),
random_generator_(std::time(nullptr)),
distribution_(0, 100) {}
~ObLCLBatchSenderThread() { destroy(); }
int init();
int start();
@ -47,8 +54,7 @@ public:
void destroy();
void run1();
public:
int cache_msg(const ObDependencyResource &key,
const ObLCLMessage &lcl_msg);
int cache_msg(const ObDependencyResource &key, const ObLCLMessage &lcl_msg);
TO_STRING_KV(KP(this), K_(is_inited), K_(is_running), K_(total_record_time), K_(over_night_times));
private:
class RemoveIfOp
@ -68,16 +74,22 @@ private:
const ObLCLMessage &lcl_message_;
};
private:
int insert_or_merge_(const ObDependencyResource &key,
const ObLCLMessage &lcl_message,
const bool can_insert);
int64_t update_and_get_lcl_op_interval_();
void record_summary_info_and_logout_when_necessary_(int64_t, int64_t, int64_t);
private:
bool is_inited_;
bool is_running_;
bool allow_send_; // for unittest mock used
int64_t total_record_time_;
int64_t total_busy_time_;
int64_t over_night_times_;
ObDeadLockDetectorMgr* mgr_;
common::ObLinearHashMap<ObDependencyResource, ObLCLMessage> lcl_msg_map_;
RandomGenerator random_generator_;
RandomDistribution distribution_;
};
}

View File

@ -496,9 +496,7 @@ int ObLCLNode::broadcast_(const BlockList &list,
lclv,
public_label,
ObClockGenerator::getRealClock());
if (CLICK() && OB_FAIL(MTL(ObDeadLockDetectorMgr*)->sender_thread_.cache_msg(list.at(idx), msg))) {
DETECT_LOG_(WARN, "cache message failed", KR(ret), K(msg), K(list), K(list), K(*this), K(lbt()));
}
MTL(ObDeadLockDetectorMgr*)->sender_thread_.cache_msg(list.at(idx), msg);
}
return ret;
@ -843,7 +841,7 @@ int ObLCLNode::push_state_to_downstreams_with_lock_()
void ObLCLNode::update_lcl_period_if_necessary_with_lock_()
{
int ret = OB_SUCCESS;
DETECT_TIME_GUARD(100_us);
DETECT_TIME_GUARD(10_ms);
int64_t current_ts = ObClockGenerator::getRealClock();
int64_t new_period_ = current_ts / PERIOD;
int64_t timeout_ts = 0;
@ -869,7 +867,7 @@ bool ObLCLNode::if_phase_match_(const int64_t ts,
int64_t my_phase = ts / PHASE_TIME;
int64_t msg_phase = msg.get_send_ts() / PHASE_TIME;
DETECT_TIME_GUARD(100_us);
DETECT_TIME_GUARD(10_ms);
if (my_phase != msg_phase) {
ret = false;
}

View File

@ -407,13 +407,18 @@ int ObSqlTransControl::do_end_trans_(ObSQLSessionInfo *session,
{
int ret = OB_SUCCESS;
transaction::ObTxDesc *&tx_ptr = session->get_tx_desc();
if (session->is_registered_to_deadlock()) {
if (OB_SUCC(MTL(share::detector::ObDeadLockDetectorMgr*)->unregister_key(tx_ptr->tid()))) {
DETECT_LOG(INFO, "unregister deadlock detector in do end trans", KPC(tx_ptr));
} else {
DETECT_LOG(WARN, "unregister deadlock detector in do end trans failed", KPC(tx_ptr));
}
session->set_registered_to_deadlock(false);
bool is_detector_exist = false;
int tmp_ret = OB_SUCCESS;
if (OB_ISNULL(MTL(share::detector::ObDeadLockDetectorMgr*))) {
tmp_ret = OB_BAD_NULL_ERROR;
DETECT_LOG(WARN, "MTL ObDeadLockDetectorMgr is NULL", K(tmp_ret), K(tx_ptr->tid()));
} else if (OB_TMP_FAIL(MTL(share::detector::ObDeadLockDetectorMgr*)->
check_detector_exist(tx_ptr->tid(), is_detector_exist))) {
DETECT_LOG(WARN, "fail to check detector exist, may causing detector leak", K(tmp_ret),
K(tx_ptr->tid()));
} else if (is_detector_exist) {
ObTransDeadlockDetectorAdapter::unregister_from_deadlock_detector(tx_ptr->tid(),
ObTransDeadlockDetectorAdapter::UnregisterPath::DO_END_TRANS);
}
if (session->associated_xa() && !is_explicit) {
ret = OB_TRANS_XA_RMFAIL;

View File

@ -169,7 +169,6 @@ ObSQLSessionInfo::ObSQLSessionInfo() :
is_table_name_hidden_(false),
piece_cache_(NULL),
is_load_data_exec_session_(false),
is_registered_to_deadlock_(false),
pl_exact_err_msg_(),
is_ps_prepare_stage_(false),
got_conn_res_(false),
@ -197,7 +196,6 @@ int ObSQLSessionInfo::init(uint32_t sessid, uint64_t proxy_sessid,
UNUSED(tenant_id);
int ret = OB_SUCCESS;
static const int64_t PS_BUCKET_NUM = 64;
set_registered_to_deadlock(false);
if (OB_FAIL(ObBasicSessionInfo::init(sessid, proxy_sessid, bucket_allocator, tz_info))) {
LOG_WARN("fail to init basic session info", K(ret));
} else if (!is_acquire_from_pool() &&
@ -325,7 +323,6 @@ void ObSQLSessionInfo::reset(bool skip_sys_var)
prelock_ = false;
proxy_version_ = 0;
min_proxy_version_ps_ = 0;
set_registered_to_deadlock(false);
if (OB_NOT_NULL(mem_context_)) {
destroy_contexts_map(contexts_map_, mem_context_->get_malloc_allocator());
DESTROY_CONTEXT(mem_context_);

View File

@ -963,8 +963,6 @@ public:
cached_tenant_config_info_.refresh();
return cached_tenant_config_info_.get_enable_sql_extension();
}
bool is_registered_to_deadlock() const { return ATOMIC_LOAD(&is_registered_to_deadlock_); }
void set_registered_to_deadlock(bool state) { ATOMIC_SET(&is_registered_to_deadlock_, state); }
bool is_ps_prepare_stage() const { return is_ps_prepare_stage_; }
void set_is_ps_prepare_stage(bool v) { is_ps_prepare_stage_ = v; }
int get_tenant_audit_trail_type(ObAuditTrailType &at_type)
@ -1139,8 +1137,6 @@ private:
bool is_table_name_hidden_;
void *piece_cache_;
bool is_load_data_exec_session_;
// 记录session是否注册过死锁检测的信息
bool is_registered_to_deadlock_;
ObSqlString pl_exact_err_msg_;
bool is_ps_prepare_stage_;
// Record whether this session has got connection resource, which means it increased connections count.

View File

@ -358,7 +358,6 @@ int ObTransDeadlockDetectorAdapter::register_remote_execution_to_deadlock_detect
DETECT_LOG(WARN, "fail to register deadlock", PRINT_WRAPPER);
} else {
MTL(ObDeadLockDetectorMgr*)->set_timeout(self_tx_id, query_timeout);
session_guard->set_registered_to_deadlock(true);
if (OB_FAIL(MTL(ObDeadLockDetectorMgr*)->block(self_tx_id, blocked_resources))) {
DETECT_LOG(WARN, "block on resource failed", PRINT_WRAPPER);
} else if (self_tx_scheduler != GCTX.self_addr()) {
@ -408,8 +407,7 @@ int ObTransDeadlockDetectorAdapter::remote_execution_replace_conflict_trans_ids_
DETECT_LOG(WARN, "replace block list failed", PRINT_WRAPPER);
}
} else {
try_unregister_deadlock_detector_(session_guard.get_session(),
self_tx_id,
unregister_from_deadlock_detector(self_tx_id,
UnregisterPath::REPLACE_MEET_TOTAL_DIFFERENT_LIST);
DETECT_LOG(WARN, "unregister detector cause meet total different block list", PRINT_WRAPPER);
}
@ -426,6 +424,7 @@ int ObTransDeadlockDetectorAdapter::register_remote_execution_or_replace_conflic
CHECK_DEADLOCK_ENABLED();
int ret = OB_SUCCESS;
SessionGuard session_guard;
bool is_detector_exist = false;
if (self_session_id == 1) {
DETECT_LOG(INFO, "inner session no need register to deadlock", PRINT_WRAPPER);
} else if (self_session_id == 0) {
@ -437,7 +436,12 @@ int ObTransDeadlockDetectorAdapter::register_remote_execution_or_replace_conflic
} else if (!session_guard.is_valid()) {
ret = OB_ERR_UNEXPECTED;
DETECT_LOG(ERROR, "fail to get session info", PRINT_WRAPPER);
} else if (!session_guard->is_registered_to_deadlock()) {
} else if (OB_ISNULL(MTL(ObDeadLockDetectorMgr*))) {
ret = OB_ERR_UNEXPECTED;
DETECT_LOG(ERROR, "MTL ObDeadLockDetectorMgr is NULL", PRINT_WRAPPER);
} else if (OB_FAIL(MTL(ObDeadLockDetectorMgr*)->check_detector_exist(self_tx_id, is_detector_exist))) {
DETECT_LOG(WARN, "fail to get detector exist status", PRINT_WRAPPER);
} else if (!is_detector_exist) {
if (OB_FAIL(register_remote_execution_to_deadlock_detector_(self_tx_id, self_session_id, conflict_tx_ids, session_guard))) {
DETECT_LOG(WARN, "register new detector in remote execution failed", PRINT_WRAPPER);
} else {
@ -646,15 +650,15 @@ int ObTransDeadlockDetectorAdapter::maintain_deadlock_info_when_end_stmt(sql::Ob
// no trans opened, for example:read-only trans
} else if (is_rollback) {// statment is failed, maybe will try again, check if need register to deadlock detector
if (session->get_query_timeout_ts() < ObClockGenerator::getCurrentTime()) {
try_unregister_deadlock_detector_(*session, desc->tid(), UnregisterPath::END_STMT_TIMEOUT);
unregister_from_deadlock_detector(desc->tid(), UnregisterPath::END_STMT_TIMEOUT);
DETECT_LOG(INFO, "query timeout, no need register to deadlock", PRINT_WRAPPER);
} else if (OB_FAIL(desc->fetch_conflict_txs(conflict_txs))) {
DETECT_LOG(WARN, "fail to get conflict txs from desc", PRINT_WRAPPER);
} else if (conflict_txs.empty()) {
try_unregister_deadlock_detector_(*session, desc->tid(), UnregisterPath::END_STMT_NO_CONFLICT);
unregister_from_deadlock_detector(desc->tid(), UnregisterPath::END_STMT_NO_CONFLICT);
DETECT_LOG(INFO, "try unregister deadlock detecotr cause conflict array is empty", PRINT_WRAPPER);
} else if (exec_ctx.get_errcode() != OB_TRY_LOCK_ROW_CONFLICT) {
try_unregister_deadlock_detector_(*session, desc->tid(), UnregisterPath::END_STMT_OTHER_ERR);
unregister_from_deadlock_detector(desc->tid(), UnregisterPath::END_STMT_OTHER_ERR);
DETECT_LOG(INFO, "try unregister deadlock detecotr cause meet non-lock error", PRINT_WRAPPER);
} else if (OB_FAIL(register_remote_execution_or_replace_conflict_trans_ids(desc->tid(),
session->get_sessid(),
@ -662,10 +666,10 @@ int ObTransDeadlockDetectorAdapter::maintain_deadlock_info_when_end_stmt(sql::Ob
DETECT_LOG(WARN, "register or replace list failed", PRINT_WRAPPER);
} else {
desc->reset_conflict_txs();
DETECT_LOG(INFO, "maintain deadlock info when end_stmt", PRINT_WRAPPER);
DETECT_LOG(TRACE, "maintain deadlock info when end_stmt", PRINT_WRAPPER);
}
} else {// statment is done, will not try again, all related deadlock info should be resetted
try_unregister_deadlock_detector_(*session, desc->tid(), UnregisterPath::END_STMT_DONE);
unregister_from_deadlock_detector(desc->tid(), UnregisterPath::END_STMT_DONE);
DETECT_LOG(TRACE, "unregister from deadlock detector", KR(ret), K(desc->tid()));
}
return ret;
@ -814,26 +818,17 @@ void ObTransDeadlockDetectorAdapter::unregister_from_deadlock_detector(const ObT
{
int ret = common::OB_SUCCESS;
ObDeadLockDetectorMgr *mgr = nullptr;
if (ObDeadLockDetectorMgr::is_deadlock_enabled()) {
if (nullptr == (mgr = MTL(ObDeadLockDetectorMgr*))) {
ret = OB_ERR_UNEXPECTED;
DETECT_LOG(WARN, "fail to get ObDeadLockDetectorMgr", K(self_trans_id), K(to_string(path)));
} else if (OB_FAIL(mgr->unregister_key(self_trans_id))) {
if (nullptr == (mgr = MTL(ObDeadLockDetectorMgr*))) {
ret = OB_ERR_UNEXPECTED;
DETECT_LOG(WARN, "fail to get ObDeadLockDetectorMgr", K(self_trans_id), K(to_string(path)));
} else if (OB_FAIL(mgr->unregister_key(self_trans_id))) {
if (OB_ENTRY_NOT_EXIST != ret) {
DETECT_LOG(WARN, "unregister from deadlock detector failed", K(self_trans_id), K(to_string(path)));
} else {
DETECT_LOG(INFO, "unregister from deadlock detector success", K(self_trans_id), K(to_string(path)));
ret = OB_SUCCESS;// it's ok if detector not exist
}
}
}
void ObTransDeadlockDetectorAdapter::try_unregister_deadlock_detector_(sql::ObSQLSessionInfo &session,
const ObTransID &trans_id,
UnregisterPath path)
{
if (OB_UNLIKELY(session.is_registered_to_deadlock())) {
unregister_from_deadlock_detector(trans_id, path);
session.set_registered_to_deadlock(false);
DETECT_LOG(INFO, "unregister from deadlock detector", K(trans_id), K(to_string(path)));
} else {
DETECT_LOG(INFO, "unregister from deadlock detector success", K(self_trans_id), K(to_string(path)));
}
}

View File

@ -76,7 +76,8 @@ class ObTransDeadlockDetectorAdapter
END_STMT_OTHER_ERR,
END_STMT_NO_CONFLICT,
END_STMT_TIMEOUT,
REPLACE_MEET_TOTAL_DIFFERENT_LIST
REPLACE_MEET_TOTAL_DIFFERENT_LIST,
DO_END_TRANS,
};
static const char* to_string(const UnregisterPath path)
{
@ -99,6 +100,8 @@ class ObTransDeadlockDetectorAdapter
return "END_STMT_TIMEOUT";
case UnregisterPath::REPLACE_MEET_TOTAL_DIFFERENT_LIST:
return "REPLACE_MEET_TOTAL_DIFFERENT_LIST";
case UnregisterPath::DO_END_TRANS:
return "DO_END_TRANS";
default:
return "UNKNOWN";
}

View File

@ -1,2 +1,3 @@
storage_unittest(test_key_wrapper)
storage_unittest(test_deadlock_utility)
storage_unittest(test_lcl_message_drop)

View File

@ -0,0 +1,257 @@
/**
* Copyright (c) 2021 OceanBase
* OceanBase CE is licensed under Mulan PubL v2.
* You can use this software according to the terms and conditions of the Mulan PubL v2.
* You may obtain a copy of Mulan PubL v2 at:
* http://license.coscl.org.cn/MulanPubL-2.0
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
* See the Mulan PubL v2 for more details.
*/
#include "lib/ob_errno.h"
#include <gtest/gtest.h>
#include <gmock/gmock.h>
#define private public
#define protected public
#include "share/deadlock/ob_deadlock_key_wrapper.h"
#include "share/deadlock/ob_deadlock_detector_mgr.h"
#include "test_key.h"
#include <string>
#include <iostream>
#include <thread>
#include <chrono>
#include <algorithm>
#include "share/deadlock/ob_lcl_scheme/ob_lcl_batch_sender_thread.h"
#include "share/deadlock/ob_lcl_scheme/ob_lcl_message.h"
#include "storage/tx/ob_trans_define_v4.h"
namespace oceanbase {
namespace unittest {
using namespace common;
using namespace share::detector;
using namespace std;
// Test fixture for the admission logic of the LCL message cache (keep / random drop / drop).
// batch_sender_ is a static member, so every TEST_F case in this file works on the same sender
// and on the same message map.
class TestLCLMsgDrop : public ::testing::Test {
public:
  TestLCLMsgDrop() : port(0) {}
  ~TestLCLMsgDrop() {}
  // sets the tenant-local id to 1 before every case
  virtual void SetUp() { share::ObTenantEnv::get_tenant_local()->id_ = 1; }
  virtual void TearDown() {}
  static ObLCLBatchSenderThread batch_sender_;
  int port;// not referenced by the cases in this file; initialized to 0 so it is never indeterminate
};
// the shared sender is constructed without a detector manager (nullptr)
ObLCLBatchSenderThread TestLCLMsgDrop::batch_sender_(nullptr);
// Caches LCL_MSG_CACHE_LIMIT / 2 messages under distinct keys (same user key, ports 1..2048)
// and expects every single one to be accepted and readable from the map afterwards.
// This case also initializes and starts the shared sender used by the later cases.
TEST_F(TestLCLMsgDrop, always_keep) {// the first 2048 will always success
  ASSERT_EQ(OB_SUCCESS, batch_sender_.init());
  ASSERT_EQ(OB_SUCCESS, batch_sender_.start());
  // keep the sender thread from fetching cached messages out of the map, so the map size is
  // fully controlled by the test
  // NOTE(review): the flag is cleared after start(); the map is still empty at this point,
  // but confirm the sender thread cannot be in the middle of a round when caching begins.
  ATOMIC_STORE(&batch_sender_.allow_send_, false);
  ObLCLMessage mock_message;
  ObAddr mock_addr(ObAddr::VER::IPV4, "127.0.0.1", 0);
  UserBinaryKey mock_self_key, mock_dest_key;
  ObLCLLabel mock_lcl_label(1, ObDetectorPriority(1));
  mock_lcl_label.addr_ = ObAddr(ObAddr::VER::IPV4, "127.0.0.1", 1);
  ASSERT_EQ(OB_SUCCESS, mock_self_key.set_user_key(transaction::ObTransID(1)));
  ASSERT_EQ(OB_SUCCESS, mock_dest_key.set_user_key(transaction::ObTransID(1)));
  for (int i = 0; i < LCL_MSG_CACHE_LIMIT / 2; ++i) {
    mock_addr.set_port(i + 1);// a different port makes a different ObDependencyResource key
    ASSERT_EQ(OB_SUCCESS, mock_message.set_args(mock_addr,
                                                mock_dest_key,
                                                ObAddr(ObAddr::VER::IPV4, "127.0.0.1", 1),
                                                mock_self_key,
                                                0,
                                                mock_lcl_label,
                                                1));
    ObDependencyResource mock_resource(mock_addr, mock_dest_key);
    ObDependencyResource mock_resource2(mock_addr, mock_dest_key);
    // two resources built from the same (addr, key) must compare equal; a gtest assertion is
    // used so that a mismatch is reported as a test failure
    ASSERT_TRUE(mock_resource == mock_resource2);
    ASSERT_EQ(OB_SUCCESS, batch_sender_.cache_msg(mock_resource, mock_message));
    ObLCLMessage read_message;
    ASSERT_EQ(OB_SUCCESS, batch_sender_.lcl_msg_map_.get(mock_resource, read_message));
    DETECT_LOG(INFO, "print mock resource message", K(read_message));
  }
}
// Adds LCL_MSG_CACHE_LIMIT / 8 more entries directly into the map (ports 2049..2560) and then
// calls cache_msg() 100000 times with a key that is not in the map (port 65535).
// Assuming always_keep ran before and left LCL_MSG_CACHE_LIMIT / 2 entries, the map then holds
// 2560 entries, for which cache_msg() computes a drop percentage of 25; the measured failure
// rate is expected to be within [20, 30] percent.
TEST_F(TestLCLMsgDrop, random_drop_25_percentage) {
  int ret = OB_SUCCESS;// written by OB_SUCC below
  ObLCLMessage mock_message;
  ObAddr mock_addr(ObAddr::VER::IPV4, "127.0.0.1", 0);
  UserBinaryKey mock_self_key, mock_dest_key;
  ObLCLLabel mock_lcl_label(1, ObDetectorPriority(1));
  mock_lcl_label.addr_ = ObAddr(ObAddr::VER::IPV4, "127.0.0.1", 1);
  ASSERT_EQ(OB_SUCCESS, mock_self_key.set_user_key(transaction::ObTransID(1)));
  ASSERT_EQ(OB_SUCCESS, mock_dest_key.set_user_key(transaction::ObTransID(1)));
  // grow the map by inserting directly, bypassing the random admission in cache_msg()
  for (int i = LCL_MSG_CACHE_LIMIT / 2; i < LCL_MSG_CACHE_LIMIT / 2 + LCL_MSG_CACHE_LIMIT / 8; ++i) {
    mock_addr.set_port(i + 1);
    ASSERT_EQ(OB_SUCCESS, mock_message.set_args(mock_addr,
                                                mock_dest_key,
                                                ObAddr(ObAddr::VER::IPV4, "127.0.0.1", 1),
                                                mock_self_key,
                                                0,
                                                mock_lcl_label,
                                                1));
    ObDependencyResource mock_resource(mock_addr, mock_dest_key);
    ASSERT_EQ(OB_SUCCESS, batch_sender_.lcl_msg_map_.insert(mock_resource, mock_message));
  }
  mock_addr.set_port(65535);// a key none of the loops in this file inserts
  int fail_times = 0, succ_times = 0;
  for (int i = 0; i < 100000; ++i) {
    ASSERT_EQ(OB_SUCCESS, mock_message.set_args(mock_addr,
                                                mock_dest_key,
                                                ObAddr(ObAddr::VER::IPV4, "127.0.0.1", 1),
                                                mock_self_key,
                                                0,
                                                mock_lcl_label,
                                                1));
    ObDependencyResource mock_resource(mock_addr, mock_dest_key);
    if (OB_SUCC(batch_sender_.cache_msg(mock_resource, mock_message))) {
      ++succ_times;
      // remove the accepted entry again so the map size, and with it the drop probability,
      // is the same for every iteration; a failed erase would silently skew the statistics
      ASSERT_EQ(OB_SUCCESS, batch_sender_.lcl_msg_map_.erase(mock_resource));
    } else {
      ++fail_times;
    }
  }
  int fail_percentage = fail_times * 100 / (succ_times + fail_times);
  // log before asserting, so the measured value is available when an assertion fails
  DETECT_LOG(INFO, "print drop percentage", K(fail_percentage));
  ASSERT_GE(fail_percentage, 20);
  ASSERT_LE(fail_percentage, 30);
}
// Adds LCL_MSG_CACHE_LIMIT / 4 more entries directly into the map (ports 2561..3584) and then
// calls cache_msg() 100000 times with a key that is not in the map (port 65535).
// Assuming the previous cases ran and left 2560 entries, the map then holds 3584 entries, for
// which cache_msg() computes a drop percentage of 75; the measured failure rate is expected to
// be within [70, 80] percent.
TEST_F(TestLCLMsgDrop, random_drop_75_percentage) {
  int ret = OB_SUCCESS;// written by OB_SUCC below
  ObLCLMessage mock_message;
  ObAddr mock_addr(ObAddr::VER::IPV4, "127.0.0.1", 0);
  UserBinaryKey mock_self_key, mock_dest_key;
  ObLCLLabel mock_lcl_label(1, ObDetectorPriority(1));
  mock_lcl_label.addr_ = ObAddr(ObAddr::VER::IPV4, "127.0.0.1", 1);
  ASSERT_EQ(OB_SUCCESS, mock_self_key.set_user_key(transaction::ObTransID(1)));
  ASSERT_EQ(OB_SUCCESS, mock_dest_key.set_user_key(transaction::ObTransID(1)));
  // grow the map by inserting directly, bypassing the random admission in cache_msg()
  for (int i = LCL_MSG_CACHE_LIMIT / 2 + LCL_MSG_CACHE_LIMIT / 8; i < LCL_MSG_CACHE_LIMIT / 2 + LCL_MSG_CACHE_LIMIT * 3 / 8; ++i) {
    mock_addr.set_port(i + 1);
    ASSERT_EQ(OB_SUCCESS, mock_message.set_args(mock_addr,
                                                mock_dest_key,
                                                ObAddr(ObAddr::VER::IPV4, "127.0.0.1", 1),
                                                mock_self_key,
                                                0,
                                                mock_lcl_label,
                                                1));
    ObDependencyResource mock_resource(mock_addr, mock_dest_key);
    ASSERT_EQ(OB_SUCCESS, batch_sender_.lcl_msg_map_.insert(mock_resource, mock_message));
  }
  mock_addr.set_port(65535);// a key none of the loops in this file inserts
  int fail_times = 0, succ_times = 0;
  for (int i = 0; i < 100000; ++i) {
    ASSERT_EQ(OB_SUCCESS, mock_message.set_args(mock_addr,
                                                mock_dest_key,
                                                ObAddr(ObAddr::VER::IPV4, "127.0.0.1", 1),
                                                mock_self_key,
                                                0,
                                                mock_lcl_label,
                                                1));
    ObDependencyResource mock_resource(mock_addr, mock_dest_key);
    if (OB_SUCC(batch_sender_.cache_msg(mock_resource, mock_message))) {
      ++succ_times;
      // remove the accepted entry again so the map size, and with it the drop probability,
      // is the same for every iteration; a failed erase would silently skew the statistics
      ASSERT_EQ(OB_SUCCESS, batch_sender_.lcl_msg_map_.erase(mock_resource));
    } else {
      ++fail_times;
    }
  }
  int fail_percentage = fail_times * 100 / (succ_times + fail_times);
  // log before asserting, so the measured value is available when an assertion fails
  DETECT_LOG(INFO, "print drop percentage", K(fail_percentage));
  ASSERT_GE(fail_percentage, 70);
  ASSERT_LE(fail_percentage, 80);
}
// Inserts the remaining entries (ports 3585..4096) directly into the map and then verifies that
// cache_msg() rejects a key that is not in the map with OB_BUF_NOT_ENOUGH on each of 100000
// attempts. Assumes the previous cases left 3584 entries, so the map ends up holding
// LCL_MSG_CACHE_LIMIT entries.
TEST_F(TestLCLMsgDrop, always_drop) {
  int ret = OB_SUCCESS;
  ObLCLMessage msg;
  ObAddr dest_addr(ObAddr::VER::IPV4, "127.0.0.1", 0);
  UserBinaryKey self_key;
  UserBinaryKey dest_key;
  ObLCLLabel label(1, ObDetectorPriority(1));
  label.addr_ = ObAddr(ObAddr::VER::IPV4, "127.0.0.1", 1);
  ASSERT_EQ(OB_SUCCESS, self_key.set_user_key(transaction::ObTransID(1)));
  ASSERT_EQ(OB_SUCCESS, dest_key.set_user_key(transaction::ObTransID(1)));

  // phase 1: fill the map by direct insertion
  const int fill_begin = LCL_MSG_CACHE_LIMIT / 2 + LCL_MSG_CACHE_LIMIT * 3 / 8;
  const int fill_end = LCL_MSG_CACHE_LIMIT;
  for (int idx = fill_begin; idx < fill_end; ++idx) {
    dest_addr.set_port(idx + 1);
    const int set_ret = msg.set_args(dest_addr,
                                     dest_key,
                                     ObAddr(ObAddr::VER::IPV4, "127.0.0.1", 1),
                                     self_key,
                                     0,
                                     label,
                                     1);
    ASSERT_EQ(OB_SUCCESS, set_ret);
    ObDependencyResource resource(dest_addr, dest_key);
    ASSERT_EQ(OB_SUCCESS, batch_sender_.lcl_msg_map_.insert(resource, msg));
  }

  // phase 2: every attempt to cache a message under a key not in the map must be refused
  dest_addr.set_port(65535);
  const int attempts = 100000;
  for (int round = 0; round < attempts; ++round) {
    const int set_ret = msg.set_args(dest_addr,
                                     dest_key,
                                     ObAddr(ObAddr::VER::IPV4, "127.0.0.1", 1),
                                     self_key,
                                     0,
                                     label,
                                     1);
    ASSERT_EQ(OB_SUCCESS, set_ret);
    ObDependencyResource resource(dest_addr, dest_key);
    ASSERT_EQ(OB_BUF_NOT_ENOUGH, batch_sender_.cache_msg(resource, msg));
  }
}
// Verifies that cache_msg() still succeeds for a key that is already present in the map, and
// that the label id read back from the map follows the message cached last (0, then 1).
// NOTE(review): relies on the earlier cases of this fixture -- the key (127.0.0.1:1, ObTransID(1))
// is the first one cached by always_keep, and the map is presumably at LCL_MSG_CACHE_LIMIT after
// always_drop; confirm the cases always run in declaration order.
TEST_F(TestLCLMsgDrop, message_merge_when_reach_limit) {
int ret = OB_SUCCESS;
ObLCLMessage mock_message;
ObLCLMessage read_message;
ObAddr mock_addr(ObAddr::VER::IPV4, "127.0.0.1", 1);
UserBinaryKey mock_self_key, mock_dest_key;
ObLCLLabel mock_lcl_label(0, ObDetectorPriority(0));
mock_lcl_label.addr_ = ObAddr(ObAddr::VER::IPV4, "127.0.0.1", 1);
ASSERT_EQ(OB_SUCCESS, mock_self_key.set_user_key(transaction::ObTransID(1)));
ASSERT_EQ(OB_SUCCESS, mock_dest_key.set_user_key(transaction::ObTransID(1)));
ASSERT_EQ(OB_SUCCESS, mock_message.set_args(mock_addr,
mock_dest_key,
ObAddr(ObAddr::VER::IPV4, "127.0.0.1", 1),
mock_self_key,
0,
mock_lcl_label,
1));
ObDependencyResource mock_resource(mock_addr, mock_dest_key);
// the key must already be in the map before anything is cached by this case
ASSERT_EQ(OB_SUCCESS, batch_sender_.lcl_msg_map_.get(mock_resource, read_message));
DETECT_LOG(INFO, "print mock resource message", K(read_message));
// first message: label constructed with (0, ObDetectorPriority(0)); label_.id_ is expected to read back as 0
ASSERT_EQ(OB_SUCCESS, batch_sender_.cache_msg(mock_resource, mock_message));
ASSERT_EQ(OB_SUCCESS, batch_sender_.lcl_msg_map_.get(mock_resource, read_message));
ASSERT_EQ(0, read_message.label_.id_);
// re-construct the label in place with (1, ObDetectorPriority(1)); the constructor presumably
// resets addr_, which is assigned again on the next line -- TODO confirm
new (&mock_lcl_label) ObLCLLabel(1, ObDetectorPriority(1));
mock_lcl_label.addr_ = ObAddr(ObAddr::VER::IPV4, "127.0.0.1", 1);
ASSERT_EQ(OB_SUCCESS, mock_message.set_args(mock_addr,
mock_dest_key,
ObAddr(ObAddr::VER::IPV4, "127.0.0.1", 1),
mock_self_key,
1,
mock_lcl_label,
1));
// second message: after caching it, label_.id_ is expected to read back as 1
ASSERT_EQ(OB_SUCCESS, batch_sender_.cache_msg(mock_resource, mock_message));
ASSERT_EQ(OB_SUCCESS, batch_sender_.lcl_msg_map_.get(mock_resource, read_message));
ASSERT_EQ(1, read_message.label_.id_);
}
}// namespace unittest
}// namespace oceanbase
int main(int argc, char **argv)
{
system("rm -rf test_lcl_message_drop.log");
oceanbase::common::ObLogger &logger = oceanbase::common::ObLogger::get_logger();
logger.set_file_name("test_lcl_message_drop.log", false);
logger.set_log_level(OB_LOG_LEVEL_DEBUG);
testing::InitGoogleTest(&argc, argv);
return RUN_ALL_TESTS();
}