From 8362d8c9d73e6ac1f88d8e859153655d2a27104c Mon Sep 17 00:00:00 2001 From: wxhwang Date: Fri, 9 Feb 2024 20:09:42 +0000 Subject: [PATCH] allow force stop archive --- .../backup/ob_archive_scheduler_service.cpp | 2 +- .../backup/ob_tenant_archive_scheduler.cpp | 24 +++++++++++++ .../backup/ob_tenant_archive_scheduler.h | 2 ++ src/share/backup/ob_archive_checkpoint.cpp | 27 +++++++++++--- src/share/backup/ob_archive_checkpoint.h | 4 ++- .../backup/ob_archive_persist_helper.cpp | 36 +++++++++++++++++++ src/share/backup/ob_archive_persist_helper.h | 2 ++ src/share/backup/ob_archive_struct.cpp | 2 +- src/share/parameter/ob_parameter_seed.ipp | 3 ++ 9 files changed, 95 insertions(+), 7 deletions(-) diff --git a/src/rootserver/backup/ob_archive_scheduler_service.cpp b/src/rootserver/backup/ob_archive_scheduler_service.cpp index 5a5d2c8e41..6614e2785e 100644 --- a/src/rootserver/backup/ob_archive_scheduler_service.cpp +++ b/src/rootserver/backup/ob_archive_scheduler_service.cpp @@ -312,7 +312,7 @@ int ObArchiveSchedulerService::process_() if (OB_FAIL(ret)) { } else if (no_dest) { } else if (archive_mode.is_noarchivelog()) { - if (no_round || round.state_.is_stop()) { + if (no_round || round.state_.is_stop() || round.state_.is_stopping()) { } else if (OB_FAIL(tenant_scheduler.disable_archive(dest_no))) { LOG_WARN("failed to disable archive", K(ret), K(tenant_id), K(dest_no), K(dest_state)); } diff --git a/src/rootserver/backup/ob_tenant_archive_scheduler.cpp b/src/rootserver/backup/ob_tenant_archive_scheduler.cpp index ec85f28f8e..724401cd96 100644 --- a/src/rootserver/backup/ob_tenant_archive_scheduler.cpp +++ b/src/rootserver/backup/ob_tenant_archive_scheduler.cpp @@ -763,6 +763,7 @@ int ObArchiveHandler::do_checkpoint_(share::ObTenantArchiveRoundAttr &round_info ObDestRoundCheckpointer checkpointer; SCN max_checkpoint_scn = SCN::min_scn(); bool can = false; + bool allow_force_stop = false; if (OB_FAIL(ObTenantArchiveMgr::decide_piece_id(round_info.start_scn_, round_info.base_piece_id_, round_info.piece_switch_interval_, round_info.checkpoint_scn_, since_piece_id))) { LOG_WARN("failed to calc since piece id", K(ret), K(round_info)); } else if (OB_FAIL(archive_table_op_.get_dest_round_summary(*sql_proxy_, round_info.dest_id_, round_info.round_id_, since_piece_id, summary))) { @@ -776,6 +777,9 @@ int ObArchiveHandler::do_checkpoint_(share::ObTenantArchiveRoundAttr &round_info LOG_WARN("tenant can not do archive", K(ret), K_(tenant_id)); } else if (OB_FAIL(checkpointer.init(&round_handler_, piece_generated_cb, round_checkpoint_cb, max_checkpoint_scn))) { LOG_WARN("failed to init checkpointer", K(ret), K(round_info)); + } else if (round_info.state_.is_stopping() && OB_FAIL(check_allow_force_stop_(round_info, allow_force_stop))) { + LOG_WARN("failed to check allow force stop", K(ret), K(round_info)); + } else if (allow_force_stop && OB_FALSE_IT(checkpointer.set_allow_force_stop())) { } else if (OB_FAIL(checkpointer.checkpoint(round_info, summary))) { LOG_WARN("failed to do checkpoint.", K(ret), K(round_info), K(summary)); } @@ -835,3 +839,23 @@ int ObArchiveHandler::get_max_checkpoint_scn_(const uint64_t tenant_id, SCN &max } return ret; } + + +int ObArchiveHandler::check_allow_force_stop_(const ObTenantArchiveRoundAttr &round, bool &allow_force_stop) const +{ + int ret = OB_SUCCESS; + int64_t stopping_ts = 0; + const int64_t current_ts = ObTimeUtility::current_time(); +#ifdef ERRSIM + const int64_t force_stop_threshold = GCONF.errsim_allow_force_archive_threshold;; +#else + const int64_t force_stop_threshold = ALLOW_FORCE_STOP_THRESHOLD; +#endif + allow_force_stop = false; + if (OB_FAIL(archive_table_op_.get_round_stopping_ts(*sql_proxy_, round.key_.dest_no_, stopping_ts))) { + LOG_WARN("failed to get round stopping ts.", K(ret), K(round)); + } else { + allow_force_stop = force_stop_threshold <= (current_ts - stopping_ts); + } + return ret; +} \ No newline at end of file diff --git a/src/rootserver/backup/ob_tenant_archive_scheduler.h b/src/rootserver/backup/ob_tenant_archive_scheduler.h index f913c5b18d..5d4ffbbe48 100644 --- a/src/rootserver/backup/ob_tenant_archive_scheduler.h +++ b/src/rootserver/backup/ob_tenant_archive_scheduler.h @@ -66,8 +66,10 @@ private: int do_checkpoint_(share::ObTenantArchiveRoundAttr &round_info); int check_archive_dest_validity_(const int64_t dest_no); int get_max_checkpoint_scn_(const uint64_t tenant_id, share::SCN &max_checkpoint_scn) const; + int check_allow_force_stop_(const ObTenantArchiveRoundAttr &round, bool &allow_force_stop) const; private: + static const int64_t ALLOW_FORCE_STOP_THRESHOLD = 10_min; bool is_inited_; uint64_t tenant_id_; // user tenant id obrpc::ObSrvRpcProxy *rpc_proxy_; diff --git a/src/share/backup/ob_archive_checkpoint.cpp b/src/share/backup/ob_archive_checkpoint.cpp index 9f9f3c43ee..ce9fdca49f 100644 --- a/src/share/backup/ob_archive_checkpoint.cpp +++ b/src/share/backup/ob_archive_checkpoint.cpp @@ -70,6 +70,11 @@ int ObDestRoundCheckpointer::init(ObArchiveRoundHandler *round_handler, const Pi return ret; } +void ObDestRoundCheckpointer::set_allow_force_stop() +{ + allow_force_stop_ = true; + LOG_INFO("set allow force stop"); +} int ObDestRoundCheckpointer::checkpoint(const ObTenantArchiveRoundAttr &round_info, const ObDestRoundSummary &summary) { @@ -268,8 +273,15 @@ int ObDestRoundCheckpointer::checkpoint_(const ObTenantArchiveRoundAttr &old_rou if (OB_FAIL(generate_pieces_(old_round_info, summary, result))) { LOG_WARN("failed to generate pieces", K(ret), K(old_round_info), K(summary)); } else if (OB_FAIL(round_checkpoint_cb_(round_handler_->get_sql_proxy(), old_round_info, result.new_round_info_))) { - LOG_WARN("failed to call round_checkpoint_cb", K(ret), K(old_round_info), K(summary), K(result)); - } else if (OB_FAIL(fill_generated_pieces_(result, pieces))){ + if (result.new_round_info_.state_.is_stop() && allow_force_stop_) { + ret = OB_SUCCESS; + LOG_INFO("allow_force_stop is set, ignore round_checkpoint_cb error", K(old_round_info), K(summary), K(result)); + } else { + LOG_WARN("failed to call round_checkpoint_cb", K(ret), K(old_round_info), K(summary), K(result)); + } + } + + if (FAILEDx(fill_generated_pieces_(result, pieces))){ LOG_WARN("failed to fill generated pieces", K(ret), K(old_round_info), K(summary), K(result)); } else if (OB_FAIL(round_handler_->checkpoint_to(old_round_info, result.new_round_info_, pieces))) { LOG_WARN("failed to checkpoint", K(ret), K(old_round_info), K(summary), K(result)); @@ -303,8 +315,15 @@ int ObDestRoundCheckpointer::generate_pieces_(const ObTenantArchiveRoundAttr &ol if (OB_FAIL(generate_one_piece_(old_round_info, result.new_round_info_, summary, piece_id, piece))) { LOG_WARN("failed to generate one piece", K(ret), K(old_round_info), K(result), K(summary), K(piece_id)); } else if (OB_FAIL(piece_generated_cb_(round_handler_->get_sql_proxy(), old_round_info, result, piece))) { - LOG_WARN("call piece_generated_cb_ failed", K(ret), K(old_round_info), K(piece)); - } else if (OB_FAIL(result.piece_list_.push_back(piece))) { + if (result.new_round_info_.state_.is_stop() && allow_force_stop_) { + ret = OB_SUCCESS; + LOG_INFO("allow_force_stop is set, ignore piece_generated_cb_ error", K(old_round_info), K(piece)); + } else { + LOG_WARN("call piece_generated_cb_ failed", K(ret), K(old_round_info), K(piece)); + } + } + + if (FAILEDx(result.piece_list_.push_back(piece))) { LOG_WARN("failed to push back piece", K(ret), K(result), K(piece)); } else if (piece.piece_info_.status_.is_frozen()) { frozen_input_bytes += piece.piece_info_.input_bytes_; diff --git a/src/share/backup/ob_archive_checkpoint.h b/src/share/backup/ob_archive_checkpoint.h index ce720ed4ed..d058922d10 100644 --- a/src/share/backup/ob_archive_checkpoint.h +++ b/src/share/backup/ob_archive_checkpoint.h @@ -61,10 +61,11 @@ public: typedef common::ObFunction PieceGeneratedCb; typedef common::ObFunction RoundCheckpointCb; - ObDestRoundCheckpointer() : is_inited_(false), round_handler_(nullptr), max_checkpoint_scn_() {} + ObDestRoundCheckpointer() : is_inited_(false), allow_force_stop_(false), round_handler_(nullptr), max_checkpoint_scn_() {} int init(ObArchiveRoundHandler *round_handler, const PieceGeneratedCb &piece_generated_cb, const RoundCheckpointCb &round_checkpoint_cb, const SCN &max_checkpoint_scn); + void set_allow_force_stop(); // This operation is allowed only if dest round is in BEGINNING/DOING/STOPPING state. int checkpoint(const ObTenantArchiveRoundAttr &round_info, const ObDestRoundSummary &summary); @@ -121,6 +122,7 @@ private: int fill_generated_pieces_(const Result &result, common::ObIArray &pieces) const; bool is_inited_; + bool allow_force_stop_; ObArchiveRoundHandler *round_handler_; SCN max_checkpoint_scn_; PieceGeneratedCb piece_generated_cb_; diff --git a/src/share/backup/ob_archive_persist_helper.cpp b/src/share/backup/ob_archive_persist_helper.cpp index 674da14adf..339eb378d7 100644 --- a/src/share/backup/ob_archive_persist_helper.cpp +++ b/src/share/backup/ob_archive_persist_helper.cpp @@ -528,6 +528,42 @@ int ObArchivePersistHelper::get_round_by_dest_id(common::ObISQLClient &proxy, co return ret; } +int ObArchivePersistHelper::get_round_stopping_ts( + common::ObISQLClient &proxy, const int64_t dest_no, int64_t &ts) const +{ + int ret = OB_SUCCESS; + ObSqlString sql; + if (IS_NOT_INIT) { + ret = OB_NOT_INIT; + LOG_WARN("tenant archive table operator not init", K(ret)); + } else if (OB_FAIL(sql.append_fmt("select time_to_usec(gmt_modified) as stopping_ts from %s where tenant_id=%ld and dest_no=%ld and status='STOPPING'", + OB_ALL_LOG_ARCHIVE_PROGRESS_TNAME, + tenant_id_, + dest_no))) { + LOG_WARN("failed to append fmt", K(ret)); + } else { + HEAP_VAR(ObMySQLProxy::ReadResult, res) { + ObMySQLResult *result = NULL; + if (OB_FAIL(proxy.read(res, get_exec_tenant_id(), sql.ptr()))) { + LOG_WARN("failed to exec sql", K(ret), K(sql)); + } else if (OB_ISNULL(result = res.get_result())) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("result is null", K(ret), K(sql)); + } else if (OB_FAIL(result->next())) { + if (OB_ITER_END == ret) { + ret = OB_ENTRY_NOT_EXIST; + } else { + LOG_WARN("failed to get next", K(ret)); + } + } else { + EXTRACT_INT_FIELD_MYSQL(*result, "stopping_ts", ts, int64_t); + } + } + } + + return ret; +} + int ObArchivePersistHelper::del_round(common::ObISQLClient &proxy, const int64_t dest_no) const { int ret = OB_SUCCESS; diff --git a/src/share/backup/ob_archive_persist_helper.h b/src/share/backup/ob_archive_persist_helper.h index cd3127b832..d32d95c2eb 100644 --- a/src/share/backup/ob_archive_persist_helper.h +++ b/src/share/backup/ob_archive_persist_helper.h @@ -113,6 +113,8 @@ public: int clean_round_comment(common::ObISQLClient &proxy, const int64_t dest_no) const; int get_round_by_dest_id(common::ObISQLClient &proxy, const int64_t dest_id, const bool need_lock, ObTenantArchiveRoundAttr &round) const; + // Get stop archive time. Return OB_ENTRY_NOT_EXIST if no STOPPING round exist. + int get_round_stopping_ts(common::ObISQLClient &proxy, const int64_t dest_no, int64_t &ts) const; int del_round(common::ObISQLClient &proxy, const int64_t dest_no) const; int start_new_round(common::ObISQLClient &proxy, const ObTenantArchiveRoundAttr &new_round) const; int switch_round_state_to(common::ObISQLClient &proxy, const ObTenantArchiveRoundAttr &round, diff --git a/src/share/backup/ob_archive_struct.cpp b/src/share/backup/ob_archive_struct.cpp index 2df720f18b..9687d9f7ae 100644 --- a/src/share/backup/ob_archive_struct.cpp +++ b/src/share/backup/ob_archive_struct.cpp @@ -281,7 +281,7 @@ int ObTenantArchiveRoundAttr::generate_next_round(const int64_t incarnation, next_round.path_ = path; next_round.frozen_input_bytes_ = 0; - next_round.frozen_input_bytes_ = 0; + next_round.frozen_output_bytes_ = 0; next_round.active_input_bytes_ = 0; next_round.active_output_bytes_ = 0; diff --git a/src/share/parameter/ob_parameter_seed.ipp b/src/share/parameter/ob_parameter_seed.ipp index 1425849451..9d2e3bac60 100644 --- a/src/share/parameter/ob_parameter_seed.ipp +++ b/src/share/parameter/ob_parameter_seed.ipp @@ -1533,6 +1533,9 @@ DEF_BOOL(_enable_tenant_leak_memory_protection, OB_CLUSTER_PARAMETER, "False", " DEF_TIME(_advance_checkpoint_timeout, OB_CLUSTER_PARAMETER, "30m", "[10s,180m]", "the timeout for backup/migrate advance checkpoint Range: [10s,180m]", ObParameterAttr(Section::ROOT_SERVICE, Source::DEFAULT, EditLevel::DYNAMIC_EFFECTIVE)); +ERRSIM_DEF_TIME(errsim_allow_force_archive_threshold, OB_CLUSTER_PARAMETER, "600s", "[1s,2h]", + "force stop archive threshold, Range: [1s,2h]", + ObParameterAttr(Section::OBSERVER, Source::DEFAULT, EditLevel::DYNAMIC_EFFECTIVE)); //transfer DEF_TIME(_transfer_start_rpc_timeout, OB_TENANT_PARAMETER, "10s", "[1ms,600s]",