From 0c68f7e347dfe2c78e3db528cb5d7b8f2310690b Mon Sep 17 00:00:00 2001 From: Kaijie Chen Date: Fri, 4 Aug 2023 10:10:38 +0800 Subject: [PATCH] [peformance](load) cancel unstarted segcompaction tasks when build rowset (#22392) --- be/src/olap/rowset/beta_rowset_writer.cpp | 56 ++++++----------------- be/src/olap/rowset/beta_rowset_writer.h | 3 +- be/src/olap/rowset/segcompaction.cpp | 7 ++- be/src/olap/rowset/segcompaction.h | 4 ++ 4 files changed, 25 insertions(+), 45 deletions(-) diff --git a/be/src/olap/rowset/beta_rowset_writer.cpp b/be/src/olap/rowset/beta_rowset_writer.cpp index 551d7bdd7d..e51a3fa66f 100644 --- a/be/src/olap/rowset/beta_rowset_writer.cpp +++ b/be/src/olap/rowset/beta_rowset_writer.cpp @@ -352,23 +352,6 @@ Status BetaRowsetWriter::_rename_compacted_indices(int64_t begin, int64_t end, u return Status::OK(); } -Status BetaRowsetWriter::_get_segcompaction_candidates(SegCompactionCandidatesSharedPtr& segments, - bool is_last) { - if (is_last) { - VLOG_DEBUG << "segcompaction last few segments"; - // currently we only rename remaining segments to reduce wait time - // so that transaction can be committed ASAP - RETURN_IF_ERROR(_load_noncompacted_segments(segments.get(), _num_segment)); - for (int i = 0; i < segments->size(); ++i) { - RETURN_IF_ERROR(_rename_compacted_segment_plain(_segcompacted_point++)); - } - segments->clear(); - } else { - RETURN_IF_ERROR(_find_longest_consecutive_small_segment(segments)); - } - return Status::OK(); -} - bool BetaRowsetWriter::_check_and_set_is_doing_segcompaction() { std::lock_guard l(_is_doing_segcompaction_lock); if (!_is_doing_segcompaction) { @@ -391,7 +374,7 @@ Status BetaRowsetWriter::_segcompaction_if_necessary() { } else if ((_num_segment - _segcompacted_point) >= config::segcompaction_threshold_segment_num) { SegCompactionCandidatesSharedPtr segments = std::make_shared(); - status = _get_segcompaction_candidates(segments, false); + status = _find_longest_consecutive_small_segment(segments); if (LIKELY(status.ok()) && (segments->size() > 0)) { LOG(INFO) << "submit segcompaction task, tablet_id:" << _context.tablet_id << " rowset_id:" << _context.rowset_id << " segment num:" << _num_segment @@ -411,35 +394,28 @@ Status BetaRowsetWriter::_segcompaction_if_necessary() { return status; } -Status BetaRowsetWriter::_segcompaction_ramaining_if_necessary() { - Status status = Status::OK(); +Status BetaRowsetWriter::_segcompaction_rename_last_segments() { DCHECK_EQ(_is_doing_segcompaction, false); if (!config::enable_segcompaction) { return Status::OK(); } if (_segcompaction_status.load() != OK) { return Status::Error( - "BetaRowsetWriter::_segcompaction_ramaining_if_necessary meet invalid state"); + "BetaRowsetWriter::_segcompaction_rename_last_segments meet invalid state"); } if (!_is_segcompacted() || _segcompacted_point == _num_segment) { // no need if never segcompact before or all segcompacted return Status::OK(); } - _is_doing_segcompaction = true; - SegCompactionCandidatesSharedPtr segments = std::make_shared(); - status = _get_segcompaction_candidates(segments, true); - if (LIKELY(status.ok()) && (segments->size() > 0)) { - LOG(INFO) << "submit segcompaction remaining task, tablet_id:" << _context.tablet_id - << " rowset_id:" << _context.rowset_id << " segment num:" << _num_segment - << " segcompacted_point:" << _segcompacted_point; - status = StorageEngine::instance()->submit_seg_compaction_task(&_segcompaction_worker, - segments); - if (status.ok()) { - return status; - } + // currently we only rename remaining segments to reduce wait time + // so that transaction can be committed ASAP + VLOG_DEBUG << "segcompaction last few segments"; + SegCompactionCandidates segments; + RETURN_IF_ERROR(_load_noncompacted_segments(&segments, _num_segment)); + for (int i = 0; i < segments.size(); ++i) { + RETURN_IF_ERROR(_rename_compacted_segment_plain(_segcompacted_point++)); } - _is_doing_segcompaction = false; - return status; + return Status::OK(); } Status BetaRowsetWriter::_add_rows(const vectorized::Block* block, @@ -595,19 +571,15 @@ RowsetSharedPtr BetaRowsetWriter::build() { // if _segment_start_id is not zero, that means it's a transient rowset writer for // MoW partial update, don't need to do segment compaction. if (_segment_start_id == 0) { + _segcompaction_worker.cancel(); status = wait_flying_segcompaction(); - if (!status.ok()) { - LOG(WARNING) << "segcompaction failed when build new rowset 1st wait, res=" << status; - return nullptr; - } - status = _segcompaction_ramaining_if_necessary(); if (!status.ok()) { LOG(WARNING) << "segcompaction failed when build new rowset, res=" << status; return nullptr; } - status = wait_flying_segcompaction(); + status = _segcompaction_rename_last_segments(); if (!status.ok()) { - LOG(WARNING) << "segcompaction failed when build new rowset 2nd wait, res=" << status; + LOG(WARNING) << "rename last segments failed when build new rowset, res=" << status; return nullptr; } diff --git a/be/src/olap/rowset/beta_rowset_writer.h b/be/src/olap/rowset/beta_rowset_writer.h index 7ad3aed786..8c635f3459 100644 --- a/be/src/olap/rowset/beta_rowset_writer.h +++ b/be/src/olap/rowset/beta_rowset_writer.h @@ -161,11 +161,10 @@ private: Status _create_segment_writer_for_segcompaction( std::unique_ptr* writer, int64_t begin, int64_t end); Status _segcompaction_if_necessary(); - Status _segcompaction_ramaining_if_necessary(); + Status _segcompaction_rename_last_segments(); Status _load_noncompacted_segments(std::vector* segments, size_t num); Status _find_longest_consecutive_small_segment(SegCompactionCandidatesSharedPtr segments); - Status _get_segcompaction_candidates(SegCompactionCandidatesSharedPtr& segments, bool is_last); bool _is_segcompacted() { return (_num_segcompacted > 0) ? true : false; } bool _check_and_set_is_doing_segcompaction(); diff --git a/be/src/olap/rowset/segcompaction.cpp b/be/src/olap/rowset/segcompaction.cpp index cac1e78fa6..0109d7bc48 100644 --- a/be/src/olap/rowset/segcompaction.cpp +++ b/be/src/olap/rowset/segcompaction.cpp @@ -293,7 +293,12 @@ Status SegcompactionWorker::_do_compact_segments(SegCompactionCandidatesSharedPt } void SegcompactionWorker::compact_segments(SegCompactionCandidatesSharedPtr segments) { - Status status = _do_compact_segments(segments); + Status status = Status::OK(); + if (_cancelled) { + LOG(INFO) << "segcompaction worker is cancelled, skipping segcompaction task"; + } else { + status = _do_compact_segments(segments); + } if (!status.ok()) { int16_t errcode = status.code(); switch (errcode) { diff --git a/be/src/olap/rowset/segcompaction.h b/be/src/olap/rowset/segcompaction.h index 7191cfdc10..7111db5d83 100644 --- a/be/src/olap/rowset/segcompaction.h +++ b/be/src/olap/rowset/segcompaction.h @@ -54,6 +54,9 @@ public: io::FileWriterPtr& get_file_writer() { return _file_writer; } + // set the cancel flag, tasks already started will not be cancelled. + void cancel() { _cancelled = true; } + private: Status _create_segment_writer_for_segcompaction( std::unique_ptr* writer, uint64_t begin, uint64_t end); @@ -74,5 +77,6 @@ private: //TODO(zhengyu): current impl depends heavily on the access to feilds of BetaRowsetWriter BetaRowsetWriter* _writer; io::FileWriterPtr _file_writer; + std::atomic _cancelled = false; }; } // namespace doris