[peformance](load) cancel unstarted segcompaction tasks when build rowset (#22392)
This commit is contained in:
@ -352,23 +352,6 @@ Status BetaRowsetWriter::_rename_compacted_indices(int64_t begin, int64_t end, u
|
||||
return Status::OK();
|
||||
}
|
||||
|
||||
Status BetaRowsetWriter::_get_segcompaction_candidates(SegCompactionCandidatesSharedPtr& segments,
|
||||
bool is_last) {
|
||||
if (is_last) {
|
||||
VLOG_DEBUG << "segcompaction last few segments";
|
||||
// currently we only rename remaining segments to reduce wait time
|
||||
// so that transaction can be committed ASAP
|
||||
RETURN_IF_ERROR(_load_noncompacted_segments(segments.get(), _num_segment));
|
||||
for (int i = 0; i < segments->size(); ++i) {
|
||||
RETURN_IF_ERROR(_rename_compacted_segment_plain(_segcompacted_point++));
|
||||
}
|
||||
segments->clear();
|
||||
} else {
|
||||
RETURN_IF_ERROR(_find_longest_consecutive_small_segment(segments));
|
||||
}
|
||||
return Status::OK();
|
||||
}
|
||||
|
||||
bool BetaRowsetWriter::_check_and_set_is_doing_segcompaction() {
|
||||
std::lock_guard<std::mutex> l(_is_doing_segcompaction_lock);
|
||||
if (!_is_doing_segcompaction) {
|
||||
@ -391,7 +374,7 @@ Status BetaRowsetWriter::_segcompaction_if_necessary() {
|
||||
} else if ((_num_segment - _segcompacted_point) >=
|
||||
config::segcompaction_threshold_segment_num) {
|
||||
SegCompactionCandidatesSharedPtr segments = std::make_shared<SegCompactionCandidates>();
|
||||
status = _get_segcompaction_candidates(segments, false);
|
||||
status = _find_longest_consecutive_small_segment(segments);
|
||||
if (LIKELY(status.ok()) && (segments->size() > 0)) {
|
||||
LOG(INFO) << "submit segcompaction task, tablet_id:" << _context.tablet_id
|
||||
<< " rowset_id:" << _context.rowset_id << " segment num:" << _num_segment
|
||||
@ -411,35 +394,28 @@ Status BetaRowsetWriter::_segcompaction_if_necessary() {
|
||||
return status;
|
||||
}
|
||||
|
||||
Status BetaRowsetWriter::_segcompaction_ramaining_if_necessary() {
|
||||
Status status = Status::OK();
|
||||
Status BetaRowsetWriter::_segcompaction_rename_last_segments() {
|
||||
DCHECK_EQ(_is_doing_segcompaction, false);
|
||||
if (!config::enable_segcompaction) {
|
||||
return Status::OK();
|
||||
}
|
||||
if (_segcompaction_status.load() != OK) {
|
||||
return Status::Error<SEGCOMPACTION_FAILED>(
|
||||
"BetaRowsetWriter::_segcompaction_ramaining_if_necessary meet invalid state");
|
||||
"BetaRowsetWriter::_segcompaction_rename_last_segments meet invalid state");
|
||||
}
|
||||
if (!_is_segcompacted() || _segcompacted_point == _num_segment) {
|
||||
// no need if never segcompact before or all segcompacted
|
||||
return Status::OK();
|
||||
}
|
||||
_is_doing_segcompaction = true;
|
||||
SegCompactionCandidatesSharedPtr segments = std::make_shared<SegCompactionCandidates>();
|
||||
status = _get_segcompaction_candidates(segments, true);
|
||||
if (LIKELY(status.ok()) && (segments->size() > 0)) {
|
||||
LOG(INFO) << "submit segcompaction remaining task, tablet_id:" << _context.tablet_id
|
||||
<< " rowset_id:" << _context.rowset_id << " segment num:" << _num_segment
|
||||
<< " segcompacted_point:" << _segcompacted_point;
|
||||
status = StorageEngine::instance()->submit_seg_compaction_task(&_segcompaction_worker,
|
||||
segments);
|
||||
if (status.ok()) {
|
||||
return status;
|
||||
}
|
||||
// currently we only rename remaining segments to reduce wait time
|
||||
// so that transaction can be committed ASAP
|
||||
VLOG_DEBUG << "segcompaction last few segments";
|
||||
SegCompactionCandidates segments;
|
||||
RETURN_IF_ERROR(_load_noncompacted_segments(&segments, _num_segment));
|
||||
for (int i = 0; i < segments.size(); ++i) {
|
||||
RETURN_IF_ERROR(_rename_compacted_segment_plain(_segcompacted_point++));
|
||||
}
|
||||
_is_doing_segcompaction = false;
|
||||
return status;
|
||||
return Status::OK();
|
||||
}
|
||||
|
||||
Status BetaRowsetWriter::_add_rows(const vectorized::Block* block,
|
||||
@ -595,19 +571,15 @@ RowsetSharedPtr BetaRowsetWriter::build() {
|
||||
// if _segment_start_id is not zero, that means it's a transient rowset writer for
|
||||
// MoW partial update, don't need to do segment compaction.
|
||||
if (_segment_start_id == 0) {
|
||||
_segcompaction_worker.cancel();
|
||||
status = wait_flying_segcompaction();
|
||||
if (!status.ok()) {
|
||||
LOG(WARNING) << "segcompaction failed when build new rowset 1st wait, res=" << status;
|
||||
return nullptr;
|
||||
}
|
||||
status = _segcompaction_ramaining_if_necessary();
|
||||
if (!status.ok()) {
|
||||
LOG(WARNING) << "segcompaction failed when build new rowset, res=" << status;
|
||||
return nullptr;
|
||||
}
|
||||
status = wait_flying_segcompaction();
|
||||
status = _segcompaction_rename_last_segments();
|
||||
if (!status.ok()) {
|
||||
LOG(WARNING) << "segcompaction failed when build new rowset 2nd wait, res=" << status;
|
||||
LOG(WARNING) << "rename last segments failed when build new rowset, res=" << status;
|
||||
return nullptr;
|
||||
}
|
||||
|
||||
|
||||
@ -161,11 +161,10 @@ private:
|
||||
Status _create_segment_writer_for_segcompaction(
|
||||
std::unique_ptr<segment_v2::SegmentWriter>* writer, int64_t begin, int64_t end);
|
||||
Status _segcompaction_if_necessary();
|
||||
Status _segcompaction_ramaining_if_necessary();
|
||||
Status _segcompaction_rename_last_segments();
|
||||
Status _load_noncompacted_segments(std::vector<segment_v2::SegmentSharedPtr>* segments,
|
||||
size_t num);
|
||||
Status _find_longest_consecutive_small_segment(SegCompactionCandidatesSharedPtr segments);
|
||||
Status _get_segcompaction_candidates(SegCompactionCandidatesSharedPtr& segments, bool is_last);
|
||||
bool _is_segcompacted() { return (_num_segcompacted > 0) ? true : false; }
|
||||
|
||||
bool _check_and_set_is_doing_segcompaction();
|
||||
|
||||
@ -293,7 +293,12 @@ Status SegcompactionWorker::_do_compact_segments(SegCompactionCandidatesSharedPt
|
||||
}
|
||||
|
||||
void SegcompactionWorker::compact_segments(SegCompactionCandidatesSharedPtr segments) {
|
||||
Status status = _do_compact_segments(segments);
|
||||
Status status = Status::OK();
|
||||
if (_cancelled) {
|
||||
LOG(INFO) << "segcompaction worker is cancelled, skipping segcompaction task";
|
||||
} else {
|
||||
status = _do_compact_segments(segments);
|
||||
}
|
||||
if (!status.ok()) {
|
||||
int16_t errcode = status.code();
|
||||
switch (errcode) {
|
||||
|
||||
@ -54,6 +54,9 @@ public:
|
||||
|
||||
io::FileWriterPtr& get_file_writer() { return _file_writer; }
|
||||
|
||||
// set the cancel flag, tasks already started will not be cancelled.
|
||||
void cancel() { _cancelled = true; }
|
||||
|
||||
private:
|
||||
Status _create_segment_writer_for_segcompaction(
|
||||
std::unique_ptr<segment_v2::SegmentWriter>* writer, uint64_t begin, uint64_t end);
|
||||
@ -74,5 +77,6 @@ private:
|
||||
//TODO(zhengyu): current impl depends heavily on the access to feilds of BetaRowsetWriter
|
||||
BetaRowsetWriter* _writer;
|
||||
io::FileWriterPtr _file_writer;
|
||||
std::atomic<bool> _cancelled = false;
|
||||
};
|
||||
} // namespace doris
|
||||
|
||||
Reference in New Issue
Block a user