diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/CoordInterface.java b/fe/fe-core/src/main/java/org/apache/doris/qe/CoordInterface.java
index 925cd1fd15..e57d0d261e 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/CoordInterface.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/CoordInterface.java
@@ -20,6 +20,7 @@ package org.apache.doris.qe;
 import org.apache.doris.proto.Types;
 
 public interface CoordInterface {
+    public void exec() throws Exception;
 
     public RowBatch getNext() throws Exception;
 
@@ -27,5 +28,9 @@ public interface CoordInterface {
     public int getInstanceTotalNum();
 
     public void cancel(Types.PPlanFragmentCancelReason cancelReason);
+
+    // When exec or getNext has finished, this method should be called to release
+    // resources.
+    public default void close() {}
 
 }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
index 3789be0754..cc56b6f764 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
@@ -68,6 +68,8 @@ import org.apache.doris.proto.Types;
 import org.apache.doris.proto.Types.PUniqueId;
 import org.apache.doris.qe.ConnectContext.ConnectType;
 import org.apache.doris.qe.QueryStatisticsItem.FragmentInstanceInfo;
+import org.apache.doris.resource.workloadgroup.QueryQueue;
+import org.apache.doris.resource.workloadgroup.QueueToken;
 import org.apache.doris.rpc.BackendServiceProxy;
 import org.apache.doris.rpc.RpcException;
 import org.apache.doris.service.ExecuteEnv;
@@ -269,6 +271,9 @@ public class Coordinator implements CoordInterface {
 
     private final ExecutionProfile executionProfile;
 
+    private QueueToken queueToken = null;
+    private QueryQueue queryQueue = null;
+
     public ExecutionProfile getExecutionProfile() {
         return executionProfile;
     }
@@ -590,6 +595,32 @@ public class Coordinator implements CoordInterface {
     // A call to Exec() must precede all other member function calls.
     @Override
    public void exec() throws Exception {
+        // LoadTask does not have a context, so it is not controlled by the queue for now.
+        if (Config.enable_workload_group && Config.enable_query_queue && context != null) {
+            queryQueue = context.getEnv().getWorkloadGroupMgr().getWorkloadGroupQueryQueue(context);
+            if (queryQueue == null) {
+                // This branch should be unreachable: the workload group manager already throws
+                // an exception when the query queue cannot be found.
+                throw new UserException("could not find query queue");
+            }
+            queueToken = queryQueue.getToken();
+            if (!queueToken.waitSignal(this.queryOptions.getExecutionTimeout() * 1000)) {
+                LOG.error("query (id=" + DebugUtil.printId(queryId) + ") " + queueToken.getOfferResultDetail());
+                queryQueue.returnToken(queueToken);
+                throw new UserException(queueToken.getOfferResultDetail());
+            }
+        }
+        execInternal();
+    }
+
+    @Override
+    public void close() {
+        if (queryQueue != null && queueToken != null) {
+            queryQueue.returnToken(queueToken);
+        }
+    }
+
+    private void execInternal() throws Exception {
         if (LOG.isDebugEnabled() && !scanNodes.isEmpty()) {
             LOG.debug("debug: in Coordinator::exec. query id: {}, planNode: {}",
                     DebugUtil.printId(queryId), scanNodes.get(0).treeToThrift());
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
index c7658b94e6..7fdbd06169 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
@@ -143,8 +143,6 @@ import org.apache.doris.qe.QueryState.MysqlStateType;
 import org.apache.doris.qe.cache.Cache;
 import org.apache.doris.qe.cache.CacheAnalyzer;
 import org.apache.doris.qe.cache.CacheAnalyzer.CacheMode;
-import org.apache.doris.resource.workloadgroup.QueryQueue;
-import org.apache.doris.resource.workloadgroup.QueueToken;
 import org.apache.doris.rewrite.ExprRewriter;
 import org.apache.doris.rewrite.mvrewrite.MVSelectFailedException;
 import org.apache.doris.rpc.RpcException;
@@ -618,55 +616,37 @@ public class StmtExecutor {
     private void handleQueryWithRetry(TUniqueId queryId) throws Exception {
         // queue query here
         syncJournalIfNeeded();
-        QueueToken queueToken = null;
-        QueryQueue queryQueue = null;
-        if (!parsedStmt.isExplain() && Config.enable_workload_group && Config.enable_query_queue
-                && context.getSessionVariable().getEnablePipelineEngine()) {
-            queryQueue = context.getEnv().getWorkloadGroupMgr().getWorkloadGroupQueryQueue(context);
-            queueToken = queryQueue.getToken();
-            if (!queueToken.waitSignal()) {
-                LOG.error("query (id=" + DebugUtil.printId(queryId) + ") " + queueToken.getOfferResultDetail());
-                queryQueue.returnToken(queueToken);
-                throw new UserException(queueToken.getOfferResultDetail());
-            }
-        }
 
         int retryTime = Config.max_query_retry_time;
-        try {
-            for (int i = 0; i < retryTime; i++) {
-                try {
-                    // reset query id for each retry
-                    if (i > 0) {
-                        UUID uuid = UUID.randomUUID();
-                        TUniqueId newQueryId = new TUniqueId(uuid.getMostSignificantBits(),
-                                uuid.getLeastSignificantBits());
-                        AuditLog.getQueryAudit().log("Query {} {} times with new query id: {}",
-                                DebugUtil.printId(queryId), i, DebugUtil.printId(newQueryId));
-                        context.setQueryId(newQueryId);
-                    }
-                    if (context.getConnectType() == ConnectType.ARROW_FLIGHT_SQL) {
-                        context.setReturnResultFromLocal(false);
-                    }
-                    handleQueryStmt();
-                    break;
-                } catch (RpcException e) {
-                    if (i == retryTime - 1) {
-                        throw e;
-                    }
-                    if (context.getConnectType().equals(ConnectType.MYSQL) && !context.getMysqlChannel().isSend()) {
-                        LOG.warn("retry {} times. stmt: {}", (i + 1), parsedStmt.getOrigStmt().originStmt);
-                    } else {
-                        throw e;
-                    }
-                } finally {
-                    if (context.isReturnResultFromLocal()) {
-                        finalizeQuery();
-                    }
+        for (int i = 0; i < retryTime; i++) {
+            try {
+                // reset query id for each retry
+                if (i > 0) {
+                    UUID uuid = UUID.randomUUID();
+                    TUniqueId newQueryId = new TUniqueId(uuid.getMostSignificantBits(),
+                            uuid.getLeastSignificantBits());
+                    AuditLog.getQueryAudit().log("Query {} {} times with new query id: {}",
+                            DebugUtil.printId(queryId), i, DebugUtil.printId(newQueryId));
+                    context.setQueryId(newQueryId);
+                }
+                if (context.getConnectType() == ConnectType.ARROW_FLIGHT_SQL) {
+                    context.setReturnResultFromLocal(false);
+                }
+                handleQueryStmt();
+                break;
+            } catch (RpcException e) {
+                if (i == retryTime - 1) {
+                    throw e;
+                }
+                if (context.getConnectType().equals(ConnectType.MYSQL) && !context.getMysqlChannel().isSend()) {
+                    LOG.warn("retry {} times. stmt: {}", (i + 1), parsedStmt.getOrigStmt().originStmt);
+                } else {
+                    throw e;
+                }
+            } finally {
+                if (context.isReturnResultFromLocal()) {
+                    finalizeQuery();
                 }
-            }
-        } finally {
-            if (queueToken != null) {
-                queryQueue.returnToken(queueToken);
             }
         }
     }
@@ -1497,27 +1477,13 @@
             coordBase = coord;
         }
 
-        coordBase.exec();
-
-        profile.getSummaryProfile().setQueryScheduleFinishTime();
-        updateProfile(false);
-        if (coordBase.getInstanceTotalNum() > 1 && LOG.isDebugEnabled()) {
-            try {
-                LOG.debug("Start to execute fragment. user: {}, db: {}, sql: {}, fragment instance num: {}",
-                        context.getQualifiedUser(), context.getDatabase(),
-                        parsedStmt.getOrigStmt().originStmt.replace("\n", " "),
-                        coordBase.getInstanceTotalNum());
-            } catch (Exception e) {
-                LOG.warn("Fail to print fragment concurrency for Query.", e);
-            }
-        }
-
-        if (context.getConnectType().equals(ConnectType.ARROW_FLIGHT_SQL)) {
-            Preconditions.checkState(!context.isReturnResultFromLocal());
-            profile.getSummaryProfile().setTempStartTime();
+        try {
+            coordBase.exec();
+            profile.getSummaryProfile().setQueryScheduleFinishTime();
+            updateProfile(false);
             if (coordBase.getInstanceTotalNum() > 1 && LOG.isDebugEnabled()) {
                 try {
-                    LOG.debug("Finish to execute fragment. user: {}, db: {}, sql: {}, fragment instance num: {}",
+                    LOG.debug("Start to execute fragment. user: {}, db: {}, sql: {}, fragment instance num: {}",
                             context.getQualifiedUser(), context.getDatabase(),
                             parsedStmt.getOrigStmt().originStmt.replace("\n", " "),
                             coordBase.getInstanceTotalNum());
@@ -1525,10 +1491,22 @@
                     LOG.warn("Fail to print fragment concurrency for Query.", e);
                 }
             }
-            return;
-        }
-
-        try {
+
+            if (context.getConnectType().equals(ConnectType.ARROW_FLIGHT_SQL)) {
+                Preconditions.checkState(!context.isReturnResultFromLocal());
+                profile.getSummaryProfile().setTempStartTime();
+                if (coordBase.getInstanceTotalNum() > 1 && LOG.isDebugEnabled()) {
+                    try {
+                        LOG.debug("Finish to execute fragment. user: {}, db: {}, sql: {}, fragment instance num: {}",
+                                context.getQualifiedUser(), context.getDatabase(),
+                                parsedStmt.getOrigStmt().originStmt.replace("\n", " "),
+                                coordBase.getInstanceTotalNum());
+                    } catch (Exception e) {
+                        LOG.warn("Fail to print fragment concurrency for Query.", e);
+                    }
+                }
+                return;
+            }
             while (true) {
                 // register the fetch result time.
                 profile.getSummaryProfile().setTempStartTime();
@@ -1603,6 +1581,7 @@
             coordBase.cancel(Types.PPlanFragmentCancelReason.INTERNAL_ERROR);
             throw e;
         } finally {
+            coordBase.close();
             if (coordBase.getInstanceTotalNum() > 1 && LOG.isDebugEnabled()) {
                 try {
                     LOG.debug("Finish to execute fragment. user: {}, db: {}, sql: {}, fragment instance num: {}",
@@ -2038,6 +2017,7 @@
              */
             throwable = t;
         } finally {
+            coord.close();
             finalizeQuery();
         }
 
@@ -2741,6 +2721,7 @@
                 throw new RuntimeException("Failed to fetch internal SQL result. " + Util.getRootCauseMessage(e), e);
             }
         } finally {
+            coord.close();
             AuditLogHelper.logAuditLog(context, originStmt.toString(), parsedStmt,
                     getQueryStatisticsForAuditLog(), true);
             QeProcessorImpl.INSTANCE.unregisterQuery(context.queryId());
diff --git a/fe/fe-core/src/main/java/org/apache/doris/resource/workloadgroup/QueryQueue.java b/fe/fe-core/src/main/java/org/apache/doris/resource/workloadgroup/QueryQueue.java
index 5d9a61f4a2..7ba6353e74 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/resource/workloadgroup/QueryQueue.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/resource/workloadgroup/QueryQueue.java
@@ -126,7 +126,7 @@ public class QueryQueue {
     }
 
     // If the token is acquired and do work success, then call this method to release it.
-    public void returnToken(QueueToken token) throws InterruptedException {
+    public void returnToken(QueueToken token) {
         queueLock.lock();
         try {
             // If current token is not in ready to run state, then it is still in the queue
diff --git a/fe/fe-core/src/main/java/org/apache/doris/resource/workloadgroup/QueueToken.java b/fe/fe-core/src/main/java/org/apache/doris/resource/workloadgroup/QueueToken.java
index 9126535dc0..17299d3ea3 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/resource/workloadgroup/QueueToken.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/resource/workloadgroup/QueueToken.java
@@ -42,7 +42,7 @@ public class QueueToken {
 
     private TokenState tokenState;
 
-    private long waitTimeout = 0;
+    private long queueWaitTimeout = 0;
 
     private String offerResultDetail;
 
@@ -51,15 +51,15 @@ public class QueueToken {
     private final ReentrantLock tokenLock = new ReentrantLock();
     private final Condition tokenCond = tokenLock.newCondition();
 
-    public QueueToken(TokenState tokenState, long waitTimeout,
+    public QueueToken(TokenState tokenState, long queueWaitTimeout,
             String offerResultDetail) {
         this.tokenId = tokenIdGenerator.addAndGet(1);
         this.tokenState = tokenState;
-        this.waitTimeout = waitTimeout;
+        this.queueWaitTimeout = queueWaitTimeout;
         this.offerResultDetail = offerResultDetail;
     }
 
-    public boolean waitSignal() throws InterruptedException {
+    public boolean waitSignal(long queryTimeoutMillis) throws InterruptedException {
         this.tokenLock.lock();
         try {
             if (isTimeout) {
@@ -68,6 +68,9 @@
             if (tokenState == TokenState.READY_TO_RUN) {
                 return true;
             }
+            // If the query timeout is shorter than the queue wait timeout, use the
+            // query timeout as the effective wait timeout.
+            long waitTimeout = queryTimeoutMillis > queueWaitTimeout ? queueWaitTimeout : queryTimeoutMillis;
             tokenCond.await(waitTimeout, TimeUnit.MILLISECONDS);
             // If wait timeout and is steal not ready to run, then return false
             if (tokenState != TokenState.READY_TO_RUN) {
diff --git a/regression-test/suites/workload_manager_p0/test_curd_wlg.groovy b/regression-test/suites/workload_manager_p0/test_curd_wlg.groovy
index 476aa4c10a..6ad4697ae3 100644
--- a/regression-test/suites/workload_manager_p0/test_curd_wlg.groovy
+++ b/regression-test/suites/workload_manager_p0/test_curd_wlg.groovy
@@ -17,8 +17,12 @@
 
 suite("test_crud_wlg") {
     def table_name = "wlg_test_table"
+    def table_name2 = "wlg_test_table2"
+    def table_name3 = "wlg_test_table3"
 
     sql "drop table if exists ${table_name}"
+    sql "drop table if exists ${table_name2}"
+    sql "drop table if exists ${table_name3}"
 
     sql """
         CREATE TABLE IF NOT EXISTS `${table_name}` (
@@ -37,6 +41,23 @@
         )
     """
 
+    sql """
+        CREATE TABLE IF NOT EXISTS `${table_name2}` (
+          `siteid` int(11) NOT NULL COMMENT "",
+          `citycode` int(11) NOT NULL COMMENT "",
+          `userid` int(11) NOT NULL COMMENT "",
+          `pv` int(11) NOT NULL COMMENT ""
+        ) ENGINE=OLAP
+        DUPLICATE KEY(`siteid`)
+        COMMENT "OLAP"
+        DISTRIBUTED BY HASH(`siteid`) BUCKETS 1
+        PROPERTIES (
+            "replication_allocation" = "tag.location.default: 1",
+            "in_memory" = "false",
+            "storage_format" = "V2"
+        )
+    """
+
     sql """insert into ${table_name} values
         (9,10,11,12),
         (1,2,3,4)
@@ -257,7 +278,21 @@
     sql "alter workload group test_group properties ( 'max_queue_size'='0' );"
     Thread.sleep(3000);
     try {
-        sql "select 1;"
+        sql "select * from ${table_name};"
+    } catch (Exception e) {
+        assertTrue(e.getMessage().contains("query waiting queue is full"));
+    }
+
+    // test insert into select will go to queue
+    try {
+        sql "insert into ${table_name2} select * from ${table_name};"
+    } catch (Exception e) {
+        assertTrue(e.getMessage().contains("query waiting queue is full"));
+    }
+
+    // test create table as select will go to queue
+    try {
+        sql "create table ${table_name3} PROPERTIES('replication_num' = '1') as select * from ${table_name};"
     } catch (Exception e) {
         assertTrue(e.getMessage().contains("query waiting queue is full"));
     }
@@ -266,7 +301,7 @@
     sql "alter workload group test_group properties ( 'queue_timeout'='500' );"
     Thread.sleep(3000);
    try {
-        sql "select 1;"
+        sql "select * from ${table_name};"
     } catch (Exception e) {
         assertTrue(e.getMessage().contains("query wait timeout"));
     }
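
Reviewer note (not part of the patch): the change moves query-queue admission out of StmtExecutor and into Coordinator.exec(), and adds CoordInterface.close() as the hook that returns the QueueToken. Below is a minimal caller-side sketch of the lifecycle this establishes; the helper class name and the RowBatch.isEos() check are assumptions for illustration, not code from the patch.

import java.util.function.Consumer;

import org.apache.doris.proto.Types;
import org.apache.doris.qe.CoordInterface;
import org.apache.doris.qe.RowBatch;

// Hypothetical helper, for illustration only.
public final class CoordLifecycleSketch {
    private CoordLifecycleSketch() {}

    public static void runAndDrain(CoordInterface coord, Consumer<RowBatch> sink) throws Exception {
        try {
            // exec() may now block in the workload group query queue before fragments are scheduled.
            coord.exec();
            while (true) {
                RowBatch batch = coord.getNext();
                sink.accept(batch);
                if (batch.isEos()) { // assumption: RowBatch exposes an end-of-stream flag
                    break;
                }
            }
        } catch (Exception e) {
            coord.cancel(Types.PPlanFragmentCancelReason.INTERNAL_ERROR);
            throw e;
        } finally {
            // Always release the queue token (and any other coordinator resources);
            // close() is a no-op default for implementations that do not queue.
            coord.close();
        }
    }
}

StmtExecutor follows the same pattern after this change: coordBase.exec() runs inside a try block and coordBase.close() is called in the matching finally block, so the token is returned even when the query fails or is cancelled.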