Auto switch freeze mode for palf.

This commit is contained in:
obdev
2022-11-06 03:05:40 +00:00
committed by wangzelin.wzl
parent f40dd6025e
commit 525fc5ba35
8 changed files with 166 additions and 24 deletions

View File

@ -71,6 +71,8 @@ void LogLoopThread::run1()
void LogLoopThread::log_loop_()
{
int64_t last_switch_state_time = OB_INVALID_TIMESTAMP;
int64_t last_check_freeze_mode_time = OB_INVALID_TIMESTAMP;
int64_t last_sw_freeze_time = OB_INVALID_TIMESTAMP;
while (!has_set_stop()) {
int tmp_ret = OB_SUCCESS;
const int64_t start_ts = ObTimeUtility::current_time();
@ -81,8 +83,15 @@ void LogLoopThread::log_loop_()
}
last_switch_state_time = start_ts;
}
// try freeze log
// try switch freeze mode
const int64_t now = ObTimeUtility::current_time();
if (now - last_check_freeze_mode_time >= 1 * 1000 * 1000) {
if (OB_SUCCESS != (tmp_ret = palf_env_impl_->check_and_switch_freeze_mode())) {
PALF_LOG(WARN, "check_and_switch_freeze_mode failed", K(tmp_ret));
}
last_check_freeze_mode_time = now;
}
// try freeze log
if (OB_SUCCESS != (tmp_ret = palf_env_impl_->try_freeze_log_for_all())) {
PALF_LOG(WARN, "try_freeze_log_for_all failed", K(tmp_ret));
}

View File

@ -90,6 +90,7 @@ LogSlidingWindow::LogSlidingWindow()
accum_log_cnt_(0),
accum_group_log_size_(0),
last_record_group_log_id_(FIRST_VALID_LOG_ID - 1),
freeze_mode_(FEEDBACK_FREEZE_MODE),
is_inited_(false)
{}
@ -174,6 +175,8 @@ int LogSlidingWindow::init(const int64_t palf_id,
committed_end_lsn_ = palf_base_info.curr_lsn_;
MEMSET(append_cnt_array_, 0, APPEND_CNT_ARRAY_SIZE * sizeof(int64_t));
is_inited_ = true;
LogGroupEntryHeader group_header;
LogEntryHeader log_header;
@ -385,6 +388,18 @@ int LogSlidingWindow::submit_log(const char *buf,
K(valid_log_size), K(is_need_handle), K(is_need_handle_next));
}
}
// inc append count
const int64_t array_idx = get_itid() & APPEND_CNT_ARRAY_MASK;
OB_ASSERT(0 <= array_idx && array_idx < APPEND_CNT_ARRAY_SIZE);
ATOMIC_INC(&append_cnt_array_[array_idx]);
LSN last_submit_end_lsn, max_flushed_end_lsn;
get_last_submit_end_lsn_(last_submit_end_lsn);
get_max_flushed_end_lsn(max_flushed_end_lsn);
if (max_flushed_end_lsn >= last_submit_end_lsn) {
// all logs have been flushed, freeze last log in feedback mode
(void) feedback_freeze_last_log_();
}
}
if (OB_SUCC(ret) && is_need_handle_next) {
@ -961,7 +976,7 @@ int LogSlidingWindow::generate_group_entry_header_(const int64_t log_id,
return ret;
}
int LogSlidingWindow::try_freeze_last_log_(const int64_t expected_log_id,
int LogSlidingWindow::try_freeze_last_log_task_(const int64_t expected_log_id,
const LSN &expected_end_lsn,
bool &is_need_handle)
{
@ -1014,27 +1029,79 @@ int LogSlidingWindow::try_freeze_last_log_(const int64_t expected_log_id,
return ret;
}
int LogSlidingWindow::try_freeze_last_log()
int LogSlidingWindow::feedback_freeze_last_log_()
{
int ret = OB_SUCCESS;
LSN last_log_end_lsn;
int64_t last_log_id = OB_INVALID_LOG_ID;
bool is_need_handle = false;
if (OB_FAIL(lsn_allocator_.try_freeze(last_log_end_lsn, last_log_id))) {
if (FEEDBACK_FREEZE_MODE != freeze_mode_) {
// Only FEEDBACK_FREEZE_MODE needs to execute this function
PALF_LOG(TRACE, "current freeze mode is not feedback", K_(palf_id), K_(self), K_(freeze_mode));
} else if (OB_FAIL(lsn_allocator_.try_freeze(last_log_end_lsn, last_log_id))) {
PALF_LOG(WARN, "lsn_allocator try_freeze failed", K(ret), K_(palf_id), K_(self), K(last_log_end_lsn), K(last_log_id));
} else if (last_log_id <= 0) {
// no log, no need freeze
} else if (OB_FAIL(try_freeze_last_log_(last_log_id, last_log_end_lsn, is_need_handle))) {
PALF_LOG(WARN, "try_freeze_last_log_ failed", K(ret), K_(palf_id), K_(self), K(last_log_id), K(last_log_end_lsn));
} else if (OB_FAIL(try_freeze_last_log_task_(last_log_id, last_log_end_lsn, is_need_handle))) {
PALF_LOG(WARN, "try_freeze_last_log_task_ failed", K(ret), K_(palf_id), K_(self), K(last_log_id), K(last_log_end_lsn));
} else {
bool is_committed_lsn_updated = false;
(void) handle_next_submit_log_(is_committed_lsn_updated);
(void) handle_committed_log_();
}
return ret;
}
// Re-evaluates the freeze mode from the number of appends recorded since the
// previous call.
//
// All slots of append_cnt_array_ are summed and reset to zero. The total is
// then compared against APPEND_CNT_LB_FOR_PERIOD_FREEZE:
//   - FEEDBACK mode with total >= threshold -> switch to PERIOD mode;
//   - PERIOD mode with total <  threshold   -> switch to FEEDBACK mode and
//     immediately run feedback_freeze_last_log_().
//
// @return always OB_SUCCESS (ret is never modified in this function).
int LogSlidingWindow::check_and_switch_freeze_mode()
{
  int ret = OB_SUCCESS;

  // Collect and clear the per-slot append counters.
  // NOTE(review): the load and the store are two separate operations, so an
  // increment landing between them is dropped - presumably acceptable for a
  // heuristic counter; confirm.
  int64_t total_append_cnt = 0;
  for (int slot = 0; slot < APPEND_CNT_ARRAY_SIZE; ++slot) {
    total_append_cnt += ATOMIC_LOAD(&append_cnt_array_[slot]);
    ATOMIC_STORE(&append_cnt_array_[slot], 0);
  }

  const bool reach_period_lower_bound = (total_append_cnt >= APPEND_CNT_LB_FOR_PERIOD_FREEZE);
  switch (freeze_mode_) {
    case FEEDBACK_FREEZE_MODE: {
      if (reach_period_lower_bound) {
        freeze_mode_ = PERIOD_FREEZE_MODE;
        PALF_LOG(INFO, "switch freeze_mode to period", K_(palf_id), K_(self), K(total_append_cnt));
      }
      break;
    }
    case PERIOD_FREEZE_MODE: {
      if (!reach_period_lower_bound) {
        freeze_mode_ = FEEDBACK_FREEZE_MODE;
        PALF_LOG(INFO, "switch freeze_mode to feedback", K_(palf_id), K_(self), K(total_append_cnt));
        // Freeze right after switching back to feedback mode (presumably so
        // that a log left pending under period mode is not stranded).
        (void) feedback_freeze_last_log_();
      }
      break;
    }
    default: {
      // Unknown mode: nothing to do.
      break;
    }
  }

  PALF_LOG(TRACE, "finish check_and_switch_freeze_mode", K_(palf_id), K_(self), K(total_append_cnt), K_(freeze_mode));
  return ret;
}
// Freezes the last log when the sliding window is in PERIOD_FREEZE_MODE.
//
// In any other mode the freeze step is skipped (a TRACE line is emitted).
// Regardless of mode or of the freeze outcome, committed logs are handled
// before returning.
//
// @return OB_SUCCESS, or the error code returned by lsn_allocator_.try_freeze()
//         or try_freeze_last_log_task_().
int LogSlidingWindow::period_freeze_last_log()
{
  int ret = OB_SUCCESS;
  LSN last_log_end_lsn;
  int64_t last_log_id = OB_INVALID_LOG_ID;
  bool is_need_handle = false;

  if (PERIOD_FREEZE_MODE == freeze_mode_) {
    if (OB_FAIL(lsn_allocator_.try_freeze(last_log_end_lsn, last_log_id))) {
      PALF_LOG(WARN, "lsn_allocator try_freeze failed", K(ret), K_(palf_id), K_(self), K(last_log_end_lsn), K(last_log_id));
    } else if (last_log_id > 0) {
      if (OB_FAIL(try_freeze_last_log_task_(last_log_id, last_log_end_lsn, is_need_handle))) {
        PALF_LOG(WARN, "try_freeze_last_log_task_ failed", K(ret), K_(palf_id), K_(self), K(last_log_id), K(last_log_end_lsn));
      } else {
        // The task is frozen: try to submit the next log(s).
        bool is_committed_lsn_updated = false;
        (void) handle_next_submit_log_(is_committed_lsn_updated);
      }
    } else {
      // No log has been generated yet, so there is nothing to freeze.
    }
  } else {
    // Only PERIOD_FREEZE_MODE needs to execute the freeze step.
    PALF_LOG(TRACE, "current freeze mode is not period", K_(palf_id), K_(self), K_(freeze_mode));
  }

  // Handle committed logs periodically: committed_end_lsn may have been
  // advanced during reconfirm, so some logs in the sliding window may now be
  // able to slide.
  (void) handle_committed_log_();
  return ret;
}
int LogSlidingWindow::after_rebuild(const LSN &lsn)
{
int ret = OB_SUCCESS;
@ -1167,6 +1234,9 @@ int LogSlidingWindow::after_flush_log(const FlushLogCbCtx &flush_cb_ctx)
const int64_t last_submit_log_id = get_last_submit_log_id_();
if (log_id == last_submit_log_id) {
// 基于log_id连续性条件触发后续日志处理
// feedback mode下尝试冻结后面的log
(void) feedback_freeze_last_log_();
// 非feedback mode需触发handle next log
bool is_committed_lsn_updated = false;
(void) handle_next_submit_log_(is_committed_lsn_updated);
}
@ -1200,6 +1270,11 @@ int64_t LogSlidingWindow::get_last_submit_log_id_() const
return last_submit_log_id_;
}
// Reads the end LSN of the last submitted log into `end_lsn`.
//
// The value is fetched with an atomic load of last_submit_end_lsn_.val_ and
// does not take last_submit_info_lock_; the matching writer in
// set_last_submit_log_info_() publishes it with ATOMIC_STORE.
//
// @param [out] end_lsn  receives the loaded end LSN value.
void LogSlidingWindow::get_last_submit_end_lsn_(LSN &end_lsn) const
{
end_lsn = ATOMIC_LOAD(&last_submit_end_lsn_.val_);
}
void LogSlidingWindow::get_last_submit_log_info_(LSN &lsn, LSN &end_lsn,
int64_t &log_id, int64_t &log_proposal_id) const
{
@ -1304,7 +1379,7 @@ int LogSlidingWindow::set_last_submit_log_info_(const LSN &lsn,
ObSpinLockGuard guard(last_submit_info_lock_);
const int64_t old_submit_log_id = last_submit_log_id_;
last_submit_lsn_ = lsn;
last_submit_end_lsn_ = end_lsn;
ATOMIC_STORE(&last_submit_end_lsn_.val_, end_lsn.val_);
last_submit_log_id_ = log_id;
last_submit_log_pid_ = log_proposal_id;
PALF_LOG(TRACE, "set_last_submit_log_info_ success", K_(palf_id), K_(self), K(old_submit_log_id), K(lsn), K(log_id), \
@ -1893,8 +1968,8 @@ int LogSlidingWindow::freeze_pending_log_(LSN &last_lsn)
PALF_LOG(WARN, "lsn_allocator try_freeze failed", K(ret), K_(palf_id), K_(self), K(last_lsn));
} else if (last_log_id <= 0) {
// no log, no need freeze
} else if (OB_FAIL(try_freeze_last_log_(last_log_id, last_lsn, is_need_handle))) {
PALF_LOG(WARN, "try_freeze_last_log_ failed", K(ret), K_(palf_id), K_(self), K(last_lsn));
} else if (OB_FAIL(try_freeze_last_log_task_(last_log_id, last_lsn, is_need_handle))) {
PALF_LOG(WARN, "try_freeze_last_log_task_ failed", K(ret), K_(palf_id), K_(self), K(last_lsn));
} else {
const int64_t last_submit_log_id = get_last_submit_log_id_();
if (last_log_id == last_submit_log_id + 1) {

View File

@ -70,6 +70,12 @@ enum TruncateType
TRUNCATE_LOG = 2,
};
// Strategy used by the sliding window to decide when the last log is frozen.
enum FreezeMode
{
  // Freezing is done by period_freeze_last_log().
  PERIOD_FREEZE_MODE = 0,
  // Freezing is done by feedback_freeze_last_log_().
  FEEDBACK_FREEZE_MODE = 1,
};
struct TruncateLogInfo
{
TruncateType truncate_type_;
@ -181,10 +187,12 @@ public:
virtual int to_leader_active();
virtual int try_advance_committed_end_lsn(const LSN &end_lsn);
virtual int64_t get_last_submit_log_id_() const;
virtual void get_last_submit_end_lsn_(LSN &end_lsn) const;
virtual int get_last_submit_log_info(LSN &last_submit_lsn, int64_t &log_id, int64_t &log_proposal_id) const;
virtual int get_last_slide_end_lsn(LSN &out_end_lsn) const;
virtual int64_t get_last_slide_log_ts() const;
virtual int try_freeze_last_log();
virtual int check_and_switch_freeze_mode();
virtual int period_freeze_last_log();
virtual int inc_update_log_ts_base(const int64_t log_ts);
// location cache will be removed TODO by yunlong
virtual int set_location_cache_cb(PalfLocationCacheCb *lc_cb);
@ -250,7 +258,8 @@ private:
const int64_t log_id,
const int64_t &log_proposal_id);
int try_freeze_prev_log_(const int64_t next_log_id, const LSN &lsn, bool &is_need_handle);
int try_freeze_last_log_(const int64_t expected_log_id, const LSN &expected_end_lsn, bool &is_need_handle);
int feedback_freeze_last_log_();
int try_freeze_last_log_task_(const int64_t expected_log_id, const LSN &expected_end_lsn, bool &is_need_handle);
int generate_new_group_log_(const LSN &lsn,
const int64_t log_id,
const int64_t log_ts,
@ -327,6 +336,9 @@ private:
public:
typedef common::ObLinearHashMap<common::ObAddr, LSN> SvrMatchOffsetMap;
static const int64_t TMP_HEADER_SER_BUF_LEN = 256; // size of the temporary buffer used to serialize a log header
static const int64_t APPEND_CNT_ARRAY_SIZE = 32; // number of slots in the append-count statistics array
// Mask used to map a thread id onto a slot (itid & mask); this only covers every
// slot index when APPEND_CNT_ARRAY_SIZE is a power of two.
static const uint64_t APPEND_CNT_ARRAY_MASK = APPEND_CNT_ARRAY_SIZE - 1;
static const int64_t APPEND_CNT_LB_FOR_PERIOD_FREEZE = 200000; // lower bound of the append count for switching to PERIOD_FREEZE_MODE
private:
struct LogTaskGuard
{
@ -447,6 +459,8 @@ private:
int64_t accum_log_cnt_;
int64_t accum_group_log_size_;
int64_t last_record_group_log_id_;
int64_t append_cnt_array_[APPEND_CNT_ARRAY_SIZE];
FreezeMode freeze_mode_;
bool is_inited_;
private:
DISALLOW_COPY_AND_ASSIGN(LogSlidingWindow);

View File

@ -556,8 +556,19 @@ bool PalfEnvImpl::FreezeLogFunctor::operator() (const LSKey &palf_id, PalfHandle
int tmp_ret = OB_SUCCESS;
if (NULL == palf_handle_impl) {
PALF_LOG(ERROR, "palf_handle_impl is NULL", KP(palf_handle_impl), K(palf_id));
} else if (OB_SUCCESS != (tmp_ret = palf_handle_impl->try_freeze_last_log())) {
PALF_LOG(WARN, "try_freeze_last_log failed", K(tmp_ret), K(palf_id));
} else if (OB_SUCCESS != (tmp_ret = palf_handle_impl->period_freeze_last_log())) {
PALF_LOG(WARN, "period_freeze_last_log failed", K(tmp_ret), K(palf_id));
} else {}
return true;
}
// Per-entry callback: asks one palf instance to re-evaluate its freeze mode.
//
// A NULL handle is reported with an ERROR log and a failed check with a WARN
// log; neither affects the return value.
//
// @param palf_id           key of the entry being visited (used for logging).
// @param palf_handle_impl  palf instance to check; may be NULL.
// @return always true.
bool PalfEnvImpl::CheckFreezeModeFunctor::operator() (const LSKey &palf_id, PalfHandleImpl *palf_handle_impl)
{
  int tmp_ret = OB_SUCCESS;
  if (NULL != palf_handle_impl) {
    tmp_ret = palf_handle_impl->check_and_switch_freeze_mode();
    if (OB_SUCCESS != tmp_ret) {
      PALF_LOG(WARN, "check_and_switch_freeze_mode failed", K(tmp_ret), K(palf_id));
    }
  } else {
    PALF_LOG(ERROR, "palf_handle_impl is NULL", KP(palf_handle_impl), K(palf_id));
  }
  return true;
}
@ -575,6 +586,19 @@ int PalfEnvImpl::try_switch_state_for_all()
return ret;
}
// Runs CheckFreezeModeFunctor over every entry of palf_handle_impl_map_.
//
// @return OB_NOT_INIT if this PalfEnvImpl is not initialized, the error code
//         of palf_handle_impl_map_.for_each() if the traversal fails,
//         OB_SUCCESS otherwise.
int PalfEnvImpl::check_and_switch_freeze_mode()
{
  int ret = OB_SUCCESS;
  if (IS_NOT_INIT) {
    ret = OB_NOT_INIT;
    PALF_LOG(WARN, "PalfEnvImpl is not inited", K(ret));
  } else {
    // The functor is stateless, so it is created only where it is used.
    CheckFreezeModeFunctor check_freeze_mode_functor;
    if (OB_FAIL(palf_handle_impl_map_.for_each(check_freeze_mode_functor))) {
      PALF_LOG(WARN, "palf_handle_impl_map_ for_each failed", K(ret));
    }
  }
  return ret;
}
int PalfEnvImpl::try_freeze_log_for_all()
{
int ret = OB_SUCCESS;

View File

@ -205,6 +205,7 @@ public:
PalfHandleImplGuard &palf_handle_impl_guard);
void revert_palf_handle_impl(PalfHandleImpl *palf_handle_impl);
int try_switch_state_for_all();
int check_and_switch_freeze_mode();
int try_freeze_log_for_all();
// =================== memory space management ==================
bool check_tenant_memory_enough();
@ -246,6 +247,13 @@ private:
~FreezeLogFunctor() {}
bool operator() (const LSKey &palf_id, PalfHandleImpl *palf_handle_impl);
};
// Stateless functor handed to palf_handle_impl_map_.for_each() by
// PalfEnvImpl::check_and_switch_freeze_mode().
class CheckFreezeModeFunctor
{
public:
CheckFreezeModeFunctor() {}
~CheckFreezeModeFunctor() {}
// Invoked with the key and handle of one map entry; the definition calls
// check_and_switch_freeze_mode() on the handle and always returns true.
bool operator() (const LSKey &palf_id, PalfHandleImpl *palf_handle_impl);
};
struct LogGetRecycableFileCandidate {
LogGetRecycableFileCandidate();
~LogGetRecycableFileCandidate();

View File

@ -2025,15 +2025,26 @@ int PalfHandleImpl::reset_location_cache_cb()
return ret;
}
int PalfHandleImpl::try_freeze_last_log()
int PalfHandleImpl::check_and_switch_freeze_mode()
{
int ret = OB_SUCCESS;
if (IS_NOT_INIT) {
ret = OB_NOT_INIT;
} else {
RLockGuard guard(lock_);
sw_.try_freeze_last_log();
sw_.check_and_switch_freeze_mode();
}
return ret;
}
// Triggers the periodic freeze of the last log on this palf instance's
// sliding window, holding lock_ in read mode for the duration of the call.
//
// The sliding window's return code is propagated to the caller rather than
// discarded, so the caller's "period_freeze_last_log failed" warning can also
// report real freeze failures and not only OB_NOT_INIT.
//
// @return OB_NOT_INIT if this PalfHandleImpl is not initialized; otherwise the
//         result of sw_.period_freeze_last_log().
int PalfHandleImpl::period_freeze_last_log()
{
  int ret = OB_SUCCESS;
  if (IS_NOT_INIT) {
    ret = OB_NOT_INIT;
  } else {
    RLockGuard guard(lock_);
    ret = sw_.period_freeze_last_log();
  }
  return ret;
}

View File

@ -766,7 +766,8 @@ public:
int inner_truncate_prefix_blocks(const LSN &lsn);
// ==================================================================
int check_and_switch_state();
int try_freeze_last_log();
int check_and_switch_freeze_mode();
int period_freeze_last_log();
int handle_prepare_request(const common::ObAddr &server,
const int64_t &proposal_id) override final;
int handle_prepare_response(const common::ObAddr &server,

View File

@ -444,7 +444,7 @@ TEST_F(TestLogSlidingWindow, test_receive_log)
// use correct prev_log_proposal_id
prev_log_proposal_id = curr_proposal_id;
// handle submit log
EXPECT_EQ(OB_SUCCESS, log_sw_.try_freeze_last_log());
EXPECT_EQ(OB_SUCCESS, log_sw_.period_freeze_last_log());
EXPECT_EQ(OB_EAGAIN, log_sw_.receive_log(src_server, push_log_type, prev_lsn, prev_log_proposal_id, lsn, data_buf_, group_entry_size, true, truncate_log_info));
LSN old_lsn = lsn;
// test lsn > group_buffer capacity case, will return -4023
@ -501,7 +501,7 @@ TEST_F(TestLogSlidingWindow, test_receive_log)
EXPECT_TRUE(group_header.check_integrity(data_buf_ + group_header_size, group_entry_size - group_header_size));
PALF_LOG(INFO, "begin receive log with log_id=3, and proposal_id 21");
// handle submit log
EXPECT_EQ(OB_SUCCESS, log_sw_.try_freeze_last_log());
EXPECT_EQ(OB_SUCCESS, log_sw_.period_freeze_last_log());
EXPECT_EQ(OB_EAGAIN, log_sw_.receive_log(src_server, push_log_type, prev_lsn, prev_log_proposal_id, lsn, data_buf_, group_entry_size, true, truncate_log_info));
EXPECT_TRUE(TRUNCATE_CACHED_LOG_TASK == truncate_log_info.truncate_type_);
EXPECT_EQ(log_id, truncate_log_info.truncate_log_id_);
@ -614,7 +614,7 @@ TEST_F(TestLogSlidingWindow, test_truncate_log)
int64_t log_ts = -1;
// submit first log
EXPECT_EQ(OB_SUCCESS, log_sw_.submit_log(buf, buf_len, ref_ts, lsn, log_ts));
EXPECT_EQ(OB_SUCCESS, log_sw_.try_freeze_last_log());
EXPECT_EQ(OB_SUCCESS, log_sw_.period_freeze_last_log());
// generate new group entry
LogEntryHeader log_entry_header;
LogGroupEntryHeader group_header;
@ -826,7 +826,7 @@ TEST_F(TestLogSlidingWindow, test_truncate_for_rebuild)
src_server = self_;
TruncateLogInfo truncate_log_info;
// handle submit log
EXPECT_EQ(OB_SUCCESS, log_sw_.try_freeze_last_log());
EXPECT_EQ(OB_SUCCESS, log_sw_.period_freeze_last_log());
PALF_LOG(INFO, "begin receive log with log_id=2");
EXPECT_EQ(OB_SUCCESS, log_sw_.receive_log(src_server, push_log_type, prev_lsn, prev_log_proposal_id, lsn, data_buf_, group_entry_size, false, truncate_log_info));
// gen next group log