fixed memory leak when palf epoch changed.

This commit is contained in:
HaHaJeff
2024-02-07 15:57:42 +00:00
committed by ob-robot
parent e783acb1c3
commit 315fe90c5e
6 changed files with 245 additions and 188 deletions

View File

@ -258,15 +258,15 @@ private:
class IOTaskCond : public LogIOTask { class IOTaskCond : public LogIOTask {
public: public:
IOTaskCond(const int64_t palf_id, const int64_t palf_epoch) : LogIOTask(palf_id, palf_epoch) {} IOTaskCond(const int64_t palf_id, const int64_t palf_epoch) : LogIOTask(palf_id, palf_epoch), count_(0) {}
virtual int do_task_(int tg_id, IPalfEnvImpl *palf_env_impl) override final virtual int do_task_(int tg_id, IPalfHandleImplGuard &guard) override final
{ {
PALF_LOG(INFO, "before cond_wait"); PALF_LOG(INFO, "before cond_wait");
cond_.wait(); cond_.wait();
PALF_LOG(INFO, "after cond_wait"); PALF_LOG(INFO, "after cond_wait");
return OB_SUCCESS; return OB_SUCCESS;
}; };
virtual int after_consume_(IPalfEnvImpl *palf_env_impl) override final virtual int after_consume_(IPalfHandleImplGuard &guard) override final
{ {
return OB_SUCCESS; return OB_SUCCESS;
} }
@ -280,37 +280,60 @@ public:
virtual int64_t get_io_size_() const {return 0;} virtual int64_t get_io_size_() const {return 0;}
bool need_purge_throttling_() const {return true;} bool need_purge_throttling_() const {return true;}
ObCond cond_; ObCond cond_;
int64_t count_;
};
class IOTaskConsumeCond : public LogIOTask {
public:
IOTaskConsumeCond(const int64_t palf_id, const int64_t palf_epoch) : LogIOTask(palf_id, palf_epoch) {}
virtual int do_task_(int tg_id, IPalfHandleImplGuard &guard) override final
{
int ret = OB_SUCCESS;
PALF_LOG(INFO, "do_task_ success");
if (OB_FAIL(push_task_into_cb_thread_pool_(tg_id, this))) {
PALF_LOG(WARN, "push_task_into_cb_thread_pool failed", K(ret), K(tg_id), KP(this));
}
return ret;
};
virtual int after_consume_(IPalfHandleImplGuard &guard) override final
{
PALF_LOG(INFO, "before cond_wait");
cond_.wait();
PALF_LOG(INFO, "after cond_wait");
return OB_SUCCESS;
}
virtual LogIOTaskType get_io_task_type_() const { return LogIOTaskType::FLUSH_META_TYPE; }
int init(int64_t palf_id)
{
palf_id_ = palf_id;
return OB_SUCCESS;
};
virtual void free_this_(IPalfEnvImpl *impl) {UNUSED(impl);}
virtual int64_t get_io_size_() const {return 0;}
bool need_purge_throttling_() const {return true;}
ObCond cond_;
}; };
class IOTaskConsumeCond : public LogIOTask { class IOTaskVerify : public LogIOTask {
public: public:
IOTaskConsumeCond(const int64_t palf_id, const int64_t palf_epoch) : LogIOTask(palf_id, palf_epoch) {} IOTaskVerify(const int64_t palf_id, const int64_t palf_epoch) : LogIOTask(palf_id, palf_epoch), count_(0), after_consume_count_(0) {}
virtual int do_task_(int tg_id, IPalfEnvImpl *palf_env_impl) override final virtual int do_task_(int tg_id, IPalfHandleImplGuard &guard)
{ {
int ret = OB_SUCCESS; count_ ++;
PALF_LOG(INFO, "do_task_ success");
if (OB_FAIL(push_task_into_cb_thread_pool_(tg_id, this))) {
PALF_LOG(WARN, "push_task_into_cb_thread_pool failed", K(ret), K(tg_id), KP(this));
}
return ret;
};
virtual int after_consume_(IPalfEnvImpl *palf_env_impl) override final
{
PALF_LOG(INFO, "before cond_wait");
cond_.wait();
PALF_LOG(INFO, "after cond_wait");
return OB_SUCCESS; return OB_SUCCESS;
} };
virtual int after_consume_(IPalfHandleImplGuard &guard) { return OB_SUCCESS; }
virtual LogIOTaskType get_io_task_type_() const { return LogIOTaskType::FLUSH_META_TYPE; } virtual LogIOTaskType get_io_task_type_() const { return LogIOTaskType::FLUSH_META_TYPE; }
virtual void free_this_(IPalfEnvImpl *impl) {UNUSED(impl);}
int64_t get_io_size_() const {return 0;}
bool need_purge_throttling_() const {return true;}
int init(int64_t palf_id) int init(int64_t palf_id)
{ {
palf_id_ = palf_id; palf_id_ = palf_id;
return OB_SUCCESS; return OB_SUCCESS;
}; };
virtual void free_this_(IPalfEnvImpl *impl) {UNUSED(impl);} int64_t count_;
virtual int64_t get_io_size_() const {return 0;} int64_t after_consume_count_;
bool need_purge_throttling_() const {return true;}
ObCond cond_;
}; };
} // end namespace unittest } // end namespace unittest
} // end namespace oceanbase } // end namespace oceanbase

View File

@ -432,28 +432,6 @@ TEST_F(TestObSimpleLogClusterLogEngine, exception_path)
} }
class IOTaskVerify : public LogIOTask {
public:
IOTaskVerify(const int64_t palf_id, const int64_t palf_epoch) : LogIOTask(palf_id, palf_epoch), count_(0), after_consume_count_(0) {}
virtual int do_task_(int tg_id, IPalfEnvImpl *palf_env_impl)
{
count_ ++;
return OB_SUCCESS;
};
virtual int after_consume_(IPalfEnvImpl *palf_env_impl) { return OB_SUCCESS; }
virtual LogIOTaskType get_io_task_type_() const { return LogIOTaskType::FLUSH_META_TYPE; }
virtual void free_this_(IPalfEnvImpl *impl) {UNUSED(impl);}
int64_t get_io_size_() const {return 0;}
bool need_purge_throttling_() const {return true;}
int init(int64_t palf_id)
{
palf_id_ = palf_id;
return OB_SUCCESS;
};
int64_t count_;
int64_t after_consume_count_;
};
TEST_F(TestObSimpleLogClusterLogEngine, io_reducer_basic_func) TEST_F(TestObSimpleLogClusterLogEngine, io_reducer_basic_func)
{ {
SET_CASE_LOG_FILE(TEST_NAME, "io_reducer_func"); SET_CASE_LOG_FILE(TEST_NAME, "io_reducer_func");
@ -517,9 +495,9 @@ TEST_F(TestObSimpleLogClusterLogEngine, io_reducer_basic_func)
int64_t prev_log_id_2 = 0; int64_t prev_log_id_2 = 0;
int64_t leader_idx_2 = 0; int64_t leader_idx_2 = 0;
PalfHandleImplGuard leader_2; PalfHandleImplGuard leader_2;
IOTaskCond io_task_cond_2(id_2, log_engine->palf_epoch_);
IOTaskVerify io_task_verify_2(id_2, log_engine->palf_epoch_);
EXPECT_EQ(OB_SUCCESS, create_paxos_group(id_2, leader_idx_2, leader_2)); EXPECT_EQ(OB_SUCCESS, create_paxos_group(id_2, leader_idx_2, leader_2));
IOTaskCond io_task_cond_2(id_2, leader_2.get_palf_handle_impl()->log_engine_.palf_epoch_);
IOTaskVerify io_task_verify_2(id_2, leader_2.get_palf_handle_impl()->log_engine_.palf_epoch_);
{ {
LogIOWorker *log_io_worker = leader_2.palf_handle_impl_->log_engine_.log_io_worker_; LogIOWorker *log_io_worker = leader_2.palf_handle_impl_->log_engine_.log_io_worker_;
// 聚合度为1的忽略 // 聚合度为1的忽略
@ -537,6 +515,7 @@ TEST_F(TestObSimpleLogClusterLogEngine, io_reducer_basic_func)
LSN max_lsn_1 = leader_1.palf_handle_impl_->sw_.get_max_lsn(); LSN max_lsn_1 = leader_1.palf_handle_impl_->sw_.get_max_lsn();
const int64_t log_id_2 = leader_2.palf_handle_impl_->sw_.get_max_log_id(); const int64_t log_id_2 = leader_2.palf_handle_impl_->sw_.get_max_log_id();
LSN max_lsn_2 = leader_2.palf_handle_impl_->sw_.get_max_lsn(); LSN max_lsn_2 = leader_2.palf_handle_impl_->sw_.get_max_lsn();
sleep(1);
io_task_cond_2.cond_.signal(); io_task_cond_2.cond_.signal();
wait_lsn_until_flushed(max_lsn_1, leader_1); wait_lsn_until_flushed(max_lsn_1, leader_1);
wait_lsn_until_flushed(max_lsn_2, leader_2); wait_lsn_until_flushed(max_lsn_2, leader_2);
@ -586,9 +565,9 @@ TEST_F(TestObSimpleLogClusterLogEngine, io_reducer_basic_func)
int64_t leader_idx_3 = 0; int64_t leader_idx_3 = 0;
int64_t prev_log_id_3 = 0; int64_t prev_log_id_3 = 0;
PalfHandleImplGuard leader_3; PalfHandleImplGuard leader_3;
IOTaskCond io_task_cond_3(id_3, log_engine->palf_epoch_);
IOTaskVerify io_task_verify_3(id_3, log_engine->palf_epoch_);
EXPECT_EQ(OB_SUCCESS, create_paxos_group(id_3, leader_idx_3, leader_3)); EXPECT_EQ(OB_SUCCESS, create_paxos_group(id_3, leader_idx_3, leader_3));
IOTaskCond io_task_cond_3(id_3, leader_3.get_palf_handle_impl()->log_engine_.palf_epoch_);
IOTaskVerify io_task_verify_3(id_3, leader_3.get_palf_handle_impl()->log_engine_.palf_epoch_);
{ {
LogIOWorker *log_io_worker = leader_3.palf_handle_impl_->log_engine_.log_io_worker_; LogIOWorker *log_io_worker = leader_3.palf_handle_impl_->log_engine_.log_io_worker_;
EXPECT_EQ(OB_SUCCESS, log_io_worker->submit_io_task(&io_task_cond_3)); EXPECT_EQ(OB_SUCCESS, log_io_worker->submit_io_task(&io_task_cond_3));
@ -634,9 +613,9 @@ TEST_F(TestObSimpleLogClusterLogEngine, io_reducer_basic_func)
int64_t leader_idx_4 = 0; int64_t leader_idx_4 = 0;
int64_t prev_log_id_4 = 0; int64_t prev_log_id_4 = 0;
PalfHandleImplGuard leader_4; PalfHandleImplGuard leader_4;
IOTaskCond io_task_cond_4(id_4, log_engine->palf_epoch_);
IOTaskVerify io_task_verify_4(id_4, log_engine->palf_epoch_);
EXPECT_EQ(OB_SUCCESS, create_paxos_group(id_4, leader_idx_4, leader_4)); EXPECT_EQ(OB_SUCCESS, create_paxos_group(id_4, leader_idx_4, leader_4));
IOTaskCond io_task_cond_4(id_4, leader_4.get_palf_handle_impl()->log_engine_.palf_epoch_);
IOTaskVerify io_task_verify_4(id_4, leader_4.get_palf_handle_impl()->log_engine_.palf_epoch_);
{ {
LogIOWorker *log_io_worker = leader_4.palf_handle_impl_->log_engine_.log_io_worker_; LogIOWorker *log_io_worker = leader_4.palf_handle_impl_->log_engine_.log_io_worker_;
EXPECT_EQ(OB_SUCCESS, log_io_worker->submit_io_task(&io_task_cond_4)); EXPECT_EQ(OB_SUCCESS, log_io_worker->submit_io_task(&io_task_cond_4));
@ -656,7 +635,6 @@ TEST_F(TestObSimpleLogClusterLogEngine, io_reducer_basic_func)
io_task_cond_4.cond_.signal(); io_task_cond_4.cond_.signal();
LSN log_tail = leader_4.palf_handle_impl_->log_engine_.log_storage_.log_tail_; LSN log_tail = leader_4.palf_handle_impl_->log_engine_.log_storage_.log_tail_;
PALF_LOG(INFO, "after signal", K(max_lsn), K(log_tail)); PALF_LOG(INFO, "after signal", K(max_lsn), K(log_tail));
wait_lsn_until_flushed(max_lsn, leader_4);
sleep(1); sleep(1);
log_tail = leader_4.palf_handle_impl_->log_engine_.log_storage_.log_tail_; log_tail = leader_4.palf_handle_impl_->log_engine_.log_storage_.log_tail_;
PALF_LOG(INFO, "after flused case 4", K(max_lsn), K(log_tail)); PALF_LOG(INFO, "after flused case 4", K(max_lsn), K(log_tail));

View File

@ -26,6 +26,7 @@
#include "logservice/palf/log_group_entry_header.h" #include "logservice/palf/log_group_entry_header.h"
#include "logservice/palf/log_io_worker.h" #include "logservice/palf/log_io_worker.h"
#include "logservice/palf/lsn.h" #include "logservice/palf/lsn.h"
#include <thread>
const std::string TEST_NAME = "single_replica"; const std::string TEST_NAME = "single_replica";
@ -1723,6 +1724,135 @@ TEST_F(TestObSimpleLogClusterSingleReplica, test_iterator_with_flashback)
} }
} }
TEST_F(TestObSimpleLogClusterSingleReplica, test_iow_memleak)
{
SET_CASE_LOG_FILE(TEST_NAME, "test_iow");
OB_LOGGER.set_log_level("INFO");
int64_t id = ATOMIC_AAF(&palf_id_, 1);
int64_t leader_idx = 0;
// case1: palf epoch has been changed during do_task
{
PalfHandleImplGuard leader;
EXPECT_EQ(OB_SUCCESS, create_paxos_group(id, leader_idx, leader));
LogIOWorker *iow = leader.get_palf_handle_impl()->log_engine_.log_io_worker_;
IPalfEnvImpl *palf_env_impl = leader.get_palf_handle_impl()->palf_env_impl_;
ObILogAllocator *allocator = palf_env_impl->get_log_allocator();
EXPECT_EQ(OB_SUCCESS, submit_log(leader, 32, leader_idx, MAX_LOG_BODY_SIZE));
EXPECT_EQ(OB_SUCCESS, wait_until_has_committed(leader, leader.palf_handle_impl_->sw_.get_max_lsn()));
LSN end_lsn = leader.get_palf_handle_impl()->get_end_lsn();
IOTaskCond cond(id, leader.palf_env_impl_->last_palf_epoch_);
EXPECT_EQ(OB_SUCCESS, iow->submit_io_task(&cond));
sleep(1);
EXPECT_EQ(OB_SUCCESS, leader.get_palf_handle_impl()->set_base_lsn(end_lsn));
EXPECT_EQ(OB_SUCCESS, submit_log(leader, 1, leader_idx, MAX_LOG_BODY_SIZE));
EXPECT_EQ(OB_SUCCESS, leader.get_palf_handle_impl()->log_engine_.submit_purge_throttling_task(PurgeThrottlingType::PURGE_BY_GET_MC_REQ));
EXPECT_EQ(OB_SUCCESS, submit_log(leader, 2, leader_idx, MAX_LOG_BODY_SIZE));
EXPECT_NE(0, allocator->flying_log_task_);
EXPECT_NE(0, allocator->flying_meta_task_);
leader.get_palf_handle_impl()->log_engine_.palf_epoch_++;
EXPECT_EQ(OB_SUCCESS, submit_log(leader, 10, leader_idx));
EXPECT_EQ(OB_SUCCESS, submit_log(leader, 1, leader_idx, MAX_LOG_BODY_SIZE));
EXPECT_EQ(OB_SUCCESS, submit_log(leader, 10, leader_idx));
EXPECT_EQ(OB_SUCCESS, submit_log(leader, 1, leader_idx, MAX_LOG_BODY_SIZE));
leader.get_palf_handle_impl()->log_engine_.palf_epoch_++;
cond.cond_.signal();
PALF_LOG(INFO, "runlin trace submit log 1");
while (iow->queue_.size() > 0) {
PALF_LOG(INFO, "queue size is not zero", "size", iow->queue_.size());
sleep(1);
}
EXPECT_EQ(0, allocator->flying_log_task_);
EXPECT_EQ(0, allocator->flying_meta_task_);
}
delete_paxos_group(id);
// case2: palf epoch has been changed during after_consume
{
PalfHandleImplGuard leader;
EXPECT_EQ(OB_SUCCESS, create_paxos_group(id, leader_idx, leader));
LogIOWorker *iow = leader.get_palf_handle_impl()->log_engine_.log_io_worker_;
IPalfEnvImpl *palf_env_impl = leader.get_palf_handle_impl()->palf_env_impl_;
ObILogAllocator *allocator = palf_env_impl->get_log_allocator();
EXPECT_EQ(OB_SUCCESS, submit_log(leader, 32, leader_idx, MAX_LOG_BODY_SIZE));
EXPECT_EQ(OB_SUCCESS, wait_until_has_committed(leader, leader.palf_handle_impl_->sw_.get_max_lsn()));
LSN end_lsn = leader.get_palf_handle_impl()->get_end_lsn();
IOTaskConsumeCond consume_cond(id, leader.palf_env_impl_->last_palf_epoch_);
EXPECT_EQ(OB_SUCCESS, iow->submit_io_task(&consume_cond));
sleep(1);
EXPECT_EQ(OB_SUCCESS, leader.get_palf_handle_impl()->set_base_lsn(end_lsn));
EXPECT_EQ(OB_SUCCESS, submit_log(leader, 1, leader_idx, MAX_LOG_BODY_SIZE));
EXPECT_EQ(OB_SUCCESS, leader.get_palf_handle_impl()->log_engine_.submit_purge_throttling_task(PurgeThrottlingType::PURGE_BY_GET_MC_REQ));
EXPECT_EQ(OB_SUCCESS, submit_log(leader, 2, leader_idx, MAX_LOG_BODY_SIZE));
EXPECT_NE(0, allocator->flying_log_task_);
EXPECT_NE(0, allocator->flying_meta_task_);
leader.get_palf_handle_impl()->log_engine_.palf_epoch_++;
EXPECT_EQ(OB_SUCCESS, submit_log(leader, 10, leader_idx));
EXPECT_EQ(OB_SUCCESS, submit_log(leader, 1, leader_idx, MAX_LOG_BODY_SIZE));
EXPECT_EQ(OB_SUCCESS, submit_log(leader, 10, leader_idx));
EXPECT_EQ(OB_SUCCESS, submit_log(leader, 1, leader_idx, MAX_LOG_BODY_SIZE));
leader.get_palf_handle_impl()->log_engine_.palf_epoch_++;
consume_cond.cond_.signal();
PALF_LOG(INFO, "runlin trace submit log 2");
IOTaskVerify verify(id, leader.get_palf_handle_impl()->log_engine_.palf_epoch_);
EXPECT_EQ(OB_SUCCESS, iow->submit_io_task(&verify));
while (verify.count_ == 0) {
PALF_LOG(INFO, "queue size is not zero", "size", iow->queue_.size());
sleep(1);
}
EXPECT_EQ(0, allocator->flying_log_task_);
EXPECT_EQ(0, allocator->flying_meta_task_);
}
delete_paxos_group(id);
// case3: palf epoch has been changed during do_task when there is no io reduce
{
PalfHandleImplGuard leader;
EXPECT_EQ(OB_SUCCESS, create_paxos_group(id, leader_idx, leader));
LogIOWorker *iow = leader.get_palf_handle_impl()->log_engine_.log_io_worker_;
IPalfEnvImpl *palf_env_impl = leader.get_palf_handle_impl()->palf_env_impl_;
bool need_stop = false;
std::thread throttling_th([palf_env_impl, &need_stop](){
PalfEnvImpl *impl = dynamic_cast<PalfEnvImpl*>(palf_env_impl);
while (!need_stop) {
impl->log_io_worker_wrapper_.notify_need_writing_throttling(true);
usleep(1000);
}
});
ObILogAllocator *allocator = palf_env_impl->get_log_allocator();
EXPECT_EQ(OB_SUCCESS, submit_log(leader, 32, leader_idx, MAX_LOG_BODY_SIZE));
EXPECT_EQ(OB_SUCCESS, wait_until_has_committed(leader, leader.palf_handle_impl_->sw_.get_max_lsn()));
LSN end_lsn = leader.get_palf_handle_impl()->get_end_lsn();
// case2: palf epoch has been changed during after_consume
IOTaskConsumeCond consume_cond(id, leader.palf_env_impl_->last_palf_epoch_);
EXPECT_EQ(OB_SUCCESS, iow->submit_io_task(&consume_cond));
sleep(1);
EXPECT_EQ(OB_SUCCESS, leader.get_palf_handle_impl()->set_base_lsn(end_lsn));
EXPECT_EQ(OB_SUCCESS, submit_log(leader, 1, leader_idx, MAX_LOG_BODY_SIZE));
EXPECT_EQ(OB_SUCCESS, leader.get_palf_handle_impl()->log_engine_.submit_purge_throttling_task(PurgeThrottlingType::PURGE_BY_GET_MC_REQ));
EXPECT_EQ(OB_SUCCESS, submit_log(leader, 2, leader_idx, MAX_LOG_BODY_SIZE));
EXPECT_NE(0, allocator->flying_log_task_);
EXPECT_NE(0, allocator->flying_meta_task_);
leader.get_palf_handle_impl()->log_engine_.palf_epoch_++;
EXPECT_EQ(OB_SUCCESS, submit_log(leader, 10, leader_idx));
EXPECT_EQ(OB_SUCCESS, submit_log(leader, 1, leader_idx, MAX_LOG_BODY_SIZE));
EXPECT_EQ(OB_SUCCESS, submit_log(leader, 10, leader_idx));
EXPECT_EQ(OB_SUCCESS, submit_log(leader, 1, leader_idx, MAX_LOG_BODY_SIZE));
leader.get_palf_handle_impl()->log_engine_.palf_epoch_++;
consume_cond.cond_.signal();
PALF_LOG(INFO, "runlin trace submit log 3");
IOTaskVerify verify(id, leader.get_palf_handle_impl()->log_engine_.palf_epoch_);
EXPECT_EQ(OB_SUCCESS, iow->submit_io_task(&verify));
while (verify.count_ == 0) {
PALF_LOG(INFO, "queue size is not zero", "size", iow->queue_.size());
sleep(1);
}
EXPECT_EQ(0, allocator->flying_log_task_);
EXPECT_EQ(0, allocator->flying_meta_task_);
need_stop = true;
throttling_th.join();
}
}
} // namespace unittest } // namespace unittest
} // namespace oceanbase } // namespace oceanbase

View File

@ -73,36 +73,53 @@ void LogIOTask::reset()
submit_seq_ = 0; submit_seq_ = 0;
} }
// NB: if do_task failed, the caller is responsible for freeing LogIOTask. // NB: if do_task failed, the caller(LogIOWorker) is responsible for freeing LogIOTask.
int LogIOTask::do_task(int tg_id, IPalfEnvImpl *palf_env_impl) int LogIOTask::do_task(int tg_id, IPalfEnvImpl *palf_env_impl)
{ {
int ret = OB_SUCCESS; int ret = OB_SUCCESS;
int64_t do_task_ts = ObTimeUtility::current_time(); int64_t do_task_ts = ObTimeUtility::current_time();
const int64_t delay_ts = do_task_ts - init_task_ts_; const int64_t delay_ts = do_task_ts - init_task_ts_;
constexpr int64_t MAX_DELAY_TIME = 100 * 1000; constexpr int64_t MAX_DELAY_TIME = 100 * 1000;
IPalfHandleImplGuard guard;
int64_t palf_epoch = -1;
if (delay_ts >= MAX_DELAY_TIME) { if (delay_ts >= MAX_DELAY_TIME) {
PALF_LOG(INFO, "[io delay]", K(do_task_ts), K(delay_ts)); PALF_LOG(INFO, "[io delay]", K(do_task_ts), K(delay_ts));
} }
if (OB_FAIL(palf_env_impl->get_palf_handle_impl(palf_id_, guard))) {
if (OB_FAIL(do_task_(tg_id, palf_env_impl))) { PALF_LOG(WARN, "get_palf_handle_impl failed", KPC(this));
} else if (OB_FAIL(guard.get_palf_handle_impl()->get_palf_epoch(palf_epoch))) {
PALF_LOG(WARN, "get_palf_epoch failed", KPC(this));
} else if (palf_epoch != palf_epoch_) {
ret = OB_STATE_NOT_MATCH;
PALF_LOG(WARN, "palf_epoch has been changed, drop task", KPC(this), K(palf_epoch));
} else if (OB_FAIL(do_task_(tg_id, guard))) {
PALF_LOG(WARN, "do_task_ failed", K(ret), K(tg_id), KPC(palf_env_impl)); PALF_LOG(WARN, "do_task_ failed", K(ret), K(tg_id), KPC(palf_env_impl));
} } else {}
return ret; return ret;
} }
// NB: after after_consume, the caller needs free LogIOTask. // NB: after after_consume, the caller(LogIOCb) needs free LogIOTask.
int LogIOTask::after_consume(IPalfEnvImpl *palf_env_impl) int LogIOTask::after_consume(IPalfEnvImpl *palf_env_impl)
{ {
int ret = OB_SUCCESS; int ret = OB_SUCCESS;
int64_t after_consume_ts = ObTimeUtility::current_time(); int64_t after_consume_ts = ObTimeUtility::current_time();
const int64_t delay_ts = after_consume_ts - push_cb_into_cb_pool_ts_; const int64_t delay_ts = after_consume_ts - push_cb_into_cb_pool_ts_;
if (OB_FAIL(after_consume_(palf_env_impl))) { int64_t palf_epoch = -1;
PALF_LOG(WARN, "after_consume_ failed", K(ret), KPC(palf_env_impl)); IPalfHandleImplGuard guard;
}
constexpr int64_t MAX_DELAY_TIME = 100 * 1000; constexpr int64_t MAX_DELAY_TIME = 100 * 1000;
if (delay_ts >= MAX_DELAY_TIME) { if (delay_ts >= MAX_DELAY_TIME) {
PALF_LOG(INFO, "[io delay]", K(after_consume_ts), K(delay_ts)); PALF_LOG(INFO, "[io delay]", K(after_consume_ts), K(delay_ts));
} }
if (OB_FAIL(palf_env_impl->get_palf_handle_impl(palf_id_, guard))) {
PALF_LOG(WARN, "IPalfEnvImpl get_palf_handle_impl failed", K(ret), K(palf_id_));
} else if (OB_FAIL(guard.get_palf_handle_impl()->get_palf_epoch(palf_epoch))) {
PALF_LOG(WARN, "IPalfEnvImpl get_palf_epoch failed", K(ret), K(palf_id_));
} else if (palf_epoch != palf_epoch_) {
ret = OB_STATE_NOT_MATCH;
PALF_LOG(WARN, "palf_epoch has changed, drop task", K(ret), KPC(this));
} else if (OB_FAIL(after_consume_(guard))) {
PALF_LOG(WARN, "after_consume_ failed", K(ret), KPC(palf_env_impl));
} else {}
return ret; return ret;
} }
@ -174,22 +191,13 @@ void LogIOFlushLogTask::destroy()
} }
} }
int LogIOFlushLogTask::do_task_(int tg_id, IPalfEnvImpl *palf_env_impl) int LogIOFlushLogTask::do_task_(int tg_id, IPalfHandleImplGuard &guard)
{ {
int ret = OB_SUCCESS; int ret = OB_SUCCESS;
int64_t palf_epoch = -1;
IPalfHandleImplGuard guard;
const LSN flush_log_end_lsn = flush_log_cb_ctx_.lsn_ + flush_log_cb_ctx_.total_len_; const LSN flush_log_end_lsn = flush_log_cb_ctx_.lsn_ + flush_log_cb_ctx_.total_len_;
if (IS_NOT_INIT) { if (IS_NOT_INIT) {
ret = OB_NOT_INIT; ret = OB_NOT_INIT;
PALF_LOG(ERROR, "LogIOFlusLoghTask not inited", K(ret)); PALF_LOG(ERROR, "LogIOFlusLoghTask not inited", K(ret));
} else if (OB_FAIL(palf_env_impl->get_palf_handle_impl(palf_id_, guard))) {
PALF_LOG(WARN, "IPalfEnvImpl get_palf_handle_impl failed", K(ret), K(palf_id_));
} else if (OB_FAIL(guard.get_palf_handle_impl()->get_palf_epoch(palf_epoch))) {
PALF_LOG(WARN, "IPalfEnvImpl get_palf_epoch failed", K(ret), K(palf_id_));
} else if (palf_epoch != palf_epoch_) {
PALF_LOG(WARN, "palf_epoch has changed, drop task", K(ret), K(palf_id_), K(palf_epoch),
K(flush_log_cb_ctx_));
} else if (OB_FAIL(guard.get_palf_handle_impl()->inner_append_log( } else if (OB_FAIL(guard.get_palf_handle_impl()->inner_append_log(
flush_log_cb_ctx_.lsn_, write_buf_, flush_log_cb_ctx_.scn_))) { flush_log_cb_ctx_.lsn_, write_buf_, flush_log_cb_ctx_.scn_))) {
PALF_LOG(ERROR, "LogEngine pwrite failed", K(ret), K(write_buf_)); PALF_LOG(ERROR, "LogEngine pwrite failed", K(ret), K(write_buf_));
@ -203,25 +211,13 @@ int LogIOFlushLogTask::do_task_(int tg_id, IPalfEnvImpl *palf_env_impl)
return ret; return ret;
} }
// NB: the memory of 'this' will be release int LogIOFlushLogTask::after_consume_(IPalfHandleImplGuard &guard)
int LogIOFlushLogTask::after_consume_(IPalfEnvImpl *palf_env_impl)
{ {
int ret = OB_SUCCESS; int ret = OB_SUCCESS;
int64_t palf_epoch = -1;
IPalfHandleImplGuard guard;
common::ObTimeGuard time_guard("after_consume log", 10 * 1000); common::ObTimeGuard time_guard("after_consume log", 10 * 1000);
if (IS_NOT_INIT) { if (IS_NOT_INIT) {
ret = OB_NOT_INIT; ret = OB_NOT_INIT;
PALF_LOG(ERROR, "LogIOFlushLogTask not inited", K(ret), KPC(this)); PALF_LOG(ERROR, "LogIOFlushLogTask not inited", K(ret), KPC(this));
} else if (OB_FAIL(palf_env_impl->get_palf_handle_impl(palf_id_, guard))) {
PALF_LOG(WARN, "IPalfEnvImpl get_palf_handle_impl failed", K(ret), K(palf_id_));
} else if (OB_FAIL(guard.get_palf_handle_impl()->get_palf_epoch(palf_epoch))) {
PALF_LOG(WARN, "IPalfEnvImpl get_palf_epoch failed", K(ret), K(palf_id_));
} else if (palf_epoch != palf_epoch_) {
PALF_LOG(WARN, "palf_epoch has changed, drop task", K(ret), K(palf_id_), K(palf_epoch),
K(flush_log_cb_ctx_));
// NB: the memory of 'this' has released after 'inner_after_flush_log', don't use any
// fields about 'this'.
} else if (OB_FAIL(guard.get_palf_handle_impl()->inner_after_flush_log(flush_log_cb_ctx_))) { } else if (OB_FAIL(guard.get_palf_handle_impl()->inner_after_flush_log(flush_log_cb_ctx_))) {
PALF_LOG(WARN, "PalfHandleImpl after_flush_log failed", K(ret)); PALF_LOG(WARN, "PalfHandleImpl after_flush_log failed", K(ret));
} else { } else {
@ -270,20 +266,11 @@ void LogIOTruncateLogTask::destroy()
} }
} }
int LogIOTruncateLogTask::do_task_(int tg_id, IPalfEnvImpl *palf_env_impl) int LogIOTruncateLogTask::do_task_(int tg_id, IPalfHandleImplGuard &guard)
{ {
int ret = OB_SUCCESS; int ret = OB_SUCCESS;
int64_t palf_epoch = -1;
IPalfHandleImplGuard guard;
if (IS_NOT_INIT) { if (IS_NOT_INIT) {
ret = OB_NOT_INIT; ret = OB_NOT_INIT;
} else if (OB_FAIL(palf_env_impl->get_palf_handle_impl(palf_id_, guard))) {
PALF_LOG(WARN, "IPalfEnvImpl get_palf_handle_impl failed", K(ret), K(palf_id_));
} else if (OB_FAIL(guard.get_palf_handle_impl()->get_palf_epoch(palf_epoch))) {
PALF_LOG(WARN, "IPalfEnvImpl get_palf_epoch failed", K(ret), K(palf_id_));
} else if (palf_epoch != palf_epoch_) {
PALF_LOG(WARN, "palf_epoch has changed, drop task", K(ret), K(palf_id_), K(palf_epoch),
K(truncate_log_cb_ctx_));
} else if (OB_FAIL(guard.get_palf_handle_impl()->inner_truncate_log(truncate_log_cb_ctx_.lsn_))) { } else if (OB_FAIL(guard.get_palf_handle_impl()->inner_truncate_log(truncate_log_cb_ctx_.lsn_))) {
PALF_LOG(WARN, "PalfHandleImpl inner_truncate_log failed", K(ret), K(palf_id_), PALF_LOG(WARN, "PalfHandleImpl inner_truncate_log failed", K(ret), K(palf_id_),
K_(truncate_log_cb_ctx)); K_(truncate_log_cb_ctx));
@ -294,23 +281,13 @@ int LogIOTruncateLogTask::do_task_(int tg_id, IPalfEnvImpl *palf_env_impl)
return ret; return ret;
} }
int LogIOTruncateLogTask::after_consume_(IPalfEnvImpl *palf_env_impl) int LogIOTruncateLogTask::after_consume_(IPalfHandleImplGuard &guard)
{ {
int ret = OB_SUCCESS; int ret = OB_SUCCESS;
int64_t palf_epoch = -1; int64_t palf_epoch = -1;
IPalfHandleImplGuard guard;
if (IS_NOT_INIT) { if (IS_NOT_INIT) {
ret = OB_NOT_INIT; ret = OB_NOT_INIT;
PALF_LOG(ERROR, "LogITruncateLogTask not inited!!!", K(ret)); PALF_LOG(ERROR, "LogITruncateLogTask not inited!!!", K(ret));
} else if (OB_FAIL(palf_env_impl->get_palf_handle_impl(palf_id_, guard))) {
PALF_LOG(WARN, "IPalfEnvImpl get_palf_handle_impl failed", K(ret), K(palf_id_));
// NB: the memory of 'this' has released after 'inner_after_truncate_log', don't use any
// fields about 'this'.
} else if (OB_FAIL(guard.get_palf_handle_impl()->get_palf_epoch(palf_epoch))) {
PALF_LOG(WARN, "IPalfEnvImpl get_palf_epoch failed", K(ret), K(palf_id_));
} else if (palf_epoch != palf_epoch_) {
PALF_LOG(WARN, "palf_epoch has changed, drop task", K(ret), K(palf_id_), K(palf_epoch),
K(truncate_log_cb_ctx_));
} else if (OB_FAIL( } else if (OB_FAIL(
guard.get_palf_handle_impl()->inner_after_truncate_log(truncate_log_cb_ctx_))) { guard.get_palf_handle_impl()->inner_after_truncate_log(truncate_log_cb_ctx_))) {
PALF_LOG(WARN, "PalfHandleImpl inner_after_truncate_log failed", K(ret)); PALF_LOG(WARN, "PalfHandleImpl inner_after_truncate_log failed", K(ret));
@ -373,21 +350,12 @@ void LogIOFlushMetaTask::destroy()
} }
} }
int LogIOFlushMetaTask::do_task_(int tg_id, IPalfEnvImpl *palf_env_impl) int LogIOFlushMetaTask::do_task_(int tg_id, IPalfHandleImplGuard &guard)
{ {
int ret = OB_SUCCESS; int ret = OB_SUCCESS;
int64_t palf_epoch = -1;
IPalfHandleImplGuard guard;
if (IS_NOT_INIT) { if (IS_NOT_INIT) {
ret = OB_NOT_INIT; ret = OB_NOT_INIT;
PALF_LOG(ERROR, "LogIOFlushMetaTask not inited!!!", K(ret)); PALF_LOG(ERROR, "LogIOFlushMetaTask not inited!!!", K(ret));
} else if (OB_FAIL(palf_env_impl->get_palf_handle_impl(palf_id_, guard))) {
PALF_LOG(WARN, "IPalfEnvImpl get_palf_handle_impl failed", K(ret), K(palf_id_));
} else if (OB_FAIL(guard.get_palf_handle_impl()->get_palf_epoch(palf_epoch))) {
PALF_LOG(WARN, "IPalfEnvImpl get_palf_epoch failed", K(ret), K(palf_id_));
} else if (palf_epoch != palf_epoch_) {
PALF_LOG(WARN, "palf_epoch has changed, drop task", K(ret), K(palf_id_), K(palf_epoch),
K(flush_meta_cb_ctx_));
} else if (OB_FAIL(guard.get_palf_handle_impl()->inner_append_meta(buf_, buf_len_))) { } else if (OB_FAIL(guard.get_palf_handle_impl()->inner_append_meta(buf_, buf_len_))) {
PALF_LOG(ERROR, "PalfHandleImpl inner_append_meta failed", K(ret), K(palf_id_)); PALF_LOG(ERROR, "PalfHandleImpl inner_append_meta failed", K(ret), K(palf_id_));
} else if (OB_FAIL(push_task_into_cb_thread_pool_(tg_id, this))) { } else if (OB_FAIL(push_task_into_cb_thread_pool_(tg_id, this))) {
@ -397,23 +365,12 @@ int LogIOFlushMetaTask::do_task_(int tg_id, IPalfEnvImpl *palf_env_impl)
return ret; return ret;
} }
int LogIOFlushMetaTask::after_consume_(IPalfEnvImpl *palf_env_impl) int LogIOFlushMetaTask::after_consume_(IPalfHandleImplGuard &guard)
{ {
int ret = OB_SUCCESS; int ret = OB_SUCCESS;
int64_t palf_epoch = -1;
IPalfHandleImplGuard guard;
if (IS_NOT_INIT) { if (IS_NOT_INIT) {
ret = OB_NOT_INIT; ret = OB_NOT_INIT;
PALF_LOG(ERROR, "LogIOFlushMetaTask not inited!!!", K(ret)); PALF_LOG(ERROR, "LogIOFlushMetaTask not inited!!!", K(ret));
} else if (OB_FAIL(palf_env_impl->get_palf_handle_impl(palf_id_, guard))) {
PALF_LOG(WARN, "IPalfEnvImpl get_palf_handle_impl failed", K(ret), K(palf_id_));
} else if (OB_FAIL(guard.get_palf_handle_impl()->get_palf_epoch(palf_epoch))) {
PALF_LOG(WARN, "IPalfEnvImpl get_palf_epoch failed", K(ret), K(palf_id_));
} else if (palf_epoch != palf_epoch_) {
PALF_LOG(WARN, "palf_epoch has changed, drop task", K(ret), K(palf_id_), K(palf_epoch),
K(flush_meta_cb_ctx_));
// NB: the memory of 'this' has released after 'inner_after_flush_meta', don't use any
// fields about 'this'.
} else if (OB_FAIL(guard.get_palf_handle_impl()->inner_after_flush_meta(flush_meta_cb_ctx_))) { } else if (OB_FAIL(guard.get_palf_handle_impl()->inner_after_flush_meta(flush_meta_cb_ctx_))) {
PALF_LOG(WARN, "PalfHandleImpl after_flush_meta failed", K(ret), KP(this)); PALF_LOG(WARN, "PalfHandleImpl after_flush_meta failed", K(ret), KP(this));
} else { } else {
@ -460,21 +417,12 @@ void LogIOTruncatePrefixBlocksTask::destroy()
} }
} }
int LogIOTruncatePrefixBlocksTask::do_task_(int tg_id, IPalfEnvImpl *palf_env_impl) int LogIOTruncatePrefixBlocksTask::do_task_(int tg_id, IPalfHandleImplGuard &guard)
{ {
int ret = OB_SUCCESS; int ret = OB_SUCCESS;
int64_t palf_epoch = -1;
IPalfHandleImplGuard guard;
if (IS_NOT_INIT) { if (IS_NOT_INIT) {
ret = OB_NOT_INIT; ret = OB_NOT_INIT;
PALF_LOG(ERROR, "LogIOTruncatePrefixBlocksTask not inited!!!", K(ret)); PALF_LOG(ERROR, "LogIOTruncatePrefixBlocksTask not inited!!!", K(ret));
} else if (OB_FAIL(palf_env_impl->get_palf_handle_impl(palf_id_, guard))) {
PALF_LOG(WARN, "IPalfEnvImpl get_palf_handle_impl failed", K(ret), K(palf_id_));
} else if (OB_FAIL(guard.get_palf_handle_impl()->get_palf_epoch(palf_epoch))) {
PALF_LOG(WARN, "IPalfEnvImpl get_palf_epoch failed", K(ret), K(palf_id_));
} else if (palf_epoch != palf_epoch_) {
PALF_LOG(WARN, "palf_epoch has changed, drop task", K(ret), K(palf_id_), K(palf_epoch),
K(truncate_prefix_blocks_ctx_));
} else if (OB_FAIL(guard.get_palf_handle_impl()->inner_truncate_prefix_blocks( } else if (OB_FAIL(guard.get_palf_handle_impl()->inner_truncate_prefix_blocks(
truncate_prefix_blocks_ctx_.lsn_))) { truncate_prefix_blocks_ctx_.lsn_))) {
PALF_LOG(ERROR, "PalfHandleImpl inner_truncate_prefix_blocks failed", K(ret), K(palf_id_)); PALF_LOG(ERROR, "PalfHandleImpl inner_truncate_prefix_blocks failed", K(ret), K(palf_id_));
@ -485,23 +433,12 @@ int LogIOTruncatePrefixBlocksTask::do_task_(int tg_id, IPalfEnvImpl *palf_env_im
return ret; return ret;
} }
int LogIOTruncatePrefixBlocksTask::after_consume_(IPalfEnvImpl *palf_env_impl) int LogIOTruncatePrefixBlocksTask::after_consume_(IPalfHandleImplGuard &guard)
{ {
int ret = OB_SUCCESS; int ret = OB_SUCCESS;
int64_t palf_epoch = -1;
IPalfHandleImplGuard guard;
if (IS_NOT_INIT) { if (IS_NOT_INIT) {
ret = OB_NOT_INIT; ret = OB_NOT_INIT;
PALF_LOG(ERROR, "LogIOFlushMetaTask not inited!!!", K(ret)); PALF_LOG(ERROR, "LogIOFlushMetaTask not inited!!!", K(ret));
} else if (OB_FAIL(palf_env_impl->get_palf_handle_impl(palf_id_, guard))) {
PALF_LOG(WARN, "IPalfEnvImpl get_palf_handle_impl failed", K(ret), K(palf_id_));
} else if (OB_FAIL(guard.get_palf_handle_impl()->get_palf_epoch(palf_epoch))) {
PALF_LOG(WARN, "IPalfEnvImpl get_palf_epoch failed", K(ret), K(palf_id_));
} else if (palf_epoch != palf_epoch_) {
PALF_LOG(WARN, "palf_epoch has changed, drop task", K(ret), K(palf_id_), K(palf_epoch),
K(truncate_prefix_blocks_ctx_));
// NB: the memory of 'this' has released after 'inner_after_truncate_prefix_blocks', don't use
// any fields about 'this'.
} else if (OB_FAIL(guard.get_palf_handle_impl()->inner_after_truncate_prefix_blocks( } else if (OB_FAIL(guard.get_palf_handle_impl()->inner_after_truncate_prefix_blocks(
truncate_prefix_blocks_ctx_))) { truncate_prefix_blocks_ctx_))) {
PALF_LOG(WARN, "PalfHandleImpl inner_after_truncate_prefix_blocks failed", K(ret)); PALF_LOG(WARN, "PalfHandleImpl inner_after_truncate_prefix_blocks failed", K(ret));
@ -599,7 +536,6 @@ int BatchLogIOFlushLogTask::push_back(LogIOFlushLogTask *task)
int BatchLogIOFlushLogTask::do_task(int tg_id, IPalfEnvImpl *palf_env_impl) int BatchLogIOFlushLogTask::do_task(int tg_id, IPalfEnvImpl *palf_env_impl)
{ {
int ret = OB_SUCCESS; int ret = OB_SUCCESS;
// 释放内存
if (IS_NOT_INIT) { if (IS_NOT_INIT) {
ret = OB_NOT_INIT; ret = OB_NOT_INIT;
PALF_LOG(ERROR, "LogIOFlushMetaTask not inited!!!", K(ret), KPC(this)); PALF_LOG(ERROR, "LogIOFlushMetaTask not inited!!!", K(ret), KPC(this));
@ -666,6 +602,8 @@ int BatchLogIOFlushLogTask::do_task_(int tg_id, IPalfEnvImpl *palf_env_impl)
} else if (palf_epoch != io_task->get_palf_epoch()) { } else if (palf_epoch != io_task->get_palf_epoch()) {
PALF_LOG(WARN, "palf_epoch has changed, drop task", K(ret), K(palf_id_), K(palf_epoch), PALF_LOG(WARN, "palf_epoch has changed, drop task", K(ret), K(palf_id_), K(palf_epoch),
KPC(io_task), KPC(io_task)); KPC(io_task), KPC(io_task));
io_task->free_this(palf_env_impl);
io_task_array_[i] = NULL;
} else if (OB_FAIL(log_write_buf_array_.push_back(&io_task->write_buf_))) { } else if (OB_FAIL(log_write_buf_array_.push_back(&io_task->write_buf_))) {
PALF_LOG(ERROR, "log_write_buf_array_ push_back failed, unexpected error!!!", K(ret), PALF_LOG(ERROR, "log_write_buf_array_ push_back failed, unexpected error!!!", K(ret),
KPC(this)); KPC(this));
@ -702,7 +640,8 @@ void BatchLogIOFlushLogTask::clear_memory_(IPalfEnvImpl *palf_env_impl)
for (int64_t i = 0; i < count; i++) { for (int64_t i = 0; i < count; i++) {
LogIOFlushLogTask *task = io_task_array_[i]; LogIOFlushLogTask *task = io_task_array_[i];
if (NULL != task) { if (NULL != task) {
palf_env_impl->get_log_allocator()->free_log_io_flush_log_task(task); task->free_this(palf_env_impl);
io_task_array_[i] = NULL;
} }
} }
} }
@ -746,37 +685,28 @@ void LogIOFlashbackTask::destroy()
} }
} }
int LogIOFlashbackTask::do_task_(int tg_id, IPalfEnvImpl *palf_env_impl) int LogIOFlashbackTask::do_task_(int tg_id, IPalfHandleImplGuard &guard)
{ {
UNUSED(tg_id);
int ret = OB_SUCCESS; int ret = OB_SUCCESS;
IPalfHandleImplGuard guard;
if (IS_NOT_INIT) { if (IS_NOT_INIT) {
ret = OB_NOT_INIT; ret = OB_NOT_INIT;
PALF_LOG(ERROR, "LogIOFlashbackTask not inited!!!", K(ret)); PALF_LOG(ERROR, "LogIOFlashbackTask not inited!!!", K(ret));
} else if (OB_FAIL(palf_env_impl->get_palf_handle_impl( palf_id_, guard))) {
PALF_LOG(WARN, "IPalfEnvImpl get_palf_handle_impl failed", K(ret), K(palf_id_));
} else if (OB_FAIL(guard.get_palf_handle_impl()->inner_flashback(flashback_ctx_.flashback_scn_))) { } else if (OB_FAIL(guard.get_palf_handle_impl()->inner_flashback(flashback_ctx_.flashback_scn_))) {
PALF_LOG(ERROR, "PalfHandleImpl inner_flashback failed", K(ret), K(palf_id_)); PALF_LOG(WARN, "inner_flashback failed", KPC(this));
} else if (OB_FAIL(push_task_into_cb_thread_pool_(tg_id, this))) { } else if (OB_FAIL(push_task_into_cb_thread_pool_(tg_id, this))) {
PALF_LOG(WARN, "LogIOFlashbackTask after_consume", K(ret), KPC(palf_env_impl)); PALF_LOG(WARN, "push_flush_cb_to_thread_pool_ failed", K(ret));
} else { } else {
PALF_LOG(INFO, "LogIOFlashbackTask do_task success", K(ret), K(palf_id_)); PALF_LOG(INFO, "LogIOFlashbackTask do_task success", K(ret), K(palf_id_));
} }
return ret; return ret;
} }
int LogIOFlashbackTask::after_consume_(IPalfEnvImpl *palf_env_impl) int LogIOFlashbackTask::after_consume_(IPalfHandleImplGuard &guard)
{ {
int ret = OB_SUCCESS; int ret = OB_SUCCESS;
IPalfHandleImplGuard guard;
if (IS_NOT_INIT) { if (IS_NOT_INIT) {
ret = OB_NOT_INIT; ret = OB_NOT_INIT;
PALF_LOG(ERROR, "LogIOFlushMetaTask not inited!!!", K(ret)); PALF_LOG(ERROR, "LogIOFlushMetaTask not inited!!!", K(ret));
} else if (OB_FAIL(palf_env_impl->get_palf_handle_impl(palf_id_, guard))) {
PALF_LOG(WARN, "IPalfEnvImpl get_palf_handle_impl failed", K(ret), K(palf_id_));
// NB: the memory of 'this' has released after 'inner_after_flush_meta', don't use any
// fields about 'this'.
} else if (OB_FAIL(guard.get_palf_handle_impl()->inner_after_flashback(flashback_ctx_))) { } else if (OB_FAIL(guard.get_palf_handle_impl()->inner_after_flashback(flashback_ctx_))) {
PALF_LOG(ERROR, "PalfHandleImpl inner_after_flashback failed", K(ret)); PALF_LOG(ERROR, "PalfHandleImpl inner_after_flashback failed", K(ret));
} else { } else {
@ -829,21 +759,17 @@ int LogIOPurgeThrottlingTask::init(const PurgeThrottlingCbCtx & purge_ctx)
return ret; return ret;
} }
int LogIOPurgeThrottlingTask::do_task_(int tg_id, IPalfEnvImpl *palf_env_impl) int LogIOPurgeThrottlingTask::do_task_(int tg_id, IPalfHandleImplGuard &guard)
{ {
UNUSED(guard);
int ret = OB_SUCCESS; int ret = OB_SUCCESS;
PALF_LOG(INFO, "process PurgeThrottlingTask", KPC(this)); ret = push_task_into_cb_thread_pool_(tg_id, this);
if (OB_ISNULL(palf_env_impl->get_log_allocator())) {
ret = OB_ERR_UNEXPECTED;
PALF_LOG(ERROR, "log_allocator is NULL", KPC(this));
} else {
palf_env_impl->get_log_allocator()->free_log_io_purge_throttling_task(this);
}
return ret; return ret;
} }
int LogIOPurgeThrottlingTask::after_consume_(IPalfEnvImpl *palf_env_impl) int LogIOPurgeThrottlingTask::after_consume_(IPalfHandleImplGuard &guard)
{ {
UNUSED(guard);
return OB_SUCCESS; return OB_SUCCESS;
} }
@ -852,7 +778,7 @@ void LogIOPurgeThrottlingTask::free_this_(IPalfEnvImpl *palf_env_impl)
if (OB_ISNULL(palf_env_impl) || OB_ISNULL(palf_env_impl->get_log_allocator())) { if (OB_ISNULL(palf_env_impl) || OB_ISNULL(palf_env_impl->get_log_allocator())) {
LOG_ERROR_RET(OB_ERR_UNEXPECTED, "palf_env_impl or log_allocator is NULL", KPC(this), KP(palf_env_impl)) LOG_ERROR_RET(OB_ERR_UNEXPECTED, "palf_env_impl or log_allocator is NULL", KPC(this), KP(palf_env_impl))
} else { } else {
palf_env_impl->get_log_allocator()->free_log_io_purge_throttling_task(this); palf_env_impl->get_log_allocator()->free_log_io_purge_throttling_task(this);
} }
} }

View File

@ -36,6 +36,7 @@ enum class LogIOTaskType
PURGE_THROTTLING_TYPE = 6, PURGE_THROTTLING_TYPE = 6,
}; };
class IPalfHandleImplGuard;
class LogIOTask; class LogIOTask;
int push_task_into_cb_thread_pool(const int64_t tg_id, LogIOTask *io_task); int push_task_into_cb_thread_pool(const int64_t tg_id, LogIOTask *io_task);
@ -66,8 +67,8 @@ public:
K(submit_seq_)); K(submit_seq_));
protected: protected:
virtual int do_task_(int tg_id, IPalfEnvImpl *palf_env_impl) = 0; virtual int do_task_(int tg_id, IPalfHandleImplGuard &guard) = 0;
virtual int after_consume_(IPalfEnvImpl *palf_env_impl) = 0; virtual int after_consume_(IPalfHandleImplGuard &guard) = 0;
virtual LogIOTaskType get_io_task_type_() const = 0; virtual LogIOTaskType get_io_task_type_() const = 0;
virtual void free_this_(IPalfEnvImpl *palf_env_impl) = 0; virtual void free_this_(IPalfEnvImpl *palf_env_impl) = 0;
virtual int64_t get_io_size_() const = 0; virtual int64_t get_io_size_() const = 0;
@ -98,9 +99,9 @@ public:
INHERIT_TO_STRING_KV("LogIOTask", LogIOTask, K_(write_buf), K_(flush_log_cb_ctx), K(is_inited_)); INHERIT_TO_STRING_KV("LogIOTask", LogIOTask, K_(write_buf), K_(flush_log_cb_ctx), K(is_inited_));
private: private:
// IO thread will call this function to flush log // IO thread will call this function to flush log
int do_task_(int tg_id, IPalfEnvImpl *palf_env_impl) override final; int do_task_(int tg_id, IPalfHandleImplGuard &guard) override final;
// IO thread will call this function to submit async task // IO thread will call this function to submit async task
int after_consume_(IPalfEnvImpl *palf_env_impl) override final; int after_consume_(IPalfHandleImplGuard &guard) override final;
LogIOTaskType get_io_task_type_() const override final { return LogIOTaskType::FLUSH_LOG_TYPE; } LogIOTaskType get_io_task_type_() const override final { return LogIOTaskType::FLUSH_LOG_TYPE; }
void free_this_(IPalfEnvImpl *palf_env_impl) override final; void free_this_(IPalfEnvImpl *palf_env_impl) override final;
int64_t get_io_size_() const override final; int64_t get_io_size_() const override final;
@ -122,8 +123,8 @@ public:
INHERIT_TO_STRING_KV("LogIOTask", LogIOTask, K_(truncate_log_cb_ctx)); INHERIT_TO_STRING_KV("LogIOTask", LogIOTask, K_(truncate_log_cb_ctx));
private: private:
int do_task_(int tg_id, IPalfEnvImpl *palf_env_impl) override final; int do_task_(int tg_id, IPalfHandleImplGuard &guard) override final;
int after_consume_(IPalfEnvImpl *palf_env_impl) override final; int after_consume_(IPalfHandleImplGuard &guard) override final;
LogIOTaskType get_io_task_type_() const override final { return LogIOTaskType::TRUNCATE_LOG_TYPE; } LogIOTaskType get_io_task_type_() const override final { return LogIOTaskType::TRUNCATE_LOG_TYPE; }
void free_this_(IPalfEnvImpl *palf_env_impl) override final; void free_this_(IPalfEnvImpl *palf_env_impl) override final;
int64_t get_io_size_() const override final {return 0;} int64_t get_io_size_() const override final {return 0;}
@ -146,8 +147,8 @@ public:
INHERIT_TO_STRING_KV("LogIOTask", LogIOTask, K_(flush_meta_cb_ctx)); INHERIT_TO_STRING_KV("LogIOTask", LogIOTask, K_(flush_meta_cb_ctx));
private: private:
int do_task_(int tg_id, IPalfEnvImpl *palf_env_impl) override final; int do_task_(int tg_id, IPalfHandleImplGuard &guard) override final;
int after_consume_(IPalfEnvImpl *palf_env_impl) override final; int after_consume_(IPalfHandleImplGuard &guard) override final;
LogIOTaskType get_io_task_type_() const override final { return LogIOTaskType::FLUSH_META_TYPE; } LogIOTaskType get_io_task_type_() const override final { return LogIOTaskType::FLUSH_META_TYPE; }
void free_this_(IPalfEnvImpl *palf_env_impl) override final; void free_this_(IPalfEnvImpl *palf_env_impl) override final;
int64_t get_io_size_() const override final {return buf_len_;} int64_t get_io_size_() const override final {return buf_len_;}
@ -170,8 +171,8 @@ public:
INHERIT_TO_STRING_KV("LogIOTask", LogIOTask, K_(truncate_prefix_blocks_ctx)); INHERIT_TO_STRING_KV("LogIOTask", LogIOTask, K_(truncate_prefix_blocks_ctx));
private: private:
int do_task_(int tg_id, IPalfEnvImpl *palf_env_impl) override final; int do_task_(int tg_id, IPalfHandleImplGuard &guard) override final;
int after_consume_(IPalfEnvImpl *palf_env_impl) override final; int after_consume_(IPalfHandleImplGuard &guard) override final;
LogIOTaskType get_io_task_type_() const override final { return LogIOTaskType::TRUNCATE_PREFIX_TYPE; } LogIOTaskType get_io_task_type_() const override final { return LogIOTaskType::TRUNCATE_PREFIX_TYPE; }
void free_this_(IPalfEnvImpl *palf_env_impl) override final; void free_this_(IPalfEnvImpl *palf_env_impl) override final;
int64_t get_io_size_() const override final {return 0;} int64_t get_io_size_() const override final {return 0;}
@ -225,8 +226,8 @@ public:
void destroy(); void destroy();
TO_STRING_KV(K_(palf_id), K_(flashback_ctx), K_(is_inited)); TO_STRING_KV(K_(palf_id), K_(flashback_ctx), K_(is_inited));
private: private:
int do_task_(int tg_id, IPalfEnvImpl *palf_env_impl) override final; int do_task_(int tg_id, IPalfHandleImplGuard &guard) override final;
int after_consume_(IPalfEnvImpl *palf_env_impl) override final; int after_consume_(IPalfHandleImplGuard &guard) override final;
LogIOTaskType get_io_task_type_() const override final { return LogIOTaskType::FLASHBACK_LOG_TYPE; } LogIOTaskType get_io_task_type_() const override final { return LogIOTaskType::FLASHBACK_LOG_TYPE; }
void free_this_(IPalfEnvImpl *palf_env_impl) override final; void free_this_(IPalfEnvImpl *palf_env_impl) override final;
int64_t get_io_size_() const override final {return 0;} int64_t get_io_size_() const override final {return 0;}
@ -246,8 +247,8 @@ public:
void destroy(); void destroy();
INHERIT_TO_STRING_KV("LogIOTask", LogIOTask, K_(purge_ctx), K_(is_inited)); INHERIT_TO_STRING_KV("LogIOTask", LogIOTask, K_(purge_ctx), K_(is_inited));
private: private:
int do_task_(int tg_id, IPalfEnvImpl *palf_env_impl) override final; int do_task_(int tg_id, IPalfHandleImplGuard &guard) override final;
int after_consume_(IPalfEnvImpl *palf_env_impl) override final; int after_consume_(IPalfHandleImplGuard &guard) override final;
LogIOTaskType get_io_task_type_() const override final { return LogIOTaskType::PURGE_THROTTLING_TYPE; } LogIOTaskType get_io_task_type_() const override final { return LogIOTaskType::PURGE_THROTTLING_TYPE; }
void free_this_(IPalfEnvImpl *palf_env_impl) override final; void free_this_(IPalfEnvImpl *palf_env_impl) override final;
int64_t get_io_size_() const override final {return 0;} int64_t get_io_size_() const override final {return 0;}

View File

@ -324,7 +324,6 @@ int LogIOWorker::reduce_io_task_(void *task)
} }
if (false == last_io_task_has_been_reduced && OB_NOT_NULL(io_task)) { if (false == last_io_task_has_been_reduced && OB_NOT_NULL(io_task)) {
io_task = reinterpret_cast<LogIOFlushLogTask *>(io_task);
ret = handle_io_task_(io_task); ret = handle_io_task_(io_task);
} }
PALF_LOG(TRACE, "reduce_io_task_ finished", K(ret), K(tmp_ret), KPC(this)); PALF_LOG(TRACE, "reduce_io_task_ finished", K(ret), K(tmp_ret), KPC(this));