diff --git a/src/logservice/palf/log_engine.cpp b/src/logservice/palf/log_engine.cpp index 125174be6a..f7f120a208 100644 --- a/src/logservice/palf/log_engine.cpp +++ b/src/logservice/palf/log_engine.cpp @@ -305,6 +305,26 @@ int LogEngine::submit_flush_log_task(const FlushLogCbCtx &flush_log_cb_ctx, return ret; } +int LogEngine::submit_sliding_cb_task(const int cb_pool_tg_id, const LogSlidingCbCtx &sliding_cb_ctx) +{ + int ret = OB_SUCCESS; + LogSlidingCbTask *sliding_cb_task = NULL; + if (IS_NOT_INIT) { + ret = OB_NOT_INIT; + PALF_LOG(ERROR, "LogEngine not inited!!!"); + } else if (0 >= cb_pool_tg_id || !sliding_cb_ctx.is_valid()) { + ret = OB_INVALID_ARGUMENT; + PALF_LOG(ERROR, "Invalid argument!!!", K(cb_pool_tg_id), K(sliding_cb_ctx)); + } else if (OB_FAIL(generate_sliding_cb_task_(sliding_cb_ctx, sliding_cb_task))) { + PALF_LOG(ERROR, "generate_sliding_cb_task_ failed", K(sliding_cb_ctx)); + } else if (OB_FAIL(push_task_into_cb_thread_pool(cb_pool_tg_id, sliding_cb_task))) { + PALF_LOG(ERROR, "submit sliding_cb_task failed"); + } else { + PALF_LOG(TRACE, "submit_sliding_cb_task success", K(ret), K(cb_pool_tg_id), K(sliding_cb_ctx)); + } + return ret; +} + int LogEngine::submit_flush_prepare_meta_task(const FlushMetaCbCtx &flush_meta_cb_ctx, const LogPrepareMeta &prepare_meta) { @@ -1192,6 +1212,23 @@ int LogEngine::generate_flush_log_task_(const FlushLogCbCtx &flush_log_cb_ctx, return ret; } +int LogEngine::generate_sliding_cb_task_(const LogSlidingCbCtx &sliding_cb_ctx, + LogSlidingCbTask *&sliding_cb_task) +{ + int ret = OB_SUCCESS; + // Be careful to handle the duration of this pointer + sliding_cb_task = NULL; + if (!sliding_cb_ctx.is_valid()) { + ret = OB_INVALID_ARGUMENT; + } else if (NULL == (sliding_cb_task = alloc_mgr_->alloc_log_sliding_cb_task(palf_id_, palf_epoch_))) { + ret = OB_ALLOCATE_MEMORY_FAILED; + PALF_LOG(ERROR, "alloc_log_sliding_cb_task failed"); + } else if (OB_FAIL(sliding_cb_task->init(sliding_cb_ctx))) { + PALF_LOG(ERROR, "init LogSlidingCbTask failed"); + } else { + } + return ret; +} int LogEngine::generate_truncate_log_task_(const TruncateLogCbCtx &truncate_log_cb_ctx, LogIOTruncateLogTask *&truncate_log_task) { diff --git a/src/logservice/palf/log_engine.h b/src/logservice/palf/log_engine.h index 2b006cf0db..f5d4547e3b 100644 --- a/src/logservice/palf/log_engine.h +++ b/src/logservice/palf/log_engine.h @@ -36,10 +36,12 @@ class LogIOWorker; class PalfHandleImpl; class LogIOTask; class LogIOFlushLogTask; +class LogSlidingCbTask; class LogIOTruncateLogTask; class LogIOFlushMetaTask; class LogIOTruncatePrefixBlocksTask; class FlushLogCbCtx; +class LogSlidingCbCtx; class TruncateLogCbCtx; class FlushMetaCbCtx; class TruncatePrefixBlocksCbCtx; @@ -113,6 +115,8 @@ public: const int64_t buf_len); virtual int submit_flush_log_task(const FlushLogCbCtx &flush_log_cb_ctx, const LogWriteBuf &write_buf); + int submit_sliding_cb_task(const int cb_pool_tg_id, + const LogSlidingCbCtx &sliding_cb_ctx); int submit_flush_prepare_meta_task(const FlushMetaCbCtx &flush_meta_cb_ctx, const LogPrepareMeta &prepare_meta); @@ -400,6 +404,8 @@ private: int generate_flush_log_task_(const FlushLogCbCtx &flush_log_cb_ctx, const LogWriteBuf &write_buf, LogIOFlushLogTask *&flush_log_task); + int generate_sliding_cb_task_(const LogSlidingCbCtx &sliding_cb_ctx, + LogSlidingCbTask *&sliding_cb_task); int generate_truncate_log_task_(const TruncateLogCbCtx &truncate_log_cb_ctx, LogIOTruncateLogTask *&truncate_log_task); diff --git a/src/logservice/palf/log_io_task.cpp b/src/logservice/palf/log_io_task.cpp index 2048546d14..82fd8b98ba 100644 --- a/src/logservice/palf/log_io_task.cpp +++ b/src/logservice/palf/log_io_task.cpp @@ -218,6 +218,77 @@ void LogIOFlushLogTask::free_this_(IPalfEnvImpl *palf_env_impl) palf_env_impl->get_log_allocator()->free_log_io_flush_log_task(this); } +LogSlidingCbTask::LogSlidingCbTask(const int64_t palf_id, const int64_t palf_epoch) + : LogIOTask(palf_id, palf_epoch), sliding_cb_ctx_(), is_inited_(false) +{} + +LogSlidingCbTask::~LogSlidingCbTask() +{ + destroy(); +} + +int LogSlidingCbTask::init(const LogSlidingCbCtx &sliding_cb_ctx) +{ + int ret = OB_SUCCESS; + if (IS_INIT) { + ret = OB_INIT_TWICE; + PALF_LOG(ERROR, "LogSlidingCbTask has been inited"); + } else if (!sliding_cb_ctx.is_valid()) { + ret = OB_INVALID_ARGUMENT; + PALF_LOG(ERROR, "Invaild arguments!!!", K(palf_id_), K(palf_epoch_), K(sliding_cb_ctx)); + } else { + sliding_cb_ctx_ = sliding_cb_ctx; + is_inited_ = true; + PALF_LOG(TRACE, "LogSlidingCbTask init success", K(ret), K(sliding_cb_ctx)); + } + return ret; +} + +void LogSlidingCbTask::destroy() +{ + if (IS_INIT) { + is_inited_ = false; + sliding_cb_ctx_.reset(); + PALF_LOG(TRACE, "LogSlidingCbTask destroy", KP(this)); + } +} + +int LogSlidingCbTask::do_task_(int tg_id, IPalfEnvImpl *palf_env_impl) +{ + UNUSED(tg_id); + UNUSED(palf_env_impl); + return OB_SUCCESS; +} + +// NB: the memory of 'this' will be release +int LogSlidingCbTask::after_consume_(IPalfEnvImpl *palf_env_impl) +{ + int ret = OB_SUCCESS; + int64_t palf_epoch = -1; + IPalfHandleImplGuard guard; + if (IS_NOT_INIT) { + ret = OB_NOT_INIT; + PALF_LOG(ERROR, "LogIOFlusLoghTask not inited"); + } else if (OB_FAIL(palf_env_impl->get_palf_handle_impl(palf_id_, guard))) { + PALF_LOG(WARN, "IPalfEnvImpl get_palf_handle_impl failed", K(palf_id_)); + } else if (OB_FAIL(guard.get_palf_handle_impl()->get_palf_epoch(palf_epoch))) { + PALF_LOG(WARN, "IPalfEnvImpl get_palf_epoch failed", K(palf_id_)); + } else if (palf_epoch != palf_epoch_) { + PALF_LOG(WARN, "palf_epoch has changed, drop task", K(palf_id_), K(palf_epoch), + K_(sliding_cb_ctx)); + } else if (OB_FAIL(guard.get_palf_handle_impl()->file_size_cb(sliding_cb_ctx_))) { + PALF_LOG(ERROR, "file_size_cb failed", K_(sliding_cb_ctx)); + } else {} + // free this object + palf_env_impl->get_log_allocator()->free_log_sliding_cb_task(this); + return ret; +} + +void LogSlidingCbTask::free_this_(IPalfEnvImpl *palf_env_impl) +{ + return; +} + LogIOTruncateLogTask::LogIOTruncateLogTask(const int64_t palf_id, const int64_t palf_epoch) : LogIOTask(palf_id, palf_epoch), truncate_log_cb_ctx_(), is_inited_(false) {} diff --git a/src/logservice/palf/log_io_task.h b/src/logservice/palf/log_io_task.h index 595f84b1c4..dbc282b3b1 100644 --- a/src/logservice/palf/log_io_task.h +++ b/src/logservice/palf/log_io_task.h @@ -32,7 +32,8 @@ enum class LogIOTaskType FLUSH_META_TYPE = 2, TRUNCATE_PREFIX_TYPE = 3, TRUNCATE_LOG_TYPE = 4, - FLASHBACK_LOG_TYPE = 5 + FLASHBACK_LOG_TYPE = 5, + SLIDING_CB_TYPE = 6 }; class LogIOTask; @@ -209,6 +210,25 @@ private: int64_t palf_id_; bool is_inited_; }; + +class LogSlidingCbTask : public LogIOTask { +public: + LogSlidingCbTask(const int64_t palf_id,const int64_t palf_epoch); + ~LogSlidingCbTask(); +public: + int init(const LogSlidingCbCtx &sliding_cb_ctx); + void destroy(); + int do_task_(int tg_id, IPalfEnvImpl *palf_env_impl) override final; + // IO cb thread will call this function to call fs cb + int after_consume_(IPalfEnvImpl *palf_env_impl) override final; + LogIOTaskType get_io_task_type_() const override final { return LogIOTaskType::SLIDING_CB_TYPE; } + void free_this_(IPalfEnvImpl *palf_env_impl) override final; + INHERIT_TO_STRING_KV("LogSlidingCbTask", LogIOTask, K_(is_inited), K_(sliding_cb_ctx)); +private: + LogSlidingCbCtx sliding_cb_ctx_; + bool is_inited_; +}; + } // end namespace palf } // end namespace oceanbase diff --git a/src/logservice/palf/log_io_task_cb_utils.cpp b/src/logservice/palf/log_io_task_cb_utils.cpp index 93c47bf2d1..a111abee2d 100644 --- a/src/logservice/palf/log_io_task_cb_utils.cpp +++ b/src/logservice/palf/log_io_task_cb_utils.cpp @@ -71,6 +71,45 @@ FlushLogCbCtx& FlushLogCbCtx::operator=(const FlushLogCbCtx &arg) return *this; } +LogSlidingCbCtx::LogSlidingCbCtx() + : palf_id_(INVALID_PALF_ID), + log_end_lsn_(), + log_proposal_id_(INVALID_PROPOSAL_ID) +{} + +LogSlidingCbCtx::LogSlidingCbCtx(const int64_t palf_id, + const LSN &log_end_lsn, + const int64_t log_proposal_id) + : palf_id_(palf_id), + log_end_lsn_(log_end_lsn), + log_proposal_id_(log_proposal_id) +{} + +LogSlidingCbCtx::~LogSlidingCbCtx() +{ + reset(); +} + +bool LogSlidingCbCtx::is_valid() const +{ + return (is_valid_palf_id(palf_id_) && log_end_lsn_.is_valid() && INVALID_PROPOSAL_ID != log_proposal_id_); +} + +void LogSlidingCbCtx::reset() +{ + palf_id_ = INVALID_PALF_ID; + log_end_lsn_.reset(); + log_proposal_id_ = INVALID_PROPOSAL_ID; +} + +LogSlidingCbCtx& LogSlidingCbCtx::operator=(const LogSlidingCbCtx &arg) +{ + palf_id_ = arg.palf_id_; + log_end_lsn_ = arg.log_end_lsn_; + log_proposal_id_ = arg.log_proposal_id_; + return *this; +} + TruncateLogCbCtx::TruncateLogCbCtx() : lsn_() { diff --git a/src/logservice/palf/log_io_task_cb_utils.h b/src/logservice/palf/log_io_task_cb_utils.h index bccb5d4b55..654dc470b0 100644 --- a/src/logservice/palf/log_io_task_cb_utils.h +++ b/src/logservice/palf/log_io_task_cb_utils.h @@ -46,6 +46,20 @@ struct FlushLogCbCtx int64_t begin_ts_; }; +struct LogSlidingCbCtx +{ + LogSlidingCbCtx(); + LogSlidingCbCtx(const int64_t palf_id, const LSN &log_end_lsn, const int64_t log_proposal_id); + ~LogSlidingCbCtx(); + bool is_valid() const; + void reset(); + LogSlidingCbCtx &operator=(const LogSlidingCbCtx &flush_log_cb_ctx); + TO_STRING_KV(K_(palf_id), K_(log_end_lsn), K_(log_proposal_id)); + int64_t palf_id_; + LSN log_end_lsn_; + int64_t log_proposal_id_; +}; + struct TruncateLogCbCtx { TruncateLogCbCtx(const LSN &lsn); TruncateLogCbCtx(); diff --git a/src/logservice/palf/log_sliding_window.cpp b/src/logservice/palf/log_sliding_window.cpp index 77eb46076e..9e7b7ac33a 100644 --- a/src/logservice/palf/log_sliding_window.cpp +++ b/src/logservice/palf/log_sliding_window.cpp @@ -112,7 +112,6 @@ LogSlidingWindow::LogSlidingWindow() is_rebuilding_(false), last_rebuild_lsn_(), last_record_end_lsn_(PALF_INITIAL_LSN_VAL), - fs_cb_cost_stat_("[PALF STAT FS CB]", 2 * 1000 * 1000), log_life_time_stat_("[PALF STAT LOG LIFETIME]", 2 * 1000 * 1000), log_submit_wait_stat_("[PALF STAT LOG SUBMIT WAIT]", 2 * 1000 * 1000), log_submit_to_slide_cost_stat_("[PALF STAT LOG SLIDE WAIT]", 2 * 1000 * 1000), @@ -121,6 +120,7 @@ LogSlidingWindow::LogSlidingWindow() accum_group_log_size_(0), last_record_group_log_id_(FIRST_VALID_LOG_ID - 1), freeze_mode_(FEEDBACK_FREEZE_MODE), + cb_pool_tg_id_(-1), is_inited_(false) {} @@ -140,6 +140,7 @@ void LogSlidingWindow::destroy() log_engine_ = NULL; mm_ = NULL; mode_mgr_ = NULL; + cb_pool_tg_id_ = -1; } int LogSlidingWindow::flashback(const PalfBaseInfo &palf_base_info, const int64_t palf_id, common::ObILogAllocator *alloc_mgr) @@ -201,7 +202,8 @@ int LogSlidingWindow::init(const int64_t palf_id, palf::PalfFSCbWrapper *palf_fs_cb, common::ObILogAllocator *alloc_mgr, const PalfBaseInfo &palf_base_info, - const bool is_normal_replica) + const bool is_normal_replica, + const int cb_pool_tg_id) { int ret = OB_SUCCESS; const LogInfo &prev_log_info = palf_base_info.prev_log_info_; @@ -214,10 +216,11 @@ int LogSlidingWindow::init(const int64_t palf_id, || NULL == mm || NULL == mode_mgr || NULL == log_engine - || NULL == palf_fs_cb) { + || NULL == palf_fs_cb + || 0 >= cb_pool_tg_id) { ret = OB_INVALID_ARGUMENT; PALF_LOG(WARN, "invalid argumetns", K(ret), K(palf_id), K(self), K(palf_base_info), - KP(state_mgr), KP(mm), KP(mode_mgr), KP(log_engine), KP(palf_fs_cb)); + KP(state_mgr), KP(mm), KP(mode_mgr), KP(log_engine), KP(palf_fs_cb), K(cb_pool_tg_id)); } else if (is_normal_replica && OB_FAIL(do_init_mem_(palf_id, palf_base_info, alloc_mgr))) { PALF_LOG(WARN, "do_init_mem_ failed", K(ret), K(palf_id)); } else { @@ -249,6 +252,7 @@ int LogSlidingWindow::init(const int64_t palf_id, MEMSET(append_cnt_array_, 0, APPEND_CNT_ARRAY_SIZE * sizeof(int64_t)); + cb_pool_tg_id_ = cb_pool_tg_id; is_inited_ = true; LogGroupEntryHeader group_header; LogEntryHeader log_header; @@ -1951,52 +1955,42 @@ int LogSlidingWindow::sliding_cb(const int64_t sn, const FixedSlidingWindowSlot const int64_t log_submit_ts = log_task->get_submit_ts(); log_task->unlock(); - int tmp_ret = OB_SUCCESS; const int64_t fs_cb_begin_ts = ObTimeUtility::current_time(); - if (OB_SUCCESS != (tmp_ret = palf_fs_cb_->update_end_lsn(palf_id_, log_end_lsn, log_proposal_id))) { - if (OB_EAGAIN == tmp_ret) { - if (REACH_TIME_INTERVAL(1 * 1000 * 1000)) { - PALF_LOG(WARN, "update_end_lsn eagain", K(tmp_ret), K_(palf_id), K_(self), K(log_id), KPC(log_task)); + LogSlidingCbCtx sliding_cb_ctx(palf_id_, log_end_lsn, log_proposal_id); + if (OB_FAIL(log_engine_->submit_sliding_cb_task(cb_pool_tg_id_, sliding_cb_ctx))) { + PALF_LOG(ERROR, "submit_sliding_cb_task failed", K_(palf_id), K_(self), K(log_id), KPC(log_task)); + } + + if (OB_SUCC(ret)) { + const int64_t log_life_time = fs_cb_begin_ts - log_gen_ts; + log_life_time_stat_.stat(log_life_time); + log_submit_wait_stat_.stat(log_submit_ts - log_gen_ts); + log_submit_to_slide_cost_stat_.stat(fs_cb_begin_ts - log_submit_ts); + + if (log_life_time > 100 * 1000) { + if (palf_reach_time_interval(100 * 1000, log_life_long_warn_time_)) { + PALF_LOG_RET(WARN, OB_ERR_TOO_MUCH_TIME, "log_task life cost too much time", K_(palf_id), K_(self), K(log_id), KPC(log_task), + K(fs_cb_begin_ts), K(log_life_time)); } + } + + if (OB_FAIL(checksum_.verify_accum_checksum(log_task_header.data_checksum_, + log_task_header.accum_checksum_))) { + PALF_LOG(ERROR, "verify_accum_checksum failed", K_(palf_id), K_(self), K(log_id), KPC(log_task)); } else { - PALF_LOG(WARN, "update_end_lsn failed", K(tmp_ret), K_(palf_id), K_(self), K(log_id), KPC(log_task)); + // update last_slide_lsn_ + (void) try_update_last_slide_log_info_(log_id, log_max_scn, log_begin_lsn, log_end_lsn, \ + log_proposal_id, log_accum_checksum); } - } - const int64_t fs_cb_cost = ObTimeUtility::current_time() - fs_cb_begin_ts; - fs_cb_cost_stat_.stat(fs_cb_cost); - if (fs_cb_cost > 1 * 1000) { - PALF_LOG_RET(WARN, OB_ERR_TOO_MUCH_TIME, "fs_cb->update_end_lsn() cost too much time", K(tmp_ret), K_(palf_id), K_(self), - K(fs_cb_cost), K(log_id), K(log_begin_lsn), K(log_end_lsn), K(log_proposal_id)); - } - const int64_t log_life_time = fs_cb_begin_ts - log_gen_ts; - log_life_time_stat_.stat(log_life_time); - log_submit_wait_stat_.stat(log_submit_ts - log_gen_ts); - log_submit_to_slide_cost_stat_.stat(fs_cb_begin_ts - log_submit_ts); + MEM_BARRIER(); // ensure last_slide_log_info_ has been updated before fetch log streamingly - if (log_life_time > 100 * 1000) { - if (palf_reach_time_interval(100 * 1000, log_life_long_warn_time_)) { - PALF_LOG_RET(WARN, OB_ERR_TOO_MUCH_TIME, "log_task life cost too much time", K_(palf_id), K_(self), K(log_id), KPC(log_task), - K(fs_cb_begin_ts), K(log_life_time)); + if (OB_SUCC(ret) + && (FOLLOWER == state_mgr_->get_role() || state_mgr_->is_leader_reconfirm())) { + // Check if need fetch log streamingly, + try_fetch_log_streamingly_(log_end_lsn); } } - - if (OB_FAIL(checksum_.verify_accum_checksum(log_task_header.data_checksum_, - log_task_header.accum_checksum_))) { - PALF_LOG(ERROR, "verify_accum_checksum failed", K_(palf_id), K_(self), K(ret), K(log_id), KPC(log_task)); - } else { - // update last_slide_lsn_ - (void) try_update_last_slide_log_info_(log_id, log_max_scn, log_begin_lsn, log_end_lsn, \ - log_proposal_id, log_accum_checksum); - } - - MEM_BARRIER(); // ensure last_slide_log_info_ has been updated before fetch log streamingly - - if (OB_SUCC(ret) - && (FOLLOWER == state_mgr_->get_role() || state_mgr_->is_leader_reconfirm())) { - // Check if need fetch log streamingly, - try_fetch_log_streamingly_(log_end_lsn); - } } if (0 == log_id % 100) { PALF_LOG(INFO, "sliding_cb finished", K_(palf_id), K_(self), K(ret), K(log_id)); diff --git a/src/logservice/palf/log_sliding_window.h b/src/logservice/palf/log_sliding_window.h index 5f34be471b..16503d08cc 100644 --- a/src/logservice/palf/log_sliding_window.h +++ b/src/logservice/palf/log_sliding_window.h @@ -53,6 +53,7 @@ class LogModeMgr; class LogTask; class LogGroupEntry; class TruncateLogCbCtx; +class LogIOTask; enum FetchTriggerType { @@ -131,6 +132,8 @@ private: common::ObMemberList lagged_list_; }; +extern int push_task_into_cb_thread_pool(const int64_t tg_id, LogIOTask *io_task); + class LogSlidingWindow : public ISlidingCallBack { public: @@ -148,7 +151,8 @@ public: palf::PalfFSCbWrapper *palf_fs_cb, common::ObILogAllocator *alloc_mgr, const PalfBaseInfo &palf_base_info, - const bool is_normal_replica); + const bool is_normal_replica, + const int cb_pool_tg_id); virtual int sliding_cb(const int64_t sn, const FixedSlidingWindowSlot *data); virtual int64_t get_max_log_id() const; virtual const share::SCN get_max_scn() const; @@ -501,7 +505,6 @@ private: bool is_rebuilding_; LSN last_rebuild_lsn_; LSN last_record_end_lsn_; - ObMiniStat::ObStatItem fs_cb_cost_stat_; ObMiniStat::ObStatItem log_life_time_stat_; ObMiniStat::ObStatItem log_submit_wait_stat_; ObMiniStat::ObStatItem log_submit_to_slide_cost_stat_; @@ -511,6 +514,7 @@ private: int64_t last_record_group_log_id_; int64_t append_cnt_array_[APPEND_CNT_ARRAY_SIZE]; FreezeMode freeze_mode_; + int cb_pool_tg_id_; bool is_inited_; private: DISALLOW_COPY_AND_ASSIGN(LogSlidingWindow); diff --git a/src/logservice/palf/palf_env_impl.cpp b/src/logservice/palf/palf_env_impl.cpp index ff833a3f9b..1f67161288 100644 --- a/src/logservice/palf/palf_env_impl.cpp +++ b/src/logservice/palf/palf_env_impl.cpp @@ -364,7 +364,7 @@ int PalfEnvImpl::create_palf_handle_impl_(const int64_t palf_id, PALF_LOG(WARN, "alloc palf_handle_impl failed", K(ret)); } else if (OB_FAIL(palf_handle_impl->init(palf_id, access_mode, palf_base_info, replica_type, &fetch_log_engine_, base_dir, log_alloc_mgr_, log_block_pool_, &log_rpc_, &log_io_worker_, - this, self_, &election_timer_, palf_epoch))) { + this, self_, &election_timer_, palf_epoch, cb_thread_pool_.get_tg_id()))) { PALF_LOG(ERROR, "IPalfHandleImpl init failed", K(ret), K(palf_id)); // NB: always insert value into hash map finally. } else if (OB_FAIL(palf_handle_impl_map_.insert_and_get(hash_map_key, palf_handle_impl))) { @@ -917,6 +917,7 @@ int PalfEnvImpl::reload_palf_handle_impl_(const int64_t palf_id) LSKey hash_map_key(palf_id); bool is_integrity = true; const int64_t palf_epoch = ATOMIC_AAF(&last_palf_epoch_, 1); + const int cb_pool_tg_id = cb_thread_pool_.get_tg_id(); if (0 > (pret = snprintf(base_dir, MAX_PATH_SIZE, "%s/%ld", log_dir_, palf_id))) { ret = OB_ERR_UNEXPECTED; PALF_LOG(WARN, "snprint failed", K(ret), K(pret), K(palf_id)); @@ -924,7 +925,7 @@ int PalfEnvImpl::reload_palf_handle_impl_(const int64_t palf_id) ret = OB_ALLOCATE_MEMORY_FAILED; PALF_LOG(WARN, "alloc ipalf_handle_impl failed", K(ret)); } else if (OB_FAIL(tmp_palf_handle_impl->load(palf_id, &fetch_log_engine_, base_dir, log_alloc_mgr_, - log_block_pool_, &log_rpc_, &log_io_worker_, this, self_, &election_timer_, palf_epoch, is_integrity))) { + log_block_pool_, &log_rpc_, &log_io_worker_, this, self_, &election_timer_, palf_epoch, cb_pool_tg_id, is_integrity))) { PALF_LOG(ERROR, "PalfHandleImpl init failed", K(ret), K(palf_id)); } else if (OB_FAIL(palf_handle_impl_map_.insert_and_get(hash_map_key, tmp_palf_handle_impl))) { PALF_LOG(WARN, "palf_handle_impl_map_ insert_and_get failed", K(ret), K(palf_id), K(tmp_palf_handle_impl)); diff --git a/src/logservice/palf/palf_handle_impl.cpp b/src/logservice/palf/palf_handle_impl.cpp index aef55779af..c4a278685a 100644 --- a/src/logservice/palf/palf_handle_impl.cpp +++ b/src/logservice/palf/palf_handle_impl.cpp @@ -69,6 +69,7 @@ PalfHandleImpl::PalfHandleImpl() palf_env_impl_(NULL), append_cost_stat_("[PALF STAT WRITE LOG]", 2 * 1000 * 1000), flush_cb_cost_stat_("[PALF STAT FLUSH CB]", 2 * 1000 * 1000), + fs_cb_cost_stat_("[PALF STAT FS CB]", 2 * 1000 * 1000), replica_meta_lock_(), rebuilding_lock_(), config_change_lock_(), @@ -106,7 +107,8 @@ int PalfHandleImpl::init(const int64_t palf_id, IPalfEnvImpl *palf_env_impl, const common::ObAddr &self, common::ObOccamTimer *election_timer, - const int64_t palf_epoch) + const int64_t palf_epoch, + const int cb_pool_tg_id) { int ret = OB_SUCCESS; int pret = 0; @@ -128,11 +130,12 @@ int PalfHandleImpl::init(const int64_t palf_id, || NULL == palf_env_impl || false == self.is_valid() || NULL == election_timer - || palf_epoch < 0) { + || palf_epoch < 0 + || cb_pool_tg_id <= 0) { ret = OB_INVALID_ARGUMENT; PALF_LOG(ERROR, "Invalid argument!!!", K(ret), K(palf_id), K(palf_base_info), K(replica_type), K(access_mode), K(log_dir), K(alloc_mgr), K(log_block_pool), K(log_rpc), - K(log_io_worker), K(palf_env_impl), K(self), K(election_timer), K(palf_epoch)); + K(log_io_worker), K(palf_env_impl), K(self), K(election_timer), K(palf_epoch), K(cb_pool_tg_id)); } else if (OB_FAIL(log_meta.generate_by_palf_base_info(palf_base_info, access_mode, replica_type))) { PALF_LOG(WARN, "generate_by_palf_base_info failed", K(ret), K(palf_id), K(palf_base_info), K(access_mode), K(replica_type)); } else if ((pret = snprintf(log_dir_, MAX_PATH_SIZE, "%s", log_dir)) && false) { @@ -143,11 +146,11 @@ int PalfHandleImpl::init(const int64_t palf_id, PALF_LOG(WARN, "LogEngine init failed", K(ret), K(palf_id), K(log_dir), K(alloc_mgr), K(log_rpc), K(log_io_worker)); } else if (OB_FAIL(do_init_mem_(palf_id, palf_base_info, log_meta, log_dir, self, fetch_log_engine, - alloc_mgr, log_rpc, log_io_worker, palf_env_impl, election_timer))) { + alloc_mgr, log_rpc, log_io_worker, palf_env_impl, election_timer, cb_pool_tg_id))) { PALF_LOG(WARN, "PalfHandleImpl do_init_mem_ failed", K(ret), K(palf_id)); } else { PALF_EVENT("PalfHandleImpl init success", palf_id_, K(ret), K(self), K(access_mode), K(palf_base_info), - K(replica_type), K(log_dir), K(log_meta), K(palf_epoch)); + K(replica_type), K(log_dir), K(log_meta), K(palf_epoch), K(cb_pool_tg_id)); } return ret; } @@ -168,6 +171,7 @@ int PalfHandleImpl::load(const int64_t palf_id, const common::ObAddr &self, common::ObOccamTimer *election_timer, const int64_t palf_epoch, + const int cb_pool_tg_id, bool &is_integrity) { int ret = OB_SUCCESS; @@ -200,7 +204,7 @@ int PalfHandleImpl::load(const int64_t palf_id, } else if (OB_FAIL(construct_palf_base_info_(max_committed_end_lsn, palf_base_info))) { PALF_LOG(WARN, "construct_palf_base_info_ failed", K(ret), K(palf_id), K(entry_header), K(palf_base_info)); } else if (OB_FAIL(do_init_mem_(palf_id, palf_base_info, log_engine_.get_log_meta(), log_dir, self, - fetch_log_engine, alloc_mgr, log_rpc, log_io_worker, palf_env_impl, election_timer))) { + fetch_log_engine, alloc_mgr, log_rpc, log_io_worker, palf_env_impl, election_timer, cb_pool_tg_id))) { PALF_LOG(WARN, "PalfHandleImpl do_init_mem_ failed", K(ret), K(palf_id)); } else if (OB_FAIL(append_disk_log_to_sw_(max_committed_end_lsn))) { PALF_LOG(WARN, "append_disk_log_to_sw_ failed", K(ret), K(palf_id)); @@ -2258,7 +2262,8 @@ int PalfHandleImpl::do_init_mem_( LogRpc *log_rpc, LogIOWorker *log_io_worker, IPalfEnvImpl *palf_env_impl, - common::ObOccamTimer *election_timer) + common::ObOccamTimer *election_timer, + const int cb_pool_tg_id) { int ret = OB_SUCCESS; int pret = -1; @@ -2276,8 +2281,8 @@ int PalfHandleImpl::do_init_mem_( if ((pret = snprintf(log_dir_, MAX_PATH_SIZE, "%s", log_dir)) && false) { ret = OB_ERR_UNEXPECTED; PALF_LOG(ERROR, "error unexpected", K(ret), K(palf_id)); - } else if (OB_FAIL(sw_.init(palf_id, self, &state_mgr_, &config_mgr_, &mode_mgr_, - &log_engine_, &fs_cb_wrapper_, alloc_mgr, palf_base_info, is_normal_replica))) { + } else if (OB_FAIL(sw_.init(palf_id, self, &state_mgr_, &config_mgr_, &mode_mgr_, &log_engine_, + &fs_cb_wrapper_, alloc_mgr, palf_base_info, is_normal_replica, cb_pool_tg_id))) { PALF_LOG(WARN, "sw_ init failed", K(ret), K(palf_id)); } else if (OB_FAIL(election_.init_and_start(palf_id, election_timer, @@ -4064,6 +4069,23 @@ int PalfHandleImpl::get_leader_max_scn_(SCN &max_scn) return ret; } +int PalfHandleImpl::file_size_cb(const LogSlidingCbCtx &sliding_cb_ctx) +{ + int ret = OB_SUCCESS; + const int64_t fs_cb_begin_ts = ObTimeUtility::current_time(); + if (OB_FAIL(fs_cb_wrapper_.update_end_lsn(sliding_cb_ctx.palf_id_, \ + sliding_cb_ctx.log_end_lsn_, sliding_cb_ctx.log_proposal_id_))) { + PALF_LOG(WARN, "fs_cb_wrapper_.update_end_lsn failed", KPC(this), K(sliding_cb_ctx)); + } + const int64_t fs_cb_cost = ObTimeUtility::current_time() - fs_cb_begin_ts; + fs_cb_cost_stat_.stat(fs_cb_cost); + if (fs_cb_cost > 1 * 1000) { + PALF_LOG_RET(WARN, OB_ERR_TOO_MUCH_TIME, "fs_cb_wrapper_.update_end_lsn() cost too much time", + K_(palf_id), K_(self), K(fs_cb_cost), K(sliding_cb_ctx)); + } + return ret; +} + PalfStat::PalfStat() : self_(), palf_id_(INVALID_PALF_ID), diff --git a/src/logservice/palf/palf_handle_impl.h b/src/logservice/palf/palf_handle_impl.h index 0fc1c54b0f..10d5cec5ef 100644 --- a/src/logservice/palf/palf_handle_impl.h +++ b/src/logservice/palf/palf_handle_impl.h @@ -572,6 +572,7 @@ public: virtual int get_palf_epoch(int64_t &palf_epoch) const = 0; virtual int diagnose(PalfDiagnoseInfo &diagnose_info) const = 0; virtual int update_palf_stat() = 0; + virtual int file_size_cb(const LogSlidingCbCtx &sliding_cb_ctx) = 0; DECLARE_PURE_VIRTUAL_TO_STRING; }; @@ -594,7 +595,8 @@ public: IPalfEnvImpl *palf_env_impl, const common::ObAddr &self, common::ObOccamTimer *election_timer, - const int64_t palf_epoch); + const int64_t palf_epoch, + const int cb_pool_tg_id); bool check_can_be_used() const override final; // 重启接口 // 1. 生成迭代器,定位meta_storage和log_storage的终点; @@ -612,6 +614,7 @@ public: const common::ObAddr &self, common::ObOccamTimer *election_timer, const int64_t palf_epoch, + const int cb_pool_tg_id, bool &is_integrity); void destroy(); int start(); @@ -842,6 +845,7 @@ public: const int64_t timeout_us) override final; int diagnose(PalfDiagnoseInfo &diagnose_info) const; int update_palf_stat() override final; + int file_size_cb(const LogSlidingCbCtx &sliding_cb_ctx); TO_STRING_KV(K_(palf_id), K_(self), K_(has_set_deleted)); private: int do_init_mem_(const int64_t palf_id, @@ -854,7 +858,8 @@ private: LogRpc *log_rpc, LogIOWorker *log_io_worker, IPalfEnvImpl *palf_env_impl, - common::ObOccamTimer *election_timer); + common::ObOccamTimer *election_timer, + const int cb_pool_tg_id); int after_flush_prepare_meta_(const int64_t &proposal_id); int after_flush_config_change_meta_(const int64_t proposal_id, const LogConfigVersion &config_version); int after_flush_mode_meta_(const int64_t proposal_id, @@ -1034,6 +1039,7 @@ private: bool diskspace_enough_; ObMiniStat::ObStatItem append_cost_stat_; ObMiniStat::ObStatItem flush_cb_cost_stat_; + ObMiniStat::ObStatItem fs_cb_cost_stat_; // a spin lock for read/write replica_meta mutex SpinLock replica_meta_lock_; SpinLock rebuilding_lock_; diff --git a/src/share/allocator/ob_tenant_mutil_allocator.cpp b/src/share/allocator/ob_tenant_mutil_allocator.cpp index 716a271cd3..15313af2fb 100644 --- a/src/share/allocator/ob_tenant_mutil_allocator.cpp +++ b/src/share/allocator/ob_tenant_mutil_allocator.cpp @@ -29,6 +29,7 @@ namespace common ObTenantMutilAllocator::ObTenantMutilAllocator(uint64_t tenant_id) : tenant_id_(tenant_id), total_limit_(INT64_MAX), pending_replay_mutator_size_(0), LOG_IO_FLUSH_LOG_TASK_SIZE(sizeof(palf::LogIOFlushLogTask)), + LOG_SLIDING_CB_TASK_SIZE(sizeof(palf::LogSlidingCbTask)), LOG_IO_TRUNCATE_LOG_TASK_SIZE(sizeof(palf::LogIOTruncateLogTask)), LOG_IO_FLUSH_META_TASK_SIZE(sizeof(palf::LogIOFlushMetaTask)), LOG_IO_TRUNCATE_PREFIX_BLOCKS_TASK_SIZE(sizeof(palf::LogIOTruncatePrefixBlocksTask)), @@ -43,6 +44,7 @@ ObTenantMutilAllocator::ObTenantMutilAllocator(uint64_t tenant_id) inner_table_replay_task_alloc_(ObMemAttr(tenant_id, ObModIds::OB_LOG_REPLAY_ENGINE), ObVSliceAlloc::DEFAULT_BLOCK_SIZE, inner_table_replay_blk_alloc_), user_table_replay_task_alloc_(ObMemAttr(tenant_id, ObModIds::OB_LOG_REPLAY_ENGINE), ObVSliceAlloc::DEFAULT_BLOCK_SIZE, user_table_replay_blk_alloc_), log_io_flush_log_task_alloc_(LOG_IO_FLUSH_LOG_TASK_SIZE, ObMemAttr(tenant_id, "FlushLog"), choose_blk_size(LOG_IO_FLUSH_LOG_TASK_SIZE), clog_blk_alloc_, this), + log_sliding_cb_task_alloc_(LOG_SLIDING_CB_TASK_SIZE, ObMemAttr(tenant_id, "SlidingCb"), choose_blk_size(LOG_SLIDING_CB_TASK_SIZE), clog_blk_alloc_, this), log_io_truncate_log_task_alloc_(LOG_IO_TRUNCATE_LOG_TASK_SIZE, ObMemAttr(tenant_id, "TruncateLog"), choose_blk_size(LOG_IO_TRUNCATE_LOG_TASK_SIZE), clog_blk_alloc_, this), log_io_flush_meta_task_alloc_(LOG_IO_FLUSH_META_TASK_SIZE, ObMemAttr(tenant_id, "FlushMeta"), choose_blk_size(LOG_IO_FLUSH_META_TASK_SIZE), clog_blk_alloc_, this), log_io_truncate_prefix_blocks_task_alloc_(LOG_IO_TRUNCATE_PREFIX_BLOCKS_TASK_SIZE, ObMemAttr(tenant_id, "FlushMeta"), choose_blk_size(LOG_IO_TRUNCATE_PREFIX_BLOCKS_TASK_SIZE), clog_blk_alloc_, this), @@ -82,6 +84,7 @@ void ObTenantMutilAllocator::try_purge() inner_table_replay_task_alloc_.purge_extra_cached_block(0); user_table_replay_task_alloc_.purge_extra_cached_block(0); log_io_flush_log_task_alloc_.purge_extra_cached_block(0); + log_sliding_cb_task_alloc_.purge_extra_cached_block(0); log_io_truncate_log_task_alloc_.purge_extra_cached_block(0); log_io_flush_meta_task_alloc_.purge_extra_cached_block(0); log_io_truncate_prefix_blocks_task_alloc_.purge_extra_cached_block(0); @@ -180,6 +183,28 @@ void ObTenantMutilAllocator::free_log_io_flush_log_task(LogIOFlushLogTask *ptr) } } +LogSlidingCbTask *ObTenantMutilAllocator::alloc_log_sliding_cb_task( + const int64_t palf_id, const int64_t palf_epoch) +{ + LogSlidingCbTask *ret_ptr = NULL; + void *ptr = log_sliding_cb_task_alloc_.alloc(); + if (NULL != ptr) { + ret_ptr = new(ptr)LogSlidingCbTask(palf_id, palf_epoch); + ATOMIC_INC(&flying_sliding_cb_task_); + } + return ret_ptr; +} + +void ObTenantMutilAllocator::free_log_sliding_cb_task(LogSlidingCbTask *ptr) +{ + if (OB_LIKELY(NULL != ptr)) { + ptr->~LogSlidingCbTask(); + log_sliding_cb_task_alloc_.free(ptr); + ATOMIC_DEC(&flying_sliding_cb_task_); + } +} + + LogIOTruncateLogTask *ObTenantMutilAllocator::alloc_log_io_truncate_log_task( const int64_t palf_id, const int64_t palf_epoch) { diff --git a/src/share/allocator/ob_tenant_mutil_allocator.h b/src/share/allocator/ob_tenant_mutil_allocator.h index 6e0cffa9d8..4cdf3381b1 100644 --- a/src/share/allocator/ob_tenant_mutil_allocator.h +++ b/src/share/allocator/ob_tenant_mutil_allocator.h @@ -25,6 +25,7 @@ namespace oceanbase namespace palf { class LogIOFlushLogTask; +class LogSlidingCbTask; class LogIOTruncateLogTask; class LogIOFlushMetaTask; class LogIOTruncatePrefixBlocksTask; @@ -43,7 +44,7 @@ class ObTraceProfile; class ObILogAllocator : public ObIAllocator { public: - ObILogAllocator() : flying_log_task_(0), flying_meta_task_(0) {} + ObILogAllocator() : flying_log_task_(0), flying_meta_task_(0), flying_sliding_cb_task_(0) {} virtual ~ObILogAllocator() {} public: @@ -55,6 +56,8 @@ public: virtual const ObBlockAllocMgr &get_clog_blk_alloc_mgr() const = 0; virtual palf::LogIOFlushLogTask *alloc_log_io_flush_log_task(const int64_t palf_id, const int64_t palf_epoch) = 0; virtual void free_log_io_flush_log_task(palf::LogIOFlushLogTask *ptr) = 0; + virtual palf::LogSlidingCbTask *alloc_log_sliding_cb_task(const int64_t palf_id, const int64_t palf_epoch) = 0; + virtual void free_log_sliding_cb_task(palf::LogSlidingCbTask *ptr) = 0; virtual palf::LogIOTruncateLogTask *alloc_log_io_truncate_log_task(const int64_t palf_id, const int64_t palf_epoch) = 0; virtual void free_log_io_truncate_log_task(palf::LogIOTruncateLogTask *ptr) = 0; virtual palf::LogIOFlushMetaTask *alloc_log_io_flush_meta_task(const int64_t palf_id, const int64_t palf_epoch) = 0; @@ -74,6 +77,7 @@ public: protected: int64_t flying_log_task_; int64_t flying_meta_task_; + int64_t flying_sliding_cb_task_; }; // Interface for ReplayEngine module @@ -137,6 +141,8 @@ public: // V4.0 palf::LogIOFlushLogTask *alloc_log_io_flush_log_task(const int64_t palf_id, const int64_t palf_epoch); void free_log_io_flush_log_task(palf::LogIOFlushLogTask *ptr); + palf::LogSlidingCbTask *alloc_log_sliding_cb_task(const int64_t palf_id, const int64_t palf_epoch); + void free_log_sliding_cb_task(palf::LogSlidingCbTask *ptr); palf::LogIOTruncateLogTask *alloc_log_io_truncate_log_task(const int64_t palf_id, const int64_t palf_epoch); void free_log_io_truncate_log_task(palf::LogIOTruncateLogTask *ptr); palf::LogIOFlushMetaTask *alloc_log_io_flush_meta_task(const int64_t palf_id, const int64_t palf_epoch); @@ -157,6 +163,7 @@ private: int64_t total_limit_; int64_t pending_replay_mutator_size_; const int LOG_IO_FLUSH_LOG_TASK_SIZE; + const int LOG_SLIDING_CB_TASK_SIZE; const int LOG_IO_TRUNCATE_LOG_TASK_SIZE; const int LOG_IO_FLUSH_META_TASK_SIZE; const int LOG_IO_TRUNCATE_PREFIX_BLOCKS_TASK_SIZE; @@ -171,6 +178,7 @@ private: ObVSliceAlloc inner_table_replay_task_alloc_; ObVSliceAlloc user_table_replay_task_alloc_; ObSliceAlloc log_io_flush_log_task_alloc_; + ObSliceAlloc log_sliding_cb_task_alloc_; ObSliceAlloc log_io_truncate_log_task_alloc_; ObSliceAlloc log_io_flush_meta_task_alloc_; ObSliceAlloc log_io_truncate_prefix_blocks_task_alloc_; diff --git a/unittest/logservice/test_log_sliding_window.cpp b/unittest/logservice/test_log_sliding_window.cpp index 8a42f9e6a2..67031bbf10 100644 --- a/unittest/logservice/test_log_sliding_window.cpp +++ b/unittest/logservice/test_log_sliding_window.cpp @@ -148,19 +148,19 @@ TEST_F(TestLogSlidingWindow, test_init) gen_default_palf_base_info_(base_info); EXPECT_EQ(OB_INVALID_ARGUMENT, log_sw_.init(palf_id_, self_, NULL, - &mock_mm_, &mock_mode_mgr_, &mock_log_engine_, &palf_fs_cb_, alloc_mgr_, base_info, true)); + &mock_mm_, &mock_mode_mgr_, &mock_log_engine_, &palf_fs_cb_, alloc_mgr_, base_info, true, 1)); EXPECT_EQ(OB_INVALID_ARGUMENT, log_sw_.init(palf_id_, self_, &mock_state_mgr_, - &mock_mm_, &mock_mode_mgr_, NULL, &palf_fs_cb_, NULL, base_info, true)); + &mock_mm_, &mock_mode_mgr_, NULL, &palf_fs_cb_, NULL, base_info, true, 1)); EXPECT_EQ(OB_INVALID_ARGUMENT, log_sw_.init(palf_id_, self_, &mock_state_mgr_, - NULL, &mock_mode_mgr_, &mock_log_engine_, &palf_fs_cb_, alloc_mgr_, base_info, true)); + NULL, &mock_mode_mgr_, &mock_log_engine_, &palf_fs_cb_, alloc_mgr_, base_info, true, 1)); EXPECT_EQ(OB_INVALID_ARGUMENT, log_sw_.init(palf_id_, self_, &mock_state_mgr_, - &mock_mm_, NULL, NULL, &palf_fs_cb_, NULL, base_info, true)); + &mock_mm_, NULL, NULL, &palf_fs_cb_, NULL, base_info, true, 1)); // init succ EXPECT_EQ(OB_SUCCESS, log_sw_.init(palf_id_, self_, &mock_state_mgr_, - &mock_mm_, &mock_mode_mgr_, &mock_log_engine_, &palf_fs_cb_, alloc_mgr_, base_info, true)); + &mock_mm_, &mock_mode_mgr_, &mock_log_engine_, &palf_fs_cb_, alloc_mgr_, base_info, true, 1)); // init twice EXPECT_EQ(OB_INIT_TWICE, log_sw_.init(palf_id_, self_, &mock_state_mgr_, - &mock_mm_, &mock_mode_mgr_, &mock_log_engine_, &palf_fs_cb_, alloc_mgr_, base_info, true)); + &mock_mm_, &mock_mode_mgr_, &mock_log_engine_, &palf_fs_cb_, alloc_mgr_, base_info, true, 1)); } TEST_F(TestLogSlidingWindow, test_private_func_batch_01) @@ -175,7 +175,7 @@ TEST_F(TestLogSlidingWindow, test_private_func_batch_01) gen_default_palf_base_info_(base_info); // init succ EXPECT_EQ(OB_SUCCESS, log_sw_.init(palf_id_, self_, &mock_state_mgr_, - &mock_mm_, &mock_mode_mgr_, &mock_log_engine_, &palf_fs_cb_, alloc_mgr_, base_info, true)); + &mock_mm_, &mock_mode_mgr_, &mock_log_engine_, &palf_fs_cb_, alloc_mgr_, base_info, true, 1)); log_id = 10 + PALF_SLIDING_WINDOW_SIZE; EXPECT_EQ(false, log_sw_.can_receive_larger_log_(log_id)); EXPECT_EQ(false, log_sw_.leader_can_submit_larger_log_(log_id)); @@ -194,7 +194,7 @@ TEST_F(TestLogSlidingWindow, test_to_follower_pending) gen_default_palf_base_info_(base_info); // init succ EXPECT_EQ(OB_SUCCESS, log_sw_.init(palf_id_, self_, &mock_state_mgr_, - &mock_mm_, &mock_mode_mgr_, &mock_log_engine_, &palf_fs_cb_, alloc_mgr_, base_info, true)); + &mock_mm_, &mock_mode_mgr_, &mock_log_engine_, &palf_fs_cb_, alloc_mgr_, base_info, true, 1)); char *buf = data_buf_; int64_t buf_len = 1 * 1024 * 1024; share::SCN ref_scn; @@ -221,7 +221,7 @@ TEST_F(TestLogSlidingWindow, test_fetch_log) gen_default_palf_base_info_(base_info); // init succ EXPECT_EQ(OB_SUCCESS, log_sw_.init(palf_id_, self_, &mock_state_mgr_, - &mock_mm_, &mock_mode_mgr_, &mock_log_engine_, &palf_fs_cb_, alloc_mgr_, base_info, true)); + &mock_mm_, &mock_mode_mgr_, &mock_log_engine_, &palf_fs_cb_, alloc_mgr_, base_info, true, 1)); prev_lsn.val_ = 1; EXPECT_EQ(OB_INVALID_ARGUMENT, log_sw_.try_fetch_log(fetch_log_type, prev_lsn, fetch_start_lsn, fetch_start_log_id)); EXPECT_EQ(OB_INVALID_ARGUMENT, log_sw_.try_fetch_log_for_reconfirm(dest, fetch_end_lsn, is_fetched)); @@ -244,7 +244,7 @@ TEST_F(TestLogSlidingWindow, test_report_log_task_trace) gen_default_palf_base_info_(base_info); // init succ EXPECT_EQ(OB_SUCCESS, log_sw_.init(palf_id_, self_, &mock_state_mgr_, - &mock_mm_, &mock_mode_mgr_, &mock_log_engine_, &palf_fs_cb_, alloc_mgr_, base_info, true)); + &mock_mm_, &mock_mode_mgr_, &mock_log_engine_, &palf_fs_cb_, alloc_mgr_, base_info, true, 1)); EXPECT_EQ(OB_SUCCESS, log_sw_.report_log_task_trace(1)); char *buf = data_buf_; int64_t buf_len = 2 * 1024 * 1024; @@ -265,7 +265,7 @@ TEST_F(TestLogSlidingWindow, test_set_location_cache_cb) gen_default_palf_base_info_(base_info); // init succ EXPECT_EQ(OB_SUCCESS, log_sw_.init(palf_id_, self_, &mock_state_mgr_, - &mock_mm_, &mock_mode_mgr_, &mock_log_engine_, &palf_fs_cb_, alloc_mgr_, base_info, true)); + &mock_mm_, &mock_mode_mgr_, &mock_log_engine_, &palf_fs_cb_, alloc_mgr_, base_info, true, 1)); EXPECT_EQ(OB_INVALID_ARGUMENT, log_sw_.set_location_cache_cb(NULL)); EXPECT_EQ(OB_SUCCESS, log_sw_.set_location_cache_cb(&cb)); EXPECT_EQ(OB_NOT_SUPPORTED, log_sw_.set_location_cache_cb(&cb)); @@ -278,7 +278,7 @@ TEST_F(TestLogSlidingWindow, test_reset_location_cache_cb) gen_default_palf_base_info_(base_info); // init succ EXPECT_EQ(OB_SUCCESS, log_sw_.init(palf_id_, self_, &mock_state_mgr_, - &mock_mm_, &mock_mode_mgr_, &mock_log_engine_, &palf_fs_cb_, alloc_mgr_, base_info, true)); + &mock_mm_, &mock_mode_mgr_, &mock_log_engine_, &palf_fs_cb_, alloc_mgr_, base_info, true, 1)); EXPECT_EQ(OB_SUCCESS, log_sw_.reset_location_cache_cb()); } @@ -295,7 +295,7 @@ TEST_F(TestLogSlidingWindow, test_submit_log) share::SCN scn; EXPECT_EQ(OB_NOT_INIT, log_sw_.submit_log(buf, buf_len, ref_scn, lsn, scn)); EXPECT_EQ(OB_SUCCESS, log_sw_.init(palf_id_, self_, &mock_state_mgr_, - &mock_mm_, &mock_mode_mgr_, &mock_log_engine_, &palf_fs_cb_, alloc_mgr_, base_info, true)); + &mock_mm_, &mock_mode_mgr_, &mock_log_engine_, &palf_fs_cb_, alloc_mgr_, base_info, true, 1)); EXPECT_EQ(OB_INVALID_ARGUMENT, log_sw_.submit_log(NULL, buf_len, ref_scn, lsn, scn)); buf_len = 0; EXPECT_EQ(OB_INVALID_ARGUMENT, log_sw_.submit_log(buf, buf_len, ref_scn, lsn, scn)); @@ -327,7 +327,7 @@ TEST_F(TestLogSlidingWindow, test_submit_group_log) PalfBaseInfo base_info; gen_default_palf_base_info_(base_info); EXPECT_EQ(OB_SUCCESS, log_sw_.init(palf_id_, self_, &mock_state_mgr_, - &mock_mm_, &mock_mode_mgr_, &mock_log_engine_, &palf_fs_cb_, alloc_mgr_, base_info, true)); + &mock_mm_, &mock_mode_mgr_, &mock_log_engine_, &palf_fs_cb_, alloc_mgr_, base_info, true, 1)); mock_state_mgr_.mock_proposal_id_ = 100; LSN lsn(10); EXPECT_EQ(OB_INVALID_ARGUMENT, log_sw_.submit_group_log(lsn, NULL, 1024)); @@ -386,7 +386,7 @@ TEST_F(TestLogSlidingWindow, test_receive_log) PalfBaseInfo base_info; gen_default_palf_base_info_(base_info); EXPECT_EQ(OB_SUCCESS, log_sw_.init(palf_id_, self_, &mock_state_mgr_, - &mock_mm_, &mock_mode_mgr_, &mock_log_engine_, &palf_fs_cb_, alloc_mgr_, base_info, true)); + &mock_mm_, &mock_mode_mgr_, &mock_log_engine_, &palf_fs_cb_, alloc_mgr_, base_info, true, 1)); char *buf = data_buf_; int64_t buf_len = 2 * 1024 * 1024; @@ -549,7 +549,7 @@ TEST_F(TestLogSlidingWindow, test_after_flush_log) PalfBaseInfo base_info; gen_default_palf_base_info_(base_info); EXPECT_EQ(OB_SUCCESS, log_sw_.init(palf_id_, self_, &mock_state_mgr_, - &mock_mm_, &mock_mode_mgr_, &mock_log_engine_, &palf_fs_cb_, alloc_mgr_, base_info, true)); + &mock_mm_, &mock_mode_mgr_, &mock_log_engine_, &palf_fs_cb_, alloc_mgr_, base_info, true, 1)); int64_t curr_proposal_id = 10; // set default config meta @@ -612,7 +612,7 @@ TEST_F(TestLogSlidingWindow, test_truncate_log) PalfBaseInfo base_info; gen_default_palf_base_info_(base_info); EXPECT_EQ(OB_SUCCESS, log_sw_.init(palf_id_, self_, &mock_state_mgr_, - &mock_mm_, &mock_mode_mgr_, &mock_log_engine_, &palf_fs_cb_, alloc_mgr_, base_info, true)); + &mock_mm_, &mock_mode_mgr_, &mock_log_engine_, &palf_fs_cb_, alloc_mgr_, base_info, true, 1)); int64_t curr_proposal_id = 10; mock_state_mgr_.mock_proposal_id_ = curr_proposal_id; @@ -720,7 +720,7 @@ TEST_F(TestLogSlidingWindow, test_ack_log) PalfBaseInfo base_info; gen_default_palf_base_info_(base_info); EXPECT_EQ(OB_SUCCESS, log_sw_.init(palf_id_, self_, &mock_state_mgr_, - &mock_mm_, &mock_mode_mgr_, &mock_log_engine_, &palf_fs_cb_, alloc_mgr_, base_info, true)); + &mock_mm_, &mock_mode_mgr_, &mock_log_engine_, &palf_fs_cb_, alloc_mgr_, base_info, true, 1)); int64_t curr_proposal_id = 10; mock_state_mgr_.mock_proposal_id_ = curr_proposal_id; log_sw_.self_ = self_; @@ -778,7 +778,7 @@ TEST_F(TestLogSlidingWindow, test_truncate_for_rebuild) PalfBaseInfo base_info; gen_default_palf_base_info_(base_info); EXPECT_EQ(OB_SUCCESS, log_sw_.init(palf_id_, self_, &mock_state_mgr_, - &mock_mm_, &mock_mode_mgr_, &mock_log_engine_, &palf_fs_cb_, alloc_mgr_, base_info, true)); + &mock_mm_, &mock_mode_mgr_, &mock_log_engine_, &palf_fs_cb_, alloc_mgr_, base_info, true, 1)); int64_t curr_proposal_id = 10; mock_state_mgr_.mock_proposal_id_ = curr_proposal_id; @@ -889,7 +889,7 @@ TEST_F(TestLogSlidingWindow, test_append_disk_log) PalfBaseInfo base_info; gen_default_palf_base_info_(base_info); EXPECT_EQ(OB_SUCCESS, log_sw_.init(palf_id_, self_, &mock_state_mgr_, - &mock_mm_, &mock_mode_mgr_, &mock_log_engine_, &palf_fs_cb_, alloc_mgr_, base_info, true)); + &mock_mm_, &mock_mode_mgr_, &mock_log_engine_, &palf_fs_cb_, alloc_mgr_, base_info, true, 1)); int64_t curr_proposal_id = 10; mock_state_mgr_.mock_proposal_id_ = curr_proposal_id; // generate new group entry @@ -1011,7 +1011,7 @@ TEST_F(TestLogSlidingWindow, test_group_entry_truncate) PalfBaseInfo base_info; gen_default_palf_base_info_(base_info); EXPECT_EQ(OB_SUCCESS, log_sw_.init(palf_id_, self_, &mock_state_mgr_, - &mock_mm_, &mock_mode_mgr_, &mock_log_engine_, &palf_fs_cb_, alloc_mgr_, base_info, true)); + &mock_mm_, &mock_mode_mgr_, &mock_log_engine_, &palf_fs_cb_, alloc_mgr_, base_info, true, 1)); int64_t curr_proposal_id = 10; mock_state_mgr_.mock_proposal_id_ = curr_proposal_id; // generate new group entry