add diagnose virtual table

This commit is contained in:
obdev 2022-11-07 04:35:44 +00:00 committed by wangzelin.wzl
parent e78a327f06
commit 55f776959e
42 changed files with 1468 additions and 112 deletions

View File

@ -757,6 +757,18 @@ int ObApplyStatus::handle_drop_cb()
return ret;
}
// Collects apply-side diagnostic data for this log stream.
// On success diagnose_info.max_applied_scn_ is set to (min unapplied log ts - 1).
// On failure the error from get_min_unapplied_log_ts_ns() is returned and
// diagnose_info is left untouched.
int ObApplyStatus::diagnose(ApplyDiagnoseInfo &diagnose_info)
{
  int ret = OB_SUCCESS;
  int64_t first_unapplied_ts = OB_INVALID_TIMESTAMP;
  if (OB_FAIL(get_min_unapplied_log_ts_ns(first_unapplied_ts))) {
    CLOG_LOG(WARN, "get_min_unapplied_log_ts_ns failed", KPC(this), K(ret));
  }
  if (OB_SUCCESS == ret) {
    // Everything strictly below the first unapplied timestamp has been applied.
    diagnose_info.max_applied_scn_ = first_unapplied_ts - 1;
  }
  return ret;
}
int ObApplyStatus::submit_task_to_apply_service_(ObApplyServiceTask &task)
{
int ret = OB_SUCCESS;
@ -1378,6 +1390,28 @@ int ObLogApplyService::stat_for_each(const common::ObFunction<int (const ObApply
return apply_status_map_.for_each(stat_func);
}
// Looks up the apply status of log stream `id` and asks it to fill diagnose_info.
// Returns OB_NOT_INIT when the service is not initialized, the lookup error when
// get_apply_status() fails, OB_ERR_UNEXPECTED when the guard holds no apply status,
// or whatever ObApplyStatus::diagnose() returns.
int ObLogApplyService::diagnose(const share::ObLSID &id,
                                ApplyDiagnoseInfo &diagnose_info)
{
  int ret = OB_SUCCESS;
  ObApplyStatusGuard status_guard;
  if (IS_NOT_INIT) {
    ret = OB_NOT_INIT;
    CLOG_LOG(ERROR, "apply service not init", K(ret));
  } else if (OB_FAIL(get_apply_status(id, status_guard))) {
    CLOG_LOG(WARN, "guard get apply status failed", K(ret), K(id));
  } else {
    ObApplyStatus *status = status_guard.get_apply_status();
    if (NULL == status) {
      ret = OB_ERR_UNEXPECTED;
      CLOG_LOG(WARN, "apply status is not exist", K(ret), K(id));
    } else if (OB_FAIL(status->diagnose(diagnose_info))) {
      CLOG_LOG(WARN, "apply status diagnose failed", K(ret), K(id));
    } else {
      CLOG_LOG(TRACE, "apply service diagnose success", K(id));
    }
  }
  return ret;
}
int ObLogApplyService::handle_cb_queue_(ObApplyStatus *apply_status,
ObApplyServiceQueueTask *cb_queue,
bool &is_timeslice_run_out)

View File

@ -58,6 +58,12 @@ struct LSApplyStat
K(pending_cnt_));
};
// Diagnostic snapshot reported by the apply service for a single log stream.
// The struct declares no constructor, so the field holds no defined value until
// a diagnose() call succeeds.
struct ApplyDiagnoseInfo
{
// Filled by ObApplyStatus::diagnose() as (min unapplied log ts - 1).
int64_t max_applied_scn_;
TO_STRING_KV(K(max_applied_scn_));
};
class ObApplyFsCb : public palf::PalfFSCb
{
public:
@ -164,6 +170,7 @@ public:
int get_min_unapplied_log_ts_ns(int64_t &log_ts);
int stat(LSApplyStat &stat) const;
int handle_drop_cb();
int diagnose(ApplyDiagnoseInfo &diagnose_info);
TO_STRING_KV(K(ls_id_),
K(role_),
K(proposal_id_),
@ -251,6 +258,7 @@ public:
int push_task(ObApplyServiceTask *task);
int wait_append_sync(const share::ObLSID &ls_id);
int stat_for_each(const common::ObFunction<int (const ObApplyStatus &)> &func);
int diagnose(const share::ObLSID &id, ApplyDiagnoseInfo &diagnose_info);
public:
class GetApplyStatusFunctor
{

View File

@ -298,7 +298,8 @@ DEFINE_GET_SERIALIZE_SIZE(ObGCLSLog)
ObGCHandler::ObGCHandler() : is_inited_(false),
rwlock_(),
ls_(NULL),
gc_seq_invalid_member_(-1)
gc_seq_invalid_member_(-1),
gc_start_ts_(OB_INVALID_TIMESTAMP)
{
}
@ -312,6 +313,7 @@ void ObGCHandler::reset()
WLockGuard wlock_guard(rwlock_);
gc_seq_invalid_member_ = -1;
ls_ = NULL;
gc_start_ts_ = OB_INVALID_TIMESTAMP;
is_inited_ = false;
}
@ -770,6 +772,7 @@ void ObGCHandler::handle_gc_ls_dropping_(const ObGarbageCollector::LSStatus &ls_
ObRole role;
ObLSID ls_id = ls_->get_ls_id();
LSGCState gc_state = INVALID_LS_GC_STATE;
gc_start_ts_ = ObTimeUtility::current_time();
if (OB_FAIL(get_palf_role_(role))) {
CLOG_LOG(WARN, "get_palf_role_ failed", K(ls_id));
} else if (ObRole::LEADER != role) {
@ -824,6 +827,22 @@ void ObGCHandler::handle_gc_ls_offline_(ObGarbageCollector::LSStatus &ls_status)
}
}
// Reports the GC state of the owning log stream and the time GC handling started.
//
// @param [out] diagnose_info  gc_state_ is read from the LS, gc_start_ts_ is a copy
//                             of this handler's gc_start_ts_.
// @return OB_NOT_INIT if the handler has not been initialized, the error returned
//         by ObLS::get_gc_state() if that call fails, OB_SUCCESS otherwise.
int ObGCHandler::diagnose(GCDiagnoseInfo &diagnose_info) const
{
  int ret = OB_SUCCESS;
  if (IS_NOT_INIT) {
    // Previously this branch only logged and still returned OB_SUCCESS, so callers
    // consumed an unfilled diagnose_info as if it were valid.
    ret = OB_NOT_INIT;
    CLOG_LOG(WARN, "GC handler not init", K(ret));
  } else {
    RLockGuard rlock_guard(rwlock_);
    // The log below needs the LS id; it is not a member, so fetch it from the LS.
    const ObLSID ls_id = ls_->get_ls_id();
    if (OB_FAIL(ls_->get_gc_state(diagnose_info.gc_state_))) {
      CLOG_LOG(WARN, "get_gc_state failed", K(ret), K(ls_id));
    } else {
      diagnose_info.gc_start_ts_ = gc_start_ts_;
    }
  }
  return ret;
}
//---------------ObGarbageCollector---------------//
void ObGarbageCollector::GCCandidate::set_ls_status(const share::ObLSStatus &ls_status)
{

View File

@ -65,6 +65,38 @@ enum LSGCState
MAX_LS_GC_STATE = 6,
};
// Writes the textual name of `gc_state` into `str`.
//
// @param [in]  gc_state  state to describe
// @param [out] str       destination buffer, must not be NULL
// @param [in]  str_len   capacity of `str` in bytes, must be > 0
// @return OB_SUCCESS when a name was written; OB_INVALID_ARGUMENT when the buffer
//         is NULL/empty or `gc_state` has no name (buffer is left untouched).
//
// The result is always NUL-terminated; a name longer than the buffer is truncated.
static inline
int gc_state_to_string(const LSGCState gc_state,
                       char *str,
                       const int64_t str_len)
{
  int ret = OB_SUCCESS;
  const char *state_name = NULL;
  if (NULL == str || str_len <= 0) {
    ret = OB_INVALID_ARGUMENT;
  } else if (gc_state == INVALID_LS_GC_STATE) {
    state_name = "INVALID_STATE";
  } else if (gc_state == NORMAL) {
    state_name = "NORMAL";
  } else if (gc_state == LS_BLOCKED) {
    state_name = "LS_BLOCKED";
  } else if (gc_state == WAIT_OFFLINE) {
    state_name = "WAIT_OFFLINE";
  } else if (gc_state == LS_OFFLINE) {
    state_name = "LS_OFFLINE";
  } else if (gc_state == WAIT_GC) {
    state_name = "WAIT_GC";
  } else {
    ret = OB_INVALID_ARGUMENT;
  }
  if (OB_SUCCESS == ret) {
    strncpy(str, state_name, str_len);
    // strncpy writes no terminator when the source fills the whole buffer.
    str[str_len - 1] = '\0';
  }
  return ret;
}
// Diagnostic snapshot of log stream garbage collection, filled by ObGCHandler::diagnose().
struct GCDiagnoseInfo
{
// GC state obtained from ObLS::get_gc_state().
LSGCState gc_state_;
// Copy of ObGCHandler::gc_start_ts_, which handle_gc_ls_dropping_() stamps with the
// current time; it is OB_INVALID_TIMESTAMP after construction or reset().
int64_t gc_start_ts_;
TO_STRING_KV(K(gc_state_),
K(gc_start_ts_));
};
class ObGCLSLog
{
public:
@ -189,6 +221,7 @@ public:
int check_ls_can_offline();
int gc_check_invalid_member_seq(const int64_t gc_seq, bool &need_gc);
static bool is_valid_ls_gc_state(const LSGCState &state);
int diagnose(GCDiagnoseInfo &diagnose_info) const;
// for replay
virtual int replay(const void *buffer,
@ -267,6 +300,7 @@ private:
RWLock rwlock_; //for leader revoke/takeover submit log
storage::ObLS *ls_;
int64_t gc_seq_invalid_member_; // caches the GC check round in which this LS was found not to be in the member list
int64_t gc_start_ts_;
};
} // namespace logservice

View File

@ -72,7 +72,9 @@ int log_base_type_to_string(const ObLogBaseType log_type,
const int64_t str_len)
{
int ret = OB_SUCCESS;
if (log_type == TRANS_SERVICE_LOG_BASE_TYPE) {
if (log_type == INVALID_LOG_BASE_TYPE) {
strncpy(str ,"INVALID_TYPE", str_len);
} else if (log_type == TRANS_SERVICE_LOG_BASE_TYPE) {
strncpy(str ,"TRANS_SERVICE", str_len);
} else if (log_type == TABLET_OP_LOG_BASE_TYPE) {
strncpy(str ,"TABLET_OP", str_len);

View File

@ -1307,5 +1307,31 @@ int ObLogHandler::unregister_rebuild_cb()
return ret;
}
// Copies the log handler's current role and proposal id into diagnose_info.
// The read lock is held for the whole call. Returns OB_NOT_INIT (and leaves
// diagnose_info untouched) when the handler is not initialized.
int ObLogHandler::diagnose(LogHandlerDiagnoseInfo &diagnose_info) const
{
  RLockGuard guard(lock_);
  int ret = OB_SUCCESS;
  if (IS_NOT_INIT) {
    ret = OB_NOT_INIT;
    return ret;
  }
  diagnose_info.log_handler_role_ = ATOMIC_LOAD(&role_);
  diagnose_info.log_handler_proposal_id_ = ATOMIC_LOAD(&proposal_id_);
  return ret;
}
// Fetches palf-level diagnostic data through this handler's palf handle.
// The read lock is held for the whole call. Returns OB_NOT_INIT when the handler
// is not initialized, otherwise the result of PalfHandle::diagnose().
int ObLogHandler::diagnose_palf(palf::PalfDiagnoseInfo &diagnose_info) const
{
  RLockGuard guard(lock_);
  int ret = OB_SUCCESS;
  if (IS_NOT_INIT) {
    ret = OB_NOT_INIT;
    return ret;
  }
  if (OB_FAIL(palf_handle_.diagnose(diagnose_info))) {
    CLOG_LOG(WARN, "palf handle diagnose failed", K(ret), KPC(this));
  }
  return ret;
}
} // end namespace logservice
} // end namespace oceanbase

View File

@ -45,6 +45,13 @@ class ObLogApplyService;
class ObApplyStatus;
class ObLogReplayService;
class AppendCb;
// Diagnostic snapshot of an ObLogHandler, filled by ObLogHandler::diagnose().
struct LogHandlerDiagnoseInfo {
// Role the log handler currently holds (its role_ member).
common::ObRole log_handler_role_;
// Proposal id the log handler currently holds (its proposal_id_ member).
int64_t log_handler_proposal_id_;
TO_STRING_KV(K(log_handler_role_),
K(log_handler_proposal_id_));
};
class ObILogHandler
{
public:
@ -502,6 +509,8 @@ public:
int enable_vote() override final;
int register_rebuild_cb(palf::PalfRebuildCb *rebuild_cb) override final;
int unregister_rebuild_cb() override final;
int diagnose(LogHandlerDiagnoseInfo &diagnose_info) const;
int diagnose_palf(palf::PalfDiagnoseInfo &diagnose_info) const;
TO_STRING_KV(K_(role), K_(proposal_id), KP(palf_env_), K(is_in_stop_state_), K(is_inited_));
private:
int submit_config_change_cmd_(const LogConfigChangeCmd &req);

View File

@ -613,5 +613,49 @@ int ObLogService::create_ls_(const share::ObLSID &id,
}
return ret;
}
// Forwards a role-change diagnose request to the role change service.
// Returns OB_NOT_INIT when the log service is not initialized, otherwise the
// result of ObRoleChangeService::diagnose().
int ObLogService::diagnose_role_change(RCDiagnoseInfo &diagnose_info)
{
  int ret = OB_SUCCESS;
  if (IS_NOT_INIT) {
    ret = OB_NOT_INIT;
    CLOG_LOG(WARN, "log_service is not inited", K(ret));
    return ret;
  }
  if (OB_FAIL(role_change_service_.diagnose(diagnose_info))) {
    CLOG_LOG(WARN, "role_change_service diagnose failed", K(ret));
  }
  return ret;
}
// Forwards a replay diagnose request for log stream `id` to the replay service.
// Returns OB_NOT_INIT when the log service is not initialized, otherwise the
// result of ObLogReplayService::diagnose().
int ObLogService::diagnose_replay(const share::ObLSID &id,
                                  ReplayDiagnoseInfo &diagnose_info)
{
  int ret = OB_SUCCESS;
  if (IS_NOT_INIT) {
    ret = OB_NOT_INIT;
    CLOG_LOG(WARN, "log_service is not inited", K(ret));
    return ret;
  }
  if (OB_FAIL(replay_service_.diagnose(id, diagnose_info))) {
    CLOG_LOG(WARN, "replay_service diagnose failed", K(ret), K(id));
  }
  return ret;
}
// Forwards an apply diagnose request for log stream `id` to the apply service.
// Returns OB_NOT_INIT when the log service is not initialized, otherwise the
// result of ObLogApplyService::diagnose().
int ObLogService::diagnose_apply(const share::ObLSID &id,
                                 ApplyDiagnoseInfo &diagnose_info)
{
  int ret = OB_SUCCESS;
  if (IS_NOT_INIT) {
    ret = OB_NOT_INIT;
    CLOG_LOG(WARN, "log_service is not inited", K(ret));
    return ret;
  }
  if (OB_FAIL(apply_service_.diagnose(id, diagnose_info))) {
    CLOG_LOG(WARN, "apply_service diagnose failed", K(ret), K(id));
  }
  return ret;
}
}//end of namespace logservice
}//end of namespace oceanbase

View File

@ -181,6 +181,9 @@ public:
int iterate_palf(const ObFunction<int(const palf::PalfHandle&)> &func);
int iterate_apply(const ObFunction<int(const ObApplyStatus&)> &func);
int iterate_replay(const ObFunction<int(const ObReplayStatus&)> &func);
int diagnose_role_change(RCDiagnoseInfo &diagnose_info);
int diagnose_replay(const share::ObLSID &id, ReplayDiagnoseInfo &diagnose_info);
int diagnose_apply(const share::ObLSID &id, ApplyDiagnoseInfo &diagnose_info);
palf::PalfEnv *get_palf_env() { return palf_env_; }
// TODO by yunlong: temp solution, will by removed after Reporter be added in MTL

View File

@ -11,6 +11,7 @@
*/
#include "ob_ls_adapter.h"
#include "replayservice/ob_log_replay_service.h"
#include "replayservice/ob_replay_status.h"
#include "storage/tx_storage/ob_ls_service.h"
#include "storage/tx_storage/ob_ls_handle.h"
@ -58,6 +59,7 @@ int ObLSAdapter::replay(ObLogReplayTask *replay_task)
int ret = OB_SUCCESS;
ObLS *ls = NULL;
ObLSHandle ls_handle;
int64_t start_ts = ObTimeUtility::fast_current_time();
if (IS_NOT_INIT) {
ret = OB_NOT_INIT;
CLOG_LOG(ERROR, "ObLSAdapter not inited", K(ret));
@ -73,6 +75,25 @@ int ObLSAdapter::replay(ObLogReplayTask *replay_task)
replay_task->log_ts_))) {
CLOG_LOG(WARN, "log stream do replay failed", K(ret), KPC(replay_task));
}
if (OB_EAGAIN == ret) {
if (common::OB_INVALID_TIMESTAMP == replay_task->first_handle_ts_) {
replay_task->first_handle_ts_ = start_ts;
replay_task->print_error_ts_ = start_ts;
} else if ((start_ts - replay_task->print_error_ts_) > MAX_SINGLE_RETRY_WARNING_TIME_THRESOLD) {
replay_task->retry_cost_ = start_ts - replay_task->first_handle_ts_;
CLOG_LOG(WARN, "single replay task retry cost too much time. replay may be delayed",
KPC(replay_task));
replay_task->print_error_ts_ = start_ts;
}
}
replay_task->replay_cost_ = ObTimeUtility::fast_current_time() - start_ts;
if (replay_task->replay_cost_ > MAX_SINGLE_REPLAY_WARNING_TIME_THRESOLD) {
if (replay_task->replay_cost_ > MAX_SINGLE_REPLAY_ERROR_TIME_THRESOLD && !get_replay_is_writing_throttling()) {
CLOG_LOG(ERROR, "single replay task cost too much time. replay may be delayed", KPC(replay_task));
} else {
CLOG_LOG(WARN, "single replay task cost too much time", KPC(replay_task));
}
}
return ret;
}

View File

@ -36,6 +36,9 @@ public:
virtual int replay(ObLogReplayTask *replay_task);
virtual int wait_append_sync(const share::ObLSID &ls_id);
private:
const int64_t MAX_SINGLE_REPLAY_WARNING_TIME_THRESOLD = 100 * 1000; // 100ms: a single replay call costing more than this is logged
const int64_t MAX_SINGLE_REPLAY_ERROR_TIME_THRESOLD = 1000 * 1000; // 1s: a single replay call costing more than this is logged as ERROR (unless replay is write-throttled)
const int64_t MAX_SINGLE_RETRY_WARNING_TIME_THRESOLD = 5 * 1000 * 1000; // 5s: minimum interval between WARN logs while a single replay task keeps being retried (OB_EAGAIN)
bool is_inited_;
storage::ObLSService *ls_service_;
};

View File

@ -1090,5 +1090,18 @@ LogReplicaType LogStateMgr::get_replica_type() const
{
return ATOMIC_LOAD(&replica_type_);
}
// Reads the current role and epoch from the election module.
// Returns OB_NOT_INIT when the state manager is not initialized, otherwise the
// result of election_->get_role().
int LogStateMgr::get_election_role(common::ObRole &role, int64_t &epoch) const
{
  int ret = OB_SUCCESS;
  if (IS_NOT_INIT) {
    ret = OB_NOT_INIT;
    return ret;
  }
  if (OB_FAIL(election_->get_role(role, epoch))) {
    PALF_LOG(WARN, "get elect role failed", K(ret));
  }
  return ret;
}
} // namespace palf
} // namespace oceanbase

View File

@ -94,6 +94,7 @@ public:
virtual int disable_vote();
virtual int enable_vote();
virtual LogReplicaType get_replica_type() const;
virtual int get_election_role(common::ObRole &role, int64_t &epoch) const;
TO_STRING_KV(KP(this), K_(self), K_(palf_id), "role", role_to_string(role_), \
"state", replica_state_to_string(state_), K_(prepare_meta), K_(leader), K_(leader_epoch), \
K_(is_sync_enabled), K_(pending_end_lsn), K_(scan_disk_log_finished), K_(last_check_start_id), \

View File

@ -624,5 +624,11 @@ int PalfHandle::stat(PalfStat &palf_stat) const
return palf_handle_impl_->stat(palf_stat);
}
// Forwards the diagnose request to the underlying palf handle implementation and
// returns its result.
// NOTE(review): CHECK_VALID presumably returns early when palf_handle_impl_ is not
// usable — confirm against the macro definition.
int PalfHandle::diagnose(PalfDiagnoseInfo &diagnose_info) const
{
CHECK_VALID;
return palf_handle_impl_->diagnose(diagnose_info);
}
} // end namespace palf
} // end namespace oceanbase

View File

@ -368,7 +368,7 @@ public:
// OB_NOT_MASTER: self is not active leader
// OB_EAGAIN: another change_access_mode is running, try again later
// NB: 1. if return OB_EAGAIN, caller need execute 'change_access_mode' again.
// 2. before execute 'change_access_mode', caller need execute 'get_access_mode' to
// 2. before execute 'change_access_mode', caller need execute 'get_access_mode' to
// get 'mode_version' and pass it to 'change_access_mode'
int change_access_mode(const int64_t proposal_id,
const int64_t mode_version,
@ -427,6 +427,8 @@ public:
int reset_election_priority();
int stat(PalfStat &palf_stat) const;
// @param [out] diagnose info, current diagnose info of palf
int diagnose(PalfDiagnoseInfo &diagnose_info) const;
TO_STRING_KV(KP(palf_handle_impl_), KP(rc_cb_), KP(fs_cb_));
private:
palf::PalfHandleImpl *palf_handle_impl_;

View File

@ -3508,5 +3508,14 @@ int PalfHandleImpl::stat(PalfStat &palf_stat)
return ret;
}
// Collects palf-level diagnostic data from the state manager.
//
// @param [out] diagnose_info  receives the palf role/state, the palf proposal id
//                             and the election role/epoch.
// @return always OB_SUCCESS: diagnosis is best-effort. If the election role cannot
//         be read, a warning is logged and election_role_/election_epoch_ keep
//         whatever value the caller passed in.
int PalfHandleImpl::diagnose(PalfDiagnoseInfo &diagnose_info) const
{
  int ret = OB_SUCCESS;
  int tmp_ret = OB_SUCCESS;
  state_mgr_.get_role_and_state(diagnose_info.palf_role_, diagnose_info.palf_state_);
  diagnose_info.palf_proposal_id_ = state_mgr_.get_proposal_id();
  // get_election_role() reports failures through its return value; do not drop it
  // silently, otherwise unset election fields are indistinguishable from real ones.
  if (OB_SUCCESS != (tmp_ret = state_mgr_.get_election_role(diagnose_info.election_role_,
                                                           diagnose_info.election_epoch_))) {
    PALF_LOG(WARN, "get_election_role failed, election info is not filled", K(tmp_ret), KPC(this));
  }
  return ret;
}
} // end namespace palf
} // end namespace oceanbase

View File

@ -84,6 +84,19 @@ struct PalfStat {
K_(replica_type), K_(base_lsn), K_(end_lsn), K_(end_ts_ns), K_(max_lsn));
};
// Diagnostic snapshot of one palf instance, filled by PalfHandleImpl::diagnose().
// The struct declares no constructor, so fields hold no defined value until filled.
struct PalfDiagnoseInfo {
// Role and epoch reported by LogStateMgr::get_election_role().
common::ObRole election_role_;
int64_t election_epoch_;
// Role and replica state reported by LogStateMgr::get_role_and_state().
common::ObRole palf_role_;
palf::ObReplicaState palf_state_;
// Proposal id reported by LogStateMgr::get_proposal_id().
int64_t palf_proposal_id_;
TO_STRING_KV(K(election_role_),
K(election_epoch_),
K(palf_role_),
K(palf_state_),
K(palf_proposal_id_));
};
struct LSKey {
LSKey() : id_(-1) {}
explicit LSKey(const int64_t id) : id_(id) {}
@ -576,6 +589,7 @@ public:
virtual int revoke_leader(const int64_t proposal_id) = 0;
virtual int stat(PalfStat &palf_stat) = 0;
virtual int get_palf_epoch(int64_t &palf_epoch) const = 0;
virtual int diagnose(PalfDiagnoseInfo &diagnose_info) const = 0;
};
class PalfHandleImpl : public IPalfHandleImpl
@ -842,6 +856,7 @@ public:
const RegisterReturn reg_ret) override final;
int handle_learner_req(const LogLearner &server, const LogLearnerReqType req_type) override final;
int get_palf_epoch(int64_t &palf_epoch) const;
int diagnose(PalfDiagnoseInfo &diagnose_info) const;
TO_STRING_KV(K_(palf_id), K_(self), K_(has_set_deleted));
private:
int do_init_mem_(const int64_t palf_id,

View File

@ -20,6 +20,13 @@ namespace oceanbase
using namespace common;
namespace logservice
{
// Restores every field to its "nothing in progress" value:
// no sub-handler type, invalid take-over state, and id -1.
void RCDiagnoseInfo::reset()
{
  log_type_ = ObLogBaseType::INVALID_LOG_BASE_TYPE;
  state_ = TakeOverState::INVALID_TAKE_OVER_STATE;
  id_ = -1;
}
ObRoleChangeHandler::ObRoleChangeHandler(): sub_role_change_handler_arr_()
{
reset();
@ -80,7 +87,7 @@ void ObRoleChangeHandler::switch_to_follower_forcedly()
}
}
int ObRoleChangeHandler::switch_to_leader()
int ObRoleChangeHandler::switch_to_leader(RCDiagnoseInfo &diagnose_info)
{
int ret = OB_SUCCESS;
ObSpinLockGuard guard(lock_);
@ -88,6 +95,7 @@ int ObRoleChangeHandler::switch_to_leader()
ObIRoleChangeSubHandler *handler = sub_role_change_handler_arr_[i];
char sub_role_change_handler_str[OB_LOG_BASE_TYPE_STR_MAX_LEN] = {'\0'};
ObLogBaseType base_type = static_cast<ObLogBaseType>(i);
diagnose_info.log_type_ = base_type;
bool has_defined_to_string = false;
if (OB_SUCCESS == log_base_type_to_string(base_type, sub_role_change_handler_str,
OB_LOG_BASE_TYPE_STR_MAX_LEN)) {

View File

@ -23,6 +23,47 @@ namespace oceanbase
{
namespace logservice
{
// Progress of a follower-to-leader switch as tracked by ObRoleChangeService.
enum TakeOverState {
// Value installed by RCDiagnoseInfo::reset().
INVALID_TAKE_OVER_STATE = 0,
// Set when switch_follower_to_leader_() starts.
WAIT_REPLAY_DONE = 1,
// Set right before the role change handler's switch_to_leader() is invoked.
WAIT_RC_HANDLER_DONE = 2,
// Set after the role change handler's switch_to_leader() succeeded.
TAKE_OVER_FINISH = 3,
// Reported by ObRoleChangeService::diagnose() when the queried log stream is not
// the one whose task is currently recorded.
UNKNOWN_TAKE_OVER_STATE = 4,
MAX_TAKE_OVER_STATE = 5
};
// Writes the textual name of a take-over state into `str`.
//
// @param [in]  log_type  the take-over state to describe
// @param [out] str       destination buffer, must not be NULL
// @param [in]  str_len   capacity of `str` in bytes, must be > 0
// @return OB_SUCCESS when a name was written; OB_INVALID_ARGUMENT when the buffer
//         is NULL/empty or the state has no name (buffer is left untouched).
//
// The result is always NUL-terminated; a name longer than the buffer is truncated.
static inline
int takeover_state_to_string(const TakeOverState log_type,
                             char *str,
                             const int64_t str_len)
{
  int ret = OB_SUCCESS;
  const char *state_name = NULL;
  if (NULL == str || str_len <= 0) {
    ret = OB_INVALID_ARGUMENT;
  } else if (log_type == INVALID_TAKE_OVER_STATE) {
    state_name = "INVALID_STATE";
  } else if (log_type == WAIT_REPLAY_DONE) {
    state_name = "WAIT_REPLAY_DONE";
  } else if (log_type == WAIT_RC_HANDLER_DONE) {
    state_name = "WAIT_RC_HANDLER_DONE";
  } else if (log_type == TAKE_OVER_FINISH) {
    state_name = "FINISH";
  } else if (log_type == UNKNOWN_TAKE_OVER_STATE) {
    state_name = "UNKNOWN";
  } else {
    ret = OB_INVALID_ARGUMENT;
  }
  if (OB_SUCCESS == ret) {
    strncpy(str, state_name, str_len);
    // strncpy writes no terminator when the source fills the whole buffer.
    str[str_len - 1] = '\0';
  }
  return ret;
}
// Diagnostic record of a role-change (take-over) task.
// Note that TO_STRING_KV prints only state_ and log_type_, not id_.
struct RCDiagnoseInfo {
RCDiagnoseInfo() { reset(); }
void reset();
// Log stream id of the task; -1 after reset().
int64_t id_;
TakeOverState state_;
ObLogBaseType log_type_; // only meaningful while state_ is WAIT_RC_HANDLER_DONE
TO_STRING_KV(K(state_),
K(log_type_));
};
class ObRoleChangeHandler {
public:
ObRoleChangeHandler();
@ -32,7 +73,7 @@ public:
void unregister_handler(const ObLogBaseType &type);
void switch_to_follower_forcedly();
int switch_to_leader();
int switch_to_leader(RCDiagnoseInfo &diagnose_info);
// @retval:
// 1. OB_SUCCESS
// 2. OB_LS_NEED_REVOKE, ObRoleChangeService need revoke this LS.

View File

@ -57,6 +57,7 @@ ObRoleChangeService::ObRoleChangeService() : ls_service_(NULL),
apply_service_(NULL),
replay_service_(NULL),
tg_id_(-1),
cur_task_info_(),
is_inited_(false)
{
}
@ -82,6 +83,7 @@ int ObRoleChangeService::init(storage::ObLSService *ls_service,
} else if (OB_FAIL(TG_CREATE_TENANT(tg_id, tg_id_))) {
CLOG_LOG(WARN, "ObRoleChangeService TG_CREATE failed", K(ret));
} else {
cur_task_info_.reset();
ls_service_ = ls_service;
apply_service_ = apply_service;
replay_service_ = replay_service;
@ -128,6 +130,7 @@ void ObRoleChangeService::destroy()
(void)wait();
TG_DESTROY(tg_id_);
is_inited_ = false;
cur_task_info_.reset();
tg_id_ = -1;
ls_service_ = NULL;
apply_service_ = NULL;
@ -271,7 +274,7 @@ int ObRoleChangeService::handle_role_change_cb_event_for_restore_handler_(
new_proposal_id, is_pending_state))) {
CLOG_LOG(WARN, "ObRestoreHandler prepare_switch_role failed", K(ret), K(curr_role), K(curr_proposal_id),
K(new_role), K(new_proposal_id));
} else if (true == is_pending_state
} else if (true == is_pending_state
|| false == check_need_execute_role_change_(curr_proposal_id, curr_role, new_proposal_id, new_role)) {
CLOG_LOG(INFO, "no need change role", K(ret), K(is_pending_state), K(curr_role), K(curr_proposal_id),
K(new_role), K(new_proposal_id));
@ -330,7 +333,7 @@ int ObRoleChangeService::handle_role_change_cb_event_for_log_handler_(
} else if (true == is_pending_state) {
CLOG_LOG(INFO, "curr state of palf is follower pending, need ignore this signal", K(curr_role), K(curr_proposal_id),
K(new_role), K(new_proposal_id), K(is_pending_state));
} else if (true == is_pending_state
} else if (true == is_pending_state
|| false == check_need_execute_role_change_(curr_proposal_id, curr_role, new_proposal_id, new_role)) {
CLOG_LOG(INFO, "no need change role", K(ret), K(is_pending_state), K(curr_role), K(curr_proposal_id),
K(new_role), K(new_proposal_id));
@ -389,10 +392,10 @@ int ObRoleChangeService::handle_change_leader_event_for_restore_handler_(
curr_proposal_id, new_role, new_proposal_id, is_pending_state))) {
CLOG_LOG(WARN, "ObRestoreHandler prepare_switch_role failed", K(ret), K(curr_role), K(curr_proposal_id),
K(new_role), K(new_proposal_id));
} else if (true == is_pending_state
} else if (true == is_pending_state
|| curr_proposal_id != new_proposal_id || LEADER != curr_role || LEADER != new_role) {
ls->get_log_restore_handler()->change_leader_to(dst_addr);
CLOG_LOG(INFO, "no need execute switch_leader_to_follower_gracefully_restore_, change leader directlly",
CLOG_LOG(INFO, "no need execute switch_leader_to_follower_gracefully_restore_, change leader directlly",
K(ret), K(is_pending_state), K(curr_proposal_id), K(new_proposal_id), K(curr_role), K(new_role));
} else if (OB_FAIL(switch_leader_to_follower_gracefully_restore_(dst_addr, curr_proposal_id, ls))) {
CLOG_LOG(WARN, "switch_leader_to_follower_gracefully_restore_ failed", K(ret), K(ls), K(dst_addr));
@ -417,11 +420,11 @@ int ObRoleChangeService::handle_change_leader_event_for_log_handler_(
curr_proposal_id, new_role, new_proposal_id, is_pending_state))) {
CLOG_LOG(WARN, "ObLogHandler prepare_switch_role failed", K(ret), K(curr_role), K(curr_proposal_id),
K(new_role), K(new_proposal_id));
} else if (true == is_pending_state
} else if (true == is_pending_state
|| curr_proposal_id != new_proposal_id || LEADER != curr_role || LEADER != new_role) {
// when log handler is not LEADER, we also need to execute change_leader_to; otherwise, the leader cannot be changed by election.
ls->get_log_handler()->change_leader_to(dst_addr);
CLOG_LOG(INFO, "no need execute switch_leader_to_follower_gracefully, change leader directlly",
CLOG_LOG(INFO, "no need execute switch_leader_to_follower_gracefully, change leader directlly",
K(ret), K(is_pending_state), K(curr_proposal_id), K(new_proposal_id), K(curr_role), K(new_role));
} else if (OB_FAIL(switch_leader_to_follower_gracefully_(new_proposal_id, curr_proposal_id,
dst_addr, ls))) {
@ -445,6 +448,8 @@ int ObRoleChangeService::switch_follower_to_leader_(
ObTimeGuard time_guard("switch_to_leader", EACH_ROLE_CHANGE_COST_MAX_TIME);
ObLogHandler *log_handler = ls->get_log_handler();
ObRoleChangeHandler *role_change_handler = ls->get_role_change_handler();
ATOMIC_SET(&cur_task_info_.state_, TakeOverState::WAIT_REPLAY_DONE);
ATOMIC_SET(&cur_task_info_.id_, ls->get_ls_id().id());
if (OB_FAIL(log_handler->get_end_lsn(end_lsn))) {
CLOG_LOG(WARN, "get_end_lsn failed", K(ret), KPC(ls));
// NB: order is vital!!!
@ -460,10 +465,13 @@ int ObRoleChangeService::switch_follower_to_leader_(
|| OB_FAIL(replay_service_->switch_to_leader(ls_id))) {
} else if (FALSE_IT(log_handler->switch_role(new_role, new_proposal_id))) {
CLOG_LOG(WARN, "ObLogHandler switch role failed", K(ret), K(new_role), K(new_proposal_id));
} else if (FALSE_IT(ATOMIC_SET(&cur_task_info_.state_, TakeOverState::WAIT_RC_HANDLER_DONE))) {
} else if (FALSE_IT(time_guard.click("role_change_handler->switch_to_leader"))
|| OB_FAIL(role_change_handler->switch_to_leader())) {
|| OB_FAIL(role_change_handler->switch_to_leader(cur_task_info_))) {
CLOG_LOG(WARN, "ObRoleChangeHandler switch_to_leader failed", K(ret), KPC(ls));
} else {
ATOMIC_SET(&cur_task_info_.state_, TakeOverState::TAKE_OVER_FINISH);
ATOMIC_SET(&cur_task_info_.log_type_, ObLogBaseType::INVALID_LOG_BASE_TYPE);
CLOG_LOG(INFO, "switch_follower_to_leader_ success", K(ret), KPC(ls));
}
if (OB_FAIL(ret)) {
@ -548,7 +556,7 @@ int ObRoleChangeService::switch_leader_to_follower_gracefully_(
K(new_role), K(new_proposal_id), K(dst_addr));
// waiting for the apply service to finish may fail; in that case we need to:
// 1. switch log handler to origin status.
// 2. resume role change handler
// 2. resume role change handler
log_handler->switch_role(LEADER, curr_proposal_id);
if (OB_FAIL(role_change_handler->resume_to_leader())) {
CLOG_LOG(WARN, "resume to leader failed", K(ret), KPC(ls));
@ -613,13 +621,17 @@ int ObRoleChangeService::switch_follower_to_leader_restore_(
const ObRole new_role = LEADER;
ObLogRestoreHandler *log_restore_handler = ls->get_log_restore_handler();
ObRoleChangeHandler *restore_role_change_handler = ls->get_restore_role_change_handler();
ATOMIC_SET(&cur_task_info_.state_, TakeOverState::WAIT_RC_HANDLER_DONE);
ATOMIC_SET(&cur_task_info_.id_, ls->get_ls_id().id());
ObTimeGuard time_guard("switch_follower_to_leader_restore_", EACH_ROLE_CHANGE_COST_MAX_TIME);
if (FALSE_IT(log_restore_handler->switch_role(new_role, new_proposal_id))) {
} else if (FALSE_IT(time_guard.click("restore_role_change_handler->switch_to_leader"))
|| OB_FAIL(restore_role_change_handler->switch_to_leader())) {
|| OB_FAIL(restore_role_change_handler->switch_to_leader(cur_task_info_))) {
CLOG_LOG(WARN, "restore_role_change_handler switch_to_leader failed", K(ret), K(new_role),
K(new_proposal_id), K(ls));
} else {
ATOMIC_SET(&cur_task_info_.state_, TakeOverState::TAKE_OVER_FINISH);
ATOMIC_SET(&cur_task_info_.log_type_, ObLogBaseType::INVALID_LOG_BASE_TYPE);
}
if (OB_FAIL(ret)) {
log_restore_handler->revoke_leader();
@ -802,5 +814,20 @@ bool ObRoleChangeService::check_need_execute_role_change_(
{
return curr_proposal_id != new_proposal_id || curr_role != new_role;
}
// Reports the take-over progress for the log stream named by diagnose_info.id_.
// When that id matches the task currently recorded in cur_task_info_, its state and
// sub-handler type are copied out; otherwise UNKNOWN_TAKE_OVER_STATE is reported.
// Always returns OB_SUCCESS.
int ObRoleChangeService::diagnose(RCDiagnoseInfo &diagnose_info)
{
  int ret = OB_SUCCESS;
  const bool is_current_task = (diagnose_info.id_ == ATOMIC_LOAD(&cur_task_info_.id_));
  if (!is_current_task) {
    // The switch task of this log stream has not been handled yet, possibly because
    // the switch task of another log stream is stuck.
    diagnose_info.state_ = logservice::TakeOverState::UNKNOWN_TAKE_OVER_STATE;
    diagnose_info.log_type_ = logservice::ObLogBaseType::INVALID_LOG_BASE_TYPE;
  } else {
    // The switch task of this log stream is the one currently being handled.
    diagnose_info.state_ = ATOMIC_LOAD(&cur_task_info_.state_);
    diagnose_info.log_type_ = ATOMIC_LOAD(&cur_task_info_.log_type_);
  }
  return ret;
}
} // end namespace logservice
} // end namespace oceanbase

View File

@ -61,6 +61,7 @@ public:
void handle(void *task);
int on_role_change(const int64_t id) final override;
int on_need_change_leader(const int64_t ls_id, const common::ObAddr &dst_addr) final override;
int diagnose(RCDiagnoseInfo &diagnose_info);
private:
int submit_role_change_event_(const RoleChangeEvent &event);
@ -136,6 +137,7 @@ private:
logservice::ObLogApplyService *apply_service_;
logservice::ObILogReplayService *replay_service_;
int tg_id_;
RCDiagnoseInfo cur_task_info_; // for diagnose
bool is_inited_;
};
} // end namespace logservice

View File

@ -746,7 +746,8 @@ void ObLogReplayService::process_replay_ret_code_(const int ret_code,
if (OB_SUCCESS != ret_code) {
int64_t cur_ts = ObTimeUtility::fast_current_time();
if (replay_status.is_fatal_error(ret_code)) {
replay_status.set_err_info(replay_task.lsn_, cur_ts, ret_code);
replay_status.set_err_info(replay_task.lsn_, replay_task.log_ts_, replay_task.log_type_,
replay_task.replay_hint_, false, cur_ts, ret_code);
CLOG_LOG(ERROR, "replay task encount fatal error", K(replay_status), K(replay_task), K(ret_code));
} else {/*do nothing*/}
@ -802,10 +803,8 @@ int ObLogReplayService::do_replay_task_(ObLogReplayTask *replay_task,
{
int ret = OB_SUCCESS;
ObLS *ls;
int64_t start_ts = ObTimeUtility::fast_current_time();
get_replay_queue_index() = replay_queue_idx;
ObLogReplayBuffer *replay_log_buff = NULL;
int64_t retry_time = 0;
bool need_replay = false;
if (OB_ISNULL(replay_status) || OB_ISNULL(replay_task)) {
ret = OB_INVALID_ARGUMENT;
@ -848,24 +847,6 @@ int ObLogReplayService::do_replay_task_(ObLogReplayTask *replay_task,
}
}
CLOG_LOG(TRACE, "do replay task", KPC(replay_task), KPC(replay_status));
} else if (OB_EAGAIN == ret) {
if (common::OB_INVALID_TIMESTAMP == replay_task->first_handle_ts_) {
replay_task->first_handle_ts_ = start_ts;
replay_task->print_error_ts_ = start_ts;
} else if ((start_ts - replay_task->print_error_ts_) > MAX_SINGLE_RETRY_WARNING_TIME_THRESOLD) {
retry_time = start_ts - replay_task->first_handle_ts_;
CLOG_LOG(WARN, "single replay task retry cost too much time. replay may be delayed",
K(retry_time), KPC(replay_task), KPC(replay_status));
replay_task->print_error_ts_ = start_ts;
}
}
int64_t replay_used_time = ObTimeUtility::fast_current_time() - start_ts;
if (replay_used_time > MAX_REPLAY_TIME_PER_ROUND) {
if (replay_used_time > MAX_SINGLE_REPLAY_WARNING_TIME_THRESOLD && !get_replay_is_writing_throttling()) {
CLOG_LOG(ERROR, "single replay task cost too much time. replay may be delayed", K(replay_used_time), KPC(replay_task), KPC(replay_status));
} else {
CLOG_LOG(WARN, "single replay task cost too much time", K(replay_used_time), KPC(replay_task), KPC(replay_status));
}
}
get_replay_queue_index() = -1;
get_replay_is_writing_throttling() = false;
@ -940,6 +921,8 @@ int ObLogReplayService::try_submit_remained_log_replay_task_(ObReplayServiceSubm
LSN cur_log_lsn = replay_task->lsn_;
int64_t cur_log_size = replay_task->log_size_;
int64_t cur_log_ts = replay_task->log_ts_;
int64_t replay_hint = replay_task->replay_hint_;
ObLogBaseType log_type = replay_task->log_type_;
if (OB_FAIL(check_can_submit_log_replay_task_(replay_task, replay_status))) {
// do nothing
} else if (OB_SUCC(submit_log_replay_task_(*replay_task, *replay_status))) {
@ -953,7 +936,7 @@ int ObLogReplayService::try_submit_remained_log_replay_task_(ObReplayServiceSubm
// log info回退
int64_t cur_ts = common::ObTimeUtility::fast_current_time();
CLOG_LOG(ERROR, "failed to update_next_submit_log_info", KR(ret), KPC(replay_status), K(cur_log_lsn), K(cur_log_size), K(cur_log_ts));
replay_status->set_err_info(cur_log_lsn, cur_ts, ret);
replay_status->set_err_info(cur_log_lsn, cur_log_ts, log_type, replay_hint, true, cur_ts, ret);
}
}
}
@ -1105,14 +1088,16 @@ int ObLogReplayService::fetch_and_submit_single_log_(ObReplayStatus &replay_stat
CLOG_LOG(ERROR, "failed to update_next_to_submit_log_ts_allow_equal", KR(tmp_ret),
K(cur_log_submit_ts), K(replay_status));
ret = OB_ERR_UNEXPECTED;
replay_status.set_err_info(cur_lsn, ObClockGenerator::getClock(), ret);
replay_status.set_err_info(cur_lsn, cur_log_submit_ts, replay_task->log_type_,
replay_task->replay_hint_, true, ObClockGenerator::getClock(), tmp_ret);
free_replay_task_log_buf(replay_task);
free_replay_task(replay_task);
} else {
submit_task->cache_replay_task(replay_task);
}
} else {
replay_status.set_err_info(cur_lsn, ObClockGenerator::getClock(), ret);
replay_status.set_err_info(cur_lsn, cur_log_submit_ts, replay_task->log_type_,
replay_task->replay_hint_ ,true, ObClockGenerator::getClock(), ret);
free_replay_task_log_buf(replay_task);
free_replay_task(replay_task);
}
@ -1170,7 +1155,8 @@ int ObLogReplayService::handle_submit_task_(ObReplayServiceSubmitTask *submit_ta
} else if (OB_FAIL(submit_task->update_next_to_submit_lsn(committed_end_lsn))) {
// log info回退
CLOG_LOG(ERROR, "failed to update_next_submit_log_info", KR(ret), K(committed_end_lsn), KPC(replay_status));
replay_status->set_err_info(committed_end_lsn, ObClockGenerator::getClock(), ret);
replay_status->set_err_info(committed_end_lsn, to_submit_log_ts, ObLogBaseType::INVALID_LOG_BASE_TYPE,
0, true, ObClockGenerator::getClock(), ret);
} else {
CLOG_LOG(INFO, "no log to fetch but committed_end_lsn not reached, last log may be padding",
KR(ret), K(to_submit_lsn), K(committed_end_lsn), K(to_submit_log_ts), KPC(replay_status));
@ -1191,7 +1177,8 @@ int ObLogReplayService::handle_submit_task_(ObReplayServiceSubmitTask *submit_ta
// log info回退
CLOG_LOG(ERROR, "failed to update_next_submit_log_info", KR(ret), K(to_submit_lsn),
K(log_size), K(to_submit_log_ts));
replay_status->set_err_info(to_submit_lsn, ObClockGenerator::getClock(), ret);
replay_status->set_err_info(to_submit_lsn, to_submit_log_ts, ObLogBaseType::INVALID_LOG_BASE_TYPE,
0, true, ObClockGenerator::getClock(), ret);
}
} else if (OB_EAGAIN == ret) {
// do nothing
@ -1420,6 +1407,26 @@ int ObLogReplayService::remove_all_ls_()
return ret;
}
// Looks up the replay status of log stream `id` and asks it to fill diagnose_info.
//
// @param [in]  id             log stream to diagnose
// @param [out] diagnose_info  filled by ObReplayStatus::diagnose()
// @return OB_NOT_INIT when the service is not initialized, the lookup error when
//         get_replay_status_() fails, OB_ERR_UNEXPECTED when the guard holds no
//         replay status, or whatever ObReplayStatus::diagnose() returns.
int ObLogReplayService::diagnose(const share::ObLSID &id,
                                 ReplayDiagnoseInfo &diagnose_info)
{
  int ret = OB_SUCCESS;
  ObReplayStatus *replay_status = NULL;
  ObReplayStatusGuard guard;
  if (IS_NOT_INIT) {
    ret = OB_NOT_INIT;
    CLOG_LOG(WARN, "replay service not init", K(ret));
  } else if (OB_FAIL(get_replay_status_(id, guard))) {
    CLOG_LOG(WARN, "guard get replay status failed", K(ret), K(id));
  } else if (NULL == (replay_status = guard.get_replay_status())) {
    ret = OB_ERR_UNEXPECTED;
    CLOG_LOG(WARN, "replay status is not exist", K(ret), K(id));
  } else if (OB_FAIL(replay_status->diagnose(diagnose_info))) {
    // The message used to say "enable failed", copied from another code path.
    CLOG_LOG(WARN, "replay status diagnose failed", K(ret), K(id));
  }
  return ret;
}
}
bool ObLogReplayService::GetReplayStatusFunctor::operator()(const share::ObLSID &id,
ObReplayStatus *replay_status)
{

View File

@ -137,6 +137,7 @@ public:
int update_replayable_point(const int64_t replayable_ts_ns);
int stat_for_each(const common::ObFunction<int (const ObReplayStatus &)> &func);
int stat_all_ls_replay_process(int64_t &replayed_log_size, int64_t &unreplayed_log_size);
int diagnose(const share::ObLSID &id, ReplayDiagnoseInfo &diagnose_info);
void inc_pending_task_size(const int64_t log_size);
void dec_pending_task_size(const int64_t log_size);
int64_t get_pending_task_size() const;
@ -192,8 +193,6 @@ private:
int remove_all_ls_();
private:
const int64_t MAX_REPLAY_TIME_PER_ROUND = 100 * 1000; //100ms
const int64_t MAX_SINGLE_REPLAY_WARNING_TIME_THRESOLD = 1000 * 1000; //1s 单条日志回放执行时间超过此值报error
const int64_t MAX_SINGLE_RETRY_WARNING_TIME_THRESOLD = 5 * 1000 * 1000; //1s 单条日志回放重试超过此值报error
const int64_t MAX_SUBMIT_TIME_PER_ROUND = 1000 * 1000; //1s
const int64_t TASK_QUEUE_WAIT_IN_GLOBAL_QUEUE_TIME_THRESHOLD = 5 * 1000 * 1000; //5s
const int64_t PENDING_TASK_MEMORY_LIMIT = 128 * (1LL << 20); //128MB

View File

@ -408,6 +408,11 @@ int64_t ObReplayServiceReplayTask::idx() const
int ObReplayServiceReplayTask::get_min_unreplayed_log_info(LSN &lsn,
int64_t &log_ts,
int64_t &replay_hint,
ObLogBaseType &log_type,
int64_t &first_handle_ts,
int64_t &replay_cost,
int64_t &retry_cost,
bool &is_queue_empty)
{
int ret = OB_SUCCESS;
@ -417,6 +422,11 @@ int ObReplayServiceReplayTask::get_min_unreplayed_log_info(LSN &lsn,
if (NULL != top_item && NULL != (replay_task = static_cast<ObLogReplayTask *>(top_item))) {
lsn = replay_task->lsn_;
log_ts = replay_task->log_ts_;
replay_hint = replay_task->replay_hint_;
log_type = replay_task->log_type_;
first_handle_ts = replay_task->first_handle_ts_;
replay_cost = replay_task->replay_cost_;
retry_cost = replay_task->retry_cost_;
is_queue_empty = false;
} else {
is_queue_empty = true;
@ -914,51 +924,58 @@ int ObReplayStatus::get_ls_id(share::ObLSID &id)
}
int ObReplayStatus::get_min_unreplayed_log_info(LSN &lsn,
int64_t &log_ts)
int64_t &log_ts,
int64_t &replay_hint,
ObLogBaseType &log_type,
int64_t &first_handle_ts,
int64_t &replay_cost,
int64_t &retry_cost)
{
int ret = OB_SUCCESS;
int64_t base_log_ts = 0;
do {
RLockGuard rlock_guard(rwlock_);
if (IS_NOT_INIT) {
ret = OB_NOT_INIT;
CLOG_LOG(WARN, "replay status is not inited", K(ret));
} else if (!is_enabled_) {
ret = OB_STATE_NOT_MATCH;
if (palf_reach_time_interval(1 * 1000 * 1000, get_log_info_debug_time_)) {
CLOG_LOG(WARN, "replay status is not enabled", K(ret), KPC(this));
if (IS_NOT_INIT) {
ret = OB_NOT_INIT;
CLOG_LOG(WARN, "replay status is not inited", K(ret));
} else if (!is_enabled_) {
ret = OB_STATE_NOT_MATCH;
if (palf_reach_time_interval(1 * 1000 * 1000, get_log_info_debug_time_)) {
CLOG_LOG(WARN, "replay status is not enabled", K(ret), KPC(this));
}
} else if (OB_FAIL(submit_log_task_.get_next_to_submit_log_info(lsn, log_ts))) {
ret = OB_ERR_UNEXPECTED;
CLOG_LOG(ERROR, "get_next_to_submit_log_ts failed", K(ret));
} else if (OB_FAIL(submit_log_task_.get_base_log_ts(base_log_ts))) {
CLOG_LOG(ERROR, "get_base_log_ts failed", K(ret));
} else if (log_ts <= base_log_ts) {
//拉到的日志尚未超过过滤点
log_ts = base_log_ts;
if (palf_reach_time_interval(5 * 1000 * 1000, get_log_info_debug_time_)) {
CLOG_LOG(INFO, "get_min_unreplayed_log_info in skip state", K(lsn), K(log_ts), KPC(this));
}
} else {
LSN queue_lsn;
int64_t queue_ts = OB_INVALID_TIMESTAMP;
bool is_queue_empty = true;
for (int64_t i = 0; OB_SUCC(ret) && i < REPLAY_TASK_QUEUE_SIZE; ++i) {
if (OB_FAIL(task_queues_[i].get_min_unreplayed_log_info(queue_lsn, queue_ts, replay_hint, log_type,
first_handle_ts, replay_cost, retry_cost, is_queue_empty))) {
CLOG_LOG(ERROR, "task_queue get_min_unreplayed_log_info failed", K(ret), K(task_queues_[i]));
} else if (!is_queue_empty
&& queue_lsn < lsn
&& queue_ts < log_ts) {
lsn = queue_lsn;
log_ts = queue_ts;
}
}
} while (0);
if (OB_SUCC(ret)) {
if (OB_FAIL(submit_log_task_.get_next_to_submit_log_info(lsn, log_ts))) {
ret = OB_ERR_UNEXPECTED;
CLOG_LOG(ERROR, "get_next_to_submit_log_ts failed", K(ret));
} else if (OB_FAIL(submit_log_task_.get_base_log_ts(base_log_ts))) {
CLOG_LOG(ERROR, "get_base_log_ts failed", K(ret));
} else if (log_ts <= base_log_ts) {
//拉到的日志尚未超过过滤点
log_ts = base_log_ts;
if (palf_reach_time_interval(5 * 1000 * 1000, get_log_info_debug_time_)) {
CLOG_LOG(INFO, "get_min_unreplayed_log_info in skip state", K(lsn), K(log_ts), KPC(this));
}
} else {
LSN queue_lsn;
int64_t queue_ts = OB_INVALID_TIMESTAMP;
bool is_queue_empty = true;
for (int64_t i = 0; OB_SUCC(ret) && i < REPLAY_TASK_QUEUE_SIZE; ++i) {
if (OB_FAIL(task_queues_[i].get_min_unreplayed_log_info(queue_lsn, queue_ts, is_queue_empty))) {
CLOG_LOG(ERROR, "task_queue get_min_unreplayed_log_info failed", K(ret), K(task_queues_[i]));
} else if (!is_queue_empty
&& queue_lsn < lsn
&& queue_ts < log_ts) {
lsn = queue_lsn;
log_ts = queue_ts;
}
}
if (palf_reach_time_interval(5 * 1000 * 1000, get_log_info_debug_time_)) {
CLOG_LOG(INFO, "get_min_unreplayed_log_info", K(lsn), K(log_ts), KPC(this));
}
if (palf_reach_time_interval(5 * 1000 * 1000, get_log_info_debug_time_)) {
CLOG_LOG(INFO, "get_min_unreplayed_log_info", K(lsn), K(log_ts), KPC(this));
}
}
if (OB_SUCC(ret) && !is_enabled()) {
//double check
ret = OB_STATE_NOT_MATCH;
if (palf_reach_time_interval(1 * 1000 * 1000, get_log_info_debug_time_)) {
CLOG_LOG(WARN, "replay status is not enabled", K(ret), KPC(this));
}
}
return ret;
@ -966,14 +983,26 @@ int ObReplayStatus::get_min_unreplayed_log_info(LSN &lsn,
int ObReplayStatus::get_min_unreplayed_lsn(LSN &lsn)
{
int64_t unused = OB_INVALID_TIMESTAMP;
return get_min_unreplayed_log_info(lsn, unused);
int64_t unused_log_ts = OB_INVALID_TIMESTAMP;
int64_t unused_replay_hint = 0;
ObLogBaseType unused_log_type = ObLogBaseType::INVALID_LOG_BASE_TYPE;
int64_t unused_first_handle_ts = 0;
int64_t unused_replay_cost = 0;
int64_t unused_retry_cost = 0;
return get_min_unreplayed_log_info(lsn, unused_log_ts, unused_replay_hint, unused_log_type,
unused_first_handle_ts, unused_replay_cost, unused_retry_cost);
}
int ObReplayStatus::get_min_unreplayed_log_ts_ns(int64_t &log_ts)
{
LSN unused;
return get_min_unreplayed_log_info(unused, log_ts);
LSN unused_lsn;
int64_t unused_replay_hint = 0;
int64_t unused_first_handle_ts = 0;
ObLogBaseType unused_log_type = ObLogBaseType::INVALID_LOG_BASE_TYPE;
int64_t unused_replay_cost = 0;
int64_t unused_retry_cost = 0;
return get_min_unreplayed_log_info(unused_lsn, log_ts, unused_replay_hint, unused_log_type,
unused_first_handle_ts, unused_replay_cost, unused_retry_cost);
}
int ObReplayStatus::get_replay_process(int64_t &replayed_log_size,
@ -1199,11 +1228,18 @@ int ObReplayStatus::check_replay_barrier(ObLogReplayTask *replay_task,
return ret;
}
void ObReplayStatus::set_err_info(const LSN &lsn,
// Records the log-stream-level error that diagnose() later reports.
//
// @param lsn            LSN of the log on which the error occurred
// @param scn            SCN of that log
// @param log_type       base type of that log
// @param replay_hint    replay hint of that log
// @param is_submit_err  true when the error was raised by the submit-log task
// @param err_ts         timestamp at which the error was observed
// @param err_ret        error code
void ObReplayStatus::set_err_info(const palf::LSN &lsn,
                                  const uint64_t scn,
                                  const ObLogBaseType &log_type,
                                  const int64_t replay_hint,
                                  const bool is_submit_err,
                                  const int64_t err_ts,
                                  const int err_ret)
{
  err_info_.lsn_ = lsn;
  err_info_.scn_ = scn;
  err_info_.log_type_ = log_type;
  // Previously dropped: diagnose() reads err_info_.replay_hint_, so the hint
  // passed by the caller must be stored together with the other fields.
  err_info_.replay_hint_ = replay_hint;
  err_info_.is_submit_err_ = is_submit_err;
  err_info_.err_ts_ = err_ts;
  err_info_.err_ret_ = err_ret;
}
@ -1253,5 +1289,65 @@ int ObReplayStatus::stat(LSReplayStat &stat) const
return ret;
}
// Produces the replay diagnose data of this log stream.
//
// On success `diagnose_info.max_replayed_lsn_` holds the minimum unreplayed
// LSN, `diagnose_info.max_replayed_scn_` holds (minimum unreplayed SCN - 1),
// and `diagnose_info.diagnose_str_` holds a "key:value; " description of
// either the recorded error (when err_info_ carries one) or the minimum
// unreplayed log.
//
// @return OB_NOT_INIT / OB_STATE_NOT_MATCH when the status is not initialised
//         or not enabled; otherwise the error code of the failing step.
int ObReplayStatus::diagnose(ReplayDiagnoseInfo &diagnose_info)
{
  int ret = OB_SUCCESS;
  RLockGuard rlock_guard(rwlock_);
  LSN min_unreplayed_lsn;
  // Initialised so that no indeterminate value can ever be formatted or logged.
  int64_t min_unreplayed_scn = OB_INVALID_TIMESTAMP;
  int64_t replay_hint = 0;
  ObLogBaseType log_type = ObLogBaseType::INVALID_LOG_BASE_TYPE;
  char log_type_str[common::MAX_SERVICE_TYPE_BUF_LENGTH] = {'\0'};
  int64_t first_handle_time = 0;
  int64_t replay_cost = 0;
  int64_t retry_cost = 0;
  int replay_ret = OB_SUCCESS;
  bool is_submit_err = false;
  diagnose_info.diagnose_str_.reset();
  if (IS_NOT_INIT) {
    ret = OB_NOT_INIT;
  } else if (!is_enabled_) {
    ret = OB_STATE_NOT_MATCH;
  } else if (OB_FAIL(get_min_unreplayed_log_info(min_unreplayed_lsn, min_unreplayed_scn, replay_hint,
                                                 log_type, first_handle_time, replay_cost, retry_cost))) {
    CLOG_LOG(WARN, "get_min_unreplayed_log_info failed", KPC(this), K(ret));
  } else {
    diagnose_info.max_replayed_lsn_ = min_unreplayed_lsn;
    diagnose_info.max_replayed_scn_ = min_unreplayed_scn - 1;
    if (OB_SUCCESS != err_info_.err_ret_) {
      // A non-retryable error has been recorded: report that log instead of
      // the minimum unreplayed one.
      min_unreplayed_lsn = err_info_.lsn_;
      min_unreplayed_scn = static_cast<int64_t>(err_info_.scn_);
      replay_hint = err_info_.replay_hint_;
      log_type = err_info_.log_type_;
      first_handle_time = err_info_.err_ts_;
      replay_cost = 0;
      retry_cost = 0;
      replay_ret = err_info_.err_ret_;
      is_submit_err = err_info_.is_submit_err_;
    } else if (0 < retry_cost || 0 < replay_cost) {
      replay_ret = OB_EAGAIN;
    }
    // Convert the log type only after the error override above, so that the
    // printed type belongs to the same log as the printed LSN/SCN. A submit
    // error is printed as "REPLAY_SUBMIT", so no conversion is needed for it.
    if (!is_submit_err
        && OB_FAIL(log_base_type_to_string(log_type, log_type_str, common::MAX_SERVICE_TYPE_BUF_LENGTH))) {
      CLOG_LOG(WARN, "log_base_type_to_string failed", K(ret), K(log_type));
    }
  }
  if (OB_SUCC(ret)) {
    if (OB_FAIL(diagnose_info.diagnose_str_.append_fmt("ret:%d; "
                                                       "min_unreplayed_lsn:%ld; "
                                                       "min_unreplayed_scn:%ld; "
                                                       "replay_hint:%ld; "
                                                       "log_type:%s; "
                                                       "replay_cost:%ld; "
                                                       "retry_cost:%ld; "
                                                       "first_handle_time:%ld;" ,
                                                       replay_ret, min_unreplayed_lsn.val_,
                                                       min_unreplayed_scn, replay_hint,
                                                       is_submit_err ? "REPLAY_SUBMIT" : log_type_str,
                                                       replay_cost, retry_cost, first_handle_time))) {
      CLOG_LOG(WARN, "append diagnose str failed", K(ret), K(replay_ret), K(min_unreplayed_lsn), K(min_unreplayed_scn),
               K(replay_hint), K(is_submit_err), K(replay_cost), K(retry_cost), K(first_handle_time));
    }
  }
  return ret;
}
} // namespace logservice
}

View File

@ -75,6 +75,15 @@ struct LSReplayStat
K(pending_cnt_));
};
// Output of the replay diagnose() calls.
// NOTE(review): the struct declares no constructor, so the scalar field is
// presumably uninitialised until diagnose() succeeds — confirm how callers
// treat a failed diagnose().
struct ReplayDiagnoseInfo
{
// set by ObReplayStatus::diagnose() to the minimum unreplayed LSN
palf::LSN max_replayed_lsn_;
// set by ObReplayStatus::diagnose() to (minimum unreplayed SCN - 1)
int64_t max_replayed_scn_;
// formatted "key:value; " description built by ObReplayStatus::diagnose()
ObSqlString diagnose_str_;
// diagnose_str_ is not part of the printed key/value list
TO_STRING_KV(K(max_replayed_lsn_),
K(max_replayed_scn_));
};
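// This type is used only for pre-barrier logs and is allocated separately from ObLogReplayTask,
// so the memory of this structure must be released on its own.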
struct ObLogReplayBuffer
@ -131,6 +140,8 @@ public:
bool is_raw_write_;
int64_t first_handle_ts_;
int64_t print_error_ts_;
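int64_t replay_cost_; // total time this task has spent being retried -- NOTE(review): this wording and the one on retry_cost_ look swapped, confirm
int64_t retry_cost_; // processing time of the attempt in which this task was replayed successfully -- NOTE(review): see replay_cost_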
void *log_buf_;
TO_STRING_KV(K(ls_id_),
@ -367,6 +378,11 @@ public:
}
int get_min_unreplayed_log_info(palf::LSN &lsn,
int64_t &log_ts,
int64_t &replay_hint,
ObLogBaseType &log_type,
int64_t &first_handle_ts,
int64_t &replay_cost,
int64_t &retry_cost,
bool &is_queue_empty);
private:
Link *pop_()
@ -421,12 +437,20 @@ public:
}
void reset() {
lsn_.reset();
scn_ = 0;
log_type_ = ObLogBaseType::INVALID_LOG_BASE_TYPE;
is_submit_err_ = false;
err_ts_ = 0;
err_ret_ = common::OB_SUCCESS;
}
TO_STRING_KV(K(lsn_), K(err_ts_), K(err_ret_));
TO_STRING_KV(K(lsn_), K(scn_), K(log_type_),
K(is_submit_err_), K(err_ts_), K(err_ret_));
public:
palf::LSN lsn_;
uint64_t scn_;
ObLogBaseType log_type_;
int64_t replay_hint_;
bool is_submit_err_; //is submit log task error occured
int64_t err_ts_; //the timestamp that partition encounts fatal error
int err_ret_; //the ret code of fatal error
};
@ -496,7 +520,12 @@ public:
int get_min_unreplayed_lsn(palf::LSN &lsn);
int get_min_unreplayed_log_ts_ns(int64_t &log_ts);
int get_min_unreplayed_log_info(palf::LSN &lsn,
int64_t &log_ts);
int64_t &log_ts,
int64_t &replay_hint,
ObLogBaseType &log_type,
int64_t &first_handle_ts,
int64_t &replay_cost,
int64_t &retry_cost);
int get_replay_process(int64_t &replayed_log_size, int64_t &unreplayed_log_size);
// Check the barrier state when submitting a log
int check_submit_barrier();
@ -508,7 +537,7 @@ public:
void set_post_barrier_submitted(const palf::LSN &lsn);
int set_post_barrier_finished(const palf::LSN &lsn);
int stat(LSReplayStat &stat) const;
int diagnose(ReplayDiagnoseInfo &diagnose_info);
inline void inc_ref()
{
ATOMIC_INC(&ref_cnt_);
@ -522,7 +551,13 @@ public:
return replay_hint & (REPLAY_TASK_QUEUE_SIZE - 1);
}
// Records a log-stream-level error; errors of this kind are not recoverable
void set_err_info(const palf::LSN &lsn, const int64_t err_ts, const int err_ret);
void set_err_info(const palf::LSN &lsn,
const uint64_t scn,
const ObLogBaseType &log_type,
const int64_t replay_hint,
const bool is_submit_err,
const int64_t err_ts,
const int err_ret);
bool has_fatal_error() const
{
return is_fatal_error(err_info_.err_ret_);

View File

@ -229,6 +229,7 @@ ob_set_subtarget(ob_server virtual_table
virtual_table/ob_all_virtual_log_stat.cpp
virtual_table/ob_all_virtual_apply_stat.cpp
virtual_table/ob_all_virtual_replay_stat.cpp
virtual_table/ob_all_virtual_ha_diagnose.cpp
virtual_table/ob_global_variables.cpp
virtual_table/ob_gv_sql.cpp
virtual_table/ob_gv_sql_audit.cpp

View File

@ -0,0 +1,234 @@
/**
* Copyright (c) 2021 OceanBase
* OceanBase CE is licensed under Mulan PubL v2.
* You can use this software according to the terms and conditions of the Mulan PubL v2.
* You may obtain a copy of Mulan PubL v2 at:
* http://license.coscl.org.cn/MulanPubL-2.0
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
* See the Mulan PubL v2 for more details.
*/
#include "ob_all_virtual_ha_diagnose.h"
#include "lib/ob_define.h"
#include "lib/ob_errno.h"
#include "lib/oblog/ob_log_module.h"
#include "storage/tx_storage/ob_ls_service.h"
namespace oceanbase
{
namespace observer
{
// Returns the next row of the HA diagnose virtual table.
//
// On the first call every tenant visited by operate_each_tenant_for_sys_or_self()
// is iterated; for each log stream a DiagnoseInfo is collected, converted into
// cur_row_ and buffered in scanner_. That call and all later ones then hand
// out the buffered rows one by one.
//
// @param row  output, points at cur_row_ when a row was produced
// @return OB_ITER_END when all buffered rows have been returned,
//         OB_ERR_UNEXPECTED when omt_ is NULL, otherwise the first error hit.
int ObAllVirtualHADiagnose::inner_get_next_row(common::ObNewRow *&row)
{
  int ret = OB_SUCCESS;
  if (false == start_to_read_) {
    // Per log stream: collect the diagnose info and buffer one row for it.
    auto func_iter_ls = [&](const storage::ObLS &ls) -> int
    {
      int ret = OB_SUCCESS;
      storage::DiagnoseInfo diagnose_info;
      if (OB_FAIL(ls.diagnose(diagnose_info))) {
        SERVER_LOG(WARN, "ls stat diagnose info failed", K(ret), K(ls));
      } else if (OB_FAIL(insert_stat_(diagnose_info))) {
        SERVER_LOG(WARN, "insert stat failed", K(ret), K(diagnose_info));
      } else if (OB_FAIL(scanner_.add_row(cur_row_))) {
        SERVER_LOG(WARN, "iter diagnose info faild", KR(ret), K(diagnose_info));
      } else {
        SERVER_LOG(INFO, "iter diagnose info succ", K(diagnose_info));
      }
      return ret;
    };
    // Per tenant: run func_iter_ls over the log streams of its ObLSService.
    auto func_iterate_tenant = [&func_iter_ls]() -> int
    {
      int ret = OB_SUCCESS;
      storage::ObLSService *ls_service = MTL(storage::ObLSService*);
      if (NULL == ls_service) {
        // A tenant without an ObLSService contributes no rows; not an error.
        SERVER_LOG(INFO, "tenant has no ObLSService", K(MTL_ID()));
      } else if (OB_FAIL(ls_service->iterate_diagnose(func_iter_ls))) {
        SERVER_LOG(WARN, "iter ls failed", K(ret));
      } else {
        SERVER_LOG(INFO, "iter ls succ", K(ret));
      }
      return ret;
    };
    if (NULL == omt_) {
      // Previously this branch only logged and fell through with OB_SUCCESS,
      // returning "success" to the caller without ever setting `row`.
      ret = OB_ERR_UNEXPECTED;
      SERVER_LOG(WARN, "omt is NULL", K(ret), K(MTL_ID()));
    } else if (OB_FAIL(omt_->operate_each_tenant_for_sys_or_self(func_iterate_tenant))) {
      SERVER_LOG(WARN, "iter tenant failed", K(ret));
    } else {
      scanner_it_ = scanner_.begin();
      start_to_read_ = true;
    }
  }
  if (OB_SUCC(ret) && start_to_read_) {
    if (OB_FAIL(scanner_it_.get_next_row(cur_row_))) {
      // OB_ITER_END is the normal end-of-data signal and is not logged.
      if (OB_ITER_END != ret) {
        SERVER_LOG(WARN, "get next row failed", K(ret));
      }
    } else {
      row = &cur_row_;
    }
  }
  return ret;
}
// Converts one DiagnoseInfo into cur_row_, filling exactly the cells requested
// through output_column_ids_ (cell i corresponds to output_column_ids_.at(i)).
//
// String cells point into member buffers of this object (ip_, *_str_) or into
// `diagnose_info` itself, so the row must be consumed before either changes.
//
// @param diagnose_info  diagnose data of one log stream
// @return OB_SUCCESS, the error of a failing *_to_string conversion, or
//         OB_ERR_UNEXPECTED for an unknown column id / failed ip conversion.
int ObAllVirtualHADiagnose::insert_stat_(storage::DiagnoseInfo &diagnose_info)
{
  int ret = OB_SUCCESS;
  const int64_t count = output_column_ids_.count();
  // All varchar cells of this table carry the default collation of the default charset.
  auto set_varchar_cell = [this](const int64_t idx, const ObString &str) {
    cur_row_.cells_[idx].set_varchar(str);
    cur_row_.cells_[idx].set_collation_type(
        ObCharset::get_default_collation(ObCharset::get_default_charset()));
  };
  for (int64_t i = 0; OB_SUCC(ret) && i < count; i++) {
    uint64_t col_id = output_column_ids_.at(i);
    switch (col_id) {
      case TENANT_ID:
        cur_row_.cells_[i].set_int(MTL_ID());
        break;
      case LS_ID:
        cur_row_.cells_[i].set_int(diagnose_info.ls_id_);
        break;
      case SVR_IP:
        if (false == GCTX.self_addr().ip_to_string(ip_, common::OB_IP_PORT_STR_BUFF)) {
          ret = OB_ERR_UNEXPECTED;
          SERVER_LOG(WARN, "ip_to_string failed", K(ret));
        } else {
          set_varchar_cell(i, ObString::make_string(ip_));
        }
        break;
      case SVR_PORT:
        cur_row_.cells_[i].set_int(GCTX.self_addr().get_port());
        break;
      case ELECTION_ROLE:
        if (OB_FAIL(role_to_string(diagnose_info.palf_diagnose_info_.election_role_,
                                   election_role_str_, sizeof(election_role_str_)))) {
          SERVER_LOG(WARN, "role_to_string failed", K(ret), K(diagnose_info));
        } else {
          set_varchar_cell(i, ObString::make_string(election_role_str_));
        }
        break;
      case ELECTION_EPOCH:
        cur_row_.cells_[i].set_int(diagnose_info.palf_diagnose_info_.election_epoch_);
        break;
      case PALF_ROLE:
        if (OB_FAIL(role_to_string(diagnose_info.palf_diagnose_info_.palf_role_,
                                   palf_role_str_, sizeof(palf_role_str_)))) {
          SERVER_LOG(WARN, "role_to_string failed", K(ret), K(diagnose_info));
        } else {
          set_varchar_cell(i, ObString::make_string(palf_role_str_));
        }
        break;
      case PALF_STATE:
        set_varchar_cell(i, ObString::make_string(
            replica_state_to_string(diagnose_info.palf_diagnose_info_.palf_state_)));
        break;
      case PALF_PROPOSAL_ID:
        cur_row_.cells_[i].set_int(diagnose_info.palf_diagnose_info_.palf_proposal_id_);
        break;
      case LOG_HANDLER_ROLE:
        if (OB_FAIL(role_to_string(diagnose_info.log_handler_diagnose_info_.log_handler_role_,
                                   log_handler_role_str_, sizeof(log_handler_role_str_)))) {
          SERVER_LOG(WARN, "role_to_string failed", K(ret), K(diagnose_info));
        } else {
          set_varchar_cell(i, ObString::make_string(log_handler_role_str_));
        }
        break;
      case LOG_HANDLER_PROPOSAL_ID:
        cur_row_.cells_[i].set_int(diagnose_info.log_handler_diagnose_info_.log_handler_proposal_id_);
        break;
      case LOG_HANDLER_TAKEOVER_STATE:
        if (OB_FAIL(takeover_state_to_string(diagnose_info.rc_diagnose_info_.state_,
                                             log_handler_takeover_state_str_,
                                             sizeof(log_handler_takeover_state_str_)))) {
          SERVER_LOG(WARN, "takeover_state_to_string failed", K(ret), K(diagnose_info));
        } else {
          set_varchar_cell(i, ObString::make_string(log_handler_takeover_state_str_));
        }
        break;
      case LOG_HANDLER_TAKEOVER_LOG_TYPE:
        if (OB_FAIL(log_base_type_to_string(diagnose_info.rc_diagnose_info_.log_type_,
                                            log_handler_takeover_log_type_str_,
                                            sizeof(log_handler_takeover_log_type_str_)))) {
          SERVER_LOG(WARN, "log_base_type_to_string failed", K(ret), K(diagnose_info));
        } else {
          set_varchar_cell(i, ObString::make_string(log_handler_takeover_log_type_str_));
        }
        break;
      case MAX_APPLIED_SCN: {
        cur_row_.cells_[i].set_uint64(static_cast<uint64_t>(diagnose_info.apply_diagnose_info_.max_applied_scn_));
        break;
      }
      case MAX_REPALYED_LSN: {
        cur_row_.cells_[i].set_uint64(diagnose_info.replay_diagnose_info_.max_replayed_lsn_.val_);
        break;
      }
      case MAX_REPLAYED_SCN: {
        // Explicit cast, matching the other signed-SCN-to-uint64 cells.
        cur_row_.cells_[i].set_uint64(static_cast<uint64_t>(diagnose_info.replay_diagnose_info_.max_replayed_scn_));
        break;
      }
      case REPLAY_DIAGNOSE_INFO: {
        set_varchar_cell(i, diagnose_info.replay_diagnose_info_.diagnose_str_.string());
        break;
      }
      case GC_STATE:
        if (OB_FAIL(gc_state_to_string(diagnose_info.gc_diagnose_info_.gc_state_,
                                       gc_state_str_,
                                       sizeof(gc_state_str_)))) {
          SERVER_LOG(WARN, "gc_state_to_string failed", K(ret), K(diagnose_info));
        } else {
          set_varchar_cell(i, ObString::make_string(gc_state_str_));
        }
        break;
      case GC_START_TS: {
        cur_row_.cells_[i].set_int(diagnose_info.gc_diagnose_info_.gc_start_ts_);
        break;
      }
      //TODO: @keqing.llt the archive_scn column is only a placeholder for now
      case ARCHIVE_SCN: {
        cur_row_.cells_[i].set_uint64(0);
        break;
      }
      case CHECKPOINT_SCN: {
        cur_row_.cells_[i].set_uint64(static_cast<uint64_t>(diagnose_info.checkpoint_diagnose_info_.checkpoint_));
        break;
      }
      case MIN_REC_SCN: {
        cur_row_.cells_[i].set_uint64(static_cast<uint64_t>(diagnose_info.checkpoint_diagnose_info_.min_rec_scn_));
        break;
      }
      case MIN_REC_SCN_LOG_TYPE: {
        if (OB_FAIL(log_base_type_to_string(diagnose_info.checkpoint_diagnose_info_.log_type_,
                                            min_rec_log_scn_log_type_str_,
                                            sizeof(min_rec_log_scn_log_type_str_)))) {
          SERVER_LOG(WARN, "log_base_type_to_string failed", K(ret), K(diagnose_info));
        } else {
          set_varchar_cell(i, ObString::make_string(min_rec_log_scn_log_type_str_));
        }
        break;
      }
      default:
        ret = OB_ERR_UNEXPECTED;
        // Report which column id was not recognised; the message typo is fixed too.
        SERVER_LOG(WARN, "unknown column", K(ret), K(col_id));
        break;
    }
  }
  return ret;
}
} // namespace observer
} // namespace oceanbase

View File

@ -0,0 +1,75 @@
/**
* Copyright (c) 2021 OceanBase
* OceanBase CE is licensed under Mulan PubL v2.
* You can use this software according to the terms and conditions of the Mulan PubL v2.
* You may obtain a copy of Mulan PubL v2 at:
* http://license.coscl.org.cn/MulanPubL-2.0
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
* See the Mulan PubL v2 for more details.
*/
#ifndef OCEANBASE_OBSERVER_OB_ALL_VIRTUAL_HA_DIAGNOSE_H_
#define OCEANBASE_OBSERVER_OB_ALL_VIRTUAL_HA_DIAGNOSE_H_
#include "common/row/ob_row.h"
#include "observer/omt/ob_multi_tenant.h"
#include "share/ob_virtual_table_scanner_iterator.h"
#include "share/ob_scanner.h"
#include "storage/ls/ob_ls.h"
namespace oceanbase
{
namespace observer
{
// Column ids of the HA diagnose virtual table, in the order in which
// all_virtual_ha_diagnose_schema() adds the columns (ids start at
// OB_APP_MIN_COLUMN_ID and increase by one per column).
// NOTE(review): the type name "IOStatColumn" does not describe this table —
// presumably carried over from another virtual table header; confirm before renaming.
enum IOStatColumn
{
TENANT_ID = common::OB_APP_MIN_COLUMN_ID,
LS_ID,
SVR_IP,
SVR_PORT,
ELECTION_ROLE,
ELECTION_EPOCH,
PALF_ROLE,
PALF_STATE,
PALF_PROPOSAL_ID,
LOG_HANDLER_ROLE,
LOG_HANDLER_PROPOSAL_ID,
LOG_HANDLER_TAKEOVER_STATE,
LOG_HANDLER_TAKEOVER_LOG_TYPE,
MAX_APPLIED_SCN,
// "REPALYED" (sic): the switch in ObAllVirtualHADiagnose::insert_stat_ uses this spelling
MAX_REPALYED_LSN,
MAX_REPLAYED_SCN,
REPLAY_DIAGNOSE_INFO,
GC_STATE,
GC_START_TS,
// filled with the constant 0 by insert_stat_ for now (placeholder column)
ARCHIVE_SCN,
CHECKPOINT_SCN,
MIN_REC_SCN,
MIN_REC_SCN_LOG_TYPE,
};
// Iterator behind the HA diagnose virtual table: produces one row per log
// stream, built from the storage::DiagnoseInfo of that log stream.
class ObAllVirtualHADiagnose : public common::ObVirtualTableScannerIterator
{
public:
// `omt` is stored as-is (not owned here) and used to iterate tenants.
explicit ObAllVirtualHADiagnose(omt::ObMultiTenant *omt) : omt_(omt) {}
public:
// Buffers all rows on the first call, then returns them one at a time.
virtual int inner_get_next_row(common::ObNewRow *&row);
private:
// Fills cur_row_ from `diagnose_info` for the requested output columns.
int insert_stat_(storage::DiagnoseInfo &diagnose_info);
private:
// Size of the scratch buffers below; the schema declares the matching
// varchar columns with length 32.
static const int64_t VARCHAR_32 = 32;
// Scratch buffers that back the varchar cells written by insert_stat_.
char ip_[common::OB_IP_PORT_STR_BUFF] = {'\0'};
char election_role_str_[VARCHAR_32] = {'\0'};
char palf_role_str_[VARCHAR_32] = {'\0'};
char log_handler_role_str_[VARCHAR_32] = {'\0'};
char log_handler_takeover_state_str_[VARCHAR_32] = {'\0'};
char log_handler_takeover_log_type_str_[VARCHAR_32] = {'\0'};
char gc_state_str_[VARCHAR_32] = {'\0'};
char min_rec_log_scn_log_type_str_[VARCHAR_32] = {'\0'};
omt::ObMultiTenant *omt_;
};
} // namespace observer
} // namespace oceanbase
#endif /* OCEANBASE_OBSERVER_OB_ALL_VIRTUAL_HA_DIAGNOSE_H_ */

View File

@ -263,10 +263,10 @@ int ObAllVirtualPalfStat::member_list_to_string_(
tmp_member_list))) {
SERVER_LOG(WARN, "fail to transform member_list", KR(ret), K(member_list));
} else if (OB_FAIL(share::ObLSReplica::member_list2text(
tmp_member_list,
member_list_buf_,
tmp_member_list,
member_list_buf_,
MAX_MEMBER_LIST_LENGTH))) {
SERVER_LOG(WARN, "member_list2text failed", KR(ret),
SERVER_LOG(WARN, "member_list2text failed", KR(ret),
K(member_list), K(tmp_member_list), K_(member_list_buf));
}
return ret;

View File

@ -174,6 +174,7 @@
#include "observer/virtual_table/ob_all_virtual_dtl_interm_result_monitor.h"
#include "observer/virtual_table/ob_all_virtual_log_stat.h"
#include "observer/virtual_table/ob_all_virtual_apply_stat.h"
#include "observer/virtual_table/ob_all_virtual_ha_diagnose.h"
#include "observer/virtual_table/ob_all_virtual_replay_stat.h"
#include "observer/virtual_table/ob_all_virtual_unit.h"
#include "observer/virtual_table/ob_all_virtual_server.h"
@ -1600,6 +1601,19 @@ int ObVTIterCreator::create_vt_iter(ObVTableScanParam &params,
}
break;
}
case OB_ALL_VIRTUAL_HA_DIAGNOSE_TID: {
ObAllVirtualHADiagnose *diagnose_info = NULL;
omt::ObMultiTenant *omt = GCTX.omt_;
if (OB_UNLIKELY(NULL == omt)) {
ret = OB_ERR_UNEXPECTED;
SERVER_LOG(WARN, "get tenant fail", K(ret));
} else if (OB_FAIL(NEW_VIRTUAL_TABLE(ObAllVirtualHADiagnose, diagnose_info, omt))) {
SERVER_LOG(ERROR, "ObAllVirtualHADiagnose construct fail", K(ret));
} else {
vt_iter = static_cast<ObAllVirtualHADiagnose *>(diagnose_info);
}
break;
}
END_CREATE_VT_ITER_SWITCH_LAMBDA
BEGIN_CREATE_VT_ITER_SWITCH_LAMBDA

View File

@ -7256,6 +7256,405 @@ int ObInnerTableSchema::all_virtual_minor_freeze_info_schema(ObTableSchema &tabl
return ret;
}
int ObInnerTableSchema::all_virtual_ha_diagnose_schema(ObTableSchema &table_schema)
{
int ret = OB_SUCCESS;
uint64_t column_id = OB_APP_MIN_COLUMN_ID - 1;
//generated fields:
table_schema.set_tenant_id(OB_SYS_TENANT_ID);
table_schema.set_tablegroup_id(OB_INVALID_ID);
table_schema.set_database_id(OB_SYS_DATABASE_ID);
table_schema.set_table_id(OB_ALL_VIRTUAL_HA_DIAGNOSE_TID);
table_schema.set_rowkey_split_pos(0);
table_schema.set_is_use_bloomfilter(false);
table_schema.set_progressive_merge_num(0);
table_schema.set_rowkey_column_num(0);
table_schema.set_load_type(TABLE_LOAD_TYPE_IN_DISK);
table_schema.set_table_type(VIRTUAL_TABLE);
table_schema.set_index_type(INDEX_TYPE_IS_NOT);
table_schema.set_def_type(TABLE_DEF_TYPE_INTERNAL);
if (OB_SUCC(ret)) {
if (OB_FAIL(table_schema.set_table_name(OB_ALL_VIRTUAL_HA_DIAGNOSE_TNAME))) {
LOG_ERROR("fail to set table_name", K(ret));
}
}
if (OB_SUCC(ret)) {
if (OB_FAIL(table_schema.set_compress_func_name(OB_DEFAULT_COMPRESS_FUNC_NAME))) {
LOG_ERROR("fail to set compress_func_name", K(ret));
}
}
table_schema.set_part_level(PARTITION_LEVEL_ZERO);
table_schema.set_charset_type(ObCharset::get_default_charset());
table_schema.set_collation_type(ObCharset::get_default_collation(ObCharset::get_default_charset()));
if (OB_SUCC(ret)) {
ADD_COLUMN_SCHEMA("tenant_id", //column_name
++column_id, //column_id
0, //rowkey_id
0, //index_id
0, //part_key_pos
ObIntType, //column_type
CS_TYPE_INVALID, //column_collation_type
sizeof(int64_t), //column_length
-1, //column_precision
-1, //column_scale
false, //is_nullable
false); //is_autoincrement
}
if (OB_SUCC(ret)) {
ADD_COLUMN_SCHEMA("ls_id", //column_name
++column_id, //column_id
0, //rowkey_id
0, //index_id
0, //part_key_pos
ObIntType, //column_type
CS_TYPE_INVALID, //column_collation_type
sizeof(int64_t), //column_length
-1, //column_precision
-1, //column_scale
false, //is_nullable
false); //is_autoincrement
}
if (OB_SUCC(ret)) {
ADD_COLUMN_SCHEMA("svr_ip", //column_name
++column_id, //column_id
0, //rowkey_id
0, //index_id
1, //part_key_pos
ObVarcharType, //column_type
CS_TYPE_INVALID, //column_collation_type
MAX_IP_ADDR_LENGTH, //column_length
-1, //column_precision
-1, //column_scale
false, //is_nullable
false); //is_autoincrement
}
if (OB_SUCC(ret)) {
ADD_COLUMN_SCHEMA("svr_port", //column_name
++column_id, //column_id
0, //rowkey_id
0, //index_id
2, //part_key_pos
ObIntType, //column_type
CS_TYPE_INVALID, //column_collation_type
sizeof(int64_t), //column_length
-1, //column_precision
-1, //column_scale
false, //is_nullable
false); //is_autoincrement
}
if (OB_SUCC(ret)) {
ADD_COLUMN_SCHEMA("election_role", //column_name
++column_id, //column_id
0, //rowkey_id
0, //index_id
0, //part_key_pos
ObVarcharType, //column_type
CS_TYPE_INVALID, //column_collation_type
32, //column_length
-1, //column_precision
-1, //column_scale
false, //is_nullable
false); //is_autoincrement
}
if (OB_SUCC(ret)) {
ADD_COLUMN_SCHEMA("election_epoch", //column_name
++column_id, //column_id
0, //rowkey_id
0, //index_id
0, //part_key_pos
ObIntType, //column_type
CS_TYPE_INVALID, //column_collation_type
sizeof(int64_t), //column_length
-1, //column_precision
-1, //column_scale
false, //is_nullable
false); //is_autoincrement
}
if (OB_SUCC(ret)) {
ADD_COLUMN_SCHEMA("palf_role", //column_name
++column_id, //column_id
0, //rowkey_id
0, //index_id
0, //part_key_pos
ObVarcharType, //column_type
CS_TYPE_INVALID, //column_collation_type
32, //column_length
-1, //column_precision
-1, //column_scale
false, //is_nullable
false); //is_autoincrement
}
if (OB_SUCC(ret)) {
ADD_COLUMN_SCHEMA("palf_state", //column_name
++column_id, //column_id
0, //rowkey_id
0, //index_id
0, //part_key_pos
ObVarcharType, //column_type
CS_TYPE_INVALID, //column_collation_type
32, //column_length
-1, //column_precision
-1, //column_scale
false, //is_nullable
false); //is_autoincrement
}
if (OB_SUCC(ret)) {
ADD_COLUMN_SCHEMA("palf_proposal_id", //column_name
++column_id, //column_id
0, //rowkey_id
0, //index_id
0, //part_key_pos
ObIntType, //column_type
CS_TYPE_INVALID, //column_collation_type
sizeof(int64_t), //column_length
-1, //column_precision
-1, //column_scale
false, //is_nullable
false); //is_autoincrement
}
if (OB_SUCC(ret)) {
ADD_COLUMN_SCHEMA("log_handler_role", //column_name
++column_id, //column_id
0, //rowkey_id
0, //index_id
0, //part_key_pos
ObVarcharType, //column_type
CS_TYPE_INVALID, //column_collation_type
32, //column_length
-1, //column_precision
-1, //column_scale
false, //is_nullable
false); //is_autoincrement
}
if (OB_SUCC(ret)) {
ADD_COLUMN_SCHEMA("log_handler_proposal_id", //column_name
++column_id, //column_id
0, //rowkey_id
0, //index_id
0, //part_key_pos
ObIntType, //column_type
CS_TYPE_INVALID, //column_collation_type
sizeof(int64_t), //column_length
-1, //column_precision
-1, //column_scale
false, //is_nullable
false); //is_autoincrement
}
if (OB_SUCC(ret)) {
ADD_COLUMN_SCHEMA("log_handler_takeover_state", //column_name
++column_id, //column_id
0, //rowkey_id
0, //index_id
0, //part_key_pos
ObVarcharType, //column_type
CS_TYPE_INVALID, //column_collation_type
32, //column_length
-1, //column_precision
-1, //column_scale
false, //is_nullable
false); //is_autoincrement
}
if (OB_SUCC(ret)) {
ADD_COLUMN_SCHEMA("log_handler_takeover_log_type", //column_name
++column_id, //column_id
0, //rowkey_id
0, //index_id
0, //part_key_pos
ObVarcharType, //column_type
CS_TYPE_INVALID, //column_collation_type
32, //column_length
-1, //column_precision
-1, //column_scale
false, //is_nullable
false); //is_autoincrement
}
if (OB_SUCC(ret)) {
ADD_COLUMN_SCHEMA("max_applied_scn", //column_name
++column_id, //column_id
0, //rowkey_id
0, //index_id
0, //part_key_pos
ObUInt64Type, //column_type
CS_TYPE_INVALID, //column_collation_type
sizeof(uint64_t), //column_length
-1, //column_precision
-1, //column_scale
false, //is_nullable
false); //is_autoincrement
}
if (OB_SUCC(ret)) {
ADD_COLUMN_SCHEMA("max_replayed_lsn", //column_name
++column_id, //column_id
0, //rowkey_id
0, //index_id
0, //part_key_pos
ObUInt64Type, //column_type
CS_TYPE_INVALID, //column_collation_type
sizeof(uint64_t), //column_length
-1, //column_precision
-1, //column_scale
false, //is_nullable
false); //is_autoincrement
}
if (OB_SUCC(ret)) {
ADD_COLUMN_SCHEMA("max_replayed_scn", //column_name
++column_id, //column_id
0, //rowkey_id
0, //index_id
0, //part_key_pos
ObUInt64Type, //column_type
CS_TYPE_INVALID, //column_collation_type
sizeof(uint64_t), //column_length
-1, //column_precision
-1, //column_scale
false, //is_nullable
false); //is_autoincrement
}
if (OB_SUCC(ret)) {
ADD_COLUMN_SCHEMA("replay_diagnose_info", //column_name
++column_id, //column_id
0, //rowkey_id
0, //index_id
0, //part_key_pos
ObVarcharType, //column_type
CS_TYPE_INVALID, //column_collation_type
1024, //column_length
-1, //column_precision
-1, //column_scale
false, //is_nullable
false); //is_autoincrement
}
if (OB_SUCC(ret)) {
ADD_COLUMN_SCHEMA("gc_state", //column_name
++column_id, //column_id
0, //rowkey_id
0, //index_id
0, //part_key_pos
ObVarcharType, //column_type
CS_TYPE_INVALID, //column_collation_type
32, //column_length
-1, //column_precision
-1, //column_scale
false, //is_nullable
false); //is_autoincrement
}
if (OB_SUCC(ret)) {
ADD_COLUMN_SCHEMA("gc_start_ts", //column_name
++column_id, //column_id
0, //rowkey_id
0, //index_id
0, //part_key_pos
ObIntType, //column_type
CS_TYPE_INVALID, //column_collation_type
sizeof(int64_t), //column_length
-1, //column_precision
-1, //column_scale
false, //is_nullable
false); //is_autoincrement
}
if (OB_SUCC(ret)) {
ADD_COLUMN_SCHEMA("archive_scn", //column_name
++column_id, //column_id
0, //rowkey_id
0, //index_id
0, //part_key_pos
ObUInt64Type, //column_type
CS_TYPE_INVALID, //column_collation_type
sizeof(uint64_t), //column_length
-1, //column_precision
-1, //column_scale
false, //is_nullable
false); //is_autoincrement
}
if (OB_SUCC(ret)) {
ADD_COLUMN_SCHEMA("checkpoint_scn", //column_name
++column_id, //column_id
0, //rowkey_id
0, //index_id
0, //part_key_pos
ObUInt64Type, //column_type
CS_TYPE_INVALID, //column_collation_type
sizeof(uint64_t), //column_length
-1, //column_precision
-1, //column_scale
false, //is_nullable
false); //is_autoincrement
}
if (OB_SUCC(ret)) {
ADD_COLUMN_SCHEMA("min_rec_scn", //column_name
++column_id, //column_id
0, //rowkey_id
0, //index_id
0, //part_key_pos
ObUInt64Type, //column_type
CS_TYPE_INVALID, //column_collation_type
sizeof(uint64_t), //column_length
-1, //column_precision
-1, //column_scale
false, //is_nullable
false); //is_autoincrement
}
if (OB_SUCC(ret)) {
ADD_COLUMN_SCHEMA("min_rec_scn_log_type", //column_name
++column_id, //column_id
0, //rowkey_id
0, //index_id
0, //part_key_pos
ObVarcharType, //column_type
CS_TYPE_INVALID, //column_collation_type
32, //column_length
-1, //column_precision
-1, //column_scale
false, //is_nullable
false); //is_autoincrement
}
if (OB_SUCC(ret)) {
table_schema.get_part_option().set_part_num(1);
table_schema.set_part_level(PARTITION_LEVEL_ONE);
table_schema.get_part_option().set_part_func_type(PARTITION_FUNC_TYPE_LIST_COLUMNS);
if (OB_FAIL(table_schema.get_part_option().set_part_expr("svr_ip, svr_port"))) {
LOG_WARN("set_part_expr failed", K(ret));
} else if (OB_FAIL(table_schema.mock_list_partition_array())) {
LOG_WARN("mock list partition array failed", K(ret));
}
}
table_schema.set_index_using_type(USING_HASH);
table_schema.set_row_store_type(ENCODING_ROW_STORE);
table_schema.set_store_format(OB_STORE_FORMAT_DYNAMIC_MYSQL);
table_schema.set_progressive_merge_round(1);
table_schema.set_storage_format_version(3);
table_schema.set_tablet_id(0);
table_schema.set_max_used_column_id(column_id);
return ret;
}
} // end namespace share
} // end namespace oceanbase

View File

@ -833,6 +833,7 @@ public:
static int all_virtual_schema_memory_schema(share::schema::ObTableSchema &table_schema);
static int all_virtual_schema_slot_schema(share::schema::ObTableSchema &table_schema);
static int all_virtual_minor_freeze_info_schema(share::schema::ObTableSchema &table_schema);
static int all_virtual_ha_diagnose_schema(share::schema::ObTableSchema &table_schema);
static int all_virtual_sql_audit_ora_schema(share::schema::ObTableSchema &table_schema);
static int all_virtual_plan_stat_ora_schema(share::schema::ObTableSchema &table_schema);
static int all_virtual_plan_cache_plan_explain_ora_schema(share::schema::ObTableSchema &table_schema);
@ -2734,6 +2735,7 @@ const schema_create_func virtual_table_schema_creators [] = {
ObInnerTableSchema::all_virtual_schema_memory_schema,
ObInnerTableSchema::all_virtual_schema_slot_schema,
ObInnerTableSchema::all_virtual_minor_freeze_info_schema,
ObInnerTableSchema::all_virtual_ha_diagnose_schema,
ObInnerTableSchema::all_virtual_sql_audit_ora_schema,
ObInnerTableSchema::all_virtual_plan_stat_ora_schema,
ObInnerTableSchema::all_virtual_plan_cache_plan_explain_ora_schema,
@ -7052,7 +7054,8 @@ const uint64_t cluster_distributed_vtables [] = {
OB_ALL_VIRTUAL_KVCACHE_HANDLE_LEAK_INFO_TID,
OB_ALL_VIRTUAL_SCHEMA_MEMORY_TID,
OB_ALL_VIRTUAL_SCHEMA_SLOT_TID,
OB_ALL_VIRTUAL_MINOR_FREEZE_INFO_TID, };
OB_ALL_VIRTUAL_MINOR_FREEZE_INFO_TID,
OB_ALL_VIRTUAL_HA_DIAGNOSE_TID, };
const uint64_t tenant_distributed_vtables [] = {
OB_ALL_VIRTUAL_PROCESSLIST_TID,
@ -9152,11 +9155,11 @@ static inline int get_sys_table_lob_aux_schema(const uint64_t tid,
const int64_t OB_CORE_TABLE_COUNT = 4;
const int64_t OB_SYS_TABLE_COUNT = 212;
const int64_t OB_VIRTUAL_TABLE_COUNT = 550;
const int64_t OB_VIRTUAL_TABLE_COUNT = 551;
const int64_t OB_SYS_VIEW_COUNT = 601;
const int64_t OB_SYS_TENANT_TABLE_COUNT = 1368;
const int64_t OB_SYS_TENANT_TABLE_COUNT = 1369;
const int64_t OB_CORE_SCHEMA_VERSION = 1;
const int64_t OB_BOOTSTRAP_SCHEMA_VERSION = 1371;
const int64_t OB_BOOTSTRAP_SCHEMA_VERSION = 1372;
} // end namespace share
} // end namespace oceanbase

View File

@ -581,6 +581,7 @@ const uint64_t OB_ALL_VIRTUAL_LS_REPLICA_TASK_PLAN_TID = 12335; // "__all_virtua
const uint64_t OB_ALL_VIRTUAL_SCHEMA_MEMORY_TID = 12336; // "__all_virtual_schema_memory"
const uint64_t OB_ALL_VIRTUAL_SCHEMA_SLOT_TID = 12337; // "__all_virtual_schema_slot"
const uint64_t OB_ALL_VIRTUAL_MINOR_FREEZE_INFO_TID = 12338; // "__all_virtual_minor_freeze_info"
const uint64_t OB_ALL_VIRTUAL_HA_DIAGNOSE_TID = 12340; // "__all_virtual_ha_diagnose"
const uint64_t OB_ALL_VIRTUAL_SQL_AUDIT_ORA_TID = 15009; // "ALL_VIRTUAL_SQL_AUDIT_ORA"
const uint64_t OB_ALL_VIRTUAL_PLAN_STAT_ORA_TID = 15010; // "ALL_VIRTUAL_PLAN_STAT_ORA"
const uint64_t OB_ALL_VIRTUAL_PLAN_CACHE_PLAN_EXPLAIN_ORA_TID = 15012; // "ALL_VIRTUAL_PLAN_CACHE_PLAN_EXPLAIN_ORA"
@ -2466,6 +2467,7 @@ const char *const OB_ALL_VIRTUAL_LS_REPLICA_TASK_PLAN_TNAME = "__all_virtual_ls_
const char *const OB_ALL_VIRTUAL_SCHEMA_MEMORY_TNAME = "__all_virtual_schema_memory";
const char *const OB_ALL_VIRTUAL_SCHEMA_SLOT_TNAME = "__all_virtual_schema_slot";
const char *const OB_ALL_VIRTUAL_MINOR_FREEZE_INFO_TNAME = "__all_virtual_minor_freeze_info";
const char *const OB_ALL_VIRTUAL_HA_DIAGNOSE_TNAME = "__all_virtual_ha_diagnose";
const char *const OB_ALL_VIRTUAL_SQL_AUDIT_ORA_TNAME = "ALL_VIRTUAL_SQL_AUDIT";
const char *const OB_ALL_VIRTUAL_PLAN_STAT_ORA_TNAME = "ALL_VIRTUAL_PLAN_STAT";
const char *const OB_ALL_VIRTUAL_PLAN_CACHE_PLAN_EXPLAIN_ORA_TNAME = "ALL_VIRTUAL_PLAN_CACHE_PLAN_EXPLAIN";

View File

@ -10769,7 +10769,45 @@ def_table_schema(
)
# 12339: __all_virtual_show_trace
# 12340: __all_virtual_ha_diagnose
# Schema definition of the virtual table __all_virtual_ha_diagnose (table id 12340).
# The table has no rowkey columns and no gm_* columns; it is partitioned by
# (svr_ip, svr_port) and uses the 'distributed' vtable route policy.
def_table_schema(
owner = 'keqing.llt',
table_name = '__all_virtual_ha_diagnose',
table_id = '12340',
table_type = 'VIRTUAL_TABLE',
gm_columns = [],
in_tenant_space = False,
rowkey_columns = [
],
normal_columns = [
# Location of the reported log stream.
('tenant_id', 'int'),
('ls_id', 'int'),
('svr_ip', 'varchar:MAX_IP_ADDR_LENGTH'),
('svr_port', 'int'),
# Election / palf role information.
('election_role', 'varchar:32'),
('election_epoch', 'int'),
('palf_role', 'varchar:32'),
('palf_state', 'varchar:32'),
('palf_proposal_id', 'int'),
# Log handler role and takeover (role change) progress.
('log_handler_role', 'varchar:32'),
('log_handler_proposal_id', 'int'),
('log_handler_takeover_state', 'varchar:32'),
('log_handler_takeover_log_type', 'varchar:32'),
# Apply / replay progress.
('max_applied_scn', 'uint'),
('max_replayed_lsn', 'uint'),
('max_replayed_scn', 'uint'),
('replay_diagnose_info', 'varchar:1024'),
# GC, archive and checkpoint information.
('gc_state', 'varchar:32'),
('gc_start_ts', 'int'),
('archive_scn', 'uint'),
('checkpoint_scn', 'uint'),
('min_rec_scn', 'uint'),
('min_rec_scn_log_type', 'varchar:32')
],
partition_columns = ['svr_ip', 'svr_port'],
vtable_route_policy = 'distributed',
)
#
# Reserved positions

View File

@ -106,6 +106,20 @@ void ObCheckpointExecutor::offline()
update_checkpoint_enabled_ = false;
}
void ObCheckpointExecutor::get_min_rec_log_ts(int &log_type,
int64_t &min_rec_log_ts) const
{
for (int i = 1; i < ObLogBaseType::MAX_LOG_BASE_TYPE; i++) {
if (OB_NOT_NULL(handlers_[i])) {
int64_t rec_log_ts = handlers_[i]->get_rec_log_ts();
if (rec_log_ts > 0 && rec_log_ts < min_rec_log_ts) {
min_rec_log_ts = rec_log_ts;
log_type = i;
}
}
}
}
inline void get_min_rec_log_ts_service_type_by_index_(int index, char* service_type)
{
int ret = OB_SUCCESS;
@ -133,15 +147,7 @@ int ObCheckpointExecutor::update_clog_checkpoint()
// used to record which handler provide the smallest rec_log_ts
int min_rec_log_ts_service_type_index = 0;
char service_type[common::MAX_SERVICE_TYPE_BUF_LENGTH];
for (int i = 1; i < ObLogBaseType::MAX_LOG_BASE_TYPE; i++) {
if (OB_NOT_NULL(handlers_[i])) {
int64_t rec_log_ts = handlers_[i]->get_rec_log_ts();
if (rec_log_ts > 0 && rec_log_ts < checkpoint_ts) {
checkpoint_ts = rec_log_ts;
min_rec_log_ts_service_type_index = i;
}
}
}
get_min_rec_log_ts(min_rec_log_ts_service_type_index, checkpoint_ts);
get_min_rec_log_ts_service_type_by_index_(min_rec_log_ts_service_type_index, service_type);
const int64_t checkpoint_ts_in_ls_meta = ls_->get_clog_checkpoint_ts();
@ -275,7 +281,7 @@ bool ObCheckpointExecutor::need_flush()
K(end_log_ts), K(ls_->get_clog_checkpoint_ts()));
need_flush = true;
}
return need_flush;
}
@ -286,7 +292,7 @@ bool ObCheckpointExecutor::is_wait_advance_checkpoint()
ATOMIC_STORE(&wait_advance_checkpoint_, false);
}
}
return ATOMIC_LOAD(&wait_advance_checkpoint_);
}
@ -313,6 +319,18 @@ int64_t ObCheckpointExecutor::get_cannot_recycle_log_size()
return cannot_recycle_log_size;
}
// Fills `diagnose_info` with the current checkpoint state while holding lock_:
//   - checkpoint_  : value of ls_->get_clog_checkpoint_ts()
//   - min_rec_scn_ : smallest rec_log_ts reported by get_min_rec_log_ts(),
//                    or INT64_MAX when no handler reports a smaller value
//   - log_type_    : handler index that supplied min_rec_scn_ (index 0 when
//                    none did), converted to ObLogBaseType
// Always returns OB_SUCCESS.
int ObCheckpointExecutor::diagnose(CheckpointDiagnoseInfo &diagnose_info) const
{
  ObSpinLockGuard guard(lock_);
  diagnose_info.checkpoint_ = ls_->get_clog_checkpoint_ts();

  // Start from the largest possible value so any positive rec_log_ts wins.
  int min_type_idx = 0;
  diagnose_info.min_rec_scn_ = INT64_MAX;
  get_min_rec_log_ts(min_type_idx, diagnose_info.min_rec_scn_);
  diagnose_info.log_type_ = static_cast<ObLogBaseType>(min_type_idx);

  return OB_SUCCESS;
}
} // namespace checkpoint
} // namespace storage
} // namespace oceanbase

View File

@ -36,6 +36,17 @@ struct ObCheckpointVTInfo
);
};
// Checkpoint-related diagnose information; filled by ObCheckpointExecutor::diagnose().
struct CheckpointDiagnoseInfo
{
// clog checkpoint of the log stream (read via get_clog_checkpoint_ts())
int64_t checkpoint_;
// smallest positive rec_log_ts among the checkpoint handlers
int64_t min_rec_scn_;
// log base type of the handler that supplied min_rec_scn_
logservice::ObLogBaseType log_type_;
TO_STRING_KV(K(checkpoint_),
K(min_rec_scn_),
K(log_type_));
};
class ObCheckpointExecutor
{
public:
@ -71,6 +82,10 @@ public:
int64_t get_cannot_recycle_log_size();
void get_min_rec_log_ts(int &log_type, int64_t &min_rec_log_ts) const;
int diagnose(CheckpointDiagnoseInfo &diagnose_info) const;
private:
static const int64_t CLOG_GC_PERCENT = 60;
static const int64_t MAX_NEED_REPLAY_CLOG_INTERVAL = (int64_t)60 * 60 * 1000 * 1000 * 1000; //ns
@ -81,7 +96,7 @@ private:
// be used to avoid checkpoint concurrently,
// no need to protect handlers_[] because ls won't be destroyed(hold lshandle)
// when the public interfaces are invoked
common::ObSpinLock lock_;
mutable common::ObSpinLock lock_;
// avoid frequent freeze when clog_used_over_threshold
bool wait_advance_checkpoint_;

View File

@ -1336,5 +1336,38 @@ int ObLS::update_id_meta_without_writing_slog(const int64_t service_type,
return ret;
}
// Collects the diagnose information of this log stream into `info`.
//
// The sub-components are queried in order (gc handler, checkpoint executor,
// apply service, replay service, log handler, palf); the first failure stops
// the chain and its error code is returned. The role change service is only
// queried when info.is_role_sync() is false. The result is logged at INFO level
// whether or not the collection succeeded.
//
// @param [out] info  diagnose information to fill
// @return OB_NOT_INIT if the LS is not initialized, otherwise the result of
//         the first failing step or OB_SUCCESS.
int ObLS::diagnose(DiagnoseInfo &info) const
{
int ret = OB_SUCCESS;
logservice::ObLogService *log_service = MTL(logservice::ObLogService *);
share::ObLSID ls_id = get_ls_id();
if (IS_NOT_INIT) {
ret = OB_NOT_INIT;
STORAGE_LOG(WARN, "ls is not inited", K(ret));
} else if (FALSE_IT(info.ls_id_ = ls_id.id()) ||
FALSE_IT(info.rc_diagnose_info_.id_ = ls_id.id())) {
// record the ls id in both the top-level info and the role change info
} else if (OB_FAIL(gc_handler_.diagnose(info.gc_diagnose_info_))) {
STORAGE_LOG(WARN, "diagnose gc failed", K(ret), K(ls_id));
} else if (OB_FAIL(checkpoint_executor_.diagnose(info.checkpoint_diagnose_info_))) {
STORAGE_LOG(WARN, "diagnose checkpoint failed", K(ret), K(ls_id));
} else if (OB_FAIL(log_service->diagnose_apply(ls_id, info.apply_diagnose_info_))) {
STORAGE_LOG(WARN, "diagnose apply failed", K(ret), K(ls_id));
} else if (OB_FAIL(log_service->diagnose_replay(ls_id, info.replay_diagnose_info_))) {
STORAGE_LOG(WARN, "diagnose replay failed", K(ret), K(ls_id));
} else if (OB_FAIL(log_handler_.diagnose(info.log_handler_diagnose_info_))) {
STORAGE_LOG(WARN, "diagnose log handler failed", K(ret), K(ls_id));
} else if (OB_FAIL(log_handler_.diagnose_palf(info.palf_diagnose_info_))) {
STORAGE_LOG(WARN, "diagnose palf failed", K(ret), K(ls_id));
} else if (info.is_role_sync()) {
// When the roles are in sync there is no need to diagnose the role change service
info.rc_diagnose_info_.state_ = logservice::TakeOverState::TAKE_OVER_FINISH;
info.rc_diagnose_info_.log_type_ = logservice::ObLogBaseType::INVALID_LOG_BASE_TYPE;
} else if (OB_FAIL(log_service->diagnose_role_change(info.rc_diagnose_info_))) {
// When the election, palf and log handler roles disagree, the LS may have no leader
STORAGE_LOG(WARN, "diagnose rc service failed", K(ret), K(ls_id));
}
STORAGE_LOG(INFO, "diagnose finish", K(ret), K(info), K(ls_id));
return ret;
}
}
}

View File

@ -37,7 +37,9 @@
#include "storage/tx_table/ob_tx_table.h"
#include "storage/tx/ob_keep_alive_ls_handler.h"
#include "storage/restore/ob_ls_restore_handler.h"
#include "logservice/applyservice/ob_log_apply_service.h"
#include "logservice/replayservice/ob_replay_handler.h"
#include "logservice/replayservice/ob_replay_status.h"
#include "logservice/rcservice/ob_role_change_handler.h"
#include "logservice/restoreservice/ob_log_restore_handler.h" // ObLogRestoreHandler
#include "logservice/ob_log_handler.h"
@ -122,6 +124,31 @@ struct ObLSVTInfo
int64_t checkpoint_lsn_;
};
// Statistics gathered for the diagnose virtual table.
// Aggregates the per-component diagnose results of one log stream.
struct DiagnoseInfo
{
  // Returns true when the election role, the palf role and the log handler
  // role are all equal. Declared const because it only reads members, so it
  // can also be called through a const DiagnoseInfo reference.
  bool is_role_sync() const {
    return ((palf_diagnose_info_.election_role_ == palf_diagnose_info_.palf_role_)
            && (palf_diagnose_info_.palf_role_ == log_handler_diagnose_info_.log_handler_role_));
  }
  int64_t ls_id_;                                                  // id of the diagnosed log stream
  logservice::LogHandlerDiagnoseInfo log_handler_diagnose_info_;   // filled by the log handler
  palf::PalfDiagnoseInfo palf_diagnose_info_;                      // filled by palf
  logservice::RCDiagnoseInfo rc_diagnose_info_;                    // role change service state
  logservice::ApplyDiagnoseInfo apply_diagnose_info_;              // apply service progress
  logservice::ReplayDiagnoseInfo replay_diagnose_info_;            // replay service progress
  logservice::GCDiagnoseInfo gc_diagnose_info_;                    // gc handler state
  checkpoint::CheckpointDiagnoseInfo checkpoint_diagnose_info_;    // checkpoint executor state
  TO_STRING_KV(K(ls_id_),
               K(log_handler_diagnose_info_),
               K(palf_diagnose_info_),
               K(rc_diagnose_info_),
               K(apply_diagnose_info_),
               K(replay_diagnose_info_),
               K(gc_diagnose_info_),
               K(checkpoint_diagnose_info_));
};
class ObIComponentFactory;
enum class ObInnerLSStatus;
@ -640,6 +667,7 @@ public:
const ObTabletID &tablet_id,
const ObBatchUpdateTableStoreParam &param);
int try_update_uppder_trans_version();
int diagnose(DiagnoseInfo &info) const;
private:
// StorageBaseUtil

View File

@ -1256,6 +1256,34 @@ bool ObLSService::need_create_inner_tablets_(const obrpc::ObCreateLSArg &arg) co
return arg.need_create_inner_tablets();
}
int ObLSService::iterate_diagnose(const ObFunction<int(const storage::ObLS &ls)> &func)
{
int ret = OB_SUCCESS;
common::ObSharedGuard<ObLSIterator> ls_iter;
ObLS *ls = nullptr;
if (OB_FAIL(get_ls_iter(ls_iter, ObLSGetMod::OBSERVER_MOD))) {
LOG_WARN("failed to get ls iter", K(ret));
} else {
while (OB_SUCC(ret)) {
if (OB_FAIL(ls_iter->get_next(ls))) {
if (OB_ITER_END != ret) {
LOG_ERROR("fail to get next ls", K(ret));
}
} else if (nullptr == ls) {
ret = OB_ERR_UNEXPECTED;
LOG_ERROR("ls is null", K(ret));
} else if (OB_FAIL(func(*ls))) {
LOG_WARN("iter ls diagnose failed", K(ret));
}
}
if (OB_ITER_END == ret) {
ret = OB_SUCCESS;
}
}
return ret;
}
} // storage
} // oceanbase

View File

@ -92,6 +92,8 @@ public:
int get_ls(const share::ObLSID &ls_id,
ObLSHandle &handle,
ObLSGetMod mod);
// Iterate over all log streams and invoke func on each of them.
// @param [in] func, callback applied to every ls; the iteration stops at the
//                   first error, and that error code is returned.
// @return OB_SUCCESS if all ls were visited, otherwise the first error code.
int iterate_diagnose(const ObFunction<int(const storage::ObLS &ls)> &func);
// remove the ls that is creating and write abort slog.
int gc_ls_after_replay_slog();

View File

@ -553,6 +553,7 @@ select 0xffffffffff & table_id, table_name, table_type, database_id, part_num fr
12336 __all_virtual_schema_memory 2 201001 1
12337 __all_virtual_schema_slot 2 201001 1
12338 __all_virtual_minor_freeze_info 2 201001 1
12340 __all_virtual_ha_diagnose 2 201001 1
20001 GV$OB_PLAN_CACHE_STAT 1 201001 1
20002 GV$OB_PLAN_CACHE_PLAN_STAT 1 201001 1
20003 SCHEMATA 1 201002 1

View File

@ -44,8 +44,9 @@ TEST(TestRoleChangeHander, test_basic_func)
ObRoleChangeHandler handler;
ObLogBaseType type = ObLogBaseType::TRANS_SERVICE_LOG_BASE_TYPE;
MockRoleChangeHandler mock_handler;
RCDiagnoseInfo unused_diagnose_info;
handler.register_handler(type, &mock_handler);
handler.switch_to_leader();
handler.switch_to_leader(unused_diagnose_info);
handler.switch_to_follower_forcedly();
handler.switch_to_follower_gracefully();
}