[feature-wip](statistics) update cache when analysis job finished (#14370)
1. Update cache when analysis job finished 2. Rename `StatisticsStorageInitializer` to `InernalSchemaInitializer`
This commit is contained in:
@ -209,7 +209,6 @@ import org.apache.doris.qe.VariableMgr;
|
||||
import org.apache.doris.resource.Tag;
|
||||
import org.apache.doris.service.FrontendOptions;
|
||||
import org.apache.doris.statistics.AnalysisJobScheduler;
|
||||
import org.apache.doris.statistics.StatisticStorageInitializer;
|
||||
import org.apache.doris.statistics.StatisticsCache;
|
||||
import org.apache.doris.statistics.StatisticsJobManager;
|
||||
import org.apache.doris.statistics.StatisticsJobScheduler;
|
||||
@ -1431,7 +1430,7 @@ public class Env {
|
||||
getInternalCatalog().getIcebergTableCreationRecordMgr().start();
|
||||
this.statisticsJobScheduler.start();
|
||||
this.statisticsTaskScheduler.start();
|
||||
new StatisticStorageInitializer().start();
|
||||
new InternalSchemaInitializer().start();
|
||||
}
|
||||
|
||||
// start threads that should running on all FE
|
||||
|
||||
@ -15,7 +15,7 @@
|
||||
// specific language governing permissions and limitations
|
||||
// under the License.
|
||||
|
||||
package org.apache.doris.statistics;
|
||||
package org.apache.doris.catalog;
|
||||
|
||||
import org.apache.doris.analysis.ColumnDef;
|
||||
import org.apache.doris.analysis.CreateDbStmt;
|
||||
@ -25,14 +25,13 @@ import org.apache.doris.analysis.HashDistributionDesc;
|
||||
import org.apache.doris.analysis.KeysDesc;
|
||||
import org.apache.doris.analysis.TableName;
|
||||
import org.apache.doris.analysis.TypeDef;
|
||||
import org.apache.doris.catalog.Env;
|
||||
import org.apache.doris.catalog.KeysType;
|
||||
import org.apache.doris.catalog.PrimitiveType;
|
||||
import org.apache.doris.catalog.ScalarType;
|
||||
import org.apache.doris.cluster.ClusterNamespace;
|
||||
import org.apache.doris.common.Config;
|
||||
import org.apache.doris.common.DdlException;
|
||||
import org.apache.doris.common.FeConstants;
|
||||
import org.apache.doris.common.UserException;
|
||||
import org.apache.doris.ha.FrontendNodeType;
|
||||
import org.apache.doris.statistics.StatisticConstants;
|
||||
import org.apache.doris.statistics.util.StatisticsUtil;
|
||||
import org.apache.doris.system.SystemInfoService;
|
||||
|
||||
@ -46,20 +45,31 @@ import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
public class StatisticStorageInitializer extends Thread {
|
||||
public class InternalSchemaInitializer extends Thread {
|
||||
|
||||
private static final Logger LOG = LogManager.getLogger(StatisticStorageInitializer.class);
|
||||
private static final Logger LOG = LogManager.getLogger(InternalSchemaInitializer.class);
|
||||
|
||||
public static boolean forTest = false;
|
||||
|
||||
/**
|
||||
* If internal table creation failed, will retry after below seconds.
|
||||
*/
|
||||
public static final int TABLE_CREATION_RETRY_INTERVAL_IN_SECONDS = 1;
|
||||
|
||||
|
||||
public void run() {
|
||||
if (forTest) {
|
||||
return;
|
||||
}
|
||||
while (true) {
|
||||
FrontendNodeType feType = Env.getCurrentEnv().getFeType();
|
||||
if (feType.equals(FrontendNodeType.INIT) || feType.equals(FrontendNodeType.UNKNOWN)) {
|
||||
LOG.warn("FE is not ready");
|
||||
continue;
|
||||
}
|
||||
try {
|
||||
Thread.currentThread()
|
||||
.join(StatisticConstants.STATISTICS_TABLE_CREATION_RETRY_INTERVAL_IN_SECONDS * 1000L);
|
||||
.join(TABLE_CREATION_RETRY_INTERVAL_IN_SECONDS * 1000L);
|
||||
createDB();
|
||||
createTbl();
|
||||
break;
|
||||
@ -67,6 +77,7 @@ public class StatisticStorageInitializer extends Thread {
|
||||
LOG.warn("Statistics storage initiated failed, will try again later", e);
|
||||
}
|
||||
}
|
||||
LOG.info("Internal schema initiated");
|
||||
}
|
||||
|
||||
private void createTbl() throws UserException {
|
||||
@ -77,21 +88,21 @@ public class StatisticStorageInitializer extends Thread {
|
||||
@VisibleForTesting
|
||||
public static void createDB() {
|
||||
CreateDbStmt createDbStmt = new CreateDbStmt(true,
|
||||
ClusterNamespace.getFullName(SystemInfoService.DEFAULT_CLUSTER, StatisticConstants.STATISTIC_DB_NAME),
|
||||
ClusterNamespace.getFullName(SystemInfoService.DEFAULT_CLUSTER, FeConstants.INTERNAL_DB_NAME),
|
||||
null);
|
||||
createDbStmt.setClusterName(SystemInfoService.DEFAULT_CLUSTER);
|
||||
try {
|
||||
Env.getCurrentEnv().createDb(createDbStmt);
|
||||
} catch (DdlException e) {
|
||||
LOG.warn("Failed to create database: {}, will try again later",
|
||||
StatisticConstants.STATISTIC_DB_NAME, e);
|
||||
FeConstants.INTERNAL_DB_NAME, e);
|
||||
}
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
public CreateTableStmt buildStatisticsTblStmt() throws UserException {
|
||||
TableName tableName = new TableName("",
|
||||
StatisticConstants.STATISTIC_DB_NAME, StatisticConstants.STATISTIC_TBL_NAME);
|
||||
FeConstants.INTERNAL_DB_NAME, StatisticConstants.STATISTIC_TBL_NAME);
|
||||
List<ColumnDef> columnDefs = new ArrayList<>();
|
||||
columnDefs.add(new ColumnDef("id", TypeDef.createVarchar(StatisticConstants.ID_LEN)));
|
||||
columnDefs.add(new ColumnDef("catalog_id", TypeDef.createVarchar(StatisticConstants.MAX_NAME_LEN)));
|
||||
@ -131,7 +142,7 @@ public class StatisticStorageInitializer extends Thread {
|
||||
@VisibleForTesting
|
||||
public CreateTableStmt buildAnalysisJobTblStmt() throws UserException {
|
||||
TableName tableName = new TableName("",
|
||||
StatisticConstants.STATISTIC_DB_NAME, StatisticConstants.ANALYSIS_JOB_TABLE);
|
||||
FeConstants.INTERNAL_DB_NAME, StatisticConstants.ANALYSIS_JOB_TABLE);
|
||||
List<ColumnDef> columnDefs = new ArrayList<>();
|
||||
columnDefs.add(new ColumnDef("job_id", TypeDef.create(PrimitiveType.BIGINT)));
|
||||
columnDefs.add(new ColumnDef("catalog_name", TypeDef.createVarchar(1024)));
|
||||
@ -74,4 +74,5 @@ public class FeConstants {
|
||||
public static String FS_PREFIX_OBS = "obs";
|
||||
public static String FS_PREFIX_HDFS = "hdfs";
|
||||
public static String FS_PREFIX_FILE = "file";
|
||||
public static final String INTERNAL_DB_NAME = "__internal_schema";
|
||||
}
|
||||
|
||||
@ -23,6 +23,7 @@ import org.apache.doris.catalog.Env;
|
||||
import org.apache.doris.catalog.Partition;
|
||||
import org.apache.doris.catalog.Table;
|
||||
import org.apache.doris.catalog.TableIf;
|
||||
import org.apache.doris.common.FeConstants;
|
||||
import org.apache.doris.datasource.CatalogIf;
|
||||
import org.apache.doris.qe.ConnectContext;
|
||||
import org.apache.doris.qe.StmtExecutor;
|
||||
@ -139,7 +140,7 @@ public class AnalysisJob {
|
||||
|
||||
public void execute() throws Exception {
|
||||
Map<String, String> params = new HashMap<>();
|
||||
params.put("internalDB", StatisticConstants.STATISTIC_DB_NAME);
|
||||
params.put("internalDB", FeConstants.INTERNAL_DB_NAME);
|
||||
params.put("columnStatTbl", StatisticConstants.STATISTIC_TBL_NAME);
|
||||
params.put("catalogId", String.valueOf(catalog.getId()));
|
||||
params.put("dbId", String.valueOf(db.getId()));
|
||||
@ -178,6 +179,7 @@ public class AnalysisJob {
|
||||
ConnectContext connectContext = StatisticsUtil.buildConnectContext();
|
||||
this.stmtExecutor = new StmtExecutor(connectContext, sql);
|
||||
this.stmtExecutor.execute();
|
||||
Env.getCurrentEnv().getStatisticsCache().refreshSync(tbl.getId(), col.getName());
|
||||
}
|
||||
|
||||
public int getLastExecTime() {
|
||||
|
||||
@ -20,6 +20,7 @@ package org.apache.doris.statistics;
|
||||
import org.apache.doris.catalog.DatabaseIf;
|
||||
import org.apache.doris.catalog.Env;
|
||||
import org.apache.doris.catalog.TableIf;
|
||||
import org.apache.doris.common.FeConstants;
|
||||
import org.apache.doris.datasource.CatalogIf;
|
||||
import org.apache.doris.statistics.AnalysisJobInfo.JobState;
|
||||
import org.apache.doris.statistics.AnalysisJobInfo.JobType;
|
||||
@ -46,7 +47,7 @@ public class AnalysisJobScheduler {
|
||||
private static final Logger LOG = LogManager.getLogger(AnalysisJobScheduler.class);
|
||||
|
||||
private static final String UPDATE_JOB_STATE_SQL_TEMPLATE = "UPDATE "
|
||||
+ StatisticConstants.STATISTIC_DB_NAME + "." + StatisticConstants.ANALYSIS_JOB_TABLE + " "
|
||||
+ FeConstants.INTERNAL_DB_NAME + "." + StatisticConstants.ANALYSIS_JOB_TABLE + " "
|
||||
+ "SET state = '${jobState}' ${message} ${updateExecTime} WHERE job_id = ${jobId}";
|
||||
|
||||
private final PriorityQueue<AnalysisJob> systemJobQueue =
|
||||
|
||||
@ -62,11 +62,13 @@ public class AnalysisJobWrapper extends FutureTask<Void> {
|
||||
Env.getCurrentEnv().getAnalysisJobScheduler()
|
||||
.updateJobStatus(job.getJobId(), JobState.FINISHED, "", System.currentTimeMillis());
|
||||
}
|
||||
LOG.warn("{} finished, cost time:{}", job.toString(), System.currentTimeMillis() - startTime);
|
||||
}
|
||||
}
|
||||
|
||||
public boolean cancel() {
|
||||
try {
|
||||
LOG.warn("{} cancelled, cost time:{}", job.toString(), System.currentTimeMillis() - startTime);
|
||||
job.cancel();
|
||||
} catch (Exception e) {
|
||||
LOG.warn(String.format("Cancel job failed job info : %s", job.toString()));
|
||||
|
||||
@ -106,6 +106,8 @@ public class ColumnStatistic {
|
||||
columnStatisticBuilder.setNumNulls(Double.parseDouble(resultRow.getColumnValue("null_count")));
|
||||
columnStatisticBuilder.setDataSize(Double
|
||||
.parseDouble(resultRow.getColumnValue("data_size_in_bytes")));
|
||||
columnStatisticBuilder.setAvgSizeByte(columnStatisticBuilder.getDataSize()
|
||||
/ columnStatisticBuilder.getCount());
|
||||
long catalogId = Long.parseLong(resultRow.getColumnValue("catalog_id"));
|
||||
long dbID = Long.parseLong(resultRow.getColumnValue("db_id"));
|
||||
long tblId = Long.parseLong(resultRow.getColumnValue("tbl_id"));
|
||||
|
||||
@ -17,6 +17,7 @@
|
||||
|
||||
package org.apache.doris.statistics;
|
||||
|
||||
import org.apache.doris.common.FeConstants;
|
||||
import org.apache.doris.datasource.HMSExternalCatalog;
|
||||
import org.apache.doris.qe.ConnectContext;
|
||||
import org.apache.doris.qe.StmtExecutor;
|
||||
@ -69,7 +70,7 @@ public class HiveAnalysisJob extends HMSAnalysisJob {
|
||||
List<String> columns = new ArrayList<>();
|
||||
columns.add(col.getName());
|
||||
Map<String, String> params = new HashMap<>();
|
||||
params.put("internalDB", StatisticConstants.STATISTIC_DB_NAME);
|
||||
params.put("internalDB", FeConstants.INTERNAL_DB_NAME);
|
||||
params.put("columnStatTbl", StatisticConstants.STATISTIC_TBL_NAME);
|
||||
params.put("catalogId", String.valueOf(catalog.getId()));
|
||||
params.put("dbId", String.valueOf(db.getId()));
|
||||
|
||||
@ -17,6 +17,7 @@
|
||||
|
||||
package org.apache.doris.statistics;
|
||||
|
||||
import org.apache.doris.common.FeConstants;
|
||||
import org.apache.doris.qe.ConnectContext;
|
||||
import org.apache.doris.qe.StmtExecutor;
|
||||
import org.apache.doris.statistics.util.StatisticsUtil;
|
||||
@ -99,7 +100,7 @@ public class IcebergAnalysisJob extends HMSAnalysisJob {
|
||||
|
||||
private void updateStats() throws Exception {
|
||||
Map<String, String> params = new HashMap<>();
|
||||
params.put("internalDB", StatisticConstants.STATISTIC_DB_NAME);
|
||||
params.put("internalDB", FeConstants.INTERNAL_DB_NAME);
|
||||
params.put("columnStatTbl", StatisticConstants.STATISTIC_TBL_NAME);
|
||||
params.put("id", String.valueOf(tbl.getId()) + "-" + String.valueOf(col.getName()));
|
||||
params.put("catalogId", String.valueOf(catalog.getId()));
|
||||
|
||||
@ -20,8 +20,6 @@ package org.apache.doris.statistics;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
public class StatisticConstants {
|
||||
public static final String STATISTIC_DB_NAME = "__internal_schema";
|
||||
|
||||
public static final String STATISTIC_TBL_NAME = "column_statistics";
|
||||
|
||||
public static final String ANALYSIS_JOB_TABLE = "analysis_jobs";
|
||||
@ -48,10 +46,6 @@ public class StatisticConstants {
|
||||
*/
|
||||
public static final int STATISTIC_CLEAN_INTERVAL_IN_HOURS = 24 * 2;
|
||||
|
||||
/**
|
||||
* If statistics related table creation failed, will retry after below seconds.
|
||||
*/
|
||||
public static final int STATISTICS_TABLE_CREATION_RETRY_INTERVAL_IN_SECONDS = 5;
|
||||
|
||||
/**
|
||||
* The max cached item in `StatisticsCache`.
|
||||
|
||||
@ -62,4 +62,8 @@ public class StatisticsCache {
|
||||
public void updateCache(long tblId, String colName, ColumnStatistic statistic) {
|
||||
cache.synchronous().put(new StatisticsCacheKey(tblId, colName), statistic);
|
||||
}
|
||||
|
||||
public void refreshSync(long tblId, String colName) {
|
||||
cache.synchronous().refresh(new StatisticsCacheKey(tblId, colName));
|
||||
}
|
||||
}
|
||||
|
||||
@ -17,6 +17,7 @@
|
||||
|
||||
package org.apache.doris.statistics;
|
||||
|
||||
import org.apache.doris.common.FeConstants;
|
||||
import org.apache.doris.statistics.util.InternalQueryResult.ResultRow;
|
||||
import org.apache.doris.statistics.util.StatisticsUtil;
|
||||
|
||||
@ -38,7 +39,7 @@ public class StatisticsCacheLoader implements AsyncCacheLoader<StatisticsCacheKe
|
||||
|
||||
private static final Logger LOG = LogManager.getLogger(StatisticsCacheLoader.class);
|
||||
|
||||
private static final String QUERY_COLUMN_STATISTICS = "SELECT * FROM " + StatisticConstants.STATISTIC_DB_NAME
|
||||
private static final String QUERY_COLUMN_STATISTICS = "SELECT * FROM " + FeConstants.INTERNAL_DB_NAME
|
||||
+ "." + StatisticConstants.STATISTIC_TBL_NAME + " WHERE "
|
||||
+ "id = CONCAT('${tblId}', '-', '${colId}')";
|
||||
|
||||
|
||||
@ -24,6 +24,7 @@ import org.apache.doris.catalog.Column;
|
||||
import org.apache.doris.catalog.Env;
|
||||
import org.apache.doris.catalog.Partition;
|
||||
import org.apache.doris.common.AnalysisException;
|
||||
import org.apache.doris.common.FeConstants;
|
||||
import org.apache.doris.statistics.AnalysisJobInfo.AnalysisType;
|
||||
import org.apache.doris.statistics.AnalysisJobInfo.JobState;
|
||||
import org.apache.doris.statistics.AnalysisJobInfo.ScheduleType;
|
||||
@ -53,7 +54,7 @@ public class StatisticsRepository {
|
||||
private static final Logger LOG = LogManager.getLogger(StatisticsRepository.class);
|
||||
|
||||
private static final String FULL_QUALIFIED_DB_NAME = "`" + SystemInfoService.DEFAULT_CLUSTER + ":"
|
||||
+ StatisticConstants.STATISTIC_DB_NAME + "`";
|
||||
+ FeConstants.INTERNAL_DB_NAME + "`";
|
||||
|
||||
private static final String FULL_QUALIFIED_COLUMN_STATISTICS_NAME = FULL_QUALIFIED_DB_NAME + "."
|
||||
+ "`" + StatisticConstants.STATISTIC_TBL_NAME + "`";
|
||||
|
||||
@ -22,6 +22,7 @@ import org.apache.doris.catalog.Database;
|
||||
import org.apache.doris.catalog.Env;
|
||||
import org.apache.doris.catalog.OlapTable;
|
||||
import org.apache.doris.catalog.Table;
|
||||
import org.apache.doris.common.FeConstants;
|
||||
import org.apache.doris.common.util.MasterDaemon;
|
||||
import org.apache.doris.statistics.util.StatisticsUtil;
|
||||
|
||||
@ -98,7 +99,7 @@ public class StatisticsTableCleaner extends MasterDaemon {
|
||||
|
||||
private void deleteExpired(String colName, List<String> constants) {
|
||||
// TODO: must promise count of children of predicate is less than the FE limits.
|
||||
String deleteTemplate = "DELETE FROM " + StatisticConstants.STATISTIC_DB_NAME
|
||||
String deleteTemplate = "DELETE FROM " + FeConstants.INTERNAL_DB_NAME
|
||||
+ "." + StatisticConstants.STATISTIC_TBL_NAME + "WHERE ${colName} NOT IN ${predicate}";
|
||||
StringJoiner predicateBuilder = new StringJoiner(",", "(", ")");
|
||||
constants.forEach(predicateBuilder::add);
|
||||
|
||||
@ -37,6 +37,7 @@ import org.apache.doris.catalog.ScalarType;
|
||||
import org.apache.doris.catalog.TableIf;
|
||||
import org.apache.doris.catalog.Type;
|
||||
import org.apache.doris.common.AnalysisException;
|
||||
import org.apache.doris.common.FeConstants;
|
||||
import org.apache.doris.common.UserException;
|
||||
import org.apache.doris.datasource.CatalogIf;
|
||||
import org.apache.doris.nereids.trees.expressions.literal.DateTimeLiteral;
|
||||
@ -114,7 +115,7 @@ public class StatisticsUtil {
|
||||
sessionVariable.setEnableInsertStrict(true);
|
||||
sessionVariable.parallelExecInstanceNum = StatisticConstants.STATISTIC_PARALLEL_EXEC_INSTANCE_NUM;
|
||||
connectContext.setEnv(Env.getCurrentEnv());
|
||||
connectContext.setDatabase(StatisticConstants.STATISTIC_DB_NAME);
|
||||
connectContext.setDatabase(FeConstants.INTERNAL_DB_NAME);
|
||||
connectContext.setQualifiedUser(UserIdentity.ROOT.getQualifiedUser());
|
||||
connectContext.setCurrentUserIdentity(UserIdentity.ROOT);
|
||||
UUID uuid = UUID.randomUUID();
|
||||
|
||||
@ -28,6 +28,7 @@ import org.apache.doris.catalog.ColocateTableIndex;
|
||||
import org.apache.doris.catalog.Database;
|
||||
import org.apache.doris.catalog.DiskInfo;
|
||||
import org.apache.doris.catalog.Env;
|
||||
import org.apache.doris.catalog.InternalSchemaInitializer;
|
||||
import org.apache.doris.catalog.MaterializedIndex;
|
||||
import org.apache.doris.catalog.OlapTable;
|
||||
import org.apache.doris.catalog.Partition;
|
||||
@ -95,6 +96,7 @@ public class TabletRepairAndBalanceTest {
|
||||
|
||||
static {
|
||||
try {
|
||||
InternalSchemaInitializer.forTest = true;
|
||||
tag1 = Tag.create(Tag.TYPE_LOCATION, "zone1");
|
||||
tag2 = Tag.create(Tag.TYPE_LOCATION, "zone2");
|
||||
} catch (AnalysisException e) {
|
||||
@ -314,12 +316,12 @@ public class TabletRepairAndBalanceTest {
|
||||
// check tablet and replica number
|
||||
TabletInvertedIndex invertedIndex = Env.getCurrentInvertedIndex();
|
||||
Table<Long, Long, Replica> replicaMetaTable = invertedIndex.getReplicaMetaTable();
|
||||
Assert.assertEquals(44, replicaMetaTable.rowKeySet().size());
|
||||
Assert.assertEquals(30, replicaMetaTable.rowKeySet().size());
|
||||
Assert.assertEquals(5, replicaMetaTable.columnKeySet().size());
|
||||
|
||||
// wait all replica reallocating to correct backend
|
||||
checkTableReplicaAllocation(tbl);
|
||||
Assert.assertEquals(104, replicaMetaTable.cellSet().size());
|
||||
Assert.assertEquals(90, replicaMetaTable.cellSet().size());
|
||||
|
||||
// for now, tbl has 3 partitions:
|
||||
// p1: zone1: 1, zone2: 2
|
||||
@ -340,7 +342,7 @@ public class TabletRepairAndBalanceTest {
|
||||
Assert.assertEquals(tag2, be.getLocationTag());
|
||||
ExceptionChecker.expectThrows(UserException.class, () -> tbl.checkReplicaAllocation());
|
||||
checkTableReplicaAllocation(tbl);
|
||||
Assert.assertEquals(104, replicaMetaTable.cellSet().size());
|
||||
Assert.assertEquals(90, replicaMetaTable.cellSet().size());
|
||||
|
||||
// For now, Backends:
|
||||
// [0, 1]: zone1
|
||||
@ -435,7 +437,7 @@ public class TabletRepairAndBalanceTest {
|
||||
ExceptionChecker.expectThrowsNoException(() -> dropTable(dropStmt1));
|
||||
ExceptionChecker.expectThrowsNoException(() -> dropTable(dropStmt2));
|
||||
ExceptionChecker.expectThrowsNoException(() -> dropTable(dropStmt3));
|
||||
Assert.assertEquals(14, replicaMetaTable.size());
|
||||
Assert.assertEquals(0, replicaMetaTable.size());
|
||||
|
||||
// set all backends' tag to default
|
||||
for (int i = 0; i < backends.size(); ++i) {
|
||||
|
||||
@ -19,10 +19,9 @@ package org.apache.doris.clone;
|
||||
|
||||
import org.apache.doris.analysis.CreateDbStmt;
|
||||
import org.apache.doris.analysis.CreateTableStmt;
|
||||
import org.apache.doris.catalog.Database;
|
||||
import org.apache.doris.catalog.DiskInfo;
|
||||
import org.apache.doris.catalog.Env;
|
||||
import org.apache.doris.catalog.OlapTable;
|
||||
import org.apache.doris.catalog.InternalSchemaInitializer;
|
||||
import org.apache.doris.catalog.Replica;
|
||||
import org.apache.doris.catalog.TabletInvertedIndex;
|
||||
import org.apache.doris.common.Config;
|
||||
@ -30,7 +29,6 @@ import org.apache.doris.common.ExceptionChecker;
|
||||
import org.apache.doris.common.FeConstants;
|
||||
import org.apache.doris.qe.ConnectContext;
|
||||
import org.apache.doris.resource.Tag;
|
||||
import org.apache.doris.statistics.StatisticConstants;
|
||||
import org.apache.doris.system.Backend;
|
||||
import org.apache.doris.system.Diagnoser;
|
||||
import org.apache.doris.system.SystemInfoService;
|
||||
@ -51,7 +49,6 @@ import org.junit.Test;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Random;
|
||||
import java.util.Set;
|
||||
import java.util.UUID;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
@ -75,6 +72,7 @@ public class TabletReplicaTooSlowTest {
|
||||
|
||||
@BeforeClass
|
||||
public static void beforeClass() throws Exception {
|
||||
InternalSchemaInitializer.forTest = true;
|
||||
System.out.println(runningDir);
|
||||
FeConstants.runningUnitTest = true;
|
||||
FeConstants.tablet_checker_interval_ms = 1000;
|
||||
@ -167,19 +165,6 @@ public class TabletReplicaTooSlowTest {
|
||||
+ ")";
|
||||
ExceptionChecker.expectThrowsNoException(() -> createTable(createStr));
|
||||
|
||||
Database db = null;
|
||||
|
||||
do {
|
||||
db = Env.getCurrentEnv().getInternalCatalog()
|
||||
.getDb(SystemInfoService.DEFAULT_CLUSTER + ":" + StatisticConstants.STATISTIC_DB_NAME)
|
||||
.orElse(null);
|
||||
Thread.sleep(100);
|
||||
} while (db == null);
|
||||
Set<Long> replicaIdSet = db.getTables().stream().map(t -> {
|
||||
return (OlapTable) t;
|
||||
}).flatMap(t -> t.getPartitions().stream()).flatMap(p -> p.getBaseIndex().getTablets().stream())
|
||||
.flatMap(t -> t.getReplicas().stream()).map(r -> r.getId()).collect(Collectors.toSet());
|
||||
|
||||
int maxLoop = 300;
|
||||
boolean delete = false;
|
||||
while (maxLoop-- > 0) {
|
||||
@ -187,7 +172,7 @@ public class TabletReplicaTooSlowTest {
|
||||
boolean found = false;
|
||||
for (Table.Cell<Long, Long, Replica> cell : replicaMetaTable.cellSet()) {
|
||||
Replica replica = cell.getValue();
|
||||
if (replica.getVersionCount() == 401 && !replicaIdSet.contains(cell.getValue().getId())) {
|
||||
if (replica.getVersionCount() == 401) {
|
||||
if (replica.tooSlow()) {
|
||||
LOG.info("set to TOO_SLOW.");
|
||||
}
|
||||
|
||||
@ -18,14 +18,11 @@
|
||||
package org.apache.doris.cluster;
|
||||
|
||||
import org.apache.doris.analysis.AlterSystemStmt;
|
||||
import org.apache.doris.catalog.Database;
|
||||
import org.apache.doris.catalog.Env;
|
||||
import org.apache.doris.common.AnalysisException;
|
||||
import org.apache.doris.common.Config;
|
||||
import org.apache.doris.common.FeConstants;
|
||||
import org.apache.doris.statistics.StatisticConstants;
|
||||
import org.apache.doris.system.Backend;
|
||||
import org.apache.doris.system.SystemInfoService;
|
||||
import org.apache.doris.utframe.TestWithFeService;
|
||||
|
||||
import com.google.common.collect.ImmutableMap;
|
||||
@ -90,17 +87,8 @@ public class DecommissionBackendTest extends TestWithFeService {
|
||||
|
||||
Assertions.assertEquals(backendNum() - 1, Env.getCurrentSystemInfo().getIdToBackend().size());
|
||||
|
||||
Database db = null;
|
||||
long waitLimitMs = 5 * 1000;
|
||||
do {
|
||||
db = Env.getCurrentEnv().getInternalCatalog()
|
||||
.getDb(SystemInfoService.DEFAULT_CLUSTER + ":" + StatisticConstants.STATISTIC_DB_NAME)
|
||||
.orElse(null);
|
||||
Thread.sleep(100);
|
||||
waitLimitMs -= 100;
|
||||
} while (db == null && waitLimitMs > 0);
|
||||
// For now, we have pre-built internal table: analysis_job and column_statistics
|
||||
Assertions.assertEquals(tabletNum + StatisticConstants.STATISTIC_TABLE_BUCKET_COUNT * 2,
|
||||
Assertions.assertEquals(tabletNum,
|
||||
Env.getCurrentInvertedIndex().getTabletMetaMap().size());
|
||||
|
||||
// 6. add backend
|
||||
|
||||
@ -18,6 +18,7 @@
|
||||
package org.apache.doris.statistics;
|
||||
|
||||
import org.apache.doris.catalog.Env;
|
||||
import org.apache.doris.catalog.InternalSchemaInitializer;
|
||||
import org.apache.doris.common.jmockit.Deencapsulation;
|
||||
import org.apache.doris.statistics.AnalysisJobInfo.JobType;
|
||||
import org.apache.doris.statistics.AnalysisJobInfo.ScheduleType;
|
||||
@ -39,7 +40,7 @@ public class AnalysisJobExecutorTest extends TestWithFeService {
|
||||
@Override
|
||||
protected void runBeforeAll() throws Exception {
|
||||
try {
|
||||
StatisticStorageInitializer.createDB();
|
||||
InternalSchemaInitializer.createDB();
|
||||
createDatabase("analysis_job_test");
|
||||
connectContext.setDatabase("default_cluster:analysis_job_test");
|
||||
createTable("CREATE TABLE t1 (col1 int not null, col2 int not null, col3 int not null)\n"
|
||||
@ -47,7 +48,7 @@ public class AnalysisJobExecutorTest extends TestWithFeService {
|
||||
+ "DISTRIBUTED BY HASH(col3)\n" + "BUCKETS 1\n"
|
||||
+ "PROPERTIES(\n" + " \"replication_num\"=\"1\"\n"
|
||||
+ ");");
|
||||
StatisticStorageInitializer storageInitializer = new StatisticStorageInitializer();
|
||||
InternalSchemaInitializer storageInitializer = new InternalSchemaInitializer();
|
||||
Env.getCurrentEnv().createTable(storageInitializer.buildAnalysisJobTblStmt());
|
||||
} catch (Exception e) {
|
||||
throw new RuntimeException(e);
|
||||
|
||||
@ -18,6 +18,7 @@
|
||||
package org.apache.doris.statistics;
|
||||
|
||||
import org.apache.doris.catalog.Env;
|
||||
import org.apache.doris.catalog.InternalSchemaInitializer;
|
||||
import org.apache.doris.qe.ConnectContext;
|
||||
import org.apache.doris.qe.StmtExecutor;
|
||||
import org.apache.doris.statistics.AnalysisJobInfo.JobType;
|
||||
@ -33,21 +34,17 @@ import org.junit.jupiter.api.Test;
|
||||
|
||||
public class AnalysisJobTest extends TestWithFeService {
|
||||
|
||||
{
|
||||
StatisticStorageInitializer.forTest = true;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void runBeforeAll() throws Exception {
|
||||
try {
|
||||
StatisticStorageInitializer.createDB();
|
||||
InternalSchemaInitializer.createDB();
|
||||
createDatabase("analysis_job_test");
|
||||
connectContext.setDatabase("default_cluster:analysis_job_test");
|
||||
createTable("CREATE TABLE t1 (col1 int not null, col2 int not null, col3 int not null)\n"
|
||||
+ "DISTRIBUTED BY HASH(col3)\n" + "BUCKETS 1\n"
|
||||
+ "PROPERTIES(\n" + " \"replication_num\"=\"1\"\n"
|
||||
+ ");");
|
||||
StatisticStorageInitializer storageInitializer = new StatisticStorageInitializer();
|
||||
InternalSchemaInitializer storageInitializer = new InternalSchemaInitializer();
|
||||
Env.getCurrentEnv().createTable(storageInitializer.buildAnalysisJobTblStmt());
|
||||
} catch (Exception e) {
|
||||
throw new RuntimeException(e);
|
||||
|
||||
@ -41,6 +41,7 @@ import org.apache.doris.analysis.UserIdentity;
|
||||
import org.apache.doris.catalog.Database;
|
||||
import org.apache.doris.catalog.DiskInfo;
|
||||
import org.apache.doris.catalog.Env;
|
||||
import org.apache.doris.catalog.InternalSchemaInitializer;
|
||||
import org.apache.doris.catalog.OlapTable;
|
||||
import org.apache.doris.catalog.Replica;
|
||||
import org.apache.doris.catalog.Table;
|
||||
@ -120,6 +121,7 @@ public abstract class TestWithFeService {
|
||||
|
||||
@BeforeAll
|
||||
public final void beforeAll() throws Exception {
|
||||
InternalSchemaInitializer.forTest = true;
|
||||
beforeCreatingConnectContext();
|
||||
connectContext = createDefaultCtx();
|
||||
createDorisCluster();
|
||||
|
||||
@ -1,11 +1,11 @@
|
||||
-- This file is automatically generated. You should know what you did if you want to edit this
|
||||
-- !sql --
|
||||
col1 3.0 3.0 0.0 3.0 0.0 6.9895866216790098E18 7.1337018097548657E18 'a' 'c'
|
||||
col2 3.0 3.0 0.0 48.0 0.0 2.0110101E13 2.0130101E13 '2011-01-01' '2013-01-01'
|
||||
id 3.0 3.0 0.0 24.0 0.0 1.0 3.0 1 3
|
||||
col1 3.0 3.0 0.0 3.0 1.0 6.9895866216790098E18 7.1337018097548657E18 'a' 'c'
|
||||
col2 3.0 3.0 0.0 48.0 16.0 2.0110101E13 2.0130101E13 '2011-01-01' '2013-01-01'
|
||||
id 3.0 3.0 0.0 24.0 8.0 1.0 3.0 1 3
|
||||
|
||||
-- !sql2 --
|
||||
col1 114.0 1.48064528E8 0.0 511.0 0.0 3.5308221078584689E18 3.8911100780481085E18 '1' '6'
|
||||
col2 3.0 3.0 0.0 48.0 0.0 2.0110101E13 2.0130101E13 '2011-01-01' '2013-01-01'
|
||||
id 3.0 3.0 0.0 24.0 0.0 1.0 3.0 1 3
|
||||
col1 114.0 1.48064528E8 0.0 511.0 4.482456140350878 3.5308221078584689E18 3.8911100780481085E18 '1' '6'
|
||||
col2 3.0 3.0 0.0 48.0 16.0 2.0110101E13 2.0130101E13 '2011-01-01' '2013-01-01'
|
||||
id 3.0 3.0 0.0 24.0 8.0 1.0 3.0 1 3
|
||||
|
||||
|
||||
Reference in New Issue
Block a user