[feature](workloadgroup) support nereids internal query and all dml query (#28054)

support binding a workload group for Nereids internal queries
support binding a workload group for INSERT INTO ... SELECT
support binding a workload group for CREATE TABLE AS SELECT
change the token wait timeout to the smaller of the query timeout and the queue timeout
the query queue should not be bound to the pipeline engine; it can be used everywhere
Author: yiguolei
Committed: 2023-12-06 21:07:55 +08:00 (committed by GitHub)
Parent: 00c8bab84d
Commit: 4a4d137402
6 changed files with 132 additions and 77 deletions
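The timeout change described above amounts to: a queued query waits for a token for at most the smaller of the query's execution timeout and the workload group's queue wait timeout. A minimal sketch of that selection, using a hypothetical helper name (the actual patch inlines the ternary in QueueToken.waitSignal):

import java.util.concurrent.TimeUnit;

public class QueueWaitSketch {
    // Effective wait = the smaller of the query's execution timeout and the
    // workload group's queue wait timeout (both in milliseconds).
    static long effectiveWaitMillis(long queryTimeoutMillis, long queueWaitTimeoutMillis) {
        return Math.min(queryTimeoutMillis, queueWaitTimeoutMillis);
    }

    public static void main(String[] args) {
        // e.g. execution_timeout = 300 s, queue_timeout = 500 ms -> wait at most 500 ms
        long waitMs = effectiveWaitMillis(TimeUnit.SECONDS.toMillis(300), 500L);
        System.out.println(waitMs); // prints 500
    }
}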

CoordInterface.java

@ -20,6 +20,7 @@ package org.apache.doris.qe;
import org.apache.doris.proto.Types;
public interface CoordInterface {
public void exec() throws Exception;
public RowBatch getNext() throws Exception;
@ -27,5 +28,9 @@ public interface CoordInterface {
public int getInstanceTotalNum();
public void cancel(Types.PPlanFragmentCancelReason cancelReason);
// After exec() or getNext() has finished, this method should be called to release
// any held resources.
public default void close() {}
}
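A minimal usage sketch of the new close() hook (a hypothetical caller, not code from this patch): close() is invoked in a finally block so the workload group queue token is returned even when exec() or getNext() throws, mirroring the try/finally added to StmtExecutor below.

void runQuery(CoordInterface coord) throws Exception {
    try {
        coord.exec();                     // may enqueue the query and wait for a token
        RowBatch batch = coord.getNext(); // fetch results (loop omitted for brevity)
        // ... consume batch ...
    } finally {
        coord.close();                    // returns the queue token, if one was taken
    }
}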

Coordinator.java

@ -68,6 +68,8 @@ import org.apache.doris.proto.Types;
import org.apache.doris.proto.Types.PUniqueId;
import org.apache.doris.qe.ConnectContext.ConnectType;
import org.apache.doris.qe.QueryStatisticsItem.FragmentInstanceInfo;
import org.apache.doris.resource.workloadgroup.QueryQueue;
import org.apache.doris.resource.workloadgroup.QueueToken;
import org.apache.doris.rpc.BackendServiceProxy;
import org.apache.doris.rpc.RpcException;
import org.apache.doris.service.ExecuteEnv;
@ -269,6 +271,9 @@ public class Coordinator implements CoordInterface {
private final ExecutionProfile executionProfile;
private QueueToken queueToken = null;
private QueryQueue queryQueue = null;
public ExecutionProfile getExecutionProfile() {
return executionProfile;
}
@ -590,6 +595,32 @@ public class Coordinator implements CoordInterface {
// A call to Exec() must precede all other member function calls.
@Override
public void exec() throws Exception {
// LoadTask has no ConnectContext, so it is not controlled by the query queue for now
if (Config.enable_workload_group && Config.enable_query_queue && context != null) {
queryQueue = context.getEnv().getWorkloadGroupMgr().getWorkloadGroupQueryQueue(context);
if (queryQueue == null) {
// This check should be unreachable: if the query queue cannot be found, the
// workload group manager already throws an exception.
throw new UserException("could not find query queue");
}
queueToken = queryQueue.getToken();
if (!queueToken.waitSignal(this.queryOptions.getExecutionTimeout() * 1000)) {
LOG.error("query (id=" + DebugUtil.printId(queryId) + ") " + queueToken.getOfferResultDetail());
queryQueue.returnToken(queueToken);
throw new UserException(queueToken.getOfferResultDetail());
}
}
execInternal();
}
@Override
public void close() {
if (queryQueue != null && queueToken != null) {
queryQueue.returnToken(queueToken);
}
}
private void execInternal() throws Exception {
if (LOG.isDebugEnabled() && !scanNodes.isEmpty()) {
LOG.debug("debug: in Coordinator::exec. query id: {}, planNode: {}",
DebugUtil.printId(queryId), scanNodes.get(0).treeToThrift());

StmtExecutor.java

@ -143,8 +143,6 @@ import org.apache.doris.qe.QueryState.MysqlStateType;
import org.apache.doris.qe.cache.Cache;
import org.apache.doris.qe.cache.CacheAnalyzer;
import org.apache.doris.qe.cache.CacheAnalyzer.CacheMode;
import org.apache.doris.resource.workloadgroup.QueryQueue;
import org.apache.doris.resource.workloadgroup.QueueToken;
import org.apache.doris.rewrite.ExprRewriter;
import org.apache.doris.rewrite.mvrewrite.MVSelectFailedException;
import org.apache.doris.rpc.RpcException;
@ -618,55 +616,37 @@ public class StmtExecutor {
private void handleQueryWithRetry(TUniqueId queryId) throws Exception {
// queue query here
syncJournalIfNeeded();
QueueToken queueToken = null;
QueryQueue queryQueue = null;
if (!parsedStmt.isExplain() && Config.enable_workload_group && Config.enable_query_queue
&& context.getSessionVariable().getEnablePipelineEngine()) {
queryQueue = context.getEnv().getWorkloadGroupMgr().getWorkloadGroupQueryQueue(context);
queueToken = queryQueue.getToken();
if (!queueToken.waitSignal()) {
LOG.error("query (id=" + DebugUtil.printId(queryId) + ") " + queueToken.getOfferResultDetail());
queryQueue.returnToken(queueToken);
throw new UserException(queueToken.getOfferResultDetail());
}
}
int retryTime = Config.max_query_retry_time;
try {
for (int i = 0; i < retryTime; i++) {
try {
// reset query id for each retry
if (i > 0) {
UUID uuid = UUID.randomUUID();
TUniqueId newQueryId = new TUniqueId(uuid.getMostSignificantBits(),
uuid.getLeastSignificantBits());
AuditLog.getQueryAudit().log("Query {} {} times with new query id: {}",
DebugUtil.printId(queryId), i, DebugUtil.printId(newQueryId));
context.setQueryId(newQueryId);
}
if (context.getConnectType() == ConnectType.ARROW_FLIGHT_SQL) {
context.setReturnResultFromLocal(false);
}
handleQueryStmt();
break;
} catch (RpcException e) {
if (i == retryTime - 1) {
throw e;
}
if (context.getConnectType().equals(ConnectType.MYSQL) && !context.getMysqlChannel().isSend()) {
LOG.warn("retry {} times. stmt: {}", (i + 1), parsedStmt.getOrigStmt().originStmt);
} else {
throw e;
}
} finally {
if (context.isReturnResultFromLocal()) {
finalizeQuery();
}
for (int i = 0; i < retryTime; i++) {
try {
// reset query id for each retry
if (i > 0) {
UUID uuid = UUID.randomUUID();
TUniqueId newQueryId = new TUniqueId(uuid.getMostSignificantBits(),
uuid.getLeastSignificantBits());
AuditLog.getQueryAudit().log("Query {} {} times with new query id: {}",
DebugUtil.printId(queryId), i, DebugUtil.printId(newQueryId));
context.setQueryId(newQueryId);
}
if (context.getConnectType() == ConnectType.ARROW_FLIGHT_SQL) {
context.setReturnResultFromLocal(false);
}
handleQueryStmt();
break;
} catch (RpcException e) {
if (i == retryTime - 1) {
throw e;
}
if (context.getConnectType().equals(ConnectType.MYSQL) && !context.getMysqlChannel().isSend()) {
LOG.warn("retry {} times. stmt: {}", (i + 1), parsedStmt.getOrigStmt().originStmt);
} else {
throw e;
}
} finally {
if (context.isReturnResultFromLocal()) {
finalizeQuery();
}
}
} finally {
if (queueToken != null) {
queryQueue.returnToken(queueToken);
}
}
}
@ -1497,27 +1477,13 @@ public class StmtExecutor {
coordBase = coord;
}
coordBase.exec();
profile.getSummaryProfile().setQueryScheduleFinishTime();
updateProfile(false);
if (coordBase.getInstanceTotalNum() > 1 && LOG.isDebugEnabled()) {
try {
LOG.debug("Start to execute fragment. user: {}, db: {}, sql: {}, fragment instance num: {}",
context.getQualifiedUser(), context.getDatabase(),
parsedStmt.getOrigStmt().originStmt.replace("\n", " "),
coordBase.getInstanceTotalNum());
} catch (Exception e) {
LOG.warn("Fail to print fragment concurrency for Query.", e);
}
}
if (context.getConnectType().equals(ConnectType.ARROW_FLIGHT_SQL)) {
Preconditions.checkState(!context.isReturnResultFromLocal());
profile.getSummaryProfile().setTempStartTime();
try {
coordBase.exec();
profile.getSummaryProfile().setQueryScheduleFinishTime();
updateProfile(false);
if (coordBase.getInstanceTotalNum() > 1 && LOG.isDebugEnabled()) {
try {
LOG.debug("Finish to execute fragment. user: {}, db: {}, sql: {}, fragment instance num: {}",
LOG.debug("Start to execute fragment. user: {}, db: {}, sql: {}, fragment instance num: {}",
context.getQualifiedUser(), context.getDatabase(),
parsedStmt.getOrigStmt().originStmt.replace("\n", " "),
coordBase.getInstanceTotalNum());
@ -1525,10 +1491,22 @@ public class StmtExecutor {
LOG.warn("Fail to print fragment concurrency for Query.", e);
}
}
return;
}
try {
if (context.getConnectType().equals(ConnectType.ARROW_FLIGHT_SQL)) {
Preconditions.checkState(!context.isReturnResultFromLocal());
profile.getSummaryProfile().setTempStartTime();
if (coordBase.getInstanceTotalNum() > 1 && LOG.isDebugEnabled()) {
try {
LOG.debug("Finish to execute fragment. user: {}, db: {}, sql: {}, fragment instance num: {}",
context.getQualifiedUser(), context.getDatabase(),
parsedStmt.getOrigStmt().originStmt.replace("\n", " "),
coordBase.getInstanceTotalNum());
} catch (Exception e) {
LOG.warn("Fail to print fragment concurrency for Query.", e);
}
}
return;
}
while (true) {
// register the fetch result time.
profile.getSummaryProfile().setTempStartTime();
@ -1603,6 +1581,7 @@ public class StmtExecutor {
coordBase.cancel(Types.PPlanFragmentCancelReason.INTERNAL_ERROR);
throw e;
} finally {
coordBase.close();
if (coordBase.getInstanceTotalNum() > 1 && LOG.isDebugEnabled()) {
try {
LOG.debug("Finish to execute fragment. user: {}, db: {}, sql: {}, fragment instance num: {}",
@ -2038,6 +2017,7 @@ public class StmtExecutor {
*/
throwable = t;
} finally {
coord.close();
finalizeQuery();
}
@ -2741,6 +2721,7 @@ public class StmtExecutor {
throw new RuntimeException("Failed to fetch internal SQL result. " + Util.getRootCauseMessage(e), e);
}
} finally {
coord.close();
AuditLogHelper.logAuditLog(context, originStmt.toString(), parsedStmt, getQueryStatisticsForAuditLog(),
true);
QeProcessorImpl.INSTANCE.unregisterQuery(context.queryId());

QueryQueue.java

@ -126,7 +126,7 @@ public class QueryQueue {
}
// If the token was acquired and the work finished successfully, call this method to release it.
public void returnToken(QueueToken token) throws InterruptedException {
public void returnToken(QueueToken token) {
queueLock.lock();
try {
// If the current token is not in the ready-to-run state, it is still in the queue

QueueToken.java

@ -42,7 +42,7 @@ public class QueueToken {
private TokenState tokenState;
private long waitTimeout = 0;
private long queueWaitTimeout = 0;
private String offerResultDetail;
@ -51,15 +51,15 @@ public class QueueToken {
private final ReentrantLock tokenLock = new ReentrantLock();
private final Condition tokenCond = tokenLock.newCondition();
public QueueToken(TokenState tokenState, long waitTimeout,
public QueueToken(TokenState tokenState, long queueWaitTimeout,
String offerResultDetail) {
this.tokenId = tokenIdGenerator.addAndGet(1);
this.tokenState = tokenState;
this.waitTimeout = waitTimeout;
this.queueWaitTimeout = queueWaitTimeout;
this.offerResultDetail = offerResultDetail;
}
public boolean waitSignal() throws InterruptedException {
public boolean waitSignal(long queryTimeoutMillis) throws InterruptedException {
this.tokenLock.lock();
try {
if (isTimeout) {
@ -68,6 +68,9 @@ public class QueueToken {
if (tokenState == TokenState.READY_TO_RUN) {
return true;
}
// If the query timeout is shorter than the queue wait timeout, use the
// query timeout as the wait timeout.
long waitTimeout = queryTimeoutMillis > queueWaitTimeout ? queueWaitTimeout : queryTimeoutMillis;
tokenCond.await(waitTimeout, TimeUnit.MILLISECONDS);
// If the wait timed out and the token is still not ready to run, return false
if (tokenState != TokenState.READY_TO_RUN) {

regression test suite: test_crud_wlg

@ -17,8 +17,12 @@
suite("test_crud_wlg") {
def table_name = "wlg_test_table"
def table_name2 = "wlg_test_table2"
def table_name3 = "wlg_test_table3"
sql "drop table if exists ${table_name}"
sql "drop table if exists ${table_name2}"
sql "drop table if exists ${table_name3}"
sql """
CREATE TABLE IF NOT EXISTS `${table_name}` (
@ -37,6 +41,23 @@ suite("test_crud_wlg") {
)
"""
sql """
CREATE TABLE IF NOT EXISTS `${table_name2}` (
`siteid` int(11) NOT NULL COMMENT "",
`citycode` int(11) NOT NULL COMMENT "",
`userid` int(11) NOT NULL COMMENT "",
`pv` int(11) NOT NULL COMMENT ""
) ENGINE=OLAP
DUPLICATE KEY(`siteid`)
COMMENT "OLAP"
DISTRIBUTED BY HASH(`siteid`) BUCKETS 1
PROPERTIES (
"replication_allocation" = "tag.location.default: 1",
"in_memory" = "false",
"storage_format" = "V2"
)
"""
sql """insert into ${table_name} values
(9,10,11,12),
(1,2,3,4)
@ -257,7 +278,21 @@ suite("test_crud_wlg") {
sql "alter workload group test_group properties ( 'max_queue_size'='0' );"
Thread.sleep(3000);
try {
sql "select 1;"
sql "select * from ${table_name};"
} catch (Exception e) {
assertTrue(e.getMessage().contains("query waiting queue is full"));
}
// test that insert into select goes through the query queue
try {
sql "insert into ${table_name2} select * from ${table_name};"
} catch (Exception e) {
assertTrue(e.getMessage().contains("query waiting queue is full"));
}
// test that create table as select goes through the query queue
try {
sql "create table ${table_name3} PROPERTIES('replication_num' = '1') as select * from ${table_name};"
} catch (Exception e) {
assertTrue(e.getMessage().contains("query waiting queue is full"));
}
@ -266,7 +301,7 @@ suite("test_crud_wlg") {
sql "alter workload group test_group properties ( 'queue_timeout'='500' );"
Thread.sleep(3000);
try {
sql "select 1;"
sql "select * from ${table_name};"
} catch (Exception e) {
assertTrue(e.getMessage().contains("query wait timeout"));
}