diff --git a/build.sh b/build.sh --- a/build.sh +++ b/build.sh @@ -188,7 +188,7 @@ function build set -- "${BUILD_ARGS[@]}" case "x$1" in xrelease) - do_build "$@" -DCMAKE_BUILD_TYPE=RelWithDebInfo -DOB_USE_LLD=$LLD_OPTION + do_build "$@" -DCMAKE_BUILD_TYPE=RelWithDebInfo -DENABLE_AUTO_FDO=ON -DOB_USE_LLD=$LLD_OPTION ;; xrelease_no_unity) do_build "$@" -DCMAKE_BUILD_TYPE=RelWithDebInfo -DOB_USE_LLD=$LLD_OPTION -DOB_ENABLE_UNITY=OFF diff --git a/deps/oblib/src/rpc/obrpc/ob_rpc_packet_list.h b/deps/oblib/src/rpc/obrpc/ob_rpc_packet_list.h --- a/deps/oblib/src/rpc/obrpc/ob_rpc_packet_list.h +++ b/deps/oblib/src/rpc/obrpc/ob_rpc_packet_list.h @@ -968,6 +968,9 @@ PCODE_DEF(OB_LOG_FORCE_CLEAR_ARB_CLUSTER_INFO, 0x1521) PCODE_DEF(OB_LOG_GET_ARB_MEMBER_INFO, 0x1522) #endif PCODE_DEF(OB_LOG_BATCH_FETCH_RESP, 0X1523) +PCODE_DEF(OB_LOG_CREATE_REPLICA_CMD, 0x1524) +PCODE_DEF(OB_LOG_SUBMIT_LOG_CMD, 0x1525) +PCODE_DEF(OB_LOG_SUBMIT_LOG_CMD_RESP, 0x1526) // 1531-1550 for obesi // PCODE_DEF(OB_ESI_IS_EXIST, 0x1531) diff --git a/src/logservice/applyservice/ob_log_apply_service.cpp b/src/logservice/applyservice/ob_log_apply_service.cpp --- a/src/logservice/applyservice/ob_log_apply_service.cpp +++ b/src/logservice/applyservice/ob_log_apply_service.cpp @@ -28,6 +28,9 @@ using namespace storage; using namespace share; namespace logservice { +int64_t p_append_cnt = 0; +int64_t p_log_body_size = 0; +int64_t p_rt = 0; //---------------ObApplyFsCb---------------// ObApplyFsCb::ObApplyFsCb() : apply_status_(NULL) @@ -482,6 +485,26 @@ int ObApplyStatus::try_handle_cb_queue(ObApplyServiceQueueTask *cb_queue, scn = cb->__get_scn(); get_cb_trace_(cb, append_start_time, append_finish_time, cb_first_handle_time, cb_start_time); CLOG_LOG(TRACE, "cb on_success", K(lsn), K(scn), KP(link->next_), KPC(cb_queue), KPC(this)); + + ATOMIC_INC(&p_append_cnt); + ATOMIC_FAA(&p_log_body_size, cb->get_log_size()); + int64_t tmp_rt = ObTimeUtility::current_time() - cb->get_append_start_ts(); + ATOMIC_FAA(&p_rt, 
tmp_rt); + + if (REACH_TIME_INTERVAL(1000 *1000)) { + int64_t l_append_cnt = ATOMIC_LOAD(&p_append_cnt); + if (l_append_cnt == 0) l_append_cnt = 1; + int64_t l_log_body_size = ATOMIC_LOAD(&p_log_body_size); + int64_t l_rt = ATOMIC_LOAD(&p_rt); + + ATOMIC_STORE(&p_append_cnt, 0); + ATOMIC_STORE(&p_log_body_size, 0); + ATOMIC_STORE(&p_rt, 0); + + CLOG_LOG(ERROR, "result:", K(l_append_cnt), K(l_log_body_size), K(l_rt), + "avg_body_size", l_log_body_size/l_append_cnt, "avg_rt", l_rt/l_append_cnt); + } + if (OB_FAIL(cb->on_success())) { // 不处理此类失败情况 CLOG_LOG(ERROR, "cb on_success failed", KP(cb), K(ret), KPC(this)); @@ -1044,7 +1067,7 @@ int ObLogApplyService::init(PalfEnv *palf_env, CLOG_LOG(WARN, "invalid argument", K(ret), KP(palf_env), K(ls_adapter)); } else if (OB_FAIL(TG_CREATE_TENANT(lib::TGDefIDs::ApplyService, tg_id_))) { CLOG_LOG(WARN, "fail to create thread group", K(ret)); - } else if (OB_FAIL(MTL_REGISTER_THREAD_DYNAMIC(0.5, tg_id_))) { + } else if (OB_FAIL(MTL_REGISTER_THREAD_DYNAMIC(1, tg_id_))) { CLOG_LOG(WARN, "MTL_REGISTER_THREAD_DYNAMIC failed", K(ret), K(tg_id_)); } else if (OB_FAIL(apply_status_map_.init("APPLY_STATUS", MAP_TENANT_ID))) { CLOG_LOG(WARN, "apply_status_map_ init error", K(ret)); diff --git a/src/logservice/ob_append_callback.h b/src/logservice/ob_append_callback.h --- a/src/logservice/ob_append_callback.h +++ b/src/logservice/ob_append_callback.h @@ -60,6 +60,7 @@ public: append_start_ts_ = OB_INVALID_TIMESTAMP; append_finish_ts_ = OB_INVALID_TIMESTAMP; cb_first_handle_ts_ = OB_INVALID_TIMESTAMP; + log_size_ = 0; } virtual int on_success() = 0; virtual int on_failure() = 0; @@ -69,10 +70,16 @@ public: int64_t get_append_start_ts() const { return append_start_ts_; } int64_t get_append_finish_ts() const { return append_finish_ts_; } int64_t get_cb_first_handle_ts() const { return cb_first_handle_ts_; } + void set_log_size(const int64_t size) + { + log_size_ = size; + } + int64_t get_log_size() const { return log_size_; } public: int64_t 
append_start_ts_; //提交到palf的起始时刻 int64_t append_finish_ts_; //palf提交完成时刻,即提交到apply service起始时刻 int64_t cb_first_handle_ts_; //cb第一次被处理的时刻,不一定调用on_success + int64_t log_size_; }; } // end namespace logservice diff --git a/src/logservice/ob_log_handler.cpp b/src/logservice/ob_log_handler.cpp --- a/src/logservice/ob_log_handler.cpp +++ b/src/logservice/ob_log_handler.cpp @@ -237,6 +237,7 @@ int ObLogHandler::append(const void *buffer, cb->set_append_finish_ts(ObTimeUtility::fast_current_time()); cb->__set_lsn(lsn); cb->__set_scn(scn); + cb->set_log_size(nbytes); ret = apply_status_->push_append_cb(cb); CLOG_LOG(TRACE, "palf_handle_ push_append_cb success", K(lsn), K(scn), K(ret), K(id_)); } diff --git a/src/logservice/palf/log_block_handler.cpp b/src/logservice/palf/log_block_handler.cpp --- a/src/logservice/palf/log_block_handler.cpp +++ b/src/logservice/palf/log_block_handler.cpp @@ -437,12 +437,26 @@ int LogBlockHandler::inner_writev_once_(const offset_t offset, return ret; } +int64_t p_io_cnt = 0; +int64_t p_log_size = 0; + int LogBlockHandler::inner_write_impl_(const int fd, const char *buf, const int64_t count, const int64_t offset) { int ret = OB_SUCCESS; int64_t start_ts = ObTimeUtility::fast_current_time(); int64_t write_size = 0; int64_t time_interval = OB_INVALID_TIMESTAMP; + ATOMIC_INC(&p_io_cnt); + ATOMIC_FAA(&p_log_size, count); + if (REACH_TIME_INTERVAL(1000 * 1000)) { + int64_t l_io_cnt = ATOMIC_LOAD(&p_io_cnt); + int64_t l_log_size = ATOMIC_LOAD(&p_log_size); + + ATOMIC_STORE(&p_io_cnt, 0); + ATOMIC_STORE(&p_log_size, 0); + + CLOG_LOG(ERROR, "io result:", K(l_io_cnt), K(l_log_size), "avg_size", l_log_size/l_io_cnt); + } do { if (count != (write_size = ob_pwrite(fd, buf, count, offset))) { if (palf_reach_time_interval(1000 * 1000, time_interval)) { diff --git a/src/logservice/palf/log_define.h b/src/logservice/palf/log_define.h --- a/src/logservice/palf/log_define.h +++ b/src/logservice/palf/log_define.h @@ -102,7 +102,7 @@ const int64_t 
PALF_LEADER_RECONFIRM_SYNC_TIMEOUT_US = 10 * 1000 * 1000L; // const int64_t PREPARE_LOG_BUFFER_SIZE = 2048; const int64_t PALF_LEADER_ACTIVE_SYNC_TIMEOUT_US = 10 * 1000 * 1000L; // 10s const int32_t PALF_MAX_REPLAY_TIMEOUT = 500 * 1000; -const int32_t DEFAULT_LOG_LOOP_INTERVAL_US = 100 * 1000; // 100ms +const int32_t DEFAULT_LOG_LOOP_INTERVAL_US = 500; // 500us const int32_t LOG_LOOP_INTERVAL_FOR_PERIOD_FREEZE_US = 1 * 1000; // 1ms const int64_t PALF_SLIDING_WINDOW_SIZE = 1 << 11; // must be 2^n(n>0), default 2^11 = 2048 const int64_t PALF_MAX_LEADER_SUBMIT_LOG_COUNT = PALF_SLIDING_WINDOW_SIZE / 2; // max number of concurrent submitting group log in leader diff --git a/src/logservice/palf/log_io_task.cpp b/src/logservice/palf/log_io_task.cpp --- a/src/logservice/palf/log_io_task.cpp +++ b/src/logservice/palf/log_io_task.cpp @@ -623,7 +623,8 @@ int BatchLogIOFlushLogTask::push_flush_cb_to_thread_pool_(int tg_id, IPalfEnvImp PALF_LOG(ERROR, "LogIOFlushMetaTask not inited!!!", K(ret), KPC(this)); } else { const int64_t current_time = ObTimeUtility::current_time(); - for (int64_t i = 0; i < count && OB_SUCC(ret); i++) { + // for (int64_t i = 0; i < count && OB_SUCC(ret); i++) { + int64_t i = count - 1; LogIOFlushLogTask *io_task = io_task_array_[i]; if (NULL == io_task) { PALF_LOG(WARN, "io_task is nullptr, may be its' epoch has changed", K(ret), KP(io_task), @@ -637,6 +638,9 @@ int BatchLogIOFlushLogTask::push_flush_cb_to_thread_pool_(int tg_id, IPalfEnvImp // again. 
io_task_array_[i] = NULL; } + // } + for (int64_t i = 0; i < count && OB_SUCC(ret); i++) { + io_task_array_[i] = NULL; } } return ret; diff --git a/src/logservice/palf/log_sliding_window.cpp b/src/logservice/palf/log_sliding_window.cpp --- a/src/logservice/palf/log_sliding_window.cpp +++ b/src/logservice/palf/log_sliding_window.cpp @@ -132,7 +132,8 @@ LogSlidingWindow::LogSlidingWindow() accum_log_cnt_(0), accum_group_log_size_(0), last_record_group_log_id_(FIRST_VALID_LOG_ID - 1), - freeze_mode_(FEEDBACK_FREEZE_MODE), + freeze_mode_(PERIOD_FREEZE_MODE), + avg_log_batch_cnt_(0), is_inited_(false) {} @@ -448,6 +449,7 @@ int LogSlidingWindow::submit_log(const char *buf, const int64_t start_log_id = get_start_id(); const int64_t log_id_upper_bound = start_log_id + PALF_MAX_LEADER_SUBMIT_LOG_COUNT - 1; LSN tmp_lsn, lsn_upper_bound; + bool need_freeze_self = false; if (IS_NOT_INIT) { ret = OB_NOT_INIT; } else if (NULL == buf || buf_len <= 0 || buf_len > MAX_LOG_BODY_SIZE || (!ref_scn.is_valid())) { @@ -462,7 +464,7 @@ int LogSlidingWindow::submit_log(const char *buf, } // sw_ cannot submit larger log } else if (OB_FAIL(lsn_allocator_.alloc_lsn_scn(ref_scn, valid_log_size, log_id_upper_bound, lsn_upper_bound, - tmp_lsn, log_id, scn, is_new_log, need_gen_padding_entry, padding_size))) { + tmp_lsn, log_id, scn, is_new_log, need_gen_padding_entry, padding_size, need_freeze_self))) { PALF_LOG(WARN, "alloc_lsn_scn failed", K(ret), K_(palf_id), K_(self)); } else if (OB_FAIL(leader_wait_sw_slot_ready_(log_id))) { PALF_LOG(WARN, "leader_wait_sw_slot_ready_ failed", K(ret), K_(palf_id), K_(self), K(log_id)); @@ -480,7 +482,7 @@ int LogSlidingWindow::submit_log(const char *buf, K(padding_size), K(is_new_log), K(valid_log_size)); } else if (is_need_handle && FALSE_IT(is_need_handle_next |= is_need_handle)) { } else if (OB_FAIL(generate_new_group_log_(tmp_lsn, log_id, scn, padding_entry_body_size, LOG_PADDING, \ - NULL, padding_entry_body_size, is_need_handle))) { + NULL, 
padding_entry_body_size, need_freeze_self, is_need_handle))) { PALF_LOG(ERROR, "generate_new_group_log_ failed", K(ret), K_(palf_id), K_(self), K(log_id), K(tmp_lsn), K(padding_size), K(is_new_log), K(valid_log_size)); } else if (is_need_handle && FALSE_IT(is_need_handle_next |= is_need_handle)) { @@ -503,7 +505,7 @@ int LogSlidingWindow::submit_log(const char *buf, PALF_LOG(WARN, "try_freeze_prev_log_ failed", K(ret), K_(palf_id), K_(self), K(log_id)); } else if (is_need_handle && FALSE_IT(is_need_handle_next |= is_need_handle)) { } else if (OB_FAIL(generate_new_group_log_(tmp_lsn, log_id, scn, valid_log_size, LOG_SUBMIT, \ - buf, buf_len, is_need_handle))) { + buf, buf_len, need_freeze_self, is_need_handle))) { PALF_LOG(WARN, "generate_new_group_log_ failed", K(ret), K_(palf_id), K_(self), K(log_id)); } else if (is_need_handle && FALSE_IT(is_need_handle_next |= is_need_handle)) { } else { @@ -673,6 +675,7 @@ int LogSlidingWindow::generate_new_group_log_(const LSN &lsn, const LogType &log_type, const char *log_data, const int64_t data_len, + const bool need_freeze_self, bool &is_need_handle) { int ret = OB_SUCCESS; @@ -712,6 +715,9 @@ int LogSlidingWindow::generate_new_group_log_(const LSN &lsn, } else if (OB_FAIL(log_task->set_initial_header_info(header_info))) { PALF_LOG(WARN, "set_initial_header_info failed", K(ret), K_(palf_id), K_(self), K(log_id), KPC(log_task)); } else { + if (need_freeze_self) { + log_task->set_end_lsn(lsn + log_body_size + LogGroupEntryHeader::HEADER_SER_SIZE); + } // The first log is responsible to try freezing self, if its end_lsn_ has been set by next log. 
log_task->try_freeze_by_myself(); } @@ -1048,6 +1054,10 @@ int LogSlidingWindow::handle_next_submit_log_(bool &is_committed_lsn_updated) const int64_t avg_group_log_size = total_group_log_size / total_group_log_cnt; PALF_LOG(INFO, "[PALF STAT GROUP LOG INFO]", K_(palf_id), K_(self), "role", role_to_string(role), K(total_group_log_cnt), K(avg_log_batch_cnt), K(total_group_log_size), K(avg_group_log_size)); + if (total_log_cnt > 0) { + const int64_t avg_log_size = total_group_log_size / total_log_cnt; + avg_log_batch_cnt_ = (avg_log_batch_cnt_ * 2 + avg_log_batch_cnt * 8)/10; + } } ATOMIC_STORE(&accum_log_cnt_, 0); ATOMIC_STORE(&accum_group_log_size_, 0); @@ -1235,19 +1245,34 @@ int LogSlidingWindow::check_and_switch_freeze_mode() { int ret = OB_SUCCESS; int64_t total_append_cnt = 0; - for (int i = 0; i < APPEND_CNT_ARRAY_SIZE; ++i) { - total_append_cnt += ATOMIC_LOAD(&append_cnt_array_[i]); - ATOMIC_STORE(&append_cnt_array_[i], 0); - } + // for (int i = 0; i < APPEND_CNT_ARRAY_SIZE; ++i) { + // total_append_cnt += ATOMIC_LOAD(&append_cnt_array_[i]); + // ATOMIC_STORE(&append_cnt_array_[i], 0); + // } + + // for (int i = 0; i < APPEND_CNT_ARRAY_SIZE; ++i) { + // total_append_cnt += ATOMIC_LOAD(&append_cnt_array_[i]); + // ATOMIC_STORE(&append_cnt_array_[i], 0); + // } + // adaptively + const int64_t avg_log_batch_cnt = avg_log_batch_cnt_; + // periodically + // const int64_t avg_log_batch_cnt = 20; + // feedback + // const int64_t avg_log_batch_cnt = 0; + const int64_t SWITCH_BARRIER = 10; + if (FEEDBACK_FREEZE_MODE == freeze_mode_) { - if (total_append_cnt >= APPEND_CNT_LB_FOR_PERIOD_FREEZE) { + // if (total_append_cnt >= APPEND_CNT_LB_FOR_PERIOD_FREEZE) { + if (avg_log_batch_cnt > SWITCH_BARRIER) { freeze_mode_ = PERIOD_FREEZE_MODE; - PALF_LOG(INFO, "switch freeze_mode to period", K_(palf_id), K_(self), K(total_append_cnt)); + PALF_LOG(ERROR, "switch freeze_mode to period", K_(palf_id), K_(self), K(total_append_cnt)); } } else if (PERIOD_FREEZE_MODE == 
freeze_mode_) { - if (total_append_cnt < APPEND_CNT_LB_FOR_PERIOD_FREEZE) { + // if (total_append_cnt < APPEND_CNT_LB_FOR_PERIOD_FREEZE) { + if (avg_log_batch_cnt <= SWITCH_BARRIER) { freeze_mode_ = FEEDBACK_FREEZE_MODE; - PALF_LOG(INFO, "switch freeze_mode to feedback", K_(palf_id), K_(self), K(total_append_cnt)); + PALF_LOG(ERROR, "switch freeze_mode to feedback", K_(palf_id), K_(self), K(total_append_cnt)); (void) feedback_freeze_last_log_(); } } else {} diff --git a/src/logservice/palf/log_sliding_window.h b/src/logservice/palf/log_sliding_window.h --- a/src/logservice/palf/log_sliding_window.h +++ b/src/logservice/palf/log_sliding_window.h @@ -396,6 +396,7 @@ private: const LogType &log_type, const char *log_data, const int64_t data_len, + const bool need_freeze_self, bool &is_need_handle); int append_to_group_log_(const LSN &lsn, const int64_t log_id, @@ -608,6 +609,7 @@ private: int64_t last_record_group_log_id_; int64_t append_cnt_array_[APPEND_CNT_ARRAY_SIZE]; FreezeMode freeze_mode_; + int64_t avg_log_batch_cnt_; bool is_inited_; private: DISALLOW_COPY_AND_ASSIGN(LogSlidingWindow); diff --git a/src/logservice/palf/lsn_allocator.cpp b/src/logservice/palf/lsn_allocator.cpp --- a/src/logservice/palf/lsn_allocator.cpp +++ b/src/logservice/palf/lsn_allocator.cpp @@ -297,7 +297,8 @@ int LSNAllocator::alloc_lsn_scn(const SCN &base_scn, SCN &scn, bool &is_new_group_log, bool &need_gen_padding_entry, - int64_t &padding_len) + int64_t &padding_len, + bool &need_freeze_self) { int ret = OB_SUCCESS; if (IS_NOT_INIT) { @@ -439,6 +440,11 @@ int LSNAllocator::alloc_lsn_scn(const SCN &base_scn, is_next_need_cut = true; } } + // freeze immediately + if (need_freeze_self) { + is_new_group_log = true; + is_next_need_cut = true; + } if (is_new_group_log) { tmp_next_log_id_delta++; } diff --git a/src/logservice/palf/lsn_allocator.h b/src/logservice/palf/lsn_allocator.h --- a/src/logservice/palf/lsn_allocator.h +++ b/src/logservice/palf/lsn_allocator.h @@ -64,7 +64,8 @@ 
public: share::SCN &scn, bool &is_new_log, bool &need_gen_padding_entry, - int64_t &padding_len); + int64_t &padding_len, + bool &need_freeze_self); // 更新last_lsn和log_timestamp // receive_log/append_disk_log 时调用 int inc_update_last_log_info(const LSN &lsn, const int64_t log_id, const share::SCN &scn); diff --git a/src/share/ob_thread_define.h b/src/share/ob_thread_define.h --- a/src/share/ob_thread_define.h +++ b/src/share/ob_thread_define.h @@ -164,4 +164,5 @@ TG_DEF(SvrStartupHandler, SvrStartupHandler, QUEUE_THREAD, TG_DEF(TenantTTLManager, TTLManager, TIMER) TG_DEF(TenantTabletTTLMgr, TTLTabletMgr, TIMER) TG_DEF(TntSharedTimer, TntSharedTimer, TIMER) +TG_DEF(LogServerTest, LogServerTest, THREAD_POOL, ThreadCountPair(10, 10)) #endif