[opt](load) check -238 segment number limit earlier (#39045) (#41551)

backport #39045

---------

Co-authored-by: Xin Liao <liaoxinbit@126.com>
This commit is contained in:
Kaijie Chen
2024-10-24 11:17:27 +08:00
committed by GitHub
parent 5453337e7b
commit d891ed9de8
2 changed files with 26 additions and 29 deletions

View File

@ -414,26 +414,27 @@ Status BetaRowsetWriter::_rename_compacted_indices(int64_t begin, int64_t end, u
return Status::OK();
}
// return true if there isn't any flying segcompaction, otherwise return false
bool BetaRowsetWriter::_check_and_set_is_doing_segcompaction() {
return !_is_doing_segcompaction.exchange(true);
}
Status BetaRowsetWriter::_segcompaction_if_necessary() {
Status status = Status::OK();
// leave _check_and_set_is_doing_segcompaction as the last condition
// otherwise _segcompacting_cond will never get notified
// if not doing segcompaction, just check segment number
if (!config::enable_segcompaction || !_context.enable_segcompaction ||
!_context.tablet_schema->cluster_key_idxes().empty() ||
_context.tablet_schema->num_variant_columns() > 0 ||
!_check_and_set_is_doing_segcompaction()) {
_context.tablet_schema->num_variant_columns() > 0) {
return _check_segment_number_limit(_num_segment);
}
// leave _is_doing_segcompaction as the last condition
// otherwise _segcompacting_cond will never get notified
if (_is_doing_segcompaction.exchange(true)) {
return status;
}
if (_segcompaction_status.load() != OK) {
status = Status::Error<SEGCOMPACTION_FAILED>(
"BetaRowsetWriter::_segcompaction_if_necessary meet invalid state, error code: {}",
_segcompaction_status.load());
} else if ((_num_segment - _segcompacted_point) >= config::segcompaction_batch_size) {
} else {
status = _check_segment_number_limit(_num_segcompacted);
}
if (status.ok() && (_num_segment - _segcompacted_point) >= config::segcompaction_batch_size) {
SegCompactionCandidatesSharedPtr segments;
status = _find_longest_consecutive_small_segment(segments);
if (LIKELY(status.ok()) && (!segments->empty())) {
@ -619,10 +620,11 @@ Status BetaRowsetWriter::_close_file_writers() {
return Status::OK();
}
Status BaseBetaRowsetWriter::build(RowsetSharedPtr& rowset) {
Status BetaRowsetWriter::build(RowsetSharedPtr& rowset) {
RETURN_IF_ERROR(_close_file_writers());
RETURN_NOT_OK_STATUS_WITH_WARN(_check_segment_number_limit(),
const auto total_segment_num = _num_segment - _segcompacted_point + 1 + _num_segcompacted;
RETURN_NOT_OK_STATUS_WITH_WARN(_check_segment_number_limit(total_segment_num),
"too many segments when build new rowset");
RETURN_IF_ERROR(_build_rowset_meta(_rowset_meta, true));
@ -805,11 +807,10 @@ Status BetaRowsetWriter::_create_segment_writer_for_segcompaction(
return Status::OK();
}
Status BaseBetaRowsetWriter::_check_segment_number_limit() {
size_t total_segment_num = _num_segment + 1;
Status BaseBetaRowsetWriter::_check_segment_number_limit(size_t segnum) {
DBUG_EXECUTE_IF("BetaRowsetWriter._check_segment_number_limit_too_many_segments",
{ total_segment_num = dp->param("segnum", 1024); });
if (UNLIKELY(total_segment_num > config::max_segment_num_per_rowset)) {
{ segnum = dp->param("segnum", 1024); });
if (UNLIKELY(segnum > config::max_segment_num_per_rowset)) {
return Status::Error<TOO_MANY_SEGMENTS>(
"too many segments in rowset. tablet_id:{}, rowset_id:{}, max:{}, "
"_num_segment:{}, rowset_num_rows:{}. Please check if the bucket number is too "
@ -820,11 +821,10 @@ Status BaseBetaRowsetWriter::_check_segment_number_limit() {
return Status::OK();
}
Status BetaRowsetWriter::_check_segment_number_limit() {
size_t total_segment_num = _num_segment - _segcompacted_point + 1 + _num_segcompacted;
Status BetaRowsetWriter::_check_segment_number_limit(size_t segnum) {
DBUG_EXECUTE_IF("BetaRowsetWriter._check_segment_number_limit_too_many_segments",
{ total_segment_num = dp->param("segnum", 1024); });
if (UNLIKELY(total_segment_num > config::max_segment_num_per_rowset)) {
{ segnum = dp->param("segnum", 1024); });
if (UNLIKELY(segnum > config::max_segment_num_per_rowset)) {
return Status::Error<TOO_MANY_SEGMENTS>(
"too many segments in rowset. tablet_id:{}, rowset_id:{}, max:{}, _num_segment:{}, "
"_segcompacted_point:{}, _num_segcompacted:{}, rowset_num_rows:{}. Please check if "

View File

@ -85,8 +85,6 @@ public:
// This method is thread-safe.
Status flush_single_block(const vectorized::Block* block) override;
Status build(RowsetSharedPtr& rowset) override;
RowsetSharedPtr manual_build(const RowsetMetaSharedPtr& rowset_meta) override;
PUniqueId load_id() override { return _context.load_id; }
@ -132,18 +130,16 @@ public:
const RowsetWriterContext& context() const override { return _context; }
private:
protected:
virtual Status _generate_delete_bitmap(int32_t segment_id) = 0;
Status _build_rowset_meta(std::shared_ptr<RowsetMeta> rowset_meta,
bool check_segment_num = false);
void update_rowset_schema(TabletSchemaSPtr flush_schema);
// build a tmp rowset for load segment to calc delete_bitmap
// for this segment
protected:
Status _create_file_writer(std::string path, io::FileWriterPtr& file_writer);
virtual Status _close_file_writers();
virtual Status _check_segment_number_limit();
virtual Status _check_segment_number_limit(size_t segnum);
virtual int64_t _num_seg() const;
// build a tmp rowset for load segment to calc delete_bitmap for this segment
Status _build_tmp(RowsetSharedPtr& rowset_ptr);
@ -201,6 +197,8 @@ public:
Status add_segment(uint32_t segment_id, const SegmentStatistics& segstat,
TabletSchemaSPtr flush_schema) override;
Status build(RowsetSharedPtr& rowset) override;
Status flush_segment_writer_for_segcompaction(
std::unique_ptr<segment_v2::SegmentWriter>* writer, uint64_t index_size,
KeyBoundsPB& key_bounds);
@ -213,7 +211,7 @@ private:
// segment compaction
friend class SegcompactionWorker;
Status _close_file_writers() override;
Status _check_segment_number_limit() override;
Status _check_segment_number_limit(size_t segnum) override;
int64_t _num_seg() const override;
Status _wait_flying_segcompaction();
Status _create_segment_writer_for_segcompaction(
@ -222,7 +220,6 @@ private:
Status _segcompaction_rename_last_segments();
Status _load_noncompacted_segment(segment_v2::SegmentSharedPtr& segment, int32_t segment_id);
Status _find_longest_consecutive_small_segment(SegCompactionCandidatesSharedPtr& segments);
bool _check_and_set_is_doing_segcompaction();
Status _rename_compacted_segments(int64_t begin, int64_t end);
Status _rename_compacted_segment_plain(uint64_t seg_id);
Status _rename_compacted_indices(int64_t begin, int64_t end, uint64_t seg_id);