revert checksum for chunk datum store

This commit is contained in:
18523270951@163.com
2023-08-03 16:42:20 +00:00
committed by ob-robot
parent ebf9698855
commit 044ba4348f
2 changed files with 16 additions and 41 deletions

View File

@ -747,7 +747,7 @@ inline int ObChunkDatumStore::dump_one_block(BlockBuffer *item)
if (item->cur_pos_ <= 0) { if (item->cur_pos_ <= 0) {
LOG_WARN("unexpected: dump zero", K(item), K(item->cur_pos_)); LOG_WARN("unexpected: dump zero", K(item), K(item->cur_pos_));
} }
item->block_->magic_ = Block::MAGIC; item->block->magic_ = Block::MAGIC;
if (OB_FAIL(item->get_block()->unswizzling())) { if (OB_FAIL(item->get_block()->unswizzling())) {
LOG_WARN("convert block to copyable failed", K(ret)); LOG_WARN("convert block to copyable failed", K(ret));
} else if (item->capacity() < min_block_size) { } else if (item->capacity() < min_block_size) {
@ -764,19 +764,17 @@ inline int ObChunkDatumStore::dump_one_block(BlockBuffer *item)
item->get_block()->blk_size_); item->get_block()->blk_size_);
tmp_dump_blk_->rows_ = item->get_block()->rows_; tmp_dump_blk_->rows_ = item->get_block()->rows_;
tmp_dump_blk_->get_buffer()->fast_advance(item->data_size() - BlockBuffer::HEAD_SIZE); tmp_dump_blk_->get_buffer()->fast_advance(item->data_size() - BlockBuffer::HEAD_SIZE);
tmp_dump_blk_->checksum_ = tmp_dump_blk_->get_checksum();
if (OB_FAIL(write_file(tmp_dump_blk_->get_buffer()->data(), if (OB_FAIL(write_file(tmp_dump_blk_->get_buffer()->data(),
tmp_dump_blk_->get_buffer()->capacity()))) { tmp_dump_blk_->get_buffer()->capacity()))) {
LOG_WARN("write block to file failed"); LOG_WARN("write block to file failed");
} }
} }
} else if (FALSE_IT(item->block_->checksum_ = item->block_->get_checksum())) {
} else if (OB_FAIL(write_file(item->data(), item->capacity()))) { } else if (OB_FAIL(write_file(item->data(), item->capacity()))) {
LOG_WARN("write block to file failed"); LOG_WARN("write block to file failed");
} }
if (OB_SUCC(ret)) { if (OB_SUCC(ret)) {
n_block_in_file_++; n_block_in_file_++;
LOG_DEBUG("RowStore Dumpped block", K_(item->block_->rows), LOG_DEBUG("RowStore Dumpped block", K_(item->block->rows),
K_(item->cur_pos), K(item->capacity())); K_(item->cur_pos), K(item->capacity()));
} }
if (OB_LIKELY(nullptr != io_event_observer_)) { if (OB_LIKELY(nullptr != io_event_observer_)) {
@ -2432,10 +2430,6 @@ int ObChunkDatumStore::Iterator::load_next_block(RowIterator& it)
set_read_file_iter_end(); set_read_file_iter_end();
} else { } else {
if (OB_FAIL(prefetch_next_blk())) { if (OB_FAIL(prefetch_next_blk())) {
if (OB_ITER_END == ret) {
// check position already, should not return OB_ITER_END
ret = OB_ERR_UNEXPECTED;
}
LOG_WARN("prefetch next blk failed", K(ret)); LOG_WARN("prefetch next blk failed", K(ret));
} }
} }
@ -2458,11 +2452,11 @@ int ObChunkDatumStore::Iterator::load_next_block(RowIterator& it)
cur_chunk_n_blocks_ = store_->blocks_.get_size(); cur_chunk_n_blocks_ = store_->blocks_.get_size();
cur_nth_blk_ += cur_chunk_n_blocks_; cur_nth_blk_ += cur_chunk_n_blocks_;
chunk_n_rows_ = store_->get_row_cnt_in_memory(); chunk_n_rows_ = store_->get_row_cnt_in_memory();
} if (cur_nth_blk_ != store_->n_blocks_ - 1) {
if (cur_nth_blk_ != store_->n_blocks_ - 1) { ret = OB_ERR_UNEXPECTED;
ret = OB_ERR_UNEXPECTED; LOG_WARN("unexpected status",
LOG_WARN("unexpected status", K(ret), K(cur_nth_blk_), K(store_->n_blocks_), K(store_->blocks_.get_size()));
K(ret), K(cur_nth_blk_), K(store_->n_blocks_), K(store_->blocks_.get_size())); }
} }
} }
} }
@ -2491,10 +2485,10 @@ int ObChunkDatumStore::Iterator::read_next_blk()
LOG_WARN("aio wait failed", K(ret)); LOG_WARN("aio wait failed", K(ret));
} }
} }
if (OB_SUCC(ret) && (!aio_blk_->magic_check() || !aio_blk_->checksum_check())) { if (OB_SUCC(ret) && !aio_blk_->magic_check()) {
ret = OB_ERR_UNEXPECTED; ret = OB_ERR_UNEXPECTED;
LOG_WARN("read corrupt data", K(ret), K(aio_blk_->magic_), LOG_WARN("read corrupt data", K(ret), K(aio_blk_->magic_),
K(store_->file_size_), K(cur_iter_pos_), K(aio_blk_->checksum_)); K(store_->file_size_), K(cur_iter_pos_));
} }
if (OB_SUCC(ret)) { if (OB_SUCC(ret)) {
// data block is larger than min block // data block is larger than min block

View File

@ -388,10 +388,9 @@ public:
class BlockBuffer; class BlockBuffer;
struct Block struct Block
{ {
static const int32_t MAGIC = 0x719a288e; static const int64_t MAGIC = 0xbc054e02d8536315;
static const int32_t DEFAULT_CHECKSUM = 0x3eebc74e;
static const int32_t ROW_HEAD_SIZE = sizeof(StoredRow); static const int32_t ROW_HEAD_SIZE = sizeof(StoredRow);
Block() : next_(0), blk_size_(0), rows_(0){} Block() : magic_(0), blk_size_(0), rows_(0){}
static int inline min_buf_size(const common::ObIArray<ObExpr*> &exprs, static int inline min_buf_size(const common::ObIArray<ObExpr*> &exprs,
int64_t row_extend_size, int64_t row_extend_size,
@ -460,21 +459,6 @@ public:
int unswizzling(); int unswizzling();
int swizzling(int64_t *col_cnt); int swizzling(int64_t *col_cnt);
inline bool magic_check() { return MAGIC == magic_; } inline bool magic_check() { return MAGIC == magic_; }
inline bool checksum_check()
{
return get_checksum() == static_cast<int32_t> (checksum_);
}
int32_t get_checksum()
{
int32_t res = DEFAULT_CHECKSUM;
#ifdef ENABLE_DEBUG_LOG
res = static_cast<int32_t> (ob_crc64(get_buffer()->data() + BlockBuffer::HEAD_SIZE,
get_buffer()->capacity() - BlockBuffer::HEAD_SIZE));
#endif
return res;
}
int get_store_row(int64_t &cur_pos, const StoredRow *&sr); int get_store_row(int64_t &cur_pos, const StoredRow *&sr);
inline Block* get_next() const { return next_; } inline Block* get_next() const { return next_; }
inline bool is_empty() { return get_buffer()->is_empty(); } inline bool is_empty() { return get_buffer()->is_empty(); }
@ -490,10 +474,7 @@ public:
friend class BlockBuffer; friend class BlockBuffer;
TO_STRING_KV(K_(magic), K_(blk_size), K_(rows)); TO_STRING_KV(K_(magic), K_(blk_size), K_(rows));
union{ union{
struct { int64_t magic_; //for dump
int64_t magic_ : 32;
int64_t checksum_ : 32;
};
Block* next_; //for block list in mem Block* next_; //for block list in mem
}; };
uint32 blk_size_; /* current blk's size, for dump/read */ uint32 blk_size_; /* current blk's size, for dump/read */
@ -583,8 +564,8 @@ public:
int init(char *buf, const int64_t buf_size); int init(char *buf, const int64_t buf_size);
inline int64_t remain() const { return cap_ - cur_pos_; } inline int64_t remain() const { return cap_ - cur_pos_; }
inline char *data() { return data_; } inline char *data() { return data_; }
inline Block *get_block() { return block_; } inline Block *get_block() { return block; }
inline void set_block(Block *b) { block_ = b; } inline void set_block(Block *b) { block = b; }
inline char *head() const { return data_ + cur_pos_; } inline char *head() const { return data_ + cur_pos_; }
inline int64_t capacity() const { return cap_; } inline int64_t capacity() const { return cap_; }
inline void set_capacity(int64_t cap) { cap_ = cap; } inline void set_capacity(int64_t cap) { cap_ = cap; }
@ -595,7 +576,7 @@ public:
inline bool is_empty() const { return HEAD_SIZE >= cur_pos_; } inline bool is_empty() const { return HEAD_SIZE >= cur_pos_; }
inline void reset() { cur_pos_ = 0; cap_ = 0; data_ = NULL; } inline void reset() { cur_pos_ = 0; cap_ = 0; data_ = NULL; }
inline void reuse() { cur_pos_ = 0; fast_advance(HEAD_SIZE); block_->rows_ = 0; } inline void reuse() { cur_pos_ = 0; fast_advance(HEAD_SIZE); block->rows_ = 0; }
inline int advance(int64_t size); inline int advance(int64_t size);
inline void fast_advance(int64_t size) { cur_pos_ += size; } inline void fast_advance(int64_t size) { cur_pos_ += size; }
TO_STRING_KV(KP_(data), K_(cur_pos), K_(cap)); TO_STRING_KV(KP_(data), K_(cur_pos), K_(cap));
@ -605,7 +586,7 @@ public:
private: private:
union { union {
char *data_; char *data_;
Block *block_; Block *block;
}; };
int64_t cur_pos_; int64_t cur_pos_;
int64_t cap_; int64_t cap_;