From 1e6d34d1d066a04e7d7c2381d165563001ff7105 Mon Sep 17 00:00:00 2001 From: Xiangyu Wang Date: Wed, 11 Oct 2023 11:05:16 +0800 Subject: [PATCH] [Enhancement](sql-cache) Add partition update time for hms table and use it at sql-cache. (#24491) Now FE does not record the update time of hms tbl's partitons, so the sql cache may be hit even the hive table's partitions have changed. This pr add a field to record the partition update time, and use it when enable sql-cache. The cache will be missed if any partition has changed at hive side. Use System.currentTimeMillis() but not the event time of hms event because we would better keep the same measurement with the schemaUpdateTime of external table. Add this value to ExternalObjectLog and let slave FEs replay it because it is better to keep the same value with all FEs, so the sql-cache can be hit by the querys through different FEs. --- .../doris/catalog/external/ExternalTable.java | 11 +- .../catalog/external/HMSExternalTable.java | 19 +- .../catalog/external/PaimonExternalTable.java | 2 +- .../doris/common/proc/TablesProcDir.java | 2 +- .../apache/doris/datasource/CatalogMgr.java | 36 +++- .../datasource/ExternalMetaCacheMgr.java | 16 +- .../apache/doris/qe/cache/CacheAnalyzer.java | 2 +- .../org/apache/doris/qe/cache/SqlCache.java | 4 + .../doris/catalog/RefreshTableTest.java | 11 +- .../apache/doris/qe/HmsQueryCacheTest.java | 174 ++++++++++++++++-- 10 files changed, 226 insertions(+), 51 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/external/ExternalTable.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/external/ExternalTable.java index 73aa70feea..36dbe88967 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/external/ExternalTable.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/external/ExternalTable.java @@ -75,8 +75,8 @@ public class ExternalTable implements TableIf, Writable, GsonPostProcessable { protected long timestamp; @SerializedName(value = "dbName") protected String dbName; - @SerializedName(value = "lastUpdateTime") - protected long lastUpdateTime; + // this field will be refreshed after reloading schema + protected volatile long schemaUpdateTime; protected long dbId; protected boolean objectCreated; @@ -296,9 +296,12 @@ public class ExternalTable implements TableIf, Writable, GsonPostProcessable { return 0; } + // return schema update time as default + // override this method if there is some other kinds of update time + // use getSchemaUpdateTime if just need the schema update time @Override public long getUpdateTime() { - return 0; + return this.schemaUpdateTime; } @Override @@ -353,7 +356,7 @@ public class ExternalTable implements TableIf, Writable, GsonPostProcessable { * @return */ public List initSchemaAndUpdateTime() { - lastUpdateTime = System.currentTimeMillis(); + schemaUpdateTime = System.currentTimeMillis(); return initSchema(); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/external/HMSExternalTable.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/external/HMSExternalTable.java index 4f17785b87..5aa55c97dd 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/external/HMSExternalTable.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/external/HMSExternalTable.java @@ -111,6 +111,9 @@ public class HMSExternalTable extends ExternalTable { // No as precise as row count in TableStats, but better than none. private long estimatedRowCount = -1; + // record the partition update time when enable hms event listener + protected volatile long partitionUpdateTime; + public enum DLAType { UNKNOWN, HIVE, HUDI, ICEBERG, DELTALAKE } @@ -270,11 +273,6 @@ public class HMSExternalTable extends ExternalTable { return 0; } - @Override - public long getUpdateTime() { - return 0; - } - @Override public long getRowCount() { makeSureInitialized(); @@ -630,6 +628,17 @@ public class HMSExternalTable extends ExternalTable { } } + public void setPartitionUpdateTime(long updateTime) { + this.partitionUpdateTime = updateTime; + } + + @Override + // get the max value of `schemaUpdateTime` and `partitionUpdateTime` + // partitionUpdateTime will be refreshed after processing partition events with hms event listener enabled + public long getUpdateTime() { + return Math.max(this.schemaUpdateTime, this.partitionUpdateTime); + } + @Override public void gsonPostProcess() throws IOException { super.gsonPostProcess(); diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/external/PaimonExternalTable.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/external/PaimonExternalTable.java index 16cfa76e63..2ad593b1b3 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/external/PaimonExternalTable.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/external/PaimonExternalTable.java @@ -56,7 +56,7 @@ public class PaimonExternalTable extends ExternalTable { super.makeSureInitialized(); if (!objectCreated) { originTable = ((PaimonExternalCatalog) catalog).getPaimonTable(dbName, name); - lastUpdateTime = System.currentTimeMillis(); + schemaUpdateTime = System.currentTimeMillis(); objectCreated = true; } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/proc/TablesProcDir.java b/fe/fe-core/src/main/java/org/apache/doris/common/proc/TablesProcDir.java index be51b157de..1b11a9a91b 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/common/proc/TablesProcDir.java +++ b/fe/fe-core/src/main/java/org/apache/doris/common/proc/TablesProcDir.java @@ -129,7 +129,7 @@ public class TablesProcDir implements ProcDirInterface { tableInfo.add(FeConstants.null_string); tableInfo.add(replicaCount); } - tableInfo.add(TimeUtils.longToTimeString(table.getLastUpdateTime())); + tableInfo.add(TimeUtils.longToTimeString(table.getUpdateTime())); tableInfos.add(tableInfo); } finally { table.readUnlock(); diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/CatalogMgr.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/CatalogMgr.java index b11cafd8bd..0dc49e01f7 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/CatalogMgr.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/CatalogMgr.java @@ -31,6 +31,7 @@ import org.apache.doris.catalog.Resource.ReferenceType; import org.apache.doris.catalog.TableIf; import org.apache.doris.catalog.external.ExternalDatabase; import org.apache.doris.catalog.external.ExternalTable; +import org.apache.doris.catalog.external.HMSExternalTable; import org.apache.doris.cluster.ClusterNamespace; import org.apache.doris.common.AnalysisException; import org.apache.doris.common.CaseSensibility; @@ -928,14 +929,21 @@ public class CatalogMgr implements Writable, GsonPostProcessable { } return; } + if (!(table instanceof HMSExternalTable)) { + LOG.warn("only support HMSTable"); + return; + } - Env.getCurrentEnv().getExtMetaCacheMgr().addPartitionsCache(catalog.getId(), - (ExternalTable) table, partitionNames); + HMSExternalTable hmsTable = (HMSExternalTable) table; + Env.getCurrentEnv().getExtMetaCacheMgr().addPartitionsCache(catalog.getId(), hmsTable, partitionNames); + long lastPartitionUpdateTime = System.currentTimeMillis(); + hmsTable.setPartitionUpdateTime(lastPartitionUpdateTime); ExternalObjectLog log = new ExternalObjectLog(); log.setCatalogId(catalog.getId()); log.setDbId(db.getId()); log.setTableId(table.getId()); log.setPartitionNames(partitionNames); + log.setLastUpdateTime(lastPartitionUpdateTime); Env.getCurrentEnv().getEditLog().logAddExternalPartitions(log); } @@ -957,9 +965,16 @@ public class CatalogMgr implements Writable, GsonPostProcessable { LOG.warn("No table found with id:[{}], it may have been dropped.", log.getTableId()); return; } + if (!(table instanceof HMSExternalTable)) { + LOG.warn("only support HMSTable"); + return; + } + + HMSExternalTable hmsTable = (HMSExternalTable) table; try { Env.getCurrentEnv().getExtMetaCacheMgr() - .addPartitionsCache(catalog.getId(), table, log.getPartitionNames()); + .addPartitionsCache(catalog.getId(), hmsTable, log.getPartitionNames()); + hmsTable.setPartitionUpdateTime(log.getLastUpdateTime()); } catch (HMSClientException e) { LOG.warn("Network problem occurs or hms table has been deleted, fallback to invalidate table cache", e); Env.getCurrentEnv().getExtMetaCacheMgr().invalidateTableCache(catalog.getId(), @@ -998,6 +1013,7 @@ public class CatalogMgr implements Writable, GsonPostProcessable { log.setDbId(db.getId()); log.setTableId(table.getId()); log.setPartitionNames(partitionNames); + log.setLastUpdateTime(System.currentTimeMillis()); replayDropExternalPartitions(log); Env.getCurrentEnv().getEditLog().logDropExternalPartitions(log); } @@ -1020,8 +1036,14 @@ public class CatalogMgr implements Writable, GsonPostProcessable { LOG.warn("No table found with id:[{}], it may have been dropped.", log.getTableId()); return; } + if (!(table instanceof HMSExternalTable)) { + LOG.warn("only support HMSTable"); + return; + } + HMSExternalTable hmsTable = (HMSExternalTable) table; Env.getCurrentEnv().getExtMetaCacheMgr() - .dropPartitionsCache(catalog.getId(), table, log.getPartitionNames()); + .dropPartitionsCache(catalog.getId(), hmsTable, log.getPartitionNames()); + hmsTable.setPartitionUpdateTime(log.getLastUpdateTime()); } public void refreshExternalPartitions(String catalogName, String dbName, String tableName, @@ -1058,6 +1080,7 @@ public class CatalogMgr implements Writable, GsonPostProcessable { log.setDbId(db.getId()); log.setTableId(table.getId()); log.setPartitionNames(partitionNames); + log.setLastUpdateTime(System.currentTimeMillis()); replayRefreshExternalPartitions(log); Env.getCurrentEnv().getEditLog().logInvalidateExternalPartitions(log); } @@ -1080,9 +1103,14 @@ public class CatalogMgr implements Writable, GsonPostProcessable { LOG.warn("No table found with id:[{}], it may have been dropped.", log.getTableId()); return; } + if (!(table instanceof HMSExternalTable)) { + LOG.warn("only support HMSTable"); + return; + } Env.getCurrentEnv().getExtMetaCacheMgr() .invalidatePartitionsCache(catalog.getId(), db.getFullName(), table.getName(), log.getPartitionNames()); + ((HMSExternalTable) table).setPartitionUpdateTime(log.getLastUpdateTime()); } public void registerCatalogRefreshListener(Env env) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalMetaCacheMgr.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalMetaCacheMgr.java index 77254fd633..03a46c625e 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalMetaCacheMgr.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalMetaCacheMgr.java @@ -17,7 +17,6 @@ package org.apache.doris.datasource; -import org.apache.doris.catalog.external.ExternalTable; import org.apache.doris.catalog.external.HMSExternalTable; import org.apache.doris.cluster.ClusterNamespace; import org.apache.doris.common.Config; @@ -159,25 +158,16 @@ public class ExternalMetaCacheMgr { LOG.debug("invalid catalog cache for {}", catalogId); } - public void addPartitionsCache(long catalogId, ExternalTable table, List partitionNames) { - if (!(table instanceof HMSExternalTable)) { - LOG.warn("only support HMSTable"); - return; - } + public void addPartitionsCache(long catalogId, HMSExternalTable table, List partitionNames) { String dbName = ClusterNamespace.getNameFromFullName(table.getDbName()); HiveMetaStoreCache metaCache = cacheMap.get(catalogId); if (metaCache != null) { - metaCache.addPartitionsCache(dbName, table.getName(), partitionNames, - ((HMSExternalTable) table).getPartitionColumnTypes()); + metaCache.addPartitionsCache(dbName, table.getName(), partitionNames, table.getPartitionColumnTypes()); } LOG.debug("add partition cache for {}.{} in catalog {}", dbName, table.getName(), catalogId); } - public void dropPartitionsCache(long catalogId, ExternalTable table, List partitionNames) { - if (!(table instanceof HMSExternalTable)) { - LOG.warn("only support HMSTable"); - return; - } + public void dropPartitionsCache(long catalogId, HMSExternalTable table, List partitionNames) { String dbName = ClusterNamespace.getNameFromFullName(table.getDbName()); HiveMetaStoreCache metaCache = cacheMap.get(catalogId); if (metaCache != null) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/cache/CacheAnalyzer.java b/fe/fe-core/src/main/java/org/apache/doris/qe/cache/CacheAnalyzer.java index 981b95c759..1527cdbeac 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/cache/CacheAnalyzer.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/cache/CacheAnalyzer.java @@ -607,7 +607,7 @@ public class CacheAnalyzer { CacheTable cacheTable = new CacheTable(); cacheTable.table = node.getTargetTable(); cacheTable.partitionNum = node.getReadPartitionNum(); - cacheTable.latestTime = cacheTable.table.getLastUpdateTime(); + cacheTable.latestTime = cacheTable.table.getUpdateTime(); return cacheTable; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/cache/SqlCache.java b/fe/fe-core/src/main/java/org/apache/doris/qe/cache/SqlCache.java index 536a507ba2..803e80da0a 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/cache/SqlCache.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/cache/SqlCache.java @@ -55,6 +55,10 @@ public class SqlCache extends Cache { return cacheKey; } + public long getLatestTime() { + return latestTable.latestTime; + } + public long getSumOfPartitionNum() { return latestTable.sumOfPartitionNum; } diff --git a/fe/fe-core/src/test/java/org/apache/doris/catalog/RefreshTableTest.java b/fe/fe-core/src/test/java/org/apache/doris/catalog/RefreshTableTest.java index 305bfa32c4..a37f1ca98f 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/catalog/RefreshTableTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/catalog/RefreshTableTest.java @@ -69,11 +69,11 @@ public class RefreshTableTest extends TestWithFeService { CatalogIf test1 = env.getCatalogMgr().getCatalog("test1"); TestExternalTable table = (TestExternalTable) test1.getDbNullable("db1").getTable("tbl11").get(); Assertions.assertFalse(table.isObjectCreated()); - long l1 = table.getLastUpdateTime(); + long l1 = table.getSchemaUpdateTime(); Assertions.assertTrue(l1 == 0); table.makeSureInitialized(); Assertions.assertTrue(table.isObjectCreated()); - long l2 = table.getLastUpdateTime(); + long l2 = table.getSchemaUpdateTime(); Assertions.assertTrue(l2 == l1); RefreshTableStmt refreshTableStmt = new RefreshTableStmt(new TableName("test1", "db1", "tbl11")); try { @@ -82,12 +82,15 @@ public class RefreshTableTest extends TestWithFeService { // Do nothing } Assertions.assertFalse(table.isObjectCreated()); - long l3 = table.getLastUpdateTime(); + long l3 = table.getSchemaUpdateTime(); Assertions.assertTrue(l3 == l2); table.getFullSchema(); // only table.getFullSchema() can change table.lastUpdateTime - long l4 = table.getLastUpdateTime(); + long l4 = table.getSchemaUpdateTime(); Assertions.assertTrue(l4 > l3); + // updateTime is equal to schema update time as default + long l5 = table.getUpdateTime(); + Assertions.assertTrue(l5 == l4); } public static class RefreshTableProvider implements TestExternalCatalog.TestCatalogProvider { diff --git a/fe/fe-core/src/test/java/org/apache/doris/qe/HmsQueryCacheTest.java b/fe/fe-core/src/test/java/org/apache/doris/qe/HmsQueryCacheTest.java index bb8bc4c1be..7af353f23a 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/qe/HmsQueryCacheTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/qe/HmsQueryCacheTest.java @@ -64,6 +64,8 @@ public class HmsQueryCacheTest extends AnalyzeCheckTestBase { @Mocked private HMSExternalTable tbl; @Mocked + private HMSExternalTable tbl2; + @Mocked private HMSExternalTable view1; @Mocked private HMSExternalTable view2; @@ -73,6 +75,8 @@ public class HmsQueryCacheTest extends AnalyzeCheckTestBase { private HiveScanNode hiveScanNode2; @Mocked private HiveScanNode hiveScanNode3; + @Mocked + private HiveScanNode hiveScanNode4; @Override protected void runBeforeAll() throws Exception { @@ -150,12 +154,55 @@ public class HmsQueryCacheTest extends AnalyzeCheckTestBase { minTimes = 0; result = DLAType.HIVE; - tbl.getLastUpdateTime(); + tbl.getUpdateTime(); minTimes = 0; result = NOW; } }; + Deencapsulation.setField(tbl2, "objectCreated", true); + Deencapsulation.setField(tbl2, "rwLock", new ReentrantReadWriteLock(true)); + + new Expectations(tbl2) { + { + tbl2.getId(); + minTimes = 0; + result = 10004; + + tbl2.getName(); + minTimes = 0; + result = "hms_tbl2"; + + tbl2.getDbName(); + minTimes = 0; + result = "hms_db"; + + tbl2.getFullSchema(); + minTimes = 0; + result = schema; + + tbl2.isSupportedHmsTable(); + minTimes = 0; + result = true; + + tbl2.isView(); + minTimes = 0; + result = false; + + tbl2.getType(); + minTimes = 0; + result = TableIf.TableType.HMS_EXTERNAL_TABLE; + + tbl2.getDlaType(); + minTimes = 0; + result = DLAType.HIVE; + + // mock init schema and do nothing + tbl2.initSchema(); + minTimes = 0; + } + }; + Deencapsulation.setField(view1, "objectCreated", true); Deencapsulation.setField(view1, "rwLock", new ReentrantReadWriteLock(true)); @@ -201,7 +248,7 @@ public class HmsQueryCacheTest extends AnalyzeCheckTestBase { minTimes = 0; result = DLAType.HIVE; - view1.getLastUpdateTime(); + view1.getUpdateTime(); minTimes = 0; result = NOW; } @@ -251,13 +298,14 @@ public class HmsQueryCacheTest extends AnalyzeCheckTestBase { minTimes = 0; result = DLAType.HIVE; - view2.getLastUpdateTime(); + view2.getUpdateTime(); minTimes = 0; result = NOW; } }; db.addTableForTest(tbl); + db.addTableForTest(tbl2); db.addTableForTest(view1); db.addTableForTest(view2); hmsCatalog.addDatabaseForTest(db); @@ -286,6 +334,14 @@ public class HmsQueryCacheTest extends AnalyzeCheckTestBase { } }; + new Expectations(hiveScanNode4) { + { + hiveScanNode4.getTargetTable(); + minTimes = 0; + result = tbl2; + } + }; + TupleDescriptor desc = new TupleDescriptor(new TupleId(1)); desc.setTable(mgr.getInternalCatalog().getDbNullable("default_cluster:test").getTableNullable("tbl1")); olapScanNode = new OlapScanNode(new PlanNodeId(1), desc, "tb1ScanNode"); @@ -297,18 +353,94 @@ public class HmsQueryCacheTest extends AnalyzeCheckTestBase { StatementBase parseStmt = parseAndAnalyzeStmt("select * from hms_ctl.hms_db.hms_tbl", connectContext); List scanNodes = Arrays.asList(hiveScanNode1); CacheAnalyzer ca = new CacheAnalyzer(connectContext, parseStmt, scanNodes); - ca.checkCacheMode(NOW + Config.cache_last_version_interval_second * 1000L * 2); + ca.checkCacheMode(System.currentTimeMillis() + Config.cache_last_version_interval_second * 1000L * 2); Assert.assertEquals(ca.getCacheMode(), CacheAnalyzer.CacheMode.Sql); + SqlCache sqlCache = (SqlCache) ca.getCache(); + Assert.assertEquals(sqlCache.getLatestTime(), NOW); } @Test - public void testHitSqlCacheByNereids() throws Exception { + public void testHitSqlCacheAfterPartitionUpdateTimeChanged() throws Exception { + init((HMSExternalCatalog) mgr.getCatalog(HMS_CATALOG)); + StatementBase parseStmt = parseAndAnalyzeStmt("select * from hms_ctl.hms_db.hms_tbl2", connectContext); + List scanNodes = Arrays.asList(hiveScanNode4); + + // invoke initSchemaAndUpdateTime first and init schemaUpdateTime + tbl2.initSchemaAndUpdateTime(); + + CacheAnalyzer ca = new CacheAnalyzer(connectContext, parseStmt, scanNodes); + ca.checkCacheMode(System.currentTimeMillis() + Config.cache_last_version_interval_second * 1000L * 2); + Assert.assertEquals(ca.getCacheMode(), CacheAnalyzer.CacheMode.Sql); + SqlCache sqlCache1 = (SqlCache) ca.getCache(); + + // latestTime is equals to the schema update time if not set partition update time + Assert.assertEquals(sqlCache1.getLatestTime(), tbl2.getSchemaUpdateTime()); + + // wait a second and set partition update time + try { + Thread.sleep(1000); + } catch (Throwable throwable) { + // do nothing + } + long later = System.currentTimeMillis(); + tbl2.setPartitionUpdateTime(later); + + // check cache mode again + ca.checkCacheMode(System.currentTimeMillis() + Config.cache_last_version_interval_second * 1000L * 2); + SqlCache sqlCache2 = (SqlCache) ca.getCache(); + Assert.assertEquals(ca.getCacheMode(), CacheAnalyzer.CacheMode.Sql); + + // the latest time will be changed and is equals to the partition update time + Assert.assertEquals(later, sqlCache2.getLatestTime()); + Assert.assertTrue(sqlCache2.getLatestTime() > sqlCache1.getLatestTime()); + } + + @Test + public void testHitSqlCacheByNereids() { init((HMSExternalCatalog) mgr.getCatalog(HMS_CATALOG)); StatementBase parseStmt = analyzeAndGetStmtByNereids("select * from hms_ctl.hms_db.hms_tbl", connectContext); List scanNodes = Arrays.asList(hiveScanNode1); CacheAnalyzer ca = new CacheAnalyzer(connectContext, parseStmt, scanNodes); - ca.checkCacheModeForNereids(NOW + Config.cache_last_version_interval_second * 1000L * 2); + ca.checkCacheModeForNereids(System.currentTimeMillis() + Config.cache_last_version_interval_second * 1000L * 2); Assert.assertEquals(ca.getCacheMode(), CacheAnalyzer.CacheMode.Sql); + SqlCache sqlCache = (SqlCache) ca.getCache(); + Assert.assertEquals(sqlCache.getLatestTime(), NOW); + } + + @Test + public void testHitSqlCacheByNereidsAfterPartitionUpdateTimeChanged() { + init((HMSExternalCatalog) mgr.getCatalog(HMS_CATALOG)); + StatementBase parseStmt = analyzeAndGetStmtByNereids("select * from hms_ctl.hms_db.hms_tbl2", connectContext); + List scanNodes = Arrays.asList(hiveScanNode4); + + // invoke initSchemaAndUpdateTime first and init schemaUpdateTime + tbl2.initSchemaAndUpdateTime(); + + CacheAnalyzer ca = new CacheAnalyzer(connectContext, parseStmt, scanNodes); + ca.checkCacheModeForNereids(System.currentTimeMillis() + Config.cache_last_version_interval_second * 1000L * 2); + Assert.assertEquals(ca.getCacheMode(), CacheAnalyzer.CacheMode.Sql); + SqlCache sqlCache1 = (SqlCache) ca.getCache(); + + // latestTime is equals to the schema update time if not set partition update time + Assert.assertEquals(sqlCache1.getLatestTime(), tbl2.getSchemaUpdateTime()); + + // wait a second and set partition update time + try { + Thread.sleep(1000); + } catch (Throwable throwable) { + // do nothing + } + long later = System.currentTimeMillis(); + tbl2.setPartitionUpdateTime(later); + + // check cache mode again + ca.checkCacheModeForNereids(System.currentTimeMillis() + Config.cache_last_version_interval_second * 1000L * 2); + SqlCache sqlCache2 = (SqlCache) ca.getCache(); + Assert.assertEquals(ca.getCacheMode(), CacheAnalyzer.CacheMode.Sql); + + // the latest time will be changed and is equals to the partition update time + Assert.assertEquals(later, sqlCache2.getLatestTime()); + Assert.assertTrue(sqlCache2.getLatestTime() > sqlCache1.getLatestTime()); } @Test @@ -317,18 +449,22 @@ public class HmsQueryCacheTest extends AnalyzeCheckTestBase { StatementBase parseStmt = parseAndAnalyzeStmt("select * from hms_ctl.hms_db.hms_view1", connectContext); List scanNodes = Arrays.asList(hiveScanNode2); CacheAnalyzer ca = new CacheAnalyzer(connectContext, parseStmt, scanNodes); - ca.checkCacheMode(NOW + Config.cache_last_version_interval_second * 1000L * 2); + ca.checkCacheMode(System.currentTimeMillis() + Config.cache_last_version_interval_second * 1000L * 2); Assert.assertEquals(ca.getCacheMode(), CacheAnalyzer.CacheMode.Sql); + SqlCache sqlCache = (SqlCache) ca.getCache(); + Assert.assertEquals(sqlCache.getLatestTime(), NOW); } @Test - public void testHitSqlCacheWithHiveViewByNereids() throws Exception { + public void testHitSqlCacheWithHiveViewByNereids() { init((HMSExternalCatalog) mgr.getCatalog(HMS_CATALOG)); StatementBase parseStmt = analyzeAndGetStmtByNereids("select * from hms_ctl.hms_db.hms_view1", connectContext); List scanNodes = Arrays.asList(hiveScanNode2); CacheAnalyzer ca = new CacheAnalyzer(connectContext, parseStmt, scanNodes); - ca.checkCacheModeForNereids(NOW + Config.cache_last_version_interval_second * 1000L * 2); + ca.checkCacheModeForNereids(System.currentTimeMillis() + Config.cache_last_version_interval_second * 1000L * 2); Assert.assertEquals(ca.getCacheMode(), CacheAnalyzer.CacheMode.Sql); + SqlCache sqlCache = (SqlCache) ca.getCache(); + Assert.assertEquals(sqlCache.getLatestTime(), NOW); } @Test @@ -337,27 +473,29 @@ public class HmsQueryCacheTest extends AnalyzeCheckTestBase { StatementBase parseStmt = parseAndAnalyzeStmt("select * from hms_ctl.hms_db.hms_view2", connectContext); List scanNodes = Arrays.asList(hiveScanNode3); CacheAnalyzer ca = new CacheAnalyzer(connectContext, parseStmt, scanNodes); - ca.checkCacheMode(NOW + Config.cache_last_version_interval_second * 1000L * 2); + ca.checkCacheMode(System.currentTimeMillis() + Config.cache_last_version_interval_second * 1000L * 2); Assert.assertEquals(ca.getCacheMode(), CacheAnalyzer.CacheMode.Sql); SqlCache sqlCache = (SqlCache) ca.getCache(); String cacheKey = sqlCache.getSqlWithViewStmt(); Assert.assertEquals(cacheKey, "SELECT `hms_ctl`.`default_cluster:hms_db`.`hms_view2`.`k1` AS `k1` " + "FROM `hms_ctl`.`default_cluster:hms_db`.`hms_view2`" + "|SELECT * FROM hms_db.hms_tbl|SELECT * FROM hms_db.hms_view1"); + Assert.assertEquals(sqlCache.getLatestTime(), NOW); } @Test - public void testHitSqlCacheWithNestedHiveViewByNereids() throws Exception { + public void testHitSqlCacheWithNestedHiveViewByNereids() { init((HMSExternalCatalog) mgr.getCatalog(HMS_CATALOG)); StatementBase parseStmt = analyzeAndGetStmtByNereids("select * from hms_ctl.hms_db.hms_view2", connectContext); List scanNodes = Arrays.asList(hiveScanNode3); CacheAnalyzer ca = new CacheAnalyzer(connectContext, parseStmt, scanNodes); - ca.checkCacheModeForNereids(NOW + Config.cache_last_version_interval_second * 1000L * 2); + ca.checkCacheModeForNereids(System.currentTimeMillis() + Config.cache_last_version_interval_second * 1000L * 2); Assert.assertEquals(ca.getCacheMode(), CacheAnalyzer.CacheMode.Sql); SqlCache sqlCache = (SqlCache) ca.getCache(); String cacheKey = sqlCache.getSqlWithViewStmt(); Assert.assertEquals(cacheKey, "select * from hms_ctl.hms_db.hms_view2" + "|SELECT * FROM hms_db.hms_tbl|SELECT * FROM hms_db.hms_view1"); + Assert.assertEquals(sqlCache.getLatestTime(), NOW); } @Test @@ -366,17 +504,17 @@ public class HmsQueryCacheTest extends AnalyzeCheckTestBase { StatementBase parseStmt = parseAndAnalyzeStmt("select * from hms_ctl.hms_db.hms_tbl", connectContext); List scanNodes = Arrays.asList(hiveScanNode1); CacheAnalyzer ca = new CacheAnalyzer(connectContext, parseStmt, scanNodes); - ca.checkCacheMode(NOW); + ca.checkCacheMode(0); Assert.assertEquals(ca.getCacheMode(), CacheAnalyzer.CacheMode.None); } @Test - public void testNotHitSqlCacheByNereids() throws Exception { + public void testNotHitSqlCacheByNereids() { init((HMSExternalCatalog) mgr.getCatalog(HMS_CATALOG)); StatementBase parseStmt = analyzeAndGetStmtByNereids("select * from hms_ctl.hms_db.hms_tbl", connectContext); List scanNodes = Arrays.asList(hiveScanNode1); CacheAnalyzer ca = new CacheAnalyzer(connectContext, parseStmt, scanNodes); - ca.checkCacheModeForNereids(NOW); + ca.checkCacheModeForNereids(0); Assert.assertEquals(ca.getCacheMode(), CacheAnalyzer.CacheMode.None); } @@ -388,19 +526,19 @@ public class HmsQueryCacheTest extends AnalyzeCheckTestBase { + "inner join internal.test.tbl1", connectContext); List scanNodes = Arrays.asList(hiveScanNode1, olapScanNode); CacheAnalyzer ca = new CacheAnalyzer(connectContext, parseStmt, scanNodes); - ca.checkCacheMode(NOW + Config.cache_last_version_interval_second * 1000L * 2); + ca.checkCacheMode(System.currentTimeMillis() + Config.cache_last_version_interval_second * 1000L * 2); Assert.assertEquals(ca.getCacheMode(), CacheAnalyzer.CacheMode.None); } @Test - public void testNotHitSqlCacheWithFederatedQueryByNereids() throws Exception { + public void testNotHitSqlCacheWithFederatedQueryByNereids() { init((HMSExternalCatalog) mgr.getCatalog(HMS_CATALOG)); // cache mode is None if this query is a federated query StatementBase parseStmt = analyzeAndGetStmtByNereids("select * from hms_ctl.hms_db.hms_tbl " + "inner join internal.test.tbl1", connectContext); List scanNodes = Arrays.asList(hiveScanNode1, olapScanNode); CacheAnalyzer ca = new CacheAnalyzer(connectContext, parseStmt, scanNodes); - ca.checkCacheModeForNereids(NOW + Config.cache_last_version_interval_second * 1000L * 2); + ca.checkCacheModeForNereids(System.currentTimeMillis() + Config.cache_last_version_interval_second * 1000L * 2); Assert.assertEquals(ca.getCacheMode(), CacheAnalyzer.CacheMode.None); } }