From 02512cd0e253a7e4f9585cf07424c36a56e98a62 Mon Sep 17 00:00:00 2001 From: Jibing-Li <64681310+Jibing-Li@users.noreply.github.com> Date: Tue, 5 Dec 2023 14:53:35 +0800 Subject: [PATCH] [fix](stats)Drop stats or update updated rows after truncate table (#27931) 1. Also clear follower's stats cache when doing drop stats. 2. Drop stats when truncate a table. --- .../doris/datasource/InternalCatalog.java | 11 ++- .../doris/service/FrontendServiceImpl.java | 8 ++ .../doris/statistics/AnalysisManager.java | 6 +- .../statistics/StatisticsAutoCollector.java | 30 -------- .../doris/statistics/StatisticsCache.java | 30 ++++++++ .../statistics/StatisticsRepository.java | 3 + .../apache/doris/statistics/CacheTest.java | 3 +- .../StatisticsAutoCollectorTest.java | 74 ------------------- gensrc/thrift/FrontendService.thrift | 6 ++ .../suites/statistics/analyze_stats.groovy | 11 +++ 10 files changed, 74 insertions(+), 108 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java index d0ba1f5a94..3a080d2b28 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java @@ -2910,6 +2910,8 @@ public class InternalCatalog implements CatalogIf { Database db = (Database) getDbOrDdlException(dbTbl.getDb()); OlapTable olapTable = db.getOlapTableOrDdlException(dbTbl.getTbl()); + long rowsToTruncate = 0; + BinlogConfig binlogConfig; olapTable.readLock(); try { @@ -2922,6 +2924,7 @@ public class InternalCatalog implements CatalogIf { } origPartitions.put(partName, partition.getId()); partitionsDistributionInfo.put(partition.getId(), partition.getDistributionInfo()); + rowsToTruncate += partition.getBaseIndex().getRowCount(); } } else { for (Partition partition : olapTable.getPartitions()) { @@ -3065,7 +3068,13 @@ public class InternalCatalog implements CatalogIf { } finally { olapTable.writeUnlock(); } - + if (truncateEntireTable) { + // Drop the whole table stats after truncate the entire table + Env.getCurrentEnv().getAnalysisManager().dropStats(olapTable); + } else { + // Update the updated rows in table stats after truncate some partitions. + Env.getCurrentEnv().getAnalysisManager().updateUpdatedRows(olapTable.getId(), rowsToTruncate); + } LOG.info("finished to truncate table {}, partitions: {}", tblRef.getName().toSql(), tblRef.getPartitionNames()); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java index c813ef8ca5..264bc0cdd4 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java +++ b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java @@ -157,6 +157,7 @@ import org.apache.doris.thrift.TGetTabletReplicaInfosRequest; import org.apache.doris.thrift.TGetTabletReplicaInfosResult; import org.apache.doris.thrift.TInitExternalCtlMetaRequest; import org.apache.doris.thrift.TInitExternalCtlMetaResult; +import org.apache.doris.thrift.TInvalidateFollowerStatsCacheRequest; import org.apache.doris.thrift.TListPrivilegesResult; import org.apache.doris.thrift.TListTableMetadataNameIdsResult; import org.apache.doris.thrift.TListTableStatusResult; @@ -3109,6 +3110,13 @@ public class FrontendServiceImpl implements FrontendService.Iface { return new TStatus(TStatusCode.OK); } + @Override + public TStatus invalidateStatsCache(TInvalidateFollowerStatsCacheRequest request) throws TException { + StatisticsCacheKey k = GsonUtils.GSON.fromJson(request.key, StatisticsCacheKey.class); + Env.getCurrentEnv().getStatisticsCache().invalidate(k.tableId, k.idxId, k.colName); + return new TStatus(TStatusCode.OK); + } + @Override public TCreatePartitionResult createPartition(TCreatePartitionRequest request) throws TException { LOG.info("Receive create partition request: {}", request); diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisManager.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisManager.java index 01e48f422f..4f62c3b875 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisManager.java +++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisManager.java @@ -719,8 +719,9 @@ public class AnalysisManager implements Writable { tableStats.reset(); } else { dropStatsStmt.getColumnNames().forEach(tableStats::removeColumn); + StatisticsCache statisticsCache = Env.getCurrentEnv().getStatisticsCache(); for (String col : cols) { - Env.getCurrentEnv().getStatisticsCache().invalidate(tblId, -1L, col); + statisticsCache.syncInvalidate(tblId, -1L, col); } tableStats.updatedTime = 0; } @@ -734,9 +735,10 @@ public class AnalysisManager implements Writable { return; } Set cols = table.getBaseSchema().stream().map(Column::getName).collect(Collectors.toSet()); + StatisticsCache statisticsCache = Env.getCurrentEnv().getStatisticsCache(); for (String col : cols) { tableStats.removeColumn(col); - Env.getCurrentEnv().getStatisticsCache().invalidate(table.getId(), -1L, col); + statisticsCache.syncInvalidate(table.getId(), -1L, col); } tableStats.updatedTime = 0; logCreateTableStats(tableStats); diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsAutoCollector.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsAutoCollector.java index 8fd3be4b6f..244b1059d7 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsAutoCollector.java +++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsAutoCollector.java @@ -93,10 +93,6 @@ public class StatisticsAutoCollector extends StatisticsCollector { List analysisInfos = constructAnalysisInfo(databaseIf); for (AnalysisInfo analysisInfo : analysisInfos) { try { - if (needDropStaleStats(analysisInfo)) { - Env.getCurrentEnv().getAnalysisManager().dropStats(databaseIf.getTable(analysisInfo.tblId).get()); - continue; - } analysisInfo = getReAnalyzeRequiredPart(analysisInfo); if (analysisInfo == null) { continue; @@ -201,30 +197,4 @@ public class StatisticsAutoCollector extends StatisticsCollector { return new AnalysisInfoBuilder(jobInfo).setColToPartitions(needRunPartitions).build(); } - - /** - * Check if the given table should drop stale stats. User may truncate table, - * in this case, we need to drop the stale stats. - * @param jobInfo - * @return True if you need to drop, false otherwise. - */ - protected boolean needDropStaleStats(AnalysisInfo jobInfo) { - TableIf table = StatisticsUtil - .findTable(jobInfo.catalogId, jobInfo.dbId, jobInfo.tblId); - if (!(table instanceof OlapTable)) { - return false; - } - AnalysisManager analysisManager = Env.getServingEnv().getAnalysisManager(); - TableStatsMeta tblStats = analysisManager.findTableStatsStatus(table.getId()); - if (tblStats == null) { - return false; - } - if (tblStats.analyzeColumns().isEmpty()) { - return false; - } - if (table.getRowCount() == 0) { - return true; - } - return false; - } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsCache.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsCache.java index 84110d5bda..d4b91b0736 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsCache.java +++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsCache.java @@ -27,6 +27,7 @@ import org.apache.doris.qe.ConnectContext; import org.apache.doris.statistics.util.StatisticsUtil; import org.apache.doris.system.Frontend; import org.apache.doris.thrift.FrontendService; +import org.apache.doris.thrift.TInvalidateFollowerStatsCacheRequest; import org.apache.doris.thrift.TNetworkAddress; import org.apache.doris.thrift.TUpdateFollowerStatsCacheRequest; @@ -138,6 +139,19 @@ public class StatisticsCache { columnStatisticsCache.synchronous().invalidate(new StatisticsCacheKey(tblId, idxId, colName)); } + public void syncInvalidate(long tblId, long idxId, String colName) { + StatisticsCacheKey cacheKey = new StatisticsCacheKey(tblId, idxId, colName); + columnStatisticsCache.synchronous().invalidate(cacheKey); + TInvalidateFollowerStatsCacheRequest request = new TInvalidateFollowerStatsCacheRequest(); + request.key = GsonUtils.GSON.toJson(cacheKey); + for (Frontend frontend : Env.getCurrentEnv().getFrontends(FrontendNodeType.FOLLOWER)) { + if (StatisticsUtil.isMaster(frontend)) { + continue; + } + invalidateStats(frontend, request); + } + } + public void updateColStatsCache(long tblId, long idxId, String colName, ColumnStatistic statistic) { columnStatisticsCache.synchronous().put(new StatisticsCacheKey(tblId, idxId, colName), Optional.of(statistic)); } @@ -250,6 +264,22 @@ public class StatisticsCache { } } + @VisibleForTesting + public void invalidateStats(Frontend frontend, TInvalidateFollowerStatsCacheRequest request) { + TNetworkAddress address = new TNetworkAddress(frontend.getHost(), frontend.getRpcPort()); + FrontendService.Client client = null; + try { + client = ClientPool.frontendPool.borrowObject(address); + client.invalidateStatsCache(request); + } catch (Throwable t) { + LOG.warn("Failed to sync invalidate to follower: {}", address, t); + } finally { + if (client != null) { + ClientPool.frontendPool.returnObject(address, client); + } + } + } + public void putCache(StatisticsCacheKey k, ColumnStatistic c) { CompletableFuture> f = new CompletableFuture>(); f.obtrudeValue(Optional.of(c)); diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsRepository.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsRepository.java index 29e11ac75a..4512bf1fef 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsRepository.java +++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsRepository.java @@ -184,6 +184,9 @@ public class StatisticsRepository { } public static void dropStatistics(long tblId, Set colNames) throws DdlException { + if (colNames == null) { + return; + } dropStatisticsByColName(tblId, colNames, StatisticConstants.STATISTIC_TBL_NAME); dropStatisticsByColName(tblId, colNames, StatisticConstants.HISTOGRAM_TBL_NAME); } diff --git a/fe/fe-core/src/test/java/org/apache/doris/statistics/CacheTest.java b/fe/fe-core/src/test/java/org/apache/doris/statistics/CacheTest.java index 9fe8b09492..0d968167b7 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/statistics/CacheTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/statistics/CacheTest.java @@ -356,7 +356,7 @@ public class CacheTest extends TestWithFeService { } @Test - public void testEvict() { + public void testEvict() throws InterruptedException { ThreadPoolExecutor threadPool = ThreadPoolManager.newDaemonFixedThreadPool( 1, Integer.MAX_VALUE, "STATS_FETCH", true); @@ -377,6 +377,7 @@ public class CacheTest extends TestWithFeService { columnStatisticsCache.get(1); columnStatisticsCache.get(2); Assertions.assertTrue(columnStatisticsCache.synchronous().asMap().containsKey(2)); + Thread.sleep(100); Assertions.assertEquals(1, columnStatisticsCache.synchronous().asMap().size()); } } diff --git a/fe/fe-core/src/test/java/org/apache/doris/statistics/StatisticsAutoCollectorTest.java b/fe/fe-core/src/test/java/org/apache/doris/statistics/StatisticsAutoCollectorTest.java index 5647520176..fd7eaeaad9 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/statistics/StatisticsAutoCollectorTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/statistics/StatisticsAutoCollectorTest.java @@ -27,7 +27,6 @@ import org.apache.doris.catalog.Table; import org.apache.doris.catalog.TableIf; import org.apache.doris.catalog.Type; import org.apache.doris.catalog.View; -import org.apache.doris.catalog.external.ExternalTable; import org.apache.doris.cluster.ClusterNamespace; import org.apache.doris.common.Config; import org.apache.doris.common.DdlException; @@ -450,77 +449,4 @@ public class StatisticsAutoCollectorTest { Assertions.assertNotNull(task.getTableSample()); } } - - @Test - public void testNeedDropStaleStats() { - - TableIf olapTable = new OlapTable(); - TableIf otherTable = new ExternalTable(); - - new MockUp() { - @Mock - public TableIf findTable(long catalogId, long dbId, long tblId) { - if (tblId == 0) { - return olapTable; - } else { - return otherTable; - } - } - }; - - new MockUp() { - int count = 0; - - int[] rowCounts = {100, 100, 100, 0, 0, 0, 0}; - @Mock - public long getRowCount() { - return rowCounts[count++]; - } - - @Mock - public List getBaseSchema() { - return Lists.newArrayList(new Column("col1", Type.INT), new Column("col2", Type.INT)); - } - }; - - AnalysisInfo analysisInfoOlap = new AnalysisInfoBuilder().setAnalysisMethod(AnalysisMethod.FULL) - .setColToPartitions(new HashMap<>()) - .setAnalysisType(AnalysisType.FUNDAMENTALS) - .setColName("col1") - .setTblId(0) - .setJobType(JobType.SYSTEM).build(); - - new MockUp() { - int count = 0; - - TableStatsMeta[] tableStatsArr = - new TableStatsMeta[] {null, - new TableStatsMeta(0, analysisInfoOlap, olapTable), - new TableStatsMeta(0, analysisInfoOlap, olapTable)}; - - { - tableStatsArr[1].updatedRows.addAndGet(100); - tableStatsArr[2].updatedRows.addAndGet(0); - } - - - @Mock - public TableStatsMeta findTableStatsStatus(long tblId) { - return tableStatsArr[count++]; - } - }; - - AnalysisInfo analysisInfoOtherTable = new AnalysisInfoBuilder().setAnalysisMethod(AnalysisMethod.FULL) - .setColToPartitions(new HashMap<>()) - .setAnalysisType(AnalysisType.FUNDAMENTALS) - .setColName("col1") - .setTblId(1) - .setJobType(JobType.SYSTEM).build(); - - StatisticsAutoCollector statisticsAutoCollector = new StatisticsAutoCollector(); - Assertions.assertFalse(statisticsAutoCollector.needDropStaleStats(analysisInfoOtherTable)); - Assertions.assertFalse(statisticsAutoCollector.needDropStaleStats(analysisInfoOlap)); - Assertions.assertFalse(statisticsAutoCollector.needDropStaleStats(analysisInfoOlap)); - Assertions.assertTrue(statisticsAutoCollector.needDropStaleStats(analysisInfoOlap)); - } } diff --git a/gensrc/thrift/FrontendService.thrift b/gensrc/thrift/FrontendService.thrift index a749c5f0b5..19a8437fb5 100644 --- a/gensrc/thrift/FrontendService.thrift +++ b/gensrc/thrift/FrontendService.thrift @@ -1155,6 +1155,10 @@ struct TUpdateFollowerStatsCacheRequest { 2: list statsRows; } +struct TInvalidateFollowerStatsCacheRequest { + 1: optional string key; +} + struct TAutoIncrementRangeRequest { 1: optional i64 db_id; 2: optional i64 table_id; @@ -1378,4 +1382,6 @@ service FrontendService { TGetBackendMetaResult getBackendMeta(1: TGetBackendMetaRequest request) TGetColumnInfoResult getColumnInfo(1: TGetColumnInfoRequest request) + + Status.TStatus invalidateStatsCache(1: TInvalidateFollowerStatsCacheRequest request) } diff --git a/regression-test/suites/statistics/analyze_stats.groovy b/regression-test/suites/statistics/analyze_stats.groovy index b57c088156..333772972e 100644 --- a/regression-test/suites/statistics/analyze_stats.groovy +++ b/regression-test/suites/statistics/analyze_stats.groovy @@ -1243,4 +1243,15 @@ PARTITION `p599` VALUES IN (599) } assert all_finished(show_result) + + // Test truncate table will drop table stats too. + sql """ANALYZE TABLE ${tbl} WITH SYNC""" + def result_before_truncate = sql """show column stats ${tbl}""" + assertEquals(14, result_before_truncate.size()) + sql """TRUNCATE TABLE ${tbl}""" + def result_after_truncate = sql """show column stats ${tbl}""" + assertEquals(0, result_after_truncate.size()) + result_after_truncate = sql """show column cached stats ${tbl}""" + assertEquals(0, result_after_truncate.size()) + }