diff --git a/be/src/olap/rowset/beta_rowset_writer.cpp b/be/src/olap/rowset/beta_rowset_writer.cpp index 688e8dfccd..65c17eaee7 100644 --- a/be/src/olap/rowset/beta_rowset_writer.cpp +++ b/be/src/olap/rowset/beta_rowset_writer.cpp @@ -414,26 +414,27 @@ Status BetaRowsetWriter::_rename_compacted_indices(int64_t begin, int64_t end, u return Status::OK(); } -// return true if there isn't any flying segcompaction, otherwise return false -bool BetaRowsetWriter::_check_and_set_is_doing_segcompaction() { - return !_is_doing_segcompaction.exchange(true); -} - Status BetaRowsetWriter::_segcompaction_if_necessary() { Status status = Status::OK(); - // leave _check_and_set_is_doing_segcompaction as the last condition - // otherwise _segcompacting_cond will never get notified + // if not doing segcompaction, just check segment number if (!config::enable_segcompaction || !_context.enable_segcompaction || !_context.tablet_schema->cluster_key_idxes().empty() || - _context.tablet_schema->num_variant_columns() > 0 || - !_check_and_set_is_doing_segcompaction()) { + _context.tablet_schema->num_variant_columns() > 0) { + return _check_segment_number_limit(_num_segment); + } + // leave _is_doing_segcompaction as the last condition + // otherwise _segcompacting_cond will never get notified + if (_is_doing_segcompaction.exchange(true)) { return status; } if (_segcompaction_status.load() != OK) { status = Status::Error( "BetaRowsetWriter::_segcompaction_if_necessary meet invalid state, error code: {}", _segcompaction_status.load()); - } else if ((_num_segment - _segcompacted_point) >= config::segcompaction_batch_size) { + } else { + status = _check_segment_number_limit(_num_segcompacted); + } + if (status.ok() && (_num_segment - _segcompacted_point) >= config::segcompaction_batch_size) { SegCompactionCandidatesSharedPtr segments; status = _find_longest_consecutive_small_segment(segments); if (LIKELY(status.ok()) && (!segments->empty())) { @@ -619,10 +620,11 @@ Status BetaRowsetWriter::_close_file_writers() { return Status::OK(); } -Status BaseBetaRowsetWriter::build(RowsetSharedPtr& rowset) { +Status BetaRowsetWriter::build(RowsetSharedPtr& rowset) { RETURN_IF_ERROR(_close_file_writers()); - RETURN_NOT_OK_STATUS_WITH_WARN(_check_segment_number_limit(), + const auto total_segment_num = _num_segment - _segcompacted_point + 1 + _num_segcompacted; + RETURN_NOT_OK_STATUS_WITH_WARN(_check_segment_number_limit(total_segment_num), "too many segments when build new rowset"); RETURN_IF_ERROR(_build_rowset_meta(_rowset_meta, true)); @@ -805,11 +807,10 @@ Status BetaRowsetWriter::_create_segment_writer_for_segcompaction( return Status::OK(); } -Status BaseBetaRowsetWriter::_check_segment_number_limit() { - size_t total_segment_num = _num_segment + 1; +Status BaseBetaRowsetWriter::_check_segment_number_limit(size_t segnum) { DBUG_EXECUTE_IF("BetaRowsetWriter._check_segment_number_limit_too_many_segments", - { total_segment_num = dp->param("segnum", 1024); }); - if (UNLIKELY(total_segment_num > config::max_segment_num_per_rowset)) { + { segnum = dp->param("segnum", 1024); }); + if (UNLIKELY(segnum > config::max_segment_num_per_rowset)) { return Status::Error( "too many segments in rowset. tablet_id:{}, rowset_id:{}, max:{}, " "_num_segment:{}, rowset_num_rows:{}. Please check if the bucket number is too " @@ -820,11 +821,10 @@ Status BaseBetaRowsetWriter::_check_segment_number_limit() { return Status::OK(); } -Status BetaRowsetWriter::_check_segment_number_limit() { - size_t total_segment_num = _num_segment - _segcompacted_point + 1 + _num_segcompacted; +Status BetaRowsetWriter::_check_segment_number_limit(size_t segnum) { DBUG_EXECUTE_IF("BetaRowsetWriter._check_segment_number_limit_too_many_segments", - { total_segment_num = dp->param("segnum", 1024); }); - if (UNLIKELY(total_segment_num > config::max_segment_num_per_rowset)) { + { segnum = dp->param("segnum", 1024); }); + if (UNLIKELY(segnum > config::max_segment_num_per_rowset)) { return Status::Error( "too many segments in rowset. tablet_id:{}, rowset_id:{}, max:{}, _num_segment:{}, " "_segcompacted_point:{}, _num_segcompacted:{}, rowset_num_rows:{}. Please check if " diff --git a/be/src/olap/rowset/beta_rowset_writer.h b/be/src/olap/rowset/beta_rowset_writer.h index b897d96c9b..5a34371e65 100644 --- a/be/src/olap/rowset/beta_rowset_writer.h +++ b/be/src/olap/rowset/beta_rowset_writer.h @@ -85,8 +85,6 @@ public: // This method is thread-safe. Status flush_single_block(const vectorized::Block* block) override; - Status build(RowsetSharedPtr& rowset) override; - RowsetSharedPtr manual_build(const RowsetMetaSharedPtr& rowset_meta) override; PUniqueId load_id() override { return _context.load_id; } @@ -132,18 +130,16 @@ public: const RowsetWriterContext& context() const override { return _context; } -private: +protected: virtual Status _generate_delete_bitmap(int32_t segment_id) = 0; Status _build_rowset_meta(std::shared_ptr rowset_meta, bool check_segment_num = false); void update_rowset_schema(TabletSchemaSPtr flush_schema); - // build a tmp rowset for load segment to calc delete_bitmap - // for this segment -protected: + Status _create_file_writer(std::string path, io::FileWriterPtr& file_writer); virtual Status _close_file_writers(); - virtual Status _check_segment_number_limit(); + virtual Status _check_segment_number_limit(size_t segnum); virtual int64_t _num_seg() const; // build a tmp rowset for load segment to calc delete_bitmap for this segment Status _build_tmp(RowsetSharedPtr& rowset_ptr); @@ -201,6 +197,8 @@ public: Status add_segment(uint32_t segment_id, const SegmentStatistics& segstat, TabletSchemaSPtr flush_schema) override; + Status build(RowsetSharedPtr& rowset) override; + Status flush_segment_writer_for_segcompaction( std::unique_ptr* writer, uint64_t index_size, KeyBoundsPB& key_bounds); @@ -213,7 +211,7 @@ private: // segment compaction friend class SegcompactionWorker; Status _close_file_writers() override; - Status _check_segment_number_limit() override; + Status _check_segment_number_limit(size_t segnum) override; int64_t _num_seg() const override; Status _wait_flying_segcompaction(); Status _create_segment_writer_for_segcompaction( @@ -222,7 +220,6 @@ private: Status _segcompaction_rename_last_segments(); Status _load_noncompacted_segment(segment_v2::SegmentSharedPtr& segment, int32_t segment_id); Status _find_longest_consecutive_small_segment(SegCompactionCandidatesSharedPtr& segments); - bool _check_and_set_is_doing_segcompaction(); Status _rename_compacted_segments(int64_t begin, int64_t end); Status _rename_compacted_segment_plain(uint64_t seg_id); Status _rename_compacted_indices(int64_t begin, int64_t end, uint64_t seg_id);