allow force stop archive
This commit is contained in:
		| @ -312,7 +312,7 @@ int ObArchiveSchedulerService::process_() | ||||
|   if (OB_FAIL(ret)) { | ||||
|   } else if (no_dest) { | ||||
|   } else if (archive_mode.is_noarchivelog()) { | ||||
|     if (no_round || round.state_.is_stop()) { | ||||
|     if (no_round || round.state_.is_stop() || round.state_.is_stopping()) { | ||||
|     } else if (OB_FAIL(tenant_scheduler.disable_archive(dest_no))) { | ||||
|       LOG_WARN("failed to disable archive", K(ret), K(tenant_id), K(dest_no), K(dest_state)); | ||||
|     } | ||||
|  | ||||
| @ -763,6 +763,7 @@ int ObArchiveHandler::do_checkpoint_(share::ObTenantArchiveRoundAttr &round_info | ||||
|   ObDestRoundCheckpointer checkpointer; | ||||
|   SCN max_checkpoint_scn = SCN::min_scn(); | ||||
|   bool can = false; | ||||
|   bool allow_force_stop = false; | ||||
|   if (OB_FAIL(ObTenantArchiveMgr::decide_piece_id(round_info.start_scn_, round_info.base_piece_id_, round_info.piece_switch_interval_, round_info.checkpoint_scn_, since_piece_id))) { | ||||
|     LOG_WARN("failed to calc since piece id", K(ret), K(round_info)); | ||||
|   } else if (OB_FAIL(archive_table_op_.get_dest_round_summary(*sql_proxy_, round_info.dest_id_, round_info.round_id_, since_piece_id, summary))) { | ||||
| @ -776,6 +777,9 @@ int ObArchiveHandler::do_checkpoint_(share::ObTenantArchiveRoundAttr &round_info | ||||
|     LOG_WARN("tenant can not do archive", K(ret), K_(tenant_id)); | ||||
|   } else if (OB_FAIL(checkpointer.init(&round_handler_, piece_generated_cb, round_checkpoint_cb, max_checkpoint_scn))) { | ||||
|     LOG_WARN("failed to init checkpointer", K(ret), K(round_info)); | ||||
|   } else if (round_info.state_.is_stopping() && OB_FAIL(check_allow_force_stop_(round_info, allow_force_stop))) { | ||||
|     LOG_WARN("failed to check allow force stop", K(ret), K(round_info)); | ||||
|   } else if (allow_force_stop && OB_FALSE_IT(checkpointer.set_allow_force_stop())) { | ||||
|   } else if (OB_FAIL(checkpointer.checkpoint(round_info, summary))) { | ||||
|     LOG_WARN("failed to do checkpoint.", K(ret), K(round_info), K(summary)); | ||||
|   } | ||||
| @ -835,3 +839,23 @@ int ObArchiveHandler::get_max_checkpoint_scn_(const uint64_t tenant_id, SCN &max | ||||
|   } | ||||
|   return ret; | ||||
| } | ||||
|  | ||||
|  | ||||
| int ObArchiveHandler::check_allow_force_stop_(const ObTenantArchiveRoundAttr &round, bool &allow_force_stop) const | ||||
| { | ||||
|   int ret = OB_SUCCESS; | ||||
|   int64_t stopping_ts = 0; | ||||
|   const int64_t current_ts = ObTimeUtility::current_time(); | ||||
| #ifdef ERRSIM | ||||
|   const int64_t force_stop_threshold = GCONF.errsim_allow_force_archive_threshold;; | ||||
| #else | ||||
|   const int64_t force_stop_threshold = ALLOW_FORCE_STOP_THRESHOLD; | ||||
| #endif | ||||
|   allow_force_stop = false; | ||||
|   if (OB_FAIL(archive_table_op_.get_round_stopping_ts(*sql_proxy_, round.key_.dest_no_, stopping_ts))) { | ||||
|     LOG_WARN("failed to get round stopping ts.", K(ret), K(round)); | ||||
|   } else { | ||||
|     allow_force_stop = force_stop_threshold <= (current_ts - stopping_ts); | ||||
|   } | ||||
|   return ret; | ||||
| } | ||||
| @ -66,8 +66,10 @@ private: | ||||
|   int do_checkpoint_(share::ObTenantArchiveRoundAttr &round_info); | ||||
|   int check_archive_dest_validity_(const int64_t dest_no); | ||||
|   int get_max_checkpoint_scn_(const uint64_t tenant_id, share::SCN &max_checkpoint_scn) const; | ||||
|   int check_allow_force_stop_(const ObTenantArchiveRoundAttr &round, bool &allow_force_stop) const; | ||||
|  | ||||
| private: | ||||
|   static const int64_t ALLOW_FORCE_STOP_THRESHOLD = 10_min; | ||||
|   bool is_inited_; | ||||
|   uint64_t tenant_id_; // user tenant id | ||||
|   obrpc::ObSrvRpcProxy *rpc_proxy_; | ||||
|  | ||||
| @ -70,6 +70,11 @@ int ObDestRoundCheckpointer::init(ObArchiveRoundHandler *round_handler, const Pi | ||||
|   return ret; | ||||
| } | ||||
|  | ||||
| void ObDestRoundCheckpointer::set_allow_force_stop() | ||||
| { | ||||
|   allow_force_stop_ = true; | ||||
|   LOG_INFO("set allow force stop"); | ||||
| } | ||||
|  | ||||
| int ObDestRoundCheckpointer::checkpoint(const ObTenantArchiveRoundAttr &round_info, const ObDestRoundSummary &summary) | ||||
| { | ||||
| @ -268,8 +273,15 @@ int ObDestRoundCheckpointer::checkpoint_(const ObTenantArchiveRoundAttr &old_rou | ||||
|   if (OB_FAIL(generate_pieces_(old_round_info, summary, result))) { | ||||
|     LOG_WARN("failed to generate pieces", K(ret), K(old_round_info), K(summary)); | ||||
|   } else if (OB_FAIL(round_checkpoint_cb_(round_handler_->get_sql_proxy(), old_round_info, result.new_round_info_))) { | ||||
|     LOG_WARN("failed to call round_checkpoint_cb", K(ret), K(old_round_info), K(summary), K(result)); | ||||
|   } else if (OB_FAIL(fill_generated_pieces_(result, pieces))){ | ||||
|     if (result.new_round_info_.state_.is_stop() && allow_force_stop_) { | ||||
|       ret = OB_SUCCESS; | ||||
|       LOG_INFO("allow_force_stop is set, ignore round_checkpoint_cb error", K(old_round_info), K(summary), K(result)); | ||||
|     } else { | ||||
|       LOG_WARN("failed to call round_checkpoint_cb", K(ret), K(old_round_info), K(summary), K(result)); | ||||
|     } | ||||
|   } | ||||
|  | ||||
|   if (FAILEDx(fill_generated_pieces_(result, pieces))){ | ||||
|     LOG_WARN("failed to fill generated pieces", K(ret), K(old_round_info), K(summary), K(result)); | ||||
|   } else if (OB_FAIL(round_handler_->checkpoint_to(old_round_info, result.new_round_info_, pieces))) { | ||||
|     LOG_WARN("failed to checkpoint", K(ret), K(old_round_info), K(summary), K(result)); | ||||
| @ -303,8 +315,15 @@ int ObDestRoundCheckpointer::generate_pieces_(const ObTenantArchiveRoundAttr &ol | ||||
|       if (OB_FAIL(generate_one_piece_(old_round_info, result.new_round_info_, summary, piece_id, piece))) { | ||||
|         LOG_WARN("failed to generate one piece", K(ret), K(old_round_info), K(result), K(summary), K(piece_id)); | ||||
|       } else if (OB_FAIL(piece_generated_cb_(round_handler_->get_sql_proxy(), old_round_info, result, piece))) { | ||||
|         LOG_WARN("call piece_generated_cb_ failed", K(ret), K(old_round_info), K(piece)); | ||||
|       } else if (OB_FAIL(result.piece_list_.push_back(piece))) { | ||||
|         if (result.new_round_info_.state_.is_stop() && allow_force_stop_) { | ||||
|           ret = OB_SUCCESS; | ||||
|           LOG_INFO("allow_force_stop is set, ignore piece_generated_cb_ error", K(old_round_info), K(piece)); | ||||
|         } else { | ||||
|           LOG_WARN("call piece_generated_cb_ failed", K(ret), K(old_round_info), K(piece)); | ||||
|         } | ||||
|       } | ||||
|  | ||||
|       if (FAILEDx(result.piece_list_.push_back(piece))) { | ||||
|         LOG_WARN("failed to push back piece", K(ret), K(result), K(piece)); | ||||
|       } else if (piece.piece_info_.status_.is_frozen()) { | ||||
|         frozen_input_bytes += piece.piece_info_.input_bytes_; | ||||
|  | ||||
| @ -61,10 +61,11 @@ public: | ||||
|   typedef common::ObFunction<int(common::ObISQLClient *, const ObTenantArchiveRoundAttr &, const Result &, const GeneratedPiece &)> PieceGeneratedCb; | ||||
|   typedef common::ObFunction<int(common::ObISQLClient *, const ObTenantArchiveRoundAttr &, const ObTenantArchiveRoundAttr &)> RoundCheckpointCb; | ||||
|  | ||||
|   ObDestRoundCheckpointer() : is_inited_(false), round_handler_(nullptr), max_checkpoint_scn_() {} | ||||
|   ObDestRoundCheckpointer() : is_inited_(false), allow_force_stop_(false), round_handler_(nullptr), max_checkpoint_scn_() {} | ||||
|  | ||||
|   int init(ObArchiveRoundHandler *round_handler, const PieceGeneratedCb &piece_generated_cb,  | ||||
|       const RoundCheckpointCb &round_checkpoint_cb, const SCN &max_checkpoint_scn); | ||||
|   void set_allow_force_stop(); | ||||
|  | ||||
|   // This operation is allowed only if dest round is in BEGINNING/DOING/STOPPING state. | ||||
|   int checkpoint(const ObTenantArchiveRoundAttr &round_info, const ObDestRoundSummary &summary); | ||||
| @ -121,6 +122,7 @@ private: | ||||
|   int fill_generated_pieces_(const Result &result, common::ObIArray<ObTenantArchivePieceAttr> &pieces) const; | ||||
|  | ||||
|   bool is_inited_; | ||||
|   bool allow_force_stop_; | ||||
|   ObArchiveRoundHandler *round_handler_; | ||||
|   SCN max_checkpoint_scn_; | ||||
|   PieceGeneratedCb piece_generated_cb_; | ||||
|  | ||||
| @ -528,6 +528,42 @@ int ObArchivePersistHelper::get_round_by_dest_id(common::ObISQLClient &proxy, co | ||||
|   return ret; | ||||
| } | ||||
|  | ||||
| int ObArchivePersistHelper::get_round_stopping_ts( | ||||
|     common::ObISQLClient &proxy, const int64_t dest_no, int64_t &ts) const | ||||
| { | ||||
|   int ret = OB_SUCCESS; | ||||
|   ObSqlString sql; | ||||
|   if (IS_NOT_INIT) { | ||||
|     ret = OB_NOT_INIT; | ||||
|     LOG_WARN("tenant archive table operator not init", K(ret)); | ||||
|   } else if (OB_FAIL(sql.append_fmt("select time_to_usec(gmt_modified) as stopping_ts from %s where tenant_id=%ld and dest_no=%ld and status='STOPPING'", | ||||
|                      OB_ALL_LOG_ARCHIVE_PROGRESS_TNAME, | ||||
|                      tenant_id_, | ||||
|                      dest_no))) { | ||||
|     LOG_WARN("failed to append fmt", K(ret)); | ||||
|   } else { | ||||
|     HEAP_VAR(ObMySQLProxy::ReadResult, res) { | ||||
|       ObMySQLResult *result = NULL; | ||||
|       if (OB_FAIL(proxy.read(res, get_exec_tenant_id(), sql.ptr()))) { | ||||
|         LOG_WARN("failed to exec sql", K(ret), K(sql)); | ||||
|       } else if (OB_ISNULL(result = res.get_result())) { | ||||
|         ret = OB_ERR_UNEXPECTED; | ||||
|         LOG_WARN("result is null", K(ret), K(sql)); | ||||
|       } else if (OB_FAIL(result->next())) { | ||||
|         if (OB_ITER_END == ret) { | ||||
|           ret = OB_ENTRY_NOT_EXIST; | ||||
|         } else { | ||||
|           LOG_WARN("failed to get next", K(ret)); | ||||
|         } | ||||
|       } else { | ||||
|         EXTRACT_INT_FIELD_MYSQL(*result, "stopping_ts", ts, int64_t); | ||||
|       } | ||||
|     } | ||||
|   } | ||||
|  | ||||
|   return ret; | ||||
| } | ||||
|  | ||||
| int ObArchivePersistHelper::del_round(common::ObISQLClient &proxy, const int64_t dest_no) const | ||||
| { | ||||
|   int ret = OB_SUCCESS; | ||||
|  | ||||
| @ -113,6 +113,8 @@ public: | ||||
|   int clean_round_comment(common::ObISQLClient &proxy, const int64_t dest_no) const; | ||||
|   int get_round_by_dest_id(common::ObISQLClient &proxy, const int64_t dest_id, | ||||
|       const bool need_lock, ObTenantArchiveRoundAttr &round) const; | ||||
|   // Get stop archive time. Return OB_ENTRY_NOT_EXIST if no STOPPING round exist. | ||||
|   int get_round_stopping_ts(common::ObISQLClient &proxy, const int64_t dest_no, int64_t &ts) const; | ||||
|   int del_round(common::ObISQLClient &proxy, const int64_t dest_no) const; | ||||
|   int start_new_round(common::ObISQLClient &proxy, const ObTenantArchiveRoundAttr &new_round) const; | ||||
|   int switch_round_state_to(common::ObISQLClient &proxy, const ObTenantArchiveRoundAttr &round, | ||||
|  | ||||
| @ -281,7 +281,7 @@ int ObTenantArchiveRoundAttr::generate_next_round(const int64_t incarnation, | ||||
|   next_round.path_ = path; | ||||
|  | ||||
|   next_round.frozen_input_bytes_ = 0; | ||||
|   next_round.frozen_input_bytes_ = 0; | ||||
|   next_round.frozen_output_bytes_ = 0; | ||||
|   next_round.active_input_bytes_ = 0; | ||||
|   next_round.active_output_bytes_ = 0; | ||||
|  | ||||
|  | ||||
| @ -1533,6 +1533,9 @@ DEF_BOOL(_enable_tenant_leak_memory_protection, OB_CLUSTER_PARAMETER, "False", " | ||||
| DEF_TIME(_advance_checkpoint_timeout, OB_CLUSTER_PARAMETER, "30m", "[10s,180m]", | ||||
|          "the timeout for backup/migrate advance checkpoint Range: [10s,180m]", | ||||
|          ObParameterAttr(Section::ROOT_SERVICE, Source::DEFAULT, EditLevel::DYNAMIC_EFFECTIVE)); | ||||
| ERRSIM_DEF_TIME(errsim_allow_force_archive_threshold, OB_CLUSTER_PARAMETER, "600s", "[1s,2h]", | ||||
|                 "force stop archive threshold, Range: [1s,2h]", | ||||
|                 ObParameterAttr(Section::OBSERVER, Source::DEFAULT, EditLevel::DYNAMIC_EFFECTIVE)); | ||||
|  | ||||
| //transfer | ||||
| DEF_TIME(_transfer_start_rpc_timeout, OB_TENANT_PARAMETER, "10s", "[1ms,600s]", | ||||
|  | ||||
		Reference in New Issue
	
	Block a user
	 wxhwang
					wxhwang