diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/AutoCloseConnectContext.java b/fe/fe-core/src/main/java/org/apache/doris/qe/AutoCloseConnectContext.java new file mode 100644 index 0000000000..f0019876e5 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/AutoCloseConnectContext.java @@ -0,0 +1,34 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.qe; + +public class AutoCloseConnectContext implements AutoCloseable { + + public final ConnectContext connectContext; + + public AutoCloseConnectContext(ConnectContext connectContext) { + this.connectContext = connectContext; + connectContext.setThreadLocalInfo(); + } + + @Override + public void close() { + connectContext.kill(false); + ConnectContext.remove(); + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisJob.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisJob.java index a480c219fb..3ba55112c6 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisJob.java +++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisJob.java @@ -25,6 +25,7 @@ import org.apache.doris.catalog.Table; import org.apache.doris.catalog.TableIf; import org.apache.doris.common.FeConstants; import org.apache.doris.datasource.CatalogIf; +import org.apache.doris.qe.AutoCloseConnectContext; import org.apache.doris.qe.ConnectContext; import org.apache.doris.qe.StmtExecutor; import org.apache.doris.statistics.AnalysisJobInfo.JobState; @@ -168,18 +169,23 @@ public class AnalysisJob { tbl.readUnlock(); } for (String sql : partitionAnalysisSQLs) { - ConnectContext connectContext = StatisticsUtil.buildConnectContext(); - this.stmtExecutor = new StmtExecutor(connectContext, sql); - this.stmtExecutor.execute(); + try (AutoCloseConnectContext r = StatisticsUtil.buildConnectContext()) { + this.stmtExecutor = new StmtExecutor(r.connectContext, sql); + this.stmtExecutor.execute(); + } } params.remove("partId"); params.put("type", col.getType().toString()); StringSubstitutor stringSubstitutor = new StringSubstitutor(params); String sql = stringSubstitutor.replace(ANALYZE_COLUMN_SQL_TEMPLATE); - ConnectContext connectContext = StatisticsUtil.buildConnectContext(); - this.stmtExecutor = new StmtExecutor(connectContext, sql); - this.stmtExecutor.execute(); - Env.getCurrentEnv().getStatisticsCache().refreshSync(tbl.getId(), col.getName()); + try (AutoCloseConnectContext r = StatisticsUtil.buildConnectContext()) { + this.stmtExecutor = new StmtExecutor(r.connectContext, sql); + this.stmtExecutor.execute(); + Env.getCurrentEnv().getStatisticsCache().refreshSync(tbl.getId(), col.getName()); + } finally { + ConnectContext.remove(); + } + } public int getLastExecTime() { diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/HiveAnalysisJob.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/HiveAnalysisJob.java index 88db4f0a54..216cafcb89 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/statistics/HiveAnalysisJob.java +++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/HiveAnalysisJob.java @@ -19,7 +19,7 @@ package org.apache.doris.statistics; import org.apache.doris.common.FeConstants; import org.apache.doris.datasource.HMSExternalCatalog; -import org.apache.doris.qe.ConnectContext; +import org.apache.doris.qe.AutoCloseConnectContext; import org.apache.doris.qe.StmtExecutor; import org.apache.doris.statistics.util.StatisticsUtil; @@ -93,9 +93,10 @@ public class HiveAnalysisJob extends HMSAnalysisJob { } StringSubstitutor stringSubstitutor = new StringSubstitutor(params); String sql = stringSubstitutor.replace(ANALYZE_TABLE_SQL_TEMPLATE); - ConnectContext connectContext = StatisticsUtil.buildConnectContext(); - this.stmtExecutor = new StmtExecutor(connectContext, sql); - this.stmtExecutor.execute(); + try (AutoCloseConnectContext r = StatisticsUtil.buildConnectContext()) { + this.stmtExecutor = new StmtExecutor(r.connectContext, sql); + this.stmtExecutor.execute(); + } // Get partition level information. List partitions = ((HMSExternalCatalog) @@ -128,9 +129,10 @@ public class HiveAnalysisJob extends HMSAnalysisJob { } // Update partition level stats for this column. for (String partitionSql : partitionAnalysisSQLs) { - connectContext = StatisticsUtil.buildConnectContext(); - this.stmtExecutor = new StmtExecutor(connectContext, partitionSql); - this.stmtExecutor.execute(); + try (AutoCloseConnectContext r = StatisticsUtil.buildConnectContext()) { + this.stmtExecutor = new StmtExecutor(r.connectContext, partitionSql); + this.stmtExecutor.execute(); + } } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/IcebergAnalysisJob.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/IcebergAnalysisJob.java index 0f32221f1f..a62052f8bf 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/statistics/IcebergAnalysisJob.java +++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/IcebergAnalysisJob.java @@ -18,7 +18,7 @@ package org.apache.doris.statistics; import org.apache.doris.common.FeConstants; -import org.apache.doris.qe.ConnectContext; +import org.apache.doris.qe.AutoCloseConnectContext; import org.apache.doris.qe.StmtExecutor; import org.apache.doris.statistics.util.StatisticsUtil; @@ -116,8 +116,9 @@ public class IcebergAnalysisJob extends HMSAnalysisJob { // Update table level stats info of this column. StringSubstitutor stringSubstitutor = new StringSubstitutor(params); String sql = stringSubstitutor.replace(INSERT_TABLE_SQL_TEMPLATE); - ConnectContext connectContext = StatisticsUtil.buildConnectContext(); - this.stmtExecutor = new StmtExecutor(connectContext, sql); - this.stmtExecutor.execute(); + try (AutoCloseConnectContext r = StatisticsUtil.buildConnectContext()) { + this.stmtExecutor = new StmtExecutor(r.connectContext, sql); + this.stmtExecutor.execute(); + } } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/util/StatisticsUtil.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/util/StatisticsUtil.java index 140522323e..a376c6448f 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/statistics/util/StatisticsUtil.java +++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/util/StatisticsUtil.java @@ -41,6 +41,7 @@ import org.apache.doris.common.FeConstants; import org.apache.doris.common.UserException; import org.apache.doris.datasource.CatalogIf; import org.apache.doris.nereids.trees.expressions.literal.DateTimeLiteral; +import org.apache.doris.qe.AutoCloseConnectContext; import org.apache.doris.qe.ConnectContext; import org.apache.doris.qe.SessionVariable; import org.apache.doris.qe.StmtExecutor; @@ -76,24 +77,20 @@ public class StatisticsUtil { } public static List execStatisticQuery(String sql) { - ConnectContext connectContext = StatisticsUtil.buildConnectContext(); - try { - StmtExecutor stmtExecutor = new StmtExecutor(connectContext, sql); - connectContext.setExecutor(stmtExecutor); + try (AutoCloseConnectContext r = StatisticsUtil.buildConnectContext()) { + StmtExecutor stmtExecutor = new StmtExecutor(r.connectContext, sql); + r.connectContext.setExecutor(stmtExecutor); return stmtExecutor.executeInternalQuery(); - } finally { - connectContext.kill(false); } } public static void execUpdate(String sql) throws Exception { - ConnectContext connectContext = StatisticsUtil.buildConnectContext(); - try { - StmtExecutor stmtExecutor = new StmtExecutor(connectContext, sql); - connectContext.setExecutor(stmtExecutor); + try (AutoCloseConnectContext r = StatisticsUtil.buildConnectContext()) { + StmtExecutor stmtExecutor = new StmtExecutor(r.connectContext, sql); + r.connectContext.setExecutor(stmtExecutor); stmtExecutor.execute(); } finally { - connectContext.kill(false); + ConnectContext.remove(); } } @@ -107,7 +104,7 @@ public class StatisticsUtil { return resultBatches.stream().map(ColumnStatistic::fromResultRow).collect(Collectors.toList()); } - public static ConnectContext buildConnectContext() { + public static AutoCloseConnectContext buildConnectContext() { ConnectContext connectContext = new ConnectContext(); SessionVariable sessionVariable = connectContext.getSessionVariable(); sessionVariable.internalSession = true; @@ -121,15 +118,16 @@ public class StatisticsUtil { UUID uuid = UUID.randomUUID(); TUniqueId queryId = new TUniqueId(uuid.getMostSignificantBits(), uuid.getLeastSignificantBits()); connectContext.setQueryId(queryId); - connectContext.setThreadLocalInfo(); connectContext.setStartTime(); connectContext.setCluster(SystemInfoService.DEFAULT_CLUSTER); - return connectContext; + return new AutoCloseConnectContext(connectContext); } public static void analyze(StatementBase statementBase) throws UserException { - Analyzer analyzer = new Analyzer(Env.getCurrentEnv(), buildConnectContext()); - statementBase.analyze(analyzer); + try (AutoCloseConnectContext r = buildConnectContext()) { + Analyzer analyzer = new Analyzer(Env.getCurrentEnv(), r.connectContext); + statementBase.analyze(analyzer); + } } public static LiteralExpr readableValue(Type type, String columnValue) throws AnalysisException { diff --git a/fe/fe-core/src/test/java/org/apache/doris/statistics/AnalysisJobTest.java b/fe/fe-core/src/test/java/org/apache/doris/statistics/AnalysisJobTest.java index c22cc5d5dd..dfe7617b60 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/statistics/AnalysisJobTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/statistics/AnalysisJobTest.java @@ -19,6 +19,7 @@ package org.apache.doris.statistics; import org.apache.doris.catalog.Env; import org.apache.doris.catalog.InternalSchemaInitializer; +import org.apache.doris.qe.AutoCloseConnectContext; import org.apache.doris.qe.ConnectContext; import org.apache.doris.qe.StmtExecutor; import org.apache.doris.statistics.AnalysisJobInfo.JobType; @@ -30,6 +31,7 @@ import mockit.Expectations; import mockit.Mock; import mockit.MockUp; import mockit.Mocked; +import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Test; public class AnalysisJobTest extends TestWithFeService { @@ -63,17 +65,24 @@ public class AnalysisJobTest extends TestWithFeService { new MockUp() { @Mock - public ConnectContext buildConnectContext() { - return connectContext; + public AutoCloseConnectContext buildConnectContext() { + return new AutoCloseConnectContext(connectContext); } @Mock public void execUpdate(String sql) throws Exception { } }; + + new MockUp() { + + @Mock + public ConnectContext get() { + return connectContext; + } + }; String sql = "ANALYZE t1"; - StmtExecutor executor = getSqlStmtExecutor(sql); - executor.execute(); + Assertions.assertNotNull(getSqlStmtExecutor(sql)); } @Test