From 356df530661254277f294921352c6e791822fa99 Mon Sep 17 00:00:00 2001 From: WenJinyu <596324105@qq.com> Date: Mon, 3 Jul 2023 02:53:55 +0000 Subject: [PATCH] Supplementary cancellation of ObTransferBackfillTXDagNet task during log stream GC --- .../ob_transfer_backfill_tx.cpp | 63 +++++++++++++++++-- .../ob_transfer_backfill_tx.h | 3 + .../high_availability/ob_transfer_handler.cpp | 38 +++++++++++ .../high_availability/ob_transfer_handler.h | 3 + .../high_availability/ob_transfer_service.cpp | 2 + src/storage/ls/ob_ls.cpp | 8 +++ 6 files changed, 112 insertions(+), 5 deletions(-) diff --git a/src/storage/high_availability/ob_transfer_backfill_tx.cpp b/src/storage/high_availability/ob_transfer_backfill_tx.cpp index c0bc766463..db60c82cd7 100644 --- a/src/storage/high_availability/ob_transfer_backfill_tx.cpp +++ b/src/storage/high_availability/ob_transfer_backfill_tx.cpp @@ -54,9 +54,6 @@ int ObTransferWorkerMgr::init(ObLS *dest_ls) ret = OB_INVALID_ARGUMENT; LOG_WARN("ls is nullptr", K(ret)); } else { - share::ObTaskId task_id; - task_id.init(GCONF.self_addr_); - task_id_ = task_id; tenant_id_ = MTL_ID(); dest_ls_ = dest_ls; is_inited_ = true; @@ -64,9 +61,14 @@ int ObTransferWorkerMgr::init(ObLS *dest_ls) return ret; } -void ObTransferWorkerMgr::update_task_id_() +void ObTransferWorkerMgr::reset_task_id() { task_id_.reset(); +} + +void ObTransferWorkerMgr::update_task_id_() +{ + reset_task_id(); task_id_.init(GCONF.self_addr_); } int ObTransferWorkerMgr::get_need_backfill_tx_tablets_(ObTransferBackfillTXParam ¶m) @@ -295,7 +297,7 @@ int ObTransferWorkerMgr::process() if (IS_NOT_INIT) { ret = OB_NOT_INIT; LOG_WARN("transfer work not init", K(ret)); - } else if (OB_FAIL(check_task_exist_(task_id_, is_exist))) { + } else if (task_id_.is_valid() && OB_FAIL(check_task_exist_(task_id_, is_exist))) { LOG_WARN("failed to check task exist", K(ret), "ls_id", dest_ls_->get_ls_id(), K(*this)); } else if (is_exist) { // only one transfer backfill tx task is allowed to execute at a time @@ -346,6 +348,42 @@ int ObTransferWorkerMgr::check_task_exist_( return ret; } +int ObTransferWorkerMgr::cancel_dag_net() +{ + int ret = OB_SUCCESS; + ObTenantDagScheduler *scheduler = nullptr; + bool is_exist = false; + if (IS_NOT_INIT) { + ret = OB_NOT_INIT; + LOG_WARN("transfer worker do not init", K(ret)); + } else if (task_id_.is_invalid()) { + // do nothing + } else if (OB_FAIL(check_task_exist_(task_id_, is_exist))) { + LOG_WARN("fail to check task exist", K(ret), K_(task_id)); + } else if (is_exist) { + if (OB_ISNULL(scheduler = MTL(ObTenantDagScheduler*))) { + ret = OB_ERR_UNEXPECTED; + LOG_ERROR("failed to get ObTenantDagScheduler from MTL", K(ret), KPC(this)); + } else if (OB_FAIL(scheduler->cancel_dag_net(task_id_))) { + LOG_WARN("failed to cancel dag net", K(ret), K(this)); + } + if (OB_FAIL(ret)) { + } else { + int64_t start_ts = ObTimeUtil::current_time(); + do { + if (OB_FAIL(check_task_exist_(task_id_, is_exist))) { + LOG_WARN("fail to check task exist", K(ret), K_(task_id)); + } else if (is_exist && REACH_TIME_INTERVAL(60 * 1000 * 1000)) { + ret = OB_EAGAIN; + LOG_WARN("cancel dag task cost too much time", K(ret), K_(task_id), + "cost_time", ObTimeUtil::current_time() - start_ts); + } + } while (is_exist && OB_SUCC(ret)); + } + } + return ret; +} + ERRSIM_POINT_DEF(EN_ERRSIM_ALLOW_TRANSFER_BACKFILL_TX); int ObTransferWorkerMgr::do_transfer_backfill_tx_(const ObTransferBackfillTXParam ¶m) @@ -692,6 +730,21 @@ int ObTransferBackfillTXDagNet::clear_dag_net_ctx() return ret; } +int ObTransferBackfillTXDagNet::deal_with_cancel() +{ + int ret = OB_SUCCESS; + const int32_t result = OB_CANCELED; + const bool need_retry = false; + + if (!is_inited_) { + ret = OB_NOT_INIT; + LOG_WARN("transfer backfill tx dag net do not init", K(ret)); + } else if (OB_FAIL(ctx_.set_result(result, need_retry))) { + LOG_WARN("failed to set result", K(ret), KPC(this)); + } + return ret; +} + /******************ObBaseTransferBackfillTXDag*********************/ ObBaseTransferBackfillTXDag::ObBaseTransferBackfillTXDag(const share::ObDagType::ObDagTypeEnum &dag_type) : ObStorageHADag(dag_type) diff --git a/src/storage/high_availability/ob_transfer_backfill_tx.h b/src/storage/high_availability/ob_transfer_backfill_tx.h index 698029cd91..1f00f6f362 100644 --- a/src/storage/high_availability/ob_transfer_backfill_tx.h +++ b/src/storage/high_availability/ob_transfer_backfill_tx.h @@ -45,6 +45,8 @@ public: ~ObTransferWorkerMgr(); int init(ObLS *dest_ls); int process(); + int cancel_dag_net(); + void reset_task_id(); TO_STRING_KV(K_(is_inited), K_(tenant_id), K_(task_id), KP_(dest_ls)); private: int check_task_exist_(const share::ObTaskId &task_id, bool &is_exist); @@ -108,6 +110,7 @@ public: virtual int fill_comment(char *buf, const int64_t buf_len) const override; virtual int fill_dag_net_key(char *buf, const int64_t buf_len) const override; virtual int clear_dag_net_ctx(); + virtual int deal_with_cancel() override; ObTransferBackfillTXCtx *get_ctx() { return &ctx_; } const share::ObLSID &get_ls_id() const { return ctx_.src_ls_id_; } diff --git a/src/storage/high_availability/ob_transfer_handler.cpp b/src/storage/high_availability/ob_transfer_handler.cpp index baa259f85c..6acb9227c2 100644 --- a/src/storage/high_availability/ob_transfer_handler.cpp +++ b/src/storage/high_availability/ob_transfer_handler.cpp @@ -1732,6 +1732,44 @@ int ObTransferHandler::record_server_event_(const int32_t result, const int64_t return ret; } +int ObTransferHandler::safe_to_destroy(bool &is_safe) +{ + int ret = OB_SUCCESS; + is_safe = false; + if (IS_NOT_INIT) { + ret = OB_NOT_INIT; + LOG_WARN("ls transfer handler do not init", K(ret)); + } else { + if (OB_FAIL(transfer_worker_mgr_.cancel_dag_net())) { + LOG_WARN("failed to cancel dag net", K(ret), KPC(ls_)); + } else { + is_safe = true; + } + } + return ret; +} + +int ObTransferHandler::offline() +{ + int ret = OB_SUCCESS; + if (IS_NOT_INIT) { + ret = OB_NOT_INIT; + LOG_WARN("ls transfer handler do not init", K(ret)); + } else { + int retry_cnt = 0; + do { + if (OB_FAIL(transfer_worker_mgr_.cancel_dag_net())) { + LOG_WARN("failed to cancel dag net", K(ret), KPC(ls_)); + } + } while (retry_cnt ++ < 3/*max retry cnt*/ && OB_EAGAIN == ret); + } + return ret; +} + +void ObTransferHandler::online() +{ + transfer_worker_mgr_.reset_task_id(); +} } } diff --git a/src/storage/high_availability/ob_transfer_handler.h b/src/storage/high_availability/ob_transfer_handler.h index 191fc56483..2481b50759 100644 --- a/src/storage/high_availability/ob_transfer_handler.h +++ b/src/storage/high_availability/ob_transfer_handler.h @@ -65,6 +65,9 @@ public: const share::SCN &scn) override final; virtual share::SCN get_rec_scn() override final { return share::SCN::max_scn(); } virtual int flush(share::SCN &scn) override final; + int safe_to_destroy(bool &is_safe); + int offline(); + void online(); private: int get_transfer_task_(share::ObTransferTaskInfo &task_info); diff --git a/src/storage/high_availability/ob_transfer_service.cpp b/src/storage/high_availability/ob_transfer_service.cpp index 5eca5c1d57..14bfd18d8d 100644 --- a/src/storage/high_availability/ob_transfer_service.cpp +++ b/src/storage/high_availability/ob_transfer_service.cpp @@ -170,6 +170,8 @@ int ObTransferService::get_ls_id_array_() } else if (OB_ISNULL(ls)) { ret = OB_ERR_UNEXPECTED; LOG_WARN("ls should not be NULL", K(ret), KP(ls)); + } else if (!(ls->get_log_handler()->is_replay_enabled())) { + LOG_INFO("log handler not enable replay, should not schduler transfer hander", "ls_id", ls->get_ls_id()); } else if (OB_FAIL(ls_id_array_.push_back(ls->get_ls_id()))) { LOG_WARN("failed to push ls id into array", K(ret), KPC(ls)); } diff --git a/src/storage/ls/ob_ls.cpp b/src/storage/ls/ob_ls.cpp index 1b9002d85b..62128c0759 100755 --- a/src/storage/ls/ob_ls.cpp +++ b/src/storage/ls/ob_ls.cpp @@ -642,6 +642,7 @@ bool ObLS::safe_to_destroy() bool is_data_check_point_safe = false; bool is_dup_table_handler_safe = false; bool is_log_handler_safe = false; + bool is_transfer_handler_safe = false; if (OB_FAIL(ls_tablet_svr_.safe_to_destroy(is_tablet_service_safe))) { LOG_WARN("ls tablet service check safe to destroy failed", K(ret), KPC(this)); @@ -658,6 +659,9 @@ bool ObLS::safe_to_destroy() } else if (OB_FAIL(log_handler_.safe_to_destroy(is_log_handler_safe))) { LOG_WARN("log_handler_ check safe to destroy failed", K(ret), KPC(this)); } else if (!is_log_handler_safe) { + } else if (OB_FAIL(transfer_handler_.safe_to_destroy(is_transfer_handler_safe))) { + LOG_WARN("transfer_handler_ check safe to destroy failed", K(ret), KPC(this)); + } else if (!is_transfer_handler_safe) { } else { if (1 == ref_mgr_.get_total_ref_cnt()) { // only has one ref at the safe destroy task is_safe = true; @@ -672,6 +676,7 @@ bool ObLS::safe_to_destroy() K(is_tablet_service_safe), K(is_data_check_point_safe), K(is_dup_table_handler_safe), K(is_ls_restore_handler_safe), K(is_log_handler_safe), + K(is_transfer_handler_safe), "ls_ref", ref_mgr_.get_total_ref_cnt(), K(ret), KP(this), KPC(this)); ref_mgr_.print(); @@ -876,6 +881,8 @@ int ObLS::offline_() LOG_WARN("checkpoint executor offline failed", K(ret), K(ls_meta_)); } else if (OB_FAIL(ls_restore_handler_.offline())) { LOG_WARN("failed to offline ls restore handler", K(ret)); + } else if (OB_FAIL(transfer_handler_.offline())) { + LOG_WARN("transfer_handler failed", K(ret), K(ls_meta_)); } else if (OB_FAIL(log_handler_.offline())) { LOG_WARN("failed to offline log", K(ret)); // TODO: delete it if apply sequence @@ -1006,6 +1013,7 @@ int ObLS::online() LOG_WARN("weak read handler online failed", K(ret), K(ls_meta_)); } else if (OB_FAIL(online_compaction_())) { LOG_WARN("compaction online failed", K(ret), K(ls_meta_)); + } else if (FALSE_IT(transfer_handler_.online())) { } else if (OB_FAIL(ls_restore_handler_.online())) { LOG_WARN("ls restore handler online failed", K(ret)); } else if (FALSE_IT(checkpoint_executor_.online())) {