[fix](Nereids) fix sql cache return old value when truncate partition (#34698)
1. fix sql cache return old value when truncate partition 2. use expire_sql_cache_in_fe_second to control the expire time of the sql cache which in the NereidsSqlCacheManager
This commit is contained in:
@ -1306,12 +1306,18 @@ public class Config extends ConfigBase {
|
||||
* Minimum interval between last version when caching results,
|
||||
* This parameter distinguishes between offline and real-time updates
|
||||
*/
|
||||
@ConfField(mutable = true, masterOnly = false)
|
||||
public static int cache_last_version_interval_second = 30;
|
||||
|
||||
/**
|
||||
* Expire sql sql in frontend time
|
||||
*/
|
||||
@ConfField(
|
||||
mutable = true,
|
||||
masterOnly = false,
|
||||
callbackClassString = "org.apache.doris.common.NereidsSqlCacheManager$UpdateConfig"
|
||||
)
|
||||
public static int cache_last_version_interval_second = 30;
|
||||
public static int expire_sql_cache_in_fe_second = 300;
|
||||
|
||||
/**
|
||||
* Set the maximum number of rows that can be cached
|
||||
|
||||
@ -771,9 +771,7 @@ public class Env {
|
||||
this.mtmvService = new MTMVService();
|
||||
this.insertOverwriteManager = new InsertOverwriteManager();
|
||||
this.dnsCache = new DNSCache();
|
||||
this.sqlCacheManager = new NereidsSqlCacheManager(
|
||||
Config.sql_cache_manage_num, Config.cache_last_version_interval_second
|
||||
);
|
||||
this.sqlCacheManager = new NereidsSqlCacheManager();
|
||||
}
|
||||
|
||||
public static void destroyCheckpoint() {
|
||||
|
||||
@ -74,8 +74,11 @@ public class NereidsSqlCacheManager {
|
||||
// value: SqlCacheContext
|
||||
private volatile Cache<String, SqlCacheContext> sqlCaches;
|
||||
|
||||
public NereidsSqlCacheManager(int sqlCacheNum, long cacheIntervalSeconds) {
|
||||
sqlCaches = buildSqlCaches(sqlCacheNum, cacheIntervalSeconds);
|
||||
public NereidsSqlCacheManager() {
|
||||
sqlCaches = buildSqlCaches(
|
||||
Config.sql_cache_manage_num,
|
||||
Config.expire_sql_cache_in_fe_second
|
||||
);
|
||||
}
|
||||
|
||||
public static synchronized void updateConfig() {
|
||||
@ -90,22 +93,24 @@ public class NereidsSqlCacheManager {
|
||||
|
||||
Cache<String, SqlCacheContext> sqlCaches = buildSqlCaches(
|
||||
Config.sql_cache_manage_num,
|
||||
Config.cache_last_version_interval_second
|
||||
Config.expire_sql_cache_in_fe_second
|
||||
);
|
||||
sqlCaches.putAll(sqlCacheManager.sqlCaches.asMap());
|
||||
sqlCacheManager.sqlCaches = sqlCaches;
|
||||
}
|
||||
|
||||
private static Cache<String, SqlCacheContext> buildSqlCaches(int sqlCacheNum, long cacheIntervalSeconds) {
|
||||
sqlCacheNum = sqlCacheNum < 0 ? 100 : sqlCacheNum;
|
||||
cacheIntervalSeconds = cacheIntervalSeconds < 0 ? 30 : cacheIntervalSeconds;
|
||||
|
||||
return Caffeine.newBuilder()
|
||||
.maximumSize(sqlCacheNum)
|
||||
.expireAfterAccess(Duration.ofSeconds(cacheIntervalSeconds))
|
||||
private static Cache<String, SqlCacheContext> buildSqlCaches(int sqlCacheNum, long expireAfterAccessSeconds) {
|
||||
Caffeine<Object, Object> cacheBuilder = Caffeine.newBuilder()
|
||||
// auto evict cache when jvm memory too low
|
||||
.softValues()
|
||||
.build();
|
||||
.softValues();
|
||||
if (sqlCacheNum > 0) {
|
||||
cacheBuilder = cacheBuilder.maximumSize(sqlCacheNum);
|
||||
}
|
||||
if (expireAfterAccessSeconds > 0) {
|
||||
cacheBuilder = cacheBuilder.expireAfterAccess(Duration.ofSeconds(expireAfterAccessSeconds));
|
||||
}
|
||||
|
||||
return cacheBuilder.build();
|
||||
}
|
||||
|
||||
/** tryAddFeCache */
|
||||
@ -237,9 +242,6 @@ public class NereidsSqlCacheManager {
|
||||
}
|
||||
|
||||
private boolean tablesOrDataChanged(Env env, SqlCacheContext sqlCacheContext) {
|
||||
long latestPartitionTime = sqlCacheContext.getLatestPartitionTime();
|
||||
long latestPartitionVersion = sqlCacheContext.getLatestPartitionVersion();
|
||||
|
||||
if (sqlCacheContext.hasUnsupportedTables()) {
|
||||
return true;
|
||||
}
|
||||
@ -255,7 +257,7 @@ public class NereidsSqlCacheManager {
|
||||
long cacheTableTime = scanTable.latestTimestamp;
|
||||
long currentTableVersion = olapTable.getVisibleVersion();
|
||||
long cacheTableVersion = scanTable.latestVersion;
|
||||
// some partitions have been dropped, or delete or update or insert rows into new partition?
|
||||
// some partitions have been dropped, or delete or updated or replaced, or insert rows into new partition?
|
||||
if (currentTableTime > cacheTableTime
|
||||
|| (currentTableTime == cacheTableTime && currentTableVersion > cacheTableVersion)) {
|
||||
return true;
|
||||
@ -264,9 +266,7 @@ public class NereidsSqlCacheManager {
|
||||
for (Long scanPartitionId : scanTable.getScanPartitions()) {
|
||||
Partition partition = olapTable.getPartition(scanPartitionId);
|
||||
// partition == null: is this partition truncated?
|
||||
if (partition == null || partition.getVisibleVersionTime() > latestPartitionTime
|
||||
|| (partition.getVisibleVersionTime() == latestPartitionTime
|
||||
&& partition.getVisibleVersion() > latestPartitionVersion)) {
|
||||
if (partition == null) {
|
||||
return true;
|
||||
}
|
||||
}
|
||||
|
||||
@ -701,11 +701,11 @@ public class CacheAnalyzer {
|
||||
scanTables.add(scanTable);
|
||||
for (Long partitionId : node.getSelectedPartitionIds()) {
|
||||
Partition partition = olapTable.getPartition(partitionId);
|
||||
scanTable.addScanPartition(partitionId);
|
||||
if (partition.getVisibleVersionTime() >= cacheTable.latestPartitionTime) {
|
||||
cacheTable.latestPartitionId = partition.getId();
|
||||
cacheTable.latestPartitionTime = partition.getVisibleVersionTime();
|
||||
cacheTable.latestPartitionVersion = partition.getVisibleVersion();
|
||||
scanTable.addScanPartition(partitionId);
|
||||
}
|
||||
}
|
||||
return cacheTable;
|
||||
|
||||
@ -32,7 +32,6 @@ suite("parse_sql_from_sql_cache") {
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
sql "ADMIN SET FRONTEND CONFIG ('cache_last_version_interval_second' = '10')"
|
||||
|
||||
combineFutures(
|
||||
@ -656,6 +655,43 @@ suite("parse_sql_from_sql_cache") {
|
||||
assertHasCache "select * from test_use_plan_cache20 where id=999"
|
||||
def result6 = sql "select * from test_use_plan_cache20 where id=999"
|
||||
assertTrue(result6.isEmpty())
|
||||
}),
|
||||
extraThread("test_truncate_partition", {
|
||||
sql "drop table if exists test_use_plan_cache21"
|
||||
sql """create table test_use_plan_cache21 (
|
||||
id int,
|
||||
dt int
|
||||
)
|
||||
partition by range(dt)
|
||||
(
|
||||
partition dt1 values [('1'), ('2')),
|
||||
partition dt2 values [('2'), ('3'))
|
||||
)
|
||||
distributed by hash(id)
|
||||
properties('replication_num'='1')"""
|
||||
|
||||
|
||||
|
||||
sql "insert into test_use_plan_cache21 values('2', '2')"
|
||||
sleep(100)
|
||||
sql "insert into test_use_plan_cache21 values('1', '1')"
|
||||
|
||||
// after partition changed 10s, the sql cache can be used
|
||||
sleep(10000)
|
||||
|
||||
sql "set enable_nereids_planner=true"
|
||||
sql "set enable_fallback_to_original_planner=false"
|
||||
sql "set enable_sql_cache=true"
|
||||
|
||||
assertNoCache "select * from test_use_plan_cache21"
|
||||
def result1 = sql "select * from test_use_plan_cache21"
|
||||
assertTrue(result1.size() == 2)
|
||||
assertHasCache "select * from test_use_plan_cache21"
|
||||
|
||||
sql "truncate table test_use_plan_cache21 partition dt2"
|
||||
assertNoCache "select * from test_use_plan_cache21"
|
||||
def result2 = sql "select * from test_use_plan_cache21"
|
||||
assertTrue(result2.size() == 1)
|
||||
})
|
||||
).get()
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user