From 4a4d137402fbeee5f7e8d193e15da482b80bca1f Mon Sep 17 00:00:00 2001 From: yiguolei <676222867@qq.com> Date: Wed, 6 Dec 2023 21:07:55 +0800 Subject: [PATCH] [feature](workloadgroup) support nereids internal query and all dml query (#28054) support nereids internal query to bind a workload group support insert into select bind workload group support create table as select bind workload group change token wait timeout to be query timeout or queue timeout query queue should not bind to pipeline engine, it could be used every where. --- .../org/apache/doris/qe/CoordInterface.java | 5 + .../java/org/apache/doris/qe/Coordinator.java | 31 +++++ .../org/apache/doris/qe/StmtExecutor.java | 121 ++++++++---------- .../resource/workloadgroup/QueryQueue.java | 2 +- .../resource/workloadgroup/QueueToken.java | 11 +- .../workload_manager_p0/test_curd_wlg.groovy | 39 +++++- 6 files changed, 132 insertions(+), 77 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/CoordInterface.java b/fe/fe-core/src/main/java/org/apache/doris/qe/CoordInterface.java index 925cd1fd15..e57d0d261e 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/CoordInterface.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/CoordInterface.java @@ -20,6 +20,7 @@ package org.apache.doris.qe; import org.apache.doris.proto.Types; public interface CoordInterface { + public void exec() throws Exception; public RowBatch getNext() throws Exception; @@ -27,5 +28,9 @@ public interface CoordInterface { public int getInstanceTotalNum(); public void cancel(Types.PPlanFragmentCancelReason cancelReason); + + // When call exec or get next data finished, should call this method to release + // some resource. + public default void close() {} } diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java index 3789be0754..cc56b6f764 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java @@ -68,6 +68,8 @@ import org.apache.doris.proto.Types; import org.apache.doris.proto.Types.PUniqueId; import org.apache.doris.qe.ConnectContext.ConnectType; import org.apache.doris.qe.QueryStatisticsItem.FragmentInstanceInfo; +import org.apache.doris.resource.workloadgroup.QueryQueue; +import org.apache.doris.resource.workloadgroup.QueueToken; import org.apache.doris.rpc.BackendServiceProxy; import org.apache.doris.rpc.RpcException; import org.apache.doris.service.ExecuteEnv; @@ -269,6 +271,9 @@ public class Coordinator implements CoordInterface { private final ExecutionProfile executionProfile; + private QueueToken queueToken = null; + private QueryQueue queryQueue = null; + public ExecutionProfile getExecutionProfile() { return executionProfile; } @@ -590,6 +595,32 @@ public class Coordinator implements CoordInterface { // A call to Exec() must precede all other member function calls. @Override public void exec() throws Exception { + // LoadTask does not have context, not controlled by queue now + if (Config.enable_workload_group && Config.enable_query_queue && context != null) { + queryQueue = context.getEnv().getWorkloadGroupMgr().getWorkloadGroupQueryQueue(context); + if (queryQueue == null) { + // This logic is actually useless, because when could not find query queue, it will + // throw exception during workload group manager. + throw new UserException("could not find query queue"); + } + queueToken = queryQueue.getToken(); + if (!queueToken.waitSignal(this.queryOptions.getExecutionTimeout() * 1000)) { + LOG.error("query (id=" + DebugUtil.printId(queryId) + ") " + queueToken.getOfferResultDetail()); + queryQueue.returnToken(queueToken); + throw new UserException(queueToken.getOfferResultDetail()); + } + } + execInternal(); + } + + @Override + public void close() { + if (queryQueue != null && queueToken != null) { + queryQueue.returnToken(queueToken); + } + } + + private void execInternal() throws Exception { if (LOG.isDebugEnabled() && !scanNodes.isEmpty()) { LOG.debug("debug: in Coordinator::exec. query id: {}, planNode: {}", DebugUtil.printId(queryId), scanNodes.get(0).treeToThrift()); diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java index c7658b94e6..7fdbd06169 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java @@ -143,8 +143,6 @@ import org.apache.doris.qe.QueryState.MysqlStateType; import org.apache.doris.qe.cache.Cache; import org.apache.doris.qe.cache.CacheAnalyzer; import org.apache.doris.qe.cache.CacheAnalyzer.CacheMode; -import org.apache.doris.resource.workloadgroup.QueryQueue; -import org.apache.doris.resource.workloadgroup.QueueToken; import org.apache.doris.rewrite.ExprRewriter; import org.apache.doris.rewrite.mvrewrite.MVSelectFailedException; import org.apache.doris.rpc.RpcException; @@ -618,55 +616,37 @@ public class StmtExecutor { private void handleQueryWithRetry(TUniqueId queryId) throws Exception { // queue query here syncJournalIfNeeded(); - QueueToken queueToken = null; - QueryQueue queryQueue = null; - if (!parsedStmt.isExplain() && Config.enable_workload_group && Config.enable_query_queue - && context.getSessionVariable().getEnablePipelineEngine()) { - queryQueue = context.getEnv().getWorkloadGroupMgr().getWorkloadGroupQueryQueue(context); - queueToken = queryQueue.getToken(); - if (!queueToken.waitSignal()) { - LOG.error("query (id=" + DebugUtil.printId(queryId) + ") " + queueToken.getOfferResultDetail()); - queryQueue.returnToken(queueToken); - throw new UserException(queueToken.getOfferResultDetail()); - } - } int retryTime = Config.max_query_retry_time; - try { - for (int i = 0; i < retryTime; i++) { - try { - // reset query id for each retry - if (i > 0) { - UUID uuid = UUID.randomUUID(); - TUniqueId newQueryId = new TUniqueId(uuid.getMostSignificantBits(), - uuid.getLeastSignificantBits()); - AuditLog.getQueryAudit().log("Query {} {} times with new query id: {}", - DebugUtil.printId(queryId), i, DebugUtil.printId(newQueryId)); - context.setQueryId(newQueryId); - } - if (context.getConnectType() == ConnectType.ARROW_FLIGHT_SQL) { - context.setReturnResultFromLocal(false); - } - handleQueryStmt(); - break; - } catch (RpcException e) { - if (i == retryTime - 1) { - throw e; - } - if (context.getConnectType().equals(ConnectType.MYSQL) && !context.getMysqlChannel().isSend()) { - LOG.warn("retry {} times. stmt: {}", (i + 1), parsedStmt.getOrigStmt().originStmt); - } else { - throw e; - } - } finally { - if (context.isReturnResultFromLocal()) { - finalizeQuery(); - } + for (int i = 0; i < retryTime; i++) { + try { + // reset query id for each retry + if (i > 0) { + UUID uuid = UUID.randomUUID(); + TUniqueId newQueryId = new TUniqueId(uuid.getMostSignificantBits(), + uuid.getLeastSignificantBits()); + AuditLog.getQueryAudit().log("Query {} {} times with new query id: {}", + DebugUtil.printId(queryId), i, DebugUtil.printId(newQueryId)); + context.setQueryId(newQueryId); + } + if (context.getConnectType() == ConnectType.ARROW_FLIGHT_SQL) { + context.setReturnResultFromLocal(false); + } + handleQueryStmt(); + break; + } catch (RpcException e) { + if (i == retryTime - 1) { + throw e; + } + if (context.getConnectType().equals(ConnectType.MYSQL) && !context.getMysqlChannel().isSend()) { + LOG.warn("retry {} times. stmt: {}", (i + 1), parsedStmt.getOrigStmt().originStmt); + } else { + throw e; + } + } finally { + if (context.isReturnResultFromLocal()) { + finalizeQuery(); } - } - } finally { - if (queueToken != null) { - queryQueue.returnToken(queueToken); } } } @@ -1497,27 +1477,13 @@ public class StmtExecutor { coordBase = coord; } - coordBase.exec(); - - profile.getSummaryProfile().setQueryScheduleFinishTime(); - updateProfile(false); - if (coordBase.getInstanceTotalNum() > 1 && LOG.isDebugEnabled()) { - try { - LOG.debug("Start to execute fragment. user: {}, db: {}, sql: {}, fragment instance num: {}", - context.getQualifiedUser(), context.getDatabase(), - parsedStmt.getOrigStmt().originStmt.replace("\n", " "), - coordBase.getInstanceTotalNum()); - } catch (Exception e) { - LOG.warn("Fail to print fragment concurrency for Query.", e); - } - } - - if (context.getConnectType().equals(ConnectType.ARROW_FLIGHT_SQL)) { - Preconditions.checkState(!context.isReturnResultFromLocal()); - profile.getSummaryProfile().setTempStartTime(); + try { + coordBase.exec(); + profile.getSummaryProfile().setQueryScheduleFinishTime(); + updateProfile(false); if (coordBase.getInstanceTotalNum() > 1 && LOG.isDebugEnabled()) { try { - LOG.debug("Finish to execute fragment. user: {}, db: {}, sql: {}, fragment instance num: {}", + LOG.debug("Start to execute fragment. user: {}, db: {}, sql: {}, fragment instance num: {}", context.getQualifiedUser(), context.getDatabase(), parsedStmt.getOrigStmt().originStmt.replace("\n", " "), coordBase.getInstanceTotalNum()); @@ -1525,10 +1491,22 @@ public class StmtExecutor { LOG.warn("Fail to print fragment concurrency for Query.", e); } } - return; - } - try { + if (context.getConnectType().equals(ConnectType.ARROW_FLIGHT_SQL)) { + Preconditions.checkState(!context.isReturnResultFromLocal()); + profile.getSummaryProfile().setTempStartTime(); + if (coordBase.getInstanceTotalNum() > 1 && LOG.isDebugEnabled()) { + try { + LOG.debug("Finish to execute fragment. user: {}, db: {}, sql: {}, fragment instance num: {}", + context.getQualifiedUser(), context.getDatabase(), + parsedStmt.getOrigStmt().originStmt.replace("\n", " "), + coordBase.getInstanceTotalNum()); + } catch (Exception e) { + LOG.warn("Fail to print fragment concurrency for Query.", e); + } + } + return; + } while (true) { // register the fetch result time. profile.getSummaryProfile().setTempStartTime(); @@ -1603,6 +1581,7 @@ public class StmtExecutor { coordBase.cancel(Types.PPlanFragmentCancelReason.INTERNAL_ERROR); throw e; } finally { + coordBase.close(); if (coordBase.getInstanceTotalNum() > 1 && LOG.isDebugEnabled()) { try { LOG.debug("Finish to execute fragment. user: {}, db: {}, sql: {}, fragment instance num: {}", @@ -2038,6 +2017,7 @@ public class StmtExecutor { */ throwable = t; } finally { + coord.close(); finalizeQuery(); } @@ -2741,6 +2721,7 @@ public class StmtExecutor { throw new RuntimeException("Failed to fetch internal SQL result. " + Util.getRootCauseMessage(e), e); } } finally { + coord.close(); AuditLogHelper.logAuditLog(context, originStmt.toString(), parsedStmt, getQueryStatisticsForAuditLog(), true); QeProcessorImpl.INSTANCE.unregisterQuery(context.queryId()); diff --git a/fe/fe-core/src/main/java/org/apache/doris/resource/workloadgroup/QueryQueue.java b/fe/fe-core/src/main/java/org/apache/doris/resource/workloadgroup/QueryQueue.java index 5d9a61f4a2..7ba6353e74 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/resource/workloadgroup/QueryQueue.java +++ b/fe/fe-core/src/main/java/org/apache/doris/resource/workloadgroup/QueryQueue.java @@ -126,7 +126,7 @@ public class QueryQueue { } // If the token is acquired and do work success, then call this method to release it. - public void returnToken(QueueToken token) throws InterruptedException { + public void returnToken(QueueToken token) { queueLock.lock(); try { // If current token is not in ready to run state, then it is still in the queue diff --git a/fe/fe-core/src/main/java/org/apache/doris/resource/workloadgroup/QueueToken.java b/fe/fe-core/src/main/java/org/apache/doris/resource/workloadgroup/QueueToken.java index 9126535dc0..17299d3ea3 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/resource/workloadgroup/QueueToken.java +++ b/fe/fe-core/src/main/java/org/apache/doris/resource/workloadgroup/QueueToken.java @@ -42,7 +42,7 @@ public class QueueToken { private TokenState tokenState; - private long waitTimeout = 0; + private long queueWaitTimeout = 0; private String offerResultDetail; @@ -51,15 +51,15 @@ public class QueueToken { private final ReentrantLock tokenLock = new ReentrantLock(); private final Condition tokenCond = tokenLock.newCondition(); - public QueueToken(TokenState tokenState, long waitTimeout, + public QueueToken(TokenState tokenState, long queueWaitTimeout, String offerResultDetail) { this.tokenId = tokenIdGenerator.addAndGet(1); this.tokenState = tokenState; - this.waitTimeout = waitTimeout; + this.queueWaitTimeout = queueWaitTimeout; this.offerResultDetail = offerResultDetail; } - public boolean waitSignal() throws InterruptedException { + public boolean waitSignal(long queryTimeoutMillis) throws InterruptedException { this.tokenLock.lock(); try { if (isTimeout) { @@ -68,6 +68,9 @@ public class QueueToken { if (tokenState == TokenState.READY_TO_RUN) { return true; } + // If query timeout is less than queue wait timeout, then should use + // query timeout as wait timeout + long waitTimeout = queryTimeoutMillis > queueWaitTimeout ? queueWaitTimeout : queryTimeoutMillis; tokenCond.await(waitTimeout, TimeUnit.MILLISECONDS); // If wait timeout and is steal not ready to run, then return false if (tokenState != TokenState.READY_TO_RUN) { diff --git a/regression-test/suites/workload_manager_p0/test_curd_wlg.groovy b/regression-test/suites/workload_manager_p0/test_curd_wlg.groovy index 476aa4c10a..6ad4697ae3 100644 --- a/regression-test/suites/workload_manager_p0/test_curd_wlg.groovy +++ b/regression-test/suites/workload_manager_p0/test_curd_wlg.groovy @@ -17,8 +17,12 @@ suite("test_crud_wlg") { def table_name = "wlg_test_table" + def table_name2 = "wlg_test_table2" + def table_name3 = "wlg_test_table3" sql "drop table if exists ${table_name}" + sql "drop table if exists ${table_name2}" + sql "drop table if exists ${table_name3}" sql """ CREATE TABLE IF NOT EXISTS `${table_name}` ( @@ -37,6 +41,23 @@ suite("test_crud_wlg") { ) """ + sql """ + CREATE TABLE IF NOT EXISTS `${table_name2}` ( + `siteid` int(11) NOT NULL COMMENT "", + `citycode` int(11) NOT NULL COMMENT "", + `userid` int(11) NOT NULL COMMENT "", + `pv` int(11) NOT NULL COMMENT "" + ) ENGINE=OLAP + DUPLICATE KEY(`siteid`) + COMMENT "OLAP" + DISTRIBUTED BY HASH(`siteid`) BUCKETS 1 + PROPERTIES ( + "replication_allocation" = "tag.location.default: 1", + "in_memory" = "false", + "storage_format" = "V2" + ) + """ + sql """insert into ${table_name} values (9,10,11,12), (1,2,3,4) @@ -257,7 +278,21 @@ suite("test_crud_wlg") { sql "alter workload group test_group properties ( 'max_queue_size'='0' );" Thread.sleep(3000); try { - sql "select 1;" + sql "select * from ${table_name};" + } catch (Exception e) { + assertTrue(e.getMessage().contains("query waiting queue is full")); + } + + // test insert into select will go to queue + try { + sql "insert into ${table_name2} select * from ${table_name};" + } catch (Exception e) { + assertTrue(e.getMessage().contains("query waiting queue is full")); + } + + // test create table as select will go to queue + try { + sql "create table ${table_name3} PROPERTIES('replication_num' = '1') as select * from ${table_name};" } catch (Exception e) { assertTrue(e.getMessage().contains("query waiting queue is full")); } @@ -266,7 +301,7 @@ suite("test_crud_wlg") { sql "alter workload group test_group properties ( 'queue_timeout'='500' );" Thread.sleep(3000); try { - sql "select 1;" + sql "select * from ${table_name};" } catch (Exception e) { assertTrue(e.getMessage().contains("query wait timeout")); }