diff --git a/deps/oblib/src/lib/utility/utility.h b/deps/oblib/src/lib/utility/utility.h index 463ee20f8..e19a28a1e 100644 --- a/deps/oblib/src/lib/utility/utility.h +++ b/deps/oblib/src/lib/utility/utility.h @@ -1123,20 +1123,26 @@ public: { public: ObStatItem(const char *item, const int64_t stat_interval) - : item_(item), stat_interval_(stat_interval), last_ts_(0), stat_count_(0), lock_tag_(false) {} + : item_(item), stat_interval_(stat_interval), last_ts_(0), stat_count_(0), accum_count_(0), lock_tag_(false) { + MEMSET(extra_info_, '\0', MAX_ROOTSERVICE_EVENT_EXTRA_INFO_LENGTH); + } ~ObStatItem() {} public: - void stat(const int64_t time_cost = 0) + void set_extra_info(const char *extra_info) + { + MEMCPY(extra_info_, extra_info, MAX_ROOTSERVICE_EVENT_EXTRA_INFO_LENGTH); + } + void stat(const int64_t count = 0) { const int64_t cur_ts = ::oceanbase::common::ObTimeUtility::fast_current_time(); const int64_t cur_stat_count = ATOMIC_AAF(&stat_count_, 1); - const int64_t cur_accum_time = ATOMIC_AAF(&accum_time_, time_cost); + const int64_t cur_accum_count = ATOMIC_AAF(&accum_count_, count); if (ATOMIC_LOAD(&last_ts_) + stat_interval_ < cur_ts) { if (ATOMIC_BCAS(&lock_tag_, false, true)) { - LIB_LOG(INFO, NULL == item_ ? "" : item_, K(cur_stat_count), K_(stat_interval), "avg cost", cur_accum_time / cur_stat_count, K(this)); + LIB_LOG(INFO, NULL == item_ ? "" : item_, K(cur_stat_count), K_(stat_interval), "avg count/cost", cur_accum_count / cur_stat_count, K(this), K_(extra_info)); (void)ATOMIC_SET(&last_ts_, cur_ts); (void)ATOMIC_SET(&stat_count_, 0); - (void)ATOMIC_SET(&accum_time_, 0); + (void)ATOMIC_SET(&accum_count_, 0); ATOMIC_BCAS(&lock_tag_, true, false); } } @@ -1144,9 +1150,10 @@ public: private: const char *const item_; const int64_t stat_interval_; + char extra_info_[MAX_ROOTSERVICE_EVENT_EXTRA_INFO_LENGTH]; int64_t last_ts_; int64_t stat_count_; - int64_t accum_time_; + int64_t accum_count_; bool lock_tag_; }; public: diff --git a/src/logservice/ob_log_handler.cpp b/src/logservice/ob_log_handler.cpp index 6d25cdad6..f28f081ed 100644 --- a/src/logservice/ob_log_handler.cpp +++ b/src/logservice/ob_log_handler.cpp @@ -92,6 +92,8 @@ int ObLogHandler::init(const int64_t id, replay_service_ = replay_service; rc_service_ = rc_service; apply_status_->inc_ref(); + PALF_REPORT_INFO_KV(K(id)); + append_cost_stat_.set_extra_info(EXTRA_INFOS); id_ = id; self_ = self; palf_handle_ = palf_handle; diff --git a/src/logservice/palf/log_define.h b/src/logservice/palf/log_define.h index 099a2c2a5..8d3a6cc5f 100644 --- a/src/logservice/palf/log_define.h +++ b/src/logservice/palf/log_define.h @@ -39,7 +39,7 @@ namespace palf #define PALF_EVENT(info_string, palf_id, args...) FLOG_INFO("[PALF_EVENT] "info_string, "palf_id", palf_id, args) -#define PALF_EVENT_REPORT_INFO_KV(args...) \ +#define PALF_REPORT_INFO_KV(args...) \ const int64_t MAX_INFO_LENGTH = 512; \ char EXTRA_INFOS[MAX_INFO_LENGTH]; \ int64_t pos = 0; \ diff --git a/src/logservice/palf/log_io_worker.cpp b/src/logservice/palf/log_io_worker.cpp index cfd44e3a9..9ce2cb54b 100644 --- a/src/logservice/palf/log_io_worker.cpp +++ b/src/logservice/palf/log_io_worker.cpp @@ -34,6 +34,7 @@ LogIOWorker::LogIOWorker() do_task_count_(0), print_log_interval_(OB_INVALID_TIMESTAMP), last_working_time_(OB_INVALID_TIMESTAMP), + log_io_worker_queue_size_stat_("[PALF STAT LOG IO WORKER QUEUE SIZE]", PALF_STAT_PRINT_INTERVAL_US), is_inited_(false) { } @@ -69,6 +70,8 @@ int LogIOWorker::init(const LogIOWorkerConfig &config, log_io_worker_num_ = config.io_worker_num_; cb_thread_pool_tg_id_ = cb_thread_pool_tg_id; palf_env_impl_ = palf_env_impl; + PALF_REPORT_INFO_KV(K_(log_io_worker_num), K_(cb_thread_pool_tg_id)); + log_io_worker_queue_size_stat_.set_extra_info(EXTRA_INFOS); is_inited_ = true; PALF_LOG(INFO, "LogIOWorker init success", K(ret), K(config), K(cb_thread_pool_tg_id), KPC(palf_env_impl)); @@ -148,13 +151,15 @@ int LogIOWorker::run_loop_() while (false == has_set_stop() && false == (OB_NOT_NULL(&lib::Thread::current()) ? lib::Thread::current().has_set_stop() : false)) { - void *task = NULL; if (OB_SUCC(queue_.pop(task, QUEUE_WAIT_TIME))) { ATOMIC_STORE(&last_working_time_, common::ObTimeUtility::fast_current_time()); ret = reduce_io_task_(task); ATOMIC_STORE(&last_working_time_, OB_INVALID_TIMESTAMP); } + if (queue_.size() > 0) { + log_io_worker_queue_size_stat_.stat(queue_.size()); + } } // After IOWorker has stopped, need clear queue_. diff --git a/src/logservice/palf/log_io_worker.h b/src/logservice/palf/log_io_worker.h index 0dac398c9..5e165f579 100644 --- a/src/logservice/palf/log_io_worker.h +++ b/src/logservice/palf/log_io_worker.h @@ -124,6 +124,7 @@ private: int64_t do_task_count_; int64_t print_log_interval_; int64_t last_working_time_; + ObMiniStat::ObStatItem log_io_worker_queue_size_stat_; bool is_inited_; }; } // end namespace palf diff --git a/src/logservice/palf/log_sliding_window.cpp b/src/logservice/palf/log_sliding_window.cpp index faeee6168..73c70662d 100644 --- a/src/logservice/palf/log_sliding_window.cpp +++ b/src/logservice/palf/log_sliding_window.cpp @@ -121,8 +121,12 @@ LogSlidingWindow::LogSlidingWindow() last_record_end_lsn_(PALF_INITIAL_LSN_VAL), fs_cb_cost_stat_("[PALF STAT FS CB]", PALF_STAT_PRINT_INTERVAL_US), log_life_time_stat_("[PALF STAT LOG LIFETIME]", PALF_STAT_PRINT_INTERVAL_US), - log_submit_wait_stat_("[PALF STAT LOG SUBMIT WAIT]", PALF_STAT_PRINT_INTERVAL_US), - log_submit_to_slide_cost_stat_("[PALF STAT LOG SLIDE WAIT]", PALF_STAT_PRINT_INTERVAL_US), + log_gen_to_freeze_cost_stat_("PALF STAT LOG GEN_TO_FREEZE COST TIME", PALF_STAT_PRINT_INTERVAL_US), + log_gen_to_submit_cost_stat_("PALF STAT LOG GEN_TO_SUBMIT COST TIME", PALF_STAT_PRINT_INTERVAL_US), + log_submit_to_first_ack_cost_stat_("PALF STAT LOG SUBMIT_TO_FIRST_ACK COST TIME", PALF_STAT_PRINT_INTERVAL_US), + log_submit_to_flush_cost_stat_("PALF STAT LOG SUBMIT_TO_FLUSH COST TIME", PALF_STAT_PRINT_INTERVAL_US), + log_submit_to_commit_cost_stat_("PALF STAT LOG SUBMIT_TO_COMMIT COST TIME", PALF_STAT_PRINT_INTERVAL_US), + log_submit_to_slide_cost_stat_("[PALF STAT LOG SUBMIT_TO_SLIDE COST TIME]", PALF_STAT_PRINT_INTERVAL_US), group_log_stat_time_us_(OB_INVALID_TIMESTAMP), accum_log_cnt_(0), accum_group_log_size_(0), @@ -259,6 +263,16 @@ int LogSlidingWindow::init(const int64_t palf_id, MEMSET(append_cnt_array_, 0, APPEND_CNT_ARRAY_SIZE * sizeof(int64_t)); + PALF_REPORT_INFO_KV(K_(palf_id)); + fs_cb_cost_stat_.set_extra_info(EXTRA_INFOS); + log_life_time_stat_.set_extra_info(EXTRA_INFOS); + log_gen_to_freeze_cost_stat_.set_extra_info(EXTRA_INFOS); + log_gen_to_submit_cost_stat_.set_extra_info(EXTRA_INFOS); + log_submit_to_first_ack_cost_stat_.set_extra_info(EXTRA_INFOS); + log_submit_to_flush_cost_stat_.set_extra_info(EXTRA_INFOS); + log_submit_to_commit_cost_stat_.set_extra_info(EXTRA_INFOS); + log_submit_to_slide_cost_stat_.set_extra_info(EXTRA_INFOS); + is_inited_ = true; LogGroupEntryHeader group_header; LogEntryHeader log_header; @@ -563,6 +577,7 @@ int LogSlidingWindow::try_freeze_prev_log_(const int64_t next_log_id, const LSN log_task->unlock(); // check if this log_task can be submitted if (log_task->is_freezed()) { + log_task->set_freeze_ts(ObTimeUtility::current_time()); is_need_handle = (0 == log_task->get_ref_cnt()) ? true : false; } } @@ -735,6 +750,7 @@ int LogSlidingWindow::generate_new_group_log_(const LSN &lsn, } // check if this log_task can be submitted if (log_task->is_freezed()) { + log_task->set_freeze_ts(ObTimeUtility::current_time()); is_need_handle = (0 == log_task->get_ref_cnt()) ? true : false; } } @@ -1163,6 +1179,7 @@ int LogSlidingWindow::try_freeze_last_log_task_(const int64_t expected_log_id, log_task->unlock(); // check if this log_task can be submitted if (log_task->is_freezed()) { + log_task->set_freeze_ts(ObTimeUtility::current_time()); is_need_handle = (0 == log_task->get_ref_cnt()) ? true : false; } } @@ -1588,6 +1605,9 @@ int LogSlidingWindow::try_advance_committed_lsn_(const LSN &end_lsn) get_committed_end_lsn_(old_committed_end_lsn); } } + if (end_lsn > old_committed_end_lsn) { + (void) try_advance_log_task_committed_ts_(); + } PALF_LOG(TRACE, "try_advance_committed_lsn_ success", K_(palf_id), K_(self), K_(committed_end_lsn)); if (palf_reach_time_interval(PALF_STAT_PRINT_INTERVAL_US, end_lsn_stat_time_us_)) { LSN curr_end_lsn; @@ -1599,6 +1619,58 @@ int LogSlidingWindow::try_advance_committed_lsn_(const LSN &end_lsn) return ret; } +int LogSlidingWindow::try_advance_log_task_committed_ts_() +{ + int ret = OB_SUCCESS; + if (IS_NOT_INIT) { + ret = OB_NOT_INIT; + } else { + LSN new_committed_end_lsn; + get_committed_end_lsn(new_committed_end_lsn); + const int64_t commit_ts = ObTimeUtility::current_time(); + int64_t last_submit_log_id = get_last_submit_log_id_(); + int64_t log_start_id = get_start_id(); + LogTask *log_task = NULL; + for (int64_t tmp_log_id = last_submit_log_id; OB_SUCC(ret) && tmp_log_id >= log_start_id; tmp_log_id--) { + LogTaskGuard guard(this); + if (OB_FAIL(guard.get_log_task(tmp_log_id, log_task))) { + // the log_task may has been slided + } else if (!log_task->is_freezed() || log_task->get_end_lsn() > new_committed_end_lsn) { + continue; + } else if (OB_INVALID_TIMESTAMP == log_task->get_committed_ts()) { + log_task->set_committed_ts(commit_ts); + } else { + break; + } + } + } + return ret; +} + +int LogSlidingWindow::try_advance_log_task_first_ack_ts_(const LSN &end_lsn) +{ + int ret = OB_SUCCESS; + if (IS_NOT_INIT) { + ret = OB_NOT_INIT; + } else { + const int64_t log_start_id = sw_.get_begin_sn(); + LogTask *log_task = NULL; + for (int64_t tmp_log_id = last_submit_log_id_; OB_SUCC(ret) && tmp_log_id >= log_start_id; tmp_log_id--) { + LogTaskGuard guard(this); + if (OB_FAIL(guard.get_log_task(tmp_log_id, log_task))) { + // the log_task may has been slided + } else if (!log_task->is_freezed() || log_task->get_end_lsn() > end_lsn) { + continue; + } else if (OB_INVALID_TIMESTAMP == log_task->get_first_ack_ts()) { + log_task->set_first_ack_ts(ObTimeUtility::current_time()); + } else { + break; + } + } + } + return ret; +} + int LogSlidingWindow::inc_update_scn_base(const SCN &scn) { int ret = OB_SUCCESS; @@ -2024,7 +2096,12 @@ int LogSlidingWindow::sliding_cb(const int64_t sn, const FixedSlidingWindowSlot const int64_t log_proposal_id = log_task->get_proposal_id(); const int64_t log_accum_checksum = log_task->get_accum_checksum(); const int64_t log_gen_ts = log_task->get_gen_ts(); + const int64_t log_freeze_ts = log_task->get_freeze_ts(); const int64_t log_submit_ts = log_task->get_submit_ts(); + const int64_t log_flush_ts = log_task->get_flushed_ts(); + const int64_t log_first_ack_ts = log_task->get_first_ack_ts(); + const int64_t log_commit_ts = log_task->get_committed_ts(); + PALF_LOG(INFO, "sliding_cb log_task", K(log_commit_ts), KPC(log_task)); log_task->unlock(); // Verifying accum_checksum firstly. @@ -2053,7 +2130,18 @@ int LogSlidingWindow::sliding_cb(const int64_t sn, const FixedSlidingWindowSlot const int64_t log_life_time = fs_cb_begin_ts - log_gen_ts; log_life_time_stat_.stat(log_life_time); - log_submit_wait_stat_.stat(log_submit_ts - log_gen_ts); + // When role is follower, the freeze state takes effect immediately and we do not need to stat it. + log_gen_to_freeze_cost_stat_.stat(log_freeze_ts - log_gen_ts); + log_gen_to_submit_cost_stat_.stat(log_submit_ts - log_gen_ts); + log_submit_to_flush_cost_stat_.stat(log_flush_ts - log_submit_ts); + // Concurrency problem + // The first_ack_ts and committed_ts of log_task may havn't been updated when they were slided from sw. + if (OB_INVALID_TIMESTAMP != log_first_ack_ts) { + log_submit_to_first_ack_cost_stat_.stat(log_first_ack_ts - log_submit_ts); + } + if (OB_INVALID_TIMESTAMP != log_commit_ts) { + log_submit_to_commit_cost_stat_.stat(log_commit_ts - log_submit_ts); + } log_submit_to_slide_cost_stat_.stat(fs_cb_begin_ts - log_submit_ts); if (log_life_time > 100 * 1000) { @@ -3099,6 +3187,7 @@ int LogSlidingWindow::receive_log(const common::ObAddr &src_server, // update group log data_checksum log_task->set_group_log_checksum(group_log_data_checksum); (void) log_task->set_freezed(); + log_task->set_freeze_ts(ObTimeUtility::current_time()); } log_task->unlock(); } @@ -3285,6 +3374,7 @@ int LogSlidingWindow::submit_group_log(const LSN &lsn, log_task->set_group_log_checksum(group_log_data_checksum); (void) log_task->set_submit_log_exist(); (void) log_task->set_freezed(); + log_task->set_freeze_ts(ObTimeUtility::current_time()); } log_task->unlock(); @@ -3953,6 +4043,10 @@ int LogSlidingWindow::ack_log(const common::ObAddr &src_server, const LSN &end_l LSN old_committed_end_lsn; get_committed_end_lsn_(old_committed_end_lsn); LSN new_committed_end_lsn; + // first ack ts only needs to be updated when end_lsn >= old_committed_lsn + if (old_committed_end_lsn < end_lsn) { + (void) try_advance_log_task_first_ack_ts_(end_lsn); + } if (state_mgr_->is_leader_active()) { // Only leader with ACTIVE state can generate new committed_end_lsn. (void) gen_committed_end_lsn_(new_committed_end_lsn); @@ -4053,6 +4147,7 @@ int LogSlidingWindow::append_disk_log_to_sw_(const LSN &lsn, log_task->set_prev_lsn(max_flushed_lsn); log_task->set_prev_log_proposal_id(max_flushed_log_pid); log_task->set_freezed(); + log_task->set_freeze_ts(ObTimeUtility::current_time()); log_task->try_pre_submit(); PALF_LOG(TRACE, "append_disk_log success", K(ret), K_(palf_id), K_(self), K(lsn), K(header), KPC(log_task)); } diff --git a/src/logservice/palf/log_sliding_window.h b/src/logservice/palf/log_sliding_window.h index c5663c083..bc4472296 100644 --- a/src/logservice/palf/log_sliding_window.h +++ b/src/logservice/palf/log_sliding_window.h @@ -305,6 +305,8 @@ private: const int64_t &proposal_id, const int64_t accum_checksum); int try_advance_committed_lsn_(const LSN &end_lsn); + int try_advance_log_task_committed_ts_(); + int try_advance_log_task_first_ack_ts_(const LSN &end_lsn); void get_last_submit_log_info_(LSN &lsn, LSN &end_lsn, int64_t &log_id, @@ -518,7 +520,11 @@ private: LSN last_record_end_lsn_; ObMiniStat::ObStatItem fs_cb_cost_stat_; ObMiniStat::ObStatItem log_life_time_stat_; - ObMiniStat::ObStatItem log_submit_wait_stat_; + ObMiniStat::ObStatItem log_gen_to_freeze_cost_stat_; + ObMiniStat::ObStatItem log_gen_to_submit_cost_stat_; + ObMiniStat::ObStatItem log_submit_to_first_ack_cost_stat_; + ObMiniStat::ObStatItem log_submit_to_flush_cost_stat_; + ObMiniStat::ObStatItem log_submit_to_commit_cost_stat_; ObMiniStat::ObStatItem log_submit_to_slide_cost_stat_; int64_t group_log_stat_time_us_; int64_t accum_log_cnt_; diff --git a/src/logservice/palf/log_state_mgr.cpp b/src/logservice/palf/log_state_mgr.cpp index ddba2511d..e3dd286dd 100644 --- a/src/logservice/palf/log_state_mgr.cpp +++ b/src/logservice/palf/log_state_mgr.cpp @@ -570,7 +570,7 @@ int LogStateMgr::pending_to_follower_active_() if (OB_FAIL(to_follower_active_())) { PALF_LOG(ERROR, "to_follower_active_ failed", K(ret), K_(palf_id)); } - PALF_EVENT_REPORT_INFO_KV(K_(leader), K_(pending_end_lsn)); + PALF_REPORT_INFO_KV(K_(leader), K_(pending_end_lsn)); plugins_->record_role_change_event(palf_id_, FOLLOWER, ObReplicaState::PENDING, FOLLOWER, ObReplicaState::ACTIVE, EXTRA_INFOS); PALF_EVENT("follower_pending_to_follower_active", palf_id_, K(ret), K_(self), K_(leader), @@ -602,7 +602,7 @@ int LogStateMgr::to_reconfirm_(const int64_t new_leader_epoch) int LogStateMgr::follower_active_to_reconfirm_(const int64_t new_leader_epoch) { int ret = to_reconfirm_(new_leader_epoch); - PALF_EVENT_REPORT_INFO_KV(K_(leader), K(new_leader_epoch)); + PALF_REPORT_INFO_KV(K_(leader), K(new_leader_epoch)); plugins_->record_role_change_event(palf_id_, FOLLOWER, ObReplicaState::ACTIVE, LEADER, ObReplicaState::RECONFIRM, EXTRA_INFOS); PALF_EVENT("follower_active_to_reconfirm", palf_id_, K(ret), K_(self), K(new_leader_epoch), @@ -613,7 +613,7 @@ int LogStateMgr::follower_active_to_reconfirm_(const int64_t new_leader_epoch) int LogStateMgr::follower_pending_to_reconfirm_(const int64_t new_leader_epoch) { int ret = to_reconfirm_(new_leader_epoch); - PALF_EVENT_REPORT_INFO_KV(K_(leader), K(new_leader_epoch), K_(pending_end_lsn)); + PALF_REPORT_INFO_KV(K_(leader), K(new_leader_epoch), K_(pending_end_lsn)); plugins_->record_role_change_event(palf_id_, FOLLOWER, ObReplicaState::PENDING, LEADER, ObReplicaState::RECONFIRM, EXTRA_INFOS); PALF_EVENT("follower_pending_to_reconfirm", palf_id_, K(ret), K_(self), K(new_leader_epoch), @@ -629,7 +629,7 @@ int LogStateMgr::reconfirm_to_follower_pending_() } else { reset_status_(); update_role_and_state_(FOLLOWER, PENDING); - PALF_EVENT_REPORT_INFO_KV(K_(leader), K_(allow_vote)); + PALF_REPORT_INFO_KV(K_(leader), K_(allow_vote)); plugins_->record_role_change_event(palf_id_, LEADER, ObReplicaState::RECONFIRM, FOLLOWER, ObReplicaState::PENDING, EXTRA_INFOS); PALF_EVENT("reconfirm_to_follower_pending", palf_id_, K_(self), K_(leader), "is_allow_vote", @@ -663,7 +663,7 @@ int LogStateMgr::reconfirm_to_leader_active_() } const int64_t reconfirm_to_active_cost = ObTimeUtility::current_time() - reconfirm_start_time_us_; PALF_EVENT("reconfirm_to_leader_active end", palf_id_, K(ret), K_(self), K(reconfirm_to_active_cost), K_(role), K_(state)); - PALF_EVENT_REPORT_INFO_KV(K(reconfirm_stage_cost), K(reconfirm_to_active_cost)); + PALF_REPORT_INFO_KV(K(reconfirm_stage_cost), K(reconfirm_to_active_cost)); plugins_->record_role_change_event(palf_id_, LEADER, ObReplicaState::RECONFIRM, LEADER, ObReplicaState::ACTIVE, EXTRA_INFOS); } @@ -690,7 +690,7 @@ int LogStateMgr::leader_active_to_follower_pending_() update_role_and_state_(FOLLOWER, PENDING); } - PALF_EVENT_REPORT_INFO_KV(K_(pending_end_lsn)); + PALF_REPORT_INFO_KV(K_(pending_end_lsn)); plugins_->record_role_change_event(palf_id_, LEADER, ObReplicaState::ACTIVE, FOLLOWER, ObReplicaState::PENDING, EXTRA_INFOS); PALF_EVENT("leader_active_to_follower_pending_", palf_id_, K(ret), K_(self), K_(role), K_(state), K_(pending_end_lsn)); diff --git a/src/logservice/palf/log_task.cpp b/src/logservice/palf/log_task.cpp index 05e07d0e2..b1fed4862 100644 --- a/src/logservice/palf/log_task.cpp +++ b/src/logservice/palf/log_task.cpp @@ -138,9 +138,12 @@ LogTask::LogTask() header_(), ref_cnt_(0), log_cnt_(0), - gen_ts_(0), - submit_ts_(0), - flushed_ts_(0), + gen_ts_(OB_INVALID_TIMESTAMP), + freeze_ts_(OB_INVALID_TIMESTAMP), + submit_ts_(OB_INVALID_TIMESTAMP), + first_ack_ts_(OB_INVALID_TIMESTAMP), + flushed_ts_(OB_INVALID_TIMESTAMP), + committed_ts_(OB_INVALID_TIMESTAMP), lock_() { reset(); @@ -161,9 +164,12 @@ void LogTask::reset() header_.reset(); ref_cnt_ = 0; log_cnt_ = 0; - gen_ts_ = 0; - submit_ts_ = 0; - flushed_ts_ = 0; + gen_ts_ = OB_INVALID_TIMESTAMP; + freeze_ts_ = OB_INVALID_TIMESTAMP; + submit_ts_ = OB_INVALID_TIMESTAMP; + first_ack_ts_ = OB_INVALID_TIMESTAMP; + flushed_ts_ = OB_INVALID_TIMESTAMP; + committed_ts_ = OB_INVALID_TIMESTAMP; } bool LogTask::can_be_slid() @@ -248,6 +254,7 @@ int LogTask::set_group_header(const LSN &lsn, const SCN &scn, const LogGroupEntr PALF_LOG(WARN, "log_task has been valid", K(ret), K(lsn), K(scn), K(group_entry_header), KPC(this)); } else { header_.begin_lsn_ = lsn; + header_.end_lsn_ = lsn + group_entry_header.get_serialize_size() + group_entry_header.get_data_len(); header_.log_id_ = group_entry_header.get_log_id(); header_.is_padding_log_ = group_entry_header.is_padding_log(); header_.proposal_id_ = group_entry_header.get_log_proposal_id(); // leader's proposal_id when generate this log @@ -417,14 +424,30 @@ int64_t LogTask::ref(const int64_t val, const bool is_append_log) return ATOMIC_AAF(&ref_cnt_, val); } +void LogTask::set_freeze_ts(const int64_t ts) +{ + // freeze ts may be setted more than once + ATOMIC_CAS(&freeze_ts_, OB_INVALID_TIMESTAMP, ts); +} + void LogTask::set_submit_ts(const int64_t ts) { ATOMIC_STORE(&submit_ts_, ts); } +void LogTask::set_first_ack_ts(const int64_t ts) +{ + ATOMIC_STORE(&first_ack_ts_, ts); +} + void LogTask::set_flushed_ts(const int64_t ts) { ATOMIC_STORE(&flushed_ts_, ts); } + +void LogTask::set_committed_ts(const int64_t ts) +{ + ATOMIC_STORE(&committed_ts_, ts); +} } // namespace palf } // namespace oceanbase diff --git a/src/logservice/palf/log_task.h b/src/logservice/palf/log_task.h index 915487bac..d2192554f 100644 --- a/src/logservice/palf/log_task.h +++ b/src/logservice/palf/log_task.h @@ -138,13 +138,25 @@ public: LSN get_begin_lsn() const { return header_.begin_lsn_; } LSN get_end_lsn() const { return header_.end_lsn_; } int64_t get_accum_checksum() const { return header_.accum_checksum_; } + void set_freeze_ts(const int64_t ts); void set_submit_ts(const int64_t ts); + void set_first_ack_ts(const int64_t ts); void set_flushed_ts(const int64_t ts); + void set_committed_ts(const int64_t ts); int64_t get_gen_ts() const { return ATOMIC_LOAD(&(gen_ts_)); } + int64_t get_freeze_ts() const { return ATOMIC_LOAD(&(freeze_ts_)); } int64_t get_submit_ts() const { return ATOMIC_LOAD(&(submit_ts_)); } + int64_t get_first_ack_ts() const { return ATOMIC_LOAD(&(first_ack_ts_)); } int64_t get_flushed_ts() const { return ATOMIC_LOAD(&(flushed_ts_)); } - TO_STRING_KV(K_(header), K_(state_map), K_(ref_cnt), K_(gen_ts), "submit_wait_time", submit_ts_ - gen_ts_, - K_(submit_ts), "flush_cost", ((flushed_ts_ - submit_ts_) < 0 ? 0 : (flushed_ts_ - submit_ts_)), K_(flushed_ts)); + int64_t get_committed_ts() const { return ATOMIC_LOAD(&(committed_ts_)); } + TO_STRING_KV(K_(header), K_(state_map), K_(ref_cnt), + K_(gen_ts), K_(freeze_ts), K_(submit_ts), K_(flushed_ts), K_(first_ack_ts), K_(committed_ts), + "gen_to_freeze cost time", freeze_ts_ - gen_ts_, + "gen_to_submit cost time", submit_ts_ - gen_ts_, + "submit_to_flush cost time", ((flushed_ts_ - submit_ts_) < 0 ? 0 : (flushed_ts_ - submit_ts_)), + "submit_to_first_ack cost time", ((first_ack_ts_ - submit_ts_) < 0 ? 0 : (first_ack_ts_ - submit_ts_)), + "submit_to_commit cost time", ((committed_ts_ - submit_ts_) < 0 ? 0 : (committed_ts_ - submit_ts_)) + ); private: int try_freeze_(const LSN &end_lsn); private: @@ -153,8 +165,11 @@ private: int64_t ref_cnt_; int64_t log_cnt_; // log_entry count mutable int64_t gen_ts_; + mutable int64_t freeze_ts_; mutable int64_t submit_ts_; + mutable int64_t first_ack_ts_; mutable int64_t flushed_ts_; + mutable int64_t committed_ts_; mutable common::ObLatch lock_; }; } // end namespace palf diff --git a/src/logservice/palf/palf_handle_impl.cpp b/src/logservice/palf/palf_handle_impl.cpp index 387684e37..28405ea43 100644 --- a/src/logservice/palf/palf_handle_impl.cpp +++ b/src/logservice/palf/palf_handle_impl.cpp @@ -148,6 +148,9 @@ int PalfHandleImpl::init(const int64_t palf_id, alloc_mgr, log_rpc, log_io_worker, palf_env_impl, election_timer))) { PALF_LOG(WARN, "PalfHandleImpl do_init_mem_ failed", K(ret), K(palf_id)); } else { + PALF_REPORT_INFO_KV(K_(palf_id)); + append_cost_stat_.set_extra_info(EXTRA_INFOS); + flush_cb_cost_stat_.set_extra_info(EXTRA_INFOS); last_accum_statistic_time_ = ObTimeUtility::current_time(); PALF_EVENT("PalfHandleImpl init success", palf_id_, K(ret), K(self), K(access_mode), K(palf_base_info), K(replica_type), K(log_dir), K(log_meta), K(palf_epoch)); @@ -1672,7 +1675,7 @@ int PalfHandleImpl::inner_append_log(const LSN &lsn, const int64_t time_cost = now - begin_ts; append_cost_stat_.stat(time_cost); if (time_cost >= 5 * 1000) { - PALF_LOG_RET(WARN, OB_ERR_TOO_MUCH_TIME, "write log cost too much time", K(ret), KPC(this), K(lsn), K(scn), K(time_cost)); + PALF_LOG_RET(WARN, OB_ERR_TOO_MUCH_TIME, "write log cost too much time", K(ret), KPC(this), K(lsn), K(scn), K(accum_size), K(time_cost)); } if (palf_reach_time_interval(PALF_STAT_PRINT_INTERVAL_US, last_accum_statistic_time_)) { PALF_LOG(INFO, "[PALF STAT INNER APPEND LOG]", KPC(this), K(accum_size)); @@ -1697,15 +1700,15 @@ int PalfHandleImpl::inner_append_log(const LSNArray &lsn_array, int64_t count = lsn_array.count(); int64_t accum_size = 0, curr_size = 0; for (int64_t i = 0; i < count; i++) { - curr_size = write_buf_array[i]->get_total_size(); - accum_size = ATOMIC_AAF(&accum_write_log_size_, curr_size); + curr_size += write_buf_array[i]->get_total_size(); } + accum_size = ATOMIC_AAF(&accum_write_log_size_, curr_size); const int64_t now = ObTimeUtility::current_time(); const int64_t time_cost = now - begin_ts; append_cost_stat_.stat(time_cost); if (time_cost > 10 * 1000) { PALF_LOG_RET(WARN, OB_ERR_TOO_MUCH_TIME, "write log cost too much time", K(ret), KPC(this), K(lsn_array), - K(scn_array), K(time_cost)); + K(scn_array), K(curr_size), K(time_cost)); } if (palf_reach_time_interval(PALF_STAT_PRINT_INTERVAL_US, last_accum_statistic_time_)) { PALF_LOG(INFO, "[PALF STAT INNER APPEND LOG]", KPC(this), K(accum_size));