Revert palf fs cb deadlock bug fix.
This commit is contained in:
@ -305,26 +305,6 @@ int LogEngine::submit_flush_log_task(const FlushLogCbCtx &flush_log_cb_ctx,
|
||||
return ret;
|
||||
}
|
||||
|
||||
int LogEngine::submit_sliding_cb_task(const int cb_pool_tg_id, const LogSlidingCbCtx &sliding_cb_ctx)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
LogSlidingCbTask *sliding_cb_task = NULL;
|
||||
if (IS_NOT_INIT) {
|
||||
ret = OB_NOT_INIT;
|
||||
PALF_LOG(ERROR, "LogEngine not inited!!!");
|
||||
} else if (0 >= cb_pool_tg_id || !sliding_cb_ctx.is_valid()) {
|
||||
ret = OB_INVALID_ARGUMENT;
|
||||
PALF_LOG(ERROR, "Invalid argument!!!", K(cb_pool_tg_id), K(sliding_cb_ctx));
|
||||
} else if (OB_FAIL(generate_sliding_cb_task_(sliding_cb_ctx, sliding_cb_task))) {
|
||||
PALF_LOG(ERROR, "generate_sliding_cb_task_ failed", K(sliding_cb_ctx));
|
||||
} else if (OB_FAIL(push_task_into_cb_thread_pool(cb_pool_tg_id, sliding_cb_task))) {
|
||||
PALF_LOG(ERROR, "submit sliding_cb_task failed");
|
||||
} else {
|
||||
PALF_LOG(TRACE, "submit_sliding_cb_task success", K(ret), K(cb_pool_tg_id), K(sliding_cb_ctx));
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
int LogEngine::submit_flush_prepare_meta_task(const FlushMetaCbCtx &flush_meta_cb_ctx,
|
||||
const LogPrepareMeta &prepare_meta)
|
||||
{
|
||||
@ -1212,23 +1192,6 @@ int LogEngine::generate_flush_log_task_(const FlushLogCbCtx &flush_log_cb_ctx,
|
||||
return ret;
|
||||
}
|
||||
|
||||
int LogEngine::generate_sliding_cb_task_(const LogSlidingCbCtx &sliding_cb_ctx,
|
||||
LogSlidingCbTask *&sliding_cb_task)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
// Be careful to handle the duration of this pointer
|
||||
sliding_cb_task = NULL;
|
||||
if (!sliding_cb_ctx.is_valid()) {
|
||||
ret = OB_INVALID_ARGUMENT;
|
||||
} else if (NULL == (sliding_cb_task = alloc_mgr_->alloc_log_sliding_cb_task(palf_id_, palf_epoch_))) {
|
||||
ret = OB_ALLOCATE_MEMORY_FAILED;
|
||||
PALF_LOG(ERROR, "alloc_log_sliding_cb_task failed");
|
||||
} else if (OB_FAIL(sliding_cb_task->init(sliding_cb_ctx))) {
|
||||
PALF_LOG(ERROR, "init LogSlidingCbTask failed");
|
||||
} else {
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
int LogEngine::generate_truncate_log_task_(const TruncateLogCbCtx &truncate_log_cb_ctx,
|
||||
LogIOTruncateLogTask *&truncate_log_task)
|
||||
{
|
||||
|
||||
@ -36,12 +36,10 @@ class LogIOWorker;
|
||||
class PalfHandleImpl;
|
||||
class LogIOTask;
|
||||
class LogIOFlushLogTask;
|
||||
class LogSlidingCbTask;
|
||||
class LogIOTruncateLogTask;
|
||||
class LogIOFlushMetaTask;
|
||||
class LogIOTruncatePrefixBlocksTask;
|
||||
class FlushLogCbCtx;
|
||||
class LogSlidingCbCtx;
|
||||
class TruncateLogCbCtx;
|
||||
class FlushMetaCbCtx;
|
||||
class TruncatePrefixBlocksCbCtx;
|
||||
@ -115,8 +113,6 @@ public:
|
||||
const int64_t buf_len);
|
||||
|
||||
virtual int submit_flush_log_task(const FlushLogCbCtx &flush_log_cb_ctx, const LogWriteBuf &write_buf);
|
||||
int submit_sliding_cb_task(const int cb_pool_tg_id,
|
||||
const LogSlidingCbCtx &sliding_cb_ctx);
|
||||
|
||||
int submit_flush_prepare_meta_task(const FlushMetaCbCtx &flush_meta_cb_ctx,
|
||||
const LogPrepareMeta &prepare_meta);
|
||||
@ -404,8 +400,6 @@ private:
|
||||
int generate_flush_log_task_(const FlushLogCbCtx &flush_log_cb_ctx,
|
||||
const LogWriteBuf &write_buf,
|
||||
LogIOFlushLogTask *&flush_log_task);
|
||||
int generate_sliding_cb_task_(const LogSlidingCbCtx &sliding_cb_ctx,
|
||||
LogSlidingCbTask *&sliding_cb_task);
|
||||
|
||||
int generate_truncate_log_task_(const TruncateLogCbCtx &truncate_log_cb_ctx,
|
||||
LogIOTruncateLogTask *&truncate_log_task);
|
||||
|
||||
@ -218,77 +218,6 @@ void LogIOFlushLogTask::free_this_(IPalfEnvImpl *palf_env_impl)
|
||||
palf_env_impl->get_log_allocator()->free_log_io_flush_log_task(this);
|
||||
}
|
||||
|
||||
LogSlidingCbTask::LogSlidingCbTask(const int64_t palf_id, const int64_t palf_epoch)
|
||||
: LogIOTask(palf_id, palf_epoch), sliding_cb_ctx_(), is_inited_(false)
|
||||
{}
|
||||
|
||||
LogSlidingCbTask::~LogSlidingCbTask()
|
||||
{
|
||||
destroy();
|
||||
}
|
||||
|
||||
int LogSlidingCbTask::init(const LogSlidingCbCtx &sliding_cb_ctx)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
if (IS_INIT) {
|
||||
ret = OB_INIT_TWICE;
|
||||
PALF_LOG(ERROR, "LogSlidingCbTask has been inited");
|
||||
} else if (!sliding_cb_ctx.is_valid()) {
|
||||
ret = OB_INVALID_ARGUMENT;
|
||||
PALF_LOG(ERROR, "Invaild arguments!!!", K(palf_id_), K(palf_epoch_), K(sliding_cb_ctx));
|
||||
} else {
|
||||
sliding_cb_ctx_ = sliding_cb_ctx;
|
||||
is_inited_ = true;
|
||||
PALF_LOG(TRACE, "LogSlidingCbTask init success", K(ret), K(sliding_cb_ctx));
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
void LogSlidingCbTask::destroy()
|
||||
{
|
||||
if (IS_INIT) {
|
||||
is_inited_ = false;
|
||||
sliding_cb_ctx_.reset();
|
||||
PALF_LOG(TRACE, "LogSlidingCbTask destroy", KP(this));
|
||||
}
|
||||
}
|
||||
|
||||
int LogSlidingCbTask::do_task_(int tg_id, IPalfEnvImpl *palf_env_impl)
|
||||
{
|
||||
UNUSED(tg_id);
|
||||
UNUSED(palf_env_impl);
|
||||
return OB_SUCCESS;
|
||||
}
|
||||
|
||||
// NB: the memory of 'this' will be release
|
||||
int LogSlidingCbTask::after_consume_(IPalfEnvImpl *palf_env_impl)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
int64_t palf_epoch = -1;
|
||||
IPalfHandleImplGuard guard;
|
||||
if (IS_NOT_INIT) {
|
||||
ret = OB_NOT_INIT;
|
||||
PALF_LOG(ERROR, "LogIOFlusLoghTask not inited");
|
||||
} else if (OB_FAIL(palf_env_impl->get_palf_handle_impl(palf_id_, guard))) {
|
||||
PALF_LOG(WARN, "IPalfEnvImpl get_palf_handle_impl failed", K(palf_id_));
|
||||
} else if (OB_FAIL(guard.get_palf_handle_impl()->get_palf_epoch(palf_epoch))) {
|
||||
PALF_LOG(WARN, "IPalfEnvImpl get_palf_epoch failed", K(palf_id_));
|
||||
} else if (palf_epoch != palf_epoch_) {
|
||||
PALF_LOG(WARN, "palf_epoch has changed, drop task", K(palf_id_), K(palf_epoch),
|
||||
K_(sliding_cb_ctx));
|
||||
} else if (OB_FAIL(guard.get_palf_handle_impl()->file_size_cb(sliding_cb_ctx_))) {
|
||||
PALF_LOG(ERROR, "file_size_cb failed", K_(sliding_cb_ctx));
|
||||
} else {}
|
||||
// free this object
|
||||
palf_env_impl->get_log_allocator()->free_log_sliding_cb_task(this);
|
||||
return ret;
|
||||
}
|
||||
|
||||
void LogSlidingCbTask::free_this_(IPalfEnvImpl *palf_env_impl)
|
||||
{
|
||||
return;
|
||||
}
|
||||
|
||||
LogIOTruncateLogTask::LogIOTruncateLogTask(const int64_t palf_id, const int64_t palf_epoch)
|
||||
: LogIOTask(palf_id, palf_epoch), truncate_log_cb_ctx_(), is_inited_(false)
|
||||
{}
|
||||
|
||||
@ -32,8 +32,7 @@ enum class LogIOTaskType
|
||||
FLUSH_META_TYPE = 2,
|
||||
TRUNCATE_PREFIX_TYPE = 3,
|
||||
TRUNCATE_LOG_TYPE = 4,
|
||||
FLASHBACK_LOG_TYPE = 5,
|
||||
SLIDING_CB_TYPE = 6
|
||||
FLASHBACK_LOG_TYPE = 5
|
||||
};
|
||||
|
||||
class LogIOTask;
|
||||
@ -210,25 +209,6 @@ private:
|
||||
int64_t palf_id_;
|
||||
bool is_inited_;
|
||||
};
|
||||
|
||||
class LogSlidingCbTask : public LogIOTask {
|
||||
public:
|
||||
LogSlidingCbTask(const int64_t palf_id,const int64_t palf_epoch);
|
||||
~LogSlidingCbTask();
|
||||
public:
|
||||
int init(const LogSlidingCbCtx &sliding_cb_ctx);
|
||||
void destroy();
|
||||
int do_task_(int tg_id, IPalfEnvImpl *palf_env_impl) override final;
|
||||
// IO cb thread will call this function to call fs cb
|
||||
int after_consume_(IPalfEnvImpl *palf_env_impl) override final;
|
||||
LogIOTaskType get_io_task_type_() const override final { return LogIOTaskType::SLIDING_CB_TYPE; }
|
||||
void free_this_(IPalfEnvImpl *palf_env_impl) override final;
|
||||
INHERIT_TO_STRING_KV("LogSlidingCbTask", LogIOTask, K_(is_inited), K_(sliding_cb_ctx));
|
||||
private:
|
||||
LogSlidingCbCtx sliding_cb_ctx_;
|
||||
bool is_inited_;
|
||||
};
|
||||
|
||||
} // end namespace palf
|
||||
} // end namespace oceanbase
|
||||
|
||||
|
||||
@ -71,45 +71,6 @@ FlushLogCbCtx& FlushLogCbCtx::operator=(const FlushLogCbCtx &arg)
|
||||
return *this;
|
||||
}
|
||||
|
||||
LogSlidingCbCtx::LogSlidingCbCtx()
|
||||
: palf_id_(INVALID_PALF_ID),
|
||||
log_end_lsn_(),
|
||||
log_proposal_id_(INVALID_PROPOSAL_ID)
|
||||
{}
|
||||
|
||||
LogSlidingCbCtx::LogSlidingCbCtx(const int64_t palf_id,
|
||||
const LSN &log_end_lsn,
|
||||
const int64_t log_proposal_id)
|
||||
: palf_id_(palf_id),
|
||||
log_end_lsn_(log_end_lsn),
|
||||
log_proposal_id_(log_proposal_id)
|
||||
{}
|
||||
|
||||
LogSlidingCbCtx::~LogSlidingCbCtx()
|
||||
{
|
||||
reset();
|
||||
}
|
||||
|
||||
bool LogSlidingCbCtx::is_valid() const
|
||||
{
|
||||
return (is_valid_palf_id(palf_id_) && log_end_lsn_.is_valid() && INVALID_PROPOSAL_ID != log_proposal_id_);
|
||||
}
|
||||
|
||||
void LogSlidingCbCtx::reset()
|
||||
{
|
||||
palf_id_ = INVALID_PALF_ID;
|
||||
log_end_lsn_.reset();
|
||||
log_proposal_id_ = INVALID_PROPOSAL_ID;
|
||||
}
|
||||
|
||||
LogSlidingCbCtx& LogSlidingCbCtx::operator=(const LogSlidingCbCtx &arg)
|
||||
{
|
||||
palf_id_ = arg.palf_id_;
|
||||
log_end_lsn_ = arg.log_end_lsn_;
|
||||
log_proposal_id_ = arg.log_proposal_id_;
|
||||
return *this;
|
||||
}
|
||||
|
||||
TruncateLogCbCtx::TruncateLogCbCtx()
|
||||
: lsn_()
|
||||
{
|
||||
|
||||
@ -46,20 +46,6 @@ struct FlushLogCbCtx
|
||||
int64_t begin_ts_;
|
||||
};
|
||||
|
||||
struct LogSlidingCbCtx
|
||||
{
|
||||
LogSlidingCbCtx();
|
||||
LogSlidingCbCtx(const int64_t palf_id, const LSN &log_end_lsn, const int64_t log_proposal_id);
|
||||
~LogSlidingCbCtx();
|
||||
bool is_valid() const;
|
||||
void reset();
|
||||
LogSlidingCbCtx &operator=(const LogSlidingCbCtx &flush_log_cb_ctx);
|
||||
TO_STRING_KV(K_(palf_id), K_(log_end_lsn), K_(log_proposal_id));
|
||||
int64_t palf_id_;
|
||||
LSN log_end_lsn_;
|
||||
int64_t log_proposal_id_;
|
||||
};
|
||||
|
||||
struct TruncateLogCbCtx {
|
||||
TruncateLogCbCtx(const LSN &lsn);
|
||||
TruncateLogCbCtx();
|
||||
|
||||
@ -112,6 +112,7 @@ LogSlidingWindow::LogSlidingWindow()
|
||||
is_rebuilding_(false),
|
||||
last_rebuild_lsn_(),
|
||||
last_record_end_lsn_(PALF_INITIAL_LSN_VAL),
|
||||
fs_cb_cost_stat_("[PALF STAT FS CB]", 2 * 1000 * 1000),
|
||||
log_life_time_stat_("[PALF STAT LOG LIFETIME]", 2 * 1000 * 1000),
|
||||
log_submit_wait_stat_("[PALF STAT LOG SUBMIT WAIT]", 2 * 1000 * 1000),
|
||||
log_submit_to_slide_cost_stat_("[PALF STAT LOG SLIDE WAIT]", 2 * 1000 * 1000),
|
||||
@ -120,7 +121,6 @@ LogSlidingWindow::LogSlidingWindow()
|
||||
accum_group_log_size_(0),
|
||||
last_record_group_log_id_(FIRST_VALID_LOG_ID - 1),
|
||||
freeze_mode_(FEEDBACK_FREEZE_MODE),
|
||||
cb_pool_tg_id_(-1),
|
||||
is_inited_(false)
|
||||
{}
|
||||
|
||||
@ -140,7 +140,6 @@ void LogSlidingWindow::destroy()
|
||||
log_engine_ = NULL;
|
||||
mm_ = NULL;
|
||||
mode_mgr_ = NULL;
|
||||
cb_pool_tg_id_ = -1;
|
||||
}
|
||||
|
||||
int LogSlidingWindow::flashback(const PalfBaseInfo &palf_base_info, const int64_t palf_id, common::ObILogAllocator *alloc_mgr)
|
||||
@ -202,8 +201,7 @@ int LogSlidingWindow::init(const int64_t palf_id,
|
||||
palf::PalfFSCbWrapper *palf_fs_cb,
|
||||
common::ObILogAllocator *alloc_mgr,
|
||||
const PalfBaseInfo &palf_base_info,
|
||||
const bool is_normal_replica,
|
||||
const int cb_pool_tg_id)
|
||||
const bool is_normal_replica)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
const LogInfo &prev_log_info = palf_base_info.prev_log_info_;
|
||||
@ -216,11 +214,10 @@ int LogSlidingWindow::init(const int64_t palf_id,
|
||||
|| NULL == mm
|
||||
|| NULL == mode_mgr
|
||||
|| NULL == log_engine
|
||||
|| NULL == palf_fs_cb
|
||||
|| 0 >= cb_pool_tg_id) {
|
||||
|| NULL == palf_fs_cb) {
|
||||
ret = OB_INVALID_ARGUMENT;
|
||||
PALF_LOG(WARN, "invalid argumetns", K(ret), K(palf_id), K(self), K(palf_base_info),
|
||||
KP(state_mgr), KP(mm), KP(mode_mgr), KP(log_engine), KP(palf_fs_cb), K(cb_pool_tg_id));
|
||||
KP(state_mgr), KP(mm), KP(mode_mgr), KP(log_engine), KP(palf_fs_cb));
|
||||
} else if (is_normal_replica && OB_FAIL(do_init_mem_(palf_id, palf_base_info, alloc_mgr))) {
|
||||
PALF_LOG(WARN, "do_init_mem_ failed", K(ret), K(palf_id));
|
||||
} else {
|
||||
@ -252,7 +249,6 @@ int LogSlidingWindow::init(const int64_t palf_id,
|
||||
|
||||
MEMSET(append_cnt_array_, 0, APPEND_CNT_ARRAY_SIZE * sizeof(int64_t));
|
||||
|
||||
cb_pool_tg_id_ = cb_pool_tg_id;
|
||||
is_inited_ = true;
|
||||
LogGroupEntryHeader group_header;
|
||||
LogEntryHeader log_header;
|
||||
@ -1960,13 +1956,24 @@ int LogSlidingWindow::sliding_cb(const int64_t sn, const FixedSlidingWindowSlot
|
||||
const int64_t log_submit_ts = log_task->get_submit_ts();
|
||||
log_task->unlock();
|
||||
|
||||
int tmp_ret = OB_SUCCESS;
|
||||
const int64_t fs_cb_begin_ts = ObTimeUtility::current_time();
|
||||
LogSlidingCbCtx sliding_cb_ctx(palf_id_, log_end_lsn, log_proposal_id);
|
||||
if (OB_FAIL(log_engine_->submit_sliding_cb_task(cb_pool_tg_id_, sliding_cb_ctx))) {
|
||||
PALF_LOG(ERROR, "submit_sliding_cb_task failed", K_(palf_id), K_(self), K(log_id), KPC(log_task));
|
||||
if (OB_SUCCESS != (tmp_ret = palf_fs_cb_->update_end_lsn(palf_id_, log_end_lsn, log_proposal_id))) {
|
||||
if (OB_EAGAIN == tmp_ret) {
|
||||
if (REACH_TIME_INTERVAL(1 * 1000 * 1000)) {
|
||||
PALF_LOG(WARN, "update_end_lsn eagain", K(tmp_ret), K_(palf_id), K_(self), K(log_id), KPC(log_task));
|
||||
}
|
||||
} else {
|
||||
PALF_LOG(WARN, "update_end_lsn failed", K(tmp_ret), K_(palf_id), K_(self), K(log_id), KPC(log_task));
|
||||
}
|
||||
}
|
||||
const int64_t fs_cb_cost = ObTimeUtility::current_time() - fs_cb_begin_ts;
|
||||
fs_cb_cost_stat_.stat(fs_cb_cost);
|
||||
if (fs_cb_cost > 1 * 1000) {
|
||||
PALF_LOG_RET(WARN, OB_ERR_TOO_MUCH_TIME, "fs_cb->update_end_lsn() cost too much time", K(tmp_ret), K_(palf_id), K_(self),
|
||||
K(fs_cb_cost), K(log_id), K(log_begin_lsn), K(log_end_lsn), K(log_proposal_id));
|
||||
}
|
||||
|
||||
if (OB_SUCC(ret)) {
|
||||
const int64_t log_life_time = fs_cb_begin_ts - log_gen_ts;
|
||||
log_life_time_stat_.stat(log_life_time);
|
||||
log_submit_wait_stat_.stat(log_submit_ts - log_gen_ts);
|
||||
@ -1981,7 +1988,7 @@ int LogSlidingWindow::sliding_cb(const int64_t sn, const FixedSlidingWindowSlot
|
||||
|
||||
if (OB_FAIL(checksum_.verify_accum_checksum(log_task_header.data_checksum_,
|
||||
log_task_header.accum_checksum_))) {
|
||||
PALF_LOG(ERROR, "verify_accum_checksum failed", K_(palf_id), K_(self), K(log_id), KPC(log_task));
|
||||
PALF_LOG(ERROR, "verify_accum_checksum failed", K_(palf_id), K_(self), K(ret), K(log_id), KPC(log_task));
|
||||
} else {
|
||||
// update last_slide_lsn_
|
||||
(void) try_update_last_slide_log_info_(log_id, log_max_scn, log_begin_lsn, log_end_lsn, \
|
||||
@ -1996,7 +2003,6 @@ int LogSlidingWindow::sliding_cb(const int64_t sn, const FixedSlidingWindowSlot
|
||||
try_fetch_log_streamingly_(log_end_lsn);
|
||||
}
|
||||
}
|
||||
}
|
||||
if (0 == log_id % 100) {
|
||||
PALF_LOG(INFO, "sliding_cb finished", K_(palf_id), K_(self), K(ret), K(log_id));
|
||||
}
|
||||
|
||||
@ -53,7 +53,6 @@ class LogModeMgr;
|
||||
class LogTask;
|
||||
class LogGroupEntry;
|
||||
class TruncateLogCbCtx;
|
||||
class LogIOTask;
|
||||
|
||||
enum FetchTriggerType
|
||||
{
|
||||
@ -132,8 +131,6 @@ private:
|
||||
common::ObMemberList lagged_list_;
|
||||
};
|
||||
|
||||
extern int push_task_into_cb_thread_pool(const int64_t tg_id, LogIOTask *io_task);
|
||||
|
||||
class LogSlidingWindow : public ISlidingCallBack
|
||||
{
|
||||
public:
|
||||
@ -151,8 +148,7 @@ public:
|
||||
palf::PalfFSCbWrapper *palf_fs_cb,
|
||||
common::ObILogAllocator *alloc_mgr,
|
||||
const PalfBaseInfo &palf_base_info,
|
||||
const bool is_normal_replica,
|
||||
const int cb_pool_tg_id);
|
||||
const bool is_normal_replica);
|
||||
virtual int sliding_cb(const int64_t sn, const FixedSlidingWindowSlot *data);
|
||||
virtual int64_t get_max_log_id() const;
|
||||
virtual const share::SCN get_max_scn() const;
|
||||
@ -505,6 +501,7 @@ private:
|
||||
bool is_rebuilding_;
|
||||
LSN last_rebuild_lsn_;
|
||||
LSN last_record_end_lsn_;
|
||||
ObMiniStat::ObStatItem fs_cb_cost_stat_;
|
||||
ObMiniStat::ObStatItem log_life_time_stat_;
|
||||
ObMiniStat::ObStatItem log_submit_wait_stat_;
|
||||
ObMiniStat::ObStatItem log_submit_to_slide_cost_stat_;
|
||||
@ -514,7 +511,6 @@ private:
|
||||
int64_t last_record_group_log_id_;
|
||||
int64_t append_cnt_array_[APPEND_CNT_ARRAY_SIZE];
|
||||
FreezeMode freeze_mode_;
|
||||
int cb_pool_tg_id_;
|
||||
bool is_inited_;
|
||||
private:
|
||||
DISALLOW_COPY_AND_ASSIGN(LogSlidingWindow);
|
||||
|
||||
@ -364,7 +364,7 @@ int PalfEnvImpl::create_palf_handle_impl_(const int64_t palf_id,
|
||||
PALF_LOG(WARN, "alloc palf_handle_impl failed", K(ret));
|
||||
} else if (OB_FAIL(palf_handle_impl->init(palf_id, access_mode, palf_base_info, replica_type,
|
||||
&fetch_log_engine_, base_dir, log_alloc_mgr_, log_block_pool_, &log_rpc_, &log_io_worker_,
|
||||
this, self_, &election_timer_, palf_epoch, cb_thread_pool_.get_tg_id()))) {
|
||||
this, self_, &election_timer_, palf_epoch))) {
|
||||
PALF_LOG(ERROR, "IPalfHandleImpl init failed", K(ret), K(palf_id));
|
||||
// NB: always insert value into hash map finally.
|
||||
} else if (OB_FAIL(palf_handle_impl_map_.insert_and_get(hash_map_key, palf_handle_impl))) {
|
||||
@ -917,7 +917,6 @@ int PalfEnvImpl::reload_palf_handle_impl_(const int64_t palf_id)
|
||||
LSKey hash_map_key(palf_id);
|
||||
bool is_integrity = true;
|
||||
const int64_t palf_epoch = ATOMIC_AAF(&last_palf_epoch_, 1);
|
||||
const int cb_pool_tg_id = cb_thread_pool_.get_tg_id();
|
||||
if (0 > (pret = snprintf(base_dir, MAX_PATH_SIZE, "%s/%ld", log_dir_, palf_id))) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
PALF_LOG(WARN, "snprint failed", K(ret), K(pret), K(palf_id));
|
||||
@ -925,7 +924,7 @@ int PalfEnvImpl::reload_palf_handle_impl_(const int64_t palf_id)
|
||||
ret = OB_ALLOCATE_MEMORY_FAILED;
|
||||
PALF_LOG(WARN, "alloc ipalf_handle_impl failed", K(ret));
|
||||
} else if (OB_FAIL(tmp_palf_handle_impl->load(palf_id, &fetch_log_engine_, base_dir, log_alloc_mgr_,
|
||||
log_block_pool_, &log_rpc_, &log_io_worker_, this, self_, &election_timer_, palf_epoch, cb_pool_tg_id, is_integrity))) {
|
||||
log_block_pool_, &log_rpc_, &log_io_worker_, this, self_, &election_timer_, palf_epoch, is_integrity))) {
|
||||
PALF_LOG(ERROR, "PalfHandleImpl init failed", K(ret), K(palf_id));
|
||||
} else if (OB_FAIL(palf_handle_impl_map_.insert_and_get(hash_map_key, tmp_palf_handle_impl))) {
|
||||
PALF_LOG(WARN, "palf_handle_impl_map_ insert_and_get failed", K(ret), K(palf_id), K(tmp_palf_handle_impl));
|
||||
|
||||
@ -69,7 +69,6 @@ PalfHandleImpl::PalfHandleImpl()
|
||||
palf_env_impl_(NULL),
|
||||
append_cost_stat_("[PALF STAT WRITE LOG]", 2 * 1000 * 1000),
|
||||
flush_cb_cost_stat_("[PALF STAT FLUSH CB]", 2 * 1000 * 1000),
|
||||
fs_cb_cost_stat_("[PALF STAT FS CB]", 2 * 1000 * 1000),
|
||||
replica_meta_lock_(),
|
||||
rebuilding_lock_(),
|
||||
config_change_lock_(),
|
||||
@ -107,8 +106,7 @@ int PalfHandleImpl::init(const int64_t palf_id,
|
||||
IPalfEnvImpl *palf_env_impl,
|
||||
const common::ObAddr &self,
|
||||
common::ObOccamTimer *election_timer,
|
||||
const int64_t palf_epoch,
|
||||
const int cb_pool_tg_id)
|
||||
const int64_t palf_epoch)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
int pret = 0;
|
||||
@ -130,12 +128,11 @@ int PalfHandleImpl::init(const int64_t palf_id,
|
||||
|| NULL == palf_env_impl
|
||||
|| false == self.is_valid()
|
||||
|| NULL == election_timer
|
||||
|| palf_epoch < 0
|
||||
|| cb_pool_tg_id <= 0) {
|
||||
|| palf_epoch < 0) {
|
||||
ret = OB_INVALID_ARGUMENT;
|
||||
PALF_LOG(ERROR, "Invalid argument!!!", K(ret), K(palf_id), K(palf_base_info), K(replica_type),
|
||||
K(access_mode), K(log_dir), K(alloc_mgr), K(log_block_pool), K(log_rpc),
|
||||
K(log_io_worker), K(palf_env_impl), K(self), K(election_timer), K(palf_epoch), K(cb_pool_tg_id));
|
||||
K(log_io_worker), K(palf_env_impl), K(self), K(election_timer), K(palf_epoch));
|
||||
} else if (OB_FAIL(log_meta.generate_by_palf_base_info(palf_base_info, access_mode, replica_type))) {
|
||||
PALF_LOG(WARN, "generate_by_palf_base_info failed", K(ret), K(palf_id), K(palf_base_info), K(access_mode), K(replica_type));
|
||||
} else if ((pret = snprintf(log_dir_, MAX_PATH_SIZE, "%s", log_dir)) && false) {
|
||||
@ -146,11 +143,11 @@ int PalfHandleImpl::init(const int64_t palf_id,
|
||||
PALF_LOG(WARN, "LogEngine init failed", K(ret), K(palf_id), K(log_dir), K(alloc_mgr),
|
||||
K(log_rpc), K(log_io_worker));
|
||||
} else if (OB_FAIL(do_init_mem_(palf_id, palf_base_info, log_meta, log_dir, self, fetch_log_engine,
|
||||
alloc_mgr, log_rpc, log_io_worker, palf_env_impl, election_timer, cb_pool_tg_id))) {
|
||||
alloc_mgr, log_rpc, log_io_worker, palf_env_impl, election_timer))) {
|
||||
PALF_LOG(WARN, "PalfHandleImpl do_init_mem_ failed", K(ret), K(palf_id));
|
||||
} else {
|
||||
PALF_EVENT("PalfHandleImpl init success", palf_id_, K(ret), K(self), K(access_mode), K(palf_base_info),
|
||||
K(replica_type), K(log_dir), K(log_meta), K(palf_epoch), K(cb_pool_tg_id));
|
||||
K(replica_type), K(log_dir), K(log_meta), K(palf_epoch));
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
@ -171,7 +168,6 @@ int PalfHandleImpl::load(const int64_t palf_id,
|
||||
const common::ObAddr &self,
|
||||
common::ObOccamTimer *election_timer,
|
||||
const int64_t palf_epoch,
|
||||
const int cb_pool_tg_id,
|
||||
bool &is_integrity)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
@ -204,7 +200,7 @@ int PalfHandleImpl::load(const int64_t palf_id,
|
||||
} else if (OB_FAIL(construct_palf_base_info_(max_committed_end_lsn, palf_base_info))) {
|
||||
PALF_LOG(WARN, "construct_palf_base_info_ failed", K(ret), K(palf_id), K(entry_header), K(palf_base_info));
|
||||
} else if (OB_FAIL(do_init_mem_(palf_id, palf_base_info, log_engine_.get_log_meta(), log_dir, self,
|
||||
fetch_log_engine, alloc_mgr, log_rpc, log_io_worker, palf_env_impl, election_timer, cb_pool_tg_id))) {
|
||||
fetch_log_engine, alloc_mgr, log_rpc, log_io_worker, palf_env_impl, election_timer))) {
|
||||
PALF_LOG(WARN, "PalfHandleImpl do_init_mem_ failed", K(ret), K(palf_id));
|
||||
} else if (OB_FAIL(append_disk_log_to_sw_(max_committed_end_lsn))) {
|
||||
PALF_LOG(WARN, "append_disk_log_to_sw_ failed", K(ret), K(palf_id));
|
||||
@ -2262,8 +2258,7 @@ int PalfHandleImpl::do_init_mem_(
|
||||
LogRpc *log_rpc,
|
||||
LogIOWorker *log_io_worker,
|
||||
IPalfEnvImpl *palf_env_impl,
|
||||
common::ObOccamTimer *election_timer,
|
||||
const int cb_pool_tg_id)
|
||||
common::ObOccamTimer *election_timer)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
int pret = -1;
|
||||
@ -2281,8 +2276,8 @@ int PalfHandleImpl::do_init_mem_(
|
||||
if ((pret = snprintf(log_dir_, MAX_PATH_SIZE, "%s", log_dir)) && false) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
PALF_LOG(ERROR, "error unexpected", K(ret), K(palf_id));
|
||||
} else if (OB_FAIL(sw_.init(palf_id, self, &state_mgr_, &config_mgr_, &mode_mgr_, &log_engine_,
|
||||
&fs_cb_wrapper_, alloc_mgr, palf_base_info, is_normal_replica, cb_pool_tg_id))) {
|
||||
} else if (OB_FAIL(sw_.init(palf_id, self, &state_mgr_, &config_mgr_, &mode_mgr_,
|
||||
&log_engine_, &fs_cb_wrapper_, alloc_mgr, palf_base_info, is_normal_replica))) {
|
||||
PALF_LOG(WARN, "sw_ init failed", K(ret), K(palf_id));
|
||||
} else if (OB_FAIL(election_.init_and_start(palf_id,
|
||||
election_timer,
|
||||
@ -4069,23 +4064,6 @@ int PalfHandleImpl::get_leader_max_scn_(SCN &max_scn)
|
||||
return ret;
|
||||
}
|
||||
|
||||
int PalfHandleImpl::file_size_cb(const LogSlidingCbCtx &sliding_cb_ctx)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
const int64_t fs_cb_begin_ts = ObTimeUtility::current_time();
|
||||
if (OB_FAIL(fs_cb_wrapper_.update_end_lsn(sliding_cb_ctx.palf_id_, \
|
||||
sliding_cb_ctx.log_end_lsn_, sliding_cb_ctx.log_proposal_id_))) {
|
||||
PALF_LOG(WARN, "fs_cb_wrapper_.update_end_lsn failed", KPC(this), K(sliding_cb_ctx));
|
||||
}
|
||||
const int64_t fs_cb_cost = ObTimeUtility::current_time() - fs_cb_begin_ts;
|
||||
fs_cb_cost_stat_.stat(fs_cb_cost);
|
||||
if (fs_cb_cost > 1 * 1000) {
|
||||
PALF_LOG_RET(WARN, OB_ERR_TOO_MUCH_TIME, "fs_cb_wrapper_.update_end_lsn() cost too much time",
|
||||
K_(palf_id), K_(self), K(fs_cb_cost), K(sliding_cb_ctx));
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
PalfStat::PalfStat()
|
||||
: self_(),
|
||||
palf_id_(INVALID_PALF_ID),
|
||||
|
||||
@ -572,7 +572,6 @@ public:
|
||||
virtual int get_palf_epoch(int64_t &palf_epoch) const = 0;
|
||||
virtual int diagnose(PalfDiagnoseInfo &diagnose_info) const = 0;
|
||||
virtual int update_palf_stat() = 0;
|
||||
virtual int file_size_cb(const LogSlidingCbCtx &sliding_cb_ctx) = 0;
|
||||
|
||||
DECLARE_PURE_VIRTUAL_TO_STRING;
|
||||
};
|
||||
@ -595,8 +594,7 @@ public:
|
||||
IPalfEnvImpl *palf_env_impl,
|
||||
const common::ObAddr &self,
|
||||
common::ObOccamTimer *election_timer,
|
||||
const int64_t palf_epoch,
|
||||
const int cb_pool_tg_id);
|
||||
const int64_t palf_epoch);
|
||||
bool check_can_be_used() const override final;
|
||||
// 重启接口
|
||||
// 1. 生成迭代器,定位meta_storage和log_storage的终点;
|
||||
@ -614,7 +612,6 @@ public:
|
||||
const common::ObAddr &self,
|
||||
common::ObOccamTimer *election_timer,
|
||||
const int64_t palf_epoch,
|
||||
const int cb_pool_tg_id,
|
||||
bool &is_integrity);
|
||||
void destroy();
|
||||
int start();
|
||||
@ -845,7 +842,6 @@ public:
|
||||
const int64_t timeout_us) override final;
|
||||
int diagnose(PalfDiagnoseInfo &diagnose_info) const;
|
||||
int update_palf_stat() override final;
|
||||
int file_size_cb(const LogSlidingCbCtx &sliding_cb_ctx);
|
||||
TO_STRING_KV(K_(palf_id), K_(self), K_(has_set_deleted));
|
||||
private:
|
||||
int do_init_mem_(const int64_t palf_id,
|
||||
@ -858,8 +854,7 @@ private:
|
||||
LogRpc *log_rpc,
|
||||
LogIOWorker *log_io_worker,
|
||||
IPalfEnvImpl *palf_env_impl,
|
||||
common::ObOccamTimer *election_timer,
|
||||
const int cb_pool_tg_id);
|
||||
common::ObOccamTimer *election_timer);
|
||||
int after_flush_prepare_meta_(const int64_t &proposal_id);
|
||||
int after_flush_config_change_meta_(const int64_t proposal_id, const LogConfigVersion &config_version);
|
||||
int after_flush_mode_meta_(const int64_t proposal_id,
|
||||
@ -1039,7 +1034,6 @@ private:
|
||||
bool diskspace_enough_;
|
||||
ObMiniStat::ObStatItem append_cost_stat_;
|
||||
ObMiniStat::ObStatItem flush_cb_cost_stat_;
|
||||
ObMiniStat::ObStatItem fs_cb_cost_stat_;
|
||||
// a spin lock for read/write replica_meta mutex
|
||||
SpinLock replica_meta_lock_;
|
||||
SpinLock rebuilding_lock_;
|
||||
|
||||
@ -29,7 +29,6 @@ namespace common
|
||||
ObTenantMutilAllocator::ObTenantMutilAllocator(uint64_t tenant_id)
|
||||
: tenant_id_(tenant_id), total_limit_(INT64_MAX), pending_replay_mutator_size_(0),
|
||||
LOG_IO_FLUSH_LOG_TASK_SIZE(sizeof(palf::LogIOFlushLogTask)),
|
||||
LOG_SLIDING_CB_TASK_SIZE(sizeof(palf::LogSlidingCbTask)),
|
||||
LOG_IO_TRUNCATE_LOG_TASK_SIZE(sizeof(palf::LogIOTruncateLogTask)),
|
||||
LOG_IO_FLUSH_META_TASK_SIZE(sizeof(palf::LogIOFlushMetaTask)),
|
||||
LOG_IO_TRUNCATE_PREFIX_BLOCKS_TASK_SIZE(sizeof(palf::LogIOTruncatePrefixBlocksTask)),
|
||||
@ -44,7 +43,6 @@ ObTenantMutilAllocator::ObTenantMutilAllocator(uint64_t tenant_id)
|
||||
inner_table_replay_task_alloc_(ObMemAttr(tenant_id, ObModIds::OB_LOG_REPLAY_ENGINE), ObVSliceAlloc::DEFAULT_BLOCK_SIZE, inner_table_replay_blk_alloc_),
|
||||
user_table_replay_task_alloc_(ObMemAttr(tenant_id, ObModIds::OB_LOG_REPLAY_ENGINE), ObVSliceAlloc::DEFAULT_BLOCK_SIZE, user_table_replay_blk_alloc_),
|
||||
log_io_flush_log_task_alloc_(LOG_IO_FLUSH_LOG_TASK_SIZE, ObMemAttr(tenant_id, "FlushLog"), choose_blk_size(LOG_IO_FLUSH_LOG_TASK_SIZE), clog_blk_alloc_, this),
|
||||
log_sliding_cb_task_alloc_(LOG_SLIDING_CB_TASK_SIZE, ObMemAttr(tenant_id, "SlidingCb"), choose_blk_size(LOG_SLIDING_CB_TASK_SIZE), clog_blk_alloc_, this),
|
||||
log_io_truncate_log_task_alloc_(LOG_IO_TRUNCATE_LOG_TASK_SIZE, ObMemAttr(tenant_id, "TruncateLog"), choose_blk_size(LOG_IO_TRUNCATE_LOG_TASK_SIZE), clog_blk_alloc_, this),
|
||||
log_io_flush_meta_task_alloc_(LOG_IO_FLUSH_META_TASK_SIZE, ObMemAttr(tenant_id, "FlushMeta"), choose_blk_size(LOG_IO_FLUSH_META_TASK_SIZE), clog_blk_alloc_, this),
|
||||
log_io_truncate_prefix_blocks_task_alloc_(LOG_IO_TRUNCATE_PREFIX_BLOCKS_TASK_SIZE, ObMemAttr(tenant_id, "FlushMeta"), choose_blk_size(LOG_IO_TRUNCATE_PREFIX_BLOCKS_TASK_SIZE), clog_blk_alloc_, this),
|
||||
@ -84,7 +82,6 @@ void ObTenantMutilAllocator::try_purge()
|
||||
inner_table_replay_task_alloc_.purge_extra_cached_block(0);
|
||||
user_table_replay_task_alloc_.purge_extra_cached_block(0);
|
||||
log_io_flush_log_task_alloc_.purge_extra_cached_block(0);
|
||||
log_sliding_cb_task_alloc_.purge_extra_cached_block(0);
|
||||
log_io_truncate_log_task_alloc_.purge_extra_cached_block(0);
|
||||
log_io_flush_meta_task_alloc_.purge_extra_cached_block(0);
|
||||
log_io_truncate_prefix_blocks_task_alloc_.purge_extra_cached_block(0);
|
||||
@ -183,28 +180,6 @@ void ObTenantMutilAllocator::free_log_io_flush_log_task(LogIOFlushLogTask *ptr)
|
||||
}
|
||||
}
|
||||
|
||||
LogSlidingCbTask *ObTenantMutilAllocator::alloc_log_sliding_cb_task(
|
||||
const int64_t palf_id, const int64_t palf_epoch)
|
||||
{
|
||||
LogSlidingCbTask *ret_ptr = NULL;
|
||||
void *ptr = log_sliding_cb_task_alloc_.alloc();
|
||||
if (NULL != ptr) {
|
||||
ret_ptr = new(ptr)LogSlidingCbTask(palf_id, palf_epoch);
|
||||
ATOMIC_INC(&flying_sliding_cb_task_);
|
||||
}
|
||||
return ret_ptr;
|
||||
}
|
||||
|
||||
void ObTenantMutilAllocator::free_log_sliding_cb_task(LogSlidingCbTask *ptr)
|
||||
{
|
||||
if (OB_LIKELY(NULL != ptr)) {
|
||||
ptr->~LogSlidingCbTask();
|
||||
log_sliding_cb_task_alloc_.free(ptr);
|
||||
ATOMIC_DEC(&flying_sliding_cb_task_);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
LogIOTruncateLogTask *ObTenantMutilAllocator::alloc_log_io_truncate_log_task(
|
||||
const int64_t palf_id, const int64_t palf_epoch)
|
||||
{
|
||||
|
||||
@ -25,7 +25,6 @@ namespace oceanbase
|
||||
namespace palf
|
||||
{
|
||||
class LogIOFlushLogTask;
|
||||
class LogSlidingCbTask;
|
||||
class LogIOTruncateLogTask;
|
||||
class LogIOFlushMetaTask;
|
||||
class LogIOTruncatePrefixBlocksTask;
|
||||
@ -44,7 +43,7 @@ class ObTraceProfile;
|
||||
class ObILogAllocator : public ObIAllocator
|
||||
{
|
||||
public:
|
||||
ObILogAllocator() : flying_log_task_(0), flying_meta_task_(0), flying_sliding_cb_task_(0) {}
|
||||
ObILogAllocator() : flying_log_task_(0), flying_meta_task_(0) {}
|
||||
virtual ~ObILogAllocator() {}
|
||||
|
||||
public:
|
||||
@ -56,8 +55,6 @@ public:
|
||||
virtual const ObBlockAllocMgr &get_clog_blk_alloc_mgr() const = 0;
|
||||
virtual palf::LogIOFlushLogTask *alloc_log_io_flush_log_task(const int64_t palf_id, const int64_t palf_epoch) = 0;
|
||||
virtual void free_log_io_flush_log_task(palf::LogIOFlushLogTask *ptr) = 0;
|
||||
virtual palf::LogSlidingCbTask *alloc_log_sliding_cb_task(const int64_t palf_id, const int64_t palf_epoch) = 0;
|
||||
virtual void free_log_sliding_cb_task(palf::LogSlidingCbTask *ptr) = 0;
|
||||
virtual palf::LogIOTruncateLogTask *alloc_log_io_truncate_log_task(const int64_t palf_id, const int64_t palf_epoch) = 0;
|
||||
virtual void free_log_io_truncate_log_task(palf::LogIOTruncateLogTask *ptr) = 0;
|
||||
virtual palf::LogIOFlushMetaTask *alloc_log_io_flush_meta_task(const int64_t palf_id, const int64_t palf_epoch) = 0;
|
||||
@ -77,7 +74,6 @@ public:
|
||||
protected:
|
||||
int64_t flying_log_task_;
|
||||
int64_t flying_meta_task_;
|
||||
int64_t flying_sliding_cb_task_;
|
||||
};
|
||||
|
||||
// Interface for ReplayEngine module
|
||||
@ -141,8 +137,6 @@ public:
|
||||
// V4.0
|
||||
palf::LogIOFlushLogTask *alloc_log_io_flush_log_task(const int64_t palf_id, const int64_t palf_epoch);
|
||||
void free_log_io_flush_log_task(palf::LogIOFlushLogTask *ptr);
|
||||
palf::LogSlidingCbTask *alloc_log_sliding_cb_task(const int64_t palf_id, const int64_t palf_epoch);
|
||||
void free_log_sliding_cb_task(palf::LogSlidingCbTask *ptr);
|
||||
palf::LogIOTruncateLogTask *alloc_log_io_truncate_log_task(const int64_t palf_id, const int64_t palf_epoch);
|
||||
void free_log_io_truncate_log_task(palf::LogIOTruncateLogTask *ptr);
|
||||
palf::LogIOFlushMetaTask *alloc_log_io_flush_meta_task(const int64_t palf_id, const int64_t palf_epoch);
|
||||
@ -163,7 +157,6 @@ private:
|
||||
int64_t total_limit_;
|
||||
int64_t pending_replay_mutator_size_;
|
||||
const int LOG_IO_FLUSH_LOG_TASK_SIZE;
|
||||
const int LOG_SLIDING_CB_TASK_SIZE;
|
||||
const int LOG_IO_TRUNCATE_LOG_TASK_SIZE;
|
||||
const int LOG_IO_FLUSH_META_TASK_SIZE;
|
||||
const int LOG_IO_TRUNCATE_PREFIX_BLOCKS_TASK_SIZE;
|
||||
@ -178,7 +171,6 @@ private:
|
||||
ObVSliceAlloc inner_table_replay_task_alloc_;
|
||||
ObVSliceAlloc user_table_replay_task_alloc_;
|
||||
ObSliceAlloc log_io_flush_log_task_alloc_;
|
||||
ObSliceAlloc log_sliding_cb_task_alloc_;
|
||||
ObSliceAlloc log_io_truncate_log_task_alloc_;
|
||||
ObSliceAlloc log_io_flush_meta_task_alloc_;
|
||||
ObSliceAlloc log_io_truncate_prefix_blocks_task_alloc_;
|
||||
|
||||
@ -148,19 +148,19 @@ TEST_F(TestLogSlidingWindow, test_init)
|
||||
gen_default_palf_base_info_(base_info);
|
||||
|
||||
EXPECT_EQ(OB_INVALID_ARGUMENT, log_sw_.init(palf_id_, self_, NULL,
|
||||
&mock_mm_, &mock_mode_mgr_, &mock_log_engine_, &palf_fs_cb_, alloc_mgr_, base_info, true, 1));
|
||||
&mock_mm_, &mock_mode_mgr_, &mock_log_engine_, &palf_fs_cb_, alloc_mgr_, base_info, true));
|
||||
EXPECT_EQ(OB_INVALID_ARGUMENT, log_sw_.init(palf_id_, self_, &mock_state_mgr_,
|
||||
&mock_mm_, &mock_mode_mgr_, NULL, &palf_fs_cb_, NULL, base_info, true, 1));
|
||||
&mock_mm_, &mock_mode_mgr_, NULL, &palf_fs_cb_, NULL, base_info, true));
|
||||
EXPECT_EQ(OB_INVALID_ARGUMENT, log_sw_.init(palf_id_, self_, &mock_state_mgr_,
|
||||
NULL, &mock_mode_mgr_, &mock_log_engine_, &palf_fs_cb_, alloc_mgr_, base_info, true, 1));
|
||||
NULL, &mock_mode_mgr_, &mock_log_engine_, &palf_fs_cb_, alloc_mgr_, base_info, true));
|
||||
EXPECT_EQ(OB_INVALID_ARGUMENT, log_sw_.init(palf_id_, self_, &mock_state_mgr_,
|
||||
&mock_mm_, NULL, NULL, &palf_fs_cb_, NULL, base_info, true, 1));
|
||||
&mock_mm_, NULL, NULL, &palf_fs_cb_, NULL, base_info, true));
|
||||
// init succ
|
||||
EXPECT_EQ(OB_SUCCESS, log_sw_.init(palf_id_, self_, &mock_state_mgr_,
|
||||
&mock_mm_, &mock_mode_mgr_, &mock_log_engine_, &palf_fs_cb_, alloc_mgr_, base_info, true, 1));
|
||||
&mock_mm_, &mock_mode_mgr_, &mock_log_engine_, &palf_fs_cb_, alloc_mgr_, base_info, true));
|
||||
// init twice
|
||||
EXPECT_EQ(OB_INIT_TWICE, log_sw_.init(palf_id_, self_, &mock_state_mgr_,
|
||||
&mock_mm_, &mock_mode_mgr_, &mock_log_engine_, &palf_fs_cb_, alloc_mgr_, base_info, true, 1));
|
||||
&mock_mm_, &mock_mode_mgr_, &mock_log_engine_, &palf_fs_cb_, alloc_mgr_, base_info, true));
|
||||
}
|
||||
|
||||
TEST_F(TestLogSlidingWindow, test_private_func_batch_01)
|
||||
@ -175,7 +175,7 @@ TEST_F(TestLogSlidingWindow, test_private_func_batch_01)
|
||||
gen_default_palf_base_info_(base_info);
|
||||
// init succ
|
||||
EXPECT_EQ(OB_SUCCESS, log_sw_.init(palf_id_, self_, &mock_state_mgr_,
|
||||
&mock_mm_, &mock_mode_mgr_, &mock_log_engine_, &palf_fs_cb_, alloc_mgr_, base_info, true, 1));
|
||||
&mock_mm_, &mock_mode_mgr_, &mock_log_engine_, &palf_fs_cb_, alloc_mgr_, base_info, true));
|
||||
log_id = 10 + PALF_SLIDING_WINDOW_SIZE;
|
||||
EXPECT_EQ(false, log_sw_.can_receive_larger_log_(log_id));
|
||||
EXPECT_EQ(false, log_sw_.leader_can_submit_larger_log_(log_id));
|
||||
@ -194,7 +194,7 @@ TEST_F(TestLogSlidingWindow, test_to_follower_pending)
|
||||
gen_default_palf_base_info_(base_info);
|
||||
// init succ
|
||||
EXPECT_EQ(OB_SUCCESS, log_sw_.init(palf_id_, self_, &mock_state_mgr_,
|
||||
&mock_mm_, &mock_mode_mgr_, &mock_log_engine_, &palf_fs_cb_, alloc_mgr_, base_info, true, 1));
|
||||
&mock_mm_, &mock_mode_mgr_, &mock_log_engine_, &palf_fs_cb_, alloc_mgr_, base_info, true));
|
||||
char *buf = data_buf_;
|
||||
int64_t buf_len = 1 * 1024 * 1024;
|
||||
share::SCN ref_scn;
|
||||
@ -221,7 +221,7 @@ TEST_F(TestLogSlidingWindow, test_fetch_log)
|
||||
gen_default_palf_base_info_(base_info);
|
||||
// init succ
|
||||
EXPECT_EQ(OB_SUCCESS, log_sw_.init(palf_id_, self_, &mock_state_mgr_,
|
||||
&mock_mm_, &mock_mode_mgr_, &mock_log_engine_, &palf_fs_cb_, alloc_mgr_, base_info, true, 1));
|
||||
&mock_mm_, &mock_mode_mgr_, &mock_log_engine_, &palf_fs_cb_, alloc_mgr_, base_info, true));
|
||||
prev_lsn.val_ = 1;
|
||||
EXPECT_EQ(OB_INVALID_ARGUMENT, log_sw_.try_fetch_log(fetch_log_type, prev_lsn, fetch_start_lsn, fetch_start_log_id));
|
||||
EXPECT_EQ(OB_INVALID_ARGUMENT, log_sw_.try_fetch_log_for_reconfirm(dest, fetch_end_lsn, is_fetched));
|
||||
@ -244,7 +244,7 @@ TEST_F(TestLogSlidingWindow, test_report_log_task_trace)
|
||||
gen_default_palf_base_info_(base_info);
|
||||
// init succ
|
||||
EXPECT_EQ(OB_SUCCESS, log_sw_.init(palf_id_, self_, &mock_state_mgr_,
|
||||
&mock_mm_, &mock_mode_mgr_, &mock_log_engine_, &palf_fs_cb_, alloc_mgr_, base_info, true, 1));
|
||||
&mock_mm_, &mock_mode_mgr_, &mock_log_engine_, &palf_fs_cb_, alloc_mgr_, base_info, true));
|
||||
EXPECT_EQ(OB_SUCCESS, log_sw_.report_log_task_trace(1));
|
||||
char *buf = data_buf_;
|
||||
int64_t buf_len = 2 * 1024 * 1024;
|
||||
@ -265,7 +265,7 @@ TEST_F(TestLogSlidingWindow, test_set_location_cache_cb)
|
||||
gen_default_palf_base_info_(base_info);
|
||||
// init succ
|
||||
EXPECT_EQ(OB_SUCCESS, log_sw_.init(palf_id_, self_, &mock_state_mgr_,
|
||||
&mock_mm_, &mock_mode_mgr_, &mock_log_engine_, &palf_fs_cb_, alloc_mgr_, base_info, true, 1));
|
||||
&mock_mm_, &mock_mode_mgr_, &mock_log_engine_, &palf_fs_cb_, alloc_mgr_, base_info, true));
|
||||
EXPECT_EQ(OB_INVALID_ARGUMENT, log_sw_.set_location_cache_cb(NULL));
|
||||
EXPECT_EQ(OB_SUCCESS, log_sw_.set_location_cache_cb(&cb));
|
||||
EXPECT_EQ(OB_NOT_SUPPORTED, log_sw_.set_location_cache_cb(&cb));
|
||||
@ -278,7 +278,7 @@ TEST_F(TestLogSlidingWindow, test_reset_location_cache_cb)
|
||||
gen_default_palf_base_info_(base_info);
|
||||
// init succ
|
||||
EXPECT_EQ(OB_SUCCESS, log_sw_.init(palf_id_, self_, &mock_state_mgr_,
|
||||
&mock_mm_, &mock_mode_mgr_, &mock_log_engine_, &palf_fs_cb_, alloc_mgr_, base_info, true, 1));
|
||||
&mock_mm_, &mock_mode_mgr_, &mock_log_engine_, &palf_fs_cb_, alloc_mgr_, base_info, true));
|
||||
EXPECT_EQ(OB_SUCCESS, log_sw_.reset_location_cache_cb());
|
||||
}
|
||||
|
||||
@ -295,7 +295,7 @@ TEST_F(TestLogSlidingWindow, test_submit_log)
|
||||
share::SCN scn;
|
||||
EXPECT_EQ(OB_NOT_INIT, log_sw_.submit_log(buf, buf_len, ref_scn, lsn, scn));
|
||||
EXPECT_EQ(OB_SUCCESS, log_sw_.init(palf_id_, self_, &mock_state_mgr_,
|
||||
&mock_mm_, &mock_mode_mgr_, &mock_log_engine_, &palf_fs_cb_, alloc_mgr_, base_info, true, 1));
|
||||
&mock_mm_, &mock_mode_mgr_, &mock_log_engine_, &palf_fs_cb_, alloc_mgr_, base_info, true));
|
||||
EXPECT_EQ(OB_INVALID_ARGUMENT, log_sw_.submit_log(NULL, buf_len, ref_scn, lsn, scn));
|
||||
buf_len = 0;
|
||||
EXPECT_EQ(OB_INVALID_ARGUMENT, log_sw_.submit_log(buf, buf_len, ref_scn, lsn, scn));
|
||||
@ -327,7 +327,7 @@ TEST_F(TestLogSlidingWindow, test_submit_group_log)
|
||||
PalfBaseInfo base_info;
|
||||
gen_default_palf_base_info_(base_info);
|
||||
EXPECT_EQ(OB_SUCCESS, log_sw_.init(palf_id_, self_, &mock_state_mgr_,
|
||||
&mock_mm_, &mock_mode_mgr_, &mock_log_engine_, &palf_fs_cb_, alloc_mgr_, base_info, true, 1));
|
||||
&mock_mm_, &mock_mode_mgr_, &mock_log_engine_, &palf_fs_cb_, alloc_mgr_, base_info, true));
|
||||
mock_state_mgr_.mock_proposal_id_ = 100;
|
||||
LSN lsn(10);
|
||||
EXPECT_EQ(OB_INVALID_ARGUMENT, log_sw_.submit_group_log(lsn, NULL, 1024));
|
||||
@ -386,7 +386,7 @@ TEST_F(TestLogSlidingWindow, test_receive_log)
|
||||
PalfBaseInfo base_info;
|
||||
gen_default_palf_base_info_(base_info);
|
||||
EXPECT_EQ(OB_SUCCESS, log_sw_.init(palf_id_, self_, &mock_state_mgr_,
|
||||
&mock_mm_, &mock_mode_mgr_, &mock_log_engine_, &palf_fs_cb_, alloc_mgr_, base_info, true, 1));
|
||||
&mock_mm_, &mock_mode_mgr_, &mock_log_engine_, &palf_fs_cb_, alloc_mgr_, base_info, true));
|
||||
|
||||
char *buf = data_buf_;
|
||||
int64_t buf_len = 2 * 1024 * 1024;
|
||||
@ -549,7 +549,7 @@ TEST_F(TestLogSlidingWindow, test_after_flush_log)
|
||||
PalfBaseInfo base_info;
|
||||
gen_default_palf_base_info_(base_info);
|
||||
EXPECT_EQ(OB_SUCCESS, log_sw_.init(palf_id_, self_, &mock_state_mgr_,
|
||||
&mock_mm_, &mock_mode_mgr_, &mock_log_engine_, &palf_fs_cb_, alloc_mgr_, base_info, true, 1));
|
||||
&mock_mm_, &mock_mode_mgr_, &mock_log_engine_, &palf_fs_cb_, alloc_mgr_, base_info, true));
|
||||
int64_t curr_proposal_id = 10;
|
||||
|
||||
// set default config meta
|
||||
@ -612,7 +612,7 @@ TEST_F(TestLogSlidingWindow, test_truncate_log)
|
||||
PalfBaseInfo base_info;
|
||||
gen_default_palf_base_info_(base_info);
|
||||
EXPECT_EQ(OB_SUCCESS, log_sw_.init(palf_id_, self_, &mock_state_mgr_,
|
||||
&mock_mm_, &mock_mode_mgr_, &mock_log_engine_, &palf_fs_cb_, alloc_mgr_, base_info, true, 1));
|
||||
&mock_mm_, &mock_mode_mgr_, &mock_log_engine_, &palf_fs_cb_, alloc_mgr_, base_info, true));
|
||||
int64_t curr_proposal_id = 10;
|
||||
mock_state_mgr_.mock_proposal_id_ = curr_proposal_id;
|
||||
|
||||
@ -720,7 +720,7 @@ TEST_F(TestLogSlidingWindow, test_ack_log)
|
||||
PalfBaseInfo base_info;
|
||||
gen_default_palf_base_info_(base_info);
|
||||
EXPECT_EQ(OB_SUCCESS, log_sw_.init(palf_id_, self_, &mock_state_mgr_,
|
||||
&mock_mm_, &mock_mode_mgr_, &mock_log_engine_, &palf_fs_cb_, alloc_mgr_, base_info, true, 1));
|
||||
&mock_mm_, &mock_mode_mgr_, &mock_log_engine_, &palf_fs_cb_, alloc_mgr_, base_info, true));
|
||||
int64_t curr_proposal_id = 10;
|
||||
mock_state_mgr_.mock_proposal_id_ = curr_proposal_id;
|
||||
log_sw_.self_ = self_;
|
||||
@ -778,7 +778,7 @@ TEST_F(TestLogSlidingWindow, test_truncate_for_rebuild)
|
||||
PalfBaseInfo base_info;
|
||||
gen_default_palf_base_info_(base_info);
|
||||
EXPECT_EQ(OB_SUCCESS, log_sw_.init(palf_id_, self_, &mock_state_mgr_,
|
||||
&mock_mm_, &mock_mode_mgr_, &mock_log_engine_, &palf_fs_cb_, alloc_mgr_, base_info, true, 1));
|
||||
&mock_mm_, &mock_mode_mgr_, &mock_log_engine_, &palf_fs_cb_, alloc_mgr_, base_info, true));
|
||||
int64_t curr_proposal_id = 10;
|
||||
mock_state_mgr_.mock_proposal_id_ = curr_proposal_id;
|
||||
|
||||
@ -889,7 +889,7 @@ TEST_F(TestLogSlidingWindow, test_append_disk_log)
|
||||
PalfBaseInfo base_info;
|
||||
gen_default_palf_base_info_(base_info);
|
||||
EXPECT_EQ(OB_SUCCESS, log_sw_.init(palf_id_, self_, &mock_state_mgr_,
|
||||
&mock_mm_, &mock_mode_mgr_, &mock_log_engine_, &palf_fs_cb_, alloc_mgr_, base_info, true, 1));
|
||||
&mock_mm_, &mock_mode_mgr_, &mock_log_engine_, &palf_fs_cb_, alloc_mgr_, base_info, true));
|
||||
int64_t curr_proposal_id = 10;
|
||||
mock_state_mgr_.mock_proposal_id_ = curr_proposal_id;
|
||||
// generate new group entry
|
||||
@ -1011,7 +1011,7 @@ TEST_F(TestLogSlidingWindow, test_group_entry_truncate)
|
||||
PalfBaseInfo base_info;
|
||||
gen_default_palf_base_info_(base_info);
|
||||
EXPECT_EQ(OB_SUCCESS, log_sw_.init(palf_id_, self_, &mock_state_mgr_,
|
||||
&mock_mm_, &mock_mode_mgr_, &mock_log_engine_, &palf_fs_cb_, alloc_mgr_, base_info, true, 1));
|
||||
&mock_mm_, &mock_mode_mgr_, &mock_log_engine_, &palf_fs_cb_, alloc_mgr_, base_info, true));
|
||||
int64_t curr_proposal_id = 10;
|
||||
mock_state_mgr_.mock_proposal_id_ = curr_proposal_id;
|
||||
// generate new group entry
|
||||
|
||||
Reference in New Issue
Block a user