[feature-wip](unique-key-merge-on-write) fix thread-safety issue in BetaRowsetWriter (#12875)
This commit is contained in:
@ -292,8 +292,9 @@ RowsetSharedPtr BetaRowsetWriter::build_tmp() {
|
||||
|
||||
Status BetaRowsetWriter::_create_segment_writer(
|
||||
std::unique_ptr<segment_v2::SegmentWriter>* writer) {
|
||||
auto path = BetaRowset::local_segment_path(_context.tablet_path, _context.rowset_id,
|
||||
_num_segment++);
|
||||
int32_t segment_id = _num_segment.fetch_add(1);
|
||||
auto path =
|
||||
BetaRowset::local_segment_path(_context.tablet_path, _context.rowset_id, segment_id);
|
||||
auto fs = _rowset_meta->fs();
|
||||
if (!fs) {
|
||||
return Status::OLAPInternalError(OLAP_ERR_INIT_FAILED);
|
||||
@ -309,7 +310,7 @@ Status BetaRowsetWriter::_create_segment_writer(
|
||||
DCHECK(file_writer != nullptr);
|
||||
segment_v2::SegmentWriterOptions writer_options;
|
||||
writer_options.enable_unique_key_merge_on_write = _context.enable_unique_key_merge_on_write;
|
||||
writer->reset(new segment_v2::SegmentWriter(file_writer.get(), _num_segment,
|
||||
writer->reset(new segment_v2::SegmentWriter(file_writer.get(), segment_id,
|
||||
_context.tablet_schema, _context.data_dir,
|
||||
_context.max_rows_per_segment, writer_options));
|
||||
{
|
||||
@ -327,7 +328,6 @@ Status BetaRowsetWriter::_create_segment_writer(
|
||||
}
|
||||
|
||||
Status BetaRowsetWriter::_flush_segment_writer(std::unique_ptr<segment_v2::SegmentWriter>* writer) {
|
||||
_segment_num_rows.push_back((*writer)->num_rows_written());
|
||||
if ((*writer)->num_rows_written() == 0) {
|
||||
return Status::OK();
|
||||
}
|
||||
@ -346,7 +346,13 @@ Status BetaRowsetWriter::_flush_segment_writer(std::unique_ptr<segment_v2::Segme
|
||||
DCHECK_LE(min_key.compare(max_key), 0);
|
||||
key_bounds.set_min_key(min_key.to_string());
|
||||
key_bounds.set_max_key(max_key.to_string());
|
||||
_segments_encoded_key_bounds.emplace_back(key_bounds);
|
||||
{
|
||||
std::lock_guard<SpinLock> l(_lock);
|
||||
_segment_num_rows.resize(_num_segment);
|
||||
_segments_encoded_key_bounds.resize(_num_segment);
|
||||
_segment_num_rows[(*writer)->get_segment_id()] = (*writer)->num_rows_written();
|
||||
_segments_encoded_key_bounds[(*writer)->get_segment_id()] = key_bounds;
|
||||
}
|
||||
writer->reset();
|
||||
return Status::OK();
|
||||
}
|
||||
|
||||
@ -69,6 +69,7 @@ public:
|
||||
RowsetTypePB type() const override { return RowsetTypePB::BETA_ROWSET; }
|
||||
|
||||
// Copies the per-segment row counts (_segment_num_rows) into *segment_num_rows.
// The copy is made while holding _lock. Always returns Status::OK().
Status get_segment_num_rows(std::vector<uint32_t>* segment_num_rows) const override {
|
||||
std::lock_guard<SpinLock> l(_lock);
|
||||
*segment_num_rows = _segment_num_rows;
|
||||
return Status::OK();
|
||||
}
|
||||
@ -93,8 +94,13 @@ private:
|
||||
/// Because we want to flush memtables in parallel.
|
||||
/// In other processes, such as merger or schema change, we will use this unified writer for data writing.
|
||||
std::unique_ptr<segment_v2::SegmentWriter> _segment_writer;
|
||||
mutable SpinLock _lock; // lock to protect _wblocks.
|
||||
|
||||
mutable SpinLock _lock; // protect following vectors.
|
||||
std::vector<io::FileWriterPtr> _file_writers;
|
||||
// for unique key table with merge-on-write
|
||||
std::vector<KeyBoundsPB> _segments_encoded_key_bounds;
|
||||
// record rows number of every segment
|
||||
std::vector<uint32_t> _segment_num_rows;
|
||||
|
||||
// counters and statistics maintained during data write
|
||||
std::atomic<int64_t> _num_rows_written;
|
||||
@ -104,11 +110,6 @@ private:
|
||||
|
||||
bool _is_pending = false;
|
||||
bool _already_built = false;
|
||||
|
||||
// for unique key table with merge-on-write
|
||||
std::vector<KeyBoundsPB> _segments_encoded_key_bounds;
|
||||
// record rows number of every segment
|
||||
std::vector<uint32_t> _segment_num_rows;
|
||||
};
|
||||
|
||||
} // namespace doris
|
||||
|
||||
@ -84,6 +84,7 @@ public:
|
||||
|
||||
static void init_column_meta(ColumnMetaPB* meta, uint32_t* column_id,
|
||||
const TabletColumn& column, TabletSchemaSPtr tablet_schema);
|
||||
// Returns the _segment_id stored in this writer.
uint32_t get_segment_id() { return _segment_id; }
|
||||
Slice min_encoded_key();
|
||||
Slice max_encoded_key();
|
||||
|
||||
|
||||
Reference in New Issue
Block a user