[fix](stats)Drop stats or update updated rows after truncate table (#27931)
1. Also clear follower's stats cache when doing drop stats. 2. Drop stats when truncate a table.
This commit is contained in:
@ -2910,6 +2910,8 @@ public class InternalCatalog implements CatalogIf<Database> {
|
||||
Database db = (Database) getDbOrDdlException(dbTbl.getDb());
|
||||
OlapTable olapTable = db.getOlapTableOrDdlException(dbTbl.getTbl());
|
||||
|
||||
long rowsToTruncate = 0;
|
||||
|
||||
BinlogConfig binlogConfig;
|
||||
olapTable.readLock();
|
||||
try {
|
||||
@ -2922,6 +2924,7 @@ public class InternalCatalog implements CatalogIf<Database> {
|
||||
}
|
||||
origPartitions.put(partName, partition.getId());
|
||||
partitionsDistributionInfo.put(partition.getId(), partition.getDistributionInfo());
|
||||
rowsToTruncate += partition.getBaseIndex().getRowCount();
|
||||
}
|
||||
} else {
|
||||
for (Partition partition : olapTable.getPartitions()) {
|
||||
@ -3065,7 +3068,13 @@ public class InternalCatalog implements CatalogIf<Database> {
|
||||
} finally {
|
||||
olapTable.writeUnlock();
|
||||
}
|
||||
|
||||
if (truncateEntireTable) {
|
||||
// Drop the whole table stats after truncate the entire table
|
||||
Env.getCurrentEnv().getAnalysisManager().dropStats(olapTable);
|
||||
} else {
|
||||
// Update the updated rows in table stats after truncate some partitions.
|
||||
Env.getCurrentEnv().getAnalysisManager().updateUpdatedRows(olapTable.getId(), rowsToTruncate);
|
||||
}
|
||||
LOG.info("finished to truncate table {}, partitions: {}", tblRef.getName().toSql(), tblRef.getPartitionNames());
|
||||
}
|
||||
|
||||
|
||||
@ -157,6 +157,7 @@ import org.apache.doris.thrift.TGetTabletReplicaInfosRequest;
|
||||
import org.apache.doris.thrift.TGetTabletReplicaInfosResult;
|
||||
import org.apache.doris.thrift.TInitExternalCtlMetaRequest;
|
||||
import org.apache.doris.thrift.TInitExternalCtlMetaResult;
|
||||
import org.apache.doris.thrift.TInvalidateFollowerStatsCacheRequest;
|
||||
import org.apache.doris.thrift.TListPrivilegesResult;
|
||||
import org.apache.doris.thrift.TListTableMetadataNameIdsResult;
|
||||
import org.apache.doris.thrift.TListTableStatusResult;
|
||||
@ -3109,6 +3110,13 @@ public class FrontendServiceImpl implements FrontendService.Iface {
|
||||
return new TStatus(TStatusCode.OK);
|
||||
}
|
||||
|
||||
@Override
|
||||
public TStatus invalidateStatsCache(TInvalidateFollowerStatsCacheRequest request) throws TException {
|
||||
StatisticsCacheKey k = GsonUtils.GSON.fromJson(request.key, StatisticsCacheKey.class);
|
||||
Env.getCurrentEnv().getStatisticsCache().invalidate(k.tableId, k.idxId, k.colName);
|
||||
return new TStatus(TStatusCode.OK);
|
||||
}
|
||||
|
||||
@Override
|
||||
public TCreatePartitionResult createPartition(TCreatePartitionRequest request) throws TException {
|
||||
LOG.info("Receive create partition request: {}", request);
|
||||
|
||||
@ -719,8 +719,9 @@ public class AnalysisManager implements Writable {
|
||||
tableStats.reset();
|
||||
} else {
|
||||
dropStatsStmt.getColumnNames().forEach(tableStats::removeColumn);
|
||||
StatisticsCache statisticsCache = Env.getCurrentEnv().getStatisticsCache();
|
||||
for (String col : cols) {
|
||||
Env.getCurrentEnv().getStatisticsCache().invalidate(tblId, -1L, col);
|
||||
statisticsCache.syncInvalidate(tblId, -1L, col);
|
||||
}
|
||||
tableStats.updatedTime = 0;
|
||||
}
|
||||
@ -734,9 +735,10 @@ public class AnalysisManager implements Writable {
|
||||
return;
|
||||
}
|
||||
Set<String> cols = table.getBaseSchema().stream().map(Column::getName).collect(Collectors.toSet());
|
||||
StatisticsCache statisticsCache = Env.getCurrentEnv().getStatisticsCache();
|
||||
for (String col : cols) {
|
||||
tableStats.removeColumn(col);
|
||||
Env.getCurrentEnv().getStatisticsCache().invalidate(table.getId(), -1L, col);
|
||||
statisticsCache.syncInvalidate(table.getId(), -1L, col);
|
||||
}
|
||||
tableStats.updatedTime = 0;
|
||||
logCreateTableStats(tableStats);
|
||||
|
||||
@ -93,10 +93,6 @@ public class StatisticsAutoCollector extends StatisticsCollector {
|
||||
List<AnalysisInfo> analysisInfos = constructAnalysisInfo(databaseIf);
|
||||
for (AnalysisInfo analysisInfo : analysisInfos) {
|
||||
try {
|
||||
if (needDropStaleStats(analysisInfo)) {
|
||||
Env.getCurrentEnv().getAnalysisManager().dropStats(databaseIf.getTable(analysisInfo.tblId).get());
|
||||
continue;
|
||||
}
|
||||
analysisInfo = getReAnalyzeRequiredPart(analysisInfo);
|
||||
if (analysisInfo == null) {
|
||||
continue;
|
||||
@ -201,30 +197,4 @@ public class StatisticsAutoCollector extends StatisticsCollector {
|
||||
|
||||
return new AnalysisInfoBuilder(jobInfo).setColToPartitions(needRunPartitions).build();
|
||||
}
|
||||
|
||||
/**
|
||||
* Check if the given table should drop stale stats. User may truncate table,
|
||||
* in this case, we need to drop the stale stats.
|
||||
* @param jobInfo
|
||||
* @return True if you need to drop, false otherwise.
|
||||
*/
|
||||
protected boolean needDropStaleStats(AnalysisInfo jobInfo) {
|
||||
TableIf table = StatisticsUtil
|
||||
.findTable(jobInfo.catalogId, jobInfo.dbId, jobInfo.tblId);
|
||||
if (!(table instanceof OlapTable)) {
|
||||
return false;
|
||||
}
|
||||
AnalysisManager analysisManager = Env.getServingEnv().getAnalysisManager();
|
||||
TableStatsMeta tblStats = analysisManager.findTableStatsStatus(table.getId());
|
||||
if (tblStats == null) {
|
||||
return false;
|
||||
}
|
||||
if (tblStats.analyzeColumns().isEmpty()) {
|
||||
return false;
|
||||
}
|
||||
if (table.getRowCount() == 0) {
|
||||
return true;
|
||||
}
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
@ -27,6 +27,7 @@ import org.apache.doris.qe.ConnectContext;
|
||||
import org.apache.doris.statistics.util.StatisticsUtil;
|
||||
import org.apache.doris.system.Frontend;
|
||||
import org.apache.doris.thrift.FrontendService;
|
||||
import org.apache.doris.thrift.TInvalidateFollowerStatsCacheRequest;
|
||||
import org.apache.doris.thrift.TNetworkAddress;
|
||||
import org.apache.doris.thrift.TUpdateFollowerStatsCacheRequest;
|
||||
|
||||
@ -138,6 +139,19 @@ public class StatisticsCache {
|
||||
columnStatisticsCache.synchronous().invalidate(new StatisticsCacheKey(tblId, idxId, colName));
|
||||
}
|
||||
|
||||
public void syncInvalidate(long tblId, long idxId, String colName) {
|
||||
StatisticsCacheKey cacheKey = new StatisticsCacheKey(tblId, idxId, colName);
|
||||
columnStatisticsCache.synchronous().invalidate(cacheKey);
|
||||
TInvalidateFollowerStatsCacheRequest request = new TInvalidateFollowerStatsCacheRequest();
|
||||
request.key = GsonUtils.GSON.toJson(cacheKey);
|
||||
for (Frontend frontend : Env.getCurrentEnv().getFrontends(FrontendNodeType.FOLLOWER)) {
|
||||
if (StatisticsUtil.isMaster(frontend)) {
|
||||
continue;
|
||||
}
|
||||
invalidateStats(frontend, request);
|
||||
}
|
||||
}
|
||||
|
||||
public void updateColStatsCache(long tblId, long idxId, String colName, ColumnStatistic statistic) {
|
||||
columnStatisticsCache.synchronous().put(new StatisticsCacheKey(tblId, idxId, colName), Optional.of(statistic));
|
||||
}
|
||||
@ -250,6 +264,22 @@ public class StatisticsCache {
|
||||
}
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
public void invalidateStats(Frontend frontend, TInvalidateFollowerStatsCacheRequest request) {
|
||||
TNetworkAddress address = new TNetworkAddress(frontend.getHost(), frontend.getRpcPort());
|
||||
FrontendService.Client client = null;
|
||||
try {
|
||||
client = ClientPool.frontendPool.borrowObject(address);
|
||||
client.invalidateStatsCache(request);
|
||||
} catch (Throwable t) {
|
||||
LOG.warn("Failed to sync invalidate to follower: {}", address, t);
|
||||
} finally {
|
||||
if (client != null) {
|
||||
ClientPool.frontendPool.returnObject(address, client);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
public void putCache(StatisticsCacheKey k, ColumnStatistic c) {
|
||||
CompletableFuture<Optional<ColumnStatistic>> f = new CompletableFuture<Optional<ColumnStatistic>>();
|
||||
f.obtrudeValue(Optional.of(c));
|
||||
|
||||
@ -184,6 +184,9 @@ public class StatisticsRepository {
|
||||
}
|
||||
|
||||
public static void dropStatistics(long tblId, Set<String> colNames) throws DdlException {
|
||||
if (colNames == null) {
|
||||
return;
|
||||
}
|
||||
dropStatisticsByColName(tblId, colNames, StatisticConstants.STATISTIC_TBL_NAME);
|
||||
dropStatisticsByColName(tblId, colNames, StatisticConstants.HISTOGRAM_TBL_NAME);
|
||||
}
|
||||
|
||||
@ -356,7 +356,7 @@ public class CacheTest extends TestWithFeService {
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testEvict() {
|
||||
public void testEvict() throws InterruptedException {
|
||||
ThreadPoolExecutor threadPool
|
||||
= ThreadPoolManager.newDaemonFixedThreadPool(
|
||||
1, Integer.MAX_VALUE, "STATS_FETCH", true);
|
||||
@ -377,6 +377,7 @@ public class CacheTest extends TestWithFeService {
|
||||
columnStatisticsCache.get(1);
|
||||
columnStatisticsCache.get(2);
|
||||
Assertions.assertTrue(columnStatisticsCache.synchronous().asMap().containsKey(2));
|
||||
Thread.sleep(100);
|
||||
Assertions.assertEquals(1, columnStatisticsCache.synchronous().asMap().size());
|
||||
}
|
||||
}
|
||||
|
||||
@ -27,7 +27,6 @@ import org.apache.doris.catalog.Table;
|
||||
import org.apache.doris.catalog.TableIf;
|
||||
import org.apache.doris.catalog.Type;
|
||||
import org.apache.doris.catalog.View;
|
||||
import org.apache.doris.catalog.external.ExternalTable;
|
||||
import org.apache.doris.cluster.ClusterNamespace;
|
||||
import org.apache.doris.common.Config;
|
||||
import org.apache.doris.common.DdlException;
|
||||
@ -450,77 +449,4 @@ public class StatisticsAutoCollectorTest {
|
||||
Assertions.assertNotNull(task.getTableSample());
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testNeedDropStaleStats() {
|
||||
|
||||
TableIf olapTable = new OlapTable();
|
||||
TableIf otherTable = new ExternalTable();
|
||||
|
||||
new MockUp<StatisticsUtil>() {
|
||||
@Mock
|
||||
public TableIf findTable(long catalogId, long dbId, long tblId) {
|
||||
if (tblId == 0) {
|
||||
return olapTable;
|
||||
} else {
|
||||
return otherTable;
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
new MockUp<OlapTable>() {
|
||||
int count = 0;
|
||||
|
||||
int[] rowCounts = {100, 100, 100, 0, 0, 0, 0};
|
||||
@Mock
|
||||
public long getRowCount() {
|
||||
return rowCounts[count++];
|
||||
}
|
||||
|
||||
@Mock
|
||||
public List<Column> getBaseSchema() {
|
||||
return Lists.newArrayList(new Column("col1", Type.INT), new Column("col2", Type.INT));
|
||||
}
|
||||
};
|
||||
|
||||
AnalysisInfo analysisInfoOlap = new AnalysisInfoBuilder().setAnalysisMethod(AnalysisMethod.FULL)
|
||||
.setColToPartitions(new HashMap<>())
|
||||
.setAnalysisType(AnalysisType.FUNDAMENTALS)
|
||||
.setColName("col1")
|
||||
.setTblId(0)
|
||||
.setJobType(JobType.SYSTEM).build();
|
||||
|
||||
new MockUp<AnalysisManager>() {
|
||||
int count = 0;
|
||||
|
||||
TableStatsMeta[] tableStatsArr =
|
||||
new TableStatsMeta[] {null,
|
||||
new TableStatsMeta(0, analysisInfoOlap, olapTable),
|
||||
new TableStatsMeta(0, analysisInfoOlap, olapTable)};
|
||||
|
||||
{
|
||||
tableStatsArr[1].updatedRows.addAndGet(100);
|
||||
tableStatsArr[2].updatedRows.addAndGet(0);
|
||||
}
|
||||
|
||||
|
||||
@Mock
|
||||
public TableStatsMeta findTableStatsStatus(long tblId) {
|
||||
return tableStatsArr[count++];
|
||||
}
|
||||
};
|
||||
|
||||
AnalysisInfo analysisInfoOtherTable = new AnalysisInfoBuilder().setAnalysisMethod(AnalysisMethod.FULL)
|
||||
.setColToPartitions(new HashMap<>())
|
||||
.setAnalysisType(AnalysisType.FUNDAMENTALS)
|
||||
.setColName("col1")
|
||||
.setTblId(1)
|
||||
.setJobType(JobType.SYSTEM).build();
|
||||
|
||||
StatisticsAutoCollector statisticsAutoCollector = new StatisticsAutoCollector();
|
||||
Assertions.assertFalse(statisticsAutoCollector.needDropStaleStats(analysisInfoOtherTable));
|
||||
Assertions.assertFalse(statisticsAutoCollector.needDropStaleStats(analysisInfoOlap));
|
||||
Assertions.assertFalse(statisticsAutoCollector.needDropStaleStats(analysisInfoOlap));
|
||||
Assertions.assertTrue(statisticsAutoCollector.needDropStaleStats(analysisInfoOlap));
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user