diff --git a/be/src/olap/rowset/beta_rowset_writer.cpp b/be/src/olap/rowset/beta_rowset_writer.cpp index 745b5135c8..4f1c95568a 100644 --- a/be/src/olap/rowset/beta_rowset_writer.cpp +++ b/be/src/olap/rowset/beta_rowset_writer.cpp @@ -292,8 +292,9 @@ RowsetSharedPtr BetaRowsetWriter::build_tmp() { Status BetaRowsetWriter::_create_segment_writer( std::unique_ptr* writer) { - auto path = BetaRowset::local_segment_path(_context.tablet_path, _context.rowset_id, - _num_segment++); + int32_t segment_id = _num_segment.fetch_add(1); + auto path = + BetaRowset::local_segment_path(_context.tablet_path, _context.rowset_id, segment_id); auto fs = _rowset_meta->fs(); if (!fs) { return Status::OLAPInternalError(OLAP_ERR_INIT_FAILED); @@ -309,7 +310,7 @@ Status BetaRowsetWriter::_create_segment_writer( DCHECK(file_writer != nullptr); segment_v2::SegmentWriterOptions writer_options; writer_options.enable_unique_key_merge_on_write = _context.enable_unique_key_merge_on_write; - writer->reset(new segment_v2::SegmentWriter(file_writer.get(), _num_segment, + writer->reset(new segment_v2::SegmentWriter(file_writer.get(), segment_id, _context.tablet_schema, _context.data_dir, _context.max_rows_per_segment, writer_options)); { @@ -327,7 +328,6 @@ Status BetaRowsetWriter::_create_segment_writer( } Status BetaRowsetWriter::_flush_segment_writer(std::unique_ptr* writer) { - _segment_num_rows.push_back((*writer)->num_rows_written()); if ((*writer)->num_rows_written() == 0) { return Status::OK(); } @@ -346,7 +346,13 @@ Status BetaRowsetWriter::_flush_segment_writer(std::unique_ptr l(_lock); + _segment_num_rows.resize(_num_segment); + _segments_encoded_key_bounds.resize(_num_segment); + _segment_num_rows[(*writer)->get_segment_id()] = (*writer)->num_rows_written(); + _segments_encoded_key_bounds[(*writer)->get_segment_id()] = key_bounds; + } writer->reset(); return Status::OK(); } diff --git a/be/src/olap/rowset/beta_rowset_writer.h b/be/src/olap/rowset/beta_rowset_writer.h index afa15b4018..5980fae81e 100644 --- a/be/src/olap/rowset/beta_rowset_writer.h +++ b/be/src/olap/rowset/beta_rowset_writer.h @@ -69,6 +69,7 @@ public: RowsetTypePB type() const override { return RowsetTypePB::BETA_ROWSET; } Status get_segment_num_rows(std::vector* segment_num_rows) const override { + std::lock_guard l(_lock); *segment_num_rows = _segment_num_rows; return Status::OK(); } @@ -93,8 +94,13 @@ private: /// Because we want to flush memtables in parallel. /// In other processes, such as merger or schema change, we will use this unified writer for data writing. std::unique_ptr _segment_writer; - mutable SpinLock _lock; // lock to protect _wblocks. + + mutable SpinLock _lock; // protect following vectors. std::vector _file_writers; + // for unique key table with merge-on-write + std::vector _segments_encoded_key_bounds; + // record rows number of every segment + std::vector _segment_num_rows; // counters and statistics maintained during data write std::atomic _num_rows_written; @@ -104,11 +110,6 @@ private: bool _is_pending = false; bool _already_built = false; - - // for unique key table with merge-on-write - std::vector _segments_encoded_key_bounds; - // record rows number of every segment - std::vector _segment_num_rows; }; } // namespace doris diff --git a/be/src/olap/rowset/segment_v2/segment_writer.h b/be/src/olap/rowset/segment_v2/segment_writer.h index 636adb05bf..590bfa7a91 100644 --- a/be/src/olap/rowset/segment_v2/segment_writer.h +++ b/be/src/olap/rowset/segment_v2/segment_writer.h @@ -84,6 +84,7 @@ public: static void init_column_meta(ColumnMetaPB* meta, uint32_t* column_id, const TabletColumn& column, TabletSchemaSPtr tablet_schema); + uint32_t get_segment_id() { return _segment_id; } Slice min_encoded_key(); Slice max_encoded_key();