Add detailed logs about log_task
This commit is contained in:
parent
51e69ae88c
commit
7148f2bce1
19
deps/oblib/src/lib/utility/utility.h
vendored
19
deps/oblib/src/lib/utility/utility.h
vendored
@ -1123,20 +1123,26 @@ public:
|
||||
{
|
||||
public:
|
||||
ObStatItem(const char *item, const int64_t stat_interval)
|
||||
: item_(item), stat_interval_(stat_interval), last_ts_(0), stat_count_(0), lock_tag_(false) {}
|
||||
: item_(item), stat_interval_(stat_interval), last_ts_(0), stat_count_(0), accum_count_(0), lock_tag_(false) {
|
||||
MEMSET(extra_info_, '\0', MAX_ROOTSERVICE_EVENT_EXTRA_INFO_LENGTH);
|
||||
}
|
||||
~ObStatItem() {}
|
||||
public:
|
||||
void stat(const int64_t time_cost = 0)
|
||||
void set_extra_info(const char *extra_info)
|
||||
{
|
||||
MEMCPY(extra_info_, extra_info, MAX_ROOTSERVICE_EVENT_EXTRA_INFO_LENGTH);
|
||||
}
|
||||
void stat(const int64_t count = 0)
|
||||
{
|
||||
const int64_t cur_ts = ::oceanbase::common::ObTimeUtility::fast_current_time();
|
||||
const int64_t cur_stat_count = ATOMIC_AAF(&stat_count_, 1);
|
||||
const int64_t cur_accum_time = ATOMIC_AAF(&accum_time_, time_cost);
|
||||
const int64_t cur_accum_count = ATOMIC_AAF(&accum_count_, count);
|
||||
if (ATOMIC_LOAD(&last_ts_) + stat_interval_ < cur_ts) {
|
||||
if (ATOMIC_BCAS(&lock_tag_, false, true)) {
|
||||
LIB_LOG(INFO, NULL == item_ ? "" : item_, K(cur_stat_count), K_(stat_interval), "avg cost", cur_accum_time / cur_stat_count, K(this));
|
||||
LIB_LOG(INFO, NULL == item_ ? "" : item_, K(cur_stat_count), K_(stat_interval), "avg count/cost", cur_accum_count / cur_stat_count, K(this), K_(extra_info));
|
||||
(void)ATOMIC_SET(&last_ts_, cur_ts);
|
||||
(void)ATOMIC_SET(&stat_count_, 0);
|
||||
(void)ATOMIC_SET(&accum_time_, 0);
|
||||
(void)ATOMIC_SET(&accum_count_, 0);
|
||||
ATOMIC_BCAS(&lock_tag_, true, false);
|
||||
}
|
||||
}
|
||||
@ -1144,9 +1150,10 @@ public:
|
||||
private:
|
||||
const char *const item_;
|
||||
const int64_t stat_interval_;
|
||||
char extra_info_[MAX_ROOTSERVICE_EVENT_EXTRA_INFO_LENGTH];
|
||||
int64_t last_ts_;
|
||||
int64_t stat_count_;
|
||||
int64_t accum_time_;
|
||||
int64_t accum_count_;
|
||||
bool lock_tag_;
|
||||
};
|
||||
public:
|
||||
|
@ -92,6 +92,8 @@ int ObLogHandler::init(const int64_t id,
|
||||
replay_service_ = replay_service;
|
||||
rc_service_ = rc_service;
|
||||
apply_status_->inc_ref();
|
||||
PALF_REPORT_INFO_KV(K(id));
|
||||
append_cost_stat_.set_extra_info(EXTRA_INFOS);
|
||||
id_ = id;
|
||||
self_ = self;
|
||||
palf_handle_ = palf_handle;
|
||||
|
@ -39,7 +39,7 @@ namespace palf
|
||||
|
||||
#define PALF_EVENT(info_string, palf_id, args...) FLOG_INFO("[PALF_EVENT] "info_string, "palf_id", palf_id, args)
|
||||
|
||||
#define PALF_EVENT_REPORT_INFO_KV(args...) \
|
||||
#define PALF_REPORT_INFO_KV(args...) \
|
||||
const int64_t MAX_INFO_LENGTH = 512; \
|
||||
char EXTRA_INFOS[MAX_INFO_LENGTH]; \
|
||||
int64_t pos = 0; \
|
||||
|
@ -34,6 +34,7 @@ LogIOWorker::LogIOWorker()
|
||||
do_task_count_(0),
|
||||
print_log_interval_(OB_INVALID_TIMESTAMP),
|
||||
last_working_time_(OB_INVALID_TIMESTAMP),
|
||||
log_io_worker_queue_size_stat_("[PALF STAT LOG IO WORKER QUEUE SIZE]", PALF_STAT_PRINT_INTERVAL_US),
|
||||
is_inited_(false)
|
||||
{
|
||||
}
|
||||
@ -69,6 +70,8 @@ int LogIOWorker::init(const LogIOWorkerConfig &config,
|
||||
log_io_worker_num_ = config.io_worker_num_;
|
||||
cb_thread_pool_tg_id_ = cb_thread_pool_tg_id;
|
||||
palf_env_impl_ = palf_env_impl;
|
||||
PALF_REPORT_INFO_KV(K_(log_io_worker_num), K_(cb_thread_pool_tg_id));
|
||||
log_io_worker_queue_size_stat_.set_extra_info(EXTRA_INFOS);
|
||||
is_inited_ = true;
|
||||
PALF_LOG(INFO, "LogIOWorker init success", K(ret), K(config), K(cb_thread_pool_tg_id),
|
||||
KPC(palf_env_impl));
|
||||
@ -148,13 +151,15 @@ int LogIOWorker::run_loop_()
|
||||
|
||||
while (false == has_set_stop()
|
||||
&& false == (OB_NOT_NULL(&lib::Thread::current()) ? lib::Thread::current().has_set_stop() : false)) {
|
||||
|
||||
void *task = NULL;
|
||||
if (OB_SUCC(queue_.pop(task, QUEUE_WAIT_TIME))) {
|
||||
ATOMIC_STORE(&last_working_time_, common::ObTimeUtility::fast_current_time());
|
||||
ret = reduce_io_task_(task);
|
||||
ATOMIC_STORE(&last_working_time_, OB_INVALID_TIMESTAMP);
|
||||
}
|
||||
if (queue_.size() > 0) {
|
||||
log_io_worker_queue_size_stat_.stat(queue_.size());
|
||||
}
|
||||
}
|
||||
|
||||
// After IOWorker has stopped, need clear queue_.
|
||||
|
@ -124,6 +124,7 @@ private:
|
||||
int64_t do_task_count_;
|
||||
int64_t print_log_interval_;
|
||||
int64_t last_working_time_;
|
||||
ObMiniStat::ObStatItem log_io_worker_queue_size_stat_;
|
||||
bool is_inited_;
|
||||
};
|
||||
} // end namespace palf
|
||||
|
@ -121,8 +121,12 @@ LogSlidingWindow::LogSlidingWindow()
|
||||
last_record_end_lsn_(PALF_INITIAL_LSN_VAL),
|
||||
fs_cb_cost_stat_("[PALF STAT FS CB]", PALF_STAT_PRINT_INTERVAL_US),
|
||||
log_life_time_stat_("[PALF STAT LOG LIFETIME]", PALF_STAT_PRINT_INTERVAL_US),
|
||||
log_submit_wait_stat_("[PALF STAT LOG SUBMIT WAIT]", PALF_STAT_PRINT_INTERVAL_US),
|
||||
log_submit_to_slide_cost_stat_("[PALF STAT LOG SLIDE WAIT]", PALF_STAT_PRINT_INTERVAL_US),
|
||||
log_gen_to_freeze_cost_stat_("PALF STAT LOG GEN_TO_FREEZE COST TIME", PALF_STAT_PRINT_INTERVAL_US),
|
||||
log_gen_to_submit_cost_stat_("PALF STAT LOG GEN_TO_SUBMIT COST TIME", PALF_STAT_PRINT_INTERVAL_US),
|
||||
log_submit_to_first_ack_cost_stat_("PALF STAT LOG SUBMIT_TO_FIRST_ACK COST TIME", PALF_STAT_PRINT_INTERVAL_US),
|
||||
log_submit_to_flush_cost_stat_("PALF STAT LOG SUBMIT_TO_FLUSH COST TIME", PALF_STAT_PRINT_INTERVAL_US),
|
||||
log_submit_to_commit_cost_stat_("PALF STAT LOG SUBMIT_TO_COMMIT COST TIME", PALF_STAT_PRINT_INTERVAL_US),
|
||||
log_submit_to_slide_cost_stat_("[PALF STAT LOG SUBMIT_TO_SLIDE COST TIME]", PALF_STAT_PRINT_INTERVAL_US),
|
||||
group_log_stat_time_us_(OB_INVALID_TIMESTAMP),
|
||||
accum_log_cnt_(0),
|
||||
accum_group_log_size_(0),
|
||||
@ -259,6 +263,16 @@ int LogSlidingWindow::init(const int64_t palf_id,
|
||||
|
||||
MEMSET(append_cnt_array_, 0, APPEND_CNT_ARRAY_SIZE * sizeof(int64_t));
|
||||
|
||||
PALF_REPORT_INFO_KV(K_(palf_id));
|
||||
fs_cb_cost_stat_.set_extra_info(EXTRA_INFOS);
|
||||
log_life_time_stat_.set_extra_info(EXTRA_INFOS);
|
||||
log_gen_to_freeze_cost_stat_.set_extra_info(EXTRA_INFOS);
|
||||
log_gen_to_submit_cost_stat_.set_extra_info(EXTRA_INFOS);
|
||||
log_submit_to_first_ack_cost_stat_.set_extra_info(EXTRA_INFOS);
|
||||
log_submit_to_flush_cost_stat_.set_extra_info(EXTRA_INFOS);
|
||||
log_submit_to_commit_cost_stat_.set_extra_info(EXTRA_INFOS);
|
||||
log_submit_to_slide_cost_stat_.set_extra_info(EXTRA_INFOS);
|
||||
|
||||
is_inited_ = true;
|
||||
LogGroupEntryHeader group_header;
|
||||
LogEntryHeader log_header;
|
||||
@ -563,6 +577,7 @@ int LogSlidingWindow::try_freeze_prev_log_(const int64_t next_log_id, const LSN
|
||||
log_task->unlock();
|
||||
// check if this log_task can be submitted
|
||||
if (log_task->is_freezed()) {
|
||||
log_task->set_freeze_ts(ObTimeUtility::current_time());
|
||||
is_need_handle = (0 == log_task->get_ref_cnt()) ? true : false;
|
||||
}
|
||||
}
|
||||
@ -735,6 +750,7 @@ int LogSlidingWindow::generate_new_group_log_(const LSN &lsn,
|
||||
}
|
||||
// check if this log_task can be submitted
|
||||
if (log_task->is_freezed()) {
|
||||
log_task->set_freeze_ts(ObTimeUtility::current_time());
|
||||
is_need_handle = (0 == log_task->get_ref_cnt()) ? true : false;
|
||||
}
|
||||
}
|
||||
@ -1163,6 +1179,7 @@ int LogSlidingWindow::try_freeze_last_log_task_(const int64_t expected_log_id,
|
||||
log_task->unlock();
|
||||
// check if this log_task can be submitted
|
||||
if (log_task->is_freezed()) {
|
||||
log_task->set_freeze_ts(ObTimeUtility::current_time());
|
||||
is_need_handle = (0 == log_task->get_ref_cnt()) ? true : false;
|
||||
}
|
||||
}
|
||||
@ -1588,6 +1605,9 @@ int LogSlidingWindow::try_advance_committed_lsn_(const LSN &end_lsn)
|
||||
get_committed_end_lsn_(old_committed_end_lsn);
|
||||
}
|
||||
}
|
||||
if (end_lsn > old_committed_end_lsn) {
|
||||
(void) try_advance_log_task_committed_ts_();
|
||||
}
|
||||
PALF_LOG(TRACE, "try_advance_committed_lsn_ success", K_(palf_id), K_(self), K_(committed_end_lsn));
|
||||
if (palf_reach_time_interval(PALF_STAT_PRINT_INTERVAL_US, end_lsn_stat_time_us_)) {
|
||||
LSN curr_end_lsn;
|
||||
@ -1599,6 +1619,58 @@ int LogSlidingWindow::try_advance_committed_lsn_(const LSN &end_lsn)
|
||||
return ret;
|
||||
}
|
||||
|
||||
int LogSlidingWindow::try_advance_log_task_committed_ts_()
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
if (IS_NOT_INIT) {
|
||||
ret = OB_NOT_INIT;
|
||||
} else {
|
||||
LSN new_committed_end_lsn;
|
||||
get_committed_end_lsn(new_committed_end_lsn);
|
||||
const int64_t commit_ts = ObTimeUtility::current_time();
|
||||
int64_t last_submit_log_id = get_last_submit_log_id_();
|
||||
int64_t log_start_id = get_start_id();
|
||||
LogTask *log_task = NULL;
|
||||
for (int64_t tmp_log_id = last_submit_log_id; OB_SUCC(ret) && tmp_log_id >= log_start_id; tmp_log_id--) {
|
||||
LogTaskGuard guard(this);
|
||||
if (OB_FAIL(guard.get_log_task(tmp_log_id, log_task))) {
|
||||
// the log_task may has been slided
|
||||
} else if (!log_task->is_freezed() || log_task->get_end_lsn() > new_committed_end_lsn) {
|
||||
continue;
|
||||
} else if (OB_INVALID_TIMESTAMP == log_task->get_committed_ts()) {
|
||||
log_task->set_committed_ts(commit_ts);
|
||||
} else {
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
int LogSlidingWindow::try_advance_log_task_first_ack_ts_(const LSN &end_lsn)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
if (IS_NOT_INIT) {
|
||||
ret = OB_NOT_INIT;
|
||||
} else {
|
||||
const int64_t log_start_id = sw_.get_begin_sn();
|
||||
LogTask *log_task = NULL;
|
||||
for (int64_t tmp_log_id = last_submit_log_id_; OB_SUCC(ret) && tmp_log_id >= log_start_id; tmp_log_id--) {
|
||||
LogTaskGuard guard(this);
|
||||
if (OB_FAIL(guard.get_log_task(tmp_log_id, log_task))) {
|
||||
// the log_task may has been slided
|
||||
} else if (!log_task->is_freezed() || log_task->get_end_lsn() > end_lsn) {
|
||||
continue;
|
||||
} else if (OB_INVALID_TIMESTAMP == log_task->get_first_ack_ts()) {
|
||||
log_task->set_first_ack_ts(ObTimeUtility::current_time());
|
||||
} else {
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
int LogSlidingWindow::inc_update_scn_base(const SCN &scn)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
@ -2024,7 +2096,12 @@ int LogSlidingWindow::sliding_cb(const int64_t sn, const FixedSlidingWindowSlot
|
||||
const int64_t log_proposal_id = log_task->get_proposal_id();
|
||||
const int64_t log_accum_checksum = log_task->get_accum_checksum();
|
||||
const int64_t log_gen_ts = log_task->get_gen_ts();
|
||||
const int64_t log_freeze_ts = log_task->get_freeze_ts();
|
||||
const int64_t log_submit_ts = log_task->get_submit_ts();
|
||||
const int64_t log_flush_ts = log_task->get_flushed_ts();
|
||||
const int64_t log_first_ack_ts = log_task->get_first_ack_ts();
|
||||
const int64_t log_commit_ts = log_task->get_committed_ts();
|
||||
PALF_LOG(INFO, "sliding_cb log_task", K(log_commit_ts), KPC(log_task));
|
||||
log_task->unlock();
|
||||
|
||||
// Verifying accum_checksum firstly.
|
||||
@ -2053,7 +2130,18 @@ int LogSlidingWindow::sliding_cb(const int64_t sn, const FixedSlidingWindowSlot
|
||||
|
||||
const int64_t log_life_time = fs_cb_begin_ts - log_gen_ts;
|
||||
log_life_time_stat_.stat(log_life_time);
|
||||
log_submit_wait_stat_.stat(log_submit_ts - log_gen_ts);
|
||||
// When role is follower, the freeze state takes effect immediately and we do not need to stat it.
|
||||
log_gen_to_freeze_cost_stat_.stat(log_freeze_ts - log_gen_ts);
|
||||
log_gen_to_submit_cost_stat_.stat(log_submit_ts - log_gen_ts);
|
||||
log_submit_to_flush_cost_stat_.stat(log_flush_ts - log_submit_ts);
|
||||
// Concurrency problem
|
||||
// The first_ack_ts and committed_ts of log_task may havn't been updated when they were slided from sw.
|
||||
if (OB_INVALID_TIMESTAMP != log_first_ack_ts) {
|
||||
log_submit_to_first_ack_cost_stat_.stat(log_first_ack_ts - log_submit_ts);
|
||||
}
|
||||
if (OB_INVALID_TIMESTAMP != log_commit_ts) {
|
||||
log_submit_to_commit_cost_stat_.stat(log_commit_ts - log_submit_ts);
|
||||
}
|
||||
log_submit_to_slide_cost_stat_.stat(fs_cb_begin_ts - log_submit_ts);
|
||||
|
||||
if (log_life_time > 100 * 1000) {
|
||||
@ -3099,6 +3187,7 @@ int LogSlidingWindow::receive_log(const common::ObAddr &src_server,
|
||||
// update group log data_checksum
|
||||
log_task->set_group_log_checksum(group_log_data_checksum);
|
||||
(void) log_task->set_freezed();
|
||||
log_task->set_freeze_ts(ObTimeUtility::current_time());
|
||||
}
|
||||
log_task->unlock();
|
||||
}
|
||||
@ -3285,6 +3374,7 @@ int LogSlidingWindow::submit_group_log(const LSN &lsn,
|
||||
log_task->set_group_log_checksum(group_log_data_checksum);
|
||||
(void) log_task->set_submit_log_exist();
|
||||
(void) log_task->set_freezed();
|
||||
log_task->set_freeze_ts(ObTimeUtility::current_time());
|
||||
}
|
||||
log_task->unlock();
|
||||
|
||||
@ -3953,6 +4043,10 @@ int LogSlidingWindow::ack_log(const common::ObAddr &src_server, const LSN &end_l
|
||||
LSN old_committed_end_lsn;
|
||||
get_committed_end_lsn_(old_committed_end_lsn);
|
||||
LSN new_committed_end_lsn;
|
||||
// first ack ts only needs to be updated when end_lsn >= old_committed_lsn
|
||||
if (old_committed_end_lsn < end_lsn) {
|
||||
(void) try_advance_log_task_first_ack_ts_(end_lsn);
|
||||
}
|
||||
if (state_mgr_->is_leader_active()) {
|
||||
// Only leader with ACTIVE state can generate new committed_end_lsn.
|
||||
(void) gen_committed_end_lsn_(new_committed_end_lsn);
|
||||
@ -4053,6 +4147,7 @@ int LogSlidingWindow::append_disk_log_to_sw_(const LSN &lsn,
|
||||
log_task->set_prev_lsn(max_flushed_lsn);
|
||||
log_task->set_prev_log_proposal_id(max_flushed_log_pid);
|
||||
log_task->set_freezed();
|
||||
log_task->set_freeze_ts(ObTimeUtility::current_time());
|
||||
log_task->try_pre_submit();
|
||||
PALF_LOG(TRACE, "append_disk_log success", K(ret), K_(palf_id), K_(self), K(lsn), K(header), KPC(log_task));
|
||||
}
|
||||
|
@ -305,6 +305,8 @@ private:
|
||||
const int64_t &proposal_id,
|
||||
const int64_t accum_checksum);
|
||||
int try_advance_committed_lsn_(const LSN &end_lsn);
|
||||
int try_advance_log_task_committed_ts_();
|
||||
int try_advance_log_task_first_ack_ts_(const LSN &end_lsn);
|
||||
void get_last_submit_log_info_(LSN &lsn,
|
||||
LSN &end_lsn,
|
||||
int64_t &log_id,
|
||||
@ -518,7 +520,11 @@ private:
|
||||
LSN last_record_end_lsn_;
|
||||
ObMiniStat::ObStatItem fs_cb_cost_stat_;
|
||||
ObMiniStat::ObStatItem log_life_time_stat_;
|
||||
ObMiniStat::ObStatItem log_submit_wait_stat_;
|
||||
ObMiniStat::ObStatItem log_gen_to_freeze_cost_stat_;
|
||||
ObMiniStat::ObStatItem log_gen_to_submit_cost_stat_;
|
||||
ObMiniStat::ObStatItem log_submit_to_first_ack_cost_stat_;
|
||||
ObMiniStat::ObStatItem log_submit_to_flush_cost_stat_;
|
||||
ObMiniStat::ObStatItem log_submit_to_commit_cost_stat_;
|
||||
ObMiniStat::ObStatItem log_submit_to_slide_cost_stat_;
|
||||
int64_t group_log_stat_time_us_;
|
||||
int64_t accum_log_cnt_;
|
||||
|
@ -570,7 +570,7 @@ int LogStateMgr::pending_to_follower_active_()
|
||||
if (OB_FAIL(to_follower_active_())) {
|
||||
PALF_LOG(ERROR, "to_follower_active_ failed", K(ret), K_(palf_id));
|
||||
}
|
||||
PALF_EVENT_REPORT_INFO_KV(K_(leader), K_(pending_end_lsn));
|
||||
PALF_REPORT_INFO_KV(K_(leader), K_(pending_end_lsn));
|
||||
plugins_->record_role_change_event(palf_id_, FOLLOWER, ObReplicaState::PENDING,
|
||||
FOLLOWER, ObReplicaState::ACTIVE, EXTRA_INFOS);
|
||||
PALF_EVENT("follower_pending_to_follower_active", palf_id_, K(ret), K_(self), K_(leader),
|
||||
@ -602,7 +602,7 @@ int LogStateMgr::to_reconfirm_(const int64_t new_leader_epoch)
|
||||
int LogStateMgr::follower_active_to_reconfirm_(const int64_t new_leader_epoch)
|
||||
{
|
||||
int ret = to_reconfirm_(new_leader_epoch);
|
||||
PALF_EVENT_REPORT_INFO_KV(K_(leader), K(new_leader_epoch));
|
||||
PALF_REPORT_INFO_KV(K_(leader), K(new_leader_epoch));
|
||||
plugins_->record_role_change_event(palf_id_, FOLLOWER, ObReplicaState::ACTIVE,
|
||||
LEADER, ObReplicaState::RECONFIRM, EXTRA_INFOS);
|
||||
PALF_EVENT("follower_active_to_reconfirm", palf_id_, K(ret), K_(self), K(new_leader_epoch),
|
||||
@ -613,7 +613,7 @@ int LogStateMgr::follower_active_to_reconfirm_(const int64_t new_leader_epoch)
|
||||
int LogStateMgr::follower_pending_to_reconfirm_(const int64_t new_leader_epoch)
|
||||
{
|
||||
int ret = to_reconfirm_(new_leader_epoch);
|
||||
PALF_EVENT_REPORT_INFO_KV(K_(leader), K(new_leader_epoch), K_(pending_end_lsn));
|
||||
PALF_REPORT_INFO_KV(K_(leader), K(new_leader_epoch), K_(pending_end_lsn));
|
||||
plugins_->record_role_change_event(palf_id_, FOLLOWER, ObReplicaState::PENDING,
|
||||
LEADER, ObReplicaState::RECONFIRM, EXTRA_INFOS);
|
||||
PALF_EVENT("follower_pending_to_reconfirm", palf_id_, K(ret), K_(self), K(new_leader_epoch),
|
||||
@ -629,7 +629,7 @@ int LogStateMgr::reconfirm_to_follower_pending_()
|
||||
} else {
|
||||
reset_status_();
|
||||
update_role_and_state_(FOLLOWER, PENDING);
|
||||
PALF_EVENT_REPORT_INFO_KV(K_(leader), K_(allow_vote));
|
||||
PALF_REPORT_INFO_KV(K_(leader), K_(allow_vote));
|
||||
plugins_->record_role_change_event(palf_id_, LEADER, ObReplicaState::RECONFIRM,
|
||||
FOLLOWER, ObReplicaState::PENDING, EXTRA_INFOS);
|
||||
PALF_EVENT("reconfirm_to_follower_pending", palf_id_, K_(self), K_(leader), "is_allow_vote",
|
||||
@ -663,7 +663,7 @@ int LogStateMgr::reconfirm_to_leader_active_()
|
||||
}
|
||||
const int64_t reconfirm_to_active_cost = ObTimeUtility::current_time() - reconfirm_start_time_us_;
|
||||
PALF_EVENT("reconfirm_to_leader_active end", palf_id_, K(ret), K_(self), K(reconfirm_to_active_cost), K_(role), K_(state));
|
||||
PALF_EVENT_REPORT_INFO_KV(K(reconfirm_stage_cost), K(reconfirm_to_active_cost));
|
||||
PALF_REPORT_INFO_KV(K(reconfirm_stage_cost), K(reconfirm_to_active_cost));
|
||||
plugins_->record_role_change_event(palf_id_, LEADER, ObReplicaState::RECONFIRM,
|
||||
LEADER, ObReplicaState::ACTIVE, EXTRA_INFOS);
|
||||
}
|
||||
@ -690,7 +690,7 @@ int LogStateMgr::leader_active_to_follower_pending_()
|
||||
update_role_and_state_(FOLLOWER, PENDING);
|
||||
}
|
||||
|
||||
PALF_EVENT_REPORT_INFO_KV(K_(pending_end_lsn));
|
||||
PALF_REPORT_INFO_KV(K_(pending_end_lsn));
|
||||
plugins_->record_role_change_event(palf_id_, LEADER, ObReplicaState::ACTIVE,
|
||||
FOLLOWER, ObReplicaState::PENDING, EXTRA_INFOS);
|
||||
PALF_EVENT("leader_active_to_follower_pending_", palf_id_, K(ret), K_(self), K_(role), K_(state), K_(pending_end_lsn));
|
||||
|
@ -138,9 +138,12 @@ LogTask::LogTask()
|
||||
header_(),
|
||||
ref_cnt_(0),
|
||||
log_cnt_(0),
|
||||
gen_ts_(0),
|
||||
submit_ts_(0),
|
||||
flushed_ts_(0),
|
||||
gen_ts_(OB_INVALID_TIMESTAMP),
|
||||
freeze_ts_(OB_INVALID_TIMESTAMP),
|
||||
submit_ts_(OB_INVALID_TIMESTAMP),
|
||||
first_ack_ts_(OB_INVALID_TIMESTAMP),
|
||||
flushed_ts_(OB_INVALID_TIMESTAMP),
|
||||
committed_ts_(OB_INVALID_TIMESTAMP),
|
||||
lock_()
|
||||
{
|
||||
reset();
|
||||
@ -161,9 +164,12 @@ void LogTask::reset()
|
||||
header_.reset();
|
||||
ref_cnt_ = 0;
|
||||
log_cnt_ = 0;
|
||||
gen_ts_ = 0;
|
||||
submit_ts_ = 0;
|
||||
flushed_ts_ = 0;
|
||||
gen_ts_ = OB_INVALID_TIMESTAMP;
|
||||
freeze_ts_ = OB_INVALID_TIMESTAMP;
|
||||
submit_ts_ = OB_INVALID_TIMESTAMP;
|
||||
first_ack_ts_ = OB_INVALID_TIMESTAMP;
|
||||
flushed_ts_ = OB_INVALID_TIMESTAMP;
|
||||
committed_ts_ = OB_INVALID_TIMESTAMP;
|
||||
}
|
||||
|
||||
bool LogTask::can_be_slid()
|
||||
@ -248,6 +254,7 @@ int LogTask::set_group_header(const LSN &lsn, const SCN &scn, const LogGroupEntr
|
||||
PALF_LOG(WARN, "log_task has been valid", K(ret), K(lsn), K(scn), K(group_entry_header), KPC(this));
|
||||
} else {
|
||||
header_.begin_lsn_ = lsn;
|
||||
header_.end_lsn_ = lsn + group_entry_header.get_serialize_size() + group_entry_header.get_data_len();
|
||||
header_.log_id_ = group_entry_header.get_log_id();
|
||||
header_.is_padding_log_ = group_entry_header.is_padding_log();
|
||||
header_.proposal_id_ = group_entry_header.get_log_proposal_id(); // leader's proposal_id when generate this log
|
||||
@ -417,14 +424,30 @@ int64_t LogTask::ref(const int64_t val, const bool is_append_log)
|
||||
return ATOMIC_AAF(&ref_cnt_, val);
|
||||
}
|
||||
|
||||
void LogTask::set_freeze_ts(const int64_t ts)
|
||||
{
|
||||
// freeze ts may be setted more than once
|
||||
ATOMIC_CAS(&freeze_ts_, OB_INVALID_TIMESTAMP, ts);
|
||||
}
|
||||
|
||||
void LogTask::set_submit_ts(const int64_t ts)
|
||||
{
|
||||
ATOMIC_STORE(&submit_ts_, ts);
|
||||
}
|
||||
|
||||
void LogTask::set_first_ack_ts(const int64_t ts)
|
||||
{
|
||||
ATOMIC_STORE(&first_ack_ts_, ts);
|
||||
}
|
||||
|
||||
void LogTask::set_flushed_ts(const int64_t ts)
|
||||
{
|
||||
ATOMIC_STORE(&flushed_ts_, ts);
|
||||
}
|
||||
|
||||
void LogTask::set_committed_ts(const int64_t ts)
|
||||
{
|
||||
ATOMIC_STORE(&committed_ts_, ts);
|
||||
}
|
||||
} // namespace palf
|
||||
} // namespace oceanbase
|
||||
|
@ -138,13 +138,25 @@ public:
|
||||
LSN get_begin_lsn() const { return header_.begin_lsn_; }
|
||||
LSN get_end_lsn() const { return header_.end_lsn_; }
|
||||
int64_t get_accum_checksum() const { return header_.accum_checksum_; }
|
||||
void set_freeze_ts(const int64_t ts);
|
||||
void set_submit_ts(const int64_t ts);
|
||||
void set_first_ack_ts(const int64_t ts);
|
||||
void set_flushed_ts(const int64_t ts);
|
||||
void set_committed_ts(const int64_t ts);
|
||||
int64_t get_gen_ts() const { return ATOMIC_LOAD(&(gen_ts_)); }
|
||||
int64_t get_freeze_ts() const { return ATOMIC_LOAD(&(freeze_ts_)); }
|
||||
int64_t get_submit_ts() const { return ATOMIC_LOAD(&(submit_ts_)); }
|
||||
int64_t get_first_ack_ts() const { return ATOMIC_LOAD(&(first_ack_ts_)); }
|
||||
int64_t get_flushed_ts() const { return ATOMIC_LOAD(&(flushed_ts_)); }
|
||||
TO_STRING_KV(K_(header), K_(state_map), K_(ref_cnt), K_(gen_ts), "submit_wait_time", submit_ts_ - gen_ts_,
|
||||
K_(submit_ts), "flush_cost", ((flushed_ts_ - submit_ts_) < 0 ? 0 : (flushed_ts_ - submit_ts_)), K_(flushed_ts));
|
||||
int64_t get_committed_ts() const { return ATOMIC_LOAD(&(committed_ts_)); }
|
||||
TO_STRING_KV(K_(header), K_(state_map), K_(ref_cnt),
|
||||
K_(gen_ts), K_(freeze_ts), K_(submit_ts), K_(flushed_ts), K_(first_ack_ts), K_(committed_ts),
|
||||
"gen_to_freeze cost time", freeze_ts_ - gen_ts_,
|
||||
"gen_to_submit cost time", submit_ts_ - gen_ts_,
|
||||
"submit_to_flush cost time", ((flushed_ts_ - submit_ts_) < 0 ? 0 : (flushed_ts_ - submit_ts_)),
|
||||
"submit_to_first_ack cost time", ((first_ack_ts_ - submit_ts_) < 0 ? 0 : (first_ack_ts_ - submit_ts_)),
|
||||
"submit_to_commit cost time", ((committed_ts_ - submit_ts_) < 0 ? 0 : (committed_ts_ - submit_ts_))
|
||||
);
|
||||
private:
|
||||
int try_freeze_(const LSN &end_lsn);
|
||||
private:
|
||||
@ -153,8 +165,11 @@ private:
|
||||
int64_t ref_cnt_;
|
||||
int64_t log_cnt_; // log_entry count
|
||||
mutable int64_t gen_ts_;
|
||||
mutable int64_t freeze_ts_;
|
||||
mutable int64_t submit_ts_;
|
||||
mutable int64_t first_ack_ts_;
|
||||
mutable int64_t flushed_ts_;
|
||||
mutable int64_t committed_ts_;
|
||||
mutable common::ObLatch lock_;
|
||||
};
|
||||
} // end namespace palf
|
||||
|
@ -148,6 +148,9 @@ int PalfHandleImpl::init(const int64_t palf_id,
|
||||
alloc_mgr, log_rpc, log_io_worker, palf_env_impl, election_timer))) {
|
||||
PALF_LOG(WARN, "PalfHandleImpl do_init_mem_ failed", K(ret), K(palf_id));
|
||||
} else {
|
||||
PALF_REPORT_INFO_KV(K_(palf_id));
|
||||
append_cost_stat_.set_extra_info(EXTRA_INFOS);
|
||||
flush_cb_cost_stat_.set_extra_info(EXTRA_INFOS);
|
||||
last_accum_statistic_time_ = ObTimeUtility::current_time();
|
||||
PALF_EVENT("PalfHandleImpl init success", palf_id_, K(ret), K(self), K(access_mode), K(palf_base_info),
|
||||
K(replica_type), K(log_dir), K(log_meta), K(palf_epoch));
|
||||
@ -1672,7 +1675,7 @@ int PalfHandleImpl::inner_append_log(const LSN &lsn,
|
||||
const int64_t time_cost = now - begin_ts;
|
||||
append_cost_stat_.stat(time_cost);
|
||||
if (time_cost >= 5 * 1000) {
|
||||
PALF_LOG_RET(WARN, OB_ERR_TOO_MUCH_TIME, "write log cost too much time", K(ret), KPC(this), K(lsn), K(scn), K(time_cost));
|
||||
PALF_LOG_RET(WARN, OB_ERR_TOO_MUCH_TIME, "write log cost too much time", K(ret), KPC(this), K(lsn), K(scn), K(accum_size), K(time_cost));
|
||||
}
|
||||
if (palf_reach_time_interval(PALF_STAT_PRINT_INTERVAL_US, last_accum_statistic_time_)) {
|
||||
PALF_LOG(INFO, "[PALF STAT INNER APPEND LOG]", KPC(this), K(accum_size));
|
||||
@ -1697,15 +1700,15 @@ int PalfHandleImpl::inner_append_log(const LSNArray &lsn_array,
|
||||
int64_t count = lsn_array.count();
|
||||
int64_t accum_size = 0, curr_size = 0;
|
||||
for (int64_t i = 0; i < count; i++) {
|
||||
curr_size = write_buf_array[i]->get_total_size();
|
||||
accum_size = ATOMIC_AAF(&accum_write_log_size_, curr_size);
|
||||
curr_size += write_buf_array[i]->get_total_size();
|
||||
}
|
||||
accum_size = ATOMIC_AAF(&accum_write_log_size_, curr_size);
|
||||
const int64_t now = ObTimeUtility::current_time();
|
||||
const int64_t time_cost = now - begin_ts;
|
||||
append_cost_stat_.stat(time_cost);
|
||||
if (time_cost > 10 * 1000) {
|
||||
PALF_LOG_RET(WARN, OB_ERR_TOO_MUCH_TIME, "write log cost too much time", K(ret), KPC(this), K(lsn_array),
|
||||
K(scn_array), K(time_cost));
|
||||
K(scn_array), K(curr_size), K(time_cost));
|
||||
}
|
||||
if (palf_reach_time_interval(PALF_STAT_PRINT_INTERVAL_US, last_accum_statistic_time_)) {
|
||||
PALF_LOG(INFO, "[PALF STAT INNER APPEND LOG]", KPC(this), K(accum_size));
|
||||
|
Loading…
x
Reference in New Issue
Block a user