[Improvement] (pipeline) Cancel related query if backend restarts or dead (#23863)

This commit is contained in:
zhiqqqq
2023-09-08 20:30:52 +08:00
committed by GitHub
parent e140938d81
commit 5c2f9eb92e
9 changed files with 175 additions and 13 deletions

View File

@ -212,6 +212,7 @@ import org.apache.doris.qe.AuditEventProcessor;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.qe.GlobalVariable;
import org.apache.doris.qe.JournalObservable;
import org.apache.doris.qe.QueryCancelWorker;
import org.apache.doris.qe.VariableMgr;
import org.apache.doris.resource.Tag;
import org.apache.doris.resource.workloadgroup.WorkloadGroupMgr;
@ -474,6 +475,8 @@ public class Env {
private BinlogGcer binlogGcer;
private QueryCancelWorker queryCancelWorker;
/**
* TODO(tsy): to be removed after load refactor
*/
@ -717,6 +720,7 @@ public class Env {
this.binlogManager = new BinlogManager();
this.binlogGcer = new BinlogGcer();
this.columnIdFlusher = new ColumnIdFlushDaemon();
this.queryCancelWorker = new QueryCancelWorker(systemInfo);
}
public static void destroyCheckpoint() {
@ -961,6 +965,8 @@ public class Env {
if (statisticsPeriodCollector != null) {
statisticsPeriodCollector.start();
}
queryCancelWorker.start();
}
// wait until FE is ready.

View File

@ -63,6 +63,8 @@ public class ExecutionProfile {
// instance id -> dummy value
private MarkedCountDownLatch<TUniqueId, Long> profileDoneSignal;
private TUniqueId queryId;
public ExecutionProfile(TUniqueId queryId, int fragmentNum) {
executionProfile = new RuntimeProfile("Execution Profile " + DebugUtil.printId(queryId));
RuntimeProfile fragmentsProfile = new RuntimeProfile("Fragments");
@ -74,6 +76,7 @@ public class ExecutionProfile {
}
loadChannelProfile = new RuntimeProfile("LoadChannels");
executionProfile.addChild(loadChannelProfile);
this.queryId = queryId;
}
public RuntimeProfile getExecutionProfile() {
@ -117,7 +120,7 @@ public class ExecutionProfile {
if (profileDoneSignal != null) {
// count down to zero to notify all objects waiting for this
profileDoneSignal.countDownToZero(new Status());
LOG.info("unfinished instance: {}", profileDoneSignal.getLeftMarks()
LOG.info("Query {} unfinished instance: {}", DebugUtil.printId(queryId), profileDoneSignal.getLeftMarks()
.stream().map(e -> DebugUtil.printId(e.getKey())).toArray());
}
}

View File

@ -528,11 +528,12 @@ public class Coordinator {
this.idToBackend = Env.getCurrentSystemInfo().getIdToBackend();
if (LOG.isDebugEnabled()) {
LOG.debug("idToBackend size={}", idToBackend.size());
LOG.debug("Query {} idToBackend size={}", DebugUtil.printId(queryId), idToBackend.size());
for (Map.Entry<Long, Backend> entry : idToBackend.entrySet()) {
Long backendID = entry.getKey();
Backend backend = entry.getValue();
LOG.debug("backend: {}-{}-{}", backendID, backend.getHost(), backend.getBePort());
LOG.debug("Query {}, backend: {}-{}-{}-{}", DebugUtil.printId(queryId),
backendID, backend.getHost(), backend.getBePort(), backend.getProcessEpoch());
}
}
}
@ -761,7 +762,7 @@ public class Coordinator {
BackendExecStates states = beToExecStates.get(execState.backend.getId());
if (states == null) {
states = new BackendExecStates(execState.backend.getId(), execState.brpcAddress,
twoPhaseExecution);
twoPhaseExecution, execState.backend.getProcessEpoch());
beToExecStates.putIfAbsent(execState.backend.getId(), states);
}
states.addState(execState);
@ -1252,6 +1253,70 @@ public class Coordinator {
return resultBatch;
}
// We use a very conservative cancel strategy.
// 0. If backends has zero process epoch, do not cancel. Zero process epoch usually arises in cluster upgrading.
// 1. If process epoch is same, do not cancel. Means backends does not restart or die.
public boolean shouldCancel(List<Backend> currentBackends) {
Map<Long, Backend> curBeMap = Maps.newHashMap();
for (Backend be : currentBackends) {
curBeMap.put(be.getId(), be);
}
try {
lock();
if (queryOptions.isEnablePipelineEngine()) {
for (PipelineExecContext pipelineExecContext : pipelineExecContexts.values()) {
Backend be = curBeMap.get(pipelineExecContext.backend.getId());
if (be == null || !be.isAlive()) {
LOG.warn("Backend {} not exists or dead, query {} should be cancelled",
pipelineExecContext.backend.toString(), DebugUtil.printId(queryId));
return true;
}
// Backend process epoch changed, indicates that this be restarts, query should be cancelled.
// Check zero since during upgrading, older version oplog will not persistent be start time
// so newer version follower will get zero epoch when replaying oplog or snapshot
if (pipelineExecContext.beProcessEpoch != be.getProcessEpoch() && be.getProcessEpoch() != 0) {
LOG.warn("Backend process epoch changed, previous {} now {}, "
+ "means this be has already restarted, should cancel this coordinator,"
+ " query id {}",
pipelineExecContext.beProcessEpoch, be.getProcessEpoch(),
DebugUtil.printId(queryId));
return true;
} else if (be.getProcessEpoch() == 0) {
LOG.warn("Backend {} has zero process epoch, maybe we are upgrading cluster?",
be.toString());
}
}
} else {
// beToExecStates will be updated only in non-pipeline query.
for (BackendExecStates beExecState : beToExecStates.values()) {
Backend be = curBeMap.get(beExecState.beId);
if (be == null || !be.isAlive()) {
LOG.warn("Backend {} not exists or dead, query {} should be cancelled.",
beExecState.beId, DebugUtil.printId(queryId));
return true;
}
if (beExecState.beProcessEpoch != be.getProcessEpoch() && be.getProcessEpoch() != 0) {
LOG.warn("Process epoch changed, previous {} now {}, means this be has already restarted, "
+ "should cancel this coordinator, query id {}",
beExecState.beProcessEpoch, be.getProcessEpoch(), DebugUtil.printId(queryId));
return true;
} else if (be.getProcessEpoch() == 0) {
LOG.warn("Backend {} has zero process epoch, maybe we are upgrading cluster?", be.toString());
}
}
}
return false;
} finally {
unlock();
}
}
// Cancel execution of query. This includes the execution of the local plan
// fragment,
// if any, as well as all plan fragments on remote nodes.
@ -1268,7 +1333,7 @@ public class Coordinator {
} else {
queryStatus.setStatus(Status.CANCELLED);
}
LOG.warn("cancel execution of query, this is outside invoke");
LOG.warn("Cancel execution of query {}, this is a outside invoke", DebugUtil.printId(queryId));
cancelInternal(cancelReason);
} finally {
unlock();
@ -1307,8 +1372,9 @@ public class Coordinator {
instanceIds.clear();
for (FragmentExecParams params : fragmentExecParamsMap.values()) {
if (LOG.isDebugEnabled()) {
LOG.debug("fragment {} has instances {}",
params.fragment.getFragmentId(), params.instanceExecParams.size());
LOG.debug("Query {} fragment {} has {} instances.",
DebugUtil.printId(queryId), params.fragment.getFragmentId(),
params.instanceExecParams.size());
}
for (int j = 0; j < params.instanceExecParams.size(); ++j) {
@ -2823,6 +2889,7 @@ public class Coordinator {
Backend backend;
long lastMissingHeartbeatTime = -1;
long profileReportProgress = 0;
long beProcessEpoch = 0;
private final int numInstances;
public PipelineExecContext(PlanFragmentId fragmentId, int profileFragmentId,
@ -2842,6 +2909,7 @@ public class Coordinator {
this.backend = idToBackend.get(backendId);
this.address = new TNetworkAddress(backend.getHost(), backend.getBePort());
this.brpcAddress = new TNetworkAddress(backend.getHost(), backend.getBrpcPort());
this.beProcessEpoch = backend.getProcessEpoch();
this.hasCanceled = false;
this.lastMissingHeartbeatTime = backend.getLastMissingHeartbeatTime();
@ -2898,13 +2966,16 @@ public class Coordinator {
// return true if cancel success. Otherwise, return false
public synchronized boolean cancelFragmentInstance(Types.PPlanFragmentCancelReason cancelReason) {
if (!this.initiated) {
LOG.warn("Query {}, ccancel before initiated", DebugUtil.printId(queryId));
return false;
}
// don't cancel if it is already finished
if (this.done) {
LOG.warn("Query {}, cancel after finished", DebugUtil.printId(queryId));
return false;
}
if (this.hasCanceled) {
LOG.warn("Query {}, cancel after cancelled", DebugUtil.printId(queryId));
return false;
}
for (TPipelineInstanceParams localParam : rpcParams.local_params) {
@ -2987,11 +3058,13 @@ public class Coordinator {
List<BackendExecState> states = Lists.newArrayList();
boolean twoPhaseExecution = false;
ScopedSpan scopedSpan = new ScopedSpan();
long beProcessEpoch = 0;
public BackendExecStates(long beId, TNetworkAddress brpcAddr, boolean twoPhaseExecution) {
public BackendExecStates(long beId, TNetworkAddress brpcAddr, boolean twoPhaseExecution, long beProcessEpoch) {
this.beId = beId;
this.brpcAddr = brpcAddr;
this.twoPhaseExecution = twoPhaseExecution;
this.beProcessEpoch = beProcessEpoch;
}
public void addState(BackendExecState state) {

View File

@ -23,6 +23,7 @@ import org.apache.doris.thrift.TReportExecStatusParams;
import org.apache.doris.thrift.TReportExecStatusResult;
import org.apache.doris.thrift.TUniqueId;
import java.util.List;
import java.util.Map;
public interface QeProcessor {
@ -42,4 +43,6 @@ public interface QeProcessor {
String getCurrentQueryByQueryId(TUniqueId queryId);
Coordinator getCoordinator(TUniqueId queryId);
List<Coordinator> getAllCoordinators();
}

View File

@ -37,6 +37,8 @@ import com.google.common.collect.Maps;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
@ -76,6 +78,16 @@ public final class QeProcessorImpl implements QeProcessor {
return null;
}
@Override
public List<Coordinator> getAllCoordinators() {
List<Coordinator> res = new ArrayList<>();
for (QueryInfo co : coordinatorMap.values()) {
res.add(co.coord);
}
return res;
}
@Override
public void registerQuery(TUniqueId queryId, Coordinator coord) throws UserException {
registerQuery(queryId, new QueryInfo(coord));

View File

@ -0,0 +1,46 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.qe;
import org.apache.doris.common.util.MasterDaemon;
import org.apache.doris.proto.Types;
import org.apache.doris.system.Backend;
import org.apache.doris.system.SystemInfoService;
import java.util.List;
public class QueryCancelWorker extends MasterDaemon {
private SystemInfoService systemInfoService;
public QueryCancelWorker(SystemInfoService systemInfoService) {
this.systemInfoService = systemInfoService;
}
@Override
protected void runAfterCatalogReady() {
List<Backend> allBackends = systemInfoService.getAllBackends();
for (Coordinator co : QeProcessorImpl.INSTANCE.getAllCoordinators()) {
if (co.shouldCancel(allBackends)) {
// TODO(zhiqiang): We need more clear cancel message, so that user can figure out what happened
// by searching log.
co.cancel(Types.PPlanFragmentCancelReason.INTERNAL_ERROR);
}
}
}
}

View File

@ -325,6 +325,12 @@ public class Backend implements Writable {
return lastMissingHeartbeatTime;
}
// Backend process epoch, is uesd to tag a beckend process
// Currently it is always equal to be start time, even during oplog replay.
public long getProcessEpoch() {
return lastStartTime;
}
public boolean isAlive() {
return this.isAlive.get();
}
@ -627,7 +633,8 @@ public class Backend implements Writable {
@Override
public String toString() {
return "Backend [id=" + id + ", host=" + host + ", heartbeatPort=" + heartbeatPort + ", alive=" + isAlive.get()
+ ", lastStartTime=" + TimeUtils.longToTimeString(lastStartTime) + ", tags: " + tagMap + "]";
+ ", lastStartTime=" + TimeUtils.longToTimeString(lastStartTime) + ", process epoch=" + lastStartTime
+ ", tags: " + tagMap + "]";
}
public String getHealthyStatus() {
@ -678,13 +685,21 @@ public class Backend implements Writable {
this.lastUpdateMs = hbResponse.getHbTime();
if (!isAlive.get()) {
isChanged = true;
LOG.info("{} is back to alive, update start time from {} to {}, "
+ "update be epoch from {} to {}.", this.toString(),
TimeUtils.longToTimeString(lastStartTime),
TimeUtils.longToTimeString(hbResponse.getBeStartTime()),
lastStartTime, hbResponse.getBeStartTime());
this.lastStartTime = hbResponse.getBeStartTime();
LOG.info("{} is back to alive", this.toString());
this.isAlive.set(true);
}
if (this.lastStartTime != hbResponse.getBeStartTime() && hbResponse.getBeStartTime() > 0) {
LOG.info("{} update last start time to {}", this.toString(), hbResponse.getBeStartTime());
LOG.info("{} update last start time from {} to {}, "
+ "update be epoch from {} to {}.", this.toString(),
TimeUtils.longToTimeString(lastStartTime),
TimeUtils.longToTimeString(hbResponse.getBeStartTime()),
lastStartTime, hbResponse.getBeStartTime());
this.lastStartTime = hbResponse.getBeStartTime();
isChanged = true;
}

View File

@ -39,7 +39,11 @@ public class BackendHbResponse extends HeartbeatResponse implements Writable {
private int brpcPort;
@SerializedName(value = "nodeRole")
private String nodeRole = Tag.VALUE_MIX;
private long beStartTime;
// We need to broadcast be start time to all frontends,
// it will be used to check if query on this backend should be canceled.
@SerializedName(value = "beStartTime")
private long beStartTime = 0;
private String host;
private String version = "";
@SerializedName(value = "isShutDown")

View File

@ -47,7 +47,7 @@ struct TBackendInfo {
3: optional Types.TPort be_rpc_port
4: optional Types.TPort brpc_port
5: optional string version
6: optional i64 be_start_time
6: optional i64 be_start_time // This field will also be uesd to identify a be process
7: optional string be_node_role
8: optional bool is_shutdown
}