[DETECT] add count down state to adapt sql retry deadlock
This commit is contained in:
@ -9,6 +9,7 @@
|
||||
* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
|
||||
* See the Mulan PubL v2 for more details.
|
||||
*/
|
||||
|
||||
#include <gtest/gtest.h>
|
||||
#include <stdlib.h>
|
||||
#define USING_LOG_PREFIX STORAGE
|
||||
@ -332,7 +333,6 @@ TEST_F(ObTestMdsBeforeRecover, before_recover_test)
|
||||
do_recycle_and_gc_mds_table(TEST_LS_ID, TEST_TABLET_ID);
|
||||
get_latest_scn(TEST_LS_ID, TEST_TABLET_ID, scn_from_table);
|
||||
read_virtual_mds_stat(TEST_LS_ID, TEST_TABLET_ID, RESULT);
|
||||
OCCAM_LOG(INFO, "xuwang.txw debug", K(RESULT), K(TEST_LS_ID), K(TEST_TABLET_ID));
|
||||
ASSERT_EQ(OB_SUCCESS, LAST_SCN.convert_for_inner_table_field(scn_from_table));
|
||||
advance_checkpoint(TEST_LS_ID, LAST_SCN);
|
||||
write_result_to_file(BEFORE_RECOVER_RESULT_FILE);
|
||||
@ -372,7 +372,6 @@ TEST_F(ObTestMdsAfterRecover, after_recover_test)
|
||||
{
|
||||
read_virtual_mds_stat(TEST_LS_ID, TEST_TABLET_ID, RESULT);// to fill cache
|
||||
read_virtual_mds_stat(TEST_LS_ID, TEST_TABLET_ID, RESULT);
|
||||
OCCAM_LOG(INFO, "xuwang.txw debug", K(RESULT), K(TEST_LS_ID), K(TEST_TABLET_ID));
|
||||
write_result_to_file(AFTER_RECOVER_RESULT_FILE);
|
||||
ASSERT_EQ(true, compare_before_and_after_recover_results(BEFORE_RECOVER_RESULT_FILE, AFTER_RECOVER_RESULT_FILE));
|
||||
}
|
||||
|
||||
@ -9,6 +9,9 @@
|
||||
* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
|
||||
* See the Mulan PubL v2 for more details.
|
||||
*/
|
||||
|
||||
#ifndef DEBUG_FOR_MDS
|
||||
#define DEBUG_FOR_MDS
|
||||
#include "lib/ob_errno.h"
|
||||
#include <chrono>
|
||||
#include <thread>
|
||||
@ -214,7 +217,6 @@ TEST_F(TestMdsTransactionTest, test_mds_table_gc_and_recycle)
|
||||
// 5. 从LS找到tablet结构
|
||||
storage::ObTabletHandle tablet_handle;
|
||||
ASSERT_EQ(OB_SUCCESS, ls_handle.get_ls()->get_tablet(tablet_id, tablet_handle));
|
||||
MDS_LOG(ERROR, "xuwang.txw debug", K(tablet_id));
|
||||
// 6. 调用tablet接口写入多源数据,提交
|
||||
MdsCtx ctx1(mds::MdsWriter(ObTransID(1)));
|
||||
share::SCN rec_scn;
|
||||
@ -226,7 +228,6 @@ TEST_F(TestMdsTransactionTest, test_mds_table_gc_and_recycle)
|
||||
// ASSERT_EQ(OB_SUCCESS, static_cast<ObTabletPointer*>(tablet_handle.get_obj()->pointer_hdl_.get_resource_ptr())->mds_table_handler_.mds_table_handle_.get_rec_scn(rec_scn));
|
||||
// ASSERT_EQ(mock_scn(10), rec_scn);
|
||||
std::this_thread::sleep_for(std::chrono::seconds(5));
|
||||
MDS_LOG(ERROR, "xuwang.txw debug1");
|
||||
ASSERT_EQ(OB_SUCCESS, tablet_handle.get_obj()->mds_table_flush(share::SCN::max_scn()));
|
||||
// 7. 检查mds table的存在情况
|
||||
std::this_thread::sleep_for(std::chrono::seconds(5));
|
||||
@ -282,3 +283,4 @@ int main(int argc, char **argv)
|
||||
::testing::InitGoogleTest(&argc, argv);
|
||||
return RUN_ALL_TESTS();
|
||||
}
|
||||
#endif
|
||||
@ -62,6 +62,7 @@ public:
|
||||
virtual void set_timeout(const uint64_t timeout) = 0;
|
||||
virtual int register_timer_task() = 0;
|
||||
virtual void unregister_timer_task() = 0;
|
||||
virtual void dec_count_down_allow_detect() = 0;
|
||||
virtual int64_t to_string(char *buffer, const int64_t length) const = 0;// for debugging
|
||||
virtual const ObDetectorPriority &get_priority() const = 0;// return detector's priority
|
||||
// build a directed dependency relationship to other
|
||||
|
||||
@ -81,6 +81,7 @@ int ObDeadLockDetectorMgr::InnerAllocHandle::InnerFactory::create(const UserBina
|
||||
const CollectCallBack &on_collect_operation,
|
||||
const ObDetectorPriority &priority,
|
||||
const uint64_t start_delay,
|
||||
const uint32_t count_down_allow_detect,
|
||||
const bool auto_activate_when_detected,
|
||||
ObIDeadLockDetector *&p_detector)
|
||||
{
|
||||
@ -104,6 +105,7 @@ int ObDeadLockDetectorMgr::InnerAllocHandle::InnerFactory::create(const UserBina
|
||||
on_collect_operation,
|
||||
priority,
|
||||
start_delay,
|
||||
count_down_allow_detect,
|
||||
auto_activate_when_detected);
|
||||
if (false == static_cast<ObLCLNode*>(p_detector)->
|
||||
is_successfully_constructed()) {
|
||||
@ -415,6 +417,7 @@ int ObDeadLockDetectorMgr::process_notify_parent_message(
|
||||
},
|
||||
ObDetectorPriority(PRIORITY_RANGE::EXTREMELY_HIGH, 0),
|
||||
0,
|
||||
0,
|
||||
true,
|
||||
p_detector))) {
|
||||
DETECT_LOG(WARN, "create new detector instance failed", PRINT_WRAPPER);
|
||||
|
||||
@ -96,6 +96,7 @@ public:
|
||||
const CollectCallBack &on_collect_operation,
|
||||
const ObDetectorPriority &priority = ObDetectorPriority(0),
|
||||
const uint64_t start_delay = 0,
|
||||
const uint32_t count_down_allow_detect = 0,
|
||||
const bool auto_activate_when_detected = true);
|
||||
template<typename KeyType1, typename KeyType2>
|
||||
int add_parent(const KeyType1 &key, const KeyType2 &parent_key);
|
||||
@ -130,6 +131,8 @@ public:
|
||||
const common::ObIArray<ObDependencyResource> &new_list);
|
||||
template<typename T>
|
||||
int get_block_list(const T &src_key, common::ObIArray<ObDependencyResource> &cur_list);
|
||||
template<typename T>
|
||||
int dec_count_down_allow_detect(const T &src_key);
|
||||
// remove directed dependency relationship between two detector
|
||||
template<typename T1, typename T2>
|
||||
int activate(const T1 &src_key, const T2 &dest_key);
|
||||
@ -164,6 +167,7 @@ private:
|
||||
const CollectCallBack &on_collect_operation,
|
||||
const ObDetectorPriority &priority,
|
||||
const uint64_t start_delay,
|
||||
const uint32_t count_down_allow_detect,
|
||||
const bool auto_activate_when_detected,
|
||||
ObIDeadLockDetector *&p_detector);
|
||||
void release(ObIDeadLockDetector *p_detector);
|
||||
@ -236,6 +240,7 @@ int ObDeadLockDetectorMgr::register_key(const KeyType &key,
|
||||
const CollectCallBack &on_collect_operation,
|
||||
const ObDetectorPriority &priority,
|
||||
const uint64_t start_delay,
|
||||
const uint32_t count_down_allow_detect,
|
||||
const bool auto_activate_when_detected)
|
||||
{
|
||||
CHECK_INIT();
|
||||
@ -259,6 +264,7 @@ int ObDeadLockDetectorMgr::register_key(const KeyType &key,
|
||||
on_collect_operation,
|
||||
priority,
|
||||
start_delay,
|
||||
count_down_allow_detect,
|
||||
auto_activate_when_detected,
|
||||
p_detector))) {
|
||||
DETECT_LOG(WARN, "create new detector instance failed", PRINT_WRAPPER, KP(p_detector));
|
||||
@ -499,6 +505,28 @@ int ObDeadLockDetectorMgr::get_block_list(const T &src_key,
|
||||
return ret;
|
||||
#undef PRINT_WRAPPER
|
||||
}
|
||||
|
||||
template<typename T>
|
||||
int ObDeadLockDetectorMgr::dec_count_down_allow_detect(const T &src_key)
|
||||
{
|
||||
CHECK_INIT();
|
||||
CHECK_ENABLED();
|
||||
#define PRINT_WRAPPER KR(ret)
|
||||
int ret = common::OB_SUCCESS;
|
||||
DetectorRefGuard ref_guard;
|
||||
UserBinaryKey src_user_key;
|
||||
|
||||
if (OB_FAIL(src_user_key.set_user_key(src_key))) {
|
||||
DETECT_LOG(WARN, "src_key serialzation failed", PRINT_WRAPPER);
|
||||
} else if (OB_FAIL(get_detector_(src_user_key, ref_guard))) {
|
||||
DETECT_LOG(WARN, "get_detector failed", PRINT_WRAPPER);
|
||||
} else {
|
||||
ref_guard.get_detector()->dec_count_down_allow_detect();
|
||||
}
|
||||
|
||||
return ret;
|
||||
#undef PRINT_WRAPPER
|
||||
}
|
||||
// call for removing directed dependency relationship between two detector(both in local)
|
||||
// thread-safe guaranteed
|
||||
//
|
||||
|
||||
@ -10,6 +10,7 @@
|
||||
* See the Mulan PubL v2 for more details.
|
||||
*/
|
||||
|
||||
#include "lib/atomic/ob_atomic.h"
|
||||
#include "share/config/ob_server_config.h"
|
||||
#include <algorithm>
|
||||
#include "share/ob_occam_time_guard.h"
|
||||
@ -42,6 +43,7 @@ ObLCLNode::ObLCLNode(const UserBinaryKey &user_key,
|
||||
const CollectCallBack &on_collect_operation,
|
||||
const ObDetectorPriority &priority,
|
||||
const uint64_t start_delay,
|
||||
const uint32_t count_down_allow_detect,
|
||||
const bool auto_activate_when_detected)
|
||||
:push_state_task_(*this),
|
||||
self_key_(user_key),
|
||||
@ -56,6 +58,7 @@ ObLCLNode::ObLCLNode(const UserBinaryKey &user_key,
|
||||
lcl_period_(0),
|
||||
last_report_waiting_for_period_(0),
|
||||
last_send_collect_info_period_(0),
|
||||
count_down_allow_detect_(count_down_allow_detect),
|
||||
lock_(ObLatchIds::DEADLOCK_LOCK)
|
||||
{
|
||||
#define PRINT_WRAPPER K(*this), K(resource_id), K(on_detect_operation),\
|
||||
@ -131,6 +134,18 @@ void ObLCLNode::unregister_timer_task()
|
||||
ATOMIC_STORE(&is_timer_task_canceled_, true);
|
||||
}
|
||||
|
||||
void ObLCLNode::dec_count_down_allow_detect()
|
||||
{
|
||||
uint32_t cnt = 0;
|
||||
bool cas_failed = false;
|
||||
do {
|
||||
cnt = ATOMIC_LOAD(&count_down_allow_detect_);
|
||||
if (cnt > 0) {
|
||||
cas_failed = (ATOMIC_CAS(&count_down_allow_detect_, cnt, cnt - 1) != cnt);
|
||||
}
|
||||
} while (cnt > 0 && cas_failed);
|
||||
}
|
||||
|
||||
int ObLCLNode::register_timer_with_necessary_retry_with_lock_()
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
@ -805,7 +820,11 @@ int ObLCLNode::push_state_to_downstreams_with_lock_()
|
||||
}
|
||||
|
||||
CLICK();
|
||||
if (OB_FAIL(broadcast_(blocklist_snapshot, lclv_snapshot, public_label_snapshot))) {
|
||||
if (ATOMIC_LOAD(&count_down_allow_detect_) != 0) {
|
||||
DETECT_LOG_(INFO, "not allow do detect cause count_down_allow_detect_ is not dec to 0 yet",
|
||||
K(blocklist_snapshot), K(lclv_snapshot), K(public_label_snapshot),
|
||||
K(*this));
|
||||
} else if (OB_FAIL(broadcast_(blocklist_snapshot, lclv_snapshot, public_label_snapshot))) {
|
||||
DETECT_LOG_(WARN, "boardcast failed",
|
||||
K(blocklist_snapshot), K(lclv_snapshot), K(public_label_snapshot),
|
||||
K(*this));
|
||||
|
||||
@ -97,6 +97,7 @@ public:
|
||||
const CollectCallBack &on_collect_operation,
|
||||
const ObDetectorPriority &priority,
|
||||
const uint64_t start_delay,
|
||||
const uint32_t count_down_allow_detect,
|
||||
const bool auto_activate_when_detected);
|
||||
~ObLCLNode();
|
||||
public:
|
||||
@ -104,6 +105,7 @@ public:
|
||||
void set_timeout(const uint64_t timeout) override;
|
||||
int register_timer_task() override;
|
||||
void unregister_timer_task() override;
|
||||
void dec_count_down_allow_detect() override;
|
||||
bool is_successfully_constructed() const { return successfully_constructed_; }
|
||||
const ObDetectorPriority &get_priority() const override;// return detector's priority
|
||||
// build a directed dependency relationship to other
|
||||
@ -124,7 +126,8 @@ public:
|
||||
K_(public_label), K_(detect_callback),
|
||||
K_(auto_activate_when_detected), KTIME_(created_time), KTIME_(allow_detect_time),
|
||||
K_(is_timer_task_canceled), K_(block_list), K_(parent_list),
|
||||
K_(lcl_period), K_(last_send_collect_info_period), K(block_callback_list_.count()))
|
||||
K_(lcl_period), K_(last_send_collect_info_period), K(block_callback_list_.count()),
|
||||
K(ATOMIC_LOAD(&count_down_allow_detect_)))
|
||||
private:
|
||||
class PushStateTask : public common::ObTimeWheelTask
|
||||
{
|
||||
@ -183,6 +186,7 @@ private:
|
||||
int64_t last_report_waiting_for_period_;
|
||||
int64_t last_send_collect_info_period_;
|
||||
bool successfully_constructed_;
|
||||
uint32_t count_down_allow_detect_;
|
||||
mutable common::ObSpinLock lock_;
|
||||
};
|
||||
|
||||
|
||||
@ -232,7 +232,6 @@ bool ObLockWaitMgr::post_process(bool need_retry, bool& need_wait)
|
||||
}
|
||||
if (need_retry) {
|
||||
if ((need_wait = node->need_wait())) {
|
||||
TLOCAL_NEED_WAIT_IN_LOCK_WAIT_MGR = true;
|
||||
// FIXME(xuwang.txw):create detector in check_timeout process
|
||||
// below code must keep current order to fix concurrency bug
|
||||
// more info see
|
||||
@ -578,6 +577,7 @@ int ObLockWaitMgr::post_lock(const int tmp_ret,
|
||||
ObFunction<int(bool&, bool&)> &rechecker)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
TLOCAL_NEED_WAIT_IN_LOCK_WAIT_MGR = false;
|
||||
Node *node = NULL;
|
||||
if (OB_NOT_NULL(node = get_thread_node())) {
|
||||
Key key(&row_key);
|
||||
@ -609,6 +609,7 @@ int ObLockWaitMgr::post_lock(const int tmp_ret,
|
||||
tx_id,
|
||||
holder_tx_id);
|
||||
node->set_need_wait();
|
||||
TLOCAL_NEED_WAIT_IN_LOCK_WAIT_MGR = true;// to tell end_stmt() not register deadlock
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -629,6 +630,7 @@ int ObLockWaitMgr::post_lock(const int tmp_ret,
|
||||
ObFunction<int(bool&)> &check_need_wait)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
TLOCAL_NEED_WAIT_IN_LOCK_WAIT_MGR = false;
|
||||
Node *node = NULL;
|
||||
|
||||
if (OB_UNLIKELY(!is_inited_)) {
|
||||
@ -665,6 +667,7 @@ int ObLockWaitMgr::post_lock(const int tmp_ret,
|
||||
holder_tx_id);
|
||||
node->set_need_wait();
|
||||
node->set_lock_mode(lock_mode);
|
||||
TLOCAL_NEED_WAIT_IN_LOCK_WAIT_MGR = true;// to tell end_stmt() not register deadlock
|
||||
}
|
||||
}
|
||||
return ret;
|
||||
|
||||
@ -365,8 +365,7 @@ int UserMdsNode<K, V>::fill_event_(observer::MdsEvent &event,
|
||||
if (FALSE_IT(databuff_printf(stack_buffer, buffer_size, pos, "%s", to_cstring(user_data_)))) {
|
||||
} else if (FALSE_IT(event.info_str_.assign_ptr(stack_buffer, pos))) {
|
||||
} else if (FALSE_IT(last_pos = pos)) {
|
||||
} else if (MDS_FAIL(databuff_printf(stack_buffer, buffer_size, pos, "%s", p_mds_row_ ? to_cstring(*p_mds_row_->key_) : "NULL"))) {
|
||||
MDS_LOG(WARN, "fail to_string user key", K(*this));
|
||||
} else if (FALSE_IT(databuff_printf(stack_buffer, buffer_size, pos, "%s", p_mds_row_ ? to_cstring(*p_mds_row_->key_) : "NULL"))) {
|
||||
} else if (FALSE_IT(event.key_str_.assign_ptr(&stack_buffer[last_pos], pos - last_pos))) {
|
||||
} else if (FALSE_IT(last_pos = pos)) {
|
||||
} else {
|
||||
|
||||
@ -354,7 +354,8 @@ int ObTransDeadlockDetectorAdapter::register_remote_execution_to_deadlock_detect
|
||||
on_detect_op,
|
||||
on_collect_op,
|
||||
~session_guard->get_tx_desc()->get_active_ts(),
|
||||
3_s))) {
|
||||
3_s,
|
||||
10))) {
|
||||
DETECT_LOG(WARN, "fail to register deadlock", PRINT_WRAPPER);
|
||||
} else {
|
||||
MTL(ObDeadLockDetectorMgr*)->set_timeout(self_tx_id, query_timeout);
|
||||
@ -406,6 +407,7 @@ int ObTransDeadlockDetectorAdapter::remote_execution_replace_conflict_trans_ids_
|
||||
if (OB_FAIL(MTL(ObDeadLockDetectorMgr*)->replace_block_list(self_tx_id, blocked_resources))) {
|
||||
DETECT_LOG(WARN, "replace block list failed", PRINT_WRAPPER);
|
||||
}
|
||||
(void) MTL(ObDeadLockDetectorMgr*)->dec_count_down_allow_detect(self_tx_id);
|
||||
} else {
|
||||
unregister_from_deadlock_detector(self_tx_id,
|
||||
UnregisterPath::REPLACE_MEET_TOTAL_DIFFERENT_LIST);
|
||||
@ -642,6 +644,8 @@ int ObTransDeadlockDetectorAdapter::maintain_deadlock_info_when_end_stmt(sql::Ob
|
||||
DETECT_LOG(ERROR, "session is NULL", PRINT_WRAPPER);
|
||||
} else if (++step && session->is_inner()) {
|
||||
DETECT_LOG(TRACE, "inner session no need register to deadlock", PRINT_WRAPPER);
|
||||
} else if (++step && memtable::TLOCAL_NEED_WAIT_IN_LOCK_WAIT_MGR) {
|
||||
DETECT_LOG(TRACE, "will register deadlock in LockWaitMgr::post_process after end_stmt()", PRINT_WRAPPER);
|
||||
} else if (++step && OB_ISNULL(desc = session->get_tx_desc())) {
|
||||
ret = OB_BAD_NULL_ERROR;
|
||||
DETECT_LOG(ERROR, "desc in session is NULL", PRINT_WRAPPER);
|
||||
@ -659,8 +663,6 @@ int ObTransDeadlockDetectorAdapter::maintain_deadlock_info_when_end_stmt(sql::Ob
|
||||
} else if (++step && exec_ctx.get_errcode() != OB_TRY_LOCK_ROW_CONFLICT) {
|
||||
unregister_from_deadlock_detector(desc->tid(), UnregisterPath::END_STMT_OTHER_ERR);
|
||||
DETECT_LOG(INFO, "try unregister deadlock detecotr cause meet non-lock error", PRINT_WRAPPER);
|
||||
} else if (++step && memtable::TLOCAL_NEED_WAIT_IN_LOCK_WAIT_MGR) {
|
||||
DETECT_LOG(INFO, "thread local flag marked local execution, no need register to deadlock here", PRINT_WRAPPER);
|
||||
} else if (++step && OB_FAIL(register_remote_execution_or_replace_conflict_trans_ids(desc->tid(),
|
||||
session->get_sessid(),
|
||||
conflict_txs))) {
|
||||
|
||||
Reference in New Issue
Block a user