From 5c2f9eb92e9a4f8fe712a896ec079e08bf6fb3f4 Mon Sep 17 00:00:00 2001 From: zhiqqqq Date: Fri, 8 Sep 2023 20:30:52 +0800 Subject: [PATCH] [Improvement] (pipeline) Cancel related query if backend restarts or dead (#23863) --- .../java/org/apache/doris/catalog/Env.java | 6 ++ .../common/profile/ExecutionProfile.java | 5 +- .../java/org/apache/doris/qe/Coordinator.java | 87 +++++++++++++++++-- .../java/org/apache/doris/qe/QeProcessor.java | 3 + .../org/apache/doris/qe/QeProcessorImpl.java | 12 +++ .../apache/doris/qe/QueryCancelWorker.java | 46 ++++++++++ .../java/org/apache/doris/system/Backend.java | 21 ++++- .../doris/system/BackendHbResponse.java | 6 +- gensrc/thrift/HeartbeatService.thrift | 2 +- 9 files changed, 175 insertions(+), 13 deletions(-) create mode 100644 fe/fe-core/src/main/java/org/apache/doris/qe/QueryCancelWorker.java diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java index 629fa47a7e..7f223775b6 100755 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java @@ -212,6 +212,7 @@ import org.apache.doris.qe.AuditEventProcessor; import org.apache.doris.qe.ConnectContext; import org.apache.doris.qe.GlobalVariable; import org.apache.doris.qe.JournalObservable; +import org.apache.doris.qe.QueryCancelWorker; import org.apache.doris.qe.VariableMgr; import org.apache.doris.resource.Tag; import org.apache.doris.resource.workloadgroup.WorkloadGroupMgr; @@ -474,6 +475,8 @@ public class Env { private BinlogGcer binlogGcer; + private QueryCancelWorker queryCancelWorker; + /** * TODO(tsy): to be removed after load refactor */ @@ -717,6 +720,7 @@ public class Env { this.binlogManager = new BinlogManager(); this.binlogGcer = new BinlogGcer(); this.columnIdFlusher = new ColumnIdFlushDaemon(); + this.queryCancelWorker = new QueryCancelWorker(systemInfo); } public static void destroyCheckpoint() { @@ -961,6 +965,8 
@@ public class Env { if (statisticsPeriodCollector != null) { statisticsPeriodCollector.start(); } + + queryCancelWorker.start(); } // wait until FE is ready. diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/profile/ExecutionProfile.java b/fe/fe-core/src/main/java/org/apache/doris/common/profile/ExecutionProfile.java index e4c6c7c48d..bb9e1c8907 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/common/profile/ExecutionProfile.java +++ b/fe/fe-core/src/main/java/org/apache/doris/common/profile/ExecutionProfile.java @@ -63,6 +63,8 @@ public class ExecutionProfile { // instance id -> dummy value private MarkedCountDownLatch profileDoneSignal; + private TUniqueId queryId; + public ExecutionProfile(TUniqueId queryId, int fragmentNum) { executionProfile = new RuntimeProfile("Execution Profile " + DebugUtil.printId(queryId)); RuntimeProfile fragmentsProfile = new RuntimeProfile("Fragments"); @@ -74,6 +76,7 @@ public class ExecutionProfile { } loadChannelProfile = new RuntimeProfile("LoadChannels"); executionProfile.addChild(loadChannelProfile); + this.queryId = queryId; } public RuntimeProfile getExecutionProfile() { @@ -117,7 +120,7 @@ public class ExecutionProfile { if (profileDoneSignal != null) { // count down to zero to notify all objects waiting for this profileDoneSignal.countDownToZero(new Status()); - LOG.info("unfinished instance: {}", profileDoneSignal.getLeftMarks() + LOG.info("Query {} unfinished instance: {}", DebugUtil.printId(queryId), profileDoneSignal.getLeftMarks() .stream().map(e -> DebugUtil.printId(e.getKey())).toArray()); } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java index 1c97dadf3c..084da08235 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java @@ -528,11 +528,12 @@ public class Coordinator { this.idToBackend = 
Env.getCurrentSystemInfo().getIdToBackend(); if (LOG.isDebugEnabled()) { - LOG.debug("idToBackend size={}", idToBackend.size()); + LOG.debug("Query {} idToBackend size={}", DebugUtil.printId(queryId), idToBackend.size()); for (Map.Entry entry : idToBackend.entrySet()) { Long backendID = entry.getKey(); Backend backend = entry.getValue(); - LOG.debug("backend: {}-{}-{}", backendID, backend.getHost(), backend.getBePort()); + LOG.debug("Query {}, backend: {}-{}-{}-{}", DebugUtil.printId(queryId), + backendID, backend.getHost(), backend.getBePort(), backend.getProcessEpoch()); } } } @@ -761,7 +762,7 @@ public class Coordinator { BackendExecStates states = beToExecStates.get(execState.backend.getId()); if (states == null) { states = new BackendExecStates(execState.backend.getId(), execState.brpcAddress, - twoPhaseExecution); + twoPhaseExecution, execState.backend.getProcessEpoch()); beToExecStates.putIfAbsent(execState.backend.getId(), states); } states.addState(execState); @@ -1252,6 +1253,70 @@ public class Coordinator { return resultBatch; } + + // We use a very conservative cancel strategy. + // 0. If backends has zero process epoch, do not cancel. Zero process epoch usually arises in cluster upgrading. + // 1. If process epoch is same, do not cancel. Means backends does not restart or die. + public boolean shouldCancel(List currentBackends) { + Map curBeMap = Maps.newHashMap(); + for (Backend be : currentBackends) { + curBeMap.put(be.getId(), be); + } + + try { + lock(); + + if (queryOptions.isEnablePipelineEngine()) { + for (PipelineExecContext pipelineExecContext : pipelineExecContexts.values()) { + Backend be = curBeMap.get(pipelineExecContext.backend.getId()); + if (be == null || !be.isAlive()) { + LOG.warn("Backend {} not exists or dead, query {} should be cancelled", + pipelineExecContext.backend.toString(), DebugUtil.printId(queryId)); + return true; + } + + // Backend process epoch changed, indicates that this be restarts, query should be cancelled. 
+ // Check zero since during upgrading, older version oplog will not persist be start time + // so newer version follower will get zero epoch when replaying oplog or snapshot + if (pipelineExecContext.beProcessEpoch != be.getProcessEpoch() && be.getProcessEpoch() != 0) { + LOG.warn("Backend process epoch changed, previous {} now {}, " + + "means this be has already restarted, should cancel this coordinator," + + " query id {}", + pipelineExecContext.beProcessEpoch, be.getProcessEpoch(), + DebugUtil.printId(queryId)); + return true; + } else if (be.getProcessEpoch() == 0) { + LOG.warn("Backend {} has zero process epoch, maybe we are upgrading cluster?", + be.toString()); + } + } + } else { + // beToExecStates will be updated only in non-pipeline query. + for (BackendExecStates beExecState : beToExecStates.values()) { + Backend be = curBeMap.get(beExecState.beId); + if (be == null || !be.isAlive()) { + LOG.warn("Backend {} not exists or dead, query {} should be cancelled.", + beExecState.beId, DebugUtil.printId(queryId)); + return true; + } + + if (beExecState.beProcessEpoch != be.getProcessEpoch() && be.getProcessEpoch() != 0) { + LOG.warn("Process epoch changed, previous {} now {}, means this be has already restarted, " + + "should cancel this coordinator, query id {}", + beExecState.beProcessEpoch, be.getProcessEpoch(), DebugUtil.printId(queryId)); + return true; + } else if (be.getProcessEpoch() == 0) { + LOG.warn("Backend {} has zero process epoch, maybe we are upgrading cluster?", be.toString()); + } + } + } + + return false; + } finally { + unlock(); + } + } + // Cancel execution of query. This includes the execution of the local plan // fragment, // if any, as well as all plan fragments on remote nodes.
@@ -1268,7 +1333,7 @@ public class Coordinator { } else { queryStatus.setStatus(Status.CANCELLED); } - LOG.warn("cancel execution of query, this is outside invoke"); + LOG.warn("Cancel execution of query {}, this is a outside invoke", DebugUtil.printId(queryId)); cancelInternal(cancelReason); } finally { unlock(); @@ -1307,8 +1372,9 @@ public class Coordinator { instanceIds.clear(); for (FragmentExecParams params : fragmentExecParamsMap.values()) { if (LOG.isDebugEnabled()) { - LOG.debug("fragment {} has instances {}", - params.fragment.getFragmentId(), params.instanceExecParams.size()); + LOG.debug("Query {} fragment {} has {} instances.", + DebugUtil.printId(queryId), params.fragment.getFragmentId(), + params.instanceExecParams.size()); } for (int j = 0; j < params.instanceExecParams.size(); ++j) { @@ -2823,6 +2889,7 @@ public class Coordinator { Backend backend; long lastMissingHeartbeatTime = -1; long profileReportProgress = 0; + long beProcessEpoch = 0; private final int numInstances; public PipelineExecContext(PlanFragmentId fragmentId, int profileFragmentId, @@ -2842,6 +2909,7 @@ public class Coordinator { this.backend = idToBackend.get(backendId); this.address = new TNetworkAddress(backend.getHost(), backend.getBePort()); this.brpcAddress = new TNetworkAddress(backend.getHost(), backend.getBrpcPort()); + this.beProcessEpoch = backend.getProcessEpoch(); this.hasCanceled = false; this.lastMissingHeartbeatTime = backend.getLastMissingHeartbeatTime(); @@ -2898,13 +2966,16 @@ public class Coordinator { // return true if cancel success. 
Otherwise, return false public synchronized boolean cancelFragmentInstance(Types.PPlanFragmentCancelReason cancelReason) { if (!this.initiated) { + LOG.warn("Query {}, ccancel before initiated", DebugUtil.printId(queryId)); return false; } // don't cancel if it is already finished if (this.done) { + LOG.warn("Query {}, cancel after finished", DebugUtil.printId(queryId)); return false; } if (this.hasCanceled) { + LOG.warn("Query {}, cancel after cancelled", DebugUtil.printId(queryId)); return false; } for (TPipelineInstanceParams localParam : rpcParams.local_params) { @@ -2987,11 +3058,13 @@ public class Coordinator { List states = Lists.newArrayList(); boolean twoPhaseExecution = false; ScopedSpan scopedSpan = new ScopedSpan(); + long beProcessEpoch = 0; - public BackendExecStates(long beId, TNetworkAddress brpcAddr, boolean twoPhaseExecution) { + public BackendExecStates(long beId, TNetworkAddress brpcAddr, boolean twoPhaseExecution, long beProcessEpoch) { this.beId = beId; this.brpcAddr = brpcAddr; this.twoPhaseExecution = twoPhaseExecution; + this.beProcessEpoch = beProcessEpoch; } public void addState(BackendExecState state) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/QeProcessor.java b/fe/fe-core/src/main/java/org/apache/doris/qe/QeProcessor.java index 64efd5ef2b..a2a23488cf 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/QeProcessor.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/QeProcessor.java @@ -23,6 +23,7 @@ import org.apache.doris.thrift.TReportExecStatusParams; import org.apache.doris.thrift.TReportExecStatusResult; import org.apache.doris.thrift.TUniqueId; +import java.util.List; import java.util.Map; public interface QeProcessor { @@ -42,4 +43,6 @@ public interface QeProcessor { String getCurrentQueryByQueryId(TUniqueId queryId); Coordinator getCoordinator(TUniqueId queryId); + + List getAllCoordinators(); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/QeProcessorImpl.java 
b/fe/fe-core/src/main/java/org/apache/doris/qe/QeProcessorImpl.java index 86f0a11134..45bddda1e4 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/QeProcessorImpl.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/QeProcessorImpl.java @@ -37,6 +37,8 @@ import com.google.common.collect.Maps; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; +import java.util.ArrayList; +import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutorService; @@ -76,6 +78,16 @@ public final class QeProcessorImpl implements QeProcessor { return null; } + @Override + public List getAllCoordinators() { + List res = new ArrayList<>(); + + for (QueryInfo co : coordinatorMap.values()) { + res.add(co.coord); + } + return res; + } + @Override public void registerQuery(TUniqueId queryId, Coordinator coord) throws UserException { registerQuery(queryId, new QueryInfo(coord)); diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/QueryCancelWorker.java b/fe/fe-core/src/main/java/org/apache/doris/qe/QueryCancelWorker.java new file mode 100644 index 0000000000..500cad0f28 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/QueryCancelWorker.java @@ -0,0 +1,46 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. 
See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.qe; + +import org.apache.doris.common.util.MasterDaemon; +import org.apache.doris.proto.Types; +import org.apache.doris.system.Backend; +import org.apache.doris.system.SystemInfoService; + +import java.util.List; + +public class QueryCancelWorker extends MasterDaemon { + private SystemInfoService systemInfoService; + + public QueryCancelWorker(SystemInfoService systemInfoService) { + this.systemInfoService = systemInfoService; + } + + @Override + protected void runAfterCatalogReady() { + List allBackends = systemInfoService.getAllBackends(); + + for (Coordinator co : QeProcessorImpl.INSTANCE.getAllCoordinators()) { + if (co.shouldCancel(allBackends)) { + // TODO(zhiqiang): We need more clear cancel message, so that user can figure out what happened + // by searching log. + co.cancel(Types.PPlanFragmentCancelReason.INTERNAL_ERROR); + } + } + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/system/Backend.java b/fe/fe-core/src/main/java/org/apache/doris/system/Backend.java index a4bf77d3b6..db470fb91d 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/system/Backend.java +++ b/fe/fe-core/src/main/java/org/apache/doris/system/Backend.java @@ -325,6 +325,12 @@ public class Backend implements Writable { return lastMissingHeartbeatTime; } + // Backend process epoch, is used to tag a backend process + // Currently it is always equal to be start time, even during oplog replay.
+ public long getProcessEpoch() { + return lastStartTime; + } + public boolean isAlive() { return this.isAlive.get(); } @@ -627,7 +633,8 @@ public class Backend implements Writable { @Override public String toString() { return "Backend [id=" + id + ", host=" + host + ", heartbeatPort=" + heartbeatPort + ", alive=" + isAlive.get() - + ", lastStartTime=" + TimeUtils.longToTimeString(lastStartTime) + ", tags: " + tagMap + "]"; + + ", lastStartTime=" + TimeUtils.longToTimeString(lastStartTime) + ", process epoch=" + lastStartTime + + ", tags: " + tagMap + "]"; } public String getHealthyStatus() { @@ -678,13 +685,21 @@ public class Backend implements Writable { this.lastUpdateMs = hbResponse.getHbTime(); if (!isAlive.get()) { isChanged = true; + LOG.info("{} is back to alive, update start time from {} to {}, " + + "update be epoch from {} to {}.", this.toString(), + TimeUtils.longToTimeString(lastStartTime), + TimeUtils.longToTimeString(hbResponse.getBeStartTime()), + lastStartTime, hbResponse.getBeStartTime()); this.lastStartTime = hbResponse.getBeStartTime(); - LOG.info("{} is back to alive", this.toString()); this.isAlive.set(true); } if (this.lastStartTime != hbResponse.getBeStartTime() && hbResponse.getBeStartTime() > 0) { - LOG.info("{} update last start time to {}", this.toString(), hbResponse.getBeStartTime()); + LOG.info("{} update last start time from {} to {}, " + + "update be epoch from {} to {}.", this.toString(), + TimeUtils.longToTimeString(lastStartTime), + TimeUtils.longToTimeString(hbResponse.getBeStartTime()), + lastStartTime, hbResponse.getBeStartTime()); this.lastStartTime = hbResponse.getBeStartTime(); isChanged = true; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/system/BackendHbResponse.java b/fe/fe-core/src/main/java/org/apache/doris/system/BackendHbResponse.java index a0baf60e9f..18c5b94568 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/system/BackendHbResponse.java +++ 
b/fe/fe-core/src/main/java/org/apache/doris/system/BackendHbResponse.java @@ -39,7 +39,11 @@ public class BackendHbResponse extends HeartbeatResponse implements Writable { private int brpcPort; @SerializedName(value = "nodeRole") private String nodeRole = Tag.VALUE_MIX; - private long beStartTime; + + // We need to broadcast be start time to all frontends, + // it will be used to check if query on this backend should be canceled. + @SerializedName(value = "beStartTime") + private long beStartTime = 0; private String host; private String version = ""; @SerializedName(value = "isShutDown") diff --git a/gensrc/thrift/HeartbeatService.thrift b/gensrc/thrift/HeartbeatService.thrift index 0d6badac3b..7fcf45804d 100644 --- a/gensrc/thrift/HeartbeatService.thrift +++ b/gensrc/thrift/HeartbeatService.thrift @@ -47,7 +47,7 @@ struct TBackendInfo { 3: optional Types.TPort be_rpc_port 4: optional Types.TPort brpc_port 5: optional string version - 6: optional i64 be_start_time + 6: optional i64 be_start_time // This field will also be used to identify a be process 7: optional string be_node_role 8: optional bool is_shutdown }