diff --git a/be/src/olap/rowset/beta_rowset_writer.cpp b/be/src/olap/rowset/beta_rowset_writer.cpp index eaaaf64d4d..922aa633ef 100644 --- a/be/src/olap/rowset/beta_rowset_writer.cpp +++ b/be/src/olap/rowset/beta_rowset_writer.cpp @@ -807,6 +807,11 @@ Status BaseBetaRowsetWriter::add_segment(uint32_t segment_id, const SegmentStati update_rowset_schema(flush_schema); } if (_context.mow_context != nullptr) { + // ensure that the segment file writing is complete + auto* file_writer = _segment_creator.get_file_writer(segment_id); + if (file_writer) { + RETURN_IF_ERROR(file_writer->close()); + } RETURN_IF_ERROR(_generate_delete_bitmap(segment_id)); } return Status::OK(); diff --git a/be/src/olap/rowset/segment_creator.cpp b/be/src/olap/rowset/segment_creator.cpp index 7eddd7877f..24b7ffed6a 100644 --- a/be/src/olap/rowset/segment_creator.cpp +++ b/be/src/olap/rowset/segment_creator.cpp @@ -151,7 +151,7 @@ Status SegmentFlusher::_expand_variant_to_subcolumns(vectorized::Block& block, Status SegmentFlusher::close() { std::lock_guard l(_lock); - for (auto& file_writer : _file_writers) { + for (auto& [segment_id, file_writer] : _file_writers) { Status status = file_writer->close(); if (!status.ok()) { LOG(WARNING) << "failed to close file writer, path=" << file_writer->path() @@ -205,7 +205,7 @@ Status SegmentFlusher::_create_segment_writer(std::unique_ptrmax_rows_per_segment, writer_options, _context->mow_context)); { std::lock_guard l(_lock); - _file_writers.push_back(std::move(file_writer)); + _file_writers.emplace(segment_id, std::move(file_writer)); } auto s = writer->init(); if (!s.ok()) { @@ -236,7 +236,7 @@ Status SegmentFlusher::_create_segment_writer( _context->max_rows_per_segment, writer_options, _context->mow_context)); { std::lock_guard l(_lock); - _file_writers.push_back(std::move(file_writer)); + _file_writers.emplace(segment_id, std::move(file_writer)); } auto s = writer->init(); if (!s.ok()) { @@ -341,6 +341,13 @@ Status SegmentFlusher::create_writer(std::unique_ptr& wr return Status::OK(); } +io::FileWriter* SegmentFlusher::get_file_writer(int32_t segment_id) { + if (!_file_writers.contains(segment_id)) { + return nullptr; + } + return _file_writers[segment_id].get(); +} + SegmentFlusher::Writer::Writer(SegmentFlusher* flusher, std::unique_ptr& segment_writer) : _flusher(flusher), _writer(std::move(segment_writer)) {}; diff --git a/be/src/olap/rowset/segment_creator.h b/be/src/olap/rowset/segment_creator.h index 668c75e47b..fe439d3bc7 100644 --- a/be/src/olap/rowset/segment_creator.h +++ b/be/src/olap/rowset/segment_creator.h @@ -20,6 +20,7 @@ #include #include +#include #include #include "common/status.h" @@ -102,6 +103,8 @@ public: int64_t num_rows_filtered() const { return _num_rows_filtered; } + io::FileWriter* get_file_writer(int32_t segment_id); + Status close(); public: @@ -153,7 +156,7 @@ private: RowsetWriterContext* _context; mutable SpinLock _lock; // protect following vectors. - std::vector _file_writers; + std::unordered_map _file_writers; // written rows by add_block/add_row std::atomic _num_rows_written = 0; @@ -196,6 +199,10 @@ public: Status close(); + io::FileWriter* get_file_writer(int32_t segment_id) { + return _segment_flusher.get_file_writer(segment_id); + } + private: std::atomic _next_segment_id = 0; SegmentFlusher _segment_flusher;