From 55f776959ed788e93ea603f2e8ec8008c0345667 Mon Sep 17 00:00:00 2001 From: obdev Date: Mon, 7 Nov 2022 04:35:44 +0000 Subject: [PATCH] add diagnose virtual table --- .../applyservice/ob_log_apply_service.cpp | 34 ++ .../applyservice/ob_log_apply_service.h | 8 + src/logservice/ob_garbage_collector.cpp | 21 +- src/logservice/ob_garbage_collector.h | 34 ++ src/logservice/ob_log_base_type.h | 4 +- src/logservice/ob_log_handler.cpp | 26 ++ src/logservice/ob_log_handler.h | 9 + src/logservice/ob_log_service.cpp | 44 ++ src/logservice/ob_log_service.h | 3 + src/logservice/ob_ls_adapter.cpp | 21 + src/logservice/ob_ls_adapter.h | 3 + src/logservice/palf/log_state_mgr.cpp | 13 + src/logservice/palf/log_state_mgr.h | 1 + src/logservice/palf/palf_handle.cpp | 6 + src/logservice/palf/palf_handle.h | 4 +- src/logservice/palf/palf_handle_impl.cpp | 9 + src/logservice/palf/palf_handle_impl.h | 15 + .../rcservice/ob_role_change_handler.cpp | 10 +- .../rcservice/ob_role_change_handler.h | 43 +- .../rcservice/ob_role_change_service.cpp | 45 +- .../rcservice/ob_role_change_service.h | 2 + .../replayservice/ob_log_replay_service.cpp | 59 +-- .../replayservice/ob_log_replay_service.h | 3 +- .../replayservice/ob_replay_status.cpp | 186 ++++++-- .../replayservice/ob_replay_status.h | 43 +- src/observer/CMakeLists.txt | 1 + .../ob_all_virtual_ha_diagnose.cpp | 234 ++++++++++ .../ob_all_virtual_ha_diagnose.h | 75 ++++ .../virtual_table/ob_all_virtual_log_stat.cpp | 6 +- .../ob_virtual_table_iterator_factory.cpp | 14 + .../ob_inner_table_schema.12301_12350.cpp | 399 ++++++++++++++++++ src/share/inner_table/ob_inner_table_schema.h | 11 +- .../ob_inner_table_schema_constants.h | 2 + .../inner_table/ob_inner_table_schema_def.py | 40 +- .../checkpoint/ob_checkpoint_executor.cpp | 40 +- .../checkpoint/ob_checkpoint_executor.h | 17 +- src/storage/ls/ob_ls.cpp | 33 ++ src/storage/ls/ob_ls.h | 28 ++ src/storage/tx_storage/ob_ls_service.cpp | 28 ++ src/storage/tx_storage/ob_ls_service.h | 2 + 
.../r/mysql/inner_table_overall.result | 1 + .../logservice/test_role_change_handler.cpp | 3 +- 42 files changed, 1468 insertions(+), 112 deletions(-) create mode 100644 src/observer/virtual_table/ob_all_virtual_ha_diagnose.cpp create mode 100644 src/observer/virtual_table/ob_all_virtual_ha_diagnose.h diff --git a/src/logservice/applyservice/ob_log_apply_service.cpp b/src/logservice/applyservice/ob_log_apply_service.cpp index 7f56561df..686ebfd90 100644 --- a/src/logservice/applyservice/ob_log_apply_service.cpp +++ b/src/logservice/applyservice/ob_log_apply_service.cpp @@ -757,6 +757,18 @@ int ObApplyStatus::handle_drop_cb() return ret; } +int ObApplyStatus::diagnose(ApplyDiagnoseInfo &diagnose_info) +{ + int ret = OB_SUCCESS; + int64_t min_unapplied_scn = OB_INVALID_TIMESTAMP; + if (OB_FAIL(get_min_unapplied_log_ts_ns(min_unapplied_scn))) { + CLOG_LOG(WARN, "get_min_unapplied_log_ts_ns failed", KPC(this), K(ret)); + } else { + diagnose_info.max_applied_scn_ = min_unapplied_scn - 1; + } + return ret; +} + int ObApplyStatus::submit_task_to_apply_service_(ObApplyServiceTask &task) { int ret = OB_SUCCESS; @@ -1378,6 +1390,28 @@ int ObLogApplyService::stat_for_each(const common::ObFunctiondiagnose(diagnose_info))) { + CLOG_LOG(WARN, "apply status diagnose failed", K(ret), K(id)); + } else { + CLOG_LOG(TRACE, "apply service diagnose success", K(id)); + } + return ret; +} + int ObLogApplyService::handle_cb_queue_(ObApplyStatus *apply_status, ObApplyServiceQueueTask *cb_queue, bool &is_timeslice_run_out) diff --git a/src/logservice/applyservice/ob_log_apply_service.h b/src/logservice/applyservice/ob_log_apply_service.h index f472f5c84..432ca841c 100644 --- a/src/logservice/applyservice/ob_log_apply_service.h +++ b/src/logservice/applyservice/ob_log_apply_service.h @@ -58,6 +58,12 @@ struct LSApplyStat K(pending_cnt_)); }; +struct ApplyDiagnoseInfo +{ + int64_t max_applied_scn_; + TO_STRING_KV(K(max_applied_scn_)); +}; + class ObApplyFsCb : public palf::PalfFSCb { public: 
@@ -164,6 +170,7 @@ public: int get_min_unapplied_log_ts_ns(int64_t &log_ts); int stat(LSApplyStat &stat) const; int handle_drop_cb(); + int diagnose(ApplyDiagnoseInfo &diagnose_info); TO_STRING_KV(K(ls_id_), K(role_), K(proposal_id_), @@ -251,6 +258,7 @@ public: int push_task(ObApplyServiceTask *task); int wait_append_sync(const share::ObLSID &ls_id); int stat_for_each(const common::ObFunction &func); + int diagnose(const share::ObLSID &id, ApplyDiagnoseInfo &diagnose_info); public: class GetApplyStatusFunctor { diff --git a/src/logservice/ob_garbage_collector.cpp b/src/logservice/ob_garbage_collector.cpp index 04d9cc70a..9c25fb19c 100644 --- a/src/logservice/ob_garbage_collector.cpp +++ b/src/logservice/ob_garbage_collector.cpp @@ -298,7 +298,8 @@ DEFINE_GET_SERIALIZE_SIZE(ObGCLSLog) ObGCHandler::ObGCHandler() : is_inited_(false), rwlock_(), ls_(NULL), - gc_seq_invalid_member_(-1) + gc_seq_invalid_member_(-1), + gc_start_ts_(OB_INVALID_TIMESTAMP) { } @@ -312,6 +313,7 @@ void ObGCHandler::reset() WLockGuard wlock_guard(rwlock_); gc_seq_invalid_member_ = -1; ls_ = NULL; + gc_start_ts_ = OB_INVALID_TIMESTAMP; is_inited_ = false; } @@ -770,6 +772,7 @@ void ObGCHandler::handle_gc_ls_dropping_(const ObGarbageCollector::LSStatus &ls_ ObRole role; ObLSID ls_id = ls_->get_ls_id(); LSGCState gc_state = INVALID_LS_GC_STATE; + gc_start_ts_ = ObTimeUtility::current_time(); if (OB_FAIL(get_palf_role_(role))) { CLOG_LOG(WARN, "get_palf_role_ failed", K(ls_id)); } else if (ObRole::LEADER != role) { @@ -824,6 +827,22 @@ void ObGCHandler::handle_gc_ls_offline_(ObGarbageCollector::LSStatus &ls_status) } } +int ObGCHandler::diagnose(GCDiagnoseInfo &diagnose_info) const +{ + int ret = OB_SUCCESS; + if (IS_NOT_INIT) { + CLOG_LOG(WARN, "GC handler not init"); + } else { + RLockGuard wlock_guard(rwlock_); + if (OB_FAIL(ls_->get_gc_state(diagnose_info.gc_state_))) { + CLOG_LOG(WARN, "get_gc_state failed", K(ls_id)); + } else { + diagnose_info.gc_start_ts_ = gc_start_ts_; + } + } + return 
ret; +} + //---------------ObGarbageCollector---------------// void ObGarbageCollector::GCCandidate::set_ls_status(const share::ObLSStatus &ls_status) { diff --git a/src/logservice/ob_garbage_collector.h b/src/logservice/ob_garbage_collector.h index eacc0aae5..4b1526db3 100644 --- a/src/logservice/ob_garbage_collector.h +++ b/src/logservice/ob_garbage_collector.h @@ -65,6 +65,38 @@ enum LSGCState MAX_LS_GC_STATE = 6, }; +static inline +int gc_state_to_string(const LSGCState gc_state, + char *str, + const int64_t str_len) +{ + int ret = OB_SUCCESS; + if (gc_state == INVALID_LS_GC_STATE) { + strncpy(str ,"INVALID_STATE", str_len); + } else if (gc_state == NORMAL) { + strncpy(str ,"NORMAL", str_len); + } else if (gc_state == LS_BLOCKED) { + strncpy(str ,"LS_BLOCKED", str_len); + } else if (gc_state == WAIT_OFFLINE) { + strncpy(str ,"WAIT_OFFLINE", str_len); + } else if (gc_state == LS_OFFLINE) { + strncpy(str ,"LS_OFFLINE", str_len); + } else if (gc_state == WAIT_GC) { + strncpy(str ,"WAIT_GC", str_len); + } else { + ret = OB_INVALID_ARGUMENT; + } + return ret; +} + +struct GCDiagnoseInfo +{ + LSGCState gc_state_; + int64_t gc_start_ts_; + TO_STRING_KV(K(gc_state_), + K(gc_start_ts_)); +}; + class ObGCLSLog { public: @@ -189,6 +221,7 @@ public: int check_ls_can_offline(); int gc_check_invalid_member_seq(const int64_t gc_seq, bool &need_gc); static bool is_valid_ls_gc_state(const LSGCState &state); + int diagnose(GCDiagnoseInfo &diagnose_info) const; // for replay virtual int replay(const void *buffer, @@ -267,6 +300,7 @@ private: RWLock rwlock_; //for leader revoke/takeover submit log storage::ObLS *ls_; int64_t gc_seq_invalid_member_; //缓存gc检查当前ls不在成员列表时的轮次 + int64_t gc_start_ts_; }; } // namespace logservice diff --git a/src/logservice/ob_log_base_type.h b/src/logservice/ob_log_base_type.h index 9cb571e2c..697f8afed 100644 --- a/src/logservice/ob_log_base_type.h +++ b/src/logservice/ob_log_base_type.h @@ -72,7 +72,9 @@ int log_base_type_to_string(const ObLogBaseType 
log_type, const int64_t str_len) { int ret = OB_SUCCESS; - if (log_type == TRANS_SERVICE_LOG_BASE_TYPE) { + if (log_type == INVALID_LOG_BASE_TYPE) { + strncpy(str ,"INVALID_TYPE", str_len); + } else if (log_type == TRANS_SERVICE_LOG_BASE_TYPE) { strncpy(str ,"TRANS_SERVICE", str_len); } else if (log_type == TABLET_OP_LOG_BASE_TYPE) { strncpy(str ,"TABLET_OP", str_len); diff --git a/src/logservice/ob_log_handler.cpp b/src/logservice/ob_log_handler.cpp index 5ec00246a..c090c4235 100644 --- a/src/logservice/ob_log_handler.cpp +++ b/src/logservice/ob_log_handler.cpp @@ -1307,5 +1307,31 @@ int ObLogHandler::unregister_rebuild_cb() return ret; } +int ObLogHandler::diagnose(LogHandlerDiagnoseInfo &diagnose_info) const +{ + int ret = OB_SUCCESS; + RLockGuard guard(lock_); + if (IS_NOT_INIT) { + ret = OB_NOT_INIT; + } else { + diagnose_info.log_handler_role_ = ATOMIC_LOAD(&role_); + diagnose_info.log_handler_proposal_id_ = ATOMIC_LOAD(&proposal_id_); + } + return ret; +} + +int ObLogHandler::diagnose_palf(palf::PalfDiagnoseInfo &diagnose_info) const +{ + int ret = OB_SUCCESS; + RLockGuard guard(lock_); + if (IS_NOT_INIT) { + ret = OB_NOT_INIT; + } else if (OB_FAIL(palf_handle_.diagnose(diagnose_info))) { + CLOG_LOG(WARN, "palf handle diagnose failed", K(ret), KPC(this)); + } else { + // do nothing + } + return ret; +} } // end namespace logservice } // end napespace oceanbase diff --git a/src/logservice/ob_log_handler.h b/src/logservice/ob_log_handler.h index af3dc587b..fd28bb75f 100644 --- a/src/logservice/ob_log_handler.h +++ b/src/logservice/ob_log_handler.h @@ -45,6 +45,13 @@ class ObLogApplyService; class ObApplyStatus; class ObLogReplayService; class AppendCb; +struct LogHandlerDiagnoseInfo { + common::ObRole log_handler_role_; + int64_t log_handler_proposal_id_; + TO_STRING_KV(K(log_handler_role_), + K(log_handler_proposal_id_)); +}; + class ObILogHandler { public: @@ -502,6 +509,8 @@ public: int enable_vote() override final; int 
register_rebuild_cb(palf::PalfRebuildCb *rebuild_cb) override final; int unregister_rebuild_cb() override final; + int diagnose(LogHandlerDiagnoseInfo &diagnose_info) const; + int diagnose_palf(palf::PalfDiagnoseInfo &diagnose_info) const; TO_STRING_KV(K_(role), K_(proposal_id), KP(palf_env_), K(is_in_stop_state_), K(is_inited_)); private: int submit_config_change_cmd_(const LogConfigChangeCmd &req); diff --git a/src/logservice/ob_log_service.cpp b/src/logservice/ob_log_service.cpp index b8b4ae643..db28cfcd2 100644 --- a/src/logservice/ob_log_service.cpp +++ b/src/logservice/ob_log_service.cpp @@ -613,5 +613,49 @@ int ObLogService::create_ls_(const share::ObLSID &id, } return ret; } + +int ObLogService::diagnose_role_change(RCDiagnoseInfo &diagnose_info) +{ + int ret = OB_SUCCESS; + if (IS_NOT_INIT) { + ret = OB_NOT_INIT; + CLOG_LOG(WARN, "log_service is not inited", K(ret)); + } else if (OB_FAIL(role_change_service_.diagnose(diagnose_info))) { + CLOG_LOG(WARN, "role_change_service diagnose failed", K(ret)); + } else { + // do nothing + } + return ret; +} + +int ObLogService::diagnose_replay(const share::ObLSID &id, + ReplayDiagnoseInfo &diagnose_info) +{ + int ret = OB_SUCCESS; + if (IS_NOT_INIT) { + ret = OB_NOT_INIT; + CLOG_LOG(WARN, "log_service is not inited", K(ret)); + } else if (OB_FAIL(replay_service_.diagnose(id, diagnose_info))) { + CLOG_LOG(WARN, "replay_service diagnose failed", K(ret), K(id)); + } else { + // do nothing + } + return ret; +} + +int ObLogService::diagnose_apply(const share::ObLSID &id, + ApplyDiagnoseInfo &diagnose_info) +{ + int ret = OB_SUCCESS; + if (IS_NOT_INIT) { + ret = OB_NOT_INIT; + CLOG_LOG(WARN, "log_service is not inited", K(ret)); + } else if (OB_FAIL(apply_service_.diagnose(id, diagnose_info))) { + CLOG_LOG(WARN, "apply_service diagnose failed", K(ret), K(id)); + } else { + // do nothing + } + return ret; +} }//end of namespace logservice }//end of namespace oceanbase diff --git a/src/logservice/ob_log_service.h 
b/src/logservice/ob_log_service.h index 630b95761..89c4782ce 100644 --- a/src/logservice/ob_log_service.h +++ b/src/logservice/ob_log_service.h @@ -181,6 +181,9 @@ public: int iterate_palf(const ObFunction &func); int iterate_apply(const ObFunction &func); int iterate_replay(const ObFunction &func); + int diagnose_role_change(RCDiagnoseInfo &diagnose_info); + int diagnose_replay(const share::ObLSID &id, ReplayDiagnoseInfo &diagnose_info); + int diagnose_apply(const share::ObLSID &id, ApplyDiagnoseInfo &diagnose_info); palf::PalfEnv *get_palf_env() { return palf_env_; } // TODO by yunlong: temp solution, will by removed after Reporter be added in MTL diff --git a/src/logservice/ob_ls_adapter.cpp b/src/logservice/ob_ls_adapter.cpp index 3c96b5924..f733f7b3e 100644 --- a/src/logservice/ob_ls_adapter.cpp +++ b/src/logservice/ob_ls_adapter.cpp @@ -11,6 +11,7 @@ */ #include "ob_ls_adapter.h" +#include "replayservice/ob_log_replay_service.h" #include "replayservice/ob_replay_status.h" #include "storage/tx_storage/ob_ls_service.h" #include "storage/tx_storage/ob_ls_handle.h" @@ -58,6 +59,7 @@ int ObLSAdapter::replay(ObLogReplayTask *replay_task) int ret = OB_SUCCESS; ObLS *ls = NULL; ObLSHandle ls_handle; + int64_t start_ts = ObTimeUtility::fast_current_time(); if (IS_NOT_INIT) { ret = OB_NOT_INIT; CLOG_LOG(ERROR, "ObLSAdapter not inited", K(ret)); @@ -73,6 +75,25 @@ int ObLSAdapter::replay(ObLogReplayTask *replay_task) replay_task->log_ts_))) { CLOG_LOG(WARN, "log stream do replay failed", K(ret), KPC(replay_task)); } + if (OB_EAGAIN == ret) { + if (common::OB_INVALID_TIMESTAMP == replay_task->first_handle_ts_) { + replay_task->first_handle_ts_ = start_ts; + replay_task->print_error_ts_ = start_ts; + } else if ((start_ts - replay_task->print_error_ts_) > MAX_SINGLE_RETRY_WARNING_TIME_THRESOLD) { + replay_task->retry_cost_ = start_ts - replay_task->first_handle_ts_; + CLOG_LOG(WARN, "single replay task retry cost too much time. 
replay may be delayed", + KPC(replay_task)); + replay_task->print_error_ts_ = start_ts; + } + } + replay_task->replay_cost_ = ObTimeUtility::fast_current_time() - start_ts; + if (replay_task->replay_cost_ > MAX_SINGLE_REPLAY_WARNING_TIME_THRESOLD) { + if (replay_task->replay_cost_ > MAX_SINGLE_REPLAY_ERROR_TIME_THRESOLD && !get_replay_is_writing_throttling()) { + CLOG_LOG(ERROR, "single replay task cost too much time. replay may be delayed", KPC(replay_task)); + } else { + CLOG_LOG(WARN, "single replay task cost too much time", KPC(replay_task)); + } + } return ret; } diff --git a/src/logservice/ob_ls_adapter.h b/src/logservice/ob_ls_adapter.h index 5bca6966d..841691b24 100644 --- a/src/logservice/ob_ls_adapter.h +++ b/src/logservice/ob_ls_adapter.h @@ -36,6 +36,9 @@ public: virtual int replay(ObLogReplayTask *replay_task); virtual int wait_append_sync(const share::ObLSID &ls_id); private: +const int64_t MAX_SINGLE_REPLAY_WARNING_TIME_THRESOLD = 100 * 1000; //100ms + const int64_t MAX_SINGLE_REPLAY_ERROR_TIME_THRESOLD = 1000 * 1000; //1s 单条日志回放执行时间超过此值报error + const int64_t MAX_SINGLE_RETRY_WARNING_TIME_THRESOLD = 5 * 1000 * 1000; //1s 单条日志回放重试超过此值报error bool is_inited_; storage::ObLSService *ls_service_; }; diff --git a/src/logservice/palf/log_state_mgr.cpp b/src/logservice/palf/log_state_mgr.cpp index 3ecdb5d9e..7ec7f981e 100644 --- a/src/logservice/palf/log_state_mgr.cpp +++ b/src/logservice/palf/log_state_mgr.cpp @@ -1090,5 +1090,18 @@ LogReplicaType LogStateMgr::get_replica_type() const { return ATOMIC_LOAD(&replica_type_); } + +int LogStateMgr::get_election_role(common::ObRole &role, int64_t &epoch) const +{ + int ret = OB_SUCCESS; + if (IS_NOT_INIT) { + ret = OB_NOT_INIT; + } else if (OB_FAIL(election_->get_role(role, epoch))) { + PALF_LOG(WARN, "get elect role failed", K(ret)); + } else { + // do nothing + } + return ret; +} } // namespace palf } // namespace oceanbase diff --git a/src/logservice/palf/log_state_mgr.h b/src/logservice/palf/log_state_mgr.h 
index 771a105db..98af1781b 100644 --- a/src/logservice/palf/log_state_mgr.h +++ b/src/logservice/palf/log_state_mgr.h @@ -94,6 +94,7 @@ public: virtual int disable_vote(); virtual int enable_vote(); virtual LogReplicaType get_replica_type() const; + virtual int get_election_role(common::ObRole &role, int64_t &epoch) const; TO_STRING_KV(KP(this), K_(self), K_(palf_id), "role", role_to_string(role_), \ "state", replica_state_to_string(state_), K_(prepare_meta), K_(leader), K_(leader_epoch), \ K_(is_sync_enabled), K_(pending_end_lsn), K_(scan_disk_log_finished), K_(last_check_start_id), \ diff --git a/src/logservice/palf/palf_handle.cpp b/src/logservice/palf/palf_handle.cpp index af8c3d001..5961347fc 100644 --- a/src/logservice/palf/palf_handle.cpp +++ b/src/logservice/palf/palf_handle.cpp @@ -624,5 +624,11 @@ int PalfHandle::stat(PalfStat &palf_stat) const return palf_handle_impl_->stat(palf_stat); } +int PalfHandle::diagnose(PalfDiagnoseInfo &diagnose_info) const +{ + CHECK_VALID; + return palf_handle_impl_->diagnose(diagnose_info); +} + } // end namespace palf } // end namespace oceanbase diff --git a/src/logservice/palf/palf_handle.h b/src/logservice/palf/palf_handle.h index 16fc3ea3e..189250500 100644 --- a/src/logservice/palf/palf_handle.h +++ b/src/logservice/palf/palf_handle.h @@ -368,7 +368,7 @@ public: // OB_NOT_MASTER: self is not active leader // OB_EAGAIN: another change_acess_mode is running, try again later // NB: 1. if return OB_EAGAIN, caller need execute 'change_access_mode' again. - // 2. before execute 'change_access_mode', caller need execute 'get_access_mode' to + // 2. 
before execute 'change_access_mode', caller need execute 'get_access_mode' to // get 'mode_version' and pass it to 'change_access_mode' int change_access_mode(const int64_t proposal_id, const int64_t mode_version, @@ -427,6 +427,8 @@ public: int reset_election_priority(); int stat(PalfStat &palf_stat) const; + // @param [out] diagnose info, current diagnose info of palf + int diagnose(PalfDiagnoseInfo &diagnose_info) const; TO_STRING_KV(KP(palf_handle_impl_), KP(rc_cb_), KP(fs_cb_)); private: palf::PalfHandleImpl *palf_handle_impl_; diff --git a/src/logservice/palf/palf_handle_impl.cpp b/src/logservice/palf/palf_handle_impl.cpp index 5b462ed38..f5f36ca96 100644 --- a/src/logservice/palf/palf_handle_impl.cpp +++ b/src/logservice/palf/palf_handle_impl.cpp @@ -3508,5 +3508,14 @@ int PalfHandleImpl::stat(PalfStat &palf_stat) return ret; } +int PalfHandleImpl::diagnose(PalfDiagnoseInfo &diagnose_info) const +{ + int ret = OB_SUCCESS; + state_mgr_.get_role_and_state(diagnose_info.palf_role_, diagnose_info.palf_state_); + diagnose_info.palf_proposal_id_ = state_mgr_.get_proposal_id(); + state_mgr_.get_election_role(diagnose_info.election_role_, diagnose_info.election_epoch_); + return ret; +} + } // end namespace palf } // end namespace oceanbase diff --git a/src/logservice/palf/palf_handle_impl.h b/src/logservice/palf/palf_handle_impl.h index 3d8148a7b..67368822c 100644 --- a/src/logservice/palf/palf_handle_impl.h +++ b/src/logservice/palf/palf_handle_impl.h @@ -84,6 +84,19 @@ struct PalfStat { K_(replica_type), K_(base_lsn), K_(end_lsn), K_(end_ts_ns), K_(max_lsn)); }; +struct PalfDiagnoseInfo { + common::ObRole election_role_; + int64_t election_epoch_; + common::ObRole palf_role_; + palf::ObReplicaState palf_state_; + int64_t palf_proposal_id_; + TO_STRING_KV(K(election_role_), + K(election_epoch_), + K(palf_role_), + K(palf_state_), + K(palf_proposal_id_)); +}; + struct LSKey { LSKey() : id_(-1) {} explicit LSKey(const int64_t id) : id_(id) {} @@ -576,6 +589,7 @@ 
public: virtual int revoke_leader(const int64_t proposal_id) = 0; virtual int stat(PalfStat &palf_stat) = 0; virtual int get_palf_epoch(int64_t &palf_epoch) const = 0; + virtual int diagnose(PalfDiagnoseInfo &diagnose_info) const = 0; }; class PalfHandleImpl : public IPalfHandleImpl @@ -842,6 +856,7 @@ public: const RegisterReturn reg_ret) override final; int handle_learner_req(const LogLearner &server, const LogLearnerReqType req_type) override final; int get_palf_epoch(int64_t &palf_epoch) const; + int diagnose(PalfDiagnoseInfo &diagnose_info) const; TO_STRING_KV(K_(palf_id), K_(self), K_(has_set_deleted)); private: int do_init_mem_(const int64_t palf_id, diff --git a/src/logservice/rcservice/ob_role_change_handler.cpp b/src/logservice/rcservice/ob_role_change_handler.cpp index 9105b07d3..0bb0a148e 100644 --- a/src/logservice/rcservice/ob_role_change_handler.cpp +++ b/src/logservice/rcservice/ob_role_change_handler.cpp @@ -20,6 +20,13 @@ namespace oceanbase using namespace common; namespace logservice { +void RCDiagnoseInfo::reset() +{ + id_ = -1; + state_ = TakeOverState::INVALID_TAKE_OVER_STATE; + log_type_ = ObLogBaseType::INVALID_LOG_BASE_TYPE; +} + ObRoleChangeHandler::ObRoleChangeHandler(): sub_role_change_handler_arr_() { reset(); @@ -80,7 +87,7 @@ void ObRoleChangeHandler::switch_to_follower_forcedly() } } -int ObRoleChangeHandler::switch_to_leader() +int ObRoleChangeHandler::switch_to_leader(RCDiagnoseInfo &diagnose_info) { int ret = OB_SUCCESS; ObSpinLockGuard guard(lock_); @@ -88,6 +95,7 @@ int ObRoleChangeHandler::switch_to_leader() ObIRoleChangeSubHandler *handler = sub_role_change_handler_arr_[i]; char sub_role_change_handler_str[OB_LOG_BASE_TYPE_STR_MAX_LEN] = {'\0'}; ObLogBaseType base_type = static_cast(i); + diagnose_info.log_type_ = base_type; bool has_defined_to_string = false; if (OB_SUCCESS == log_base_type_to_string(base_type, sub_role_change_handler_str, OB_LOG_BASE_TYPE_STR_MAX_LEN)) { diff --git 
a/src/logservice/rcservice/ob_role_change_handler.h b/src/logservice/rcservice/ob_role_change_handler.h index 5bf4a4b89..3c8f108a3 100644 --- a/src/logservice/rcservice/ob_role_change_handler.h +++ b/src/logservice/rcservice/ob_role_change_handler.h @@ -23,6 +23,47 @@ namespace oceanbase { namespace logservice { +enum TakeOverState { + INVALID_TAKE_OVER_STATE = 0, + WAIT_REPLAY_DONE = 1, + WAIT_RC_HANDLER_DONE = 2, + TAKE_OVER_FINISH = 3, + UNKNOWN_TAKE_OVER_STATE = 4, + MAX_TAKE_OVER_STATE = 5 +}; + +static inline +int takeover_state_to_string(const TakeOverState log_type, + char *str, + const int64_t str_len) +{ + int ret = OB_SUCCESS; + if (log_type == INVALID_TAKE_OVER_STATE) { + strncpy(str ,"INVALID_STATE", str_len); + } else if (log_type == WAIT_REPLAY_DONE) { + strncpy(str ,"WAIT_REPLAY_DONE", str_len); + } else if (log_type == WAIT_RC_HANDLER_DONE) { + strncpy(str ,"WAIT_RC_HANDLER_DONE", str_len); + } else if (log_type == TAKE_OVER_FINISH) { + strncpy(str ,"FINISH", str_len); + } else if (log_type == UNKNOWN_TAKE_OVER_STATE) { + strncpy(str ,"UNKNOWN", str_len); + } else { + ret = OB_INVALID_ARGUMENT; + } + return ret; +} + +struct RCDiagnoseInfo { + RCDiagnoseInfo() { reset(); } + void reset(); + int64_t id_; + TakeOverState state_; + ObLogBaseType log_type_; //仅当处于WAIT_RC_HANDLER_DONE时才有意义 + TO_STRING_KV(K(state_), + K(log_type_)); +}; + class ObRoleChangeHandler { public: ObRoleChangeHandler(); @@ -32,7 +73,7 @@ public: void unregister_handler(const ObLogBaseType &type); void switch_to_follower_forcedly(); - int switch_to_leader(); + int switch_to_leader(RCDiagnoseInfo &diagnose_info); // @retval: // 1. OB_SUCCESS // 2. OB_LS_NEED_REVOKE, ObRoleChangeService need revoke this LS. 
diff --git a/src/logservice/rcservice/ob_role_change_service.cpp b/src/logservice/rcservice/ob_role_change_service.cpp index a848087d4..d8744193b 100644 --- a/src/logservice/rcservice/ob_role_change_service.cpp +++ b/src/logservice/rcservice/ob_role_change_service.cpp @@ -57,6 +57,7 @@ ObRoleChangeService::ObRoleChangeService() : ls_service_(NULL), apply_service_(NULL), replay_service_(NULL), tg_id_(-1), + cur_task_info_(), is_inited_(false) { } @@ -82,6 +83,7 @@ int ObRoleChangeService::init(storage::ObLSService *ls_service, } else if (OB_FAIL(TG_CREATE_TENANT(tg_id, tg_id_))) { CLOG_LOG(WARN, "ObRoleChangeService TG_CREATE failed", K(ret)); } else { + cur_task_info_.reset(); ls_service_ = ls_service; apply_service_ = apply_service; replay_service_ = replay_service; @@ -128,6 +130,7 @@ void ObRoleChangeService::destroy() (void)wait(); TG_DESTROY(tg_id_); is_inited_ = false; + cur_task_info_.reset(); tg_id_ = -1; ls_service_ = NULL; apply_service_ = NULL; @@ -271,7 +274,7 @@ int ObRoleChangeService::handle_role_change_cb_event_for_restore_handler_( new_proposal_id, is_pending_state))) { CLOG_LOG(WARN, "ObRestoreHandler prepare_switch_role failed", K(ret), K(curr_role), K(curr_proposal_id), K(new_role), K(new_proposal_id)); - } else if (true == is_pending_state + } else if (true == is_pending_state || false == check_need_execute_role_change_(curr_proposal_id, curr_role, new_proposal_id, new_role)) { CLOG_LOG(INFO, "no need change role", K(ret), K(is_pending_state), K(curr_role), K(curr_proposal_id), K(new_role), K(new_proposal_id)); @@ -330,7 +333,7 @@ int ObRoleChangeService::handle_role_change_cb_event_for_log_handler_( } else if (true == is_pending_state) { CLOG_LOG(INFO, "curr state of palf is follower pending, need ignore this signal", K(curr_role), K(curr_proposal_id), K(new_role), K(new_proposal_id), K(is_pending_state)); - } else if (true == is_pending_state + } else if (true == is_pending_state || false == check_need_execute_role_change_(curr_proposal_id, 
curr_role, new_proposal_id, new_role)) { CLOG_LOG(INFO, "no need change role", K(ret), K(is_pending_state), K(curr_role), K(curr_proposal_id), K(new_role), K(new_proposal_id)); @@ -389,10 +392,10 @@ int ObRoleChangeService::handle_change_leader_event_for_restore_handler_( curr_proposal_id, new_role, new_proposal_id, is_pending_state))) { CLOG_LOG(WARN, "ObRestoreHandler prepare_switch_role failed", K(ret), K(curr_role), K(curr_proposal_id), K(new_role), K(new_proposal_id)); - } else if (true == is_pending_state + } else if (true == is_pending_state || curr_proposal_id != new_proposal_id || LEADER != curr_role || LEADER != new_role) { ls->get_log_restore_handler()->change_leader_to(dst_addr); - CLOG_LOG(INFO, "no need execute switch_leader_to_follower_gracefully_restore_, change leader directlly", + CLOG_LOG(INFO, "no need execute switch_leader_to_follower_gracefully_restore_, change leader directlly", K(ret), K(is_pending_state), K(curr_proposal_id), K(new_proposal_id), K(curr_role), K(new_role)); } else if (OB_FAIL(switch_leader_to_follower_gracefully_restore_(dst_addr, curr_proposal_id, ls))) { CLOG_LOG(WARN, "switch_leader_to_follower_gracefully_restore_ failed", K(ret), K(ls), K(dst_addr)); @@ -417,11 +420,11 @@ int ObRoleChangeService::handle_change_leader_event_for_log_handler_( curr_proposal_id, new_role, new_proposal_id, is_pending_state))) { CLOG_LOG(WARN, "ObLogHandler prepare_switch_role failed", K(ret), K(curr_role), K(curr_proposal_id), K(new_role), K(new_proposal_id)); - } else if (true == is_pending_state + } else if (true == is_pending_state || curr_proposal_id != new_proposal_id || LEADER != curr_role || LEADER != new_role) { // when log handler is not LEDAER, we also need execute change_leader_to, otherwise, the leader can not be changed by election. 
ls->get_log_handler()->change_leader_to(dst_addr); - CLOG_LOG(INFO, "no need execute switch_leader_to_follower_gracefully, change leader directlly", + CLOG_LOG(INFO, "no need execute switch_leader_to_follower_gracefully, change leader directlly", K(ret), K(is_pending_state), K(curr_proposal_id), K(new_proposal_id), K(curr_role), K(new_role)); } else if (OB_FAIL(switch_leader_to_follower_gracefully_(new_proposal_id, curr_proposal_id, dst_addr, ls))) { @@ -445,6 +448,8 @@ int ObRoleChangeService::switch_follower_to_leader_( ObTimeGuard time_guard("switch_to_leader", EACH_ROLE_CHANGE_COST_MAX_TIME); ObLogHandler *log_handler = ls->get_log_handler(); ObRoleChangeHandler *role_change_handler = ls->get_role_change_handler(); + ATOMIC_SET(&cur_task_info_.state_, TakeOverState::WAIT_REPLAY_DONE); + ATOMIC_SET(&cur_task_info_.id_, ls->get_ls_id().id()); if (OB_FAIL(log_handler->get_end_lsn(end_lsn))) { CLOG_LOG(WARN, "get_end_lsn failed", K(ret), KPC(ls)); // NB: order is vital!!! @@ -460,10 +465,13 @@ int ObRoleChangeService::switch_follower_to_leader_( || OB_FAIL(replay_service_->switch_to_leader(ls_id))) { } else if (FALSE_IT(log_handler->switch_role(new_role, new_proposal_id))) { CLOG_LOG(WARN, "ObLogHandler switch role failed", K(ret), K(new_role), K(new_proposal_id)); + } else if (FALSE_IT(ATOMIC_SET(&cur_task_info_.state_, TakeOverState::WAIT_RC_HANDLER_DONE))) { } else if (FALSE_IT(time_guard.click("role_change_handler->switch_to_leader")) - || OB_FAIL(role_change_handler->switch_to_leader())) { + || OB_FAIL(role_change_handler->switch_to_leader(cur_task_info_))) { CLOG_LOG(WARN, "ObRoleChangeHandler switch_to_leader failed", K(ret), KPC(ls)); } else { + ATOMIC_SET(&cur_task_info_.state_, TakeOverState::TAKE_OVER_FINISH); + ATOMIC_SET(&cur_task_info_.log_type_, ObLogBaseType::INVALID_LOG_BASE_TYPE); CLOG_LOG(INFO, "switch_follower_to_leader_ success", K(ret), KPC(ls)); } if (OB_FAIL(ret)) { @@ -548,7 +556,7 @@ int 
ObRoleChangeService::switch_leader_to_follower_gracefully_( K(new_role), K(new_proposal_id), K(dst_addr)); // wait apply service done my fail, we need : // 1. switch log handler to origin status. - // 2. resume role change handler + // 2. resume role change handler log_handler->switch_role(LEADER, curr_proposal_id); if (OB_FAIL(role_change_handler->resume_to_leader())) { CLOG_LOG(WARN, "resume to leader failed", K(ret), KPC(ls)); @@ -613,13 +621,17 @@ int ObRoleChangeService::switch_follower_to_leader_restore_( const ObRole new_role = LEADER; ObLogRestoreHandler *log_restore_handler = ls->get_log_restore_handler(); ObRoleChangeHandler *restore_role_change_handler = ls->get_restore_role_change_handler(); + ATOMIC_SET(&cur_task_info_.state_, TakeOverState::WAIT_RC_HANDLER_DONE); + ATOMIC_SET(&cur_task_info_.id_, ls->get_ls_id().id()); ObTimeGuard time_guard("switch_follower_to_leader_restore_", EACH_ROLE_CHANGE_COST_MAX_TIME); if (FALSE_IT(log_restore_handler->switch_role(new_role, new_proposal_id))) { } else if (FALSE_IT(time_guard.click("restore_role_change_handler->switch_to_leader")) - || OB_FAIL(restore_role_change_handler->switch_to_leader())) { + || OB_FAIL(restore_role_change_handler->switch_to_leader(cur_task_info_))) { CLOG_LOG(WARN, "restore_role_change_handler switch_to_leader failed", K(ret), K(new_role), K(new_proposal_id), K(ls)); } else { + ATOMIC_SET(&cur_task_info_.state_, TakeOverState::TAKE_OVER_FINISH); + ATOMIC_SET(&cur_task_info_.log_type_, ObLogBaseType::INVALID_LOG_BASE_TYPE); } if (OB_FAIL(ret)) { log_restore_handler->revoke_leader(); @@ -802,5 +814,20 @@ bool ObRoleChangeService::check_need_execute_role_change_( { return curr_proposal_id != new_proposal_id || curr_role != new_role; } + +int ObRoleChangeService::diagnose(RCDiagnoseInfo &diagnose_info) +{ + int ret = OB_SUCCESS; + if (diagnose_info.id_ == ATOMIC_LOAD(&cur_task_info_.id_)) { + // 当前日志流的切主任务正在被处理 + diagnose_info.state_ = ATOMIC_LOAD(&cur_task_info_.state_); + 
diagnose_info.log_type_ = ATOMIC_LOAD(&cur_task_info_.log_type_); + } else { + // 当前日志流切主任务尚未被处理, 可能是因为其他日志流的切主任务卡住 + diagnose_info.state_ = logservice::TakeOverState::UNKNOWN_TAKE_OVER_STATE; + diagnose_info.log_type_ = logservice::ObLogBaseType::INVALID_LOG_BASE_TYPE; + } + return ret; +} } // end namespace logservice } // end namespace oceanbase diff --git a/src/logservice/rcservice/ob_role_change_service.h b/src/logservice/rcservice/ob_role_change_service.h index 8897eaa1a..bd62c3cff 100644 --- a/src/logservice/rcservice/ob_role_change_service.h +++ b/src/logservice/rcservice/ob_role_change_service.h @@ -61,6 +61,7 @@ public: void handle(void *task); int on_role_change(const int64_t id) final override; int on_need_change_leader(const int64_t ls_id, const common::ObAddr &dst_addr) final override; + int diagnose(RCDiagnoseInfo &diagnose_info); private: int submit_role_change_event_(const RoleChangeEvent &event); @@ -136,6 +137,7 @@ private: logservice::ObLogApplyService *apply_service_; logservice::ObILogReplayService *replay_service_; int tg_id_; + RCDiagnoseInfo cur_task_info_; // for diagnose bool is_inited_; }; } // end namespace logservice diff --git a/src/logservice/replayservice/ob_log_replay_service.cpp b/src/logservice/replayservice/ob_log_replay_service.cpp index d33f59ced..f06b789a3 100644 --- a/src/logservice/replayservice/ob_log_replay_service.cpp +++ b/src/logservice/replayservice/ob_log_replay_service.cpp @@ -746,7 +746,8 @@ void ObLogReplayService::process_replay_ret_code_(const int ret_code, if (OB_SUCCESS != ret_code) { int64_t cur_ts = ObTimeUtility::fast_current_time(); if (replay_status.is_fatal_error(ret_code)) { - replay_status.set_err_info(replay_task.lsn_, cur_ts, ret_code); + replay_status.set_err_info(replay_task.lsn_, replay_task.log_ts_, replay_task.log_type_, + replay_task.replay_hint_, false, cur_ts, ret_code); CLOG_LOG(ERROR, "replay task encount fatal error", K(replay_status), K(replay_task), K(ret_code)); } else {/*do nothing*/} 
@@ -802,10 +803,8 @@ int ObLogReplayService::do_replay_task_(ObLogReplayTask *replay_task, { int ret = OB_SUCCESS; ObLS *ls; - int64_t start_ts = ObTimeUtility::fast_current_time(); get_replay_queue_index() = replay_queue_idx; ObLogReplayBuffer *replay_log_buff = NULL; - int64_t retry_time = 0; bool need_replay = false; if (OB_ISNULL(replay_status) || OB_ISNULL(replay_task)) { ret = OB_INVALID_ARGUMENT; @@ -848,24 +847,6 @@ int ObLogReplayService::do_replay_task_(ObLogReplayTask *replay_task, } } CLOG_LOG(TRACE, "do replay task", KPC(replay_task), KPC(replay_status)); - } else if (OB_EAGAIN == ret) { - if (common::OB_INVALID_TIMESTAMP == replay_task->first_handle_ts_) { - replay_task->first_handle_ts_ = start_ts; - replay_task->print_error_ts_ = start_ts; - } else if ((start_ts - replay_task->print_error_ts_) > MAX_SINGLE_RETRY_WARNING_TIME_THRESOLD) { - retry_time = start_ts - replay_task->first_handle_ts_; - CLOG_LOG(WARN, "single replay task retry cost too much time. replay may be delayed", - K(retry_time), KPC(replay_task), KPC(replay_status)); - replay_task->print_error_ts_ = start_ts; - } - } - int64_t replay_used_time = ObTimeUtility::fast_current_time() - start_ts; - if (replay_used_time > MAX_REPLAY_TIME_PER_ROUND) { - if (replay_used_time > MAX_SINGLE_REPLAY_WARNING_TIME_THRESOLD && !get_replay_is_writing_throttling()) { - CLOG_LOG(ERROR, "single replay task cost too much time. 
replay may be delayed", K(replay_used_time), KPC(replay_task), KPC(replay_status)); - } else { - CLOG_LOG(WARN, "single replay task cost too much time", K(replay_used_time), KPC(replay_task), KPC(replay_status)); - } } get_replay_queue_index() = -1; get_replay_is_writing_throttling() = false; @@ -940,6 +921,8 @@ int ObLogReplayService::try_submit_remained_log_replay_task_(ObReplayServiceSubm LSN cur_log_lsn = replay_task->lsn_; int64_t cur_log_size = replay_task->log_size_; int64_t cur_log_ts = replay_task->log_ts_; + int64_t replay_hint = replay_task->replay_hint_; + ObLogBaseType log_type = replay_task->log_type_; if (OB_FAIL(check_can_submit_log_replay_task_(replay_task, replay_status))) { // do nothing } else if (OB_SUCC(submit_log_replay_task_(*replay_task, *replay_status))) { @@ -953,7 +936,7 @@ int ObLogReplayService::try_submit_remained_log_replay_task_(ObReplayServiceSubm // log info回退 int64_t cur_ts = common::ObTimeUtility::fast_current_time(); CLOG_LOG(ERROR, "failed to update_next_submit_log_info", KR(ret), KPC(replay_status), K(cur_log_lsn), K(cur_log_size), K(cur_log_ts)); - replay_status->set_err_info(cur_log_lsn, cur_ts, ret); + replay_status->set_err_info(cur_log_lsn, cur_log_ts, log_type, replay_hint, true, cur_ts, ret); } } } @@ -1105,14 +1088,16 @@ int ObLogReplayService::fetch_and_submit_single_log_(ObReplayStatus &replay_stat CLOG_LOG(ERROR, "failed to update_next_to_submit_log_ts_allow_equal", KR(tmp_ret), K(cur_log_submit_ts), K(replay_status)); ret = OB_ERR_UNEXPECTED; - replay_status.set_err_info(cur_lsn, ObClockGenerator::getClock(), ret); + replay_status.set_err_info(cur_lsn, cur_log_submit_ts, replay_task->log_type_, + replay_task->replay_hint_, true, ObClockGenerator::getClock(), tmp_ret); free_replay_task_log_buf(replay_task); free_replay_task(replay_task); } else { submit_task->cache_replay_task(replay_task); } } else { - replay_status.set_err_info(cur_lsn, ObClockGenerator::getClock(), ret); + replay_status.set_err_info(cur_lsn, 
cur_log_submit_ts, replay_task->log_type_, + replay_task->replay_hint_ ,true, ObClockGenerator::getClock(), ret); free_replay_task_log_buf(replay_task); free_replay_task(replay_task); } @@ -1170,7 +1155,8 @@ int ObLogReplayService::handle_submit_task_(ObReplayServiceSubmitTask *submit_ta } else if (OB_FAIL(submit_task->update_next_to_submit_lsn(committed_end_lsn))) { // log info回退 CLOG_LOG(ERROR, "failed to update_next_submit_log_info", KR(ret), K(committed_end_lsn), KPC(replay_status)); - replay_status->set_err_info(committed_end_lsn, ObClockGenerator::getClock(), ret); + replay_status->set_err_info(committed_end_lsn, to_submit_log_ts, ObLogBaseType::INVALID_LOG_BASE_TYPE, + 0, true, ObClockGenerator::getClock(), ret); } else { CLOG_LOG(INFO, "no log to fetch but committed_end_lsn not reached, last log may be padding", KR(ret), K(to_submit_lsn), K(committed_end_lsn), K(to_submit_log_ts), KPC(replay_status)); @@ -1191,7 +1177,8 @@ int ObLogReplayService::handle_submit_task_(ObReplayServiceSubmitTask *submit_ta // log info回退 CLOG_LOG(ERROR, "failed to update_next_submit_log_info", KR(ret), K(to_submit_lsn), K(log_size), K(to_submit_log_ts)); - replay_status->set_err_info(to_submit_lsn, ObClockGenerator::getClock(), ret); + replay_status->set_err_info(to_submit_lsn, to_submit_log_ts, ObLogBaseType::INVALID_LOG_BASE_TYPE, + 0, true, ObClockGenerator::getClock(), ret); } } else if (OB_EAGAIN == ret) { // do nothing @@ -1420,6 +1407,26 @@ int ObLogReplayService::remove_all_ls_() return ret; } +int ObLogReplayService::diagnose(const share::ObLSID &id, + ReplayDiagnoseInfo &diagnose_info) +{ + int ret = OB_SUCCESS; + ObReplayStatus *replay_status = NULL; + ObReplayStatusGuard guard; + if (IS_NOT_INIT) { + ret = OB_NOT_INIT; + CLOG_LOG(WARN, "replay service not init", K(ret)); + } else if (OB_FAIL(get_replay_status_(id, guard))) { + CLOG_LOG(WARN, "guard get replay status failed", K(ret), K(id)); + } else if (NULL == (replay_status = guard.get_replay_status())) { + ret = 
OB_ERR_UNEXPECTED; + CLOG_LOG(WARN, "replay status is not exist", K(ret), K(id)); + } else if (OB_FAIL(replay_status->diagnose(diagnose_info))) { + CLOG_LOG(WARN, "replay status diagnose failed", K(ret), K(id)); + } + return ret; +} + bool ObLogReplayService::GetReplayStatusFunctor::operator()(const share::ObLSID &id, ObReplayStatus *replay_status) { diff --git a/src/logservice/replayservice/ob_log_replay_service.h b/src/logservice/replayservice/ob_log_replay_service.h index f78924d8c..4b46a9402 100644 --- a/src/logservice/replayservice/ob_log_replay_service.h +++ b/src/logservice/replayservice/ob_log_replay_service.h @@ -137,6 +137,7 @@ public: int update_replayable_point(const int64_t replayable_ts_ns); int stat_for_each(const common::ObFunction &func); int stat_all_ls_replay_process(int64_t &replayed_log_size, int64_t &unreplayed_log_size); + int diagnose(const share::ObLSID &id, ReplayDiagnoseInfo &diagnose_info); void inc_pending_task_size(const int64_t log_size); void dec_pending_task_size(const int64_t log_size); int64_t get_pending_task_size() const; @@ -192,8 +193,6 @@ private: int remove_all_ls_(); private: const int64_t MAX_REPLAY_TIME_PER_ROUND = 100 * 1000; //100ms - const int64_t MAX_SINGLE_REPLAY_WARNING_TIME_THRESOLD = 1000 * 1000; //1s 单条日志回放执行时间超过此值报error - const int64_t MAX_SINGLE_RETRY_WARNING_TIME_THRESOLD = 5 * 1000 * 1000; //1s 单条日志回放重试超过此值报error const int64_t MAX_SUBMIT_TIME_PER_ROUND = 1000 * 1000; //1s const int64_t TASK_QUEUE_WAIT_IN_GLOBAL_QUEUE_TIME_THRESHOLD = 5 * 1000 * 1000; //5s const int64_t PENDING_TASK_MEMORY_LIMIT = 128 * (1LL << 20); //128MB diff --git a/src/logservice/replayservice/ob_replay_status.cpp b/src/logservice/replayservice/ob_replay_status.cpp index 94a6e8d3b..e86f2a89b 100644 --- a/src/logservice/replayservice/ob_replay_status.cpp +++ b/src/logservice/replayservice/ob_replay_status.cpp @@ -408,6 +408,11 @@ int64_t ObReplayServiceReplayTask::idx() const int ObReplayServiceReplayTask::get_min_unreplayed_log_info(LSN
&lsn, int64_t &log_ts, + int64_t &replay_hint, + ObLogBaseType &log_type, + int64_t &first_handle_ts, + int64_t &replay_cost, + int64_t &retry_cost, bool &is_queue_empty) { int ret = OB_SUCCESS; @@ -417,6 +422,11 @@ int ObReplayServiceReplayTask::get_min_unreplayed_log_info(LSN &lsn, if (NULL != top_item && NULL != (replay_task = static_cast(top_item))) { lsn = replay_task->lsn_; log_ts = replay_task->log_ts_; + replay_hint = replay_task->replay_hint_; + log_type = replay_task->log_type_; + first_handle_ts = replay_task->first_handle_ts_; + replay_cost = replay_task->replay_cost_; + retry_cost = replay_task->retry_cost_; is_queue_empty = false; } else { is_queue_empty = true; @@ -914,51 +924,58 @@ int ObReplayStatus::get_ls_id(share::ObLSID &id) } int ObReplayStatus::get_min_unreplayed_log_info(LSN &lsn, - int64_t &log_ts) + int64_t &log_ts, + int64_t &replay_hint, + ObLogBaseType &log_type, + int64_t &first_handle_ts, + int64_t &replay_cost, + int64_t &retry_cost) { int ret = OB_SUCCESS; int64_t base_log_ts = 0; - do { - RLockGuard rlock_guard(rwlock_); - if (IS_NOT_INIT) { - ret = OB_NOT_INIT; - CLOG_LOG(WARN, "replay status is not inited", K(ret)); - } else if (!is_enabled_) { - ret = OB_STATE_NOT_MATCH; - if (palf_reach_time_interval(1 * 1000 * 1000, get_log_info_debug_time_)) { - CLOG_LOG(WARN, "replay status is not enabled", K(ret), KPC(this)); + if (IS_NOT_INIT) { + ret = OB_NOT_INIT; + CLOG_LOG(WARN, "replay status is not inited", K(ret)); + } else if (!is_enabled_) { + ret = OB_STATE_NOT_MATCH; + if (palf_reach_time_interval(1 * 1000 * 1000, get_log_info_debug_time_)) { + CLOG_LOG(WARN, "replay status is not enabled", K(ret), KPC(this)); + } + } else if (OB_FAIL(submit_log_task_.get_next_to_submit_log_info(lsn, log_ts))) { + ret = OB_ERR_UNEXPECTED; + CLOG_LOG(ERROR, "get_next_to_submit_log_ts failed", K(ret)); + } else if (OB_FAIL(submit_log_task_.get_base_log_ts(base_log_ts))) { + CLOG_LOG(ERROR, "get_base_log_ts failed", K(ret)); + } else if (log_ts <= 
base_log_ts) { + //拉到的日志尚未超过过滤点 + log_ts = base_log_ts; + if (palf_reach_time_interval(5 * 1000 * 1000, get_log_info_debug_time_)) { + CLOG_LOG(INFO, "get_min_unreplayed_log_info in skip state", K(lsn), K(log_ts), KPC(this)); + } + } else { + LSN queue_lsn; + int64_t queue_ts = OB_INVALID_TIMESTAMP; + bool is_queue_empty = true; + for (int64_t i = 0; OB_SUCC(ret) && i < REPLAY_TASK_QUEUE_SIZE; ++i) { + if (OB_FAIL(task_queues_[i].get_min_unreplayed_log_info(queue_lsn, queue_ts, replay_hint, log_type, + first_handle_ts, replay_cost, retry_cost, is_queue_empty))) { + CLOG_LOG(ERROR, "task_queue get_min_unreplayed_log_info failed", K(ret), K(task_queues_[i])); + } else if (!is_queue_empty + && queue_lsn < lsn + && queue_ts < log_ts) { + lsn = queue_lsn; + log_ts = queue_ts; } } - } while (0); - if (OB_SUCC(ret)) { - if (OB_FAIL(submit_log_task_.get_next_to_submit_log_info(lsn, log_ts))) { - ret = OB_ERR_UNEXPECTED; - CLOG_LOG(ERROR, "get_next_to_submit_log_ts failed", K(ret)); - } else if (OB_FAIL(submit_log_task_.get_base_log_ts(base_log_ts))) { - CLOG_LOG(ERROR, "get_base_log_ts failed", K(ret)); - } else if (log_ts <= base_log_ts) { - //拉到的日志尚未超过过滤点 - log_ts = base_log_ts; - if (palf_reach_time_interval(5 * 1000 * 1000, get_log_info_debug_time_)) { - CLOG_LOG(INFO, "get_min_unreplayed_log_info in skip state", K(lsn), K(log_ts), KPC(this)); - } - } else { - LSN queue_lsn; - int64_t queue_ts = OB_INVALID_TIMESTAMP; - bool is_queue_empty = true; - for (int64_t i = 0; OB_SUCC(ret) && i < REPLAY_TASK_QUEUE_SIZE; ++i) { - if (OB_FAIL(task_queues_[i].get_min_unreplayed_log_info(queue_lsn, queue_ts, is_queue_empty))) { - CLOG_LOG(ERROR, "task_queue get_min_unreplayed_log_info failed", K(ret), K(task_queues_[i])); - } else if (!is_queue_empty - && queue_lsn < lsn - && queue_ts < log_ts) { - lsn = queue_lsn; - log_ts = queue_ts; - } - } - if (palf_reach_time_interval(5 * 1000 * 1000, get_log_info_debug_time_)) { - CLOG_LOG(INFO, "get_min_unreplayed_log_info", K(lsn), 
K(log_ts), KPC(this)); - } + if (palf_reach_time_interval(5 * 1000 * 1000, get_log_info_debug_time_)) { + CLOG_LOG(INFO, "get_min_unreplayed_log_info", K(lsn), K(log_ts), KPC(this)); + } + } + if (OB_SUCC(ret) && !is_enabled()) { + //double check + ret = OB_STATE_NOT_MATCH; + if (palf_reach_time_interval(1 * 1000 * 1000, get_log_info_debug_time_)) { + CLOG_LOG(WARN, "replay status is not enabled", K(ret), KPC(this)); } } return ret; @@ -966,14 +983,26 @@ int ObReplayStatus::get_min_unreplayed_log_info(LSN &lsn, int ObReplayStatus::get_min_unreplayed_lsn(LSN &lsn) { - int64_t unused = OB_INVALID_TIMESTAMP; - return get_min_unreplayed_log_info(lsn, unused); + int64_t unused_log_ts = OB_INVALID_TIMESTAMP; + int64_t unused_replay_hint = 0; + ObLogBaseType unused_log_type = ObLogBaseType::INVALID_LOG_BASE_TYPE; + int64_t unused_first_handle_ts = 0; + int64_t unused_replay_cost = 0; + int64_t unused_retry_cost = 0; + return get_min_unreplayed_log_info(lsn, unused_log_ts, unused_replay_hint, unused_log_type, + unused_first_handle_ts, unused_replay_cost, unused_retry_cost); } int ObReplayStatus::get_min_unreplayed_log_ts_ns(int64_t &log_ts) { - LSN unused; - return get_min_unreplayed_log_info(unused, log_ts); + LSN unused_lsn; + int64_t unused_replay_hint = 0; + int64_t unused_first_handle_ts = 0; + ObLogBaseType unused_log_type = ObLogBaseType::INVALID_LOG_BASE_TYPE; + int64_t unused_replay_cost = 0; + int64_t unused_retry_cost = 0; + return get_min_unreplayed_log_info(unused_lsn, log_ts, unused_replay_hint, unused_log_type, + unused_first_handle_ts, unused_replay_cost, unused_retry_cost); } int ObReplayStatus::get_replay_process(int64_t &replayed_log_size, @@ -1199,11 +1228,18 @@ int ObReplayStatus::check_replay_barrier(ObLogReplayTask *replay_task, return ret; } -void ObReplayStatus::set_err_info(const LSN &lsn, +void ObReplayStatus::set_err_info(const palf::LSN &lsn, + const uint64_t scn, + const ObLogBaseType &log_type, + const int64_t replay_hint, + const bool 
is_submit_err, const int64_t err_ts, const int err_ret) { err_info_.lsn_ = lsn; + err_info_.scn_ = scn; + err_info_.log_type_ = log_type; + err_info_.is_submit_err_ = is_submit_err; err_info_.err_ts_ = err_ts; err_info_.err_ret_ = err_ret; } @@ -1253,5 +1289,65 @@ int ObReplayStatus::stat(LSReplayStat &stat) const return ret; } +int ObReplayStatus::diagnose(ReplayDiagnoseInfo &diagnose_info) +{ + int ret = OB_SUCCESS; + RLockGuard rlock_guard(rwlock_); + LSN min_unreplayed_lsn; + int64_t min_unreplayed_scn; + int64_t replay_hint = 0; + ObLogBaseType log_type = ObLogBaseType::INVALID_LOG_BASE_TYPE; + char log_type_str[common::MAX_SERVICE_TYPE_BUF_LENGTH]; + int64_t first_handle_time = 0; + int64_t replay_cost = 0; + int64_t retry_cost = 0; + int replay_ret = OB_SUCCESS; + bool is_submit_err = false; + diagnose_info.diagnose_str_.reset(); + if (IS_NOT_INIT) { + ret = OB_NOT_INIT; + } else if (!is_enabled_) { + ret = OB_STATE_NOT_MATCH; + } else if (OB_FAIL(get_min_unreplayed_log_info(min_unreplayed_lsn, min_unreplayed_scn, replay_hint, + log_type, first_handle_time, replay_cost, retry_cost))) { + CLOG_LOG(WARN, "get_min_unreplayed_log_info failed", KPC(this), K(ret)); + } else if (FALSE_IT(diagnose_info.max_replayed_lsn_ = min_unreplayed_lsn) || + FALSE_IT(diagnose_info.max_replayed_scn_ = min_unreplayed_scn - 1)) { + } else if (OB_FAIL(log_base_type_to_string(log_type, log_type_str, common::MAX_SERVICE_TYPE_BUF_LENGTH))) { + CLOG_LOG(WARN, "log_base_type_to_string failed", K(ret), K(log_type)); + } else if (OB_SUCCESS != err_info_.err_ret_) { + // 发生过不可重试的错误, 此场景不需要诊断最小未回放位日志 + min_unreplayed_lsn = err_info_.lsn_; + min_unreplayed_scn = err_info_.scn_; + replay_hint = err_info_.replay_hint_; + log_type = err_info_.log_type_; + first_handle_time = err_info_.err_ts_; + replay_cost = 0; + retry_cost = 0; + replay_ret = err_info_.err_ret_; + is_submit_err = err_info_.is_submit_err_; + } else if (0 < retry_cost || 0 < replay_cost) { + replay_ret = OB_EAGAIN; + } + if 
(OB_SUCC(ret)) { + if (OB_FAIL(diagnose_info.diagnose_str_.append_fmt("ret:%d; " + "min_unreplayed_lsn:%ld; " + "min_unreplayed_scn:%ld; " + "replay_hint:%ld; " + "log_type:%s; " + "replay_cost:%ld; " + "retry_cost:%ld; " + "first_handle_time:%ld;" , + replay_ret, min_unreplayed_lsn.val_, + min_unreplayed_scn, replay_hint, + is_submit_err ? "REPLAY_SUBMIT" : log_type_str, + replay_cost, retry_cost, first_handle_time))) { + CLOG_LOG(WARN, "append diagnose str failed", K(ret), K(replay_ret), K(min_unreplayed_lsn), K(min_unreplayed_scn), + K(replay_hint), K(is_submit_err), K(replay_cost), K(retry_cost), K(first_handle_time)); + } + } + return ret; +} + } // namespace logservice } diff --git a/src/logservice/replayservice/ob_replay_status.h b/src/logservice/replayservice/ob_replay_status.h index e2285936f..73b047276 100644 --- a/src/logservice/replayservice/ob_replay_status.h +++ b/src/logservice/replayservice/ob_replay_status.h @@ -75,6 +75,15 @@ struct LSReplayStat K(pending_cnt_)); }; +struct ReplayDiagnoseInfo +{ + palf::LSN max_replayed_lsn_; + int64_t max_replayed_scn_; + ObSqlString diagnose_str_; + TO_STRING_KV(K(max_replayed_lsn_), + K(max_replayed_scn_)); +}; + //此类型为前向barrier日志专用, 与ObLogReplayTask分开分配 //因此此结构的内存需要单独释放 struct ObLogReplayBuffer @@ -131,6 +140,8 @@ public: bool is_raw_write_; int64_t first_handle_ts_; int64_t print_error_ts_; + int64_t replay_cost_; //total time spent on retries of this task (NOTE(review): description looks swapped with retry_cost_ - confirm) + int64_t retry_cost_; //processing time of the round in which this task replayed successfully (NOTE(review): description looks swapped with replay_cost_ - confirm) void *log_buf_; TO_STRING_KV(K(ls_id_), @@ -367,6 +378,11 @@ public: } int get_min_unreplayed_log_info(palf::LSN &lsn, int64_t &log_ts, + int64_t &replay_hint, + ObLogBaseType &log_type, + int64_t &first_handle_ts, + int64_t &replay_cost, + int64_t &retry_cost, bool &is_queue_empty); private: Link *pop_() @@ -421,12 +437,20 @@ public: } void reset() { lsn_.reset(); + scn_ = 0; + log_type_ = ObLogBaseType::INVALID_LOG_BASE_TYPE; + replay_hint_ = 0; + is_submit_err_ = false; err_ts_ = 0; err_ret_ = common::OB_SUCCESS; } - TO_STRING_KV(K(lsn_), K(err_ts_),
K(err_ret_)); + TO_STRING_KV(K(lsn_), K(scn_), K(log_type_), + K(is_submit_err_), K(err_ts_), K(err_ret_)); public: palf::LSN lsn_; + uint64_t scn_; + ObLogBaseType log_type_; + int64_t replay_hint_; + bool is_submit_err_; //is submit log task error occured int64_t err_ts_; //the timestamp that partition encounts fatal error int err_ret_; //the ret code of fatal error }; @@ -496,7 +520,12 @@ public: int get_min_unreplayed_lsn(palf::LSN &lsn); int get_min_unreplayed_log_ts_ns(int64_t &log_ts); int get_min_unreplayed_log_info(palf::LSN &lsn, - int64_t &log_ts); + int64_t &log_ts, + int64_t &replay_hint, + ObLogBaseType &log_type, + int64_t &first_handle_ts, + int64_t &replay_cost, + int64_t &retry_cost); int get_replay_process(int64_t &replayed_log_size, int64_t &unreplayed_log_size); //提交日志检查barrier状态 int check_submit_barrier(); @@ -508,7 +537,7 @@ public: void set_post_barrier_submitted(const palf::LSN &lsn); int set_post_barrier_finished(const palf::LSN &lsn); int stat(LSReplayStat &stat) const; - + int diagnose(ReplayDiagnoseInfo &diagnose_info); inline void inc_ref() { ATOMIC_INC(&ref_cnt_); @@ -522,7 +551,13 @@ public: return replay_hint & (REPLAY_TASK_QUEUE_SIZE - 1); } // 用于记录日志流级别的错误, 此类错误不可恢复 - void set_err_info(const palf::LSN &lsn, const int64_t err_ts, const int err_ret); + void set_err_info(const palf::LSN &lsn, + const uint64_t scn, + const ObLogBaseType &log_type, + const int64_t replay_hint, + const bool is_submit_err, + const int64_t err_ts, + const int err_ret); bool has_fatal_error() const { return is_fatal_error(err_info_.err_ret_); diff --git a/src/observer/CMakeLists.txt b/src/observer/CMakeLists.txt index 489236d91..0faa623e9 100644 --- a/src/observer/CMakeLists.txt +++ b/src/observer/CMakeLists.txt @@ -229,6 +229,7 @@ ob_set_subtarget(ob_server virtual_table virtual_table/ob_all_virtual_log_stat.cpp virtual_table/ob_all_virtual_apply_stat.cpp virtual_table/ob_all_virtual_replay_stat.cpp + virtual_table/ob_all_virtual_ha_diagnose.cpp 
virtual_table/ob_global_variables.cpp virtual_table/ob_gv_sql.cpp virtual_table/ob_gv_sql_audit.cpp diff --git a/src/observer/virtual_table/ob_all_virtual_ha_diagnose.cpp b/src/observer/virtual_table/ob_all_virtual_ha_diagnose.cpp new file mode 100644 index 000000000..ff2aa7086 --- /dev/null +++ b/src/observer/virtual_table/ob_all_virtual_ha_diagnose.cpp @@ -0,0 +1,234 @@ +/** + * Copyright (c) 2021 OceanBase + * OceanBase CE is licensed under Mulan PubL v2. + * You can use this software according to the terms and conditions of the Mulan PubL v2. + * You may obtain a copy of Mulan PubL v2 at: + * http://license.coscl.org.cn/MulanPubL-2.0 + * THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, + * EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, + * MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE. + * See the Mulan PubL v2 for more details. + */ + +#include "ob_all_virtual_ha_diagnose.h" +#include "lib/ob_define.h" +#include "lib/ob_errno.h" +#include "lib/oblog/ob_log_module.h" +#include "storage/tx_storage/ob_ls_service.h" + +namespace oceanbase +{ +namespace observer +{ +int ObAllVirtualHADiagnose::inner_get_next_row(common::ObNewRow *&row) +{ + int ret = OB_SUCCESS; + if (false == start_to_read_) { + auto func_iter_ls = [&](const storage::ObLS &ls) -> int + { + int ret = OB_SUCCESS; + storage::DiagnoseInfo diagnose_info; + if (OB_FAIL(ls.diagnose(diagnose_info))) { + SERVER_LOG(WARN, "ls stat diagnose info failed", K(ret), K(ls)); + } else if (OB_FAIL(insert_stat_(diagnose_info))) { + SERVER_LOG(WARN, "insert stat failed", K(ret), K(diagnose_info)); + } else if (OB_FAIL(scanner_.add_row(cur_row_))) { + SERVER_LOG(WARN, "iter diagnose info faild", KR(ret), K(diagnose_info)); + } else { + SERVER_LOG(INFO, "iter diagnose info succ", K(diagnose_info)); + } + return ret; + }; + auto func_iterate_tenant = [&func_iter_ls]() -> int + { + int ret = OB_SUCCESS; + storage::ObLSService *ls_service = 
MTL(storage::ObLSService*); + if (NULL == ls_service) { + SERVER_LOG(INFO, "tenant has no ObLSService", K(MTL_ID())); + } else if (OB_FAIL(ls_service->iterate_diagnose(func_iter_ls))) { + SERVER_LOG(WARN, "iter ls failed", K(ret)); + } else { + SERVER_LOG(INFO, "iter ls succ", K(ret)); + } + return ret; + }; + if (NULL == omt_) { + SERVER_LOG(INFO, "omt is NULL", K(MTL_ID())); + } else if (OB_FAIL(omt_->operate_each_tenant_for_sys_or_self(func_iterate_tenant))) { + SERVER_LOG(WARN, "iter tenant failed", K(ret)); + } else { + scanner_it_ = scanner_.begin(); + start_to_read_ = true; + } + } + if (OB_SUCC(ret) && start_to_read_) { + if (OB_FAIL(scanner_it_.get_next_row(cur_row_))) { + if (OB_ITER_END != ret) { + SERVER_LOG(WARN, "get next row failed", K(ret)); + } + } else { + row = &cur_row_; + } + } + return ret; +} + +int ObAllVirtualHADiagnose::insert_stat_(storage::DiagnoseInfo &diagnose_info) +{ + int ret = OB_SUCCESS; + const int64_t count = output_column_ids_.count(); + for (int64_t i = 0; OB_SUCC(ret) && i < count; i++) { + uint64_t col_id = output_column_ids_.at(i); + switch (col_id) { + case TENANT_ID: + cur_row_.cells_[i].set_int(MTL_ID()); + break; + case LS_ID: + cur_row_.cells_[i].set_int(diagnose_info.ls_id_); + break; + case SVR_IP: + if (false == GCTX.self_addr().ip_to_string(ip_, common::OB_IP_PORT_STR_BUFF)) { + ret = OB_ERR_UNEXPECTED; + SERVER_LOG(WARN, "ip_to_string failed", K(ret)); + } else { + cur_row_.cells_[i].set_varchar(ObString::make_string(ip_)); + cur_row_.cells_[i].set_collation_type(ObCharset::get_default_collation(ObCharset::get_default_charset())); + } + break; + case SVR_PORT: + cur_row_.cells_[i].set_int(GCTX.self_addr().get_port()); + break; + case ELECTION_ROLE: + if (OB_FAIL(role_to_string(diagnose_info.palf_diagnose_info_.election_role_, + election_role_str_, sizeof(election_role_str_)))) { + SERVER_LOG(WARN, "role_to_string failed", K(ret), K(diagnose_info)); + } else { + 
cur_row_.cells_[i].set_varchar(ObString::make_string(election_role_str_)); + cur_row_.cells_[i].set_collation_type(ObCharset::get_default_collation( + ObCharset::get_default_charset())); + } + break; + case ELECTION_EPOCH: + cur_row_.cells_[i].set_int(diagnose_info.palf_diagnose_info_.election_epoch_); + break; + case PALF_ROLE: + if (OB_FAIL(role_to_string(diagnose_info.palf_diagnose_info_.palf_role_, + palf_role_str_, sizeof(palf_role_str_)))) { + SERVER_LOG(WARN, "role_to_string failed", K(ret), K(diagnose_info)); + } else { + cur_row_.cells_[i].set_varchar(ObString::make_string(palf_role_str_)); + cur_row_.cells_[i].set_collation_type(ObCharset::get_default_collation( + ObCharset::get_default_charset())); + } + break; + case PALF_STATE: + cur_row_.cells_[i].set_varchar(ObString::make_string(replica_state_to_string(diagnose_info.palf_diagnose_info_.palf_state_))); + cur_row_.cells_[i].set_collation_type(ObCharset::get_default_collation( + ObCharset::get_default_charset())); + break; + case PALF_PROPOSAL_ID: + cur_row_.cells_[i].set_int(diagnose_info.palf_diagnose_info_.palf_proposal_id_); + break; + case LOG_HANDLER_ROLE: + if (OB_FAIL(role_to_string(diagnose_info.log_handler_diagnose_info_.log_handler_role_, + log_handler_role_str_, sizeof(log_handler_role_str_)))) { + SERVER_LOG(WARN, "role_to_string failed", K(ret), K(diagnose_info)); + } else { + cur_row_.cells_[i].set_varchar(ObString::make_string(log_handler_role_str_)); + cur_row_.cells_[i].set_collation_type(ObCharset::get_default_collation( + ObCharset::get_default_charset())); + } + break; + case LOG_HANDLER_PROPOSAL_ID: + cur_row_.cells_[i].set_int(diagnose_info.log_handler_diagnose_info_.log_handler_proposal_id_); + break; + case LOG_HANDLER_TAKEOVER_STATE: + if (OB_FAIL(takeover_state_to_string(diagnose_info.rc_diagnose_info_.state_, + log_handler_takeover_state_str_, + sizeof(log_handler_takeover_state_str_)))) { + SERVER_LOG(WARN, "takeover_state_to_string failed", K(ret), K(diagnose_info)); + } 
else { + cur_row_.cells_[i].set_varchar(ObString::make_string(log_handler_takeover_state_str_)); + cur_row_.cells_[i].set_collation_type(ObCharset::get_default_collation( + ObCharset::get_default_charset())); + } + break; + case LOG_HANDLER_TAKEOVER_LOG_TYPE: + if (OB_FAIL(log_base_type_to_string(diagnose_info.rc_diagnose_info_.log_type_, + log_handler_takeover_log_type_str_, + sizeof(log_handler_takeover_log_type_str_)))) { + SERVER_LOG(WARN, "log_base_type_to_string failed", K(ret), K(diagnose_info)); + } else { + cur_row_.cells_[i].set_varchar(ObString::make_string(log_handler_takeover_log_type_str_)); + cur_row_.cells_[i].set_collation_type(ObCharset::get_default_collation( + ObCharset::get_default_charset())); + } + break; + case MAX_APPLIED_SCN: { + cur_row_.cells_[i].set_uint64(static_cast(diagnose_info.apply_diagnose_info_.max_applied_scn_)); + break; + } + case MAX_REPALYED_LSN: { + cur_row_.cells_[i].set_uint64(diagnose_info.replay_diagnose_info_.max_replayed_lsn_.val_); + break; + } + case MAX_REPLAYED_SCN: { + cur_row_.cells_[i].set_uint64(diagnose_info.replay_diagnose_info_.max_replayed_scn_); + break; + } + case REPLAY_DIAGNOSE_INFO: { + cur_row_.cells_[i].set_varchar((diagnose_info.replay_diagnose_info_.diagnose_str_.string())); + cur_row_.cells_[i].set_collation_type(ObCharset::get_default_collation( + ObCharset::get_default_charset())); + break; + } + case GC_STATE: + if (OB_FAIL(gc_state_to_string(diagnose_info.gc_diagnose_info_.gc_state_, + gc_state_str_, + sizeof(gc_state_str_)))) { + SERVER_LOG(WARN, "gc_state_to_string failed", K(ret), K(diagnose_info)); + } else { + cur_row_.cells_[i].set_varchar(ObString::make_string(gc_state_str_)); + cur_row_.cells_[i].set_collation_type(ObCharset::get_default_collation( + ObCharset::get_default_charset())); + } + break; + case GC_START_TS: { + cur_row_.cells_[i].set_int(diagnose_info.gc_diagnose_info_.gc_start_ts_); + break; + } + //TODO: @keqing.llt archive_scn列目前只占位 + case ARCHIVE_SCN: { + 
cur_row_.cells_[i].set_uint64(0); + break; + } + case CHECKPOINT_SCN: { + cur_row_.cells_[i].set_uint64(static_cast(diagnose_info.checkpoint_diagnose_info_.checkpoint_)); + break; + } + case MIN_REC_SCN: { + cur_row_.cells_[i].set_uint64(static_cast(diagnose_info.checkpoint_diagnose_info_.min_rec_scn_)); + break; + } + case MIN_REC_SCN_LOG_TYPE: { + if (OB_FAIL(log_base_type_to_string(diagnose_info.checkpoint_diagnose_info_.log_type_, + min_rec_log_scn_log_type_str_, + sizeof(min_rec_log_scn_log_type_str_)))) { + SERVER_LOG(WARN, "log_base_type_to_string failed", K(ret), K(diagnose_info)); + } else { + cur_row_.cells_[i].set_varchar(ObString::make_string(min_rec_log_scn_log_type_str_)); + cur_row_.cells_[i].set_collation_type(ObCharset::get_default_collation( + ObCharset::get_default_charset())); + } + break; + } + default: + ret = OB_ERR_UNEXPECTED; + SERVER_LOG(WARN, "unkown column"); + break; + } + } + return ret; +} +} // namespace observer +} // namespace oceanbase diff --git a/src/observer/virtual_table/ob_all_virtual_ha_diagnose.h b/src/observer/virtual_table/ob_all_virtual_ha_diagnose.h new file mode 100644 index 000000000..2aed4c8c3 --- /dev/null +++ b/src/observer/virtual_table/ob_all_virtual_ha_diagnose.h @@ -0,0 +1,75 @@ +/** + * Copyright (c) 2021 OceanBase + * OceanBase CE is licensed under Mulan PubL v2. + * You can use this software according to the terms and conditions of the Mulan PubL v2. + * You may obtain a copy of Mulan PubL v2 at: + * http://license.coscl.org.cn/MulanPubL-2.0 + * THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, + * EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, + * MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE. + * See the Mulan PubL v2 for more details. 
+ */ + +#ifndef OCEANBASE_OBSERVER_OB_ALL_VIRTUAL_HA_DIAGNOSE_H_ +#define OCEANBASE_OBSERVER_OB_ALL_VIRTUAL_HA_DIAGNOSE_H_ + +#include "common/row/ob_row.h" +#include "observer/omt/ob_multi_tenant.h" +#include "share/ob_virtual_table_scanner_iterator.h" +#include "share/ob_scanner.h" +#include "storage/ls/ob_ls.h" + +namespace oceanbase +{ +namespace observer +{ +enum IOStatColumn +{ + TENANT_ID = common::OB_APP_MIN_COLUMN_ID, + LS_ID, + SVR_IP, + SVR_PORT, + ELECTION_ROLE, + ELECTION_EPOCH, + PALF_ROLE, + PALF_STATE, + PALF_PROPOSAL_ID, + LOG_HANDLER_ROLE, + LOG_HANDLER_PROPOSAL_ID, + LOG_HANDLER_TAKEOVER_STATE, + LOG_HANDLER_TAKEOVER_LOG_TYPE, + MAX_APPLIED_SCN, + MAX_REPALYED_LSN, + MAX_REPLAYED_SCN, + REPLAY_DIAGNOSE_INFO, + GC_STATE, + GC_START_TS, + ARCHIVE_SCN, + CHECKPOINT_SCN, + MIN_REC_SCN, + MIN_REC_SCN_LOG_TYPE, +}; + +class ObAllVirtualHADiagnose : public common::ObVirtualTableScannerIterator +{ +public: + explicit ObAllVirtualHADiagnose(omt::ObMultiTenant *omt) : omt_(omt) {} +public: + virtual int inner_get_next_row(common::ObNewRow *&row); +private: + int insert_stat_(storage::DiagnoseInfo &diagnose_info); +private: + static const int64_t VARCHAR_32 = 32; + char ip_[common::OB_IP_PORT_STR_BUFF] = {'\0'}; + char election_role_str_[VARCHAR_32] = {'\0'}; + char palf_role_str_[VARCHAR_32] = {'\0'}; + char log_handler_role_str_[VARCHAR_32] = {'\0'}; + char log_handler_takeover_state_str_[VARCHAR_32] = {'\0'}; + char log_handler_takeover_log_type_str_[VARCHAR_32] = {'\0'}; + char gc_state_str_[VARCHAR_32] = {'\0'}; + char min_rec_log_scn_log_type_str_[VARCHAR_32] = {'\0'}; + omt::ObMultiTenant *omt_; +}; +} // namespace observer +} // namespace oceanbase +#endif /* OCEANBASE_OBSERVER_OB_ALL_VIRTUAL_HA_DIAGNOSE_H_ */ \ No newline at end of file diff --git a/src/observer/virtual_table/ob_all_virtual_log_stat.cpp b/src/observer/virtual_table/ob_all_virtual_log_stat.cpp index 450b03b20..7cf75070f 100644 --- 
a/src/observer/virtual_table/ob_all_virtual_log_stat.cpp +++ b/src/observer/virtual_table/ob_all_virtual_log_stat.cpp @@ -263,10 +263,10 @@ int ObAllVirtualPalfStat::member_list_to_string_( tmp_member_list))) { SERVER_LOG(WARN, "fail to transform member_list", KR(ret), K(member_list)); } else if (OB_FAIL(share::ObLSReplica::member_list2text( - tmp_member_list, - member_list_buf_, + tmp_member_list, + member_list_buf_, MAX_MEMBER_LIST_LENGTH))) { - SERVER_LOG(WARN, "member_list2text failed", KR(ret), + SERVER_LOG(WARN, "member_list2text failed", KR(ret), K(member_list), K(tmp_member_list), K_(member_list_buf)); } return ret; diff --git a/src/observer/virtual_table/ob_virtual_table_iterator_factory.cpp b/src/observer/virtual_table/ob_virtual_table_iterator_factory.cpp index be8f887bc..5e7ba3409 100644 --- a/src/observer/virtual_table/ob_virtual_table_iterator_factory.cpp +++ b/src/observer/virtual_table/ob_virtual_table_iterator_factory.cpp @@ -174,6 +174,7 @@ #include "observer/virtual_table/ob_all_virtual_dtl_interm_result_monitor.h" #include "observer/virtual_table/ob_all_virtual_log_stat.h" #include "observer/virtual_table/ob_all_virtual_apply_stat.h" +#include "observer/virtual_table/ob_all_virtual_ha_diagnose.h" #include "observer/virtual_table/ob_all_virtual_replay_stat.h" #include "observer/virtual_table/ob_all_virtual_unit.h" #include "observer/virtual_table/ob_all_virtual_server.h" @@ -1600,6 +1601,19 @@ int ObVTIterCreator::create_vt_iter(ObVTableScanParam ¶ms, } break; } + case OB_ALL_VIRTUAL_HA_DIAGNOSE_TID: { + ObAllVirtualHADiagnose *diagnose_info = NULL; + omt::ObMultiTenant *omt = GCTX.omt_; + if (OB_UNLIKELY(NULL == omt)) { + ret = OB_ERR_UNEXPECTED; + SERVER_LOG(WARN, "get tenant fail", K(ret)); + } else if (OB_FAIL(NEW_VIRTUAL_TABLE(ObAllVirtualHADiagnose, diagnose_info, omt))) { + SERVER_LOG(ERROR, "ObAllVirtualHADiagnose construct fail", K(ret)); + } else { + vt_iter = static_cast(diagnose_info); + } + break; + } END_CREATE_VT_ITER_SWITCH_LAMBDA 
BEGIN_CREATE_VT_ITER_SWITCH_LAMBDA diff --git a/src/share/inner_table/ob_inner_table_schema.12301_12350.cpp b/src/share/inner_table/ob_inner_table_schema.12301_12350.cpp index f74c3e745..d88c718f7 100644 --- a/src/share/inner_table/ob_inner_table_schema.12301_12350.cpp +++ b/src/share/inner_table/ob_inner_table_schema.12301_12350.cpp @@ -7256,6 +7256,405 @@ int ObInnerTableSchema::all_virtual_minor_freeze_info_schema(ObTableSchema &tabl return ret; } +int ObInnerTableSchema::all_virtual_ha_diagnose_schema(ObTableSchema &table_schema) +{ + int ret = OB_SUCCESS; + uint64_t column_id = OB_APP_MIN_COLUMN_ID - 1; + + //generated fields: + table_schema.set_tenant_id(OB_SYS_TENANT_ID); + table_schema.set_tablegroup_id(OB_INVALID_ID); + table_schema.set_database_id(OB_SYS_DATABASE_ID); + table_schema.set_table_id(OB_ALL_VIRTUAL_HA_DIAGNOSE_TID); + table_schema.set_rowkey_split_pos(0); + table_schema.set_is_use_bloomfilter(false); + table_schema.set_progressive_merge_num(0); + table_schema.set_rowkey_column_num(0); + table_schema.set_load_type(TABLE_LOAD_TYPE_IN_DISK); + table_schema.set_table_type(VIRTUAL_TABLE); + table_schema.set_index_type(INDEX_TYPE_IS_NOT); + table_schema.set_def_type(TABLE_DEF_TYPE_INTERNAL); + + if (OB_SUCC(ret)) { + if (OB_FAIL(table_schema.set_table_name(OB_ALL_VIRTUAL_HA_DIAGNOSE_TNAME))) { + LOG_ERROR("fail to set table_name", K(ret)); + } + } + + if (OB_SUCC(ret)) { + if (OB_FAIL(table_schema.set_compress_func_name(OB_DEFAULT_COMPRESS_FUNC_NAME))) { + LOG_ERROR("fail to set compress_func_name", K(ret)); + } + } + table_schema.set_part_level(PARTITION_LEVEL_ZERO); + table_schema.set_charset_type(ObCharset::get_default_charset()); + table_schema.set_collation_type(ObCharset::get_default_collation(ObCharset::get_default_charset())); + + if (OB_SUCC(ret)) { + ADD_COLUMN_SCHEMA("tenant_id", //column_name + ++column_id, //column_id + 0, //rowkey_id + 0, //index_id + 0, //part_key_pos + ObIntType, //column_type + CS_TYPE_INVALID, 
//column_collation_type + sizeof(int64_t), //column_length + -1, //column_precision + -1, //column_scale + false, //is_nullable + false); //is_autoincrement + } + + if (OB_SUCC(ret)) { + ADD_COLUMN_SCHEMA("ls_id", //column_name + ++column_id, //column_id + 0, //rowkey_id + 0, //index_id + 0, //part_key_pos + ObIntType, //column_type + CS_TYPE_INVALID, //column_collation_type + sizeof(int64_t), //column_length + -1, //column_precision + -1, //column_scale + false, //is_nullable + false); //is_autoincrement + } + + if (OB_SUCC(ret)) { + ADD_COLUMN_SCHEMA("svr_ip", //column_name + ++column_id, //column_id + 0, //rowkey_id + 0, //index_id + 1, //part_key_pos + ObVarcharType, //column_type + CS_TYPE_INVALID, //column_collation_type + MAX_IP_ADDR_LENGTH, //column_length + -1, //column_precision + -1, //column_scale + false, //is_nullable + false); //is_autoincrement + } + + if (OB_SUCC(ret)) { + ADD_COLUMN_SCHEMA("svr_port", //column_name + ++column_id, //column_id + 0, //rowkey_id + 0, //index_id + 2, //part_key_pos + ObIntType, //column_type + CS_TYPE_INVALID, //column_collation_type + sizeof(int64_t), //column_length + -1, //column_precision + -1, //column_scale + false, //is_nullable + false); //is_autoincrement + } + + if (OB_SUCC(ret)) { + ADD_COLUMN_SCHEMA("election_role", //column_name + ++column_id, //column_id + 0, //rowkey_id + 0, //index_id + 0, //part_key_pos + ObVarcharType, //column_type + CS_TYPE_INVALID, //column_collation_type + 32, //column_length + -1, //column_precision + -1, //column_scale + false, //is_nullable + false); //is_autoincrement + } + + if (OB_SUCC(ret)) { + ADD_COLUMN_SCHEMA("election_epoch", //column_name + ++column_id, //column_id + 0, //rowkey_id + 0, //index_id + 0, //part_key_pos + ObIntType, //column_type + CS_TYPE_INVALID, //column_collation_type + sizeof(int64_t), //column_length + -1, //column_precision + -1, //column_scale + false, //is_nullable + false); //is_autoincrement + } + + if (OB_SUCC(ret)) { + 
ADD_COLUMN_SCHEMA("palf_role", //column_name + ++column_id, //column_id + 0, //rowkey_id + 0, //index_id + 0, //part_key_pos + ObVarcharType, //column_type + CS_TYPE_INVALID, //column_collation_type + 32, //column_length + -1, //column_precision + -1, //column_scale + false, //is_nullable + false); //is_autoincrement + } + + if (OB_SUCC(ret)) { + ADD_COLUMN_SCHEMA("palf_state", //column_name + ++column_id, //column_id + 0, //rowkey_id + 0, //index_id + 0, //part_key_pos + ObVarcharType, //column_type + CS_TYPE_INVALID, //column_collation_type + 32, //column_length + -1, //column_precision + -1, //column_scale + false, //is_nullable + false); //is_autoincrement + } + + if (OB_SUCC(ret)) { + ADD_COLUMN_SCHEMA("palf_proposal_id", //column_name + ++column_id, //column_id + 0, //rowkey_id + 0, //index_id + 0, //part_key_pos + ObIntType, //column_type + CS_TYPE_INVALID, //column_collation_type + sizeof(int64_t), //column_length + -1, //column_precision + -1, //column_scale + false, //is_nullable + false); //is_autoincrement + } + + if (OB_SUCC(ret)) { + ADD_COLUMN_SCHEMA("log_handler_role", //column_name + ++column_id, //column_id + 0, //rowkey_id + 0, //index_id + 0, //part_key_pos + ObVarcharType, //column_type + CS_TYPE_INVALID, //column_collation_type + 32, //column_length + -1, //column_precision + -1, //column_scale + false, //is_nullable + false); //is_autoincrement + } + + if (OB_SUCC(ret)) { + ADD_COLUMN_SCHEMA("log_handler_proposal_id", //column_name + ++column_id, //column_id + 0, //rowkey_id + 0, //index_id + 0, //part_key_pos + ObIntType, //column_type + CS_TYPE_INVALID, //column_collation_type + sizeof(int64_t), //column_length + -1, //column_precision + -1, //column_scale + false, //is_nullable + false); //is_autoincrement + } + + if (OB_SUCC(ret)) { + ADD_COLUMN_SCHEMA("log_handler_takeover_state", //column_name + ++column_id, //column_id + 0, //rowkey_id + 0, //index_id + 0, //part_key_pos + ObVarcharType, //column_type + CS_TYPE_INVALID, 
//column_collation_type + 32, //column_length + -1, //column_precision + -1, //column_scale + false, //is_nullable + false); //is_autoincrement + } + + if (OB_SUCC(ret)) { + ADD_COLUMN_SCHEMA("log_handler_takeover_log_type", //column_name + ++column_id, //column_id + 0, //rowkey_id + 0, //index_id + 0, //part_key_pos + ObVarcharType, //column_type + CS_TYPE_INVALID, //column_collation_type + 32, //column_length + -1, //column_precision + -1, //column_scale + false, //is_nullable + false); //is_autoincrement + } + + if (OB_SUCC(ret)) { + ADD_COLUMN_SCHEMA("max_applied_scn", //column_name + ++column_id, //column_id + 0, //rowkey_id + 0, //index_id + 0, //part_key_pos + ObUInt64Type, //column_type + CS_TYPE_INVALID, //column_collation_type + sizeof(uint64_t), //column_length + -1, //column_precision + -1, //column_scale + false, //is_nullable + false); //is_autoincrement + } + + if (OB_SUCC(ret)) { + ADD_COLUMN_SCHEMA("max_replayed_lsn", //column_name + ++column_id, //column_id + 0, //rowkey_id + 0, //index_id + 0, //part_key_pos + ObUInt64Type, //column_type + CS_TYPE_INVALID, //column_collation_type + sizeof(uint64_t), //column_length + -1, //column_precision + -1, //column_scale + false, //is_nullable + false); //is_autoincrement + } + + if (OB_SUCC(ret)) { + ADD_COLUMN_SCHEMA("max_replayed_scn", //column_name + ++column_id, //column_id + 0, //rowkey_id + 0, //index_id + 0, //part_key_pos + ObUInt64Type, //column_type + CS_TYPE_INVALID, //column_collation_type + sizeof(uint64_t), //column_length + -1, //column_precision + -1, //column_scale + false, //is_nullable + false); //is_autoincrement + } + + if (OB_SUCC(ret)) { + ADD_COLUMN_SCHEMA("replay_diagnose_info", //column_name + ++column_id, //column_id + 0, //rowkey_id + 0, //index_id + 0, //part_key_pos + ObVarcharType, //column_type + CS_TYPE_INVALID, //column_collation_type + 1024, //column_length + -1, //column_precision + -1, //column_scale + false, //is_nullable + false); //is_autoincrement + } + + if 
(OB_SUCC(ret)) { + ADD_COLUMN_SCHEMA("gc_state", //column_name + ++column_id, //column_id + 0, //rowkey_id + 0, //index_id + 0, //part_key_pos + ObVarcharType, //column_type + CS_TYPE_INVALID, //column_collation_type + 32, //column_length + -1, //column_precision + -1, //column_scale + false, //is_nullable + false); //is_autoincrement + } + + if (OB_SUCC(ret)) { + ADD_COLUMN_SCHEMA("gc_start_ts", //column_name + ++column_id, //column_id + 0, //rowkey_id + 0, //index_id + 0, //part_key_pos + ObIntType, //column_type + CS_TYPE_INVALID, //column_collation_type + sizeof(int64_t), //column_length + -1, //column_precision + -1, //column_scale + false, //is_nullable + false); //is_autoincrement + } + + if (OB_SUCC(ret)) { + ADD_COLUMN_SCHEMA("archive_scn", //column_name + ++column_id, //column_id + 0, //rowkey_id + 0, //index_id + 0, //part_key_pos + ObUInt64Type, //column_type + CS_TYPE_INVALID, //column_collation_type + sizeof(uint64_t), //column_length + -1, //column_precision + -1, //column_scale + false, //is_nullable + false); //is_autoincrement + } + + if (OB_SUCC(ret)) { + ADD_COLUMN_SCHEMA("checkpoint_scn", //column_name + ++column_id, //column_id + 0, //rowkey_id + 0, //index_id + 0, //part_key_pos + ObUInt64Type, //column_type + CS_TYPE_INVALID, //column_collation_type + sizeof(uint64_t), //column_length + -1, //column_precision + -1, //column_scale + false, //is_nullable + false); //is_autoincrement + } + + if (OB_SUCC(ret)) { + ADD_COLUMN_SCHEMA("min_rec_scn", //column_name + ++column_id, //column_id + 0, //rowkey_id + 0, //index_id + 0, //part_key_pos + ObUInt64Type, //column_type + CS_TYPE_INVALID, //column_collation_type + sizeof(uint64_t), //column_length + -1, //column_precision + -1, //column_scale + false, //is_nullable + false); //is_autoincrement + } + + if (OB_SUCC(ret)) { + ADD_COLUMN_SCHEMA("min_rec_scn_log_type", //column_name + ++column_id, //column_id + 0, //rowkey_id + 0, //index_id + 0, //part_key_pos + ObVarcharType, //column_type + 
CS_TYPE_INVALID, //column_collation_type + 32, //column_length + -1, //column_precision + -1, //column_scale + false, //is_nullable + false); //is_autoincrement + } + if (OB_SUCC(ret)) { + table_schema.get_part_option().set_part_num(1); + table_schema.set_part_level(PARTITION_LEVEL_ONE); + table_schema.get_part_option().set_part_func_type(PARTITION_FUNC_TYPE_LIST_COLUMNS); + if (OB_FAIL(table_schema.get_part_option().set_part_expr("svr_ip, svr_port"))) { + LOG_WARN("set_part_expr failed", K(ret)); + } else if (OB_FAIL(table_schema.mock_list_partition_array())) { + LOG_WARN("mock list partition array failed", K(ret)); + } + } + table_schema.set_index_using_type(USING_HASH); + table_schema.set_row_store_type(ENCODING_ROW_STORE); + table_schema.set_store_format(OB_STORE_FORMAT_DYNAMIC_MYSQL); + table_schema.set_progressive_merge_round(1); + table_schema.set_storage_format_version(3); + table_schema.set_tablet_id(0); + + table_schema.set_max_used_column_id(column_id); + return ret; +} + } // end namespace share } // end namespace oceanbase diff --git a/src/share/inner_table/ob_inner_table_schema.h b/src/share/inner_table/ob_inner_table_schema.h index b77ac05a0..7fe288a58 100644 --- a/src/share/inner_table/ob_inner_table_schema.h +++ b/src/share/inner_table/ob_inner_table_schema.h @@ -833,6 +833,7 @@ public: static int all_virtual_schema_memory_schema(share::schema::ObTableSchema &table_schema); static int all_virtual_schema_slot_schema(share::schema::ObTableSchema &table_schema); static int all_virtual_minor_freeze_info_schema(share::schema::ObTableSchema &table_schema); + static int all_virtual_ha_diagnose_schema(share::schema::ObTableSchema &table_schema); static int all_virtual_sql_audit_ora_schema(share::schema::ObTableSchema &table_schema); static int all_virtual_plan_stat_ora_schema(share::schema::ObTableSchema &table_schema); static int all_virtual_plan_cache_plan_explain_ora_schema(share::schema::ObTableSchema &table_schema); @@ -2734,6 +2735,7 @@ const 
schema_create_func virtual_table_schema_creators [] = { ObInnerTableSchema::all_virtual_schema_memory_schema, ObInnerTableSchema::all_virtual_schema_slot_schema, ObInnerTableSchema::all_virtual_minor_freeze_info_schema, + ObInnerTableSchema::all_virtual_ha_diagnose_schema, ObInnerTableSchema::all_virtual_sql_audit_ora_schema, ObInnerTableSchema::all_virtual_plan_stat_ora_schema, ObInnerTableSchema::all_virtual_plan_cache_plan_explain_ora_schema, @@ -7052,7 +7054,8 @@ const uint64_t cluster_distributed_vtables [] = { OB_ALL_VIRTUAL_KVCACHE_HANDLE_LEAK_INFO_TID, OB_ALL_VIRTUAL_SCHEMA_MEMORY_TID, OB_ALL_VIRTUAL_SCHEMA_SLOT_TID, - OB_ALL_VIRTUAL_MINOR_FREEZE_INFO_TID, }; + OB_ALL_VIRTUAL_MINOR_FREEZE_INFO_TID, + OB_ALL_VIRTUAL_HA_DIAGNOSE_TID, }; const uint64_t tenant_distributed_vtables [] = { OB_ALL_VIRTUAL_PROCESSLIST_TID, @@ -9152,11 +9155,11 @@ static inline int get_sys_table_lob_aux_schema(const uint64_t tid, const int64_t OB_CORE_TABLE_COUNT = 4; const int64_t OB_SYS_TABLE_COUNT = 212; -const int64_t OB_VIRTUAL_TABLE_COUNT = 550; +const int64_t OB_VIRTUAL_TABLE_COUNT = 551; const int64_t OB_SYS_VIEW_COUNT = 601; -const int64_t OB_SYS_TENANT_TABLE_COUNT = 1368; +const int64_t OB_SYS_TENANT_TABLE_COUNT = 1369; const int64_t OB_CORE_SCHEMA_VERSION = 1; -const int64_t OB_BOOTSTRAP_SCHEMA_VERSION = 1371; +const int64_t OB_BOOTSTRAP_SCHEMA_VERSION = 1372; } // end namespace share } // end namespace oceanbase diff --git a/src/share/inner_table/ob_inner_table_schema_constants.h b/src/share/inner_table/ob_inner_table_schema_constants.h index 211ad91e5..01219febb 100644 --- a/src/share/inner_table/ob_inner_table_schema_constants.h +++ b/src/share/inner_table/ob_inner_table_schema_constants.h @@ -581,6 +581,7 @@ const uint64_t OB_ALL_VIRTUAL_LS_REPLICA_TASK_PLAN_TID = 12335; // "__all_virtua const uint64_t OB_ALL_VIRTUAL_SCHEMA_MEMORY_TID = 12336; // "__all_virtual_schema_memory" const uint64_t OB_ALL_VIRTUAL_SCHEMA_SLOT_TID = 12337; // "__all_virtual_schema_slot" const 
uint64_t OB_ALL_VIRTUAL_MINOR_FREEZE_INFO_TID = 12338; // "__all_virtual_minor_freeze_info" +const uint64_t OB_ALL_VIRTUAL_HA_DIAGNOSE_TID = 12340; // "__all_virtual_ha_diagnose" const uint64_t OB_ALL_VIRTUAL_SQL_AUDIT_ORA_TID = 15009; // "ALL_VIRTUAL_SQL_AUDIT_ORA" const uint64_t OB_ALL_VIRTUAL_PLAN_STAT_ORA_TID = 15010; // "ALL_VIRTUAL_PLAN_STAT_ORA" const uint64_t OB_ALL_VIRTUAL_PLAN_CACHE_PLAN_EXPLAIN_ORA_TID = 15012; // "ALL_VIRTUAL_PLAN_CACHE_PLAN_EXPLAIN_ORA" @@ -2466,6 +2467,7 @@ const char *const OB_ALL_VIRTUAL_LS_REPLICA_TASK_PLAN_TNAME = "__all_virtual_ls_ const char *const OB_ALL_VIRTUAL_SCHEMA_MEMORY_TNAME = "__all_virtual_schema_memory"; const char *const OB_ALL_VIRTUAL_SCHEMA_SLOT_TNAME = "__all_virtual_schema_slot"; const char *const OB_ALL_VIRTUAL_MINOR_FREEZE_INFO_TNAME = "__all_virtual_minor_freeze_info"; +const char *const OB_ALL_VIRTUAL_HA_DIAGNOSE_TNAME = "__all_virtual_ha_diagnose"; const char *const OB_ALL_VIRTUAL_SQL_AUDIT_ORA_TNAME = "ALL_VIRTUAL_SQL_AUDIT"; const char *const OB_ALL_VIRTUAL_PLAN_STAT_ORA_TNAME = "ALL_VIRTUAL_PLAN_STAT"; const char *const OB_ALL_VIRTUAL_PLAN_CACHE_PLAN_EXPLAIN_ORA_TNAME = "ALL_VIRTUAL_PLAN_CACHE_PLAN_EXPLAIN"; diff --git a/src/share/inner_table/ob_inner_table_schema_def.py b/src/share/inner_table/ob_inner_table_schema_def.py index d1fc873e9..dd6da3bff 100644 --- a/src/share/inner_table/ob_inner_table_schema_def.py +++ b/src/share/inner_table/ob_inner_table_schema_def.py @@ -10769,7 +10769,45 @@ def_table_schema( ) # 12339: __all_virtual_show_trace -# 12340: __all_virtual_ha_diagnose +def_table_schema( + owner = 'keqing.llt', + table_name = '__all_virtual_ha_diagnose', + table_id = '12340', + table_type = 'VIRTUAL_TABLE', + gm_columns = [], + in_tenant_space = False, + rowkey_columns = [ + ], + + normal_columns = [ + ('tenant_id', 'int'), + ('ls_id', 'int'), + ('svr_ip', 'varchar:MAX_IP_ADDR_LENGTH'), + ('svr_port', 'int'), + ('election_role', 'varchar:32'), + ('election_epoch', 'int'), + ('palf_role', 
'varchar:32'), + ('palf_state', 'varchar:32'), + ('palf_proposal_id', 'int'), + ('log_handler_role', 'varchar:32'), + ('log_handler_proposal_id', 'int'), + ('log_handler_takeover_state', 'varchar:32'), + ('log_handler_takeover_log_type', 'varchar:32'), + ('max_applied_scn', 'uint'), + ('max_replayed_lsn', 'uint'), + ('max_replayed_scn', 'uint'), + ('replay_diagnose_info', 'varchar:1024'), + ('gc_state', 'varchar:32'), + ('gc_start_ts', 'int'), + ('archive_scn', 'uint'), + ('checkpoint_scn', 'uint'), + ('min_rec_scn', 'uint'), + ('min_rec_scn_log_type', 'varchar:32') + ], + + partition_columns = ['svr_ip', 'svr_port'], + vtable_route_policy = 'distributed', +) # # 余留位置 diff --git a/src/storage/checkpoint/ob_checkpoint_executor.cpp b/src/storage/checkpoint/ob_checkpoint_executor.cpp index 97cc6f687..749d8715b 100644 --- a/src/storage/checkpoint/ob_checkpoint_executor.cpp +++ b/src/storage/checkpoint/ob_checkpoint_executor.cpp @@ -106,6 +106,20 @@ void ObCheckpointExecutor::offline() update_checkpoint_enabled_ = false; } +void ObCheckpointExecutor::get_min_rec_log_ts(int &log_type, + int64_t &min_rec_log_ts) const +{ + for (int i = 1; i < ObLogBaseType::MAX_LOG_BASE_TYPE; i++) { + if (OB_NOT_NULL(handlers_[i])) { + int64_t rec_log_ts = handlers_[i]->get_rec_log_ts(); + if (rec_log_ts > 0 && rec_log_ts < min_rec_log_ts) { + min_rec_log_ts = rec_log_ts; + log_type = i; + } + } + } +} + inline void get_min_rec_log_ts_service_type_by_index_(int index, char* service_type) { int ret = OB_SUCCESS; @@ -133,15 +147,7 @@ int ObCheckpointExecutor::update_clog_checkpoint() // used to record which handler provide the smallest rec_log_ts int min_rec_log_ts_service_type_index = 0; char service_type[common::MAX_SERVICE_TYPE_BUF_LENGTH]; - for (int i = 1; i < ObLogBaseType::MAX_LOG_BASE_TYPE; i++) { - if (OB_NOT_NULL(handlers_[i])) { - int64_t rec_log_ts = handlers_[i]->get_rec_log_ts(); - if (rec_log_ts > 0 && rec_log_ts < checkpoint_ts) { - checkpoint_ts = rec_log_ts; - 
min_rec_log_ts_service_type_index = i; - } - } - } + get_min_rec_log_ts(min_rec_log_ts_service_type_index, checkpoint_ts); get_min_rec_log_ts_service_type_by_index_(min_rec_log_ts_service_type_index, service_type); const int64_t checkpoint_ts_in_ls_meta = ls_->get_clog_checkpoint_ts(); @@ -275,7 +281,7 @@ bool ObCheckpointExecutor::need_flush() K(end_log_ts), K(ls_->get_clog_checkpoint_ts())); need_flush = true; } - + return need_flush; } @@ -286,7 +292,7 @@ bool ObCheckpointExecutor::is_wait_advance_checkpoint() ATOMIC_STORE(&wait_advance_checkpoint_, false); } } - + return ATOMIC_LOAD(&wait_advance_checkpoint_); } @@ -313,6 +319,18 @@ int64_t ObCheckpointExecutor::get_cannot_recycle_log_size() return cannot_recycle_log_size; } +int ObCheckpointExecutor::diagnose(CheckpointDiagnoseInfo &diagnose_info) const +{ + int ret = OB_SUCCESS; + ObSpinLockGuard guard(lock_); + int log_type_index = 0; + diagnose_info.checkpoint_ = ls_->get_clog_checkpoint_ts(); + diagnose_info.min_rec_scn_ = INT64_MAX; + get_min_rec_log_ts(log_type_index, diagnose_info.min_rec_scn_); + ObLogBaseType log_type = static_cast<ObLogBaseType>(log_type_index); + diagnose_info.log_type_ = log_type; + return ret; +} } // namespace checkpoint } // namespace storage } // namespace oceanbase diff --git a/src/storage/checkpoint/ob_checkpoint_executor.h b/src/storage/checkpoint/ob_checkpoint_executor.h index 9450b8784..c1d43a7ef 100644 --- a/src/storage/checkpoint/ob_checkpoint_executor.h +++ b/src/storage/checkpoint/ob_checkpoint_executor.h @@ -36,6 +36,17 @@ struct ObCheckpointVTInfo ); }; +struct CheckpointDiagnoseInfo +{ + int64_t checkpoint_; + int64_t min_rec_scn_; + logservice::ObLogBaseType log_type_; + + TO_STRING_KV(K(checkpoint_), + K(min_rec_scn_), + K(log_type_)); +}; + class ObCheckpointExecutor { public: @@ -71,6 +82,10 @@ public: int64_t get_cannot_recycle_log_size(); + void get_min_rec_log_ts(int &log_type, int64_t &min_rec_log_ts) const; + + int diagnose(CheckpointDiagnoseInfo &diagnose_info) const; +
private: static const int64_t CLOG_GC_PERCENT = 60; static const int64_t MAX_NEED_REPLAY_CLOG_INTERVAL = (int64_t)60 * 60 * 1000 * 1000 * 1000; //ns @@ -81,7 +96,7 @@ private: // be used to avoid checkpoint concurrently, // no need to protect handlers_[] because ls won't be destroyed(hold lshandle) // when the public interfaces are invoked - common::ObSpinLock lock_; + mutable common::ObSpinLock lock_; // avoid frequent freeze when clog_used_over_threshold bool wait_advance_checkpoint_; diff --git a/src/storage/ls/ob_ls.cpp b/src/storage/ls/ob_ls.cpp index 8d3469deb..0907c9e18 100644 --- a/src/storage/ls/ob_ls.cpp +++ b/src/storage/ls/ob_ls.cpp @@ -1336,5 +1336,38 @@ int ObLS::update_id_meta_without_writing_slog(const int64_t service_type, return ret; } +int ObLS::diagnose(DiagnoseInfo &info) const +{ + int ret = OB_SUCCESS; + logservice::ObLogService *log_service = MTL(logservice::ObLogService *); + share::ObLSID ls_id = get_ls_id(); + if (IS_NOT_INIT) { + ret = OB_NOT_INIT; + STORAGE_LOG(WARN, "ls is not inited", K(ret)); + } else if (FALSE_IT(info.ls_id_ = ls_id.id()) || + FALSE_IT(info.rc_diagnose_info_.id_ = ls_id.id())) { + } else if (OB_FAIL(gc_handler_.diagnose(info.gc_diagnose_info_))) { + STORAGE_LOG(WARN, "diagnose gc failed", K(ret), K(ls_id)); + } else if (OB_FAIL(checkpoint_executor_.diagnose(info.checkpoint_diagnose_info_))) { + STORAGE_LOG(WARN, "diagnose checkpoint failed", K(ret), K(ls_id)); + } else if (OB_FAIL(log_service->diagnose_apply(ls_id, info.apply_diagnose_info_))) { + STORAGE_LOG(WARN, "diagnose apply failed", K(ret), K(ls_id)); + } else if (OB_FAIL(log_service->diagnose_replay(ls_id, info.replay_diagnose_info_))) { + STORAGE_LOG(WARN, "diagnose replay failed", K(ret), K(ls_id)); + } else if (OB_FAIL(log_handler_.diagnose(info.log_handler_diagnose_info_))) { + STORAGE_LOG(WARN, "diagnose log handler failed", K(ret), K(ls_id)); + } else if (OB_FAIL(log_handler_.diagnose_palf(info.palf_diagnose_info_))) { + STORAGE_LOG(WARN, "diagnose 
palf failed", K(ret), K(ls_id)); + } else if (info.is_role_sync()) { + // roles are in sync, so there is no need to diagnose the role change service + info.rc_diagnose_info_.state_ = logservice::TakeOverState::TAKE_OVER_FINISH; + info.rc_diagnose_info_.log_type_ = logservice::ObLogBaseType::INVALID_LOG_BASE_TYPE; + } else if (OB_FAIL(log_service->diagnose_role_change(info.rc_diagnose_info_))) { + // when the election, palf and log handler roles disagree, the ls may have no leader + STORAGE_LOG(WARN, "diagnose rc service failed", K(ret), K(ls_id)); + } + STORAGE_LOG(INFO, "diagnose finish", K(ret), K(info), K(ls_id)); + return ret; +} } } diff --git a/src/storage/ls/ob_ls.h b/src/storage/ls/ob_ls.h index 854f4749e..1987efb2d 100644 --- a/src/storage/ls/ob_ls.h +++ b/src/storage/ls/ob_ls.h @@ -37,7 +37,9 @@ #include "storage/tx_table/ob_tx_table.h" #include "storage/tx/ob_keep_alive_ls_handler.h" #include "storage/restore/ob_ls_restore_handler.h" +#include "logservice/applyservice/ob_log_apply_service.h" #include "logservice/replayservice/ob_replay_handler.h" +#include "logservice/replayservice/ob_replay_status.h" #include "logservice/rcservice/ob_role_change_handler.h" #include "logservice/restoreservice/ob_log_restore_handler.h" // ObLogRestoreHandler #include "logservice/ob_log_handler.h" @@ -122,6 +124,31 @@ struct ObLSVTInfo int64_t checkpoint_lsn_; }; +// statistics for the diagnose virtual table +struct DiagnoseInfo +{ + bool is_role_sync() { + return ((palf_diagnose_info_.election_role_ == palf_diagnose_info_.palf_role_) + && (palf_diagnose_info_.palf_role_ == log_handler_diagnose_info_.log_handler_role_)); + } + int64_t ls_id_; + logservice::LogHandlerDiagnoseInfo log_handler_diagnose_info_; + palf::PalfDiagnoseInfo palf_diagnose_info_; + logservice::RCDiagnoseInfo rc_diagnose_info_; + logservice::ApplyDiagnoseInfo apply_diagnose_info_; + logservice::ReplayDiagnoseInfo replay_diagnose_info_; + logservice::GCDiagnoseInfo gc_diagnose_info_; + checkpoint::CheckpointDiagnoseInfo checkpoint_diagnose_info_; + TO_STRING_KV(K(ls_id_), + K(log_handler_diagnose_info_), +
K(palf_diagnose_info_), + K(rc_diagnose_info_), + K(apply_diagnose_info_), + K(replay_diagnose_info_), + K(gc_diagnose_info_), + K(checkpoint_diagnose_info_)); +}; + class ObIComponentFactory; enum class ObInnerLSStatus; @@ -640,6 +667,7 @@ public: const ObTabletID &tablet_id, const ObBatchUpdateTableStoreParam &param); int try_update_uppder_trans_version(); + int diagnose(DiagnoseInfo &info) const; private: // StorageBaseUtil diff --git a/src/storage/tx_storage/ob_ls_service.cpp b/src/storage/tx_storage/ob_ls_service.cpp index fa705dd1d..82a483e65 100644 --- a/src/storage/tx_storage/ob_ls_service.cpp +++ b/src/storage/tx_storage/ob_ls_service.cpp @@ -1256,6 +1256,34 @@ bool ObLSService::need_create_inner_tablets_(const obrpc::ObCreateLSArg &arg) co return arg.need_create_inner_tablets(); } +int ObLSService::iterate_diagnose(const ObFunction<int(const storage::ObLS &ls)> &func) +{ + int ret = OB_SUCCESS; + common::ObSharedGuard<ObLSIterator> ls_iter; + ObLS *ls = nullptr; + + if (OB_FAIL(get_ls_iter(ls_iter, ObLSGetMod::OBSERVER_MOD))) { + LOG_WARN("failed to get ls iter", K(ret)); + } else { + while (OB_SUCC(ret)) { + if (OB_FAIL(ls_iter->get_next(ls))) { + if (OB_ITER_END != ret) { + LOG_ERROR("fail to get next ls", K(ret)); + } + } else if (nullptr == ls) { + ret = OB_ERR_UNEXPECTED; + LOG_ERROR("ls is null", K(ret)); + } else if (OB_FAIL(func(*ls))) { + LOG_WARN("iter ls diagnose failed", K(ret)); + } + } + if (OB_ITER_END == ret) { + ret = OB_SUCCESS; + } + } + return ret; +} + } // storage } // oceanbase diff --git a/src/storage/tx_storage/ob_ls_service.h b/src/storage/tx_storage/ob_ls_service.h index 25b16b979..8ade897c6 100644 --- a/src/storage/tx_storage/ob_ls_service.h +++ b/src/storage/tx_storage/ob_ls_service.h @@ -92,6 +92,8 @@ public: int get_ls(const share::ObLSID &ls_id, ObLSHandle &handle, ObLSGetMod mod); + // @param [in] func, iterate all ls diagnose info + int iterate_diagnose(const ObFunction<int(const storage::ObLS &ls)> &func); // remove the ls that is creating and write abort slog.
int gc_ls_after_replay_slog(); diff --git a/tools/deploy/mysql_test/test_suite/inner_table/r/mysql/inner_table_overall.result b/tools/deploy/mysql_test/test_suite/inner_table/r/mysql/inner_table_overall.result index 55c2d7ce4..471e85075 100644 --- a/tools/deploy/mysql_test/test_suite/inner_table/r/mysql/inner_table_overall.result +++ b/tools/deploy/mysql_test/test_suite/inner_table/r/mysql/inner_table_overall.result @@ -553,6 +553,7 @@ select 0xffffffffff & table_id, table_name, table_type, database_id, part_num fr 12336 __all_virtual_schema_memory 2 201001 1 12337 __all_virtual_schema_slot 2 201001 1 12338 __all_virtual_minor_freeze_info 2 201001 1 +12340 __all_virtual_ha_diagnose 2 201001 1 20001 GV$OB_PLAN_CACHE_STAT 1 201001 1 20002 GV$OB_PLAN_CACHE_PLAN_STAT 1 201001 1 20003 SCHEMATA 1 201002 1 diff --git a/unittest/logservice/test_role_change_handler.cpp b/unittest/logservice/test_role_change_handler.cpp index f96dabd25..9e9f5b998 100644 --- a/unittest/logservice/test_role_change_handler.cpp +++ b/unittest/logservice/test_role_change_handler.cpp @@ -44,8 +44,9 @@ TEST(TestRoleChangeHander, test_basic_func) ObRoleChangeHandler handler; ObLogBaseType type = ObLogBaseType::TRANS_SERVICE_LOG_BASE_TYPE; MockRoleChangeHandler mock_handler; + RCDiagnoseInfo unused_diagnose_info; handler.register_handler(type, &mock_handler); - handler.switch_to_leader(); + handler.switch_to_leader(unused_diagnose_info); handler.switch_to_follower_forcedly(); handler.switch_to_follower_gracefully(); }