fixed memory leak when palf epoch changed.

This commit is contained in:
HaHaJeff
2023-12-11 09:17:20 +00:00
committed by ob-robot
parent 385a1d50d8
commit 870bc8a720
6 changed files with 245 additions and 188 deletions

View File

@ -258,15 +258,15 @@ private:
class IOTaskCond : public LogIOTask {
public:
IOTaskCond(const int64_t palf_id, const int64_t palf_epoch) : LogIOTask(palf_id, palf_epoch) {}
virtual int do_task_(int tg_id, IPalfEnvImpl *palf_env_impl) override final
IOTaskCond(const int64_t palf_id, const int64_t palf_epoch) : LogIOTask(palf_id, palf_epoch), count_(0) {}
virtual int do_task_(int tg_id, IPalfHandleImplGuard &guard) override final
{
PALF_LOG(INFO, "before cond_wait");
cond_.wait();
PALF_LOG(INFO, "after cond_wait");
return OB_SUCCESS;
};
virtual int after_consume_(IPalfEnvImpl *palf_env_impl) override final
virtual int after_consume_(IPalfHandleImplGuard &guard) override final
{
return OB_SUCCESS;
}
@ -280,37 +280,60 @@ public:
virtual int64_t get_io_size_() const {return 0;}
bool need_purge_throttling_() const {return true;}
ObCond cond_;
int64_t count_;
};
class IOTaskConsumeCond : public LogIOTask {
public:
IOTaskConsumeCond(const int64_t palf_id, const int64_t palf_epoch) : LogIOTask(palf_id, palf_epoch) {}
virtual int do_task_(int tg_id, IPalfHandleImplGuard &guard) override final
{
int ret = OB_SUCCESS;
PALF_LOG(INFO, "do_task_ success");
if (OB_FAIL(push_task_into_cb_thread_pool_(tg_id, this))) {
PALF_LOG(WARN, "push_task_into_cb_thread_pool failed", K(ret), K(tg_id), KP(this));
}
return ret;
};
virtual int after_consume_(IPalfHandleImplGuard &guard) override final
{
PALF_LOG(INFO, "before cond_wait");
cond_.wait();
PALF_LOG(INFO, "after cond_wait");
return OB_SUCCESS;
}
virtual LogIOTaskType get_io_task_type_() const { return LogIOTaskType::FLUSH_META_TYPE; }
int init(int64_t palf_id)
{
palf_id_ = palf_id;
return OB_SUCCESS;
};
virtual void free_this_(IPalfEnvImpl *impl) {UNUSED(impl);}
virtual int64_t get_io_size_() const {return 0;}
bool need_purge_throttling_() const {return true;}
ObCond cond_;
};
class IOTaskConsumeCond : public LogIOTask {
class IOTaskVerify : public LogIOTask {
public:
IOTaskConsumeCond(const int64_t palf_id, const int64_t palf_epoch) : LogIOTask(palf_id, palf_epoch) {}
virtual int do_task_(int tg_id, IPalfEnvImpl *palf_env_impl) override final
IOTaskVerify(const int64_t palf_id, const int64_t palf_epoch) : LogIOTask(palf_id, palf_epoch), count_(0), after_consume_count_(0) {}
virtual int do_task_(int tg_id, IPalfHandleImplGuard &guard)
{
int ret = OB_SUCCESS;
PALF_LOG(INFO, "do_task_ success");
if (OB_FAIL(push_task_into_cb_thread_pool_(tg_id, this))) {
PALF_LOG(WARN, "push_task_into_cb_thread_pool failed", K(ret), K(tg_id), KP(this));
}
return ret;
};
virtual int after_consume_(IPalfEnvImpl *palf_env_impl) override final
{
PALF_LOG(INFO, "before cond_wait");
cond_.wait();
PALF_LOG(INFO, "after cond_wait");
count_ ++;
return OB_SUCCESS;
}
};
virtual int after_consume_(IPalfHandleImplGuard &guard) { return OB_SUCCESS; }
virtual LogIOTaskType get_io_task_type_() const { return LogIOTaskType::FLUSH_META_TYPE; }
virtual void free_this_(IPalfEnvImpl *impl) {UNUSED(impl);}
int64_t get_io_size_() const {return 0;}
bool need_purge_throttling_() const {return true;}
int init(int64_t palf_id)
{
palf_id_ = palf_id;
return OB_SUCCESS;
};
virtual void free_this_(IPalfEnvImpl *impl) {UNUSED(impl);}
virtual int64_t get_io_size_() const {return 0;}
bool need_purge_throttling_() const {return true;}
ObCond cond_;
int64_t count_;
int64_t after_consume_count_;
};
} // end namespace unittest
} // end namespace oceanbase

View File

@ -432,28 +432,6 @@ TEST_F(TestObSimpleLogClusterLogEngine, exception_path)
}
class IOTaskVerify : public LogIOTask {
public:
IOTaskVerify(const int64_t palf_id, const int64_t palf_epoch) : LogIOTask(palf_id, palf_epoch), count_(0), after_consume_count_(0) {}
virtual int do_task_(int tg_id, IPalfEnvImpl *palf_env_impl)
{
count_ ++;
return OB_SUCCESS;
};
virtual int after_consume_(IPalfEnvImpl *palf_env_impl) { return OB_SUCCESS; }
virtual LogIOTaskType get_io_task_type_() const { return LogIOTaskType::FLUSH_META_TYPE; }
virtual void free_this_(IPalfEnvImpl *impl) {UNUSED(impl);}
int64_t get_io_size_() const {return 0;}
bool need_purge_throttling_() const {return true;}
int init(int64_t palf_id)
{
palf_id_ = palf_id;
return OB_SUCCESS;
};
int64_t count_;
int64_t after_consume_count_;
};
TEST_F(TestObSimpleLogClusterLogEngine, io_reducer_basic_func)
{
SET_CASE_LOG_FILE(TEST_NAME, "io_reducer_func");
@ -517,9 +495,9 @@ TEST_F(TestObSimpleLogClusterLogEngine, io_reducer_basic_func)
int64_t prev_log_id_2 = 0;
int64_t leader_idx_2 = 0;
PalfHandleImplGuard leader_2;
IOTaskCond io_task_cond_2(id_2, log_engine->palf_epoch_);
IOTaskVerify io_task_verify_2(id_2, log_engine->palf_epoch_);
EXPECT_EQ(OB_SUCCESS, create_paxos_group(id_2, leader_idx_2, leader_2));
IOTaskCond io_task_cond_2(id_2, leader_2.get_palf_handle_impl()->log_engine_.palf_epoch_);
IOTaskVerify io_task_verify_2(id_2, leader_2.get_palf_handle_impl()->log_engine_.palf_epoch_);
{
LogIOWorker *log_io_worker = leader_2.palf_handle_impl_->log_engine_.log_io_worker_;
// 聚合度为1的忽略
@ -537,6 +515,7 @@ TEST_F(TestObSimpleLogClusterLogEngine, io_reducer_basic_func)
LSN max_lsn_1 = leader_1.palf_handle_impl_->sw_.get_max_lsn();
const int64_t log_id_2 = leader_2.palf_handle_impl_->sw_.get_max_log_id();
LSN max_lsn_2 = leader_2.palf_handle_impl_->sw_.get_max_lsn();
sleep(1);
io_task_cond_2.cond_.signal();
wait_lsn_until_flushed(max_lsn_1, leader_1);
wait_lsn_until_flushed(max_lsn_2, leader_2);
@ -586,9 +565,9 @@ TEST_F(TestObSimpleLogClusterLogEngine, io_reducer_basic_func)
int64_t leader_idx_3 = 0;
int64_t prev_log_id_3 = 0;
PalfHandleImplGuard leader_3;
IOTaskCond io_task_cond_3(id_3, log_engine->palf_epoch_);
IOTaskVerify io_task_verify_3(id_3, log_engine->palf_epoch_);
EXPECT_EQ(OB_SUCCESS, create_paxos_group(id_3, leader_idx_3, leader_3));
IOTaskCond io_task_cond_3(id_3, leader_3.get_palf_handle_impl()->log_engine_.palf_epoch_);
IOTaskVerify io_task_verify_3(id_3, leader_3.get_palf_handle_impl()->log_engine_.palf_epoch_);
{
LogIOWorker *log_io_worker = leader_3.palf_handle_impl_->log_engine_.log_io_worker_;
EXPECT_EQ(OB_SUCCESS, log_io_worker->submit_io_task(&io_task_cond_3));
@ -634,9 +613,9 @@ TEST_F(TestObSimpleLogClusterLogEngine, io_reducer_basic_func)
int64_t leader_idx_4 = 0;
int64_t prev_log_id_4 = 0;
PalfHandleImplGuard leader_4;
IOTaskCond io_task_cond_4(id_4, log_engine->palf_epoch_);
IOTaskVerify io_task_verify_4(id_4, log_engine->palf_epoch_);
EXPECT_EQ(OB_SUCCESS, create_paxos_group(id_4, leader_idx_4, leader_4));
IOTaskCond io_task_cond_4(id_4, leader_4.get_palf_handle_impl()->log_engine_.palf_epoch_);
IOTaskVerify io_task_verify_4(id_4, leader_4.get_palf_handle_impl()->log_engine_.palf_epoch_);
{
LogIOWorker *log_io_worker = leader_4.palf_handle_impl_->log_engine_.log_io_worker_;
EXPECT_EQ(OB_SUCCESS, log_io_worker->submit_io_task(&io_task_cond_4));
@ -656,7 +635,6 @@ TEST_F(TestObSimpleLogClusterLogEngine, io_reducer_basic_func)
io_task_cond_4.cond_.signal();
LSN log_tail = leader_4.palf_handle_impl_->log_engine_.log_storage_.log_tail_;
PALF_LOG(INFO, "after signal", K(max_lsn), K(log_tail));
wait_lsn_until_flushed(max_lsn, leader_4);
sleep(1);
log_tail = leader_4.palf_handle_impl_->log_engine_.log_storage_.log_tail_;
PALF_LOG(INFO, "after flused case 4", K(max_lsn), K(log_tail));

View File

@ -26,6 +26,7 @@
#include "logservice/palf/log_group_entry_header.h"
#include "logservice/palf/log_io_worker.h"
#include "logservice/palf/lsn.h"
#include <thread>
const std::string TEST_NAME = "single_replica";
@ -1723,6 +1724,135 @@ TEST_F(TestObSimpleLogClusterSingleReplica, test_iterator_with_flashback)
}
}
TEST_F(TestObSimpleLogClusterSingleReplica, test_iow_memleak)
{
SET_CASE_LOG_FILE(TEST_NAME, "test_iow");
OB_LOGGER.set_log_level("INFO");
int64_t id = ATOMIC_AAF(&palf_id_, 1);
int64_t leader_idx = 0;
// case1: palf epoch has been changed during do_task
{
PalfHandleImplGuard leader;
EXPECT_EQ(OB_SUCCESS, create_paxos_group(id, leader_idx, leader));
LogIOWorker *iow = leader.get_palf_handle_impl()->log_engine_.log_io_worker_;
IPalfEnvImpl *palf_env_impl = leader.get_palf_handle_impl()->palf_env_impl_;
ObILogAllocator *allocator = palf_env_impl->get_log_allocator();
EXPECT_EQ(OB_SUCCESS, submit_log(leader, 32, leader_idx, MAX_LOG_BODY_SIZE));
EXPECT_EQ(OB_SUCCESS, wait_until_has_committed(leader, leader.palf_handle_impl_->sw_.get_max_lsn()));
LSN end_lsn = leader.get_palf_handle_impl()->get_end_lsn();
IOTaskCond cond(id, leader.palf_env_impl_->last_palf_epoch_);
EXPECT_EQ(OB_SUCCESS, iow->submit_io_task(&cond));
sleep(1);
EXPECT_EQ(OB_SUCCESS, leader.get_palf_handle_impl()->set_base_lsn(end_lsn));
EXPECT_EQ(OB_SUCCESS, submit_log(leader, 1, leader_idx, MAX_LOG_BODY_SIZE));
EXPECT_EQ(OB_SUCCESS, leader.get_palf_handle_impl()->log_engine_.submit_purge_throttling_task(PurgeThrottlingType::PURGE_BY_GET_MC_REQ));
EXPECT_EQ(OB_SUCCESS, submit_log(leader, 2, leader_idx, MAX_LOG_BODY_SIZE));
EXPECT_NE(0, allocator->flying_log_task_);
EXPECT_NE(0, allocator->flying_meta_task_);
leader.get_palf_handle_impl()->log_engine_.palf_epoch_++;
EXPECT_EQ(OB_SUCCESS, submit_log(leader, 10, leader_idx));
EXPECT_EQ(OB_SUCCESS, submit_log(leader, 1, leader_idx, MAX_LOG_BODY_SIZE));
EXPECT_EQ(OB_SUCCESS, submit_log(leader, 10, leader_idx));
EXPECT_EQ(OB_SUCCESS, submit_log(leader, 1, leader_idx, MAX_LOG_BODY_SIZE));
leader.get_palf_handle_impl()->log_engine_.palf_epoch_++;
cond.cond_.signal();
PALF_LOG(INFO, "runlin trace submit log 1");
while (iow->queue_.size() > 0) {
PALF_LOG(INFO, "queue size is not zero", "size", iow->queue_.size());
sleep(1);
}
EXPECT_EQ(0, allocator->flying_log_task_);
EXPECT_EQ(0, allocator->flying_meta_task_);
}
delete_paxos_group(id);
// case2: palf epoch has been changed during after_consume
{
PalfHandleImplGuard leader;
EXPECT_EQ(OB_SUCCESS, create_paxos_group(id, leader_idx, leader));
LogIOWorker *iow = leader.get_palf_handle_impl()->log_engine_.log_io_worker_;
IPalfEnvImpl *palf_env_impl = leader.get_palf_handle_impl()->palf_env_impl_;
ObILogAllocator *allocator = palf_env_impl->get_log_allocator();
EXPECT_EQ(OB_SUCCESS, submit_log(leader, 32, leader_idx, MAX_LOG_BODY_SIZE));
EXPECT_EQ(OB_SUCCESS, wait_until_has_committed(leader, leader.palf_handle_impl_->sw_.get_max_lsn()));
LSN end_lsn = leader.get_palf_handle_impl()->get_end_lsn();
IOTaskConsumeCond consume_cond(id, leader.palf_env_impl_->last_palf_epoch_);
EXPECT_EQ(OB_SUCCESS, iow->submit_io_task(&consume_cond));
sleep(1);
EXPECT_EQ(OB_SUCCESS, leader.get_palf_handle_impl()->set_base_lsn(end_lsn));
EXPECT_EQ(OB_SUCCESS, submit_log(leader, 1, leader_idx, MAX_LOG_BODY_SIZE));
EXPECT_EQ(OB_SUCCESS, leader.get_palf_handle_impl()->log_engine_.submit_purge_throttling_task(PurgeThrottlingType::PURGE_BY_GET_MC_REQ));
EXPECT_EQ(OB_SUCCESS, submit_log(leader, 2, leader_idx, MAX_LOG_BODY_SIZE));
EXPECT_NE(0, allocator->flying_log_task_);
EXPECT_NE(0, allocator->flying_meta_task_);
leader.get_palf_handle_impl()->log_engine_.palf_epoch_++;
EXPECT_EQ(OB_SUCCESS, submit_log(leader, 10, leader_idx));
EXPECT_EQ(OB_SUCCESS, submit_log(leader, 1, leader_idx, MAX_LOG_BODY_SIZE));
EXPECT_EQ(OB_SUCCESS, submit_log(leader, 10, leader_idx));
EXPECT_EQ(OB_SUCCESS, submit_log(leader, 1, leader_idx, MAX_LOG_BODY_SIZE));
leader.get_palf_handle_impl()->log_engine_.palf_epoch_++;
consume_cond.cond_.signal();
PALF_LOG(INFO, "runlin trace submit log 2");
IOTaskVerify verify(id, leader.get_palf_handle_impl()->log_engine_.palf_epoch_);
EXPECT_EQ(OB_SUCCESS, iow->submit_io_task(&verify));
while (verify.count_ == 0) {
PALF_LOG(INFO, "queue size is not zero", "size", iow->queue_.size());
sleep(1);
}
EXPECT_EQ(0, allocator->flying_log_task_);
EXPECT_EQ(0, allocator->flying_meta_task_);
}
delete_paxos_group(id);
// case3: palf epoch has been changed during do_task when there is no io reduce
{
PalfHandleImplGuard leader;
EXPECT_EQ(OB_SUCCESS, create_paxos_group(id, leader_idx, leader));
LogIOWorker *iow = leader.get_palf_handle_impl()->log_engine_.log_io_worker_;
IPalfEnvImpl *palf_env_impl = leader.get_palf_handle_impl()->palf_env_impl_;
bool need_stop = false;
std::thread throttling_th([palf_env_impl, &need_stop](){
PalfEnvImpl *impl = dynamic_cast<PalfEnvImpl*>(palf_env_impl);
while (!need_stop) {
impl->log_io_worker_wrapper_.notify_need_writing_throttling(true);
usleep(1000);
}
});
ObILogAllocator *allocator = palf_env_impl->get_log_allocator();
EXPECT_EQ(OB_SUCCESS, submit_log(leader, 32, leader_idx, MAX_LOG_BODY_SIZE));
EXPECT_EQ(OB_SUCCESS, wait_until_has_committed(leader, leader.palf_handle_impl_->sw_.get_max_lsn()));
LSN end_lsn = leader.get_palf_handle_impl()->get_end_lsn();
// case2: palf epoch has been changed during after_consume
IOTaskConsumeCond consume_cond(id, leader.palf_env_impl_->last_palf_epoch_);
EXPECT_EQ(OB_SUCCESS, iow->submit_io_task(&consume_cond));
sleep(1);
EXPECT_EQ(OB_SUCCESS, leader.get_palf_handle_impl()->set_base_lsn(end_lsn));
EXPECT_EQ(OB_SUCCESS, submit_log(leader, 1, leader_idx, MAX_LOG_BODY_SIZE));
EXPECT_EQ(OB_SUCCESS, leader.get_palf_handle_impl()->log_engine_.submit_purge_throttling_task(PurgeThrottlingType::PURGE_BY_GET_MC_REQ));
EXPECT_EQ(OB_SUCCESS, submit_log(leader, 2, leader_idx, MAX_LOG_BODY_SIZE));
EXPECT_NE(0, allocator->flying_log_task_);
EXPECT_NE(0, allocator->flying_meta_task_);
leader.get_palf_handle_impl()->log_engine_.palf_epoch_++;
EXPECT_EQ(OB_SUCCESS, submit_log(leader, 10, leader_idx));
EXPECT_EQ(OB_SUCCESS, submit_log(leader, 1, leader_idx, MAX_LOG_BODY_SIZE));
EXPECT_EQ(OB_SUCCESS, submit_log(leader, 10, leader_idx));
EXPECT_EQ(OB_SUCCESS, submit_log(leader, 1, leader_idx, MAX_LOG_BODY_SIZE));
leader.get_palf_handle_impl()->log_engine_.palf_epoch_++;
consume_cond.cond_.signal();
PALF_LOG(INFO, "runlin trace submit log 3");
IOTaskVerify verify(id, leader.get_palf_handle_impl()->log_engine_.palf_epoch_);
EXPECT_EQ(OB_SUCCESS, iow->submit_io_task(&verify));
while (verify.count_ == 0) {
PALF_LOG(INFO, "queue size is not zero", "size", iow->queue_.size());
sleep(1);
}
EXPECT_EQ(0, allocator->flying_log_task_);
EXPECT_EQ(0, allocator->flying_meta_task_);
need_stop = true;
throttling_th.join();
}
}
} // namespace unittest
} // namespace oceanbase

View File

@ -73,36 +73,53 @@ void LogIOTask::reset()
submit_seq_ = 0;
}
// NB: if do_task failed, the caller is responsible for freeing LogIOTask.
// NB: if do_task failed, the caller(LogIOWorker) is responsible for freeing LogIOTask.
int LogIOTask::do_task(int tg_id, IPalfEnvImpl *palf_env_impl)
{
int ret = OB_SUCCESS;
int64_t do_task_ts = ObTimeUtility::current_time();
const int64_t delay_ts = do_task_ts - init_task_ts_;
constexpr int64_t MAX_DELAY_TIME = 100 * 1000;
IPalfHandleImplGuard guard;
int64_t palf_epoch = -1;
if (delay_ts >= MAX_DELAY_TIME) {
PALF_LOG(INFO, "[io delay]", K(do_task_ts), K(delay_ts));
}
if (OB_FAIL(do_task_(tg_id, palf_env_impl))) {
if (OB_FAIL(palf_env_impl->get_palf_handle_impl(palf_id_, guard))) {
PALF_LOG(WARN, "get_palf_handle_impl failed", KPC(this));
} else if (OB_FAIL(guard.get_palf_handle_impl()->get_palf_epoch(palf_epoch))) {
PALF_LOG(WARN, "get_palf_epoch failed", KPC(this));
} else if (palf_epoch != palf_epoch_) {
ret = OB_STATE_NOT_MATCH;
PALF_LOG(WARN, "palf_epoch has been changed, drop task", KPC(this), K(palf_epoch));
} else if (OB_FAIL(do_task_(tg_id, guard))) {
PALF_LOG(WARN, "do_task_ failed", K(ret), K(tg_id), KPC(palf_env_impl));
}
} else {}
return ret;
}
// NB: after after_consume, the caller needs free LogIOTask.
// NB: after after_consume, the caller(LogIOCb) needs free LogIOTask.
int LogIOTask::after_consume(IPalfEnvImpl *palf_env_impl)
{
int ret = OB_SUCCESS;
int64_t after_consume_ts = ObTimeUtility::current_time();
const int64_t delay_ts = after_consume_ts - push_cb_into_cb_pool_ts_;
if (OB_FAIL(after_consume_(palf_env_impl))) {
PALF_LOG(WARN, "after_consume_ failed", K(ret), KPC(palf_env_impl));
}
int64_t palf_epoch = -1;
IPalfHandleImplGuard guard;
constexpr int64_t MAX_DELAY_TIME = 100 * 1000;
if (delay_ts >= MAX_DELAY_TIME) {
PALF_LOG(INFO, "[io delay]", K(after_consume_ts), K(delay_ts));
}
if (OB_FAIL(palf_env_impl->get_palf_handle_impl(palf_id_, guard))) {
PALF_LOG(WARN, "IPalfEnvImpl get_palf_handle_impl failed", K(ret), K(palf_id_));
} else if (OB_FAIL(guard.get_palf_handle_impl()->get_palf_epoch(palf_epoch))) {
PALF_LOG(WARN, "IPalfEnvImpl get_palf_epoch failed", K(ret), K(palf_id_));
} else if (palf_epoch != palf_epoch_) {
ret = OB_STATE_NOT_MATCH;
PALF_LOG(WARN, "palf_epoch has changed, drop task", K(ret), KPC(this));
} else if (OB_FAIL(after_consume_(guard))) {
PALF_LOG(WARN, "after_consume_ failed", K(ret), KPC(palf_env_impl));
} else {}
return ret;
}
@ -174,22 +191,13 @@ void LogIOFlushLogTask::destroy()
}
}
int LogIOFlushLogTask::do_task_(int tg_id, IPalfEnvImpl *palf_env_impl)
int LogIOFlushLogTask::do_task_(int tg_id, IPalfHandleImplGuard &guard)
{
int ret = OB_SUCCESS;
int64_t palf_epoch = -1;
IPalfHandleImplGuard guard;
const LSN flush_log_end_lsn = flush_log_cb_ctx_.lsn_ + flush_log_cb_ctx_.total_len_;
if (IS_NOT_INIT) {
ret = OB_NOT_INIT;
PALF_LOG(ERROR, "LogIOFlusLoghTask not inited", K(ret));
} else if (OB_FAIL(palf_env_impl->get_palf_handle_impl(palf_id_, guard))) {
PALF_LOG(WARN, "IPalfEnvImpl get_palf_handle_impl failed", K(ret), K(palf_id_));
} else if (OB_FAIL(guard.get_palf_handle_impl()->get_palf_epoch(palf_epoch))) {
PALF_LOG(WARN, "IPalfEnvImpl get_palf_epoch failed", K(ret), K(palf_id_));
} else if (palf_epoch != palf_epoch_) {
PALF_LOG(WARN, "palf_epoch has changed, drop task", K(ret), K(palf_id_), K(palf_epoch),
K(flush_log_cb_ctx_));
} else if (OB_FAIL(guard.get_palf_handle_impl()->inner_append_log(
flush_log_cb_ctx_.lsn_, write_buf_, flush_log_cb_ctx_.scn_))) {
PALF_LOG(ERROR, "LogEngine pwrite failed", K(ret), K(write_buf_));
@ -203,25 +211,13 @@ int LogIOFlushLogTask::do_task_(int tg_id, IPalfEnvImpl *palf_env_impl)
return ret;
}
// NB: the memory of 'this' will be release
int LogIOFlushLogTask::after_consume_(IPalfEnvImpl *palf_env_impl)
int LogIOFlushLogTask::after_consume_(IPalfHandleImplGuard &guard)
{
int ret = OB_SUCCESS;
int64_t palf_epoch = -1;
IPalfHandleImplGuard guard;
common::ObTimeGuard time_guard("after_consume log", 10 * 1000);
if (IS_NOT_INIT) {
ret = OB_NOT_INIT;
PALF_LOG(ERROR, "LogIOFlushLogTask not inited", K(ret), KPC(this));
} else if (OB_FAIL(palf_env_impl->get_palf_handle_impl(palf_id_, guard))) {
PALF_LOG(WARN, "IPalfEnvImpl get_palf_handle_impl failed", K(ret), K(palf_id_));
} else if (OB_FAIL(guard.get_palf_handle_impl()->get_palf_epoch(palf_epoch))) {
PALF_LOG(WARN, "IPalfEnvImpl get_palf_epoch failed", K(ret), K(palf_id_));
} else if (palf_epoch != palf_epoch_) {
PALF_LOG(WARN, "palf_epoch has changed, drop task", K(ret), K(palf_id_), K(palf_epoch),
K(flush_log_cb_ctx_));
// NB: the memory of 'this' has released after 'inner_after_flush_log', don't use any
// fields about 'this'.
} else if (OB_FAIL(guard.get_palf_handle_impl()->inner_after_flush_log(flush_log_cb_ctx_))) {
PALF_LOG(WARN, "PalfHandleImpl after_flush_log failed", K(ret));
} else {
@ -270,20 +266,11 @@ void LogIOTruncateLogTask::destroy()
}
}
int LogIOTruncateLogTask::do_task_(int tg_id, IPalfEnvImpl *palf_env_impl)
int LogIOTruncateLogTask::do_task_(int tg_id, IPalfHandleImplGuard &guard)
{
int ret = OB_SUCCESS;
int64_t palf_epoch = -1;
IPalfHandleImplGuard guard;
if (IS_NOT_INIT) {
ret = OB_NOT_INIT;
} else if (OB_FAIL(palf_env_impl->get_palf_handle_impl(palf_id_, guard))) {
PALF_LOG(WARN, "IPalfEnvImpl get_palf_handle_impl failed", K(ret), K(palf_id_));
} else if (OB_FAIL(guard.get_palf_handle_impl()->get_palf_epoch(palf_epoch))) {
PALF_LOG(WARN, "IPalfEnvImpl get_palf_epoch failed", K(ret), K(palf_id_));
} else if (palf_epoch != palf_epoch_) {
PALF_LOG(WARN, "palf_epoch has changed, drop task", K(ret), K(palf_id_), K(palf_epoch),
K(truncate_log_cb_ctx_));
} else if (OB_FAIL(guard.get_palf_handle_impl()->inner_truncate_log(truncate_log_cb_ctx_.lsn_))) {
PALF_LOG(WARN, "PalfHandleImpl inner_truncate_log failed", K(ret), K(palf_id_),
K_(truncate_log_cb_ctx));
@ -294,23 +281,13 @@ int LogIOTruncateLogTask::do_task_(int tg_id, IPalfEnvImpl *palf_env_impl)
return ret;
}
int LogIOTruncateLogTask::after_consume_(IPalfEnvImpl *palf_env_impl)
int LogIOTruncateLogTask::after_consume_(IPalfHandleImplGuard &guard)
{
int ret = OB_SUCCESS;
int64_t palf_epoch = -1;
IPalfHandleImplGuard guard;
if (IS_NOT_INIT) {
ret = OB_NOT_INIT;
PALF_LOG(ERROR, "LogITruncateLogTask not inited!!!", K(ret));
} else if (OB_FAIL(palf_env_impl->get_palf_handle_impl(palf_id_, guard))) {
PALF_LOG(WARN, "IPalfEnvImpl get_palf_handle_impl failed", K(ret), K(palf_id_));
// NB: the memory of 'this' has released after 'inner_after_truncate_log', don't use any
// fields about 'this'.
} else if (OB_FAIL(guard.get_palf_handle_impl()->get_palf_epoch(palf_epoch))) {
PALF_LOG(WARN, "IPalfEnvImpl get_palf_epoch failed", K(ret), K(palf_id_));
} else if (palf_epoch != palf_epoch_) {
PALF_LOG(WARN, "palf_epoch has changed, drop task", K(ret), K(palf_id_), K(palf_epoch),
K(truncate_log_cb_ctx_));
} else if (OB_FAIL(
guard.get_palf_handle_impl()->inner_after_truncate_log(truncate_log_cb_ctx_))) {
PALF_LOG(WARN, "PalfHandleImpl inner_after_truncate_log failed", K(ret));
@ -373,21 +350,12 @@ void LogIOFlushMetaTask::destroy()
}
}
int LogIOFlushMetaTask::do_task_(int tg_id, IPalfEnvImpl *palf_env_impl)
int LogIOFlushMetaTask::do_task_(int tg_id, IPalfHandleImplGuard &guard)
{
int ret = OB_SUCCESS;
int64_t palf_epoch = -1;
IPalfHandleImplGuard guard;
if (IS_NOT_INIT) {
ret = OB_NOT_INIT;
PALF_LOG(ERROR, "LogIOFlushMetaTask not inited!!!", K(ret));
} else if (OB_FAIL(palf_env_impl->get_palf_handle_impl(palf_id_, guard))) {
PALF_LOG(WARN, "IPalfEnvImpl get_palf_handle_impl failed", K(ret), K(palf_id_));
} else if (OB_FAIL(guard.get_palf_handle_impl()->get_palf_epoch(palf_epoch))) {
PALF_LOG(WARN, "IPalfEnvImpl get_palf_epoch failed", K(ret), K(palf_id_));
} else if (palf_epoch != palf_epoch_) {
PALF_LOG(WARN, "palf_epoch has changed, drop task", K(ret), K(palf_id_), K(palf_epoch),
K(flush_meta_cb_ctx_));
} else if (OB_FAIL(guard.get_palf_handle_impl()->inner_append_meta(buf_, buf_len_))) {
PALF_LOG(ERROR, "PalfHandleImpl inner_append_meta failed", K(ret), K(palf_id_));
} else if (OB_FAIL(push_task_into_cb_thread_pool_(tg_id, this))) {
@ -397,23 +365,12 @@ int LogIOFlushMetaTask::do_task_(int tg_id, IPalfEnvImpl *palf_env_impl)
return ret;
}
int LogIOFlushMetaTask::after_consume_(IPalfEnvImpl *palf_env_impl)
int LogIOFlushMetaTask::after_consume_(IPalfHandleImplGuard &guard)
{
int ret = OB_SUCCESS;
int64_t palf_epoch = -1;
IPalfHandleImplGuard guard;
if (IS_NOT_INIT) {
ret = OB_NOT_INIT;
PALF_LOG(ERROR, "LogIOFlushMetaTask not inited!!!", K(ret));
} else if (OB_FAIL(palf_env_impl->get_palf_handle_impl(palf_id_, guard))) {
PALF_LOG(WARN, "IPalfEnvImpl get_palf_handle_impl failed", K(ret), K(palf_id_));
} else if (OB_FAIL(guard.get_palf_handle_impl()->get_palf_epoch(palf_epoch))) {
PALF_LOG(WARN, "IPalfEnvImpl get_palf_epoch failed", K(ret), K(palf_id_));
} else if (palf_epoch != palf_epoch_) {
PALF_LOG(WARN, "palf_epoch has changed, drop task", K(ret), K(palf_id_), K(palf_epoch),
K(flush_meta_cb_ctx_));
// NB: the memory of 'this' has released after 'inner_after_flush_meta', don't use any
// fields about 'this'.
} else if (OB_FAIL(guard.get_palf_handle_impl()->inner_after_flush_meta(flush_meta_cb_ctx_))) {
PALF_LOG(WARN, "PalfHandleImpl after_flush_meta failed", K(ret), KP(this));
} else {
@ -460,21 +417,12 @@ void LogIOTruncatePrefixBlocksTask::destroy()
}
}
int LogIOTruncatePrefixBlocksTask::do_task_(int tg_id, IPalfEnvImpl *palf_env_impl)
int LogIOTruncatePrefixBlocksTask::do_task_(int tg_id, IPalfHandleImplGuard &guard)
{
int ret = OB_SUCCESS;
int64_t palf_epoch = -1;
IPalfHandleImplGuard guard;
if (IS_NOT_INIT) {
ret = OB_NOT_INIT;
PALF_LOG(ERROR, "LogIOTruncatePrefixBlocksTask not inited!!!", K(ret));
} else if (OB_FAIL(palf_env_impl->get_palf_handle_impl(palf_id_, guard))) {
PALF_LOG(WARN, "IPalfEnvImpl get_palf_handle_impl failed", K(ret), K(palf_id_));
} else if (OB_FAIL(guard.get_palf_handle_impl()->get_palf_epoch(palf_epoch))) {
PALF_LOG(WARN, "IPalfEnvImpl get_palf_epoch failed", K(ret), K(palf_id_));
} else if (palf_epoch != palf_epoch_) {
PALF_LOG(WARN, "palf_epoch has changed, drop task", K(ret), K(palf_id_), K(palf_epoch),
K(truncate_prefix_blocks_ctx_));
} else if (OB_FAIL(guard.get_palf_handle_impl()->inner_truncate_prefix_blocks(
truncate_prefix_blocks_ctx_.lsn_))) {
PALF_LOG(ERROR, "PalfHandleImpl inner_truncate_prefix_blocks failed", K(ret), K(palf_id_));
@ -485,23 +433,12 @@ int LogIOTruncatePrefixBlocksTask::do_task_(int tg_id, IPalfEnvImpl *palf_env_im
return ret;
}
int LogIOTruncatePrefixBlocksTask::after_consume_(IPalfEnvImpl *palf_env_impl)
int LogIOTruncatePrefixBlocksTask::after_consume_(IPalfHandleImplGuard &guard)
{
int ret = OB_SUCCESS;
int64_t palf_epoch = -1;
IPalfHandleImplGuard guard;
if (IS_NOT_INIT) {
ret = OB_NOT_INIT;
PALF_LOG(ERROR, "LogIOFlushMetaTask not inited!!!", K(ret));
} else if (OB_FAIL(palf_env_impl->get_palf_handle_impl(palf_id_, guard))) {
PALF_LOG(WARN, "IPalfEnvImpl get_palf_handle_impl failed", K(ret), K(palf_id_));
} else if (OB_FAIL(guard.get_palf_handle_impl()->get_palf_epoch(palf_epoch))) {
PALF_LOG(WARN, "IPalfEnvImpl get_palf_epoch failed", K(ret), K(palf_id_));
} else if (palf_epoch != palf_epoch_) {
PALF_LOG(WARN, "palf_epoch has changed, drop task", K(ret), K(palf_id_), K(palf_epoch),
K(truncate_prefix_blocks_ctx_));
// NB: the memory of 'this' has released after 'inner_after_truncate_prefix_blocks', don't use
// any fields about 'this'.
} else if (OB_FAIL(guard.get_palf_handle_impl()->inner_after_truncate_prefix_blocks(
truncate_prefix_blocks_ctx_))) {
PALF_LOG(WARN, "PalfHandleImpl inner_after_truncate_prefix_blocks failed", K(ret));
@ -599,7 +536,6 @@ int BatchLogIOFlushLogTask::push_back(LogIOFlushLogTask *task)
int BatchLogIOFlushLogTask::do_task(int tg_id, IPalfEnvImpl *palf_env_impl)
{
int ret = OB_SUCCESS;
// 释放内存
if (IS_NOT_INIT) {
ret = OB_NOT_INIT;
PALF_LOG(ERROR, "LogIOFlushMetaTask not inited!!!", K(ret), KPC(this));
@ -666,6 +602,8 @@ int BatchLogIOFlushLogTask::do_task_(int tg_id, IPalfEnvImpl *palf_env_impl)
} else if (palf_epoch != io_task->get_palf_epoch()) {
PALF_LOG(WARN, "palf_epoch has changed, drop task", K(ret), K(palf_id_), K(palf_epoch),
KPC(io_task), KPC(io_task));
io_task->free_this(palf_env_impl);
io_task_array_[i] = NULL;
} else if (OB_FAIL(log_write_buf_array_.push_back(&io_task->write_buf_))) {
PALF_LOG(ERROR, "log_write_buf_array_ push_back failed, unexpected error!!!", K(ret),
KPC(this));
@ -702,7 +640,8 @@ void BatchLogIOFlushLogTask::clear_memory_(IPalfEnvImpl *palf_env_impl)
for (int64_t i = 0; i < count; i++) {
LogIOFlushLogTask *task = io_task_array_[i];
if (NULL != task) {
palf_env_impl->get_log_allocator()->free_log_io_flush_log_task(task);
task->free_this(palf_env_impl);
io_task_array_[i] = NULL;
}
}
}
@ -746,37 +685,28 @@ void LogIOFlashbackTask::destroy()
}
}
int LogIOFlashbackTask::do_task_(int tg_id, IPalfEnvImpl *palf_env_impl)
int LogIOFlashbackTask::do_task_(int tg_id, IPalfHandleImplGuard &guard)
{
UNUSED(tg_id);
int ret = OB_SUCCESS;
IPalfHandleImplGuard guard;
if (IS_NOT_INIT) {
ret = OB_NOT_INIT;
PALF_LOG(ERROR, "LogIOFlashbackTask not inited!!!", K(ret));
} else if (OB_FAIL(palf_env_impl->get_palf_handle_impl( palf_id_, guard))) {
PALF_LOG(WARN, "IPalfEnvImpl get_palf_handle_impl failed", K(ret), K(palf_id_));
} else if (OB_FAIL(guard.get_palf_handle_impl()->inner_flashback(flashback_ctx_.flashback_scn_))) {
PALF_LOG(ERROR, "PalfHandleImpl inner_flashback failed", K(ret), K(palf_id_));
PALF_LOG(WARN, "inner_flashback failed", KPC(this));
} else if (OB_FAIL(push_task_into_cb_thread_pool_(tg_id, this))) {
PALF_LOG(WARN, "LogIOFlashbackTask after_consume", K(ret), KPC(palf_env_impl));
PALF_LOG(WARN, "push_flush_cb_to_thread_pool_ failed", K(ret));
} else {
PALF_LOG(INFO, "LogIOFlashbackTask do_task success", K(ret), K(palf_id_));
}
return ret;
}
int LogIOFlashbackTask::after_consume_(IPalfEnvImpl *palf_env_impl)
int LogIOFlashbackTask::after_consume_(IPalfHandleImplGuard &guard)
{
int ret = OB_SUCCESS;
IPalfHandleImplGuard guard;
if (IS_NOT_INIT) {
ret = OB_NOT_INIT;
PALF_LOG(ERROR, "LogIOFlushMetaTask not inited!!!", K(ret));
} else if (OB_FAIL(palf_env_impl->get_palf_handle_impl(palf_id_, guard))) {
PALF_LOG(WARN, "IPalfEnvImpl get_palf_handle_impl failed", K(ret), K(palf_id_));
// NB: the memory of 'this' has released after 'inner_after_flush_meta', don't use any
// fields about 'this'.
} else if (OB_FAIL(guard.get_palf_handle_impl()->inner_after_flashback(flashback_ctx_))) {
PALF_LOG(ERROR, "PalfHandleImpl inner_after_flashback failed", K(ret));
} else {
@ -829,21 +759,17 @@ int LogIOPurgeThrottlingTask::init(const PurgeThrottlingCbCtx & purge_ctx)
return ret;
}
int LogIOPurgeThrottlingTask::do_task_(int tg_id, IPalfEnvImpl *palf_env_impl)
int LogIOPurgeThrottlingTask::do_task_(int tg_id, IPalfHandleImplGuard &guard)
{
UNUSED(guard);
int ret = OB_SUCCESS;
PALF_LOG(INFO, "process PurgeThrottlingTask", KPC(this));
if (OB_ISNULL(palf_env_impl->get_log_allocator())) {
ret = OB_ERR_UNEXPECTED;
PALF_LOG(ERROR, "log_allocator is NULL", KPC(this));
} else {
palf_env_impl->get_log_allocator()->free_log_io_purge_throttling_task(this);
}
ret = push_task_into_cb_thread_pool_(tg_id, this);
return ret;
}
int LogIOPurgeThrottlingTask::after_consume_(IPalfEnvImpl *palf_env_impl)
int LogIOPurgeThrottlingTask::after_consume_(IPalfHandleImplGuard &guard)
{
UNUSED(guard);
return OB_SUCCESS;
}

View File

@ -36,6 +36,7 @@ enum class LogIOTaskType
PURGE_THROTTLING_TYPE = 6,
};
class IPalfHandleImplGuard;
class LogIOTask;
int push_task_into_cb_thread_pool(const int64_t tg_id, LogIOTask *io_task);
@ -66,8 +67,8 @@ public:
K(submit_seq_));
protected:
virtual int do_task_(int tg_id, IPalfEnvImpl *palf_env_impl) = 0;
virtual int after_consume_(IPalfEnvImpl *palf_env_impl) = 0;
virtual int do_task_(int tg_id, IPalfHandleImplGuard &guard) = 0;
virtual int after_consume_(IPalfHandleImplGuard &guard) = 0;
virtual LogIOTaskType get_io_task_type_() const = 0;
virtual void free_this_(IPalfEnvImpl *palf_env_impl) = 0;
virtual int64_t get_io_size_() const = 0;
@ -98,9 +99,9 @@ public:
INHERIT_TO_STRING_KV("LogIOTask", LogIOTask, K_(write_buf), K_(flush_log_cb_ctx), K(is_inited_));
private:
// IO thread will call this function to flush log
int do_task_(int tg_id, IPalfEnvImpl *palf_env_impl) override final;
int do_task_(int tg_id, IPalfHandleImplGuard &guard) override final;
// IO thread will call this function to submit async task
int after_consume_(IPalfEnvImpl *palf_env_impl) override final;
int after_consume_(IPalfHandleImplGuard &guard) override final;
LogIOTaskType get_io_task_type_() const override final { return LogIOTaskType::FLUSH_LOG_TYPE; }
void free_this_(IPalfEnvImpl *palf_env_impl) override final;
int64_t get_io_size_() const override final;
@ -122,8 +123,8 @@ public:
INHERIT_TO_STRING_KV("LogIOTask", LogIOTask, K_(truncate_log_cb_ctx));
private:
int do_task_(int tg_id, IPalfEnvImpl *palf_env_impl) override final;
int after_consume_(IPalfEnvImpl *palf_env_impl) override final;
int do_task_(int tg_id, IPalfHandleImplGuard &guard) override final;
int after_consume_(IPalfHandleImplGuard &guard) override final;
LogIOTaskType get_io_task_type_() const override final { return LogIOTaskType::TRUNCATE_LOG_TYPE; }
void free_this_(IPalfEnvImpl *palf_env_impl) override final;
int64_t get_io_size_() const override final {return 0;}
@ -146,8 +147,8 @@ public:
INHERIT_TO_STRING_KV("LogIOTask", LogIOTask, K_(flush_meta_cb_ctx));
private:
int do_task_(int tg_id, IPalfEnvImpl *palf_env_impl) override final;
int after_consume_(IPalfEnvImpl *palf_env_impl) override final;
int do_task_(int tg_id, IPalfHandleImplGuard &guard) override final;
int after_consume_(IPalfHandleImplGuard &guard) override final;
LogIOTaskType get_io_task_type_() const override final { return LogIOTaskType::FLUSH_META_TYPE; }
void free_this_(IPalfEnvImpl *palf_env_impl) override final;
int64_t get_io_size_() const override final {return buf_len_;}
@ -170,8 +171,8 @@ public:
INHERIT_TO_STRING_KV("LogIOTask", LogIOTask, K_(truncate_prefix_blocks_ctx));
private:
int do_task_(int tg_id, IPalfEnvImpl *palf_env_impl) override final;
int after_consume_(IPalfEnvImpl *palf_env_impl) override final;
int do_task_(int tg_id, IPalfHandleImplGuard &guard) override final;
int after_consume_(IPalfHandleImplGuard &guard) override final;
LogIOTaskType get_io_task_type_() const override final { return LogIOTaskType::TRUNCATE_PREFIX_TYPE; }
void free_this_(IPalfEnvImpl *palf_env_impl) override final;
int64_t get_io_size_() const override final {return 0;}
@ -225,8 +226,8 @@ public:
void destroy();
TO_STRING_KV(K_(palf_id), K_(flashback_ctx), K_(is_inited));
private:
int do_task_(int tg_id, IPalfEnvImpl *palf_env_impl) override final;
int after_consume_(IPalfEnvImpl *palf_env_impl) override final;
int do_task_(int tg_id, IPalfHandleImplGuard &guard) override final;
int after_consume_(IPalfHandleImplGuard &guard) override final;
LogIOTaskType get_io_task_type_() const override final { return LogIOTaskType::FLASHBACK_LOG_TYPE; }
void free_this_(IPalfEnvImpl *palf_env_impl) override final;
int64_t get_io_size_() const override final {return 0;}
@ -246,8 +247,8 @@ public:
void destroy();
INHERIT_TO_STRING_KV("LogIOTask", LogIOTask, K_(purge_ctx), K_(is_inited));
private:
int do_task_(int tg_id, IPalfEnvImpl *palf_env_impl) override final;
int after_consume_(IPalfEnvImpl *palf_env_impl) override final;
int do_task_(int tg_id, IPalfHandleImplGuard &guard) override final;
int after_consume_(IPalfHandleImplGuard &guard) override final;
LogIOTaskType get_io_task_type_() const override final { return LogIOTaskType::PURGE_THROTTLING_TYPE; }
void free_this_(IPalfEnvImpl *palf_env_impl) override final;
int64_t get_io_size_() const override final {return 0;}

View File

@ -324,7 +324,6 @@ int LogIOWorker::reduce_io_task_(void *task)
}
if (false == last_io_task_has_been_reduced && OB_NOT_NULL(io_task)) {
io_task = reinterpret_cast<LogIOFlushLogTask *>(io_task);
ret = handle_io_task_(io_task);
}
PALF_LOG(TRACE, "reduce_io_task_ finished", K(ret), K(tmp_ret), KPC(this));