diff --git a/be/src/olap/olap_server.cpp b/be/src/olap/olap_server.cpp index 29aa4eb05f..9ee24dee19 100644 --- a/be/src/olap/olap_server.cpp +++ b/be/src/olap/olap_server.cpp @@ -996,17 +996,22 @@ Status StorageEngine::submit_compaction_task(TabletSharedPtr tablet, CompactionT return _submit_compaction_task(tablet, compaction_type, force); } -Status StorageEngine::_handle_seg_compaction(SegcompactionWorker* worker, - SegCompactionCandidatesSharedPtr segments) { +Status StorageEngine::_handle_seg_compaction(std::shared_ptr worker, + SegCompactionCandidatesSharedPtr segments, + uint64_t submission_time) { + // note: be aware that worker->_writer maybe released when the task is cancelled + uint64_t exec_queue_time = GetCurrentTimeMicros() - submission_time; + LOG(INFO) << "segcompaction thread pool queue time(ms): " << exec_queue_time / 1000; worker->compact_segments(segments); // return OK here. error will be reported via BetaRowsetWriter::_segcompaction_status return Status::OK(); } -Status StorageEngine::submit_seg_compaction_task(SegcompactionWorker* worker, +Status StorageEngine::submit_seg_compaction_task(std::shared_ptr worker, SegCompactionCandidatesSharedPtr segments) { - return _seg_compaction_thread_pool->submit_func( - std::bind(&StorageEngine::_handle_seg_compaction, this, worker, segments)); + uint64_t submission_time = GetCurrentTimeMicros(); + return _seg_compaction_thread_pool->submit_func(std::bind( + &StorageEngine::_handle_seg_compaction, this, worker, segments, submission_time)); } Status StorageEngine::process_index_change_task(const TAlterInvertedIndexReq& request) { diff --git a/be/src/olap/rowset/beta_rowset_writer.cpp b/be/src/olap/rowset/beta_rowset_writer.cpp index 6850ce0f43..ce30a03c80 100644 --- a/be/src/olap/rowset/beta_rowset_writer.cpp +++ b/be/src/olap/rowset/beta_rowset_writer.cpp @@ -105,7 +105,7 @@ BaseBetaRowsetWriter::BaseBetaRowsetWriter() _total_index_size(0) {} BetaRowsetWriter::BetaRowsetWriter(StorageEngine& engine) - : _engine(engine), _segcompaction_worker(std::make_unique(this)) {} + : _engine(engine), _segcompaction_worker(std::make_shared(this)) {} BaseBetaRowsetWriter::~BaseBetaRowsetWriter() { // TODO(lingbin): Should wrapper exception logic, no need to know file ops directly. @@ -381,14 +381,9 @@ Status BetaRowsetWriter::_rename_compacted_indices(int64_t begin, int64_t end, u return Status::OK(); } +// return true if there isn't any flying segcompaction, otherwise return false bool BetaRowsetWriter::_check_and_set_is_doing_segcompaction() { - std::lock_guard l(_is_doing_segcompaction_lock); - if (!_is_doing_segcompaction) { - _is_doing_segcompaction = true; - return true; - } else { - return false; - } + return !_is_doing_segcompaction.exchange(true); } Status BetaRowsetWriter::_segcompaction_if_necessary() { @@ -410,7 +405,7 @@ Status BetaRowsetWriter::_segcompaction_if_necessary() { LOG(INFO) << "submit segcompaction task, tablet_id:" << _context.tablet_id << " rowset_id:" << _context.rowset_id << " segment num:" << _num_segment << ", segcompacted_point:" << _segcompacted_point; - status = _engine.submit_seg_compaction_task(_segcompaction_worker.get(), segments); + status = _engine.submit_seg_compaction_task(_segcompaction_worker, segments); if (status.ok()) { return status; } @@ -546,9 +541,14 @@ Status BetaRowsetWriter::_close_file_writers() { // if _segment_start_id is not zero, that means it's a transient rowset writer for // MoW partial update, don't need to do segment compaction. if (_segment_start_id == 0) { - _segcompaction_worker->cancel(); - RETURN_NOT_OK_STATUS_WITH_WARN(_wait_flying_segcompaction(), - "segcompaction failed when build new rowset"); + if (_segcompaction_worker->cancel()) { + std::lock_guard lk(_is_doing_segcompaction_lock); + _is_doing_segcompaction = false; + _segcompacting_cond.notify_all(); + } else { + RETURN_NOT_OK_STATUS_WITH_WARN(_wait_flying_segcompaction(), + "segcompaction failed when build new rowset"); + } RETURN_NOT_OK_STATUS_WITH_WARN(_segcompaction_rename_last_segments(), "rename last segments failed when build new rowset"); if (_segcompaction_worker->get_file_writer()) { diff --git a/be/src/olap/rowset/beta_rowset_writer.h b/be/src/olap/rowset/beta_rowset_writer.h index 347994e243..0bd96f2829 100644 --- a/be/src/olap/rowset/beta_rowset_writer.h +++ b/be/src/olap/rowset/beta_rowset_writer.h @@ -225,11 +225,11 @@ private: // already been segment compacted std::atomic _num_segcompacted {0}; // index for segment compaction - std::unique_ptr _segcompaction_worker; + std::shared_ptr _segcompaction_worker; // ensure only one inflight segcompaction task for each rowset std::atomic _is_doing_segcompaction {false}; - // enforce compare-and-swap on _is_doing_segcompaction + // enforce condition variable on _is_doing_segcompaction std::mutex _is_doing_segcompaction_lock; std::condition_variable _segcompacting_cond; diff --git a/be/src/olap/rowset/segcompaction.cpp b/be/src/olap/rowset/segcompaction.cpp index be92557da2..ad3e78a608 100644 --- a/be/src/olap/rowset/segcompaction.cpp +++ b/be/src/olap/rowset/segcompaction.cpp @@ -302,10 +302,12 @@ Status SegcompactionWorker::_do_compact_segments(SegCompactionCandidatesSharedPt void SegcompactionWorker::compact_segments(SegCompactionCandidatesSharedPtr segments) { Status status = Status::OK(); - if (_cancelled) { - LOG(INFO) << "segcompaction worker is cancelled, skipping segcompaction task"; - } else { + if (_is_compacting_state_mutable.exchange(false)) { status = _do_compact_segments(segments); + } else { + // note: be aware that _writer maybe released when the task is cancelled + LOG(INFO) << "segcompaction worker is cancelled, skipping segcompaction task"; + return; } if (!status.ok()) { int16_t errcode = status.code(); @@ -330,6 +332,13 @@ void SegcompactionWorker::compact_segments(SegCompactionCandidatesSharedPtr segm _writer->_is_doing_segcompaction = false; _writer->_segcompacting_cond.notify_all(); } + _is_compacting_state_mutable = true; +} + +bool SegcompactionWorker::cancel() { + // return true if the task is canncellable (actual compaction is not started) + // return false when the task is not cancellable (it is in the middle of segcompaction) + return _is_compacting_state_mutable.exchange(false); } } // namespace doris diff --git a/be/src/olap/rowset/segcompaction.h b/be/src/olap/rowset/segcompaction.h index e2b5812ad8..5aef89992d 100644 --- a/be/src/olap/rowset/segcompaction.h +++ b/be/src/olap/rowset/segcompaction.h @@ -54,7 +54,7 @@ public: io::FileWriterPtr& get_file_writer() { return _file_writer; } // set the cancel flag, tasks already started will not be cancelled. - void cancel() { _cancelled = true; } + bool cancel(); private: Status _create_segment_writer_for_segcompaction( @@ -77,6 +77,8 @@ private: // Currently cloud storage engine doesn't need segcompaction BetaRowsetWriter* _writer = nullptr; io::FileWriterPtr _file_writer; - std::atomic _cancelled = false; + + // the state is not mutable when 1)actual compaction operation started or 2) cancelled + std::atomic _is_compacting_state_mutable = true; }; } // namespace doris diff --git a/be/src/olap/storage_engine.h b/be/src/olap/storage_engine.h index 77d3efeaaf..750e7a4ca2 100644 --- a/be/src/olap/storage_engine.h +++ b/be/src/olap/storage_engine.h @@ -196,7 +196,7 @@ public: Status submit_compaction_task(TabletSharedPtr tablet, CompactionType compaction_type, bool force); - Status submit_seg_compaction_task(SegcompactionWorker* worker, + Status submit_seg_compaction_task(std::shared_ptr worker, SegCompactionCandidatesSharedPtr segments); std::unique_ptr& tablet_publish_txn_thread_pool() { @@ -313,8 +313,9 @@ private: void _remove_unused_remote_files_callback(); void _cold_data_compaction_producer_callback(); - Status _handle_seg_compaction(SegcompactionWorker* worker, - SegCompactionCandidatesSharedPtr segments); + Status _handle_seg_compaction(std::shared_ptr worker, + SegCompactionCandidatesSharedPtr segments, + uint64_t submission_time); Status _handle_index_change(IndexBuilderSharedPtr index_builder);