[enhancement](nereids) speedup sql cache with use variable as partition predicate (#37943)
follow up #37090 support reuse sql cache when use variable as partition predicate and variable change: ```sql set @dt='2024-07-16'; -- create cache 1 select * from tbl where dt = @dt; set @dt='2024-07-17'; -- create cache 2, will not invalidate cache 1 select * from tbl where dt = @dt; set @dt='2024-07-16'; -- reuse cache 1 select * from tbl where dt = @dt; ```
This commit is contained in:
@ -25,12 +25,14 @@ import org.apache.doris.catalog.Partition;
|
||||
import org.apache.doris.catalog.TableIf;
|
||||
import org.apache.doris.catalog.View;
|
||||
import org.apache.doris.common.ConfigBase.DefaultConfHandler;
|
||||
import org.apache.doris.common.util.DebugUtil;
|
||||
import org.apache.doris.datasource.CatalogIf;
|
||||
import org.apache.doris.metric.MetricRepo;
|
||||
import org.apache.doris.mysql.privilege.DataMaskPolicy;
|
||||
import org.apache.doris.mysql.privilege.RowFilterPolicy;
|
||||
import org.apache.doris.nereids.CascadesContext;
|
||||
import org.apache.doris.nereids.SqlCacheContext;
|
||||
import org.apache.doris.nereids.SqlCacheContext.CacheKeyType;
|
||||
import org.apache.doris.nereids.SqlCacheContext.FullColumnName;
|
||||
import org.apache.doris.nereids.SqlCacheContext.FullTableName;
|
||||
import org.apache.doris.nereids.SqlCacheContext.ScanTable;
|
||||
@ -124,7 +126,9 @@ public class NereidsSqlCacheManager {
|
||||
|
||||
SqlCacheContext sqlCacheContext = sqlCacheContextOpt.get();
|
||||
UserIdentity currentUserIdentity = connectContext.getCurrentUserIdentity();
|
||||
String key = currentUserIdentity.toString() + ":" + sql.trim();
|
||||
String key = sqlCacheContext.getCacheKeyType() == CacheKeyType.SQL
|
||||
? currentUserIdentity.toString() + ":" + sql.trim()
|
||||
: currentUserIdentity.toString() + ":" + DebugUtil.printId(sqlCacheContext.getOrComputeCacheKeyMd5());
|
||||
if (sqlCaches.getIfPresent(key) == null && sqlCacheContext.getOrComputeCacheKeyMd5() != null
|
||||
&& sqlCacheContext.getResultSetInFe().isPresent()) {
|
||||
sqlCaches.put(key, sqlCacheContext);
|
||||
@ -142,7 +146,9 @@ public class NereidsSqlCacheManager {
|
||||
}
|
||||
SqlCacheContext sqlCacheContext = sqlCacheContextOpt.get();
|
||||
UserIdentity currentUserIdentity = connectContext.getCurrentUserIdentity();
|
||||
String key = currentUserIdentity.toString() + ":" + sql.trim();
|
||||
String key = sqlCacheContext.getCacheKeyType() == CacheKeyType.SQL
|
||||
? currentUserIdentity.toString() + ":" + sql.trim()
|
||||
: currentUserIdentity.toString() + ":" + DebugUtil.printId(sqlCacheContext.getOrComputeCacheKeyMd5());
|
||||
if (sqlCaches.getIfPresent(key) == null && sqlCacheContext.getOrComputeCacheKeyMd5() != null) {
|
||||
SqlCache cache = (SqlCache) analyzer.getCache();
|
||||
sqlCacheContext.setSumOfPartitionNum(cache.getSumOfPartitionNum());
|
||||
@ -162,8 +168,7 @@ public class NereidsSqlCacheManager {
|
||||
/** tryParseSql */
|
||||
public Optional<LogicalSqlCache> tryParseSql(ConnectContext connectContext, String sql) {
|
||||
UserIdentity currentUserIdentity = connectContext.getCurrentUserIdentity();
|
||||
Env env = connectContext.getEnv();
|
||||
String key = currentUserIdentity.toString() + ":" + sql.trim();
|
||||
String key = currentUserIdentity + ":" + sql.trim();
|
||||
SqlCacheContext sqlCacheContext = sqlCaches.getIfPresent(key);
|
||||
if (sqlCacheContext == null) {
|
||||
return Optional.empty();
|
||||
@ -171,6 +176,36 @@ public class NereidsSqlCacheManager {
|
||||
|
||||
// LOG.info("Total size: " + GraphLayout.parseInstance(sqlCacheContext).totalSize());
|
||||
|
||||
List<Variable> currentVariables = resolveUserVariables(sqlCacheContext);
|
||||
if (usedVariablesChanged(currentVariables, sqlCacheContext)) {
|
||||
String md5 = DebugUtil.printId(
|
||||
sqlCacheContext.doComputeCacheKeyMd5(Utils.fastToImmutableSet(currentVariables)));
|
||||
|
||||
String md5CacheKey = currentUserIdentity + ":" + md5;
|
||||
SqlCacheContext sqlCacheContextWithVariable = sqlCaches.getIfPresent(md5CacheKey);
|
||||
|
||||
// already exist cache in the fe, but the variable is different to this query,
|
||||
// we should create another cache context in fe, use another cache key
|
||||
connectContext.getStatementContext()
|
||||
.getSqlCacheContext().ifPresent(ctx -> ctx.setCacheKeyType(CacheKeyType.MD5));
|
||||
|
||||
if (sqlCacheContextWithVariable != null) {
|
||||
return tryParseSqlWithoutCheckVariable(
|
||||
connectContext, md5CacheKey, sqlCacheContextWithVariable, currentUserIdentity
|
||||
);
|
||||
} else {
|
||||
return Optional.empty();
|
||||
}
|
||||
} else {
|
||||
return tryParseSqlWithoutCheckVariable(connectContext, key, sqlCacheContext, currentUserIdentity);
|
||||
}
|
||||
}
|
||||
|
||||
private Optional<LogicalSqlCache> tryParseSqlWithoutCheckVariable(
|
||||
ConnectContext connectContext, String key,
|
||||
SqlCacheContext sqlCacheContext, UserIdentity currentUserIdentity) {
|
||||
Env env = connectContext.getEnv();
|
||||
|
||||
// check table and view and their columns authority
|
||||
if (privilegeChanged(connectContext, env, sqlCacheContext)) {
|
||||
return invalidateCache(key);
|
||||
|
||||
@ -86,6 +86,8 @@ public class SqlCacheContext {
|
||||
private volatile PUniqueId cacheKeyMd5;
|
||||
private volatile ResultSet resultSetInFe;
|
||||
|
||||
private volatile CacheKeyType cacheKeyType = CacheKeyType.SQL;
|
||||
|
||||
public SqlCacheContext(UserIdentity userIdentity, TUniqueId queryId) {
|
||||
this.userIdentity = Objects.requireNonNull(userIdentity, "userIdentity cannot be null");
|
||||
this.queryId = Objects.requireNonNull(queryId, "queryId cannot be null");
|
||||
@ -392,6 +394,14 @@ public class SqlCacheContext {
|
||||
this.resultSetInFe = resultSetInFe;
|
||||
}
|
||||
|
||||
public CacheKeyType getCacheKeyType() {
|
||||
return cacheKeyType;
|
||||
}
|
||||
|
||||
public void setCacheKeyType(CacheKeyType cacheKeyType) {
|
||||
this.cacheKeyType = cacheKeyType;
|
||||
}
|
||||
|
||||
/** FullTableName */
|
||||
@lombok.Data
|
||||
@lombok.AllArgsConstructor
|
||||
@ -434,4 +444,12 @@ public class SqlCacheContext {
|
||||
this.scanPartitions.add(partitionId);
|
||||
}
|
||||
}
|
||||
|
||||
/** CacheKeyType */
|
||||
public enum CacheKeyType {
|
||||
// use `userIdentity`:`sql`.trim() as Cache key in FE
|
||||
SQL,
|
||||
// use MD5 as Cache key in FE
|
||||
MD5
|
||||
}
|
||||
}
|
||||
|
||||
@ -49,6 +49,8 @@ import org.apache.doris.mysql.MysqlCommand;
|
||||
import org.apache.doris.mysql.MysqlPacket;
|
||||
import org.apache.doris.mysql.MysqlSerializer;
|
||||
import org.apache.doris.mysql.MysqlServerStatusFlag;
|
||||
import org.apache.doris.nereids.SqlCacheContext;
|
||||
import org.apache.doris.nereids.SqlCacheContext.CacheKeyType;
|
||||
import org.apache.doris.nereids.StatementContext;
|
||||
import org.apache.doris.nereids.exceptions.NotSupportedException;
|
||||
import org.apache.doris.nereids.exceptions.ParseException;
|
||||
@ -230,9 +232,15 @@ public abstract class ConnectProcessor {
|
||||
boolean nereidsUseServerPrep = (sessionVariable.enableServeSidePreparedStatement
|
||||
&& !sessionVariable.isEnableInsertGroupCommit())
|
||||
|| mysqlCommand == MysqlCommand.COM_QUERY;
|
||||
CacheKeyType cacheKeyType = null;
|
||||
if (nereidsUseServerPrep && sessionVariable.isEnableNereidsPlanner()) {
|
||||
if (wantToParseSqlFromSqlCache) {
|
||||
cachedStmts = parseFromSqlCache(originStmt);
|
||||
Optional<SqlCacheContext> sqlCacheContext = ConnectContext.get()
|
||||
.getStatementContext().getSqlCacheContext();
|
||||
if (sqlCacheContext.isPresent()) {
|
||||
cacheKeyType = sqlCacheContext.get().getCacheKeyType();
|
||||
}
|
||||
if (cachedStmts != null) {
|
||||
stmts = cachedStmts;
|
||||
}
|
||||
@ -310,6 +318,12 @@ public abstract class ConnectProcessor {
|
||||
executor.getProfile().getSummaryProfile().setParseSqlFinishTime(parseSqlFinishTime);
|
||||
ctx.setExecutor(executor);
|
||||
|
||||
if (cacheKeyType != null) {
|
||||
SqlCacheContext sqlCacheContext =
|
||||
executor.getContext().getStatementContext().getSqlCacheContext().get();
|
||||
sqlCacheContext.setCacheKeyType(cacheKeyType);
|
||||
}
|
||||
|
||||
try {
|
||||
executor.execute();
|
||||
if (connectType.equals(ConnectType.MYSQL)) {
|
||||
|
||||
@ -503,6 +503,26 @@ suite("parse_sql_from_sql_cache") {
|
||||
assertHasCache "select @custom_variable from test_use_plan_cache17 where id = 1 and value = 1"
|
||||
def result1 = sql "select @custom_variable from test_use_plan_cache17 where id = 1 and value = 1"
|
||||
assertTrue(result1.size() == 1 && result1[0][0].toString().toInteger() == 10)
|
||||
|
||||
|
||||
sql "set @custom_variable2=1"
|
||||
assertNoCache "select * from test_use_plan_cache17 where id = @custom_variable2 and value = 1"
|
||||
def res = sql "select * from test_use_plan_cache17 where id = @custom_variable2 and value = 1"
|
||||
assertTrue(res[0][0] == 1)
|
||||
assertHasCache "select * from test_use_plan_cache17 where id = @custom_variable2 and value = 1"
|
||||
|
||||
sql "set @custom_variable2=2"
|
||||
assertNoCache "select* from test_use_plan_cache17 where id = @custom_variable2 and value = 1"
|
||||
// should not invalidate cache with @custom_variable2=1
|
||||
res = sql "select * from test_use_plan_cache17 where id = @custom_variable2 and value = 1"
|
||||
assertTrue(res[0][0] == 2)
|
||||
assertHasCache "select * from test_use_plan_cache17 where id = @custom_variable2 and value = 1"
|
||||
|
||||
sql "set @custom_variable2=1"
|
||||
// should reuse cache
|
||||
assertHasCache "select * from test_use_plan_cache17 where id = @custom_variable2 and value = 1"
|
||||
res = sql "select * from test_use_plan_cache17 where id = @custom_variable2 and value = 1"
|
||||
assertTrue(res[0][0] == 1)
|
||||
}
|
||||
}),
|
||||
extraThread("test_udf", {
|
||||
|
||||
Reference in New Issue
Block a user