diff --git a/be/src/agent/task_worker_pool.cpp b/be/src/agent/task_worker_pool.cpp index a90bbec7d8..97bf7f700a 100644 --- a/be/src/agent/task_worker_pool.cpp +++ b/be/src/agent/task_worker_pool.cpp @@ -731,6 +731,16 @@ void update_tablet_meta_callback(StorageEngine& engine, const TAgentTaskRequest& tablet_meta_info.time_series_compaction_time_threshold_seconds); need_to_save = true; } + if (tablet_meta_info.__isset.time_series_compaction_empty_rowsets_threshold) { + if (tablet->tablet_meta()->compaction_policy() != "time_series") { + status = Status::InvalidArgument( + "only time series compaction policy support time series config"); + continue; + } + tablet->tablet_meta()->set_time_series_compaction_empty_rowsets_threshold( + tablet_meta_info.time_series_compaction_empty_rowsets_threshold); + need_to_save = true; + } if (tablet_meta_info.__isset.replica_id) { tablet->tablet_meta()->set_replica_id(tablet_meta_info.replica_id); } diff --git a/be/src/olap/cumulative_compaction_time_series_policy.cpp b/be/src/olap/cumulative_compaction_time_series_policy.cpp index c5733df3ea..6f7cb8e2e3 100644 --- a/be/src/olap/cumulative_compaction_time_series_policy.cpp +++ b/be/src/olap/cumulative_compaction_time_series_policy.cpp @@ -63,6 +63,13 @@ uint32_t TimeSeriesCumulativeCompactionPolicy::calc_cumulative_compaction_score( return 0; } + // If there is a continuous set of empty rowsets, prioritize merging. + auto consecutive_empty_rowsets = tablet->pick_first_consecutive_empty_rowsets( + tablet->tablet_meta()->time_series_compaction_empty_rowsets_threshold()); + if (!consecutive_empty_rowsets.empty()) { + return score; + } + // Condition 1: the size of input files for compaction meets the requirement of parameter compaction_goal_size int64_t compaction_goal_size_mbytes = tablet->tablet_meta()->time_series_compaction_goal_size_mbytes(); @@ -149,6 +156,13 @@ void TimeSeriesCumulativeCompactionPolicy::calculate_cumulative_point( break; } + // check if the rowset has been compacted, but it is a empty rowset + if (!is_delete && rs->version().first != 0 && rs->version().first != rs->version().second && + rs->num_segments() == 0) { + *ret_cumulative_point = rs->version().first; + break; + } + // include one situation: When the segment is not deleted, and is singleton delta, and is NONOVERLAPPING, ret_cumulative_point increase prev_version = rs->version().second; *ret_cumulative_point = prev_version + 1; @@ -166,6 +180,19 @@ int TimeSeriesCumulativeCompactionPolicy::pick_input_rowsets( return 0; } + // If their are many empty rowsets, maybe should be compacted + auto consecutive_empty_rowsets = tablet->pick_first_consecutive_empty_rowsets( + tablet->tablet_meta()->time_series_compaction_empty_rowsets_threshold()); + if (!consecutive_empty_rowsets.empty()) { + VLOG_NOTICE << "tablet is " << tablet->tablet_id() + << ", there are too many consecutive empty rowsets, size is " + << consecutive_empty_rowsets.size(); + input_rowsets->clear(); + input_rowsets->insert(input_rowsets->end(), consecutive_empty_rowsets.begin(), + consecutive_empty_rowsets.end()); + return 0; + } + int transient_size = 0; *compaction_score = 0; input_rowsets->clear(); @@ -175,8 +202,9 @@ int TimeSeriesCumulativeCompactionPolicy::pick_input_rowsets( // BE1 should performs compaction on its own, the time series compaction may re-compact previously fetched rowsets. // time series compaction policy needs to skip over the fetched rowset const auto& first_rowset_iter = std::find_if( - candidate_rowsets.begin(), candidate_rowsets.end(), - [](const RowsetSharedPtr& rs) { return rs->start_version() == rs->end_version(); }); + candidate_rowsets.begin(), candidate_rowsets.end(), [](const RowsetSharedPtr& rs) { + return rs->start_version() == rs->end_version() || rs->num_segments() == 0; + }); for (auto it = first_rowset_iter; it != candidate_rowsets.end(); ++it) { const auto& rowset = *it; // check whether this rowset is delete version @@ -254,10 +282,12 @@ int TimeSeriesCumulativeCompactionPolicy::pick_input_rowsets( void TimeSeriesCumulativeCompactionPolicy::update_cumulative_point( Tablet* tablet, const std::vector& input_rowsets, RowsetSharedPtr output_rowset, Version& last_delete_version) { - if (tablet->tablet_state() != TABLET_RUNNING) { + if (tablet->tablet_state() != TABLET_RUNNING || output_rowset->num_segments() == 0) { // if tablet under alter process, do not update cumulative point + // if the merged output rowset is empty, do not update cumulative point return; } + tablet->set_cumulative_layer_point(output_rowset->end_version() + 1); } diff --git a/be/src/olap/tablet.cpp b/be/src/olap/tablet.cpp index 2d4b430334..4f1c322995 100644 --- a/be/src/olap/tablet.cpp +++ b/be/src/olap/tablet.cpp @@ -1349,6 +1349,52 @@ std::vector Tablet::pick_candidate_rowsets_to_full_compaction() return pick_candidate_rowsets_to_single_replica_compaction(); } +std::vector Tablet::pick_first_consecutive_empty_rowsets(int limit) { + std::vector consecutive_empty_rowsets; + std::vector candidate_rowsets; + traverse_rowsets([&candidate_rowsets, this](const auto& rs) { + if (rs->is_local() && rs->start_version() >= _cumulative_point) { + candidate_rowsets.emplace_back(rs); + } + }); + std::sort(candidate_rowsets.begin(), candidate_rowsets.end(), Rowset::comparator); + int len = candidate_rowsets.size(); + for (int i = 0; i < len - 1; ++i) { + auto rowset = candidate_rowsets[i]; + auto next_rowset = candidate_rowsets[i + 1]; + + // identify two consecutive rowsets that are empty + if (rowset->num_segments() == 0 && next_rowset->num_segments() == 0 && + !rowset->rowset_meta()->has_delete_predicate() && + !next_rowset->rowset_meta()->has_delete_predicate() && + rowset->end_version() == next_rowset->start_version() - 1) { + consecutive_empty_rowsets.emplace_back(rowset); + consecutive_empty_rowsets.emplace_back(next_rowset); + rowset = next_rowset; + int next_index = i + 2; + + // keep searching for consecutive empty rowsets + while (next_index < len && candidate_rowsets[next_index]->num_segments() == 0 && + !candidate_rowsets[next_index]->rowset_meta()->has_delete_predicate() && + rowset->end_version() == candidate_rowsets[next_index]->start_version() - 1) { + consecutive_empty_rowsets.emplace_back(candidate_rowsets[next_index]); + rowset = candidate_rowsets[next_index++]; + } + // if the number of consecutive empty rowset reach the limit, + // and there are still rowsets following them + if (consecutive_empty_rowsets.size() >= limit && next_index < len) { + return consecutive_empty_rowsets; + } else { + // current rowset is not empty, start searching from that rowset in the next + i = next_index - 1; + consecutive_empty_rowsets.clear(); + } + } + } + + return consecutive_empty_rowsets; +} + std::vector Tablet::pick_candidate_rowsets_to_build_inverted_index( const std::set& alter_index_uids, bool is_drop_op) { std::vector candidate_rowsets; diff --git a/be/src/olap/tablet.h b/be/src/olap/tablet.h index 9f7141b404..03737ea241 100644 --- a/be/src/olap/tablet.h +++ b/be/src/olap/tablet.h @@ -271,6 +271,8 @@ public: std::vector pick_candidate_rowsets_to_single_replica_compaction(); std::vector get_all_versions(); + std::vector pick_first_consecutive_empty_rowsets(int limit); + void calculate_cumulative_point(); // TODO(ygl): bool is_primary_replica() { return false; } diff --git a/be/src/olap/tablet_meta.cpp b/be/src/olap/tablet_meta.cpp index 793bde7b08..fc4a22a617 100644 --- a/be/src/olap/tablet_meta.cpp +++ b/be/src/olap/tablet_meta.cpp @@ -68,7 +68,8 @@ TabletMetaSharedPtr TabletMeta::create( std::move(binlog_config), request.compaction_policy, request.time_series_compaction_goal_size_mbytes, request.time_series_compaction_file_count_threshold, - request.time_series_compaction_time_threshold_seconds); + request.time_series_compaction_time_threshold_seconds, + request.time_series_compaction_empty_rowsets_threshold); } TabletMeta::TabletMeta() @@ -86,7 +87,8 @@ TabletMeta::TabletMeta(int64_t table_id, int64_t partition_id, int64_t tablet_id std::optional binlog_config, std::string compaction_policy, int64_t time_series_compaction_goal_size_mbytes, int64_t time_series_compaction_file_count_threshold, - int64_t time_series_compaction_time_threshold_seconds) + int64_t time_series_compaction_time_threshold_seconds, + int64_t time_series_compaction_empty_rowsets_threshold) : _tablet_uid(0, 0), _schema(new TabletSchema), _delete_bitmap(new DeleteBitmap(tablet_id)) { @@ -114,6 +116,8 @@ TabletMeta::TabletMeta(int64_t table_id, int64_t partition_id, int64_t tablet_id time_series_compaction_file_count_threshold); tablet_meta_pb.set_time_series_compaction_time_threshold_seconds( time_series_compaction_time_threshold_seconds); + tablet_meta_pb.set_time_series_compaction_empty_rowsets_threshold( + time_series_compaction_empty_rowsets_threshold); TabletSchemaPB* schema = tablet_meta_pb.mutable_schema(); schema->set_num_short_key_columns(tablet_schema.short_key_column_count); schema->set_num_rows_per_row_block(config::default_num_rows_per_column_file_block); @@ -313,7 +317,9 @@ TabletMeta::TabletMeta(const TabletMeta& b) _time_series_compaction_file_count_threshold( b._time_series_compaction_file_count_threshold), _time_series_compaction_time_threshold_seconds( - b._time_series_compaction_time_threshold_seconds) {}; + b._time_series_compaction_time_threshold_seconds), + _time_series_compaction_empty_rowsets_threshold( + b._time_series_compaction_empty_rowsets_threshold) {}; void TabletMeta::init_column_from_tcolumn(uint32_t unique_id, const TColumn& tcolumn, ColumnPB* column) { @@ -595,6 +601,8 @@ void TabletMeta::init_from_pb(const TabletMetaPB& tablet_meta_pb) { tablet_meta_pb.time_series_compaction_file_count_threshold(); _time_series_compaction_time_threshold_seconds = tablet_meta_pb.time_series_compaction_time_threshold_seconds(); + _time_series_compaction_empty_rowsets_threshold = + tablet_meta_pb.time_series_compaction_empty_rowsets_threshold(); } void TabletMeta::to_meta_pb(TabletMetaPB* tablet_meta_pb) { @@ -676,6 +684,8 @@ void TabletMeta::to_meta_pb(TabletMetaPB* tablet_meta_pb) { time_series_compaction_file_count_threshold()); tablet_meta_pb->set_time_series_compaction_time_threshold_seconds( time_series_compaction_time_threshold_seconds()); + tablet_meta_pb->set_time_series_compaction_empty_rowsets_threshold( + time_series_compaction_empty_rowsets_threshold()); } int64_t TabletMeta::mem_size() const { @@ -861,6 +871,9 @@ bool operator==(const TabletMeta& a, const TabletMeta& b) { if (a._time_series_compaction_time_threshold_seconds != b._time_series_compaction_time_threshold_seconds) return false; + if (a._time_series_compaction_empty_rowsets_threshold != + b._time_series_compaction_empty_rowsets_threshold) + return false; return true; } diff --git a/be/src/olap/tablet_meta.h b/be/src/olap/tablet_meta.h index 2d14ef599f..a7e284b420 100644 --- a/be/src/olap/tablet_meta.h +++ b/be/src/olap/tablet_meta.h @@ -109,7 +109,8 @@ public: std::string compaction_policy = "size_based", int64_t time_series_compaction_goal_size_mbytes = 1024, int64_t time_series_compaction_file_count_threshold = 2000, - int64_t time_series_compaction_time_threshold_seconds = 3600); + int64_t time_series_compaction_time_threshold_seconds = 3600, + int64_t time_series_compaction_empty_rowsets_threshold = 5); // If need add a filed in TableMeta, filed init copy in copy construct function TabletMeta(const TabletMeta& tablet_meta); TabletMeta(TabletMeta&& tablet_meta) = delete; @@ -251,6 +252,12 @@ public: int64_t time_series_compaction_time_threshold_seconds() const { return _time_series_compaction_time_threshold_seconds; } + void set_time_series_compaction_empty_rowsets_threshold(int64_t empty_rowsets_threshold) { + _time_series_compaction_empty_rowsets_threshold = empty_rowsets_threshold; + } + int64_t time_series_compaction_empty_rowsets_threshold() const { + return _time_series_compaction_empty_rowsets_threshold; + } private: Status _save_meta(DataDir* data_dir); @@ -303,6 +310,7 @@ private: int64_t _time_series_compaction_goal_size_mbytes = 0; int64_t _time_series_compaction_file_count_threshold = 0; int64_t _time_series_compaction_time_threshold_seconds = 0; + int64_t _time_series_compaction_empty_rowsets_threshold = 0; mutable std::shared_mutex _meta_lock; }; diff --git a/fe/fe-core/src/main/java/org/apache/doris/alter/Alter.java b/fe/fe-core/src/main/java/org/apache/doris/alter/Alter.java index ff6bf103bc..c49a325487 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/alter/Alter.java +++ b/fe/fe-core/src/main/java/org/apache/doris/alter/Alter.java @@ -505,7 +505,9 @@ public class Alter { || properties .containsKey(PropertyAnalyzer.PROPERTIES_DISABLE_AUTO_COMPACTION) || properties - .containsKey(PropertyAnalyzer.PROPERTIES_SKIP_WRITE_INDEX_ON_LOAD)); + .containsKey(PropertyAnalyzer.PROPERTIES_SKIP_WRITE_INDEX_ON_LOAD) + || properties + .containsKey(PropertyAnalyzer.PROPERTIES_TIME_SERIES_COMPACTION_EMPTY_ROWSETS_THRESHOLD)); ((SchemaChangeHandler) schemaChangeHandler).updateTableProperties(db, tableName, properties); } else { throw new DdlException("Invalid alter operation: " + alterClause.getOpType()); diff --git a/fe/fe-core/src/main/java/org/apache/doris/alter/RollupJobV2.java b/fe/fe-core/src/main/java/org/apache/doris/alter/RollupJobV2.java index 3abebd17d5..aa61eaf906 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/alter/RollupJobV2.java +++ b/fe/fe-core/src/main/java/org/apache/doris/alter/RollupJobV2.java @@ -274,6 +274,7 @@ public class RollupJobV2 extends AlterJobV2 implements GsonPostProcessable { tbl.getTimeSeriesCompactionGoalSizeMbytes(), tbl.getTimeSeriesCompactionFileCountThreshold(), tbl.getTimeSeriesCompactionTimeThresholdSeconds(), + tbl.getTimeSeriesCompactionEmptyRowsetsThreshold(), tbl.storeRowColumn(), binlogConfig); createReplicaTask.setBaseTablet(tabletIdMap.get(rollupTabletId), baseSchemaHash); diff --git a/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeHandler.java b/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeHandler.java index 654775616b..b1d08ae0c0 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeHandler.java +++ b/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeHandler.java @@ -2201,6 +2201,13 @@ public class SchemaChangeHandler extends AlterHandler { .get(PropertyAnalyzer.PROPERTIES_TIME_SERIES_COMPACTION_TIME_THRESHOLD_SECONDS))); } + if (properties.containsKey(PropertyAnalyzer.PROPERTIES_TIME_SERIES_COMPACTION_EMPTY_ROWSETS_THRESHOLD)) { + timeSeriesCompactionConfig + .put(PropertyAnalyzer.PROPERTIES_TIME_SERIES_COMPACTION_EMPTY_ROWSETS_THRESHOLD, + Long.parseLong(properties + .get(PropertyAnalyzer.PROPERTIES_TIME_SERIES_COMPACTION_EMPTY_ROWSETS_THRESHOLD))); + } + if (isInMemory < 0 && storagePolicyId < 0 && compactionPolicy == null && timeSeriesCompactionConfig.isEmpty() && !properties.containsKey(PropertyAnalyzer.PROPERTIES_IS_BEING_SYNCED) && !properties.containsKey(PropertyAnalyzer.PROPERTIES_ENABLE_SINGLE_REPLICA_COMPACTION) diff --git a/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeJobV2.java b/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeJobV2.java index b85f0ab764..9e7174752a 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeJobV2.java +++ b/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeJobV2.java @@ -282,6 +282,7 @@ public class SchemaChangeJobV2 extends AlterJobV2 { tbl.getTimeSeriesCompactionGoalSizeMbytes(), tbl.getTimeSeriesCompactionFileCountThreshold(), tbl.getTimeSeriesCompactionTimeThresholdSeconds(), + tbl.getTimeSeriesCompactionEmptyRowsetsThreshold(), tbl.storeRowColumn(), binlogConfig); diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/ModifyTablePropertiesClause.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/ModifyTablePropertiesClause.java index a23a73df15..07529535e6 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/analysis/ModifyTablePropertiesClause.java +++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/ModifyTablePropertiesClause.java @@ -206,6 +206,22 @@ public class ModifyTablePropertiesClause extends AlterTableClause { } this.needTableStable = false; this.opType = AlterOpType.MODIFY_TABLE_PROPERTY_SYNC; + } else if (properties.containsKey(PropertyAnalyzer.PROPERTIES_TIME_SERIES_COMPACTION_EMPTY_ROWSETS_THRESHOLD)) { + long emptyRowsetsThreshold; + String emptyRowsetsThresholdStr = properties + .get(PropertyAnalyzer.PROPERTIES_TIME_SERIES_COMPACTION_EMPTY_ROWSETS_THRESHOLD); + try { + emptyRowsetsThreshold = Long.parseLong(emptyRowsetsThresholdStr); + if (emptyRowsetsThreshold < 2) { + throw new AnalysisException("time_series_compaction_empty_rowsets_threshold can not be less than 2:" + + emptyRowsetsThresholdStr); + } + } catch (NumberFormatException e) { + throw new AnalysisException("Invalid time_series_compaction_empty_rowsets_threshold format: " + + emptyRowsetsThresholdStr); + } + this.needTableStable = false; + this.opType = AlterOpType.MODIFY_TABLE_PROPERTY_SYNC; } else if (properties.containsKey(PropertyAnalyzer.PROPERTIES_SKIP_WRITE_INDEX_ON_LOAD)) { if (!properties.get(PropertyAnalyzer.PROPERTIES_SKIP_WRITE_INDEX_ON_LOAD).equalsIgnoreCase("true") && !properties.get(PropertyAnalyzer diff --git a/fe/fe-core/src/main/java/org/apache/doris/backup/RestoreJob.java b/fe/fe-core/src/main/java/org/apache/doris/backup/RestoreJob.java index 23ca4a335f..c6c6beb335 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/backup/RestoreJob.java +++ b/fe/fe-core/src/main/java/org/apache/doris/backup/RestoreJob.java @@ -1069,6 +1069,7 @@ public class RestoreJob extends AbstractJob { localTbl.getTimeSeriesCompactionGoalSizeMbytes(), localTbl.getTimeSeriesCompactionFileCountThreshold(), localTbl.getTimeSeriesCompactionTimeThresholdSeconds(), + localTbl.getTimeSeriesCompactionEmptyRowsetsThreshold(), localTbl.storeRowColumn(), binlogConfig); diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java index 1ad131abb0..aba414ecdf 100755 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java @@ -3357,6 +3357,14 @@ public class Env { sb.append(olapTable.getTimeSeriesCompactionTimeThresholdSeconds()).append("\""); } + // time series compaction empty rowsets threshold + if (olapTable.getCompactionPolicy() != null && olapTable.getCompactionPolicy() + .equals(PropertyAnalyzer.TIME_SERIES_COMPACTION_POLICY)) { + sb.append(",\n\"").append(PropertyAnalyzer + .PROPERTIES_TIME_SERIES_COMPACTION_EMPTY_ROWSETS_THRESHOLD).append("\" = \""); + sb.append(olapTable.getTimeSeriesCompactionEmptyRowsetsThreshold()).append("\""); + } + // disable auto compaction sb.append(",\n\"").append(PropertyAnalyzer.PROPERTIES_DISABLE_AUTO_COMPACTION).append("\" = \""); sb.append(olapTable.disableAutoCompaction()).append("\""); @@ -4771,7 +4779,8 @@ public class Env { .buildTimeSeriesCompactionTimeThresholdSeconds() .buildSkipWriteIndexOnLoad() .buildDisableAutoCompaction() - .buildEnableSingleReplicaCompaction(); + .buildEnableSingleReplicaCompaction() + .buildTimeSeriesCompactionEmptyRowsetsThreshold(); // need to update partition info meta for (Partition partition : table.getPartitions()) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java index d12dc9512b..be1ded6eeb 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java @@ -2077,6 +2077,20 @@ public class OlapTable extends Table { return null; } + public void setTimeSeriesCompactionEmptyRowsetsThreshold(long timeSeriesCompactionEmptyRowsetsThreshold) { + TableProperty tableProperty = getOrCreatTableProperty(); + tableProperty.modifyTableProperties(PropertyAnalyzer.PROPERTIES_TIME_SERIES_COMPACTION_EMPTY_ROWSETS_THRESHOLD, + Long.valueOf(timeSeriesCompactionEmptyRowsetsThreshold).toString()); + tableProperty.buildTimeSeriesCompactionEmptyRowsetsThreshold(); + } + + public Long getTimeSeriesCompactionEmptyRowsetsThreshold() { + if (tableProperty != null) { + return tableProperty.timeSeriesCompactionEmptyRowsetsThreshold(); + } + return null; + } + public int getBaseSchemaVersion() { MaterializedIndexMeta baseIndexMeta = indexIdToMeta.get(baseIndexId); return baseIndexMeta.getSchemaVersion(); diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/TableProperty.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/TableProperty.java index 857833de0c..54a6041b50 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/TableProperty.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/TableProperty.java @@ -97,6 +97,9 @@ public class TableProperty implements Writable { private long timeSeriesCompactionTimeThresholdSeconds = PropertyAnalyzer.TIME_SERIES_COMPACTION_TIME_THRESHOLD_SECONDS_DEFAULT_VALUE; + private long timeSeriesCompactionEmptyRowsetsThreshold + = PropertyAnalyzer.TIME_SERIES_COMPACTION_EMPTY_ROWSETS_THRESHOLD_DEFAULT_VALUE; + private DataSortInfo dataSortInfo = new DataSortInfo(); public TableProperty(Map properties) { @@ -132,6 +135,7 @@ public class TableProperty implements Writable { buildSkipWriteIndexOnLoad(); buildEnableSingleReplicaCompaction(); buildDisableAutoCompaction(); + buildTimeSeriesCompactionEmptyRowsetsThreshold(); break; default: break; @@ -278,6 +282,17 @@ public class TableProperty implements Writable { return timeSeriesCompactionTimeThresholdSeconds; } + public TableProperty buildTimeSeriesCompactionEmptyRowsetsThreshold() { + timeSeriesCompactionEmptyRowsetsThreshold = Long.parseLong(properties + .getOrDefault(PropertyAnalyzer.PROPERTIES_TIME_SERIES_COMPACTION_EMPTY_ROWSETS_THRESHOLD, + String.valueOf(PropertyAnalyzer.TIME_SERIES_COMPACTION_EMPTY_ROWSETS_THRESHOLD_DEFAULT_VALUE))); + return this; + } + + public long timeSeriesCompactionEmptyRowsetsThreshold() { + return timeSeriesCompactionEmptyRowsetsThreshold; + } + public TableProperty buildMinLoadReplicaNum() { minLoadReplicaNum = Short.parseShort( properties.getOrDefault(PropertyAnalyzer.PROPERTIES_MIN_LOAD_REPLICA_NUM, "-1")); @@ -528,7 +543,8 @@ public class TableProperty implements Writable { .buildTimeSeriesCompactionFileCountThreshold() .buildTimeSeriesCompactionTimeThresholdSeconds() .buildDisableAutoCompaction() - .buildEnableSingleReplicaCompaction(); + .buildEnableSingleReplicaCompaction() + .buildTimeSeriesCompactionEmptyRowsetsThreshold(); if (Env.getCurrentEnvJournalVersion() < FeMetaVersion.VERSION_105) { // get replica num from property map and create replica allocation String repNum = tableProperty.properties.remove(PropertyAnalyzer.PROPERTIES_REPLICATION_NUM); diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/util/PropertyAnalyzer.java b/fe/fe-core/src/main/java/org/apache/doris/common/util/PropertyAnalyzer.java index c8c0978ca7..9c0cfe722f 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/common/util/PropertyAnalyzer.java +++ b/fe/fe-core/src/main/java/org/apache/doris/common/util/PropertyAnalyzer.java @@ -144,6 +144,10 @@ public class PropertyAnalyzer { public static final String PROPERTIES_TIME_SERIES_COMPACTION_TIME_THRESHOLD_SECONDS = "time_series_compaction_time_threshold_seconds"; + + public static final String PROPERTIES_TIME_SERIES_COMPACTION_EMPTY_ROWSETS_THRESHOLD = + "time_series_compaction_empty_rowsets_threshold"; + public static final String PROPERTIES_MUTABLE = "mutable"; public static final String PROPERTIES_IS_BEING_SYNCED = "is_being_synced"; @@ -182,6 +186,7 @@ public class PropertyAnalyzer { public static final long TIME_SERIES_COMPACTION_GOAL_SIZE_MBYTES_DEFAULT_VALUE = 1024; public static final long TIME_SERIES_COMPACTION_FILE_COUNT_THRESHOLD_DEFAULT_VALUE = 2000; public static final long TIME_SERIES_COMPACTION_TIME_THRESHOLD_SECONDS_DEFAULT_VALUE = 3600; + public static final long TIME_SERIES_COMPACTION_EMPTY_ROWSETS_THRESHOLD_DEFAULT_VALUE = 5; /** @@ -711,27 +716,28 @@ public class PropertyAnalyzer { return compactionPolicy; } - public static long analyzeTimeSeriesCompactionGoalSizeMbytes(Map properties) + public static long analyzeTimeSeriesCompactionEmptyRowsetsThreshold(Map properties) throws AnalysisException { - long goalSizeMbytes = TIME_SERIES_COMPACTION_GOAL_SIZE_MBYTES_DEFAULT_VALUE; + long emptyRowsetsThreshold = TIME_SERIES_COMPACTION_EMPTY_ROWSETS_THRESHOLD_DEFAULT_VALUE; if (properties == null || properties.isEmpty()) { - return goalSizeMbytes; + return emptyRowsetsThreshold; } - if (properties.containsKey(PROPERTIES_TIME_SERIES_COMPACTION_GOAL_SIZE_MBYTES)) { - String goalSizeMbytesStr = properties.get(PROPERTIES_TIME_SERIES_COMPACTION_GOAL_SIZE_MBYTES); - properties.remove(PROPERTIES_TIME_SERIES_COMPACTION_GOAL_SIZE_MBYTES); + if (properties.containsKey(PROPERTIES_TIME_SERIES_COMPACTION_EMPTY_ROWSETS_THRESHOLD)) { + String emptyRowsetsThresholdStr = properties + .get(PROPERTIES_TIME_SERIES_COMPACTION_EMPTY_ROWSETS_THRESHOLD); + properties.remove(PROPERTIES_TIME_SERIES_COMPACTION_EMPTY_ROWSETS_THRESHOLD); try { - goalSizeMbytes = Long.parseLong(goalSizeMbytesStr); - if (goalSizeMbytes < 10) { - throw new AnalysisException("time_series_compaction_goal_size_mbytes can not be" - + " less than 10: " + goalSizeMbytesStr); + emptyRowsetsThreshold = Long.parseLong(emptyRowsetsThresholdStr); + if (emptyRowsetsThreshold < 2) { + throw new AnalysisException("time_series_compaction_empty_rowsets_threshold can not" + + " be less than 2: " + emptyRowsetsThresholdStr); } } catch (NumberFormatException e) { - throw new AnalysisException("Invalid time_series_compaction_goal_size_mbytes format: " - + goalSizeMbytesStr); + throw new AnalysisException("Invalid time_series_compaction_empty_rowsets_threshold: " + + emptyRowsetsThresholdStr); } } - return goalSizeMbytes; + return emptyRowsetsThreshold; } public static long analyzeTimeSeriesCompactionFileCountThreshold(Map properties) @@ -781,6 +787,29 @@ public class PropertyAnalyzer { return timeThresholdSeconds; } + public static long analyzeTimeSeriesCompactionGoalSizeMbytes(Map properties) + throws AnalysisException { + long goalSizeMbytes = TIME_SERIES_COMPACTION_GOAL_SIZE_MBYTES_DEFAULT_VALUE; + if (properties == null || properties.isEmpty()) { + return goalSizeMbytes; + } + if (properties.containsKey(PROPERTIES_TIME_SERIES_COMPACTION_GOAL_SIZE_MBYTES)) { + String goalSizeMbytesStr = properties.get(PROPERTIES_TIME_SERIES_COMPACTION_GOAL_SIZE_MBYTES); + properties.remove(PROPERTIES_TIME_SERIES_COMPACTION_GOAL_SIZE_MBYTES); + try { + goalSizeMbytes = Long.parseLong(goalSizeMbytesStr); + if (goalSizeMbytes < 10) { + throw new AnalysisException("time_series_compaction_goal_size_mbytes can not be" + + " less than 10: " + goalSizeMbytesStr); + } + } catch (NumberFormatException e) { + throw new AnalysisException("Invalid time_series_compaction_goal_size_mbytes format: " + + goalSizeMbytesStr); + } + } + return goalSizeMbytes; + } + // analyzeCompressionType will parse the compression type from properties public static TCompressionType analyzeCompressionType(Map properties) throws AnalysisException { String compressionType = ""; diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java index a3ced0b72f..5adb18e344 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java @@ -1446,6 +1446,10 @@ public class InternalCatalog implements CatalogIf { properties.put(PropertyAnalyzer.PROPERTIES_TIME_SERIES_COMPACTION_TIME_THRESHOLD_SECONDS, olapTable.getTimeSeriesCompactionTimeThresholdSeconds().toString()); } + if (!properties.containsKey(PropertyAnalyzer.PROPERTIES_TIME_SERIES_COMPACTION_EMPTY_ROWSETS_THRESHOLD)) { + properties.put(PropertyAnalyzer.PROPERTIES_TIME_SERIES_COMPACTION_EMPTY_ROWSETS_THRESHOLD, + olapTable.getTimeSeriesCompactionEmptyRowsetsThreshold().toString()); + } if (!properties.containsKey(PropertyAnalyzer.PROPERTIES_STORAGE_POLICY)) { properties.put(PropertyAnalyzer.PROPERTIES_STORAGE_POLICY, olapTable.getStoragePolicy()); } @@ -1542,6 +1546,7 @@ public class InternalCatalog implements CatalogIf { olapTable.getTimeSeriesCompactionGoalSizeMbytes(), olapTable.getTimeSeriesCompactionFileCountThreshold(), olapTable.getTimeSeriesCompactionTimeThresholdSeconds(), + olapTable.getTimeSeriesCompactionEmptyRowsetsThreshold(), olapTable.storeRowColumn(), binlogConfig, dataProperty.isStorageMediumSpecified(), null); // TODO cluster key ids @@ -1798,6 +1803,7 @@ public class InternalCatalog implements CatalogIf { boolean enableSingleReplicaCompaction, boolean skipWriteIndexOnLoad, String compactionPolicy, Long timeSeriesCompactionGoalSizeMbytes, Long timeSeriesCompactionFileCountThreshold, Long timeSeriesCompactionTimeThresholdSeconds, + Long timeSeriesCompactionEmptyRowsetsThreshold, boolean storeRowColumn, BinlogConfig binlogConfig, boolean isStorageMediumSpecified, List clusterKeyIndexes) throws DdlException { // create base index first. @@ -1864,6 +1870,7 @@ public class InternalCatalog implements CatalogIf { disableAutoCompaction, enableSingleReplicaCompaction, skipWriteIndexOnLoad, compactionPolicy, timeSeriesCompactionGoalSizeMbytes, timeSeriesCompactionFileCountThreshold, timeSeriesCompactionTimeThresholdSeconds, + timeSeriesCompactionEmptyRowsetsThreshold, storeRowColumn, binlogConfig); task.setStorageFormat(storageFormat); @@ -2079,7 +2086,9 @@ public class InternalCatalog implements CatalogIf { && (properties.containsKey(PropertyAnalyzer.PROPERTIES_TIME_SERIES_COMPACTION_GOAL_SIZE_MBYTES) || properties.containsKey(PropertyAnalyzer.PROPERTIES_TIME_SERIES_COMPACTION_FILE_COUNT_THRESHOLD) || properties - .containsKey(PropertyAnalyzer.PROPERTIES_TIME_SERIES_COMPACTION_TIME_THRESHOLD_SECONDS))) { + .containsKey(PropertyAnalyzer.PROPERTIES_TIME_SERIES_COMPACTION_TIME_THRESHOLD_SECONDS) + || properties + .containsKey(PropertyAnalyzer.PROPERTIES_TIME_SERIES_COMPACTION_EMPTY_ROWSETS_THRESHOLD))) { throw new DdlException("only time series compaction policy support for time series config"); } @@ -2116,6 +2125,17 @@ public class InternalCatalog implements CatalogIf { } olapTable.setTimeSeriesCompactionTimeThresholdSeconds(timeSeriesCompactionTimeThresholdSeconds); + // set time series compaction empty rowsets threshold + long timeSeriesCompactionEmptyRowsetsThreshold + = PropertyAnalyzer.TIME_SERIES_COMPACTION_EMPTY_ROWSETS_THRESHOLD_DEFAULT_VALUE; + try { + timeSeriesCompactionEmptyRowsetsThreshold = PropertyAnalyzer + .analyzeTimeSeriesCompactionEmptyRowsetsThreshold(properties); + } catch (AnalysisException e) { + throw new DdlException(e.getMessage()); + } + olapTable.setTimeSeriesCompactionEmptyRowsetsThreshold(timeSeriesCompactionEmptyRowsetsThreshold); + // get storage format TStorageFormat storageFormat = TStorageFormat.V2; // default is segment v2 try { @@ -2466,6 +2486,7 @@ public class InternalCatalog implements CatalogIf { olapTable.getCompactionPolicy(), olapTable.getTimeSeriesCompactionGoalSizeMbytes(), olapTable.getTimeSeriesCompactionFileCountThreshold(), olapTable.getTimeSeriesCompactionTimeThresholdSeconds(), + olapTable.getTimeSeriesCompactionEmptyRowsetsThreshold(), storeRowColumn, binlogConfigForTask, partitionInfo.getDataProperty(partitionId).isStorageMediumSpecified(), keysDesc.getClusterKeysColumnIds()); @@ -2542,6 +2563,7 @@ public class InternalCatalog implements CatalogIf { olapTable.getCompactionPolicy(), olapTable.getTimeSeriesCompactionGoalSizeMbytes(), olapTable.getTimeSeriesCompactionFileCountThreshold(), olapTable.getTimeSeriesCompactionTimeThresholdSeconds(), + olapTable.getTimeSeriesCompactionEmptyRowsetsThreshold(), storeRowColumn, binlogConfigForTask, dataProperty.isStorageMediumSpecified(), keysDesc.getClusterKeysColumnIds()); olapTable.addPartition(partition); @@ -2990,6 +3012,7 @@ public class InternalCatalog implements CatalogIf { olapTable.getCompactionPolicy(), olapTable.getTimeSeriesCompactionGoalSizeMbytes(), olapTable.getTimeSeriesCompactionFileCountThreshold(), olapTable.getTimeSeriesCompactionTimeThresholdSeconds(), + olapTable.getTimeSeriesCompactionEmptyRowsetsThreshold(), olapTable.storeRowColumn(), binlogConfig, copiedTbl.getPartitionInfo().getDataProperty(oldPartitionId).isStorageMediumSpecified(), clusterKeyIdxes); diff --git a/fe/fe-core/src/main/java/org/apache/doris/master/ReportHandler.java b/fe/fe-core/src/main/java/org/apache/doris/master/ReportHandler.java index 71064585a3..2c8dc80aff 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/master/ReportHandler.java +++ b/fe/fe-core/src/main/java/org/apache/doris/master/ReportHandler.java @@ -835,6 +835,7 @@ public class ReportHandler extends Daemon { olapTable.getTimeSeriesCompactionGoalSizeMbytes(), olapTable.getTimeSeriesCompactionFileCountThreshold(), olapTable.getTimeSeriesCompactionTimeThresholdSeconds(), + olapTable.getTimeSeriesCompactionEmptyRowsetsThreshold(), olapTable.storeRowColumn(), binlogConfig); diff --git a/fe/fe-core/src/main/java/org/apache/doris/task/CreateReplicaTask.java b/fe/fe-core/src/main/java/org/apache/doris/task/CreateReplicaTask.java index 6842e273a6..818636fe38 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/task/CreateReplicaTask.java +++ b/fe/fe-core/src/main/java/org/apache/doris/task/CreateReplicaTask.java @@ -111,6 +111,8 @@ public class CreateReplicaTask extends AgentTask { private long timeSeriesCompactionTimeThresholdSeconds; + private long timeSeriesCompactionEmptyRowsetsThreshold; + private boolean storeRowColumn; private BinlogConfig binlogConfig; @@ -134,6 +136,7 @@ public class CreateReplicaTask extends AgentTask { long timeSeriesCompactionGoalSizeMbytes, long timeSeriesCompactionFileCountThreshold, long timeSeriesCompactionTimeThresholdSeconds, + long timeSeriesCompactionEmptyRowsetsThreshold, boolean storeRowColumn, BinlogConfig binlogConfig) { super(null, backendId, TTaskType.CREATE, dbId, tableId, partitionId, indexId, tabletId); @@ -175,6 +178,7 @@ public class CreateReplicaTask extends AgentTask { this.timeSeriesCompactionGoalSizeMbytes = timeSeriesCompactionGoalSizeMbytes; this.timeSeriesCompactionFileCountThreshold = timeSeriesCompactionFileCountThreshold; this.timeSeriesCompactionTimeThresholdSeconds = timeSeriesCompactionTimeThresholdSeconds; + this.timeSeriesCompactionEmptyRowsetsThreshold = timeSeriesCompactionEmptyRowsetsThreshold; this.storeRowColumn = storeRowColumn; this.binlogConfig = binlogConfig; } @@ -322,6 +326,7 @@ public class CreateReplicaTask extends AgentTask { createTabletReq.setTimeSeriesCompactionGoalSizeMbytes(timeSeriesCompactionGoalSizeMbytes); createTabletReq.setTimeSeriesCompactionFileCountThreshold(timeSeriesCompactionFileCountThreshold); createTabletReq.setTimeSeriesCompactionTimeThresholdSeconds(timeSeriesCompactionTimeThresholdSeconds); + createTabletReq.setTimeSeriesCompactionEmptyRowsetsThreshold(timeSeriesCompactionEmptyRowsetsThreshold); if (binlogConfig != null) { createTabletReq.setBinlogConfig(binlogConfig.toThrift()); diff --git a/fe/fe-core/src/main/java/org/apache/doris/task/UpdateTabletMetaInfoTask.java b/fe/fe-core/src/main/java/org/apache/doris/task/UpdateTabletMetaInfoTask.java index af5e45a825..65158cd31a 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/task/UpdateTabletMetaInfoTask.java +++ b/fe/fe-core/src/main/java/org/apache/doris/task/UpdateTabletMetaInfoTask.java @@ -155,6 +155,11 @@ public class UpdateTabletMetaInfoTask extends AgentTask { metaInfo.setTimeSeriesCompactionTimeThresholdSeconds(timeSeriesCompactionConfig .get(PropertyAnalyzer.PROPERTIES_TIME_SERIES_COMPACTION_TIME_THRESHOLD_SECONDS)); } + if (timeSeriesCompactionConfig + .containsKey(PropertyAnalyzer.PROPERTIES_TIME_SERIES_COMPACTION_EMPTY_ROWSETS_THRESHOLD)) { + metaInfo.setTimeSeriesCompactionEmptyRowsetsThreshold(timeSeriesCompactionConfig + .get(PropertyAnalyzer.PROPERTIES_TIME_SERIES_COMPACTION_EMPTY_ROWSETS_THRESHOLD)); + } } if (enableSingleReplicaCompaction >= 0) { metaInfo.setEnableSingleReplicaCompaction(enableSingleReplicaCompaction > 0); diff --git a/fe/fe-core/src/test/java/org/apache/doris/task/AgentTaskTest.java b/fe/fe-core/src/test/java/org/apache/doris/task/AgentTaskTest.java index 8ef7104737..60ec442bed 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/task/AgentTaskTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/task/AgentTaskTest.java @@ -107,7 +107,7 @@ public class AgentTaskTest { createReplicaTask = new CreateReplicaTask(backendId1, dbId, tableId, partitionId, indexId1, tabletId1, replicaId1, shortKeyNum, schemaHash1, version, KeysType.AGG_KEYS, storageType, TStorageMedium.SSD, columns, null, 0, latch, null, false, TTabletType.TABLET_TYPE_DISK, null, - TCompressionType.LZ4F, false, "", false, false, false, "", 0, 0, 0, false, null); + TCompressionType.LZ4F, false, "", false, false, false, "", 0, 0, 0, 0, false, null); // drop dropTask = new DropReplicaTask(backendId1, tabletId1, replicaId1, schemaHash1, false); diff --git a/gensrc/proto/olap_file.proto b/gensrc/proto/olap_file.proto index 0f24f6b8d2..2bf8470719 100644 --- a/gensrc/proto/olap_file.proto +++ b/gensrc/proto/olap_file.proto @@ -335,6 +335,7 @@ message TabletMetaPB { optional int64 time_series_compaction_goal_size_mbytes = 29 [default = 1024]; optional int64 time_series_compaction_file_count_threshold = 30 [default = 2000]; optional int64 time_series_compaction_time_threshold_seconds = 31 [default = 3600]; + optional int64 time_series_compaction_empty_rowsets_threshold = 32 [default = 5]; } message OLAPRawDeltaHeaderMessage { diff --git a/gensrc/thrift/AgentService.thrift b/gensrc/thrift/AgentService.thrift index 6f777675f3..a1c4020b14 100644 --- a/gensrc/thrift/AgentService.thrift +++ b/gensrc/thrift/AgentService.thrift @@ -150,6 +150,7 @@ struct TCreateTabletReq { 23: optional i64 time_series_compaction_goal_size_mbytes = 1024 24: optional i64 time_series_compaction_file_count_threshold = 2000 25: optional i64 time_series_compaction_time_threshold_seconds = 3600 + 26: optional i64 time_series_compaction_empty_rowsets_threshold = 5 } struct TDropTabletReq { @@ -419,6 +420,7 @@ struct TTabletMetaInfo { 14: optional bool enable_single_replica_compaction 15: optional bool skip_write_index_on_load 16: optional bool disable_auto_compaction + 17: optional i64 time_series_compaction_empty_rowsets_threshold } struct TUpdateTabletMetaInfoReq {