[opt](stats) Scale replica of stats table to 3 when it's possible (#22227)
This improves the availability of the statistics tables.
This commit is contained in:
@ -30,6 +30,8 @@ import org.apache.doris.common.Config;
|
||||
import org.apache.doris.common.DdlException;
|
||||
import org.apache.doris.common.FeConstants;
|
||||
import org.apache.doris.common.UserException;
|
||||
import org.apache.doris.common.util.PropertyAnalyzer;
|
||||
import org.apache.doris.datasource.InternalCatalog;
|
||||
import org.apache.doris.ha.FrontendNodeType;
|
||||
import org.apache.doris.statistics.StatisticConstants;
|
||||
import org.apache.doris.statistics.util.StatisticsUtil;
|
||||
@ -75,6 +77,53 @@ public class InternalSchemaInitializer extends Thread {
|
||||
}
|
||||
}
|
||||
LOG.info("Internal schema is initialized");
|
||||
Optional<Database> op
|
||||
= Env.getCurrentEnv().getInternalCatalog().getDb(StatisticConstants.DB_NAME);
|
||||
if (!op.isPresent()) {
|
||||
LOG.warn("Internal DB got deleted!");
|
||||
return;
|
||||
}
|
||||
Database database = op.get();
|
||||
modifyTblReplicaCount(database, StatisticConstants.ANALYSIS_TBL_NAME);
|
||||
modifyTblReplicaCount(database, StatisticConstants.STATISTIC_TBL_NAME);
|
||||
modifyTblReplicaCount(database, StatisticConstants.HISTOGRAM_TBL_NAME);
|
||||
}
|
||||
|
||||
public void modifyTblReplicaCount(Database database, String tblName) {
|
||||
if (!(Config.min_replication_num_per_tablet < StatisticConstants.STATISTIC_INTERNAL_TABLE_REPLICA_NUM
|
||||
&& Config.max_replication_num_per_tablet >= StatisticConstants.STATISTIC_INTERNAL_TABLE_REPLICA_NUM)) {
|
||||
return;
|
||||
}
|
||||
while (true) {
|
||||
if (Env.getCurrentSystemInfo().aliveBECount() >= StatisticConstants.STATISTIC_INTERNAL_TABLE_REPLICA_NUM) {
|
||||
try {
|
||||
Map<String, String> props = new HashMap<>();
|
||||
props.put(PropertyAnalyzer.PROPERTIES_REPLICATION_ALLOCATION, "tag.location.default: "
|
||||
+ StatisticConstants.STATISTIC_INTERNAL_TABLE_REPLICA_NUM);
|
||||
TableIf colStatsTbl = StatisticsUtil.findTable(InternalCatalog.INTERNAL_CATALOG_NAME,
|
||||
StatisticConstants.DB_NAME, tblName);
|
||||
OlapTable olapTable = (OlapTable) colStatsTbl;
|
||||
Partition partition = olapTable.getPartition(olapTable.getName());
|
||||
if (partition.getReplicaCount() >= StatisticConstants.STATISTIC_INTERNAL_TABLE_REPLICA_NUM) {
|
||||
return;
|
||||
}
|
||||
try {
|
||||
colStatsTbl.writeLock();
|
||||
Env.getCurrentEnv().modifyTableReplicaAllocation(database, (OlapTable) colStatsTbl, props);
|
||||
} finally {
|
||||
colStatsTbl.writeUnlock();
|
||||
}
|
||||
break;
|
||||
} catch (Throwable t) {
|
||||
LOG.warn("Failed to scale replica of stats tbl:{} to 3", tblName, t);
|
||||
}
|
||||
}
|
||||
try {
|
||||
Thread.sleep(5000);
|
||||
} catch (InterruptedException t) {
|
||||
// IGNORE
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private void createTbl() throws UserException {
|
||||
@ -122,7 +171,7 @@ public class InternalSchemaInitializer extends Thread {
|
||||
Map<String, String> properties = new HashMap<String, String>() {
|
||||
{
|
||||
put("replication_num", String.valueOf(
|
||||
Math.max(Config.statistic_internal_table_replica_num, Config.min_replication_num_per_tablet)));
|
||||
Math.max(1, Config.min_replication_num_per_tablet)));
|
||||
}
|
||||
};
|
||||
CreateTableStmt createTableStmt = new CreateTableStmt(true, false,
|
||||
@ -162,7 +211,7 @@ public class InternalSchemaInitializer extends Thread {
|
||||
Map<String, String> properties = new HashMap<String, String>() {
|
||||
{
|
||||
put("replication_num", String.valueOf(
|
||||
Math.max(Config.statistic_internal_table_replica_num, Config.min_replication_num_per_tablet)));
|
||||
Math.max(1, Config.min_replication_num_per_tablet)));
|
||||
}
|
||||
};
|
||||
CreateTableStmt createTableStmt = new CreateTableStmt(true, false,
|
||||
@ -195,7 +244,7 @@ public class InternalSchemaInitializer extends Thread {
|
||||
StatisticConstants.STATISTIC_TABLE_BUCKET_COUNT, uniqueKeys);
|
||||
Map<String, String> properties = new HashMap<String, String>() {
|
||||
{
|
||||
put("replication_num", String.valueOf(Math.max(Config.statistic_internal_table_replica_num,
|
||||
put("replication_num", String.valueOf(Math.max(1,
|
||||
Config.min_replication_num_per_tablet)));
|
||||
}
|
||||
};
|
||||
|
||||
@ -32,14 +32,10 @@ public class StatisticConstants {
|
||||
|
||||
public static final String HISTOGRAM_TBL_NAME = "histogram_statistics";
|
||||
|
||||
public static final String ANALYSIS_JOB_TABLE = "analysis_jobs";
|
||||
|
||||
public static final int MAX_NAME_LEN = 64;
|
||||
|
||||
public static final int ID_LEN = 4096;
|
||||
|
||||
public static final int STATISTIC_PARALLEL_EXEC_INSTANCE_NUM = 1;
|
||||
|
||||
public static final int STATISTICS_CACHE_VALID_DURATION_IN_HOURS = 24 * 2;
|
||||
|
||||
public static final int STATISTICS_CACHE_REFRESH_INTERVAL = 24 * 2;
|
||||
@ -51,18 +47,11 @@ public class StatisticConstants {
|
||||
*/
|
||||
public static final int STATISTIC_TABLE_BUCKET_COUNT = 7;
|
||||
|
||||
public static final long STATISTICS_MAX_MEM_PER_QUERY_IN_BYTES = 2L * 1024 * 1024 * 1024;
|
||||
|
||||
/**
|
||||
* Determine the execution interval for 'Statistics Table Cleaner' thread.
|
||||
*/
|
||||
public static final int STATISTIC_CLEAN_INTERVAL_IN_HOURS = 24 * 2;
|
||||
|
||||
/**
|
||||
* If analysis job execution time exceeds this time, it would be cancelled.
|
||||
*/
|
||||
public static final long STATISTICS_TASKS_TIMEOUT_IN_MS = TimeUnit.MINUTES.toMillis(10);
|
||||
|
||||
public static final long PRELOAD_RETRY_TIMES = 5;
|
||||
|
||||
// NOTE(review): despite the "_IN_SECONDS" suffix, this value is expressed in milliseconds
// (TimeUnit.SECONDS.toMillis(10) == 10000); confirm how callers interpret it.
public static final long PRELOAD_RETRY_INTERVAL_IN_SECONDS = TimeUnit.SECONDS.toMillis(10);
|
||||
@ -86,6 +75,10 @@ public class StatisticConstants {
|
||||
|
||||
public static int ANALYZE_TASK_RETRY_TIMES = 5;
|
||||
|
||||
public static final String DB_NAME = SystemInfoService.DEFAULT_CLUSTER + ":" + FeConstants.INTERNAL_DB_NAME;
|
||||
|
||||
/**
 * Target replica count for the internal statistics tables.
 */
public static final int STATISTIC_INTERNAL_TABLE_REPLICA_NUM = 3;
|
||||
|
||||
static {
|
||||
STATISTICS_DB_BLACK_LIST.add(SystemInfoService.DEFAULT_CLUSTER
|
||||
+ ClusterNamespace.CLUSTER_DELIMITER + FeConstants.INTERNAL_DB_NAME);
|
||||
|
||||
@ -984,4 +984,8 @@ public class SystemInfoService {
|
||||
}
|
||||
return minPipelineExecutorSize;
|
||||
}
|
||||
|
||||
public long aliveBECount() {
|
||||
return idToBackendRef.values().stream().filter(Backend::isAlive).count();
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user