fix ObLSTxCtxMgr deadlock
This commit is contained in:
@ -199,7 +199,7 @@ int ObLSTxCtxMgr::init(const int64_t tenant_id,
|
||||
|
||||
void ObLSTxCtxMgr::destroy()
|
||||
{
|
||||
WLockGuard guard(rwlock_);
|
||||
WLockGuardWithRetryInterval guard(rwlock_, TRY_THRESOLD_US, RETRY_INTERVAL_US);
|
||||
if (IS_INIT) {
|
||||
ls_log_writer_.destroy();
|
||||
is_inited_ = false;
|
||||
@ -249,7 +249,6 @@ int ObLSTxCtxMgr::process_callback_(ObIArray<ObTxCommitCallback> &cb_array) cons
|
||||
|
||||
void ObLSTxCtxMgr::print_all_tx_ctx(const int64_t max_print, const bool verbose)
|
||||
{
|
||||
RLockGuard guard(rwlock_);
|
||||
print_all_tx_ctx_(max_print, verbose);
|
||||
}
|
||||
|
||||
@ -510,7 +509,6 @@ int ObLSTxCtxMgr::get_tx_ctx_(const ObTransID &tx_id, const bool for_replay, ObP
|
||||
int ObLSTxCtxMgr::iterator_tx_id_in_one_bucket(ObTxIDIterator& iter, int bucket_pos)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
RLockGuard guard(rwlock_);
|
||||
|
||||
IteratorTxIDFunctor fn(iter);
|
||||
if (OB_FAIL(ls_tx_ctx_map_.for_each_in_one_bucket(fn, bucket_pos))) {
|
||||
@ -524,7 +522,6 @@ int ObLSTxCtxMgr::iterator_tx_id_in_one_bucket(ObTxIDIterator& iter, int bucket_
|
||||
int ObLSTxCtxMgr::iterator_tx_id(ObTxIDIterator& iter)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
RLockGuard guard(rwlock_);
|
||||
|
||||
IteratorTxIDFunctor fn(iter);
|
||||
if (OB_FAIL(ls_tx_ctx_map_.for_each(fn))) {
|
||||
@ -588,7 +585,7 @@ int ObLSTxCtxMgr::replay_start_working_log(const ObTxStartWorkingLog &log, int64
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
UNUSED(log);
|
||||
WLockGuard guard(rwlock_);
|
||||
WLockGuardWithRetryInterval guard(rwlock_, TRY_THRESOLD_US, RETRY_INTERVAL_US);
|
||||
ReplayTxStartWorkingLogFunctor fn(start_working_ts);
|
||||
if (OB_FAIL(ls_tx_ctx_map_.for_each(fn))) {
|
||||
TRANS_LOG(WARN, "[LsTxCtxMgr Role Change] replay start working log failed", KR(ret), K(ls_id_));
|
||||
@ -601,7 +598,7 @@ int ObLSTxCtxMgr::replay_start_working_log(const ObTxStartWorkingLog &log, int64
|
||||
int ObLSTxCtxMgr::on_start_working_log_cb_succ(int64_t start_working_ts)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
WLockGuard guard(rwlock_);
|
||||
WLockGuardWithRetryInterval guard(rwlock_, TRY_THRESOLD_US, RETRY_INTERVAL_US);
|
||||
StateHelper state_helper(ls_id_, state_);
|
||||
if (State::T_PENDING == state_ || State::T_BLOCKED_PENDING == state_) {
|
||||
SwitchToLeaderFunctor fn(start_working_ts);
|
||||
@ -640,7 +637,7 @@ int ObLSTxCtxMgr::on_start_working_log_cb_succ(int64_t start_working_ts)
|
||||
int ObLSTxCtxMgr::on_start_working_log_cb_fail()
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
WLockGuard guard(rwlock_);
|
||||
WLockGuardWithRetryInterval guard(rwlock_, TRY_THRESOLD_US, RETRY_INTERVAL_US);
|
||||
StateHelper state_helper(ls_id_, state_);
|
||||
if (OB_FAIL(state_helper.switch_state(Ops::SWL_CB_FAIL))) {
|
||||
TRANS_LOG(WARN, "switch state fail", KR(ret), K(tenant_id_), K(ls_id_));
|
||||
@ -666,7 +663,7 @@ int ObLSTxCtxMgr::switch_to_follower_forcedly()
|
||||
ObTimeGuard timeguard("ObLSTxCtxMgr::switch_to_follower_forcedly");
|
||||
ObSEArray<ObTxCommitCallback, 4> cb_array;
|
||||
{
|
||||
WLockGuard guard(rwlock_);
|
||||
WLockGuardWithRetryInterval guard(rwlock_, TRY_THRESOLD_US, RETRY_INTERVAL_US);
|
||||
StateHelper state_helper(ls_id_, state_);
|
||||
if (IS_NOT_INIT) {
|
||||
TRANS_LOG(ERROR, "ObLSTxCtxMgr not inited", K(ls_id_));
|
||||
@ -739,7 +736,7 @@ int ObLSTxCtxMgr::try_wait_gts_and_inc_max_commit_ts_()
|
||||
int ObLSTxCtxMgr::switch_to_leader()
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
WLockGuard guard(rwlock_);
|
||||
WLockGuardWithRetryInterval guard(rwlock_, TRY_THRESOLD_US, RETRY_INTERVAL_US);
|
||||
StateHelper state_helper(ls_id_, state_);
|
||||
if (IS_NOT_INIT) {
|
||||
ret = OB_NOT_INIT;
|
||||
@ -781,7 +778,7 @@ int ObLSTxCtxMgr::switch_to_follower_gracefully()
|
||||
|
||||
ObSEArray<ObTxCommitCallback, 4> cb_array;
|
||||
{
|
||||
WLockGuard guard(rwlock_);
|
||||
WLockGuardWithRetryInterval guard(rwlock_, TRY_THRESOLD_US, RETRY_INTERVAL_US);
|
||||
timeguard.click();
|
||||
|
||||
if (OB_FAIL(ret)) {
|
||||
@ -836,7 +833,7 @@ int ObLSTxCtxMgr::switch_to_follower_gracefully()
|
||||
int ObLSTxCtxMgr::resume_leader()
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
WLockGuard guard(rwlock_);
|
||||
WLockGuardWithRetryInterval guard(rwlock_, TRY_THRESOLD_US, RETRY_INTERVAL_US);
|
||||
StateHelper state_helper(ls_id_, state_);
|
||||
if (IS_NOT_INIT) {
|
||||
ret = OB_NOT_INIT;
|
||||
@ -886,7 +883,7 @@ int ObLSTxCtxMgr::stop(const bool graceful)
|
||||
const KillTransArg arg(graceful);
|
||||
ObTimeGuard timeguard("ctxmgr stop");
|
||||
{
|
||||
WLockGuard guard(rwlock_);
|
||||
WLockGuardWithRetryInterval guard(rwlock_, TRY_THRESOLD_US, RETRY_INTERVAL_US);
|
||||
if (OB_FAIL(ls_log_writer_.stop())) {
|
||||
TRANS_LOG(WARN, "ls_log_writer_ stop error", KR(ret));
|
||||
} else {
|
||||
@ -926,7 +923,7 @@ int ObLSTxCtxMgr::kill_all_tx(const bool graceful, bool &is_all_tx_cleaned_up)
|
||||
ObSEArray<ObTxCommitCallback, 4> cb_array;
|
||||
const KillTransArg arg(graceful);
|
||||
{
|
||||
WLockGuard guard(rwlock_);
|
||||
WLockGuardWithRetryInterval guard(rwlock_, TRY_THRESOLD_US, RETRY_INTERVAL_US);
|
||||
KillTxCtxFunctor fn(arg, cb_array);
|
||||
if (OB_FAIL(ls_retain_ctx_mgr_.force_gc_retain_ctx())) {
|
||||
TRANS_LOG(WARN, "force gc retain ctx mgr", K(ret));
|
||||
@ -947,7 +944,7 @@ int ObLSTxCtxMgr::block(bool &is_all_tx_cleaned_up)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
StateHelper state_helper(ls_id_, state_);
|
||||
WLockGuard guard(rwlock_);
|
||||
WLockGuardWithRetryInterval guard(rwlock_, TRY_THRESOLD_US, RETRY_INTERVAL_US);
|
||||
|
||||
if (OB_FAIL(state_helper.switch_state(Ops::BLOCK))) {
|
||||
TRANS_LOG(WARN, "switch state error", KR(ret), "manager", *this);
|
||||
@ -961,7 +958,7 @@ int ObLSTxCtxMgr::online()
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
StateHelper state_helper(ls_id_, state_);
|
||||
WLockGuard guard(rwlock_);
|
||||
WLockGuardWithRetryInterval guard(rwlock_, TRY_THRESOLD_US, RETRY_INTERVAL_US);
|
||||
|
||||
if (OB_FAIL(state_helper.switch_state(Ops::ONLINE))) {
|
||||
TRANS_LOG(WARN, "switch state error", KR(ret), "manager", *this);
|
||||
@ -974,7 +971,6 @@ int ObLSTxCtxMgr::online()
|
||||
int ObLSTxCtxMgr::get_ls_min_uncommit_tx_prepare_version(int64_t &min_prepare_version)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
RLockGuard guard(rwlock_);
|
||||
|
||||
if (ATOMIC_LOAD(&total_tx_ctx_count_) > 0 || ls_tx_ctx_map_.count() > 0) {
|
||||
IterateMinPrepareVersionFunctor fn;
|
||||
@ -1297,7 +1293,6 @@ int ObLSTxCtxMgr::on_tx_ctx_table_flushed()
|
||||
int ObLSTxCtxMgr::get_min_start_log_ts(int64_t &min_start_log_ts)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
RLockGuard guard(rwlock_);
|
||||
|
||||
GetMinStartLogTsFunctor fn;
|
||||
if (OB_FAIL(ls_tx_ctx_map_.for_each(fn))) {
|
||||
@ -1334,7 +1329,7 @@ int64_t ObLSTxCtxMgr::get_aggre_rec_log_ts_()
|
||||
int ObLSTxCtxMgr::refresh_aggre_rec_log_ts()
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
WLockGuard guard(rwlock_);
|
||||
WLockGuardWithRetryInterval guard(rwlock_, TRY_THRESOLD_US, RETRY_INTERVAL_US);
|
||||
|
||||
if (OB_INVALID_TIMESTAMP == prev_aggre_rec_log_ts_) {
|
||||
// We should remember the rec_log_ts before the tx ctx table is successfully
|
||||
|
@ -154,6 +154,7 @@ public:
|
||||
typedef common::RWLock RWLock;
|
||||
typedef RWLock::RLockGuard RLockGuard;
|
||||
typedef RWLock::WLockGuard WLockGuard;
|
||||
typedef RWLock::WLockGuardWithRetryInterval WLockGuardWithRetryInterval;
|
||||
|
||||
ObLSTxCtxMgr()
|
||||
: tx_log_adapter_(&log_adapter_def_), rwlock_(ObLatchIds::DEFAULT_SPIN_RWLOCK),
|
||||
@ -492,6 +493,8 @@ private:
|
||||
private:
|
||||
static const int64_t OB_TRANS_STATISTICS_INTERVAL = 60 * 1000 * 1000;
|
||||
static const int64_t OB_PARTITION_AUDIT_LOCAL_STORAGE_COUNT = 4;
|
||||
static const int64_t TRY_THRESOLD_US = 1 * 1000 *1000;
|
||||
static const int64_t RETRY_INTERVAL_US = 10 *1000;
|
||||
|
||||
private:
|
||||
int process_callback_(ObIArray<ObTxCommitCallback> &cb_array) const;
|
||||
|
Reference in New Issue
Block a user