[clog] fix observer restart failed with decompression log entry in last uncomplete block return -4527

This commit is contained in:
yy0
2021-08-19 17:01:48 +08:00
committed by wangzelin.wzl
parent 9d75514b8e
commit 4a1adf11e8

View File

@ -129,7 +129,6 @@ private:
int get_next_entry_(Type& entry, ObReadParam& param, bool& force_read, int64_t& persist_len); int get_next_entry_(Type& entry, ObReadParam& param, bool& force_read, int64_t& persist_len);
bool check_last_block_(const file_id_t file_id, const offset_t start_offset, const int64_t last_block_ts) const; bool check_last_block_(const file_id_t file_id, const offset_t start_offset, const int64_t last_block_ts) const;
bool is_compressed_item_(const ObCLogItemType item_type) const; bool is_compressed_item_(const ObCLogItemType item_type) const;
int check_compressed_entry_length_(const char* buf, const int64_t buf_size) const;
private: private:
// magic number length in byte. // magic number length in byte.
@ -467,21 +466,6 @@ bool ObRawEntryIterator<Type, Interface>::is_compressed_item_(const ObCLogItemTy
CLOG_ENTRY_COMPRESSED_ZSTD_138 == item_type); CLOG_ENTRY_COMPRESSED_ZSTD_138 == item_type);
} }
template <class Type, class Interface>
int ObRawEntryIterator<Type, Interface>::check_compressed_entry_length_(const char* buf, const int64_t buf_size) const
{
int ret = common::OB_SUCCESS;
ObCompressedLogEntry comp_entry;
int64_t consume_buf_len = 0;
if (OB_FAIL(comp_entry.deserialize(buf, buf_size, consume_buf_len))) {
CLOG_LOG(WARN, "failed to deserialize ObCompressedLogEntry", K(ret));
} else if (OB_UNLIKELY(consume_buf_len > buf_size)) {
ret = common::OB_DESERIALIZE_ERROR;
CLOG_LOG(WARN, "buf not enough", K(ret), K(consume_buf_len), K(buf_size));
}
return ret;
}
// return OB_EAGAIN: to prepare buffer and do get_next_entry_ again // return OB_EAGAIN: to prepare buffer and do get_next_entry_ again
template <class Type, class Interface> template <class Type, class Interface>
int ObRawEntryIterator<Type, Interface>::get_next_entry_( int ObRawEntryIterator<Type, Interface>::get_next_entry_(
@ -499,11 +483,11 @@ int ObRawEntryIterator<Type, Interface>::get_next_entry_(
if (is_compressed_item_(item_type)) { if (is_compressed_item_(item_type)) {
int64_t local_pos = 0; int64_t local_pos = 0;
int64_t uncompress_len = 0; int64_t uncompress_len = 0;
if (OB_FAIL(check_compressed_entry_length_(buf_cur_, buf_size))) { if (OB_FAIL(
CLOG_LOG(WARN, "failed to check compressed entry length", K(ret), K(param), K(cur_offset_), K(buf_size)); uncompress(buf_cur_, buf_size, compress_rbuf_.buf_, compress_rbuf_.buf_len_, uncompress_len, pos))) {
} else if (OB_FAIL(uncompress( CLOG_LOG(
buf_cur_, buf_size, compress_rbuf_.buf_, compress_rbuf_.buf_len_, uncompress_len, pos))) { WARN, "failed to uncompress, ret will be overwrite with OB_INVALID_DATA", K(ret), K(param), K(buf_size));
CLOG_LOG(WARN, "failed to uncompress", K(ret), K(param), K(buf_size)); ret = OB_INVALID_DATA;
} else if (OB_FAIL(entry.deserialize(compress_rbuf_.buf_, uncompress_len, local_pos))) { } else if (OB_FAIL(entry.deserialize(compress_rbuf_.buf_, uncompress_len, local_pos))) {
if (common::OB_DESERIALIZE_ERROR == ret) { if (common::OB_DESERIALIZE_ERROR == ret) {
ret = common::OB_INVALID_DATA; ret = common::OB_INVALID_DATA;
@ -521,10 +505,16 @@ int ObRawEntryIterator<Type, Interface>::get_next_entry_(
} else { /*do nothing*/ } else { /*do nothing*/
} }
} else { } else {
ret = entry.deserialize(buf_cur_, buf_size, pos); if (OB_FAIL(entry.deserialize(buf_cur_, buf_size, pos))) {
if (OB_SUCC(ret) && OB_UNLIKELY(pos > buf_size)) { CLOG_LOG(WARN,
ret = common::OB_DESERIALIZE_ERROR; "log entry deserialize error, maybe corrupted",
CLOG_LOG(WARN, "buf not enough", K(ret), K(entry), K(buf_size), K(pos)); K(ret),
K(item_type),
KP(buf_cur_),
KP(buf_end_),
K(pos),
K(cur_offset_),
K(file_id_));
} }
} }
if (OB_FAIL(ret)) { if (OB_FAIL(ret)) {