diff --git a/src/storage/high_availability/ob_transfer_handler.cpp b/src/storage/high_availability/ob_transfer_handler.cpp index c1e68563aa..a407c2c57b 100644 --- a/src/storage/high_availability/ob_transfer_handler.cpp +++ b/src/storage/high_availability/ob_transfer_handler.cpp @@ -65,7 +65,9 @@ ObTransferHandler::ObTransferHandler() retry_count_(0), transfer_worker_mgr_(), round_(0), - gts_seq_(share::SCN::base_scn()) + gts_seq_(share::SCN::base_scn()), + transfer_handler_lock_(), + transfer_handler_enabled_(true) { } @@ -322,9 +324,12 @@ int ObTransferHandler::process() { int ret = OB_SUCCESS; ObCurTraceId::init(GCONF.self_addr_); + common::SpinRLockGuard guard(transfer_handler_lock_); if (!is_inited_) { ret = OB_NOT_INIT; LOG_WARN("transfer handler do not init", K(ret)); + } else if (!transfer_handler_enabled_) { + LOG_INFO("transfer handler do not enable, ls may offline"); } else if (OB_FAIL(do_leader_transfer_())) { LOG_WARN("failed to do leader transfer", K(ret)); } else if (OB_FAIL(do_worker_transfer_())) { @@ -2148,12 +2153,33 @@ int ObTransferHandler::offline() ret = OB_NOT_INIT; LOG_WARN("ls transfer handler do not init", K(ret)); } else { - int retry_cnt = 0; - do { - if (OB_FAIL(transfer_worker_mgr_.cancel_dag_net())) { - LOG_WARN("failed to cancel dag net", K(ret), KPC(ls_)); + const int64_t timeout_us = 1 * 1000 * 1000; //1s; + bool lock_succ = false; + if (OB_FAIL(transfer_handler_lock_.wrlock(timeout_us))) { + if (ret == OB_TIMEOUT) { + ret = OB_EAGAIN; } - } while (retry_cnt ++ < 3/*max retry cnt*/ && OB_EAGAIN == ret); + LOG_WARN("[TRANSFER] lock transfer_handler_lock_ failed", K(ret), K(timeout_us)); + } else { + lock_succ = true; + transfer_handler_enabled_ = false; + } + + if (lock_succ) { + int64_t tmp_ret = OB_SUCCESS; + if (OB_TMP_FAIL(transfer_handler_lock_.unlock())) { + LOG_WARN("[RetainCtxMgr] unlock retain_ctx_mgr failed", K(tmp_ret)); + } + } + + if (OB_SUCC(ret)) { + int retry_cnt = 0; + do { + if (OB_FAIL(transfer_worker_mgr_.cancel_dag_net())) { + LOG_WARN("failed to cancel dag net", K(ret), KPC(ls_)); + } + } while (retry_cnt ++ < 3/*max retry cnt*/ && OB_EAGAIN == ret); + } } return ret; } @@ -2161,6 +2187,11 @@ int ObTransferHandler::offline() void ObTransferHandler::online() { transfer_worker_mgr_.reset_task_id(); + { + common::SpinWLockGuard guard(transfer_handler_lock_); + transfer_handler_enabled_ = true; + } + wakeup_(); } int ObTransferHandler::stop_tablets_schedule_medium_(const ObIArray &tablet_ids, bool &succ_stop) diff --git a/src/storage/high_availability/ob_transfer_handler.h b/src/storage/high_availability/ob_transfer_handler.h index d6522f91e9..49f8092e05 100644 --- a/src/storage/high_availability/ob_transfer_handler.h +++ b/src/storage/high_availability/ob_transfer_handler.h @@ -273,6 +273,8 @@ private: ObTransferWorkerMgr transfer_worker_mgr_; int64_t round_; share::SCN gts_seq_; + common::SpinRWLock transfer_handler_lock_; + bool transfer_handler_enabled_; DISALLOW_COPY_AND_ASSIGN(ObTransferHandler); }; } diff --git a/src/storage/high_availability/ob_transfer_service.cpp b/src/storage/high_availability/ob_transfer_service.cpp index 05497b12e2..707d00bb65 100644 --- a/src/storage/high_availability/ob_transfer_service.cpp +++ b/src/storage/high_availability/ob_transfer_service.cpp @@ -226,6 +226,8 @@ int ObTransferService::do_transfer_handler_(const share::ObLSID &ls_id) } else if (OB_ISNULL(ls = ls_handle.get_ls())) { ret = OB_ERR_UNEXPECTED; LOG_WARN("ls should not be NULL", K(ret), KP(ls), K(ls_id)); + } else if (ls->is_offline()) { + LOG_INFO("ls is during offline, cannot schedule transfer handler", K(ls_id)); } else if (OB_FAIL(ls->get_transfer_handler()->process())) { LOG_WARN("failed to process transfer", K(ret), KP(ls)); } else {