From b51fcbd9c72cd7ef0422b5ef922574a1f59b9ae7 Mon Sep 17 00:00:00 2001 From: AKIRA <33112463+Kikyou1997@users.noreply.github.com> Date: Thu, 27 Jul 2023 17:36:54 +0800 Subject: [PATCH] [opt](stats) Scale replica of stats table to 3 when it's possible (#22227) So that we could improve the availability of stats. --- .../java/org/apache/doris/common/Config.java | 7 --- .../catalog/InternalSchemaInitializer.java | 55 ++++++++++++++++++- .../doris/statistics/StatisticConstants.java | 15 ++--- .../doris/system/SystemInfoService.java | 4 ++ 4 files changed, 60 insertions(+), 21 deletions(-) diff --git a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java index e7fdaa5f08..74966c0c04 100644 --- a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java +++ b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java @@ -1675,13 +1675,6 @@ public class Config extends ConfigBase { @ConfField public static int statistics_simultaneously_running_task_num = 10; - /** - * Internal table replica num, once set, user should promise the avaible BE is greater than this value, - * otherwise the statistics related internal table creation would be failed. - */ - @ConfField - public static int statistic_internal_table_replica_num = 1; - /** * if table has too many replicas, Fe occur oom when schema change. * 10W replicas is a reasonable value for testing. diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/InternalSchemaInitializer.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/InternalSchemaInitializer.java index 74d7a5d9fa..39b2326e30 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/InternalSchemaInitializer.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/InternalSchemaInitializer.java @@ -30,6 +30,8 @@ import org.apache.doris.common.Config; import org.apache.doris.common.DdlException; import org.apache.doris.common.FeConstants; import org.apache.doris.common.UserException; +import org.apache.doris.common.util.PropertyAnalyzer; +import org.apache.doris.datasource.InternalCatalog; import org.apache.doris.ha.FrontendNodeType; import org.apache.doris.statistics.StatisticConstants; import org.apache.doris.statistics.util.StatisticsUtil; @@ -75,6 +77,53 @@ public class InternalSchemaInitializer extends Thread { } } LOG.info("Internal schema is initialized"); + Optional op + = Env.getCurrentEnv().getInternalCatalog().getDb(StatisticConstants.DB_NAME); + if (!op.isPresent()) { + LOG.warn("Internal DB got deleted!"); + return; + } + Database database = op.get(); + modifyTblReplicaCount(database, StatisticConstants.ANALYSIS_TBL_NAME); + modifyTblReplicaCount(database, StatisticConstants.STATISTIC_TBL_NAME); + modifyTblReplicaCount(database, StatisticConstants.HISTOGRAM_TBL_NAME); + } + + public void modifyTblReplicaCount(Database database, String tblName) { + if (!(Config.min_replication_num_per_tablet < StatisticConstants.STATISTIC_INTERNAL_TABLE_REPLICA_NUM + && Config.max_replication_num_per_tablet >= StatisticConstants.STATISTIC_INTERNAL_TABLE_REPLICA_NUM)) { + return; + } + while (true) { + if (Env.getCurrentSystemInfo().aliveBECount() >= StatisticConstants.STATISTIC_INTERNAL_TABLE_REPLICA_NUM) { + try { + Map props = new HashMap<>(); + props.put(PropertyAnalyzer.PROPERTIES_REPLICATION_ALLOCATION, "tag.location.default: " + + StatisticConstants.STATISTIC_INTERNAL_TABLE_REPLICA_NUM); + TableIf colStatsTbl = StatisticsUtil.findTable(InternalCatalog.INTERNAL_CATALOG_NAME, + StatisticConstants.DB_NAME, tblName); + OlapTable olapTable = (OlapTable) colStatsTbl; + Partition partition = olapTable.getPartition(olapTable.getName()); + if (partition.getReplicaCount() >= StatisticConstants.STATISTIC_INTERNAL_TABLE_REPLICA_NUM) { + return; + } + try { + colStatsTbl.writeLock(); + Env.getCurrentEnv().modifyTableReplicaAllocation(database, (OlapTable) colStatsTbl, props); + } finally { + colStatsTbl.writeUnlock(); + } + break; + } catch (Throwable t) { + LOG.warn("Failed to scale replica of stats tbl:{} to 3", tblName, t); + } + } + try { + Thread.sleep(5000); + } catch (InterruptedException t) { + // IGNORE + } + } } private void createTbl() throws UserException { @@ -122,7 +171,7 @@ public class InternalSchemaInitializer extends Thread { Map properties = new HashMap() { { put("replication_num", String.valueOf( - Math.max(Config.statistic_internal_table_replica_num, Config.min_replication_num_per_tablet))); + Math.max(1, Config.min_replication_num_per_tablet))); } }; CreateTableStmt createTableStmt = new CreateTableStmt(true, false, @@ -162,7 +211,7 @@ public class InternalSchemaInitializer extends Thread { Map properties = new HashMap() { { put("replication_num", String.valueOf( - Math.max(Config.statistic_internal_table_replica_num, Config.min_replication_num_per_tablet))); + Math.max(1, Config.min_replication_num_per_tablet))); } }; CreateTableStmt createTableStmt = new CreateTableStmt(true, false, @@ -195,7 +244,7 @@ public class InternalSchemaInitializer extends Thread { StatisticConstants.STATISTIC_TABLE_BUCKET_COUNT, uniqueKeys); Map properties = new HashMap() { { - put("replication_num", String.valueOf(Math.max(Config.statistic_internal_table_replica_num, + put("replication_num", String.valueOf(Math.max(1, Config.min_replication_num_per_tablet))); } }; diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticConstants.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticConstants.java index e9bccff5b9..072738e19b 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticConstants.java +++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticConstants.java @@ -32,14 +32,10 @@ public class StatisticConstants { public static final String HISTOGRAM_TBL_NAME = "histogram_statistics"; - public static final String ANALYSIS_JOB_TABLE = "analysis_jobs"; - public static final int MAX_NAME_LEN = 64; public static final int ID_LEN = 4096; - public static final int STATISTIC_PARALLEL_EXEC_INSTANCE_NUM = 1; - public static final int STATISTICS_CACHE_VALID_DURATION_IN_HOURS = 24 * 2; public static final int STATISTICS_CACHE_REFRESH_INTERVAL = 24 * 2; @@ -51,18 +47,11 @@ public class StatisticConstants { */ public static final int STATISTIC_TABLE_BUCKET_COUNT = 7; - public static final long STATISTICS_MAX_MEM_PER_QUERY_IN_BYTES = 2L * 1024 * 1024 * 1024; - /** * Determine the execution interval for 'Statistics Table Cleaner' thread. */ public static final int STATISTIC_CLEAN_INTERVAL_IN_HOURS = 24 * 2; - /** - * If analysis job execution time exceeds this time, it would be cancelled. - */ - public static final long STATISTICS_TASKS_TIMEOUT_IN_MS = TimeUnit.MINUTES.toMillis(10); - public static final long PRELOAD_RETRY_TIMES = 5; public static final long PRELOAD_RETRY_INTERVAL_IN_SECONDS = TimeUnit.SECONDS.toMillis(10); @@ -86,6 +75,10 @@ public class StatisticConstants { public static int ANALYZE_TASK_RETRY_TIMES = 5; + public static final String DB_NAME = SystemInfoService.DEFAULT_CLUSTER + ":" + FeConstants.INTERNAL_DB_NAME; + + public static final int STATISTIC_INTERNAL_TABLE_REPLICA_NUM = 3; + static { STATISTICS_DB_BLACK_LIST.add(SystemInfoService.DEFAULT_CLUSTER + ClusterNamespace.CLUSTER_DELIMITER + FeConstants.INTERNAL_DB_NAME); diff --git a/fe/fe-core/src/main/java/org/apache/doris/system/SystemInfoService.java b/fe/fe-core/src/main/java/org/apache/doris/system/SystemInfoService.java index 2488b36a3c..0f4639a13f 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/system/SystemInfoService.java +++ b/fe/fe-core/src/main/java/org/apache/doris/system/SystemInfoService.java @@ -984,4 +984,8 @@ public class SystemInfoService { } return minPipelineExecutorSize; } + + public long aliveBECount() { + return idToBackendRef.values().stream().filter(Backend::isAlive).count(); + } }