Fix for max padding group entry.

This commit is contained in:
obdev
2023-02-27 08:46:00 +00:00
committed by ob-robot
parent b93a775f12
commit 06450be1a7
7 changed files with 110 additions and 9 deletions

View File

@ -22,6 +22,7 @@
#include "share/ob_errno.h" #include "share/ob_errno.h"
#define private public #define private public
#include "logservice/palf/log_define.h" #include "logservice/palf/log_define.h"
#include "logservice/palf/lsn_allocator.h"
#include "share/scn.h" #include "share/scn.h"
#include "logservice/palf/log_rpc_processor.h" #include "logservice/palf/log_rpc_processor.h"
#include "env/ob_simple_log_cluster_env.h" #include "env/ob_simple_log_cluster_env.h"
@ -76,6 +77,71 @@ TEST_F(TestObSimpleLogClusterBasicFunc, submit_log)
PALF_LOG(INFO, "end test submit_log", K(id), K(guard)); PALF_LOG(INFO, "end test submit_log", K(id), K(guard));
} }
// test_max_padding_size: 测试padding entry最长的场景(2M+16K+88+4K-1B).
TEST_F(TestObSimpleLogClusterBasicFunc, test_max_padding_size)
{
SET_CASE_LOG_FILE(TEST_NAME, "submit_log");
//OB_LOGGER.set_log_level("TRACE");
const int64_t id = ATOMIC_AAF(&palf_id_, 1);
const int64_t create_ts = 100;
share::SCN create_scn;
create_scn.convert_for_logservice(create_ts);
int64_t leader_idx = 0;
PalfHandleImplGuard leader;
ObTimeGuard guard("submit_log", 0);
EXPECT_EQ(OB_SUCCESS, create_paxos_group(id, create_scn, leader_idx, leader));
guard.click("create");
int64_t follower_1_idx = (leader_idx + 1) % ObSimpleLogClusterTestBase::member_cnt_;
int64_t follower_2_idx = (leader_idx + 2) % ObSimpleLogClusterTestBase::member_cnt_;
block_net(leader_idx, follower_1_idx);
const int64_t group_entry_header_total_size = LogGroupEntryHeader::HEADER_SER_SIZE + LogEntryHeader::HEADER_SER_SIZE;
const int64_t max_valid_group_entry_size = MAX_LOG_BODY_SIZE + group_entry_header_total_size;
// padding entry size上限如下,预期不会达到该值,故最大值应该是该值减1Byte
const int64_t max_padding_entry_size = max_valid_group_entry_size + CLOG_FILE_TAIL_PADDING_TRIGGER;
// 测试写一个最大padding entry的场景
// 首先写30条2MB的group log
EXPECT_EQ(OB_SUCCESS, submit_log(leader, 30, leader_idx, (2 * 1024 * 1024 - group_entry_header_total_size)));
// 接着写文件尾最后一条有效的group entry, 确保文件剩余空间触发生成最大的padding entry
EXPECT_EQ(OB_SUCCESS, submit_log(leader, 1, leader_idx, (4 * 1024 * 1024 - MAX_INFO_BLOCK_SIZE - max_valid_group_entry_size - CLOG_FILE_TAIL_PADDING_TRIGGER - group_entry_header_total_size + 1)));
// 提交一个2MB+16KB size的log buf, 触发生成padding
EXPECT_EQ(OB_SUCCESS, submit_log(leader, 1, leader_idx, MAX_LOG_BODY_SIZE));
// 预期max_lsn为单个block文件size+最大valid group entry size
const LSN expected_max_lsn(PALF_BLOCK_SIZE + max_valid_group_entry_size);
guard.click("submit_log for max size padding entry finish");
CLOG_LOG(INFO, "leader submit log finished", K(expected_max_lsn));
while (leader.palf_handle_impl_->get_end_lsn() < expected_max_lsn) {
usleep(100 * 1000);
}
EXPECT_EQ(leader.palf_handle_impl_->get_end_lsn(), expected_max_lsn);
guard.click("wait leader end_lsn finish");
CLOG_LOG(INFO, "leader wait end_lsn finished", K(expected_max_lsn));
unblock_net(leader_idx, follower_1_idx);
std::vector<PalfHandleImplGuard*> palf_list;
EXPECT_EQ(OB_SUCCESS, get_cluster_palf_handle_guard(id, palf_list));
PalfHandleImplGuard &follower_2 = *palf_list[follower_2_idx];
while (follower_2.palf_handle_impl_->get_end_lsn() < expected_max_lsn) {
usleep(100 * 1000);
}
EXPECT_EQ(follower_2.palf_handle_impl_->get_end_lsn(), expected_max_lsn);
guard.click("wait follower_2 end_lsn finish");
CLOG_LOG(INFO, "follower_2 wait end_lsn finished", K(expected_max_lsn));
PalfHandleImplGuard &follower_1 = *palf_list[follower_1_idx];
// follower_1依赖fetch log追日志,但end_lsn无法推到与leader一致
// 因为这里committed_end_lsn依赖周期性的keepAlive日志推进
while (follower_1.palf_handle_impl_->get_max_lsn() < expected_max_lsn) {
usleep(100 * 1000);
}
guard.click("wait follower_1 max_lsn finish");
CLOG_LOG(INFO, "follower_1 wait max_lsn finished", K(expected_max_lsn));
EXPECT_EQ(OB_SUCCESS, revert_cluster_palf_handle_guard(palf_list));
guard.click("delete");
PALF_LOG(INFO, "end test submit_log", K(id), K(guard));
}
TEST_F(TestObSimpleLogClusterBasicFunc, restart_and_clear_tmp_files) TEST_F(TestObSimpleLogClusterBasicFunc, restart_and_clear_tmp_files)
{ {
SET_CASE_LOG_FILE(TEST_NAME, "restart_and_clear_tmp_files"); SET_CASE_LOG_FILE(TEST_NAME, "restart_and_clear_tmp_files");

View File

@ -25,10 +25,12 @@ struct LsnTsInfo
{ {
LSN lsn_; LSN lsn_;
int64_t last_ack_time_us_; int64_t last_ack_time_us_;
LsnTsInfo() : lsn_(), last_ack_time_us_(OB_INVALID_TIMESTAMP) int64_t last_advance_time_us_;
LsnTsInfo()
: lsn_(), last_ack_time_us_(OB_INVALID_TIMESTAMP), last_advance_time_us_(OB_INVALID_TIMESTAMP)
{} {}
LsnTsInfo(const LSN &lsn, const int64_t ack_time_us) LsnTsInfo(const LSN &lsn, const int64_t ack_time_us)
: lsn_(lsn), last_ack_time_us_(ack_time_us) : lsn_(lsn), last_ack_time_us_(ack_time_us), last_advance_time_us_(ack_time_us)
{} {}
bool is_valid() const { bool is_valid() const {
return (lsn_.is_valid() && OB_INVALID_TIMESTAMP != last_ack_time_us_); return (lsn_.is_valid() && OB_INVALID_TIMESTAMP != last_ack_time_us_);
@ -37,13 +39,15 @@ struct LsnTsInfo
{ {
lsn_.reset(); lsn_.reset();
last_ack_time_us_ = OB_INVALID_TIMESTAMP; last_ack_time_us_ = OB_INVALID_TIMESTAMP;
last_advance_time_us_ = OB_INVALID_TIMESTAMP;
} }
void operator=(const LsnTsInfo &val) void operator=(const LsnTsInfo &val)
{ {
lsn_ = val.lsn_; lsn_ = val.lsn_;
last_ack_time_us_ = val.last_ack_time_us_; last_ack_time_us_ = val.last_ack_time_us_;
last_advance_time_us_ = val.last_advance_time_us_;
} }
TO_STRING_KV(K_(lsn), K_(last_ack_time_us)); TO_STRING_KV(K_(lsn), K_(last_ack_time_us), K_(last_advance_time_us));
}; };
struct LogMemberAckInfo struct LogMemberAckInfo

View File

@ -60,7 +60,13 @@ const int64_t PALF_PHY_BLOCK_SIZE = 1 << 26;
const int64_t PALF_BLOCK_SIZE = PALF_PHY_BLOCK_SIZE - MAX_INFO_BLOCK_SIZE; // log block size is 64M-MAX_INFO_BLOCK_SIZE by default. const int64_t PALF_BLOCK_SIZE = PALF_PHY_BLOCK_SIZE - MAX_INFO_BLOCK_SIZE; // log block size is 64M-MAX_INFO_BLOCK_SIZE by default.
const int64_t PALF_META_BLOCK_SIZE = PALF_PHY_BLOCK_SIZE - MAX_INFO_BLOCK_SIZE; // meta block size is 64M-MAX_INFO_BLOCK_SIZE by default. const int64_t PALF_META_BLOCK_SIZE = PALF_PHY_BLOCK_SIZE - MAX_INFO_BLOCK_SIZE; // meta block size is 64M-MAX_INFO_BLOCK_SIZE by default.
constexpr offset_t MAX_LOG_BUFFER_SIZE = MAX_LOG_BODY_SIZE + MAX_LOG_HEADER_SIZE; constexpr int64_t CLOG_FILE_TAIL_PADDING_TRIGGER = 4096; // 文件尾剩余空间补padding阈值
// The valid group_entry (not padding entry) size range is:
// (0, (MAX_LOG_BODY_SIZE + MAX_LOG_HEADER_SIZE) ).
// The padding group_entry size range is:
// [4KB, (max_valid_group_entry_size + CLOG_FILE_TAIL_PADDING_TRIGGER) ).
// So the MAX_LOG_BUFFER_SIZE is defined as below:
constexpr offset_t MAX_LOG_BUFFER_SIZE = MAX_LOG_BODY_SIZE + MAX_LOG_HEADER_SIZE + CLOG_FILE_TAIL_PADDING_TRIGGER;
constexpr offset_t LOG_DIO_ALIGN_SIZE = 4 * 1024; constexpr offset_t LOG_DIO_ALIGN_SIZE = 4 * 1024;
constexpr offset_t LOG_DIO_ALIGNED_BUF_SIZE_REDO = MAX_LOG_BUFFER_SIZE + LOG_DIO_ALIGN_SIZE; constexpr offset_t LOG_DIO_ALIGNED_BUF_SIZE_REDO = MAX_LOG_BUFFER_SIZE + LOG_DIO_ALIGN_SIZE;
@ -78,6 +84,8 @@ const int64_t MAX_ALLOWED_SKEW_FOR_REF_US = 3600L * 1000 * 1000; // 1h
// follower's group buffer size is 8MB larger than leader's. // follower's group buffer size is 8MB larger than leader's.
const int64_t FOLLOWER_DEFAULT_GROUP_BUFFER_SIZE = LEADER_DEFAULT_GROUP_BUFFER_SIZE + 8 * 1024 * 1024L; const int64_t FOLLOWER_DEFAULT_GROUP_BUFFER_SIZE = LEADER_DEFAULT_GROUP_BUFFER_SIZE + 8 * 1024 * 1024L;
const int64_t PALF_STAT_PRINT_INTERVAL_US = 1 * 1000 * 1000L; const int64_t PALF_STAT_PRINT_INTERVAL_US = 1 * 1000 * 1000L;
// The advance delay threshold for match lsn is 1s.
const int64_t MATCH_LSN_ADVANCE_DELAY_THRESHOLD_US = 1 * 1000 * 1000L;
const int64_t PALF_RECONFIRM_FETCH_MAX_LSN_INTERVAL = 1 * 1000 * 1000; const int64_t PALF_RECONFIRM_FETCH_MAX_LSN_INTERVAL = 1 * 1000 * 1000;
const int64_t PALF_FETCH_LOG_INTERVAL_US = 2 * 1000 * 1000L; // 2s const int64_t PALF_FETCH_LOG_INTERVAL_US = 2 * 1000 * 1000L; // 2s
const int64_t PALF_FETCH_LOG_RENEW_LEADER_INTERVAL_US = 5 * 1000 * 1000; // 5s const int64_t PALF_FETCH_LOG_RENEW_LEADER_INTERVAL_US = 5 * 1000 * 1000; // 5s

View File

@ -41,6 +41,12 @@ bool UpdateMatchLsnFunc::operator()(const common::ObAddr &server, LsnTsInfo &val
if (!value.is_valid()) { if (!value.is_valid()) {
bool_ret = false; bool_ret = false;
} else if (value.lsn_ <= new_end_lsn_) { } else if (value.lsn_ <= new_end_lsn_) {
old_end_lsn_ = value.lsn_;
old_advance_time_us_ = value.last_advance_time_us_;
if (value.lsn_ < new_end_lsn_) {
// Update last_advance_time_us_ when lsn really changes.
value.last_advance_time_us_ = new_ack_time_us_;
}
value.lsn_ = new_end_lsn_; value.lsn_ = new_end_lsn_;
value.last_ack_time_us_ = new_ack_time_us_; value.last_ack_time_us_ = new_ack_time_us_;
bool_ret = true; bool_ret = true;
@ -452,7 +458,7 @@ int LogSlidingWindow::submit_log(const char *buf,
} else if (is_need_handle && FALSE_IT(is_need_handle_next |= is_need_handle)) { } else if (is_need_handle && FALSE_IT(is_need_handle_next |= is_need_handle)) {
} else { } else {
PALF_LOG(INFO, "generate_new_group_log_ for padding log success", K_(palf_id), K_(self), K(log_id), PALF_LOG(INFO, "generate_new_group_log_ for padding log success", K_(palf_id), K_(self), K(log_id),
K(tmp_lsn), K(scn), K(is_need_handle), K(is_need_handle_next)); K(padding_size), K(tmp_lsn), K(scn), K(is_need_handle), K(is_need_handle_next));
// after gen padding_entry, update lsn to next block // after gen padding_entry, update lsn to next block
tmp_lsn.val_ += padding_size; tmp_lsn.val_ += padding_size;
log_id++; // inc log_id for following new log log_id++; // inc log_id for following new log
@ -3762,7 +3768,11 @@ int LogSlidingWindow::try_update_match_lsn_map_(const common::ObAddr &server, co
PALF_LOG(WARN, "match_lsn_map_.operate() failed", K(ret), K_(palf_id), K_(self), PALF_LOG(WARN, "match_lsn_map_.operate() failed", K(ret), K_(palf_id), K_(self),
K(server), K(end_lsn), "curr val", tmp_val); K(server), K(end_lsn), "curr val", tmp_val);
} else { } else {
// do nothing // Update successfully, check if it advances delay too long.
if (update_func.is_advance_delay_too_long()) {
PALF_LOG(WARN, "[MATCH LSN ADVANCE DELAY]match_lsn advance delay too much time",
K(ret), K_(palf_id), K_(self), K(server), K(update_func));
}
} }
} }
PALF_LOG(TRACE, "try_update_match_lsn_map_ finished", K(ret), K_(palf_id), K_(self), K(server), K(end_lsn)); PALF_LOG(TRACE, "try_update_match_lsn_map_ finished", K(ret), K_(palf_id), K_(self), K(server), K(end_lsn));

View File

@ -105,14 +105,26 @@ class UpdateMatchLsnFunc
{ {
public: public:
UpdateMatchLsnFunc(const LSN &end_lsn, const int64_t new_ack_time_us) UpdateMatchLsnFunc(const LSN &end_lsn, const int64_t new_ack_time_us)
: new_end_lsn_(end_lsn), new_ack_time_us_(new_ack_time_us) : new_end_lsn_(end_lsn), old_end_lsn_(), new_ack_time_us_(new_ack_time_us), old_advance_time_us_(OB_INVALID_TIMESTAMP)
{} {}
~UpdateMatchLsnFunc() {} ~UpdateMatchLsnFunc() {}
bool operator()(const common::ObAddr &server, LsnTsInfo &value); bool operator()(const common::ObAddr &server, LsnTsInfo &value);
TO_STRING_KV(K_(new_end_lsn), K_(new_ack_time_us)); bool is_advance_delay_too_long() const {
bool bool_ret = false;
if (old_end_lsn_ < new_end_lsn_
&& (new_ack_time_us_ - old_advance_time_us_) > MATCH_LSN_ADVANCE_DELAY_THRESHOLD_US) {
// Return true when advance delay exceeds 1s.
bool_ret = true;
}
return bool_ret;
}
TO_STRING_KV(K_(old_end_lsn), K_(new_end_lsn), K_(old_advance_time_us), K_(new_ack_time_us),
"advance delay(us)", new_ack_time_us_ - old_advance_time_us_);
private: private:
LSN new_end_lsn_; LSN new_end_lsn_;
LSN old_end_lsn_;
int64_t new_ack_time_us_; int64_t new_ack_time_us_;
int64_t old_advance_time_us_;
}; };
class GetLaggedListFunc class GetLaggedListFunc

View File

@ -79,7 +79,6 @@ private:
static const int64_t LOG_TS_DELTA_UPPER_BOUND = (1ul << LOG_TS_DELTA_BIT_CNT) - 1000; static const int64_t LOG_TS_DELTA_UPPER_BOUND = (1ul << LOG_TS_DELTA_BIT_CNT) - 1000;
static const uint64_t LOG_CUT_TRIGGER = 1 << 21; // 聚合日志跨2M边界时切分 static const uint64_t LOG_CUT_TRIGGER = 1 << 21; // 聚合日志跨2M边界时切分
static const uint64_t LOG_CUT_TRIGGER_MASK = (1 << 21) - 1; static const uint64_t LOG_CUT_TRIGGER_MASK = (1 << 21) - 1;
static const int64_t CLOG_FILE_TAIL_PADDING_TRIGGER = 4096; // 文件尾剩余空间补padding阈值
static const uint64_t MAX_SUPPORTED_BLOCK_ID = 0xfffffffff - 1000; // block_id告警阈值 static const uint64_t MAX_SUPPORTED_BLOCK_ID = 0xfffffffff - 1000; // block_id告警阈值
static const uint64_t MAX_SUPPORTED_BLOCK_OFFSET = 0xfffffff; // block_offset的最大支持256MB static const uint64_t MAX_SUPPORTED_BLOCK_OFFSET = 0xfffffff; // block_offset的最大支持256MB
private: private:

View File

@ -2837,6 +2837,8 @@ int PalfHandleImpl::try_send_committed_info_(const ObAddr &server,
AccessMode access_mode; AccessMode access_mode;
if (!log_lsn.is_valid() || !log_end_lsn.is_valid() || INVALID_PROPOSAL_ID == log_proposal_id) { if (!log_lsn.is_valid() || !log_end_lsn.is_valid() || INVALID_PROPOSAL_ID == log_proposal_id) {
ret = OB_INVALID_ARGUMENT; ret = OB_INVALID_ARGUMENT;
PALF_LOG(WARN, "invalid arguments", K(ret), KPC(this), K(server), K(log_lsn), K(log_end_lsn),
K(log_proposal_id));
} else if (OB_FAIL(mode_mgr_.get_access_mode(access_mode))) { } else if (OB_FAIL(mode_mgr_.get_access_mode(access_mode))) {
PALF_LOG(WARN, "get_access_mode failed", K(ret), KPC(this)); PALF_LOG(WARN, "get_access_mode failed", K(ret), KPC(this));
} else if (AccessMode::APPEND == access_mode) { } else if (AccessMode::APPEND == access_mode) {