branch-2.1: [fix](sql cache) fix prepare statement with sql cache throw NullPointerException (#48902) (#48977)
cherry pick from #48902
This commit is contained in:
@ -126,6 +126,13 @@ public class NereidsSqlCacheManager {
|
||||
* tryAddFeCache
|
||||
*/
|
||||
public void tryAddFeSqlCache(ConnectContext connectContext, String sql) {
|
||||
switch (connectContext.getCommand()) {
|
||||
case COM_STMT_EXECUTE:
|
||||
case COM_STMT_PREPARE:
|
||||
return;
|
||||
default: { }
|
||||
}
|
||||
|
||||
Optional<SqlCacheContext> sqlCacheContextOpt = connectContext.getStatementContext().getSqlCacheContext();
|
||||
if (!sqlCacheContextOpt.isPresent()) {
|
||||
return;
|
||||
@ -145,6 +152,12 @@ public class NereidsSqlCacheManager {
|
||||
* tryAddBeCache
|
||||
*/
|
||||
public void tryAddBeCache(ConnectContext connectContext, String sql, CacheAnalyzer analyzer) {
|
||||
switch (connectContext.getCommand()) {
|
||||
case COM_STMT_EXECUTE:
|
||||
case COM_STMT_PREPARE:
|
||||
return;
|
||||
default: { }
|
||||
}
|
||||
Optional<SqlCacheContext> sqlCacheContextOpt = connectContext.getStatementContext().getSqlCacheContext();
|
||||
if (!sqlCacheContextOpt.isPresent()) {
|
||||
return;
|
||||
@ -176,6 +189,12 @@ public class NereidsSqlCacheManager {
|
||||
* tryParseSql
|
||||
*/
|
||||
public Optional<LogicalSqlCache> tryParseSql(ConnectContext connectContext, String sql) {
|
||||
switch (connectContext.getCommand()) {
|
||||
case COM_STMT_EXECUTE:
|
||||
case COM_STMT_PREPARE:
|
||||
return Optional.empty();
|
||||
default: { }
|
||||
}
|
||||
String key = generateCacheKey(connectContext, normalizeSql(sql.trim()));
|
||||
SqlCacheContext sqlCacheContext = sqlCaches.getIfPresent(key);
|
||||
if (sqlCacheContext == null) {
|
||||
|
||||
@ -167,7 +167,7 @@ public class StatementContext implements Closeable {
|
||||
private final Stack<CloseableResource> plannerResources = new Stack<>();
|
||||
|
||||
// placeholder params for prepared statement
|
||||
private List<Placeholder> placeholders;
|
||||
private List<Placeholder> placeholders = new ArrayList<>();
|
||||
|
||||
// all tables in query
|
||||
private boolean needLockTables = true;
|
||||
|
||||
@ -23,6 +23,8 @@ import org.apache.doris.nereids.exceptions.AnalysisException;
|
||||
import org.apache.doris.nereids.glue.LogicalPlanAdapter;
|
||||
import org.apache.doris.nereids.trees.expressions.Expression;
|
||||
import org.apache.doris.nereids.trees.plans.PlanType;
|
||||
import org.apache.doris.nereids.trees.plans.logical.LogicalPlan;
|
||||
import org.apache.doris.nereids.trees.plans.logical.LogicalSqlCache;
|
||||
import org.apache.doris.nereids.trees.plans.visitor.PlanVisitor;
|
||||
import org.apache.doris.qe.ConnectContext;
|
||||
import org.apache.doris.qe.PointQueryExecutor;
|
||||
@ -65,9 +67,13 @@ public class ExecuteCommand extends Command {
|
||||
throw new AnalysisException(
|
||||
"prepare statement " + stmtName + " not found, maybe expired");
|
||||
}
|
||||
PrepareCommand prepareCommand = (PrepareCommand) preparedStmtCtx.command;
|
||||
LogicalPlanAdapter planAdapter = new LogicalPlanAdapter(prepareCommand.getLogicalPlan(), executor.getContext()
|
||||
.getStatementContext());
|
||||
PrepareCommand prepareCommand = preparedStmtCtx.command;
|
||||
LogicalPlan logicalPlan = prepareCommand.getLogicalPlan();
|
||||
if (logicalPlan instanceof LogicalSqlCache) {
|
||||
throw new AnalysisException("Unsupported sql cache for server prepared statement");
|
||||
}
|
||||
LogicalPlanAdapter planAdapter = new LogicalPlanAdapter(
|
||||
logicalPlan, executor.getContext().getStatementContext());
|
||||
executor.setParsedStmt(planAdapter);
|
||||
// If it's not a short circuit query or schema version is different(indicates schema changed) or
|
||||
// has nondeterministic functions in statement, then need to do reanalyze and plan
|
||||
|
||||
@ -396,6 +396,17 @@ public abstract class ConnectProcessor {
|
||||
private List<StatementBase> parseFromSqlCache(String originStmt) {
|
||||
StatementContext statementContext = new StatementContext(ctx, new OriginStatement(originStmt, 0));
|
||||
ctx.setStatementContext(statementContext);
|
||||
|
||||
// the mysql protocol has different between COM_QUERY and COM_STMT_EXECUTE,
|
||||
// the sql cache use the result of COM_QUERY, so we can not provide the
|
||||
// result of sql cache for COM_STMT_EXECUTE/COM_STMT_PREPARE
|
||||
switch (ctx.getCommand()) {
|
||||
case COM_STMT_EXECUTE:
|
||||
case COM_STMT_PREPARE:
|
||||
return null;
|
||||
default: { }
|
||||
}
|
||||
|
||||
try {
|
||||
Optional<Pair<ExplainOptions, String>> explainPlan = NereidsParser.tryParseExplainPlan(originStmt);
|
||||
String cacheSqlKey = originStmt;
|
||||
|
||||
@ -68,7 +68,7 @@ class TestAction implements SuiteAction {
|
||||
} else {
|
||||
if (exception != null || result.exception != null) {
|
||||
def msg = result.exception?.toString()
|
||||
log.info("Exception: ${msg}")
|
||||
log.error("Exception: ${msg}", exception != null ? exception : result.exception)
|
||||
Assert.assertTrue("Expect exception msg contains '${exception}', but meet '${msg}'",
|
||||
msg != null && exception != null && msg.contains(exception))
|
||||
}
|
||||
|
||||
@ -33,12 +33,7 @@ suite("parse_sql_from_sql_cache") {
|
||||
}
|
||||
|
||||
def dbName = (sql "select database()")[0][0].toString()
|
||||
foreachFrontends { fe ->
|
||||
def url = "jdbc:mysql://${fe.Host}:${fe.QueryPort}/${dbName}"
|
||||
connect(context.config.jdbcUser, context.config.jdbcPassword, url) {
|
||||
sql "ADMIN SET FRONTEND CONFIG ('cache_last_version_interval_second' = '10')"
|
||||
}
|
||||
}
|
||||
sql "ADMIN SET ALL FRONTENDS CONFIG ('cache_last_version_interval_second' = '10')"
|
||||
|
||||
// make sure if the table has been dropped, the cache should invalidate,
|
||||
// so we should retry multiple times to check
|
||||
|
||||
@ -15,17 +15,10 @@
|
||||
// specific language governing permissions and limitations
|
||||
// under the License.
|
||||
|
||||
import com.mysql.cj.ServerPreparedQuery
|
||||
import com.mysql.cj.jdbc.ConnectionImpl
|
||||
import com.mysql.cj.jdbc.JdbcStatement
|
||||
import com.mysql.cj.jdbc.ServerPreparedStatement
|
||||
import com.mysql.cj.jdbc.StatementImpl
|
||||
import org.apache.doris.regression.util.JdbcUtils
|
||||
|
||||
import java.lang.reflect.Field
|
||||
import java.sql.PreparedStatement
|
||||
import java.sql.ResultSet
|
||||
import java.util.concurrent.CopyOnWriteArrayList
|
||||
|
||||
suite("prepare_stmt_with_sql_cache") {
|
||||
|
||||
@ -38,11 +31,13 @@ suite("prepare_stmt_with_sql_cache") {
|
||||
insert into test_prepare_stmt_with_sql_cache select * from numbers('number'='100');
|
||||
"""
|
||||
|
||||
sql "ADMIN SET ALL FRONTENDS CONFIG ('cache_last_version_interval_second' = '10')"
|
||||
|
||||
def db = (sql "select database()")[0][0].toString()
|
||||
|
||||
def url = getServerPrepareJdbcUrl(context.config.jdbcUrl, db)
|
||||
def serverPrepareUrl = getServerPrepareJdbcUrl(context.config.jdbcUrl, db)
|
||||
|
||||
connect(context.config.jdbcUser, context.config.jdbcPassword, url) {
|
||||
connect(context.config.jdbcUser, context.config.jdbcPassword, serverPrepareUrl) {
|
||||
sql "set enable_sql_cache=true"
|
||||
for (def i in 0..<10) {
|
||||
try (PreparedStatement pstmt = prepareStatement("select * from test_prepare_stmt_with_sql_cache where id=?")) {
|
||||
@ -54,4 +49,24 @@ suite("prepare_stmt_with_sql_cache") {
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
sleep(10 * 1000)
|
||||
|
||||
connect(context.config.jdbcUser, context.config.jdbcPassword, context.config.jdbcUrl) {
|
||||
sql "use ${db}"
|
||||
sql "set enable_sql_cache=true"
|
||||
test {
|
||||
sql "select * from test_prepare_stmt_with_sql_cache where id=10"
|
||||
result([[10]])
|
||||
}
|
||||
}
|
||||
|
||||
connect(context.config.jdbcUser, context.config.jdbcPassword, serverPrepareUrl) {
|
||||
sql "use ${db}"
|
||||
sql "set enable_sql_cache=true"
|
||||
test {
|
||||
sql "select * from test_prepare_stmt_with_sql_cache where id=10"
|
||||
result(([[10]]))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user