replace parity checksum in the log header with xxhash16
This commit is contained in:
@ -1393,6 +1393,8 @@ int FetchStream::read_log_(
|
||||
const char *buffer = nullptr;
|
||||
|
||||
if (OB_FAIL(ls_fetch_ctx_->get_next_group_entry(group_entry, group_start_lsn, buffer))) {
|
||||
// If failed, reset memory storage
|
||||
ls_fetch_ctx_->reset_memory_storage();
|
||||
if (OB_ITER_END != ret) {
|
||||
LOG_ERROR("get next_group_entry failed", KR(ret), K_(ls_fetch_ctx), K(resp));
|
||||
if (OB_CHECKSUM_ERROR == ret || OB_INVALID_DATA == ret) {
|
||||
|
@ -14,7 +14,11 @@
|
||||
#include "lib/checksum/ob_crc64.h" // ob_crc64
|
||||
#include "lib/checksum/ob_parity_check.h" // parity_check
|
||||
#include "lib/ob_errno.h" // errno
|
||||
#include "lib/checksum/ob_crc16.h" // ob_crc16
|
||||
#include "share/ob_cluster_version.h" // GET_MIN_DATA_VERSION
|
||||
#include "share/rc/ob_tenant_base.h" // MTL_ID
|
||||
#include "logservice/ob_log_base_header.h" // ObLogBaseHeader
|
||||
#include "lib/hash/xxhash.h" // XXH64
|
||||
|
||||
namespace oceanbase
|
||||
{
|
||||
@ -24,6 +28,15 @@ namespace palf
|
||||
|
||||
const int64_t LogEntryHeader::HEADER_SER_SIZE = sizeof(LogEntryHeader);
|
||||
const int64_t LogEntryHeader::PADDING_LOG_ENTRY_SIZE = sizeof(LogEntryHeader) + sizeof(logservice::ObLogBaseHeader);
|
||||
const int16_t LogEntryHeader::MAGIC = 0x4C48;
|
||||
|
||||
const int16_t LogEntryHeader::LOG_ENTRY_HEADER_VERSION = 1;
|
||||
const int64_t LogEntryHeader::PADDING_TYPE_MASK = 1 << 1;
|
||||
|
||||
const int16_t LogEntryHeader::LOG_ENTRY_HEADER_VERSION2 = 2;
|
||||
const int64_t LogEntryHeader::PADDING_TYPE_MASK_VERSION2 = 1ll << 62;
|
||||
const int64_t LogEntryHeader::CRC16_MASK = 0xffff;
|
||||
const int64_t LogEntryHeader::PARITY_MASK = 0x01;
|
||||
|
||||
LogEntryHeader::LogEntryHeader()
|
||||
: magic_(0),
|
||||
@ -65,24 +78,35 @@ bool LogEntryHeader::is_valid() const
|
||||
return (magic_ == LogEntryHeader::MAGIC && log_size_ > 0 && scn_.is_valid());
|
||||
}
|
||||
|
||||
bool LogEntryHeader::get_header_parity_check_res_() const
|
||||
uint16_t LogEntryHeader::calculate_header_checksum_() const
|
||||
{
|
||||
bool bool_ret = parity_check(reinterpret_cast<const uint16_t &>(magic_));
|
||||
bool_ret ^= parity_check(reinterpret_cast<const uint16_t &>(version_));
|
||||
bool_ret ^= parity_check(reinterpret_cast<const uint32_t &>(log_size_));
|
||||
bool_ret ^= parity_check((scn_.get_val_for_logservice()));
|
||||
bool_ret ^= parity_check(reinterpret_cast<const uint64_t &>(data_checksum_));
|
||||
int64_t tmp_flag = (flag_ & ~(0x1));
|
||||
bool_ret ^= parity_check(reinterpret_cast<const uint64_t &>(tmp_flag));
|
||||
return bool_ret;
|
||||
uint16_t checksum = 0;
|
||||
if (LOG_ENTRY_HEADER_VERSION == version_) {
|
||||
bool bool_ret = parity_check(reinterpret_cast<const uint16_t &>(magic_));
|
||||
bool_ret ^= parity_check(reinterpret_cast<const uint16_t &>(version_));
|
||||
bool_ret ^= parity_check(reinterpret_cast<const uint32_t &>(log_size_));
|
||||
bool_ret ^= parity_check((scn_.get_val_for_logservice()));
|
||||
bool_ret ^= parity_check(reinterpret_cast<const uint64_t &>(data_checksum_));
|
||||
int64_t tmp_flag = (flag_ & ~PARITY_MASK);
|
||||
bool_ret ^= parity_check(reinterpret_cast<const uint64_t &>(tmp_flag));
|
||||
checksum = bool_ret ? 1 : 0;
|
||||
} else if (LOG_ENTRY_HEADER_VERSION2 == version_) {
|
||||
// NB: To avoid dealing with endianness issue, make the last two bytes of flag_ with zero.
|
||||
int64_t ori_flag = flag_;
|
||||
this->flag_ = (ori_flag & ~CRC16_MASK);
|
||||
checksum = xxhash_16(checksum, reinterpret_cast<const uint8_t*>(this), sizeof(LogEntryHeader));
|
||||
this->flag_ = ori_flag;
|
||||
} else {
|
||||
PALF_LOG_RET(ERROR, OB_ERR_UNEXPECTED, "unexpected version", KPC(this));
|
||||
}
|
||||
PALF_LOG(TRACE, "update_header_checksum_", KPC(this), K(checksum));
|
||||
return checksum;
|
||||
}
|
||||
|
||||
void LogEntryHeader::update_header_checksum_()
|
||||
{
|
||||
if (get_header_parity_check_res_()) {
|
||||
flag_ |= 0x1;
|
||||
}
|
||||
PALF_LOG(TRACE, "update_header_checksum_ finished", K(*this), "parity flag", (flag_ & 0x1));
|
||||
reset_header_checksum_();
|
||||
flag_ = (flag_ | calculate_header_checksum_());
|
||||
}
|
||||
|
||||
int LogEntryHeader::generate_header(const char *log_data,
|
||||
@ -94,27 +118,36 @@ int LogEntryHeader::generate_header(const char *log_data,
|
||||
ret = OB_INVALID_ARGUMENT;
|
||||
} else {
|
||||
magic_ = LogEntryHeader::MAGIC;
|
||||
version_ = LOG_ENTRY_HEADER_VERSION;
|
||||
version_ = get_version_();
|
||||
log_size_ = data_len;
|
||||
scn_ = scn;
|
||||
data_checksum_ = common::ob_crc64(log_data, data_len);
|
||||
// update header checksum after all member vars assigned
|
||||
(void) update_header_checksum_();
|
||||
PALF_LOG(TRACE, "generate_header", KPC(this));
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
bool LogEntryHeader::check_header_checksum_() const
|
||||
{
|
||||
const int64_t header_checksum = get_header_parity_check_res_() ? 1 : 0;
|
||||
const int64_t saved_header_checksum = flag_ & (0x1);
|
||||
return (header_checksum == saved_header_checksum);
|
||||
bool bool_ret = false;
|
||||
const uint16_t header_checksum = calculate_header_checksum_();
|
||||
if (LOG_ENTRY_HEADER_VERSION2 != version_ && LOG_ENTRY_HEADER_VERSION != version_) {
|
||||
PALF_LOG_RET(ERROR, OB_ERR_UNEXPECTED, "check_header_checksum_ failed, invalid version_", KPC(this));
|
||||
} else {
|
||||
int64_t mask = get_header_checksum_mask_();
|
||||
const uint16_t saved_header_checksum = (flag_ & mask);
|
||||
bool_ret = (header_checksum == saved_header_checksum);
|
||||
if (false == bool_ret) {
|
||||
PALF_LOG_RET(ERROR, OB_INVALID_DATA, "check_header_checksum_ failed", K(saved_header_checksum), K(header_checksum), KPC(this));
|
||||
}
|
||||
}
|
||||
return bool_ret;
|
||||
}
|
||||
|
||||
bool LogEntryHeader::is_padding_log_() const
|
||||
{
|
||||
return (flag_ & PADDING_TYPE_MASK) > 0;
|
||||
return (flag_ & get_padding_mask_()) > 0;
|
||||
}
|
||||
|
||||
// static member function
|
||||
@ -199,11 +232,11 @@ int LogEntryHeader::generate_padding_header_(const char *log_data,
|
||||
ret = OB_INVALID_ARGUMENT;
|
||||
} else {
|
||||
magic_ = LogEntryHeader::MAGIC;
|
||||
version_ = LogEntryHeader::LOG_ENTRY_HEADER_VERSION;
|
||||
version_ = get_version_();
|
||||
log_size_ = padding_data_len;
|
||||
scn_ = scn;
|
||||
data_checksum_ = common::ob_crc64(log_data, base_header_len);
|
||||
flag_ = (flag_ | LogEntryHeader::PADDING_TYPE_MASK);
|
||||
flag_ = (flag_ | get_padding_mask_());
|
||||
// update header checksum after all member vars assigned
|
||||
(void) update_header_checksum_();
|
||||
PALF_LOG(INFO, "generate_padding_header_ success", KPC(this), K(log_data), K(base_header_len), K(padding_data_len));
|
||||
@ -211,6 +244,56 @@ int LogEntryHeader::generate_padding_header_(const char *log_data,
|
||||
return ret;
|
||||
}
|
||||
|
||||
int16_t LogEntryHeader::get_version_() const
|
||||
{
|
||||
int16_t version = LOG_ENTRY_HEADER_VERSION;
|
||||
int ret = OB_SUCCESS;
|
||||
uint64_t min_data_version = 0;
|
||||
if (OB_FAIL(GET_MIN_DATA_VERSION(MTL_ID(), min_data_version))) {
|
||||
PALF_LOG(WARN, "GET_MIN_DATA_VERSION failed", K(ret));
|
||||
} else if (min_data_version >= DATA_VERSION_4_3_3_0) {
|
||||
version = LOG_ENTRY_HEADER_VERSION2;
|
||||
}
|
||||
return version;
|
||||
}
|
||||
|
||||
int64_t LogEntryHeader::get_padding_mask_() const
|
||||
{
|
||||
int64_t padding_mask = 0;
|
||||
if (LOG_ENTRY_HEADER_VERSION2 == version_) {
|
||||
padding_mask = PADDING_TYPE_MASK_VERSION2;
|
||||
} else if (LOG_ENTRY_HEADER_VERSION == version_) {
|
||||
padding_mask = PADDING_TYPE_MASK;
|
||||
} else {
|
||||
PALF_LOG_RET(ERROR, OB_ERR_UNEXPECTED, "unexpected version!!!", KPC(this));
|
||||
}
|
||||
return padding_mask;
|
||||
}
|
||||
|
||||
int64_t LogEntryHeader::get_header_checksum_mask_() const
|
||||
{
|
||||
int64_t header_checksum_mask = 0;
|
||||
if (LOG_ENTRY_HEADER_VERSION2 == version_) {
|
||||
header_checksum_mask = CRC16_MASK;
|
||||
} else if (LOG_ENTRY_HEADER_VERSION == version_) {
|
||||
header_checksum_mask = PARITY_MASK;
|
||||
} else {
|
||||
PALF_LOG_RET(ERROR, OB_ERR_UNEXPECTED, "unexpected version!!!", KPC(this));
|
||||
}
|
||||
return header_checksum_mask;
|
||||
}
|
||||
|
||||
void LogEntryHeader::reset_header_checksum_()
|
||||
{
|
||||
if (LOG_ENTRY_HEADER_VERSION2 == version_) {
|
||||
flag_ &= (~CRC16_MASK);
|
||||
} else if (LOG_ENTRY_HEADER_VERSION == version_) {
|
||||
flag_ &= (~PARITY_MASK);
|
||||
} else {
|
||||
PALF_LOG_RET(ERROR, OB_ERR_UNEXPECTED, "unexpected version_", KPC(this));
|
||||
}
|
||||
}
|
||||
|
||||
DEFINE_SERIALIZE(LogEntryHeader)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
@ -245,6 +328,7 @@ DEFINE_DESERIALIZE(LogEntryHeader)
|
||||
ret = OB_BUF_NOT_ENOUGH;
|
||||
} else if (false == check_header_integrity()) {
|
||||
ret = OB_INVALID_DATA;
|
||||
PALF_LOG(WARN, "invalid log_entry_header", KR(ret), KPC(this));
|
||||
} else {
|
||||
pos = new_pos;
|
||||
}
|
||||
@ -263,5 +347,9 @@ DEFINE_GET_SERIALIZE_SIZE(LogEntryHeader)
|
||||
return size;
|
||||
}
|
||||
|
||||
int16_t xxhash_16(int16_t checksum, const uint8_t* data, const int64_t data_len)
|
||||
{
|
||||
return XXH64(data, data_len, checksum) & 0xffff;
|
||||
}
|
||||
} // namespace palf
|
||||
} // namespace oceanbase
|
||||
|
@ -58,11 +58,11 @@ public:
|
||||
"data_checksum", data_checksum_,
|
||||
"flag", flag_);
|
||||
public:
|
||||
static constexpr int16_t MAGIC = 0x4C48; // 'LH' means LOG ENTRY HEADER
|
||||
static const int16_t MAGIC; // 'LH' means LOG ENTRY HEADER
|
||||
static const int64_t HEADER_SER_SIZE;
|
||||
static const int64_t PADDING_LOG_ENTRY_SIZE;
|
||||
private:
|
||||
bool get_header_parity_check_res_() const;
|
||||
uint16_t calculate_header_checksum_() const;
|
||||
void update_header_checksum_();
|
||||
bool check_header_checksum_() const;
|
||||
bool is_padding_log_() const;
|
||||
@ -71,9 +71,17 @@ private:
|
||||
const int64_t base_header_len,
|
||||
const int64_t padding_data_len,
|
||||
const share::SCN &scn);
|
||||
int16_t get_version_() const;
|
||||
int64_t get_padding_mask_() const;
|
||||
int64_t get_header_checksum_mask_() const;
|
||||
void reset_header_checksum_();
|
||||
private:
|
||||
static constexpr int16_t LOG_ENTRY_HEADER_VERSION = 1;
|
||||
static constexpr int64_t PADDING_TYPE_MASK = 1 << 1;
|
||||
static const int16_t LOG_ENTRY_HEADER_VERSION;
|
||||
static const int64_t PADDING_TYPE_MASK;
|
||||
static const int16_t LOG_ENTRY_HEADER_VERSION2;
|
||||
static const int64_t PADDING_TYPE_MASK_VERSION2;
|
||||
static const int64_t CRC16_MASK;
|
||||
static const int64_t PARITY_MASK;
|
||||
private:
|
||||
int16_t magic_;
|
||||
int16_t version_;
|
||||
@ -81,8 +89,15 @@ private:
|
||||
share::SCN scn_;
|
||||
int64_t data_checksum_;
|
||||
// The lowest bit is used for parity check.
|
||||
int64_t flag_;
|
||||
// LOG_ENTRY_HEADER_VERSION
|
||||
// | sign bit | 61 unused bit | PADDING bit | PARITY CHECKSUM bit |
|
||||
//
|
||||
// LOG_ENTRY_HEADER_VERSION2
|
||||
// | sign bit | PADDING bit | 46 unused bit | 16 crc16 bit |
|
||||
mutable int64_t flag_;
|
||||
};
|
||||
|
||||
int16_t xxhash_16(int16_t checksum, const uint8_t* data, const int64_t data_len);
|
||||
}
|
||||
}
|
||||
#endif // OCEANBASE_LOGSERVICE_LOG_ENTRY_HEADER_
|
||||
|
@ -154,5 +154,10 @@ int LogGroupEntry::get_log_min_scn(SCN &min_scn) const
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
bool LogGroupEntry::check_compatibility() const
|
||||
{
|
||||
return header_.check_compatibility();
|
||||
}
|
||||
} // end namespace palf
|
||||
} // end namespace oceanbase
|
||||
|
@ -54,6 +54,7 @@ public:
|
||||
// param[in] upper_limit_scn the upper bound to determain which log entries can reserve
|
||||
// param[in] pre_accum_checksum, the accum_checksum of the pre log
|
||||
int truncate(const share::SCN &upper_limit_scn, const int64_t pre_accum_checksum);
|
||||
bool check_compatibility() const;
|
||||
|
||||
TO_STRING_KV("LogGroupEntryHeader", header_);
|
||||
NEED_SERIALIZE_AND_DESERIALIZE;
|
||||
|
@ -11,15 +11,18 @@
|
||||
*/
|
||||
|
||||
#include "log_group_entry_header.h" // LogGroupEntryHeader
|
||||
#include "log_entry.h" // LogEntry
|
||||
#include "log_entry_header.h" // LogEntryHeader
|
||||
#include "lib/checksum/ob_crc64.h" // ob_crc64
|
||||
#include "lib/checksum/ob_crc16.h" // ob_crc16
|
||||
#include "lib/checksum/ob_parity_check.h" // parity_check
|
||||
#include "lib/utility/utility.h" // !FALSE_IT
|
||||
#include "lib/oblog/ob_log_module.h" // LOG*
|
||||
#include "share/scn.h" // SCN
|
||||
#include "share/ob_cluster_version.h" // GET_MIN_DATA_VERSION
|
||||
#include "share/rc/ob_tenant_base.h" // MTL_ID
|
||||
#include "log_define.h" // is_valid_log_id...
|
||||
#include "log_writer_utils.h" // LogWriteBuf
|
||||
#include "log_entry.h" // LogEntry
|
||||
#include "log_entry_header.h" // LogEntryHeader
|
||||
|
||||
namespace oceanbase
|
||||
{
|
||||
@ -29,6 +32,18 @@ using namespace common;
|
||||
using namespace share;
|
||||
|
||||
const int64_t LogGroupEntryHeader::HEADER_SER_SIZE = sizeof(LogGroupEntryHeader);
|
||||
const int16_t LogGroupEntryHeader::MAGIC = 0x4752;
|
||||
|
||||
const int16_t LogGroupEntryHeader::LOG_GROUP_ENTRY_HEADER_VERSION = 1;
|
||||
const int64_t LogGroupEntryHeader::PADDING_TYPE_MASK = 1 << 1;
|
||||
const int64_t LogGroupEntryHeader::RAW_WRITE_MASK = 1 << 2;
|
||||
const int64_t LogGroupEntryHeader::PADDING_LOG_DATA_CHECKSUM = 0;
|
||||
|
||||
const int16_t LogGroupEntryHeader::LOG_GROUP_ENTRY_HEADER_VERSION2 = 2;
|
||||
const int64_t LogGroupEntryHeader::PADDING_TYPE_MASK_VERSION2 = 1ll << 62;
|
||||
const int64_t LogGroupEntryHeader::RAW_WRITE_MASK_VERSION2 = 1ll << 61;
|
||||
const int64_t LogGroupEntryHeader::CRC16_MASK = 0xffff;
|
||||
const int64_t LogGroupEntryHeader::PARITY_MASK = 0x01;
|
||||
|
||||
LogGroupEntryHeader::LogGroupEntryHeader()
|
||||
{
|
||||
@ -43,7 +58,7 @@ LogGroupEntryHeader::~LogGroupEntryHeader()
|
||||
bool LogGroupEntryHeader::is_valid() const
|
||||
{
|
||||
return LogGroupEntryHeader::MAGIC == magic_
|
||||
&& LOG_GROUP_ENTRY_HEADER_VERSION == version_
|
||||
&& (LOG_GROUP_ENTRY_HEADER_VERSION == version_ || LOG_GROUP_ENTRY_HEADER_VERSION2 == version_)
|
||||
&& INVALID_PROPOSAL_ID != proposal_id_
|
||||
&& true == committed_end_lsn_.is_valid()
|
||||
&& true == max_scn_.is_valid()
|
||||
@ -83,24 +98,23 @@ int LogGroupEntryHeader::generate(const bool is_raw_write,
|
||||
K(max_scn), K(log_id), K(committed_end_lsn), K(log_proposal_id));
|
||||
} else {
|
||||
magic_ = LogGroupEntryHeader::MAGIC;
|
||||
version_ = LOG_GROUP_ENTRY_HEADER_VERSION;
|
||||
version_ = get_version_();
|
||||
group_size_ = static_cast<int32_t>(data_len);
|
||||
max_scn_ = max_scn;
|
||||
log_id_ = log_id;
|
||||
committed_end_lsn_ = committed_end_lsn;
|
||||
proposal_id_ = log_proposal_id;
|
||||
if (is_padding_log) {
|
||||
flag_ |= PADDING_TYPE_MASK;
|
||||
flag_ = (flag_ | get_padding_mask_());
|
||||
}
|
||||
if (is_raw_write) {
|
||||
flag_ |= RAW_WRITE_MASK;
|
||||
flag_ = (flag_ | get_raw_write_mask_());
|
||||
}
|
||||
if (OB_FAIL(calculate_log_checksum_(is_padding_log, log_write_buf, data_len, data_checksum))) {
|
||||
PALF_LOG(ERROR, "calculate_log_checksum_ failed", K(ret));
|
||||
PALF_LOG(ERROR, "calculate_log_checksum_ failed", K(ret), KPC(this));
|
||||
}
|
||||
}
|
||||
PALF_LOG(TRACE, "LogGroupEntryHeader generate", K(ret), K(is_padding_log), K(*this), K(data_checksum),
|
||||
"haeder_checksum", get_header_parity_check_res_());
|
||||
PALF_LOG(TRACE, "LogGroupEntryHeader generate", K(ret), K(is_padding_log), K(*this), K(data_checksum));
|
||||
return ret;
|
||||
}
|
||||
|
||||
@ -211,19 +225,31 @@ int LogGroupEntryHeader::calculate_log_checksum_(const bool is_padding_log,
|
||||
return ret;
|
||||
}
|
||||
|
||||
bool LogGroupEntryHeader::get_header_parity_check_res_() const
|
||||
uint16_t LogGroupEntryHeader::calculate_header_checksum_() const
|
||||
{
|
||||
bool bool_ret = parity_check(reinterpret_cast<const uint16_t &>(magic_));
|
||||
bool_ret ^= parity_check(reinterpret_cast<const uint16_t &>(version_));
|
||||
bool_ret ^= parity_check(reinterpret_cast<const uint32_t &>(group_size_));
|
||||
bool_ret ^= parity_check(reinterpret_cast<const uint64_t &>(proposal_id_));
|
||||
bool_ret ^= parity_check(committed_end_lsn_.val_);
|
||||
bool_ret ^= parity_check(max_scn_.get_val_for_logservice());
|
||||
bool_ret ^= parity_check(reinterpret_cast<const uint64_t &>(accumulated_checksum_));
|
||||
bool_ret ^= parity_check(reinterpret_cast<const uint64_t &>(log_id_));
|
||||
int64_t tmp_flag = (flag_ & ~(0x1));
|
||||
bool_ret ^= parity_check(reinterpret_cast<const uint64_t &>(tmp_flag));
|
||||
return bool_ret;
|
||||
uint16_t checksum = 0;
|
||||
if (LOG_GROUP_ENTRY_HEADER_VERSION == version_) {
|
||||
bool bool_ret = parity_check(reinterpret_cast<const uint16_t &>(magic_));
|
||||
bool_ret ^= parity_check(reinterpret_cast<const uint16_t &>(version_));
|
||||
bool_ret ^= parity_check(reinterpret_cast<const uint32_t &>(group_size_));
|
||||
bool_ret ^= parity_check(reinterpret_cast<const uint64_t &>(proposal_id_));
|
||||
bool_ret ^= parity_check(committed_end_lsn_.val_);
|
||||
bool_ret ^= parity_check(max_scn_.get_val_for_logservice());
|
||||
bool_ret ^= parity_check(reinterpret_cast<const uint64_t &>(accumulated_checksum_));
|
||||
bool_ret ^= parity_check(reinterpret_cast<const uint64_t &>(log_id_));
|
||||
int64_t tmp_flag = (flag_ & ~PARITY_MASK);
|
||||
bool_ret ^= parity_check(reinterpret_cast<const uint64_t &>(tmp_flag));
|
||||
checksum = (bool_ret ? 1 : 0);
|
||||
} else if (LOG_GROUP_ENTRY_HEADER_VERSION2 == version_) {
|
||||
// NB: To avoid dealing with endianness issue, make the last two bytes of flag_ with zero.
|
||||
int64_t ori_flag = flag_;
|
||||
this->flag_ = (ori_flag & ~CRC16_MASK);
|
||||
checksum = xxhash_16(checksum, reinterpret_cast<const uint8_t*>(this), sizeof(LogGroupEntryHeader));
|
||||
this->flag_ = ori_flag;
|
||||
} else {
|
||||
PALF_LOG_RET(ERROR, OB_ERR_UNEXPECTED, "unexpected version", KPC(this));
|
||||
}
|
||||
return checksum;
|
||||
}
|
||||
|
||||
void LogGroupEntryHeader::update_header_checksum()
|
||||
@ -233,13 +259,9 @@ void LogGroupEntryHeader::update_header_checksum()
|
||||
|
||||
void LogGroupEntryHeader::update_header_checksum_()
|
||||
{
|
||||
if (get_header_parity_check_res_()) {
|
||||
flag_ |= 0x1;
|
||||
} else {
|
||||
// group header可能会被复用并更新部分字段(比如raw write场景)
|
||||
flag_ = (flag_ & ~(0x1));
|
||||
}
|
||||
PALF_LOG(TRACE, "update_header_checksum_ finished", K(*this), "flag", (flag_ & 0x1));
|
||||
reset_header_checksum_();
|
||||
flag_ = (flag_ | calculate_header_checksum_());
|
||||
PALF_LOG(TRACE, "update_header_checksum_ finished", KPC(this));
|
||||
}
|
||||
|
||||
LogGroupEntryHeader& LogGroupEntryHeader::operator=(const LogGroupEntryHeader &header)
|
||||
@ -331,7 +353,7 @@ int LogGroupEntryHeader::update_committed_end_lsn(const LSN &lsn)
|
||||
void LogGroupEntryHeader::update_write_mode(const bool is_raw_write)
|
||||
{
|
||||
if (true == is_raw_write) {
|
||||
flag_ |= RAW_WRITE_MASK;
|
||||
flag_ = (flag_ | get_raw_write_mask_());
|
||||
}
|
||||
}
|
||||
|
||||
@ -406,9 +428,16 @@ void LogGroupEntryHeader::update_accumulated_checksum(int64_t accumulated_checks
|
||||
|
||||
bool LogGroupEntryHeader::check_header_checksum_() const
|
||||
{
|
||||
const int64_t header_checksum = get_header_parity_check_res_() ? 1 : 0;
|
||||
const int64_t saved_header_checksum = flag_ & (0x1);
|
||||
return (header_checksum == saved_header_checksum);
|
||||
bool bool_ret = false;
|
||||
const uint16_t header_checksum = calculate_header_checksum_();
|
||||
if (LOG_GROUP_ENTRY_HEADER_VERSION2 != version_ && LOG_GROUP_ENTRY_HEADER_VERSION != version_) {
|
||||
PALF_LOG_RET(ERROR, OB_ERR_UNEXPECTED, "check_header_checksum_ failed, invalid version_", KPC(this));
|
||||
} else {
|
||||
int64_t mask = get_header_checksum_mask_();
|
||||
const uint16_t saved_header_checksum = (flag_ & mask);
|
||||
bool_ret = (header_checksum == saved_header_checksum);
|
||||
}
|
||||
return bool_ret;
|
||||
}
|
||||
|
||||
bool LogGroupEntryHeader::check_log_checksum_(const char *buf,
|
||||
@ -451,12 +480,12 @@ bool LogGroupEntryHeader::check_log_checksum_(const char *buf,
|
||||
|
||||
bool LogGroupEntryHeader::is_padding_log() const
|
||||
{
|
||||
return (flag_ & PADDING_TYPE_MASK) > 0;
|
||||
return (flag_ & get_padding_mask_()) > 0;
|
||||
}
|
||||
|
||||
bool LogGroupEntryHeader::is_raw_write() const
|
||||
{
|
||||
return (flag_ & RAW_WRITE_MASK) > 0;
|
||||
return (flag_ & get_raw_write_mask_()) > 0;
|
||||
}
|
||||
|
||||
int LogGroupEntryHeader::truncate(const char *buf,
|
||||
@ -508,5 +537,81 @@ int LogGroupEntryHeader::truncate(const char *buf,
|
||||
PALF_LOG(INFO, "truncate finished", K(ret), K(*this));
|
||||
return ret;
|
||||
}
|
||||
|
||||
int16_t LogGroupEntryHeader::get_version_() const
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
int16_t version = LOG_GROUP_ENTRY_HEADER_VERSION;
|
||||
uint64_t min_data_version = 0;
|
||||
if (OB_FAIL(GET_MIN_DATA_VERSION(MTL_ID(), min_data_version))) {
|
||||
PALF_LOG(WARN, "GET_MIN_DATA_VERSION failed", K(ret));
|
||||
} else if (min_data_version >= DATA_VERSION_4_3_3_0) {
|
||||
version = LOG_GROUP_ENTRY_HEADER_VERSION2;
|
||||
}
|
||||
return version;
|
||||
}
|
||||
|
||||
int64_t LogGroupEntryHeader::get_padding_mask_() const
|
||||
{
|
||||
int64_t mask = PADDING_TYPE_MASK;
|
||||
if (LOG_GROUP_ENTRY_HEADER_VERSION2 == version_) {
|
||||
mask = PADDING_TYPE_MASK_VERSION2;
|
||||
} else if (LOG_GROUP_ENTRY_HEADER_VERSION == version_) {
|
||||
mask = PADDING_TYPE_MASK;
|
||||
} else {
|
||||
PALF_LOG_RET(ERROR, OB_ERR_UNEXPECTED, "unexpected version_", KPC(this));
|
||||
}
|
||||
return mask;
|
||||
}
|
||||
|
||||
int64_t LogGroupEntryHeader::get_header_checksum_mask_() const
|
||||
{
|
||||
int64_t header_checksum_mask = 0;
|
||||
if (LOG_GROUP_ENTRY_HEADER_VERSION2 == version_) {
|
||||
header_checksum_mask = CRC16_MASK;
|
||||
} else if (LOG_GROUP_ENTRY_HEADER_VERSION == version_) {
|
||||
header_checksum_mask = PARITY_MASK;
|
||||
} else {
|
||||
PALF_LOG_RET(ERROR, OB_ERR_UNEXPECTED, "unexpected version!!!", KPC(this));
|
||||
}
|
||||
return header_checksum_mask;
|
||||
}
|
||||
|
||||
int64_t LogGroupEntryHeader::get_raw_write_mask_() const
|
||||
{
|
||||
int64_t mask = RAW_WRITE_MASK;
|
||||
if (LOG_GROUP_ENTRY_HEADER_VERSION2 == version_) {
|
||||
mask = RAW_WRITE_MASK_VERSION2;
|
||||
} else if (LOG_GROUP_ENTRY_HEADER_VERSION == version_) {
|
||||
mask = RAW_WRITE_MASK;
|
||||
} else {
|
||||
PALF_LOG_RET(ERROR, OB_ERR_UNEXPECTED, "unexpected version_", KPC(this));
|
||||
}
|
||||
return mask;
|
||||
}
|
||||
|
||||
void LogGroupEntryHeader::reset_header_checksum_()
|
||||
{
|
||||
if (LOG_GROUP_ENTRY_HEADER_VERSION2 == version_) {
|
||||
flag_ &= (~CRC16_MASK);
|
||||
} else if (LOG_GROUP_ENTRY_HEADER_VERSION == version_) {
|
||||
flag_ &= (~PARITY_MASK);
|
||||
} else {
|
||||
PALF_LOG_RET(ERROR, OB_ERR_UNEXPECTED, "unexpected version_", KPC(this));
|
||||
}
|
||||
}
|
||||
|
||||
bool LogGroupEntryHeader::check_compatibility() const
|
||||
{
|
||||
bool bool_ret = false;
|
||||
if (!is_valid()) {
|
||||
PALF_LOG_RET(WARN, OB_EAGAIN, "invalid LogGroupEntryHeader", KPC(this));
|
||||
} else if (LOG_GROUP_ENTRY_HEADER_VERSION2 == version_ && LOG_GROUP_ENTRY_HEADER_VERSION2 != get_version_()) {
|
||||
PALF_LOG_RET(WARN, OB_EAGAIN, "data version not match!!!", KPC(this));
|
||||
} else {
|
||||
bool_ret = true;
|
||||
}
|
||||
return bool_ret;
|
||||
}
|
||||
} // end namespace palf
|
||||
} // end namespace oceanbase
|
||||
|
@ -14,9 +14,8 @@
|
||||
#define OCEANBASE_LOGSERVICE_LOG_GROUP_ENTRY_HEADER_
|
||||
|
||||
#include "lib/ob_define.h" // Serialization
|
||||
#include "lib/ob_name_def.h"
|
||||
#include "lib/utility/ob_print_utils.h" // Print*
|
||||
#include "share/scn.h" // SCN
|
||||
#include "share/scn.h" // SCN
|
||||
#include "lsn.h" // LSN
|
||||
|
||||
namespace oceanbase
|
||||
@ -89,6 +88,7 @@ public:
|
||||
const share::SCN &cut_scn,
|
||||
const int64_t pre_accum_checksum);
|
||||
|
||||
bool check_compatibility() const;
|
||||
NEED_SERIALIZE_AND_DESERIALIZE;
|
||||
|
||||
TO_STRING_KV("magic", magic_,
|
||||
@ -109,17 +109,28 @@ private:
|
||||
int64_t &data_checksum);
|
||||
bool check_header_checksum_() const;
|
||||
bool check_log_checksum_(const char *buf, const int64_t data_len, int64_t &group_log_checksum) const;
|
||||
bool get_header_parity_check_res_() const;
|
||||
uint16_t calculate_header_checksum_() const;
|
||||
int16_t get_version_() const;
|
||||
int64_t get_padding_mask_() const;
|
||||
int64_t get_raw_write_mask_() const;
|
||||
int64_t get_header_checksum_mask_() const;
|
||||
void reset_header_checksum_();
|
||||
public:
|
||||
// Update this variable when modifying header's member
|
||||
static const int64_t HEADER_SER_SIZE;
|
||||
//GR means record
|
||||
static constexpr int16_t MAGIC = 0x4752;
|
||||
static const int16_t MAGIC;
|
||||
private:
|
||||
static constexpr int16_t LOG_GROUP_ENTRY_HEADER_VERSION = 1;
|
||||
static constexpr int64_t PADDING_TYPE_MASK = 1 << 1;
|
||||
static constexpr int64_t RAW_WRITE_MASK = 1 << 2;
|
||||
static constexpr int64_t PADDING_LOG_DATA_CHECKSUM = 0; // padding log的data_checksum为0
|
||||
static const int16_t LOG_GROUP_ENTRY_HEADER_VERSION;
|
||||
static const int64_t PADDING_TYPE_MASK;
|
||||
static const int64_t RAW_WRITE_MASK;
|
||||
static const int64_t PADDING_LOG_DATA_CHECKSUM;
|
||||
|
||||
static const int16_t LOG_GROUP_ENTRY_HEADER_VERSION2;
|
||||
static const int64_t PADDING_TYPE_MASK_VERSION2;
|
||||
static const int64_t RAW_WRITE_MASK_VERSION2;
|
||||
static const int64_t CRC16_MASK;
|
||||
static const int64_t PARITY_MASK;
|
||||
private:
|
||||
// Binary visualization, for LogGroupEntryHeader, its' magic number
|
||||
// is 0x4752, means GR(group header)
|
||||
@ -140,10 +151,16 @@ private:
|
||||
// The log id of this log, this field just only used for
|
||||
// sliding window
|
||||
int64_t log_id_;
|
||||
|
||||
// LOG_GROUP_ENTRY_HEADER_VERSION
|
||||
// | sign bit | 60 unused bit | RAW WRITE bit | PADDING bit | PARITY CHECKSUM bit |
|
||||
// The lowest bit is used for parity check.
|
||||
// The second bit from last is used for padding type flag.
|
||||
// The third bit from last is used for checking whether is RAW_WRITE
|
||||
int64_t flag_;
|
||||
//
|
||||
// LOG_GROUP_ENTRY_HEADER_VERSION2
|
||||
// | sign bit | PADDING bit | RAW WRITE BIT | 45 unused bit | 16 crc16 bit|
|
||||
mutable int64_t flag_;
|
||||
};
|
||||
|
||||
} // end namespace palf
|
||||
|
@ -55,8 +55,7 @@ bool LogMetaEntryHeader::is_valid() const
|
||||
{
|
||||
return MAGIC == magic_
|
||||
&& LOG_META_ENTRY_HEADER_VERSION == version_
|
||||
&& 0 != data_len_
|
||||
&& 0 != header_checksum_;
|
||||
&& 0 != data_len_;
|
||||
}
|
||||
|
||||
void LogMetaEntryHeader::reset()
|
||||
|
@ -238,6 +238,11 @@ int ObRemoteLogWriter::submit_entries_(ObFetchLogTask &task)
|
||||
} else if (OB_UNLIKELY(! entry.check_integrity())) {
|
||||
ret = OB_INVALID_DATA;
|
||||
LOG_WARN("entry is invalid", K(entry), K(lsn), K(task));
|
||||
} else if (! entry.check_compatibility()) {
|
||||
ret = OB_EAGAIN;
|
||||
if (REACH_TIME_INTERVAL(10 * 1000 * 1000)) {
|
||||
LOG_ERROR("data version is not new enough to recover clog", KR(ret));
|
||||
}
|
||||
} else if (task.cur_lsn_ > lsn) {
|
||||
LOG_INFO("repeated log, just skip", K(lsn), K(entry), K(task));
|
||||
} else if (FALSE_IT(entry_size = entry.get_serialize_size())) {
|
||||
|
@ -109,6 +109,11 @@ int ObRestoreLogFunction::handle_group_entry(
|
||||
|| NULL == buffer)) {
|
||||
ret = OB_INVALID_ARGUMENT;
|
||||
CLOG_LOG(WARN, "invalid argument", K(id), K(proposal_id), K(group_start_lsn), K(group_entry), K(buffer));
|
||||
} else if (!group_entry.check_compatibility()) {
|
||||
ret = OB_EAGAIN;
|
||||
if (REACH_TIME_INTERVAL(10 * 1000 * 1000)) {
|
||||
LOG_ERROR("data version is not new enough to recover clog", KR(ret));
|
||||
}
|
||||
} else if (OB_FAIL(process_(id, proposal_id, group_start_lsn, group_entry.get_scn(),
|
||||
buffer, group_entry.get_serialize_size(), stop_flag))) {
|
||||
CLOG_LOG(WARN, "process failed", K(id), K(group_start_lsn), K(group_entry), K(buffer));
|
||||
|
@ -15,18 +15,26 @@
|
||||
#include "lib/net/ob_addr.h" // ObAddr
|
||||
#include "logservice/palf/log_define.h"
|
||||
#include "lib/checksum/ob_crc64.h" // ob_crc64
|
||||
#include "lib/checksum/ob_parity_check.h" // ob_crc64
|
||||
#define private public
|
||||
#include "logservice/palf/log_group_entry_header.h"
|
||||
#include "logservice/palf/log_entry.h"
|
||||
#include "logservice/palf/log_entry_header.h"
|
||||
#include "share/scn.h"
|
||||
#include "logservice/ob_log_base_header.h" // ObLogBaseHeader
|
||||
#include "logservice/palf/log_group_buffer.h"
|
||||
#include "logservice/palf/log_group_entry.h"
|
||||
#include "logservice/palf/log_writer_utils.h"
|
||||
#include "share/rc/ob_tenant_base.h"
|
||||
#include "share/ob_cluster_version.h"
|
||||
#undef private
|
||||
|
||||
#include <gtest/gtest.h>
|
||||
#include <vector>
|
||||
#include <utility>
|
||||
#include <algorithm>
|
||||
#include <random>
|
||||
#include <map>
|
||||
|
||||
namespace oceanbase
|
||||
{
|
||||
@ -201,7 +209,8 @@ TEST(TestLogGroupEntryHeader, test_log_group_entry_header)
|
||||
EXPECT_EQ(OB_SUCCESS, header1.update_committed_end_lsn(new_lsn));
|
||||
EXPECT_EQ(new_proposal_id, header1.get_log_proposal_id());
|
||||
EXPECT_EQ(new_lsn, header1.get_committed_end_lsn());
|
||||
// header1.update_header_checksum();
|
||||
header1.update_header_checksum();
|
||||
EXPECT_TRUE(header1.check_header_integrity());
|
||||
EXPECT_TRUE(
|
||||
header1.check_integrity(buf + log_group_entry_header_size, data_len + log_entry_header_size));
|
||||
|
||||
@ -213,8 +222,6 @@ TEST(TestLogGroupEntryHeader, test_log_group_entry_header)
|
||||
EXPECT_EQ(log_group_entry1.get_header(), log_group_entry.get_header());
|
||||
EXPECT_EQ(log_group_entry1.get_header_size(), log_group_entry.get_header_size());
|
||||
EXPECT_EQ(data_len + log_entry_header_size, log_group_entry.get_data_len());
|
||||
EXPECT_TRUE(
|
||||
header1.check_integrity(buf + log_group_entry_header_size, data_len + log_entry_header_size));
|
||||
EXPECT_EQ(max_scn, log_group_entry.get_scn());
|
||||
EXPECT_EQ(committed_lsn, log_group_entry.get_committed_end_lsn());
|
||||
pos = 0;
|
||||
@ -408,14 +415,330 @@ TEST(TestPaddingLogEntry, test_generate_padding_log_entry)
|
||||
out_buf = nullptr;
|
||||
}
|
||||
|
||||
int16_t get_log_entry_header_parity_checksum(const LogEntryHeader &header)
|
||||
{
|
||||
bool bool_ret = parity_check(reinterpret_cast<const uint16_t &>(header.magic_));
|
||||
bool_ret ^= parity_check(reinterpret_cast<const uint16_t &>(header.version_));
|
||||
bool_ret ^= parity_check(reinterpret_cast<const uint32_t &>(header.log_size_));
|
||||
bool_ret ^= parity_check((header.scn_.get_val_for_logservice()));
|
||||
bool_ret ^= parity_check(reinterpret_cast<const uint64_t &>(header.data_checksum_));
|
||||
int64_t tmp_flag = (header.flag_ & ~(0x0001));
|
||||
bool_ret ^= parity_check(reinterpret_cast<const uint64_t &>(tmp_flag));
|
||||
PALF_LOG(INFO, "get_log_entry_header_parity_checksum", K(header), K(tmp_flag));
|
||||
return bool_ret ? 1 : 0;
|
||||
}
|
||||
|
||||
int16_t get_log_group_entry_header_parity_checksum(const LogGroupEntryHeader &header)
|
||||
{
|
||||
bool bool_ret = parity_check(reinterpret_cast<const uint16_t &>(header.magic_));
|
||||
bool_ret ^= parity_check(reinterpret_cast<const uint16_t &>(header.version_));
|
||||
bool_ret ^= parity_check(reinterpret_cast<const uint32_t &>(header.group_size_));
|
||||
bool_ret ^= parity_check(reinterpret_cast<const uint64_t &>(header.proposal_id_));
|
||||
bool_ret ^= parity_check(header.committed_end_lsn_.val_);
|
||||
bool_ret ^= parity_check(header.max_scn_.get_val_for_logservice());
|
||||
bool_ret ^= parity_check(reinterpret_cast<const uint64_t &>(header.accumulated_checksum_));
|
||||
bool_ret ^= parity_check(reinterpret_cast<const uint64_t &>(header.log_id_));
|
||||
int64_t tmp_flag = (header.flag_ & ~(0x1));
|
||||
bool_ret ^= parity_check(reinterpret_cast<const uint64_t &>(tmp_flag));
|
||||
return bool_ret ? 1 : 00;
|
||||
}
|
||||
|
||||
TEST(TestUpgraedCompatibility, test_log_entry_header)
|
||||
{
|
||||
PALF_LOG(INFO, "TestUpgraedCompatibility");
|
||||
{
|
||||
PALF_LOG(INFO, "case1. test_log_entry");
|
||||
{
|
||||
// 新版本解析旧版本数据
|
||||
LogEntryHeader ori_header;
|
||||
constexpr int64_t data_len = 100;
|
||||
char log_data[data_len] = "helloworld";
|
||||
EXPECT_EQ(OB_SUCCESS, ori_header.generate_header(log_data, data_len, share::SCN::min_scn()));
|
||||
EXPECT_EQ(true, ori_header.check_header_integrity());
|
||||
EXPECT_EQ(true, ori_header.check_integrity(log_data, data_len));
|
||||
ori_header.version_ = LogEntryHeader::LOG_ENTRY_HEADER_VERSION;
|
||||
ori_header.flag_ &= ~LogEntryHeader::CRC16_MASK;
|
||||
ori_header.flag_ |= get_log_entry_header_parity_checksum(ori_header);
|
||||
constexpr int64_t serialize_buf_len = data_len + sizeof(LogEntryHeader);
|
||||
char serialize_buf[serialize_buf_len];
|
||||
int64_t pos = 0;
|
||||
EXPECT_EQ(OB_SUCCESS, ori_header.serialize(serialize_buf, serialize_buf_len, pos));
|
||||
pos = 0;
|
||||
LogEntryHeader new_header;
|
||||
EXPECT_EQ(OB_SUCCESS, new_header.deserialize(serialize_buf, serialize_buf_len, pos));
|
||||
EXPECT_EQ(true, new_header.check_header_integrity());
|
||||
EXPECT_EQ(true, new_header.check_integrity(log_data, data_len));
|
||||
EXPECT_EQ(new_header.flag_, ori_header.flag_);
|
||||
EXPECT_EQ(new_header.data_checksum_, ori_header.data_checksum_);
|
||||
EXPECT_EQ(new_header.version_, ori_header.version_);
|
||||
|
||||
// 验证padding日志
|
||||
ori_header.version_ = LogEntryHeader::LOG_ENTRY_HEADER_VERSION2;
|
||||
ori_header.flag_ &= ~LogEntryHeader::CRC16_MASK;
|
||||
ori_header.flag_ |= LogEntryHeader::PADDING_TYPE_MASK_VERSION2;
|
||||
ori_header.update_header_checksum_();
|
||||
ori_header.update_header_checksum_();
|
||||
EXPECT_EQ(true, ori_header.is_padding_log_());
|
||||
EXPECT_EQ(true, ori_header.check_header_integrity());
|
||||
pos = 0;
|
||||
EXPECT_EQ(OB_SUCCESS, ori_header.serialize(serialize_buf, serialize_buf_len, pos));
|
||||
pos = 0;
|
||||
new_header.reset();
|
||||
EXPECT_EQ(OB_SUCCESS, new_header.deserialize(serialize_buf, serialize_buf_len, pos));
|
||||
EXPECT_EQ(true, new_header.check_header_integrity());
|
||||
EXPECT_EQ(true, new_header.is_padding_log_());
|
||||
|
||||
ori_header.version_ = LogEntryHeader::LOG_ENTRY_HEADER_VERSION;
|
||||
ori_header.flag_ &= ~LogEntryHeader::CRC16_MASK;
|
||||
ori_header.flag_ |= LogEntryHeader::PADDING_TYPE_MASK;
|
||||
ori_header.flag_ |= get_log_entry_header_parity_checksum(ori_header);
|
||||
|
||||
pos = 0;
|
||||
EXPECT_EQ(OB_SUCCESS, ori_header.serialize(serialize_buf, serialize_buf_len, pos));
|
||||
new_header.reset();
|
||||
pos = 0;
|
||||
EXPECT_EQ(OB_SUCCESS, new_header.deserialize(serialize_buf, serialize_buf_len, pos));
|
||||
EXPECT_EQ(true, new_header.check_header_integrity());
|
||||
EXPECT_EQ(true, new_header.is_padding_log_());
|
||||
PALF_LOG(INFO, "print new_header", K(new_header), "is_padding", new_header.is_padding_log_());
|
||||
EXPECT_EQ(new_header.flag_, ori_header.flag_);
|
||||
EXPECT_EQ(new_header.data_checksum_, ori_header.data_checksum_);
|
||||
EXPECT_EQ(new_header.version_, ori_header.version_);
|
||||
}
|
||||
PALF_LOG(INFO, "case2. test_log_group_entry");
|
||||
{
|
||||
// 构造三条LogEntry,其version分别为2 1 2
|
||||
// | GroupHeader version x | EntryHeader version 2 | EntryHeader version1 | EntryHeader version2 |
|
||||
constexpr int64_t log_group_buf_len = 4096;
|
||||
char log_group_buf[log_group_buf_len] = {0};
|
||||
memset(log_group_buf, log_group_buf_len, 'c');
|
||||
LogEntryHeader log_entry_header1;
|
||||
constexpr int64_t data_len = 100;
|
||||
EXPECT_EQ(OB_SUCCESS, log_entry_header1.generate_header(log_group_buf + sizeof(LogGroupEntryHeader) + sizeof(LogEntryHeader), data_len, share::SCN::min_scn()));
|
||||
EXPECT_EQ(true, log_entry_header1.check_header_integrity());
|
||||
int64_t pos = 0;
|
||||
EXPECT_EQ(OB_SUCCESS, log_entry_header1.serialize(log_group_buf + sizeof(LogGroupEntryHeader), log_group_buf_len - sizeof(LogGroupEntryHeader), pos));
|
||||
|
||||
PALF_LOG(INFO, "print log_entry_header1", K(log_entry_header1));
|
||||
LogEntryHeader log_entry_header2;
|
||||
pos = 0;
|
||||
EXPECT_EQ(OB_SUCCESS, log_entry_header2.deserialize(log_group_buf + sizeof(LogGroupEntryHeader), log_group_buf_len - sizeof(LogGroupEntryHeader), pos));
|
||||
EXPECT_EQ(true, log_entry_header2.check_header_integrity());
|
||||
EXPECT_EQ(true, log_entry_header2.check_integrity(log_group_buf + sizeof(LogGroupEntryHeader) + sizeof(LogEntryHeader), data_len));
|
||||
EXPECT_EQ(log_entry_header2.flag_, log_entry_header1.flag_);
|
||||
EXPECT_EQ(log_entry_header2.data_checksum_, log_entry_header1.data_checksum_);
|
||||
EXPECT_EQ(log_entry_header2.version_, log_entry_header1.version_);
|
||||
log_entry_header2.version_ = LogEntryHeader::LOG_ENTRY_HEADER_VERSION;
|
||||
log_entry_header2.flag_ = 0;
|
||||
log_entry_header2.flag_ |= get_log_entry_header_parity_checksum(log_entry_header2);
|
||||
PALF_LOG(INFO, "print log_entry_header2", K(log_entry_header2));
|
||||
EXPECT_EQ(true, log_entry_header2.check_header_integrity());
|
||||
EXPECT_EQ(true, log_entry_header2.check_integrity(log_group_buf + sizeof(LogGroupEntryHeader) + sizeof(LogEntryHeader), data_len));
|
||||
pos = sizeof(LogGroupEntryHeader) + sizeof(LogEntryHeader) + data_len;
|
||||
EXPECT_EQ(OB_SUCCESS, log_entry_header2.serialize(log_group_buf, log_group_buf_len, pos));
|
||||
|
||||
LogEntryHeader log_entry_header3;
|
||||
pos = 0;
|
||||
EXPECT_EQ(OB_SUCCESS, log_entry_header3.deserialize(log_group_buf + sizeof(LogGroupEntryHeader), log_group_buf_len - sizeof(LogGroupEntryHeader), pos));
|
||||
PALF_LOG(INFO, "print log_entry_header3", K(log_entry_header3));
|
||||
EXPECT_EQ(true, log_entry_header3.check_header_integrity());
|
||||
EXPECT_EQ(true, log_entry_header3.check_integrity(log_group_buf + sizeof(LogGroupEntryHeader) + sizeof(LogEntryHeader), data_len));
|
||||
EXPECT_EQ(log_entry_header3.flag_, log_entry_header1.flag_);
|
||||
EXPECT_EQ(log_entry_header3.data_checksum_, log_entry_header1.data_checksum_);
|
||||
EXPECT_EQ(log_entry_header3.version_, log_entry_header1.version_);
|
||||
EXPECT_EQ(true, log_entry_header3.check_header_integrity());
|
||||
EXPECT_EQ(true, log_entry_header3.check_integrity(log_group_buf + sizeof(LogGroupEntryHeader) + sizeof(LogEntryHeader), data_len));
|
||||
EXPECT_EQ(true, log_entry_header3.check_header_integrity());
|
||||
pos = sizeof(LogGroupEntryHeader) + sizeof(LogEntryHeader) + data_len + sizeof(LogEntryHeader) + data_len;
|
||||
EXPECT_EQ(OB_SUCCESS, log_entry_header3.serialize(log_group_buf, log_group_buf_len, pos));
|
||||
|
||||
pos = 0;
|
||||
LogGroupEntryHeader group_header;
|
||||
constexpr int64_t serialize_buf_len = 3*(data_len + sizeof(LogEntryHeader));
|
||||
LogWriteBuf write_buf;
|
||||
write_buf.push_back(log_group_buf, serialize_buf_len + sizeof(LogGroupEntryHeader));
|
||||
int64_t log_checksum = 0;
|
||||
EXPECT_EQ(OB_SUCCESS,
|
||||
group_header.generate(false, false, write_buf, serialize_buf_len,
|
||||
share::SCN::min_scn(), 1, LSN(0), 1, log_checksum));
|
||||
group_header.update_accumulated_checksum(1024);
|
||||
group_header.update_header_checksum();
|
||||
group_header.update_header_checksum();
|
||||
EXPECT_EQ(true, group_header.check_header_integrity());
|
||||
EXPECT_EQ(true, group_header.check_integrity(log_group_buf+sizeof(LogGroupEntryHeader), serialize_buf_len));
|
||||
EXPECT_EQ(false, group_header.is_raw_write());
|
||||
EXPECT_EQ(false, group_header.is_padding_log());
|
||||
char log_group_serialize_buf[log_group_buf_len] = {0};
|
||||
pos = 0;
|
||||
EXPECT_EQ(OB_SUCCESS, group_header.serialize(log_group_serialize_buf, log_group_buf_len, pos));
|
||||
LogGroupEntryHeader new_group_header;
|
||||
pos = 0;
|
||||
EXPECT_EQ(OB_SUCCESS, new_group_header.deserialize(log_group_serialize_buf, log_group_buf_len, pos));
|
||||
EXPECT_EQ(true, new_group_header.check_header_integrity());
|
||||
EXPECT_EQ(true, new_group_header.check_integrity(log_group_buf+sizeof(LogGroupEntryHeader), serialize_buf_len));
|
||||
EXPECT_EQ(false, new_group_header.is_raw_write());
|
||||
EXPECT_EQ(false, new_group_header.is_padding_log());
|
||||
|
||||
// pair <is_raw_write, is_padding_log>
|
||||
std::vector<std::pair<bool, bool>> input{{true, false}, {false, true}, {true, true}};
|
||||
|
||||
for (auto elem : input) {
|
||||
LogGroupEntryHeader tmp_header = group_header;
|
||||
tmp_header.reset();
|
||||
EXPECT_EQ(OB_SUCCESS,
|
||||
tmp_header.generate(elem.first, elem.second, write_buf, serialize_buf_len,
|
||||
share::SCN::min_scn(), 1, LSN(0), 1, log_checksum));
|
||||
tmp_header.update_accumulated_checksum(1023);
|
||||
tmp_header.update_header_checksum();
|
||||
tmp_header.update_header_checksum();
|
||||
EXPECT_EQ(true, tmp_header.check_header_integrity());
|
||||
EXPECT_EQ(true, tmp_header.check_integrity(log_group_buf+sizeof(LogGroupEntryHeader), serialize_buf_len));
|
||||
EXPECT_EQ(elem.first, tmp_header.is_raw_write());
|
||||
EXPECT_EQ(LogGroupEntryHeader::PADDING_TYPE_MASK_VERSION2, tmp_header.get_padding_mask_());
|
||||
PALF_LOG(INFO, "runlin print tmp_header", K(tmp_header));
|
||||
EXPECT_EQ(LogGroupEntryHeader::RAW_WRITE_MASK_VERSION2, tmp_header.get_raw_write_mask_());
|
||||
|
||||
// 构造旧版本的LogGroupEntryHeader中
|
||||
tmp_header.version_ = LogGroupEntryHeader::LOG_GROUP_ENTRY_HEADER_VERSION;
|
||||
tmp_header.flag_ &= ~LogGroupEntryHeader::CRC16_MASK;
|
||||
tmp_header.update_write_mode(elem.first);
|
||||
tmp_header.flag_ |= (elem.second ? LogGroupEntryHeader::PADDING_TYPE_MASK : 0);
|
||||
tmp_header.flag_ |= get_log_group_entry_header_parity_checksum(tmp_header);
|
||||
EXPECT_EQ(elem.second, tmp_header.is_padding_log());
|
||||
EXPECT_EQ(elem.first, tmp_header.is_raw_write());
|
||||
EXPECT_EQ(true, tmp_header.check_header_integrity());
|
||||
EXPECT_EQ(true, tmp_header.check_integrity(log_group_buf+sizeof(LogGroupEntryHeader), serialize_buf_len));
|
||||
pos = 0;
|
||||
EXPECT_EQ(OB_SUCCESS, tmp_header.serialize(log_group_serialize_buf, log_group_buf_len, pos));
|
||||
EXPECT_EQ(LogGroupEntryHeader::PADDING_TYPE_MASK, tmp_header.get_padding_mask_());
|
||||
EXPECT_EQ(LogGroupEntryHeader::RAW_WRITE_MASK, tmp_header.get_raw_write_mask_());
|
||||
|
||||
PALF_LOG(INFO, "test new binary parse old binary", K(tmp_header), K(elem.first), K(elem.second));
|
||||
// 新版本解析旧版本数据
|
||||
LogGroupEntryHeader new_group_header;
|
||||
pos = 0;
|
||||
EXPECT_EQ(OB_SUCCESS, new_group_header.deserialize(log_group_serialize_buf, log_group_buf_len, pos));
|
||||
EXPECT_EQ(true, new_group_header.check_header_integrity());
|
||||
EXPECT_EQ(true, new_group_header.check_integrity(log_group_buf+sizeof(LogGroupEntryHeader), serialize_buf_len));
|
||||
EXPECT_EQ(elem.first, new_group_header.is_raw_write());
|
||||
EXPECT_EQ(elem.second, new_group_header.is_padding_log());
|
||||
// 解析得到旧版本数据
|
||||
EXPECT_EQ(LogGroupEntryHeader::PADDING_TYPE_MASK, new_group_header.get_padding_mask_());
|
||||
EXPECT_EQ(LogGroupEntryHeader::RAW_WRITE_MASK, new_group_header.get_raw_write_mask_());
|
||||
}
|
||||
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
void bit_flip(uint8_t *ptr, int len, int bit_count)
|
||||
{
|
||||
// 保证magic和version不翻转
|
||||
const int arr_count = len * 8 - 32;
|
||||
std::vector<int> numbers(0, arr_count);
|
||||
numbers.resize(arr_count);
|
||||
for (int i = 0; i < arr_count; i++) {
|
||||
numbers[i] = i + 32;
|
||||
}
|
||||
std::random_device rd;
|
||||
auto rng = std::default_random_engine { rd() };
|
||||
std::shuffle(numbers.begin(), numbers.end(), rng); // 打乱顺序
|
||||
|
||||
for (int i = 0; i < bit_count; ++i) {
|
||||
int pos = numbers[i];
|
||||
uint8_t mask = (1 << (pos%8));
|
||||
*(ptr+pos/8) ^= mask;
|
||||
PALF_LOG(INFO, "runlin trace bit flip", K(pos));
|
||||
}
|
||||
}
|
||||
|
||||
TEST(TestBitFlip, test_log_entry_header)
|
||||
{
|
||||
std::srand(ObTimeUtility::current_time());
|
||||
PALF_LOG(INFO, "test_bit_flip_log_entry_header");
|
||||
LogEntryHeader log_entry_header;
|
||||
const int header_len = sizeof(LogEntryHeader);
|
||||
constexpr int data_len = 1024;
|
||||
char data[data_len]; memset(data, 'c', data_len);
|
||||
EXPECT_EQ(OB_SUCCESS, log_entry_header.generate_header(data, data_len, share::SCN::base_scn()));
|
||||
PALF_LOG(INFO, "origin header", K(log_entry_header));
|
||||
const int count = 1 << 10;
|
||||
struct Pair {
|
||||
LogEntryHeader header;
|
||||
int bit_count;
|
||||
};
|
||||
int ret = OB_SUCCESS;
|
||||
std::vector<Pair> array;
|
||||
std::map<int, int> count_array;
|
||||
for (int i = 1; i <= 1; i++) {
|
||||
count_array.insert(std::pair<int, int>(i, 0));
|
||||
for (int j = 0; j < count; j++) {
|
||||
LogEntryHeader tmp_header = log_entry_header;
|
||||
uint8_t *ptr = reinterpret_cast<uint8_t*>(&tmp_header);
|
||||
bit_flip(ptr, header_len, i);
|
||||
bool bool_ret = tmp_header.check_header_integrity();
|
||||
EXPECT_EQ(false, bool_ret);
|
||||
if (bool_ret) {
|
||||
count_array[i] ++;
|
||||
array.push_back(Pair{tmp_header, i});
|
||||
PALF_LOG(ERROR, "print info", K(log_entry_header), K(tmp_header), K(j), K(i));
|
||||
}
|
||||
}
|
||||
}
|
||||
OB_LOGGER.set_file_name("print_info.log", true);
|
||||
OB_LOGGER.set_log_level("INFO");
|
||||
for (auto &p : count_array) {
|
||||
PALF_LOG(INFO, "runlin trace print", "bit_flip", p.first, "count", p.second);
|
||||
}
|
||||
}
|
||||
|
||||
TEST(TestBitFlip, test_log_group_entry_header)
|
||||
{
|
||||
std::srand(ObTimeUtility::current_time());
|
||||
PALF_LOG(INFO, "test_bit_flip_log_group_entry_header");
|
||||
LogGroupEntryHeader header;
|
||||
LogWriteBuf write_buf;
|
||||
constexpr int data_len = 1024;
|
||||
char data[data_len]; memset(data, 'c', data_len);
|
||||
write_buf.push_back(data, data_len);
|
||||
share::SCN max_scn = share::SCN::min_scn();
|
||||
int64_t log_id = 1;
|
||||
LSN committed_lsn;
|
||||
committed_lsn.val_ = 1;
|
||||
int64_t proposal_id = 1;
|
||||
int64_t log_checksum = 0;
|
||||
EXPECT_EQ(OB_SUCCESS,
|
||||
header.generate(false, true, write_buf, data_len,
|
||||
max_scn, log_id, committed_lsn, proposal_id, log_checksum));
|
||||
const int header_len = sizeof(LogGroupEntryHeader);
|
||||
header.update_header_checksum();
|
||||
PALF_LOG(INFO, "origin header", K(header));
|
||||
const int count = 1 << 10;
|
||||
for (int i = 0; i < count; i++) {
|
||||
LogGroupEntryHeader tmp_header = header;
|
||||
uint8_t *ptr = reinterpret_cast<uint8_t*>(&tmp_header);
|
||||
const int bit_count = 1;
|
||||
bit_flip(ptr, header_len, bit_count);
|
||||
PALF_LOG(INFO, "current header", K(header), K(tmp_header), K(i), K(bit_count));
|
||||
bool bool_ret = tmp_header.check_header_integrity();
|
||||
EXPECT_EQ(false, bool_ret);
|
||||
if (bool_ret) {
|
||||
assert(false);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
} // namespace unittest
|
||||
} // namespace oceanbase
|
||||
|
||||
int main(int argc, char **argv)
|
||||
{
|
||||
system("rm -f test_log_entry_and_group_entry.log");
|
||||
system("rm -f print_info*");
|
||||
OB_LOGGER.set_file_name("test_log_entry_and_group_entry.log", true);
|
||||
OB_LOGGER.set_log_level("INFO");
|
||||
oceanbase::common::ObClusterVersion::get_instance().cluster_version_ = CLUSTER_CURRENT_VERSION;
|
||||
oceanbase::common::ObClusterVersion::get_instance().update_data_version(DATA_CURRENT_VERSION);
|
||||
PALF_LOG(INFO, "begin unittest::test_log_entry_and_group_entry");
|
||||
::testing::InitGoogleTest(&argc, argv);
|
||||
return RUN_ALL_TESTS();
|
||||
|
@ -15,9 +15,13 @@
|
||||
#include "logservice/palf/log_define.h"
|
||||
#define private public
|
||||
#include "logservice/palf/log_meta.h"
|
||||
#include "logservice/palf/log_meta_entry_header.h"
|
||||
#undef private
|
||||
#include <gtest/gtest.h>
|
||||
#include <vector>
|
||||
#include <thread>
|
||||
|
||||
char **global_argv = nullptr;
|
||||
namespace oceanbase
|
||||
{
|
||||
namespace unittest
|
||||
@ -25,11 +29,7 @@ namespace unittest
|
||||
using namespace common;
|
||||
using namespace palf;
|
||||
|
||||
TEST(TestLogMeta, test_log_meta)
|
||||
{
|
||||
int64_t proposal_id = INVALID_PROPOSAL_ID;
|
||||
proposal_id = 1;
|
||||
ObAddr addr(ObAddr::IPV4, "127.0.0.1", 4096);
|
||||
TEST(TestLogMeta, test_log_meta) { int64_t proposal_id = INVALID_PROPOSAL_ID; proposal_id = 1; ObAddr addr(ObAddr::IPV4, "127.0.0.1", 4096);
|
||||
|
||||
// Prepare meta
|
||||
LogPrepareMeta log_prepare_meta1;
|
||||
|
Reference in New Issue
Block a user