From 870bc8a7202c6392e7e90c8aae6ea22e97a92f5c Mon Sep 17 00:00:00 2001 From: HaHaJeff Date: Mon, 11 Dec 2023 09:17:20 +0000 Subject: [PATCH] fixed memory leak when palf epoch changed. --- .../env/ob_simple_log_cluster_env.h | 69 ++++--- .../logservice/test_ob_simple_log_engine.cpp | 36 +--- ...test_ob_simple_log_single_replica_func.cpp | 130 ++++++++++++++ src/logservice/palf/log_io_task.cpp | 168 +++++------------- src/logservice/palf/log_io_task.h | 29 +-- src/logservice/palf/log_io_worker.cpp | 1 - 6 files changed, 245 insertions(+), 188 deletions(-) diff --git a/mittest/logservice/env/ob_simple_log_cluster_env.h b/mittest/logservice/env/ob_simple_log_cluster_env.h index c30023c273..f4eb64afb1 100644 --- a/mittest/logservice/env/ob_simple_log_cluster_env.h +++ b/mittest/logservice/env/ob_simple_log_cluster_env.h @@ -258,15 +258,15 @@ private: class IOTaskCond : public LogIOTask { public: - IOTaskCond(const int64_t palf_id, const int64_t palf_epoch) : LogIOTask(palf_id, palf_epoch) {} - virtual int do_task_(int tg_id, IPalfEnvImpl *palf_env_impl) override final + IOTaskCond(const int64_t palf_id, const int64_t palf_epoch) : LogIOTask(palf_id, palf_epoch), count_(0) {} + virtual int do_task_(int tg_id, IPalfHandleImplGuard &guard) override final { PALF_LOG(INFO, "before cond_wait"); cond_.wait(); PALF_LOG(INFO, "after cond_wait"); return OB_SUCCESS; }; - virtual int after_consume_(IPalfEnvImpl *palf_env_impl) override final + virtual int after_consume_(IPalfHandleImplGuard &guard) override final { return OB_SUCCESS; } @@ -280,37 +280,60 @@ public: virtual int64_t get_io_size_() const {return 0;} bool need_purge_throttling_() const {return true;} ObCond cond_; + int64_t count_; +}; + +class IOTaskConsumeCond : public LogIOTask { +public: + IOTaskConsumeCond(const int64_t palf_id, const int64_t palf_epoch) : LogIOTask(palf_id, palf_epoch) {} + virtual int do_task_(int tg_id, IPalfHandleImplGuard &guard) override final + { + int ret = OB_SUCCESS; + PALF_LOG(INFO, "do_task_ success"); + if (OB_FAIL(push_task_into_cb_thread_pool_(tg_id, this))) { + PALF_LOG(WARN, "push_task_into_cb_thread_pool failed", K(ret), K(tg_id), KP(this)); + } + return ret; + }; + virtual int after_consume_(IPalfHandleImplGuard &guard) override final + { + PALF_LOG(INFO, "before cond_wait"); + cond_.wait(); + PALF_LOG(INFO, "after cond_wait"); + return OB_SUCCESS; + } + virtual LogIOTaskType get_io_task_type_() const { return LogIOTaskType::FLUSH_META_TYPE; } + int init(int64_t palf_id) + { + palf_id_ = palf_id; + return OB_SUCCESS; + }; + virtual void free_this_(IPalfEnvImpl *impl) {UNUSED(impl);} + virtual int64_t get_io_size_() const {return 0;} + bool need_purge_throttling_() const {return true;} + ObCond cond_; }; -class IOTaskConsumeCond : public LogIOTask { +class IOTaskVerify : public LogIOTask { public: - IOTaskConsumeCond(const int64_t palf_id, const int64_t palf_epoch) : LogIOTask(palf_id, palf_epoch) {} - virtual int do_task_(int tg_id, IPalfEnvImpl *palf_env_impl) override final + IOTaskVerify(const int64_t palf_id, const int64_t palf_epoch) : LogIOTask(palf_id, palf_epoch), count_(0), after_consume_count_(0) {} + virtual int do_task_(int tg_id, IPalfHandleImplGuard &guard) { - int ret = OB_SUCCESS; - PALF_LOG(INFO, "do_task_ success"); - if (OB_FAIL(push_task_into_cb_thread_pool_(tg_id, this))) { - PALF_LOG(WARN, "push_task_into_cb_thread_pool failed", K(ret), K(tg_id), KP(this)); - } - return ret; - }; - virtual int after_consume_(IPalfEnvImpl *palf_env_impl) override final - { - PALF_LOG(INFO, "before cond_wait"); - cond_.wait(); - PALF_LOG(INFO, "after cond_wait"); + count_ ++; return OB_SUCCESS; - } + }; + virtual int after_consume_(IPalfHandleImplGuard &guard) { return OB_SUCCESS; } virtual LogIOTaskType get_io_task_type_() const { return LogIOTaskType::FLUSH_META_TYPE; } + virtual void free_this_(IPalfEnvImpl *impl) {UNUSED(impl);} + int64_t get_io_size_() const {return 0;} + bool need_purge_throttling_() const {return true;} int init(int64_t palf_id) { palf_id_ = palf_id; return OB_SUCCESS; }; - virtual void free_this_(IPalfEnvImpl *impl) {UNUSED(impl);} - virtual int64_t get_io_size_() const {return 0;} - bool need_purge_throttling_() const {return true;} - ObCond cond_; + int64_t count_; + int64_t after_consume_count_; }; } // end namespace unittest } // end namespace oceanbase diff --git a/mittest/logservice/test_ob_simple_log_engine.cpp b/mittest/logservice/test_ob_simple_log_engine.cpp index cb6230c93d..6022d4c587 100644 --- a/mittest/logservice/test_ob_simple_log_engine.cpp +++ b/mittest/logservice/test_ob_simple_log_engine.cpp @@ -432,28 +432,6 @@ TEST_F(TestObSimpleLogClusterLogEngine, exception_path) } -class IOTaskVerify : public LogIOTask { -public: - IOTaskVerify(const int64_t palf_id, const int64_t palf_epoch) : LogIOTask(palf_id, palf_epoch), count_(0), after_consume_count_(0) {} - virtual int do_task_(int tg_id, IPalfEnvImpl *palf_env_impl) - { - count_ ++; - return OB_SUCCESS; - }; - virtual int after_consume_(IPalfEnvImpl *palf_env_impl) { return OB_SUCCESS; } - virtual LogIOTaskType get_io_task_type_() const { return LogIOTaskType::FLUSH_META_TYPE; } - virtual void free_this_(IPalfEnvImpl *impl) {UNUSED(impl);} - int64_t get_io_size_() const {return 0;} - bool need_purge_throttling_() const {return true;} - int init(int64_t palf_id) - { - palf_id_ = palf_id; - return OB_SUCCESS; - }; - int64_t count_; - int64_t after_consume_count_; -}; - TEST_F(TestObSimpleLogClusterLogEngine, io_reducer_basic_func) { SET_CASE_LOG_FILE(TEST_NAME, "io_reducer_func"); @@ -517,9 +495,9 @@ TEST_F(TestObSimpleLogClusterLogEngine, io_reducer_basic_func) int64_t prev_log_id_2 = 0; int64_t leader_idx_2 = 0; PalfHandleImplGuard leader_2; - IOTaskCond io_task_cond_2(id_2, log_engine->palf_epoch_); - IOTaskVerify io_task_verify_2(id_2, log_engine->palf_epoch_); EXPECT_EQ(OB_SUCCESS, create_paxos_group(id_2, leader_idx_2, leader_2)); + IOTaskCond io_task_cond_2(id_2, leader_2.get_palf_handle_impl()->log_engine_.palf_epoch_); + IOTaskVerify io_task_verify_2(id_2, leader_2.get_palf_handle_impl()->log_engine_.palf_epoch_); { LogIOWorker *log_io_worker = leader_2.palf_handle_impl_->log_engine_.log_io_worker_; // 聚合度为1的忽略 @@ -537,6 +515,7 @@ TEST_F(TestObSimpleLogClusterLogEngine, io_reducer_basic_func) LSN max_lsn_1 = leader_1.palf_handle_impl_->sw_.get_max_lsn(); const int64_t log_id_2 = leader_2.palf_handle_impl_->sw_.get_max_log_id(); LSN max_lsn_2 = leader_2.palf_handle_impl_->sw_.get_max_lsn(); + sleep(1); io_task_cond_2.cond_.signal(); wait_lsn_until_flushed(max_lsn_1, leader_1); wait_lsn_until_flushed(max_lsn_2, leader_2); @@ -586,9 +565,9 @@ TEST_F(TestObSimpleLogClusterLogEngine, io_reducer_basic_func) int64_t leader_idx_3 = 0; int64_t prev_log_id_3 = 0; PalfHandleImplGuard leader_3; - IOTaskCond io_task_cond_3(id_3, log_engine->palf_epoch_); - IOTaskVerify io_task_verify_3(id_3, log_engine->palf_epoch_); EXPECT_EQ(OB_SUCCESS, create_paxos_group(id_3, leader_idx_3, leader_3)); + IOTaskCond io_task_cond_3(id_3, leader_3.get_palf_handle_impl()->log_engine_.palf_epoch_); + IOTaskVerify io_task_verify_3(id_3, leader_3.get_palf_handle_impl()->log_engine_.palf_epoch_); { LogIOWorker *log_io_worker = leader_3.palf_handle_impl_->log_engine_.log_io_worker_; EXPECT_EQ(OB_SUCCESS, log_io_worker->submit_io_task(&io_task_cond_3)); @@ -634,9 +613,9 @@ TEST_F(TestObSimpleLogClusterLogEngine, io_reducer_basic_func) int64_t leader_idx_4 = 0; int64_t prev_log_id_4 = 0; PalfHandleImplGuard leader_4; - IOTaskCond io_task_cond_4(id_4, log_engine->palf_epoch_); - IOTaskVerify io_task_verify_4(id_4, log_engine->palf_epoch_); EXPECT_EQ(OB_SUCCESS, create_paxos_group(id_4, leader_idx_4, leader_4)); + IOTaskCond io_task_cond_4(id_4, leader_4.get_palf_handle_impl()->log_engine_.palf_epoch_); + IOTaskVerify io_task_verify_4(id_4, leader_4.get_palf_handle_impl()->log_engine_.palf_epoch_); { LogIOWorker *log_io_worker = leader_4.palf_handle_impl_->log_engine_.log_io_worker_; EXPECT_EQ(OB_SUCCESS, log_io_worker->submit_io_task(&io_task_cond_4)); @@ -656,7 +635,6 @@ TEST_F(TestObSimpleLogClusterLogEngine, io_reducer_basic_func) io_task_cond_4.cond_.signal(); LSN log_tail = leader_4.palf_handle_impl_->log_engine_.log_storage_.log_tail_; PALF_LOG(INFO, "after signal", K(max_lsn), K(log_tail)); - wait_lsn_until_flushed(max_lsn, leader_4); sleep(1); log_tail = leader_4.palf_handle_impl_->log_engine_.log_storage_.log_tail_; PALF_LOG(INFO, "after flused case 4", K(max_lsn), K(log_tail)); diff --git a/mittest/logservice/test_ob_simple_log_single_replica_func.cpp b/mittest/logservice/test_ob_simple_log_single_replica_func.cpp index 3593a325ad..9e9c859c10 100644 --- a/mittest/logservice/test_ob_simple_log_single_replica_func.cpp +++ b/mittest/logservice/test_ob_simple_log_single_replica_func.cpp @@ -26,6 +26,7 @@ #include "logservice/palf/log_group_entry_header.h" #include "logservice/palf/log_io_worker.h" #include "logservice/palf/lsn.h" +#include const std::string TEST_NAME = "single_replica"; @@ -1723,6 +1724,135 @@ TEST_F(TestObSimpleLogClusterSingleReplica, test_iterator_with_flashback) } } +TEST_F(TestObSimpleLogClusterSingleReplica, test_iow_memleak) +{ + SET_CASE_LOG_FILE(TEST_NAME, "test_iow"); + OB_LOGGER.set_log_level("INFO"); + int64_t id = ATOMIC_AAF(&palf_id_, 1); + int64_t leader_idx = 0; + // case1: palf epoch has been changed during do_task + { + PalfHandleImplGuard leader; + EXPECT_EQ(OB_SUCCESS, create_paxos_group(id, leader_idx, leader)); + LogIOWorker *iow = leader.get_palf_handle_impl()->log_engine_.log_io_worker_; + IPalfEnvImpl *palf_env_impl = leader.get_palf_handle_impl()->palf_env_impl_; + ObILogAllocator *allocator = palf_env_impl->get_log_allocator(); + EXPECT_EQ(OB_SUCCESS, submit_log(leader, 32, leader_idx, MAX_LOG_BODY_SIZE)); + EXPECT_EQ(OB_SUCCESS, wait_until_has_committed(leader, leader.palf_handle_impl_->sw_.get_max_lsn())); + LSN end_lsn = leader.get_palf_handle_impl()->get_end_lsn(); + + IOTaskCond cond(id, leader.palf_env_impl_->last_palf_epoch_); + EXPECT_EQ(OB_SUCCESS, iow->submit_io_task(&cond)); + sleep(1); + EXPECT_EQ(OB_SUCCESS, leader.get_palf_handle_impl()->set_base_lsn(end_lsn)); + EXPECT_EQ(OB_SUCCESS, submit_log(leader, 1, leader_idx, MAX_LOG_BODY_SIZE)); + EXPECT_EQ(OB_SUCCESS, leader.get_palf_handle_impl()->log_engine_.submit_purge_throttling_task(PurgeThrottlingType::PURGE_BY_GET_MC_REQ)); + EXPECT_EQ(OB_SUCCESS, submit_log(leader, 2, leader_idx, MAX_LOG_BODY_SIZE)); + EXPECT_NE(0, allocator->flying_log_task_); + EXPECT_NE(0, allocator->flying_meta_task_); + leader.get_palf_handle_impl()->log_engine_.palf_epoch_++; + EXPECT_EQ(OB_SUCCESS, submit_log(leader, 10, leader_idx)); + EXPECT_EQ(OB_SUCCESS, submit_log(leader, 1, leader_idx, MAX_LOG_BODY_SIZE)); + EXPECT_EQ(OB_SUCCESS, submit_log(leader, 10, leader_idx)); + EXPECT_EQ(OB_SUCCESS, submit_log(leader, 1, leader_idx, MAX_LOG_BODY_SIZE)); + leader.get_palf_handle_impl()->log_engine_.palf_epoch_++; + cond.cond_.signal(); + PALF_LOG(INFO, "runlin trace submit log 1"); + while (iow->queue_.size() > 0) { + PALF_LOG(INFO, "queue size is not zero", "size", iow->queue_.size()); + sleep(1); + } + EXPECT_EQ(0, allocator->flying_log_task_); + EXPECT_EQ(0, allocator->flying_meta_task_); + } + delete_paxos_group(id); + + // case2: palf epoch has been changed during after_consume + { + PalfHandleImplGuard leader; + EXPECT_EQ(OB_SUCCESS, create_paxos_group(id, leader_idx, leader)); + LogIOWorker *iow = leader.get_palf_handle_impl()->log_engine_.log_io_worker_; + IPalfEnvImpl *palf_env_impl = leader.get_palf_handle_impl()->palf_env_impl_; + ObILogAllocator *allocator = palf_env_impl->get_log_allocator(); + EXPECT_EQ(OB_SUCCESS, submit_log(leader, 32, leader_idx, MAX_LOG_BODY_SIZE)); + EXPECT_EQ(OB_SUCCESS, wait_until_has_committed(leader, leader.palf_handle_impl_->sw_.get_max_lsn())); + LSN end_lsn = leader.get_palf_handle_impl()->get_end_lsn(); + IOTaskConsumeCond consume_cond(id, leader.palf_env_impl_->last_palf_epoch_); + EXPECT_EQ(OB_SUCCESS, iow->submit_io_task(&consume_cond)); + sleep(1); + EXPECT_EQ(OB_SUCCESS, leader.get_palf_handle_impl()->set_base_lsn(end_lsn)); + EXPECT_EQ(OB_SUCCESS, submit_log(leader, 1, leader_idx, MAX_LOG_BODY_SIZE)); + EXPECT_EQ(OB_SUCCESS, leader.get_palf_handle_impl()->log_engine_.submit_purge_throttling_task(PurgeThrottlingType::PURGE_BY_GET_MC_REQ)); + EXPECT_EQ(OB_SUCCESS, submit_log(leader, 2, leader_idx, MAX_LOG_BODY_SIZE)); + EXPECT_NE(0, allocator->flying_log_task_); + EXPECT_NE(0, allocator->flying_meta_task_); + leader.get_palf_handle_impl()->log_engine_.palf_epoch_++; + EXPECT_EQ(OB_SUCCESS, submit_log(leader, 10, leader_idx)); + EXPECT_EQ(OB_SUCCESS, submit_log(leader, 1, leader_idx, MAX_LOG_BODY_SIZE)); + EXPECT_EQ(OB_SUCCESS, submit_log(leader, 10, leader_idx)); + EXPECT_EQ(OB_SUCCESS, submit_log(leader, 1, leader_idx, MAX_LOG_BODY_SIZE)); + leader.get_palf_handle_impl()->log_engine_.palf_epoch_++; + consume_cond.cond_.signal(); + PALF_LOG(INFO, "runlin trace submit log 2"); + IOTaskVerify verify(id, leader.get_palf_handle_impl()->log_engine_.palf_epoch_); + EXPECT_EQ(OB_SUCCESS, iow->submit_io_task(&verify)); + while (verify.count_ == 0) { + PALF_LOG(INFO, "queue size is not zero", "size", iow->queue_.size()); + sleep(1); + } + EXPECT_EQ(0, allocator->flying_log_task_); + EXPECT_EQ(0, allocator->flying_meta_task_); + } + delete_paxos_group(id); + // case3: palf epoch has been changed during do_task when there is no io reduce + { + PalfHandleImplGuard leader; + EXPECT_EQ(OB_SUCCESS, create_paxos_group(id, leader_idx, leader)); + LogIOWorker *iow = leader.get_palf_handle_impl()->log_engine_.log_io_worker_; + IPalfEnvImpl *palf_env_impl = leader.get_palf_handle_impl()->palf_env_impl_; + bool need_stop = false; + std::thread throttling_th([palf_env_impl, &need_stop](){ + PalfEnvImpl *impl = dynamic_cast(palf_env_impl); + while (!need_stop) { + impl->log_io_worker_wrapper_.notify_need_writing_throttling(true); + usleep(1000); + } + }); + ObILogAllocator *allocator = palf_env_impl->get_log_allocator(); + EXPECT_EQ(OB_SUCCESS, submit_log(leader, 32, leader_idx, MAX_LOG_BODY_SIZE)); + EXPECT_EQ(OB_SUCCESS, wait_until_has_committed(leader, leader.palf_handle_impl_->sw_.get_max_lsn())); + LSN end_lsn = leader.get_palf_handle_impl()->get_end_lsn(); + // case2: palf epoch has been changed during after_consume + IOTaskConsumeCond consume_cond(id, leader.palf_env_impl_->last_palf_epoch_); + EXPECT_EQ(OB_SUCCESS, iow->submit_io_task(&consume_cond)); + sleep(1); + EXPECT_EQ(OB_SUCCESS, leader.get_palf_handle_impl()->set_base_lsn(end_lsn)); + EXPECT_EQ(OB_SUCCESS, submit_log(leader, 1, leader_idx, MAX_LOG_BODY_SIZE)); + EXPECT_EQ(OB_SUCCESS, leader.get_palf_handle_impl()->log_engine_.submit_purge_throttling_task(PurgeThrottlingType::PURGE_BY_GET_MC_REQ)); + EXPECT_EQ(OB_SUCCESS, submit_log(leader, 2, leader_idx, MAX_LOG_BODY_SIZE)); + EXPECT_NE(0, allocator->flying_log_task_); + EXPECT_NE(0, allocator->flying_meta_task_); + leader.get_palf_handle_impl()->log_engine_.palf_epoch_++; + EXPECT_EQ(OB_SUCCESS, submit_log(leader, 10, leader_idx)); + EXPECT_EQ(OB_SUCCESS, submit_log(leader, 1, leader_idx, MAX_LOG_BODY_SIZE)); + EXPECT_EQ(OB_SUCCESS, submit_log(leader, 10, leader_idx)); + EXPECT_EQ(OB_SUCCESS, submit_log(leader, 1, leader_idx, MAX_LOG_BODY_SIZE)); + leader.get_palf_handle_impl()->log_engine_.palf_epoch_++; + consume_cond.cond_.signal(); + PALF_LOG(INFO, "runlin trace submit log 3"); + IOTaskVerify verify(id, leader.get_palf_handle_impl()->log_engine_.palf_epoch_); + EXPECT_EQ(OB_SUCCESS, iow->submit_io_task(&verify)); + while (verify.count_ == 0) { + PALF_LOG(INFO, "queue size is not zero", "size", iow->queue_.size()); + sleep(1); + } + EXPECT_EQ(0, allocator->flying_log_task_); + EXPECT_EQ(0, allocator->flying_meta_task_); + need_stop = true; + throttling_th.join(); + } +} + } // namespace unittest } // namespace oceanbase diff --git a/src/logservice/palf/log_io_task.cpp b/src/logservice/palf/log_io_task.cpp index 6baa3bd3c0..abe96c14c4 100644 --- a/src/logservice/palf/log_io_task.cpp +++ b/src/logservice/palf/log_io_task.cpp @@ -73,36 +73,53 @@ void LogIOTask::reset() submit_seq_ = 0; } -// NB: if do_task failed, the caller is responsible for freeing LogIOTask. +// NB: if do_task failed, the caller(LogIOWorker) is responsible for freeing LogIOTask. int LogIOTask::do_task(int tg_id, IPalfEnvImpl *palf_env_impl) { int ret = OB_SUCCESS; int64_t do_task_ts = ObTimeUtility::current_time(); const int64_t delay_ts = do_task_ts - init_task_ts_; constexpr int64_t MAX_DELAY_TIME = 100 * 1000; + IPalfHandleImplGuard guard; + int64_t palf_epoch = -1; if (delay_ts >= MAX_DELAY_TIME) { PALF_LOG(INFO, "[io delay]", K(do_task_ts), K(delay_ts)); } - - if (OB_FAIL(do_task_(tg_id, palf_env_impl))) { + if (OB_FAIL(palf_env_impl->get_palf_handle_impl(palf_id_, guard))) { + PALF_LOG(WARN, "get_palf_handle_impl failed", KPC(this)); + } else if (OB_FAIL(guard.get_palf_handle_impl()->get_palf_epoch(palf_epoch))) { + PALF_LOG(WARN, "get_palf_epoch failed", KPC(this)); + } else if (palf_epoch != palf_epoch_) { + ret = OB_STATE_NOT_MATCH; + PALF_LOG(WARN, "palf_epoch has been changed, drop task", KPC(this), K(palf_epoch)); + } else if (OB_FAIL(do_task_(tg_id, guard))) { PALF_LOG(WARN, "do_task_ failed", K(ret), K(tg_id), KPC(palf_env_impl)); - } + } else {} return ret; } -// NB: after after_consume, the caller needs free LogIOTask. +// NB: after after_consume, the caller(LogIOCb) needs free LogIOTask. int LogIOTask::after_consume(IPalfEnvImpl *palf_env_impl) { int ret = OB_SUCCESS; int64_t after_consume_ts = ObTimeUtility::current_time(); const int64_t delay_ts = after_consume_ts - push_cb_into_cb_pool_ts_; - if (OB_FAIL(after_consume_(palf_env_impl))) { - PALF_LOG(WARN, "after_consume_ failed", K(ret), KPC(palf_env_impl)); - } + int64_t palf_epoch = -1; + IPalfHandleImplGuard guard; constexpr int64_t MAX_DELAY_TIME = 100 * 1000; if (delay_ts >= MAX_DELAY_TIME) { PALF_LOG(INFO, "[io delay]", K(after_consume_ts), K(delay_ts)); } + if (OB_FAIL(palf_env_impl->get_palf_handle_impl(palf_id_, guard))) { + PALF_LOG(WARN, "IPalfEnvImpl get_palf_handle_impl failed", K(ret), K(palf_id_)); + } else if (OB_FAIL(guard.get_palf_handle_impl()->get_palf_epoch(palf_epoch))) { + PALF_LOG(WARN, "IPalfEnvImpl get_palf_epoch failed", K(ret), K(palf_id_)); + } else if (palf_epoch != palf_epoch_) { + ret = OB_STATE_NOT_MATCH; + PALF_LOG(WARN, "palf_epoch has changed, drop task", K(ret), KPC(this)); + } else if (OB_FAIL(after_consume_(guard))) { + PALF_LOG(WARN, "after_consume_ failed", K(ret), KPC(palf_env_impl)); + } else {} return ret; } @@ -174,22 +191,13 @@ void LogIOFlushLogTask::destroy() } } -int LogIOFlushLogTask::do_task_(int tg_id, IPalfEnvImpl *palf_env_impl) +int LogIOFlushLogTask::do_task_(int tg_id, IPalfHandleImplGuard &guard) { int ret = OB_SUCCESS; - int64_t palf_epoch = -1; - IPalfHandleImplGuard guard; const LSN flush_log_end_lsn = flush_log_cb_ctx_.lsn_ + flush_log_cb_ctx_.total_len_; if (IS_NOT_INIT) { ret = OB_NOT_INIT; PALF_LOG(ERROR, "LogIOFlusLoghTask not inited", K(ret)); - } else if (OB_FAIL(palf_env_impl->get_palf_handle_impl(palf_id_, guard))) { - PALF_LOG(WARN, "IPalfEnvImpl get_palf_handle_impl failed", K(ret), K(palf_id_)); - } else if (OB_FAIL(guard.get_palf_handle_impl()->get_palf_epoch(palf_epoch))) { - PALF_LOG(WARN, "IPalfEnvImpl get_palf_epoch failed", K(ret), K(palf_id_)); - } else if (palf_epoch != palf_epoch_) { - PALF_LOG(WARN, "palf_epoch has changed, drop task", K(ret), K(palf_id_), K(palf_epoch), - K(flush_log_cb_ctx_)); } else if (OB_FAIL(guard.get_palf_handle_impl()->inner_append_log( flush_log_cb_ctx_.lsn_, write_buf_, flush_log_cb_ctx_.scn_))) { PALF_LOG(ERROR, "LogEngine pwrite failed", K(ret), K(write_buf_)); @@ -203,25 +211,13 @@ int LogIOFlushLogTask::do_task_(int tg_id, IPalfEnvImpl *palf_env_impl) return ret; } -// NB: the memory of 'this' will be release -int LogIOFlushLogTask::after_consume_(IPalfEnvImpl *palf_env_impl) +int LogIOFlushLogTask::after_consume_(IPalfHandleImplGuard &guard) { int ret = OB_SUCCESS; - int64_t palf_epoch = -1; - IPalfHandleImplGuard guard; common::ObTimeGuard time_guard("after_consume log", 10 * 1000); if (IS_NOT_INIT) { ret = OB_NOT_INIT; PALF_LOG(ERROR, "LogIOFlushLogTask not inited", K(ret), KPC(this)); - } else if (OB_FAIL(palf_env_impl->get_palf_handle_impl(palf_id_, guard))) { - PALF_LOG(WARN, "IPalfEnvImpl get_palf_handle_impl failed", K(ret), K(palf_id_)); - } else if (OB_FAIL(guard.get_palf_handle_impl()->get_palf_epoch(palf_epoch))) { - PALF_LOG(WARN, "IPalfEnvImpl get_palf_epoch failed", K(ret), K(palf_id_)); - } else if (palf_epoch != palf_epoch_) { - PALF_LOG(WARN, "palf_epoch has changed, drop task", K(ret), K(palf_id_), K(palf_epoch), - K(flush_log_cb_ctx_)); - // NB: the memory of 'this' has released after 'inner_after_flush_log', don't use any - // fields about 'this'. } else if (OB_FAIL(guard.get_palf_handle_impl()->inner_after_flush_log(flush_log_cb_ctx_))) { PALF_LOG(WARN, "PalfHandleImpl after_flush_log failed", K(ret)); } else { @@ -270,20 +266,11 @@ void LogIOTruncateLogTask::destroy() } } -int LogIOTruncateLogTask::do_task_(int tg_id, IPalfEnvImpl *palf_env_impl) +int LogIOTruncateLogTask::do_task_(int tg_id, IPalfHandleImplGuard &guard) { int ret = OB_SUCCESS; - int64_t palf_epoch = -1; - IPalfHandleImplGuard guard; if (IS_NOT_INIT) { ret = OB_NOT_INIT; - } else if (OB_FAIL(palf_env_impl->get_palf_handle_impl(palf_id_, guard))) { - PALF_LOG(WARN, "IPalfEnvImpl get_palf_handle_impl failed", K(ret), K(palf_id_)); - } else if (OB_FAIL(guard.get_palf_handle_impl()->get_palf_epoch(palf_epoch))) { - PALF_LOG(WARN, "IPalfEnvImpl get_palf_epoch failed", K(ret), K(palf_id_)); - } else if (palf_epoch != palf_epoch_) { - PALF_LOG(WARN, "palf_epoch has changed, drop task", K(ret), K(palf_id_), K(palf_epoch), - K(truncate_log_cb_ctx_)); } else if (OB_FAIL(guard.get_palf_handle_impl()->inner_truncate_log(truncate_log_cb_ctx_.lsn_))) { PALF_LOG(WARN, "PalfHandleImpl inner_truncate_log failed", K(ret), K(palf_id_), K_(truncate_log_cb_ctx)); @@ -294,23 +281,13 @@ int LogIOTruncateLogTask::do_task_(int tg_id, IPalfEnvImpl *palf_env_impl) return ret; } -int LogIOTruncateLogTask::after_consume_(IPalfEnvImpl *palf_env_impl) +int LogIOTruncateLogTask::after_consume_(IPalfHandleImplGuard &guard) { int ret = OB_SUCCESS; int64_t palf_epoch = -1; - IPalfHandleImplGuard guard; if (IS_NOT_INIT) { ret = OB_NOT_INIT; PALF_LOG(ERROR, "LogITruncateLogTask not inited!!!", K(ret)); - } else if (OB_FAIL(palf_env_impl->get_palf_handle_impl(palf_id_, guard))) { - PALF_LOG(WARN, "IPalfEnvImpl get_palf_handle_impl failed", K(ret), K(palf_id_)); - // NB: the memory of 'this' has released after 'inner_after_truncate_log', don't use any - // fields about 'this'. - } else if (OB_FAIL(guard.get_palf_handle_impl()->get_palf_epoch(palf_epoch))) { - PALF_LOG(WARN, "IPalfEnvImpl get_palf_epoch failed", K(ret), K(palf_id_)); - } else if (palf_epoch != palf_epoch_) { - PALF_LOG(WARN, "palf_epoch has changed, drop task", K(ret), K(palf_id_), K(palf_epoch), - K(truncate_log_cb_ctx_)); } else if (OB_FAIL( guard.get_palf_handle_impl()->inner_after_truncate_log(truncate_log_cb_ctx_))) { PALF_LOG(WARN, "PalfHandleImpl inner_after_truncate_log failed", K(ret)); @@ -373,21 +350,12 @@ void LogIOFlushMetaTask::destroy() } } -int LogIOFlushMetaTask::do_task_(int tg_id, IPalfEnvImpl *palf_env_impl) +int LogIOFlushMetaTask::do_task_(int tg_id, IPalfHandleImplGuard &guard) { int ret = OB_SUCCESS; - int64_t palf_epoch = -1; - IPalfHandleImplGuard guard; if (IS_NOT_INIT) { ret = OB_NOT_INIT; PALF_LOG(ERROR, "LogIOFlushMetaTask not inited!!!", K(ret)); - } else if (OB_FAIL(palf_env_impl->get_palf_handle_impl(palf_id_, guard))) { - PALF_LOG(WARN, "IPalfEnvImpl get_palf_handle_impl failed", K(ret), K(palf_id_)); - } else if (OB_FAIL(guard.get_palf_handle_impl()->get_palf_epoch(palf_epoch))) { - PALF_LOG(WARN, "IPalfEnvImpl get_palf_epoch failed", K(ret), K(palf_id_)); - } else if (palf_epoch != palf_epoch_) { - PALF_LOG(WARN, "palf_epoch has changed, drop task", K(ret), K(palf_id_), K(palf_epoch), - K(flush_meta_cb_ctx_)); } else if (OB_FAIL(guard.get_palf_handle_impl()->inner_append_meta(buf_, buf_len_))) { PALF_LOG(ERROR, "PalfHandleImpl inner_append_meta failed", K(ret), K(palf_id_)); } else if (OB_FAIL(push_task_into_cb_thread_pool_(tg_id, this))) { @@ -397,23 +365,12 @@ int LogIOFlushMetaTask::do_task_(int tg_id, IPalfEnvImpl *palf_env_impl) return ret; } -int LogIOFlushMetaTask::after_consume_(IPalfEnvImpl *palf_env_impl) +int LogIOFlushMetaTask::after_consume_(IPalfHandleImplGuard &guard) { int ret = OB_SUCCESS; - int64_t palf_epoch = -1; - IPalfHandleImplGuard guard; if (IS_NOT_INIT) { ret = OB_NOT_INIT; PALF_LOG(ERROR, "LogIOFlushMetaTask not inited!!!", K(ret)); - } else if (OB_FAIL(palf_env_impl->get_palf_handle_impl(palf_id_, guard))) { - PALF_LOG(WARN, "IPalfEnvImpl get_palf_handle_impl failed", K(ret), K(palf_id_)); - } else if (OB_FAIL(guard.get_palf_handle_impl()->get_palf_epoch(palf_epoch))) { - PALF_LOG(WARN, "IPalfEnvImpl get_palf_epoch failed", K(ret), K(palf_id_)); - } else if (palf_epoch != palf_epoch_) { - PALF_LOG(WARN, "palf_epoch has changed, drop task", K(ret), K(palf_id_), K(palf_epoch), - K(flush_meta_cb_ctx_)); - // NB: the memory of 'this' has released after 'inner_after_flush_meta', don't use any - // fields about 'this'. } else if (OB_FAIL(guard.get_palf_handle_impl()->inner_after_flush_meta(flush_meta_cb_ctx_))) { PALF_LOG(WARN, "PalfHandleImpl after_flush_meta failed", K(ret), KP(this)); } else { @@ -460,21 +417,12 @@ void LogIOTruncatePrefixBlocksTask::destroy() } } -int LogIOTruncatePrefixBlocksTask::do_task_(int tg_id, IPalfEnvImpl *palf_env_impl) +int LogIOTruncatePrefixBlocksTask::do_task_(int tg_id, IPalfHandleImplGuard &guard) { int ret = OB_SUCCESS; - int64_t palf_epoch = -1; - IPalfHandleImplGuard guard; if (IS_NOT_INIT) { ret = OB_NOT_INIT; PALF_LOG(ERROR, "LogIOTruncatePrefixBlocksTask not inited!!!", K(ret)); - } else if (OB_FAIL(palf_env_impl->get_palf_handle_impl(palf_id_, guard))) { - PALF_LOG(WARN, "IPalfEnvImpl get_palf_handle_impl failed", K(ret), K(palf_id_)); - } else if (OB_FAIL(guard.get_palf_handle_impl()->get_palf_epoch(palf_epoch))) { - PALF_LOG(WARN, "IPalfEnvImpl get_palf_epoch failed", K(ret), K(palf_id_)); - } else if (palf_epoch != palf_epoch_) { - PALF_LOG(WARN, "palf_epoch has changed, drop task", K(ret), K(palf_id_), K(palf_epoch), - K(truncate_prefix_blocks_ctx_)); } else if (OB_FAIL(guard.get_palf_handle_impl()->inner_truncate_prefix_blocks( truncate_prefix_blocks_ctx_.lsn_))) { PALF_LOG(ERROR, "PalfHandleImpl inner_truncate_prefix_blocks failed", K(ret), K(palf_id_)); @@ -485,23 +433,12 @@ int LogIOTruncatePrefixBlocksTask::do_task_(int tg_id, IPalfEnvImpl *palf_env_im return ret; } -int LogIOTruncatePrefixBlocksTask::after_consume_(IPalfEnvImpl *palf_env_impl) +int LogIOTruncatePrefixBlocksTask::after_consume_(IPalfHandleImplGuard &guard) { int ret = OB_SUCCESS; - int64_t palf_epoch = -1; - IPalfHandleImplGuard guard; if (IS_NOT_INIT) { ret = OB_NOT_INIT; PALF_LOG(ERROR, "LogIOFlushMetaTask not inited!!!", K(ret)); - } else if (OB_FAIL(palf_env_impl->get_palf_handle_impl(palf_id_, guard))) { - PALF_LOG(WARN, "IPalfEnvImpl get_palf_handle_impl failed", K(ret), K(palf_id_)); - } else if (OB_FAIL(guard.get_palf_handle_impl()->get_palf_epoch(palf_epoch))) { - PALF_LOG(WARN, "IPalfEnvImpl get_palf_epoch failed", K(ret), K(palf_id_)); - } else if (palf_epoch != palf_epoch_) { - PALF_LOG(WARN, "palf_epoch has changed, drop task", K(ret), K(palf_id_), K(palf_epoch), - K(truncate_prefix_blocks_ctx_)); - // NB: the memory of 'this' has released after 'inner_after_truncate_prefix_blocks', don't use - // any fields about 'this'. } else if (OB_FAIL(guard.get_palf_handle_impl()->inner_after_truncate_prefix_blocks( truncate_prefix_blocks_ctx_))) { PALF_LOG(WARN, "PalfHandleImpl inner_after_truncate_prefix_blocks failed", K(ret)); @@ -599,7 +536,6 @@ int BatchLogIOFlushLogTask::push_back(LogIOFlushLogTask *task) int BatchLogIOFlushLogTask::do_task(int tg_id, IPalfEnvImpl *palf_env_impl) { int ret = OB_SUCCESS; - // 释放内存 if (IS_NOT_INIT) { ret = OB_NOT_INIT; PALF_LOG(ERROR, "LogIOFlushMetaTask not inited!!!", K(ret), KPC(this)); @@ -666,6 +602,8 @@ int BatchLogIOFlushLogTask::do_task_(int tg_id, IPalfEnvImpl *palf_env_impl) } else if (palf_epoch != io_task->get_palf_epoch()) { PALF_LOG(WARN, "palf_epoch has changed, drop task", K(ret), K(palf_id_), K(palf_epoch), KPC(io_task), KPC(io_task)); + io_task->free_this(palf_env_impl); + io_task_array_[i] = NULL; } else if (OB_FAIL(log_write_buf_array_.push_back(&io_task->write_buf_))) { PALF_LOG(ERROR, "log_write_buf_array_ push_back failed, unexpected error!!!", K(ret), KPC(this)); @@ -702,7 +640,8 @@ void BatchLogIOFlushLogTask::clear_memory_(IPalfEnvImpl *palf_env_impl) for (int64_t i = 0; i < count; i++) { LogIOFlushLogTask *task = io_task_array_[i]; if (NULL != task) { - palf_env_impl->get_log_allocator()->free_log_io_flush_log_task(task); + task->free_this(palf_env_impl); + io_task_array_[i] = NULL; } } } @@ -746,37 +685,28 @@ void LogIOFlashbackTask::destroy() } } -int LogIOFlashbackTask::do_task_(int tg_id, IPalfEnvImpl *palf_env_impl) +int LogIOFlashbackTask::do_task_(int tg_id, IPalfHandleImplGuard &guard) { - UNUSED(tg_id); int ret = OB_SUCCESS; - IPalfHandleImplGuard guard; if (IS_NOT_INIT) { ret = OB_NOT_INIT; PALF_LOG(ERROR, "LogIOFlashbackTask not inited!!!", K(ret)); - } else if (OB_FAIL(palf_env_impl->get_palf_handle_impl( palf_id_, guard))) { - PALF_LOG(WARN, "IPalfEnvImpl get_palf_handle_impl failed", K(ret), K(palf_id_)); } else if (OB_FAIL(guard.get_palf_handle_impl()->inner_flashback(flashback_ctx_.flashback_scn_))) { - PALF_LOG(ERROR, "PalfHandleImpl inner_flashback failed", K(ret), K(palf_id_)); + PALF_LOG(WARN, "inner_flashback failed", KPC(this)); } else if (OB_FAIL(push_task_into_cb_thread_pool_(tg_id, this))) { - PALF_LOG(WARN, "LogIOFlashbackTask after_consume", K(ret), KPC(palf_env_impl)); + PALF_LOG(WARN, "push_flush_cb_to_thread_pool_ failed", K(ret)); } else { PALF_LOG(INFO, "LogIOFlashbackTask do_task success", K(ret), K(palf_id_)); } return ret; } -int LogIOFlashbackTask::after_consume_(IPalfEnvImpl *palf_env_impl) +int LogIOFlashbackTask::after_consume_(IPalfHandleImplGuard &guard) { int ret = OB_SUCCESS; - IPalfHandleImplGuard guard; if (IS_NOT_INIT) { ret = OB_NOT_INIT; PALF_LOG(ERROR, "LogIOFlushMetaTask not inited!!!", K(ret)); - } else if (OB_FAIL(palf_env_impl->get_palf_handle_impl(palf_id_, guard))) { - PALF_LOG(WARN, "IPalfEnvImpl get_palf_handle_impl failed", K(ret), K(palf_id_)); - // NB: the memory of 'this' has released after 'inner_after_flush_meta', don't use any - // fields about 'this'. } else if (OB_FAIL(guard.get_palf_handle_impl()->inner_after_flashback(flashback_ctx_))) { PALF_LOG(ERROR, "PalfHandleImpl inner_after_flashback failed", K(ret)); } else { @@ -829,21 +759,17 @@ int LogIOPurgeThrottlingTask::init(const PurgeThrottlingCbCtx & purge_ctx) return ret; } -int LogIOPurgeThrottlingTask::do_task_(int tg_id, IPalfEnvImpl *palf_env_impl) +int LogIOPurgeThrottlingTask::do_task_(int tg_id, IPalfHandleImplGuard &guard) { + UNUSED(guard); int ret = OB_SUCCESS; - PALF_LOG(INFO, "process PurgeThrottlingTask", KPC(this)); - if (OB_ISNULL(palf_env_impl->get_log_allocator())) { - ret = OB_ERR_UNEXPECTED; - PALF_LOG(ERROR, "log_allocator is NULL", KPC(this)); - } else { - palf_env_impl->get_log_allocator()->free_log_io_purge_throttling_task(this); - } + ret = push_task_into_cb_thread_pool_(tg_id, this); return ret; } -int LogIOPurgeThrottlingTask::after_consume_(IPalfEnvImpl *palf_env_impl) +int LogIOPurgeThrottlingTask::after_consume_(IPalfHandleImplGuard &guard) { + UNUSED(guard); return OB_SUCCESS; } @@ -852,7 +778,7 @@ void LogIOPurgeThrottlingTask::free_this_(IPalfEnvImpl *palf_env_impl) if (OB_ISNULL(palf_env_impl) || OB_ISNULL(palf_env_impl->get_log_allocator())) { LOG_ERROR_RET(OB_ERR_UNEXPECTED, "palf_env_impl or log_allocator is NULL", KPC(this), KP(palf_env_impl)) } else { - palf_env_impl->get_log_allocator()->free_log_io_purge_throttling_task(this); + palf_env_impl->get_log_allocator()->free_log_io_purge_throttling_task(this); } } diff --git a/src/logservice/palf/log_io_task.h b/src/logservice/palf/log_io_task.h index 833855547e..2751c964b5 100644 --- a/src/logservice/palf/log_io_task.h +++ b/src/logservice/palf/log_io_task.h @@ -36,6 +36,7 @@ enum class LogIOTaskType PURGE_THROTTLING_TYPE = 6, }; +class IPalfHandleImplGuard; class LogIOTask; int push_task_into_cb_thread_pool(const int64_t tg_id, LogIOTask *io_task); @@ -66,8 +67,8 @@ public: K(submit_seq_)); protected: - virtual int do_task_(int tg_id, IPalfEnvImpl *palf_env_impl) = 0; - virtual int after_consume_(IPalfEnvImpl *palf_env_impl) = 0; + virtual int do_task_(int tg_id, IPalfHandleImplGuard &guard) = 0; + virtual int after_consume_(IPalfHandleImplGuard &guard) = 0; virtual LogIOTaskType get_io_task_type_() const = 0; virtual void free_this_(IPalfEnvImpl *palf_env_impl) = 0; virtual int64_t get_io_size_() const = 0; @@ -98,9 +99,9 @@ public: INHERIT_TO_STRING_KV("LogIOTask", LogIOTask, K_(write_buf), K_(flush_log_cb_ctx), K(is_inited_)); private: // IO thread will call this function to flush log - int do_task_(int tg_id, IPalfEnvImpl *palf_env_impl) override final; + int do_task_(int tg_id, IPalfHandleImplGuard &guard) override final; // IO thread will call this function to submit async task - int after_consume_(IPalfEnvImpl *palf_env_impl) override final; + int after_consume_(IPalfHandleImplGuard &guard) override final; LogIOTaskType get_io_task_type_() const override final { return LogIOTaskType::FLUSH_LOG_TYPE; } void free_this_(IPalfEnvImpl *palf_env_impl) override final; int64_t get_io_size_() const override final; @@ -122,8 +123,8 @@ public: INHERIT_TO_STRING_KV("LogIOTask", LogIOTask, K_(truncate_log_cb_ctx)); private: - int do_task_(int tg_id, IPalfEnvImpl *palf_env_impl) override final; - int after_consume_(IPalfEnvImpl *palf_env_impl) override final; + int do_task_(int tg_id, IPalfHandleImplGuard &guard) override final; + int after_consume_(IPalfHandleImplGuard &guard) override final; LogIOTaskType get_io_task_type_() const override final { return LogIOTaskType::TRUNCATE_LOG_TYPE; } void free_this_(IPalfEnvImpl *palf_env_impl) override final; int64_t get_io_size_() const override final {return 0;} @@ -146,8 +147,8 @@ public: INHERIT_TO_STRING_KV("LogIOTask", LogIOTask, K_(flush_meta_cb_ctx)); private: - int do_task_(int tg_id, IPalfEnvImpl *palf_env_impl) override final; - int after_consume_(IPalfEnvImpl *palf_env_impl) override final; + int do_task_(int tg_id, IPalfHandleImplGuard &guard) override final; + int after_consume_(IPalfHandleImplGuard &guard) override final; LogIOTaskType get_io_task_type_() const override final { return LogIOTaskType::FLUSH_META_TYPE; } void free_this_(IPalfEnvImpl *palf_env_impl) override final; int64_t get_io_size_() const override final {return buf_len_;} @@ -170,8 +171,8 @@ public: INHERIT_TO_STRING_KV("LogIOTask", LogIOTask, K_(truncate_prefix_blocks_ctx)); private: - int do_task_(int tg_id, IPalfEnvImpl *palf_env_impl) override final; - int after_consume_(IPalfEnvImpl *palf_env_impl) override final; + int do_task_(int tg_id, IPalfHandleImplGuard &guard) override final; + int after_consume_(IPalfHandleImplGuard &guard) override final; LogIOTaskType get_io_task_type_() const override final { return LogIOTaskType::TRUNCATE_PREFIX_TYPE; } void free_this_(IPalfEnvImpl *palf_env_impl) override final; int64_t get_io_size_() const override final {return 0;} @@ -225,8 +226,8 @@ public: void destroy(); TO_STRING_KV(K_(palf_id), K_(flashback_ctx), K_(is_inited)); private: - int do_task_(int tg_id, IPalfEnvImpl *palf_env_impl) override final; - int after_consume_(IPalfEnvImpl *palf_env_impl) override final; + int do_task_(int tg_id, IPalfHandleImplGuard &guard) override final; + int after_consume_(IPalfHandleImplGuard &guard) override final; LogIOTaskType get_io_task_type_() const override final { return LogIOTaskType::FLASHBACK_LOG_TYPE; } void free_this_(IPalfEnvImpl *palf_env_impl) override final; int64_t get_io_size_() const override final {return 0;} @@ -246,8 +247,8 @@ public: void destroy(); INHERIT_TO_STRING_KV("LogIOTask", LogIOTask, K_(purge_ctx), K_(is_inited)); private: - int do_task_(int tg_id, IPalfEnvImpl *palf_env_impl) override final; - int after_consume_(IPalfEnvImpl *palf_env_impl) override final; + int do_task_(int tg_id, IPalfHandleImplGuard &guard) override final; + int after_consume_(IPalfHandleImplGuard &guard) override final; LogIOTaskType get_io_task_type_() const override final { return LogIOTaskType::PURGE_THROTTLING_TYPE; } void free_this_(IPalfEnvImpl *palf_env_impl) override final; int64_t get_io_size_() const override final {return 0;} diff --git a/src/logservice/palf/log_io_worker.cpp b/src/logservice/palf/log_io_worker.cpp index 176a274614..d1489f9a20 100644 --- a/src/logservice/palf/log_io_worker.cpp +++ b/src/logservice/palf/log_io_worker.cpp @@ -324,7 +324,6 @@ int LogIOWorker::reduce_io_task_(void *task) } if (false == last_io_task_has_been_reduced && OB_NOT_NULL(io_task)) { - io_task = reinterpret_cast(io_task); ret = handle_io_task_(io_task); } PALF_LOG(TRACE, "reduce_io_task_ finished", K(ret), K(tmp_ret), KPC(this));