[fixbug](vec-load) fix core of segment_writer while it is not thread-safe (#9569)
introduce in stream-load-vec #9280, it will cause multi-thread operate to same segment_write cause BetaRowset enable multi-thread of memtable flush, memtable flush call rowset_writer.add_block, it use member variable _segment_writer to write, so it will cause multi-thread in segment write. Co-authored-by: yixiutt <yixiu@selectdb.com>
This commit is contained in:
@ -302,9 +302,11 @@ Status MemTable::_do_flush(int64_t& duration_ns) {
|
||||
}
|
||||
} else {
|
||||
vectorized::Block block = _collect_vskiplist_results();
|
||||
RETURN_NOT_OK(_rowset_writer->add_block(&block));
|
||||
// beta rowset flush parallel, segment write add block is not
|
||||
// thread safe, so use tmp variable segment_write instead of
|
||||
// member variable
|
||||
RETURN_NOT_OK(_rowset_writer->flush_single_memtable(&block));
|
||||
_flush_size = block.allocated_bytes();
|
||||
RETURN_NOT_OK(_rowset_writer->flush());
|
||||
}
|
||||
return Status::OK();
|
||||
}
|
||||
|
||||
@ -104,23 +104,28 @@ Status BetaRowsetWriter::add_block(const vectorized::Block* block) {
|
||||
if (UNLIKELY(_segment_writer == nullptr)) {
|
||||
RETURN_NOT_OK(_create_segment_writer(&_segment_writer));
|
||||
}
|
||||
return _add_block(block, &_segment_writer);
|
||||
}
|
||||
|
||||
Status BetaRowsetWriter::_add_block(const vectorized::Block* block,
|
||||
std::unique_ptr<segment_v2::SegmentWriter>* segment_writer) {
|
||||
size_t block_size_in_bytes = block->bytes();
|
||||
size_t block_row_num = block->rows();
|
||||
size_t row_avg_size_in_bytes = std::max((size_t)1, block_size_in_bytes / block_row_num);
|
||||
size_t row_offset = 0;
|
||||
|
||||
do {
|
||||
auto max_row_add = _segment_writer->max_row_to_add(row_avg_size_in_bytes);
|
||||
auto max_row_add = (*segment_writer)->max_row_to_add(row_avg_size_in_bytes);
|
||||
if (UNLIKELY(max_row_add < 1)) {
|
||||
// no space for another signle row, need flush now
|
||||
RETURN_NOT_OK(_flush_segment_writer(&_segment_writer));
|
||||
RETURN_NOT_OK(_create_segment_writer(&_segment_writer));
|
||||
max_row_add = _segment_writer->max_row_to_add(row_avg_size_in_bytes);
|
||||
RETURN_NOT_OK(_flush_segment_writer(segment_writer));
|
||||
RETURN_NOT_OK(_create_segment_writer(segment_writer));
|
||||
max_row_add = (*segment_writer)->max_row_to_add(row_avg_size_in_bytes);
|
||||
DCHECK(max_row_add > 0);
|
||||
}
|
||||
|
||||
size_t input_row_num = std::min(block_row_num - row_offset, size_t(max_row_add));
|
||||
auto s = _segment_writer->append_block(block, row_offset, input_row_num);
|
||||
auto s = (*segment_writer)->append_block(block, row_offset, input_row_num);
|
||||
if (UNLIKELY(!s.ok())) {
|
||||
LOG(WARNING) << "failed to append block: " << s.to_string();
|
||||
return Status::OLAPInternalError(OLAP_ERR_WRITER_DATA_WRITE_ERROR);
|
||||
@ -250,6 +255,17 @@ Status BetaRowsetWriter::flush_single_memtable(MemTable* memtable, int64_t* flus
|
||||
return Status::OK();
|
||||
}
|
||||
|
||||
Status BetaRowsetWriter::flush_single_memtable(const vectorized::Block* block) {
|
||||
if (block->rows() == 0) {
|
||||
return Status::OK();
|
||||
}
|
||||
std::unique_ptr<segment_v2::SegmentWriter> writer;
|
||||
RETURN_NOT_OK(_create_segment_writer(&writer));
|
||||
RETURN_NOT_OK(_add_block(block, &writer));
|
||||
RETURN_NOT_OK(_flush_segment_writer(&writer));
|
||||
return Status::OK();
|
||||
}
|
||||
|
||||
RowsetSharedPtr BetaRowsetWriter::build() {
|
||||
// TODO(lingbin): move to more better place, or in a CreateBlockBatch?
|
||||
for (auto& wblock : _wblocks) {
|
||||
|
||||
@ -57,6 +57,7 @@ public:
|
||||
|
||||
// Return the file size flushed to disk in "flush_size"
|
||||
Status flush_single_memtable(MemTable* memtable, int64_t* flush_size) override;
|
||||
Status flush_single_memtable(const vectorized::Block* block) override;
|
||||
|
||||
RowsetSharedPtr build() override;
|
||||
|
||||
@ -71,6 +72,8 @@ public:
|
||||
private:
|
||||
template <typename RowType>
|
||||
Status _add_row(const RowType& row);
|
||||
Status _add_block(const vectorized::Block* block,
|
||||
std::unique_ptr<segment_v2::SegmentWriter>* writer);
|
||||
|
||||
Status _create_segment_writer(std::unique_ptr<segment_v2::SegmentWriter>* writer);
|
||||
|
||||
|
||||
@ -64,6 +64,9 @@ public:
|
||||
virtual Status flush_single_memtable(MemTable* memtable, int64_t* flush_size) {
|
||||
return Status::OLAPInternalError(OLAP_ERR_FUNC_NOT_IMPLEMENTED);
|
||||
}
|
||||
virtual Status flush_single_memtable(const vectorized::Block* block) {
|
||||
return Status::OLAPInternalError(OLAP_ERR_FUNC_NOT_IMPLEMENTED);
|
||||
}
|
||||
|
||||
// finish building and return pointer to the built rowset (guaranteed to be inited).
|
||||
// return nullptr when failed
|
||||
|
||||
Reference in New Issue
Block a user