[Improvement](pipeline) Cancel outdated query if original fe restarts (#23582)

If any FE restarts, queries issued from that FE will be cancelled.

Implementation of #23704
hzq
2023-08-31 17:58:52 +08:00
committed by GitHub
parent f214485733
commit c083336bbe
28 changed files with 449 additions and 88 deletions
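
How it fits together: at startup each FE records a process UUID (currently its start time in milliseconds), the Coordinator stamps that UUID into every query's options, and the master FE forwards the current UUID of each FE to every BE inside the backend heartbeat. The BE's cancel worker can then cancel any running query whose recorded UUID no longer matches its coordinator FE's current one. The class below is only a hedged sketch of that comparison; the names StaleQueryCheck and shouldCancel are invented for illustration, and the real check runs on the BE side, not in this FE code.

// Hypothetical sketch (not part of this commit) of the decision the BE-side
// cancel worker effectively makes for each running query.
public class StaleQueryCheck {
    // queryFeUuid: value stamped into the query options (setFeProcessUuid) at plan time.
    // currentFeUuid: latest UUID reported for that FE via the master heartbeat, 0 if unknown.
    public static boolean shouldCancel(long queryFeUuid, long currentFeUuid) {
        if (queryFeUuid <= 0 || currentFeUuid <= 0) {
            return false; // missing information, keep the query running
        }
        // A different UUID means the coordinator FE restarted after the query was issued.
        return queryFeUuid != currentFeUuid;
    }
}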

fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java

@@ -241,6 +241,7 @@ import org.apache.doris.task.MasterTaskExecutor;
import org.apache.doris.task.PriorityMasterTaskExecutor;
import org.apache.doris.thrift.BackendService;
import org.apache.doris.thrift.TCompressionType;
import org.apache.doris.thrift.TFrontendInfo;
import org.apache.doris.thrift.TNetworkAddress;
import org.apache.doris.thrift.TStorageMedium;
import org.apache.doris.transaction.DbUsedDataQuotaInfoCollector;
@@ -477,6 +478,19 @@ public class Env {
private HiveTransactionMgr hiveTransactionMgr;
public List<TFrontendInfo> getFrontendInfos() {
List<TFrontendInfo> res = new ArrayList<>();
for (Frontend fe : frontends.values()) {
TFrontendInfo feInfo = new TFrontendInfo();
feInfo.setCoordinatorAddress(new TNetworkAddress(fe.getHost(), fe.getRpcPort()));
feInfo.setProcessUuid(fe.getProcessUUID());
res.add(feInfo);
}
return res;
}
public List<Frontend> getFrontends(FrontendNodeType nodeType) {
if (nodeType == null) {
// get all

fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java

@@ -72,6 +72,7 @@ import org.apache.doris.proto.Types.PUniqueId;
import org.apache.doris.qe.QueryStatisticsItem.FragmentInstanceInfo;
import org.apache.doris.rpc.BackendServiceProxy;
import org.apache.doris.rpc.RpcException;
import org.apache.doris.service.ExecuteEnv;
import org.apache.doris.service.FrontendOptions;
import org.apache.doris.system.Backend;
import org.apache.doris.system.SystemInfoService;
@@ -399,6 +400,7 @@ public class Coordinator {
this.queryOptions.setQueryTimeout(context.getExecTimeout());
this.queryOptions.setExecutionTimeout(context.getExecTimeout());
this.queryOptions.setEnableScanNodeRunSerial(context.getSessionVariable().isEnableScanRunSerial());
this.queryOptions.setFeProcessUuid(ExecuteEnv.getInstance().getProcessUUID());
}
public ConnectContext getConnectContext() {

fe/fe-core/src/main/java/org/apache/doris/service/ExecuteEnv.java

@@ -30,14 +30,14 @@ public class ExecuteEnv {
private static volatile ExecuteEnv INSTANCE;
private MultiLoadMgr multiLoadMgr;
private ConnectScheduler scheduler;
private long startupTime;
private long processUUID;
private List<FeDiskInfo> diskInfos;
private ExecuteEnv() {
multiLoadMgr = new MultiLoadMgr();
scheduler = new ConnectScheduler(Config.qe_max_connection);
startupTime = System.currentTimeMillis();
processUUID = System.currentTimeMillis();
diskInfos = new ArrayList<FeDiskInfo>() {{
add(new FeDiskInfo("meta", Config.meta_dir, DiskUtils.df(Config.meta_dir)));
add(new FeDiskInfo("log", Config.sys_log_dir, DiskUtils.df(Config.sys_log_dir)));
@@ -65,8 +65,8 @@ public class ExecuteEnv {
return multiLoadMgr;
}
public long getStartupTime() {
return startupTime;
public long getProcessUUID() {
return processUUID;
}
public List<FeDiskInfo> getDiskInfos() {

fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java

@@ -1933,6 +1933,7 @@ public class FrontendServiceImpl implements FrontendService.Iface {
}
txnState.addTableIndexes(table);
plan.setTableName(table.getName());
plan.query_options.setFeProcessUuid(ExecuteEnv.getInstance().getProcessUUID());
return plan;
} finally {
table.readUnlock();
@@ -2033,7 +2034,7 @@ public class FrontendServiceImpl implements FrontendService.Iface {
result.setQueryPort(Config.query_port);
result.setRpcPort(Config.rpc_port);
result.setVersion(Version.DORIS_BUILD_VERSION + "-" + Version.DORIS_BUILD_SHORT_HASH);
result.setLastStartupTime(exeEnv.getStartupTime());
result.setLastStartupTime(exeEnv.getProcessUUID());
if (exeEnv.getDiskInfos() != null) {
result.setDiskInfos(FeDiskInfo.toThrifts(exeEnv.getDiskInfos()));
}

fe/fe-core/src/main/java/org/apache/doris/system/Frontend.java

@@ -60,6 +60,8 @@ public class Frontend implements Writable {
private boolean isAlive = false;
private long processUUID = 0;
public Frontend() {
}
@@ -122,6 +124,10 @@ public class Frontend implements Writable {
return lastStartupTime;
}
public long getProcessUUID() {
return processUUID;
}
public long getLastUpdateTime() {
return lastUpdateTime;
}
@@ -150,10 +156,16 @@ public class Frontend implements Writable {
replayedJournalId = hbResponse.getReplayedJournalId();
lastUpdateTime = hbResponse.getHbTime();
heartbeatErrMsg = "";
lastStartupTime = hbResponse.getFeStartTime();
lastStartupTime = hbResponse.getProcessUUID();
diskInfos = hbResponse.getDiskInfos();
isChanged = true;
processUUID = lastStartupTime;
} else {
// A non-master node disconnected.
// Set startUUID to zero, and be's heartbeat mgr will ignore this hb,
// so that its cancel worker will not cancel queries from this fe immediately
// until it receives a valid start UUID.
processUUID = 0;
if (isAlive) {
isAlive = false;
isChanged = true;

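Because a disconnected FE is reported with processUUID = 0, a consumer of the forwarded TFrontendInfo list has to treat 0 as "state unknown" and keep the last known value instead of cancelling right away. The helper below is a hedged sketch of that bookkeeping; FrontendUuidTracker and its methods are hypothetical and not part of this commit, only the skip-zero rule comes from the comment above.

// Hypothetical helper: index the heartbeat's TFrontendInfo list by coordinator
// address, ignoring entries whose UUID is 0 (FE currently unreachable) so that
// queries from a temporarily disconnected FE are not cancelled prematurely.
import org.apache.doris.thrift.TFrontendInfo;
import org.apache.doris.thrift.TNetworkAddress;

import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class FrontendUuidTracker {
    private final Map<TNetworkAddress, Long> latestUuidByCoordinator = new HashMap<>();

    public void update(List<TFrontendInfo> feInfos) {
        for (TFrontendInfo info : feInfos) {
            if (info.getProcessUuid() == 0) {
                continue; // unknown state, keep the previously known UUID
            }
            latestUuidByCoordinator.put(info.getCoordinatorAddress(), info.getProcessUuid());
        }
    }

    public long currentUuidOf(TNetworkAddress coordinator) {
        return latestUuidByCoordinator.getOrDefault(coordinator, 0L);
    }
}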
fe/fe-core/src/main/java/org/apache/doris/system/FrontendHbResponse.java

@@ -19,6 +19,7 @@ package org.apache.doris.system;
import org.apache.doris.common.io.Text;
import org.apache.doris.common.io.Writable;
import org.apache.doris.service.ExecuteEnv;
import org.apache.doris.service.FeDiskInfo;
import com.google.gson.annotations.SerializedName;
@@ -41,7 +42,7 @@ public class FrontendHbResponse extends HeartbeatResponse implements Writable {
@SerializedName(value = "replayedJournalId")
private long replayedJournalId;
private String version;
private long feStartTime;
private long processUUID;
private List<FeDiskInfo> diskInfos;
public FrontendHbResponse() {
@@ -50,7 +51,7 @@ public class FrontendHbResponse extends HeartbeatResponse implements Writable {
public FrontendHbResponse(String name, int queryPort, int rpcPort,
long replayedJournalId, long hbTime, String version,
long feStartTime, List<FeDiskInfo> diskInfos) {
long processUUID, List<FeDiskInfo> diskInfos) {
super(HeartbeatResponse.Type.FRONTEND);
this.status = HbStatus.OK;
this.name = name;
@@ -59,7 +60,7 @@ public class FrontendHbResponse extends HeartbeatResponse implements Writable {
this.replayedJournalId = replayedJournalId;
this.hbTime = hbTime;
this.version = version;
this.feStartTime = feStartTime;
this.processUUID = processUUID;
this.diskInfos = diskInfos;
}
@@ -68,6 +69,7 @@ public class FrontendHbResponse extends HeartbeatResponse implements Writable {
this.status = HbStatus.BAD;
this.name = name;
this.msg = errMsg;
this.processUUID = ExecuteEnv.getInstance().getProcessUUID();
}
public String getName() {
@@ -90,8 +92,8 @@ public class FrontendHbResponse extends HeartbeatResponse implements Writable {
return version;
}
public long getFeStartTime() {
return feStartTime;
public long getProcessUUID() {
return processUUID;
}
public List<FeDiskInfo> getDiskInfos() {
@@ -116,7 +118,7 @@ public class FrontendHbResponse extends HeartbeatResponse implements Writable {
sb.append(", queryPort: ").append(queryPort);
sb.append(", rpcPort: ").append(rpcPort);
sb.append(", replayedJournalId: ").append(replayedJournalId);
sb.append(", festartTime: ").append(feStartTime);
sb.append(", festartTime: ").append(processUUID);
return sb.toString();
}

fe/fe-core/src/main/java/org/apache/doris/system/HeartbeatMgr.java

@@ -39,6 +39,7 @@ import org.apache.doris.thrift.TBrokerOperationStatus;
import org.apache.doris.thrift.TBrokerOperationStatusCode;
import org.apache.doris.thrift.TBrokerPingBrokerRequest;
import org.apache.doris.thrift.TBrokerVersion;
import org.apache.doris.thrift.TFrontendInfo;
import org.apache.doris.thrift.TFrontendPingFrontendRequest;
import org.apache.doris.thrift.TFrontendPingFrontendResult;
import org.apache.doris.thrift.TFrontendPingFrontendStatusCode;
@@ -101,11 +102,12 @@ public class HeartbeatMgr extends MasterDaemon {
*/
@Override
protected void runAfterCatalogReady() {
// Get feInfos of previous iteration.
List<TFrontendInfo> feInfos = Env.getCurrentEnv().getFrontendInfos();
List<Future<HeartbeatResponse>> hbResponses = Lists.newArrayList();
// send backend heartbeat
for (Backend backend : nodeMgr.getIdToBackend().values()) {
BackendHeartbeatHandler handler = new BackendHeartbeatHandler(backend);
BackendHeartbeatHandler handler = new BackendHeartbeatHandler(backend, feInfos);
hbResponses.add(executor.submit(handler));
}
@@ -204,9 +206,11 @@
// backend heartbeat
private class BackendHeartbeatHandler implements Callable<HeartbeatResponse> {
private Backend backend;
private List<TFrontendInfo> feInfos;
public BackendHeartbeatHandler(Backend backend) {
public BackendHeartbeatHandler(Backend backend, List<TFrontendInfo> feInfos) {
this.backend = backend;
this.feInfos = feInfos;
}
@Override
@@ -222,6 +226,7 @@
long flags = heartbeatFlags.getHeartbeatFlags();
copiedMasterInfo.setHeartbeatFlags(flags);
copiedMasterInfo.setBackendId(backendId);
copiedMasterInfo.setFrontendInfos(feInfos);
THeartbeatResult result;
if (!FeConstants.runningUnitTest) {
client = ClientPool.backendHeartbeatPool.borrowObject(beAddr);
@@ -301,7 +306,7 @@
return new FrontendHbResponse(fe.getNodeName(), Config.query_port, Config.rpc_port,
Env.getCurrentEnv().getMaxJournalId(), System.currentTimeMillis(),
Version.DORIS_BUILD_VERSION + "-" + Version.DORIS_BUILD_SHORT_HASH,
ExecuteEnv.getInstance().getStartupTime(), ExecuteEnv.getInstance().getDiskInfos());
ExecuteEnv.getInstance().getProcessUUID(), ExecuteEnv.getInstance().getDiskInfos());
} else {
return new FrontendHbResponse(fe.getNodeName(), "not ready");
}