[fix](fe) fix leaks of connect context (#14529)
Remove ConnectContext which built for internal statistics from threadlocal to avoid memory leaks
This commit is contained in:
@ -0,0 +1,34 @@
|
||||
// Licensed to the Apache Software Foundation (ASF) under one
|
||||
// or more contributor license agreements. See the NOTICE file
|
||||
// distributed with this work for additional information
|
||||
// regarding copyright ownership. The ASF licenses this file
|
||||
// to you under the Apache License, Version 2.0 (the
|
||||
// "License"); you may not use this file except in compliance
|
||||
// with the License. You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing,
|
||||
// software distributed under the License is distributed on an
|
||||
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
// KIND, either express or implied. See the License for the
|
||||
// specific language governing permissions and limitations
|
||||
// under the License.
|
||||
|
||||
package org.apache.doris.qe;
|
||||
|
||||
public class AutoCloseConnectContext implements AutoCloseable {
|
||||
|
||||
public final ConnectContext connectContext;
|
||||
|
||||
public AutoCloseConnectContext(ConnectContext connectContext) {
|
||||
this.connectContext = connectContext;
|
||||
connectContext.setThreadLocalInfo();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void close() {
|
||||
connectContext.kill(false);
|
||||
ConnectContext.remove();
|
||||
}
|
||||
}
|
||||
@ -25,6 +25,7 @@ import org.apache.doris.catalog.Table;
|
||||
import org.apache.doris.catalog.TableIf;
|
||||
import org.apache.doris.common.FeConstants;
|
||||
import org.apache.doris.datasource.CatalogIf;
|
||||
import org.apache.doris.qe.AutoCloseConnectContext;
|
||||
import org.apache.doris.qe.ConnectContext;
|
||||
import org.apache.doris.qe.StmtExecutor;
|
||||
import org.apache.doris.statistics.AnalysisJobInfo.JobState;
|
||||
@ -168,18 +169,23 @@ public class AnalysisJob {
|
||||
tbl.readUnlock();
|
||||
}
|
||||
for (String sql : partitionAnalysisSQLs) {
|
||||
ConnectContext connectContext = StatisticsUtil.buildConnectContext();
|
||||
this.stmtExecutor = new StmtExecutor(connectContext, sql);
|
||||
this.stmtExecutor.execute();
|
||||
try (AutoCloseConnectContext r = StatisticsUtil.buildConnectContext()) {
|
||||
this.stmtExecutor = new StmtExecutor(r.connectContext, sql);
|
||||
this.stmtExecutor.execute();
|
||||
}
|
||||
}
|
||||
params.remove("partId");
|
||||
params.put("type", col.getType().toString());
|
||||
StringSubstitutor stringSubstitutor = new StringSubstitutor(params);
|
||||
String sql = stringSubstitutor.replace(ANALYZE_COLUMN_SQL_TEMPLATE);
|
||||
ConnectContext connectContext = StatisticsUtil.buildConnectContext();
|
||||
this.stmtExecutor = new StmtExecutor(connectContext, sql);
|
||||
this.stmtExecutor.execute();
|
||||
Env.getCurrentEnv().getStatisticsCache().refreshSync(tbl.getId(), col.getName());
|
||||
try (AutoCloseConnectContext r = StatisticsUtil.buildConnectContext()) {
|
||||
this.stmtExecutor = new StmtExecutor(r.connectContext, sql);
|
||||
this.stmtExecutor.execute();
|
||||
Env.getCurrentEnv().getStatisticsCache().refreshSync(tbl.getId(), col.getName());
|
||||
} finally {
|
||||
ConnectContext.remove();
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
public int getLastExecTime() {
|
||||
|
||||
@ -19,7 +19,7 @@ package org.apache.doris.statistics;
|
||||
|
||||
import org.apache.doris.common.FeConstants;
|
||||
import org.apache.doris.datasource.HMSExternalCatalog;
|
||||
import org.apache.doris.qe.ConnectContext;
|
||||
import org.apache.doris.qe.AutoCloseConnectContext;
|
||||
import org.apache.doris.qe.StmtExecutor;
|
||||
import org.apache.doris.statistics.util.StatisticsUtil;
|
||||
|
||||
@ -93,9 +93,10 @@ public class HiveAnalysisJob extends HMSAnalysisJob {
|
||||
}
|
||||
StringSubstitutor stringSubstitutor = new StringSubstitutor(params);
|
||||
String sql = stringSubstitutor.replace(ANALYZE_TABLE_SQL_TEMPLATE);
|
||||
ConnectContext connectContext = StatisticsUtil.buildConnectContext();
|
||||
this.stmtExecutor = new StmtExecutor(connectContext, sql);
|
||||
this.stmtExecutor.execute();
|
||||
try (AutoCloseConnectContext r = StatisticsUtil.buildConnectContext()) {
|
||||
this.stmtExecutor = new StmtExecutor(r.connectContext, sql);
|
||||
this.stmtExecutor.execute();
|
||||
}
|
||||
|
||||
// Get partition level information.
|
||||
List<String> partitions = ((HMSExternalCatalog)
|
||||
@ -128,9 +129,10 @@ public class HiveAnalysisJob extends HMSAnalysisJob {
|
||||
}
|
||||
// Update partition level stats for this column.
|
||||
for (String partitionSql : partitionAnalysisSQLs) {
|
||||
connectContext = StatisticsUtil.buildConnectContext();
|
||||
this.stmtExecutor = new StmtExecutor(connectContext, partitionSql);
|
||||
this.stmtExecutor.execute();
|
||||
try (AutoCloseConnectContext r = StatisticsUtil.buildConnectContext()) {
|
||||
this.stmtExecutor = new StmtExecutor(r.connectContext, partitionSql);
|
||||
this.stmtExecutor.execute();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@ -18,7 +18,7 @@
|
||||
package org.apache.doris.statistics;
|
||||
|
||||
import org.apache.doris.common.FeConstants;
|
||||
import org.apache.doris.qe.ConnectContext;
|
||||
import org.apache.doris.qe.AutoCloseConnectContext;
|
||||
import org.apache.doris.qe.StmtExecutor;
|
||||
import org.apache.doris.statistics.util.StatisticsUtil;
|
||||
|
||||
@ -116,8 +116,9 @@ public class IcebergAnalysisJob extends HMSAnalysisJob {
|
||||
// Update table level stats info of this column.
|
||||
StringSubstitutor stringSubstitutor = new StringSubstitutor(params);
|
||||
String sql = stringSubstitutor.replace(INSERT_TABLE_SQL_TEMPLATE);
|
||||
ConnectContext connectContext = StatisticsUtil.buildConnectContext();
|
||||
this.stmtExecutor = new StmtExecutor(connectContext, sql);
|
||||
this.stmtExecutor.execute();
|
||||
try (AutoCloseConnectContext r = StatisticsUtil.buildConnectContext()) {
|
||||
this.stmtExecutor = new StmtExecutor(r.connectContext, sql);
|
||||
this.stmtExecutor.execute();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@ -41,6 +41,7 @@ import org.apache.doris.common.FeConstants;
|
||||
import org.apache.doris.common.UserException;
|
||||
import org.apache.doris.datasource.CatalogIf;
|
||||
import org.apache.doris.nereids.trees.expressions.literal.DateTimeLiteral;
|
||||
import org.apache.doris.qe.AutoCloseConnectContext;
|
||||
import org.apache.doris.qe.ConnectContext;
|
||||
import org.apache.doris.qe.SessionVariable;
|
||||
import org.apache.doris.qe.StmtExecutor;
|
||||
@ -76,24 +77,20 @@ public class StatisticsUtil {
|
||||
}
|
||||
|
||||
public static List<ResultRow> execStatisticQuery(String sql) {
|
||||
ConnectContext connectContext = StatisticsUtil.buildConnectContext();
|
||||
try {
|
||||
StmtExecutor stmtExecutor = new StmtExecutor(connectContext, sql);
|
||||
connectContext.setExecutor(stmtExecutor);
|
||||
try (AutoCloseConnectContext r = StatisticsUtil.buildConnectContext()) {
|
||||
StmtExecutor stmtExecutor = new StmtExecutor(r.connectContext, sql);
|
||||
r.connectContext.setExecutor(stmtExecutor);
|
||||
return stmtExecutor.executeInternalQuery();
|
||||
} finally {
|
||||
connectContext.kill(false);
|
||||
}
|
||||
}
|
||||
|
||||
public static void execUpdate(String sql) throws Exception {
|
||||
ConnectContext connectContext = StatisticsUtil.buildConnectContext();
|
||||
try {
|
||||
StmtExecutor stmtExecutor = new StmtExecutor(connectContext, sql);
|
||||
connectContext.setExecutor(stmtExecutor);
|
||||
try (AutoCloseConnectContext r = StatisticsUtil.buildConnectContext()) {
|
||||
StmtExecutor stmtExecutor = new StmtExecutor(r.connectContext, sql);
|
||||
r.connectContext.setExecutor(stmtExecutor);
|
||||
stmtExecutor.execute();
|
||||
} finally {
|
||||
connectContext.kill(false);
|
||||
ConnectContext.remove();
|
||||
}
|
||||
}
|
||||
|
||||
@ -107,7 +104,7 @@ public class StatisticsUtil {
|
||||
return resultBatches.stream().map(ColumnStatistic::fromResultRow).collect(Collectors.toList());
|
||||
}
|
||||
|
||||
public static ConnectContext buildConnectContext() {
|
||||
public static AutoCloseConnectContext buildConnectContext() {
|
||||
ConnectContext connectContext = new ConnectContext();
|
||||
SessionVariable sessionVariable = connectContext.getSessionVariable();
|
||||
sessionVariable.internalSession = true;
|
||||
@ -121,15 +118,16 @@ public class StatisticsUtil {
|
||||
UUID uuid = UUID.randomUUID();
|
||||
TUniqueId queryId = new TUniqueId(uuid.getMostSignificantBits(), uuid.getLeastSignificantBits());
|
||||
connectContext.setQueryId(queryId);
|
||||
connectContext.setThreadLocalInfo();
|
||||
connectContext.setStartTime();
|
||||
connectContext.setCluster(SystemInfoService.DEFAULT_CLUSTER);
|
||||
return connectContext;
|
||||
return new AutoCloseConnectContext(connectContext);
|
||||
}
|
||||
|
||||
public static void analyze(StatementBase statementBase) throws UserException {
|
||||
Analyzer analyzer = new Analyzer(Env.getCurrentEnv(), buildConnectContext());
|
||||
statementBase.analyze(analyzer);
|
||||
try (AutoCloseConnectContext r = buildConnectContext()) {
|
||||
Analyzer analyzer = new Analyzer(Env.getCurrentEnv(), r.connectContext);
|
||||
statementBase.analyze(analyzer);
|
||||
}
|
||||
}
|
||||
|
||||
public static LiteralExpr readableValue(Type type, String columnValue) throws AnalysisException {
|
||||
|
||||
@ -19,6 +19,7 @@ package org.apache.doris.statistics;
|
||||
|
||||
import org.apache.doris.catalog.Env;
|
||||
import org.apache.doris.catalog.InternalSchemaInitializer;
|
||||
import org.apache.doris.qe.AutoCloseConnectContext;
|
||||
import org.apache.doris.qe.ConnectContext;
|
||||
import org.apache.doris.qe.StmtExecutor;
|
||||
import org.apache.doris.statistics.AnalysisJobInfo.JobType;
|
||||
@ -30,6 +31,7 @@ import mockit.Expectations;
|
||||
import mockit.Mock;
|
||||
import mockit.MockUp;
|
||||
import mockit.Mocked;
|
||||
import org.junit.jupiter.api.Assertions;
|
||||
import org.junit.jupiter.api.Test;
|
||||
|
||||
public class AnalysisJobTest extends TestWithFeService {
|
||||
@ -63,17 +65,24 @@ public class AnalysisJobTest extends TestWithFeService {
|
||||
new MockUp<StatisticsUtil>() {
|
||||
|
||||
@Mock
|
||||
public ConnectContext buildConnectContext() {
|
||||
return connectContext;
|
||||
public AutoCloseConnectContext buildConnectContext() {
|
||||
return new AutoCloseConnectContext(connectContext);
|
||||
}
|
||||
|
||||
@Mock
|
||||
public void execUpdate(String sql) throws Exception {
|
||||
}
|
||||
};
|
||||
|
||||
new MockUp<ConnectContext>() {
|
||||
|
||||
@Mock
|
||||
public ConnectContext get() {
|
||||
return connectContext;
|
||||
}
|
||||
};
|
||||
String sql = "ANALYZE t1";
|
||||
StmtExecutor executor = getSqlStmtExecutor(sql);
|
||||
executor.execute();
|
||||
Assertions.assertNotNull(getSqlStmtExecutor(sql));
|
||||
}
|
||||
|
||||
@Test
|
||||
|
||||
Reference in New Issue
Block a user