[enhancement](segcompaction) cancel inflight segcompaction tasks faster when load finish (#28901)
[Goal] When building the rowset writer, avoid waiting for inflight segcompaction to eliminate long tail latency for load. [Current situation] 1. The segcompaction of a rowset is executed serially. During the build phase, we need to wait for the completion of the inflight segcompaction task. 2. If the rowset writer finishes writing and starts building meta, then segments that have not been compacted will not be submitted to the segcompaction worker. We simply ignore them to accelerate the build process. 3. But this is not enough. If a segcompaction task has already been submitted to the worker thread pool, we will set a cancelled flag for the worker, and nothing will be done during execution to complete the task ASAP. 4. But this is still not enough. Although the latency of the segcompaction task has been shortened by the aforementioned method, tasks may still be queuing in the thread pool. [Solution] We can increase the worker thread pool to avoid queuing congestion, but this is not the best solution. Segcompaction should be best-effort work, and should not use too many CPU and memory resources. So we adopted the strategy of unbinding build and segcompaction, specifically: 1. For the segcompaction task that is performing compaction operations, we should not interrupt it, otherwise it may cause file corruption 2. For those tasks still queued, we no longer care about their results (because these tasks will know they are cancelled and will not perform any actual operations), so we just ignore them and continue with the subsequent rowset build process Signed-off-by: freemandealer <freeman.zhang1992@gmail.com>
This commit is contained in:
@ -996,17 +996,22 @@ Status StorageEngine::submit_compaction_task(TabletSharedPtr tablet, CompactionT
|
||||
return _submit_compaction_task(tablet, compaction_type, force);
|
||||
}
|
||||
|
||||
Status StorageEngine::_handle_seg_compaction(SegcompactionWorker* worker,
|
||||
SegCompactionCandidatesSharedPtr segments) {
|
||||
Status StorageEngine::_handle_seg_compaction(std::shared_ptr<SegcompactionWorker> worker,
|
||||
SegCompactionCandidatesSharedPtr segments,
|
||||
uint64_t submission_time) {
|
||||
// note: be aware that worker->_writer maybe released when the task is cancelled
|
||||
uint64_t exec_queue_time = GetCurrentTimeMicros() - submission_time;
|
||||
LOG(INFO) << "segcompaction thread pool queue time(ms): " << exec_queue_time / 1000;
|
||||
worker->compact_segments(segments);
|
||||
// return OK here. error will be reported via BetaRowsetWriter::_segcompaction_status
|
||||
return Status::OK();
|
||||
}
|
||||
|
||||
Status StorageEngine::submit_seg_compaction_task(SegcompactionWorker* worker,
|
||||
Status StorageEngine::submit_seg_compaction_task(std::shared_ptr<SegcompactionWorker> worker,
|
||||
SegCompactionCandidatesSharedPtr segments) {
|
||||
return _seg_compaction_thread_pool->submit_func(
|
||||
std::bind<void>(&StorageEngine::_handle_seg_compaction, this, worker, segments));
|
||||
uint64_t submission_time = GetCurrentTimeMicros();
|
||||
return _seg_compaction_thread_pool->submit_func(std::bind<void>(
|
||||
&StorageEngine::_handle_seg_compaction, this, worker, segments, submission_time));
|
||||
}
|
||||
|
||||
Status StorageEngine::process_index_change_task(const TAlterInvertedIndexReq& request) {
|
||||
|
||||
@ -105,7 +105,7 @@ BaseBetaRowsetWriter::BaseBetaRowsetWriter()
|
||||
_total_index_size(0) {}
|
||||
|
||||
BetaRowsetWriter::BetaRowsetWriter(StorageEngine& engine)
|
||||
: _engine(engine), _segcompaction_worker(std::make_unique<SegcompactionWorker>(this)) {}
|
||||
: _engine(engine), _segcompaction_worker(std::make_shared<SegcompactionWorker>(this)) {}
|
||||
|
||||
BaseBetaRowsetWriter::~BaseBetaRowsetWriter() {
|
||||
// TODO(lingbin): Should wrapper exception logic, no need to know file ops directly.
|
||||
@ -381,14 +381,9 @@ Status BetaRowsetWriter::_rename_compacted_indices(int64_t begin, int64_t end, u
|
||||
return Status::OK();
|
||||
}
|
||||
|
||||
// return true if there isn't any flying segcompaction, otherwise return false
|
||||
bool BetaRowsetWriter::_check_and_set_is_doing_segcompaction() {
|
||||
std::lock_guard<std::mutex> l(_is_doing_segcompaction_lock);
|
||||
if (!_is_doing_segcompaction) {
|
||||
_is_doing_segcompaction = true;
|
||||
return true;
|
||||
} else {
|
||||
return false;
|
||||
}
|
||||
return !_is_doing_segcompaction.exchange(true);
|
||||
}
|
||||
|
||||
Status BetaRowsetWriter::_segcompaction_if_necessary() {
|
||||
@ -410,7 +405,7 @@ Status BetaRowsetWriter::_segcompaction_if_necessary() {
|
||||
LOG(INFO) << "submit segcompaction task, tablet_id:" << _context.tablet_id
|
||||
<< " rowset_id:" << _context.rowset_id << " segment num:" << _num_segment
|
||||
<< ", segcompacted_point:" << _segcompacted_point;
|
||||
status = _engine.submit_seg_compaction_task(_segcompaction_worker.get(), segments);
|
||||
status = _engine.submit_seg_compaction_task(_segcompaction_worker, segments);
|
||||
if (status.ok()) {
|
||||
return status;
|
||||
}
|
||||
@ -546,9 +541,14 @@ Status BetaRowsetWriter::_close_file_writers() {
|
||||
// if _segment_start_id is not zero, that means it's a transient rowset writer for
|
||||
// MoW partial update, don't need to do segment compaction.
|
||||
if (_segment_start_id == 0) {
|
||||
_segcompaction_worker->cancel();
|
||||
RETURN_NOT_OK_STATUS_WITH_WARN(_wait_flying_segcompaction(),
|
||||
"segcompaction failed when build new rowset");
|
||||
if (_segcompaction_worker->cancel()) {
|
||||
std::lock_guard lk(_is_doing_segcompaction_lock);
|
||||
_is_doing_segcompaction = false;
|
||||
_segcompacting_cond.notify_all();
|
||||
} else {
|
||||
RETURN_NOT_OK_STATUS_WITH_WARN(_wait_flying_segcompaction(),
|
||||
"segcompaction failed when build new rowset");
|
||||
}
|
||||
RETURN_NOT_OK_STATUS_WITH_WARN(_segcompaction_rename_last_segments(),
|
||||
"rename last segments failed when build new rowset");
|
||||
if (_segcompaction_worker->get_file_writer()) {
|
||||
|
||||
@ -225,11 +225,11 @@ private:
|
||||
// already been segment compacted
|
||||
std::atomic<int32_t> _num_segcompacted {0}; // index for segment compaction
|
||||
|
||||
std::unique_ptr<SegcompactionWorker> _segcompaction_worker;
|
||||
std::shared_ptr<SegcompactionWorker> _segcompaction_worker;
|
||||
|
||||
// ensure only one inflight segcompaction task for each rowset
|
||||
std::atomic<bool> _is_doing_segcompaction {false};
|
||||
// enforce compare-and-swap on _is_doing_segcompaction
|
||||
// enforce condition variable on _is_doing_segcompaction
|
||||
std::mutex _is_doing_segcompaction_lock;
|
||||
std::condition_variable _segcompacting_cond;
|
||||
|
||||
|
||||
@ -302,10 +302,12 @@ Status SegcompactionWorker::_do_compact_segments(SegCompactionCandidatesSharedPt
|
||||
|
||||
void SegcompactionWorker::compact_segments(SegCompactionCandidatesSharedPtr segments) {
|
||||
Status status = Status::OK();
|
||||
if (_cancelled) {
|
||||
LOG(INFO) << "segcompaction worker is cancelled, skipping segcompaction task";
|
||||
} else {
|
||||
if (_is_compacting_state_mutable.exchange(false)) {
|
||||
status = _do_compact_segments(segments);
|
||||
} else {
|
||||
// note: be aware that _writer maybe released when the task is cancelled
|
||||
LOG(INFO) << "segcompaction worker is cancelled, skipping segcompaction task";
|
||||
return;
|
||||
}
|
||||
if (!status.ok()) {
|
||||
int16_t errcode = status.code();
|
||||
@ -330,6 +332,13 @@ void SegcompactionWorker::compact_segments(SegCompactionCandidatesSharedPtr segm
|
||||
_writer->_is_doing_segcompaction = false;
|
||||
_writer->_segcompacting_cond.notify_all();
|
||||
}
|
||||
_is_compacting_state_mutable = true;
|
||||
}
|
||||
|
||||
bool SegcompactionWorker::cancel() {
|
||||
// return true if the task is cancellable (actual compaction is not started)
|
||||
// return false when the task is not cancellable (it is in the middle of segcompaction)
|
||||
return _is_compacting_state_mutable.exchange(false);
|
||||
}
|
||||
|
||||
} // namespace doris
|
||||
|
||||
@ -54,7 +54,7 @@ public:
|
||||
io::FileWriterPtr& get_file_writer() { return _file_writer; }
|
||||
|
||||
// set the cancel flag, tasks already started will not be cancelled.
|
||||
void cancel() { _cancelled = true; }
|
||||
bool cancel();
|
||||
|
||||
private:
|
||||
Status _create_segment_writer_for_segcompaction(
|
||||
@ -77,6 +77,8 @@ private:
|
||||
// Currently cloud storage engine doesn't need segcompaction
|
||||
BetaRowsetWriter* _writer = nullptr;
|
||||
io::FileWriterPtr _file_writer;
|
||||
std::atomic<bool> _cancelled = false;
|
||||
|
||||
// the state is not mutable when 1)actual compaction operation started or 2) cancelled
|
||||
std::atomic<bool> _is_compacting_state_mutable = true;
|
||||
};
|
||||
} // namespace doris
|
||||
|
||||
@ -196,7 +196,7 @@ public:
|
||||
|
||||
Status submit_compaction_task(TabletSharedPtr tablet, CompactionType compaction_type,
|
||||
bool force);
|
||||
Status submit_seg_compaction_task(SegcompactionWorker* worker,
|
||||
Status submit_seg_compaction_task(std::shared_ptr<SegcompactionWorker> worker,
|
||||
SegCompactionCandidatesSharedPtr segments);
|
||||
|
||||
std::unique_ptr<ThreadPool>& tablet_publish_txn_thread_pool() {
|
||||
@ -313,8 +313,9 @@ private:
|
||||
void _remove_unused_remote_files_callback();
|
||||
void _cold_data_compaction_producer_callback();
|
||||
|
||||
Status _handle_seg_compaction(SegcompactionWorker* worker,
|
||||
SegCompactionCandidatesSharedPtr segments);
|
||||
Status _handle_seg_compaction(std::shared_ptr<SegcompactionWorker> worker,
|
||||
SegCompactionCandidatesSharedPtr segments,
|
||||
uint64_t submission_time);
|
||||
|
||||
Status _handle_index_change(IndexBuilderSharedPtr index_builder);
|
||||
|
||||
|
||||
Reference in New Issue
Block a user