add checksum for chunk datum store dump && reset onetime expr after calc

This commit is contained in:
18523270951@163.com
2023-08-03 07:54:59 +00:00
committed by ob-robot
parent 3c996357a0
commit 65218721e1
4 changed files with 66 additions and 16 deletions

View File

@ -747,7 +747,7 @@ inline int ObChunkDatumStore::dump_one_block(BlockBuffer *item)
if (item->cur_pos_ <= 0) {
LOG_WARN("unexpected: dump zero", K(item), K(item->cur_pos_));
}
item->block->magic_ = Block::MAGIC;
item->block_->magic_ = Block::MAGIC;
if (OB_FAIL(item->get_block()->unswizzling())) {
LOG_WARN("convert block to copyable failed", K(ret));
} else if (item->capacity() < min_block_size) {
@ -764,17 +764,19 @@ inline int ObChunkDatumStore::dump_one_block(BlockBuffer *item)
item->get_block()->blk_size_);
tmp_dump_blk_->rows_ = item->get_block()->rows_;
tmp_dump_blk_->get_buffer()->fast_advance(item->data_size() - BlockBuffer::HEAD_SIZE);
tmp_dump_blk_->checksum_ = tmp_dump_blk_->get_checksum();
if (OB_FAIL(write_file(tmp_dump_blk_->get_buffer()->data(),
tmp_dump_blk_->get_buffer()->capacity()))) {
LOG_WARN("write block to file failed");
}
}
} else if (FALSE_IT(item->block_->checksum_ = item->block_->get_checksum())) {
} else if (OB_FAIL(write_file(item->data(), item->capacity()))) {
LOG_WARN("write block to file failed");
}
if (OB_SUCC(ret)) {
n_block_in_file_++;
LOG_DEBUG("RowStore Dumpped block", K_(item->block->rows),
LOG_DEBUG("RowStore Dumpped block", K_(item->block_->rows),
K_(item->cur_pos), K(item->capacity()));
}
if (OB_LIKELY(nullptr != io_event_observer_)) {
@ -2430,6 +2432,10 @@ int ObChunkDatumStore::Iterator::load_next_block(RowIterator& it)
set_read_file_iter_end();
} else {
if (OB_FAIL(prefetch_next_blk())) {
if (OB_ITER_END == ret) {
// check position already, should not return OB_ITER_END
ret = OB_ERR_UNEXPECTED;
}
LOG_WARN("prefetch next blk failed", K(ret));
}
}
@ -2452,6 +2458,7 @@ int ObChunkDatumStore::Iterator::load_next_block(RowIterator& it)
cur_chunk_n_blocks_ = store_->blocks_.get_size();
cur_nth_blk_ += cur_chunk_n_blocks_;
chunk_n_rows_ = store_->get_row_cnt_in_memory();
}
if (cur_nth_blk_ != store_->n_blocks_ - 1) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("unexpected status",
@ -2459,7 +2466,6 @@ int ObChunkDatumStore::Iterator::load_next_block(RowIterator& it)
}
}
}
}
if (OB_SUCC(ret)) {
it.store_ = store_;
it.cur_iter_blk_ = cur_iter_blk_;
@ -2485,10 +2491,10 @@ int ObChunkDatumStore::Iterator::read_next_blk()
LOG_WARN("aio wait failed", K(ret));
}
}
if (OB_SUCC(ret) && !aio_blk_->magic_check()) {
if (OB_SUCC(ret) && (!aio_blk_->magic_check() || !aio_blk_->checksum_check())) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("read corrupt data", K(ret), K(aio_blk_->magic_),
K(store_->file_size_), K(cur_iter_pos_));
K(store_->file_size_), K(cur_iter_pos_), K(aio_blk_->checksum_));
}
if (OB_SUCC(ret)) {
// data block is larger than min block

View File

@ -388,9 +388,10 @@ public:
class BlockBuffer;
struct Block
{
static const int64_t MAGIC = 0xbc054e02d8536315;
static const int32_t MAGIC = 0x719a288e;
static const int32_t DEFAULT_CHECKSUM = 0x3eebc74e;
static const int32_t ROW_HEAD_SIZE = sizeof(StoredRow);
Block() : magic_(0), blk_size_(0), rows_(0){}
Block() : next_(0), blk_size_(0), rows_(0){}
static int inline min_buf_size(const common::ObIArray<ObExpr*> &exprs,
int64_t row_extend_size,
@ -459,6 +460,21 @@ public:
int unswizzling();
int swizzling(int64_t *col_cnt);
inline bool magic_check() { return MAGIC == magic_; }
inline bool checksum_check()
{
return get_checksum() == static_cast<int32_t> (checksum_);
}
int32_t get_checksum()
{
int32_t res = DEFAULT_CHECKSUM;
#ifdef ENABLE_DEBUG_LOG
res = static_cast<int32_t> (ob_crc64(get_buffer()->data() + BlockBuffer::HEAD_SIZE,
get_buffer()->capacity() - BlockBuffer::HEAD_SIZE));
#endif
return res;
}
int get_store_row(int64_t &cur_pos, const StoredRow *&sr);
inline Block* get_next() const { return next_; }
inline bool is_empty() { return get_buffer()->is_empty(); }
@ -474,7 +490,10 @@ public:
friend class BlockBuffer;
TO_STRING_KV(K_(magic), K_(blk_size), K_(rows));
union{
int64_t magic_; //for dump
struct {
int64_t magic_ : 32;
int64_t checksum_ : 32;
};
Block* next_; //for block list in mem
};
uint32 blk_size_; /* current blk's size, for dump/read */
@ -564,8 +583,8 @@ public:
int init(char *buf, const int64_t buf_size);
inline int64_t remain() const { return cap_ - cur_pos_; }
inline char *data() { return data_; }
inline Block *get_block() { return block; }
inline void set_block(Block *b) { block = b; }
inline Block *get_block() { return block_; }
inline void set_block(Block *b) { block_ = b; }
inline char *head() const { return data_ + cur_pos_; }
inline int64_t capacity() const { return cap_; }
inline void set_capacity(int64_t cap) { cap_ = cap; }
@ -576,7 +595,7 @@ public:
inline bool is_empty() const { return HEAD_SIZE >= cur_pos_; }
inline void reset() { cur_pos_ = 0; cap_ = 0; data_ = NULL; }
inline void reuse() { cur_pos_ = 0; fast_advance(HEAD_SIZE); block->rows_ = 0; }
inline void reuse() { cur_pos_ = 0; fast_advance(HEAD_SIZE); block_->rows_ = 0; }
inline int advance(int64_t size);
inline void fast_advance(int64_t size) { cur_pos_ += size; }
TO_STRING_KV(KP_(data), K_(cur_pos), K_(cap));
@ -586,7 +605,7 @@ public:
private:
union {
char *data_;
Block *block;
Block *block_;
};
int64_t cur_pos_;
int64_t cap_;

View File

@ -77,6 +77,11 @@ int ObSubQueryIterator::get_next_row()
return ret;
}
void ObSubQueryIterator::drain_exch()
{
op_.drain_exch();
}
int ObSubQueryIterator::rewind(const bool reset_onetime_plan /* = false */)
{
//根据subplan filter的语义,reset row iterator,其它的成员保持不变
@ -1265,6 +1270,25 @@ int ObSubPlanFilterOp::prepare_onetime_exprs()
eval_ctx_.set_batch_idx(0);
ret = prepare_onetime_exprs_inner();
}
/*
SPF
TSC
PX COORD
EXCHANGE (thread +1)
if PX COORD is onetime expr, SPF use extra thread calc it and release it until ITER_END,
A large number of threads are wasted when multiple onetime expr coexist
So we drain every onetime expr after calc
*/
for (int64_t i = 1; OB_SUCC(ret) && i < child_cnt_; ++i) {
Iterator *iter = subplan_iters_.at(i - 1);
if (OB_ISNULL(iter)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("subplan_iter is null", K(ret));
} else if (MY_SPEC.one_time_idxs_.has_member(i)) {
iter->drain_exch();
}
}
return ret;
}

View File

@ -105,6 +105,7 @@ public:
DatumRow probe_row_;
//hard core, 1M limit for each hashmap
const static int HASH_MAP_MEMORY_LIMIT = 1024 * 1024;
void drain_exch();
private: