[Fix](executor) Fix query failures that may occur when FE sends an empty workload group list to BE. (#34074)

This commit is contained in:
wangbo
2024-04-24 23:36:17 +08:00
committed by yiguolei
parent 450f443413
commit 5f2d0e3d53
8 changed files with 114 additions and 57 deletions

View File

@ -1050,16 +1050,6 @@ public class Env {
}
queryCancelWorker.start();
TopicPublisher wgPublisher = new WorkloadGroupPublisher(this);
topicPublisherThread.addToTopicPublisherList(wgPublisher);
WorkloadSchedPolicyPublisher wpPublisher = new WorkloadSchedPolicyPublisher(this);
topicPublisherThread.addToTopicPublisherList(wpPublisher);
topicPublisherThread.start();
workloadGroupMgr.startUpdateThread();
workloadSchedPolicyMgr.start();
workloadRuntimeStatusMgr.start();
}
// wait until FE is ready.
@ -1693,6 +1683,13 @@ public class Env {
binlogGcer.start();
columnIdFlusher.start();
insertOverwriteManager.start();
TopicPublisher wgPublisher = new WorkloadGroupPublisher(this);
topicPublisherThread.addToTopicPublisherList(wgPublisher);
WorkloadSchedPolicyPublisher wpPublisher = new WorkloadSchedPolicyPublisher(this);
topicPublisherThread.addToTopicPublisherList(wpPublisher);
topicPublisherThread.start();
}
// start threads that should run on all FEs
@ -1713,6 +1710,11 @@ public class Env {
}
dnsCache.start();
workloadGroupMgr.startUpdateThread();
workloadSchedPolicyMgr.start();
workloadRuntimeStatusMgr.start();
}
private void transferToNonMaster(FrontendNodeType newType) {

View File

@ -26,6 +26,8 @@ import org.apache.doris.system.SystemInfoService;
import org.apache.doris.thrift.BackendService;
import org.apache.doris.thrift.TNetworkAddress;
import org.apache.doris.thrift.TPublishTopicRequest;
import org.apache.doris.thrift.TTopicInfoType;
import org.apache.doris.thrift.TopicInfo;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
@ -34,6 +36,7 @@ import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutorService;
public class TopicPublisherThread extends MasterDaemon {
@ -59,7 +62,7 @@ public class TopicPublisherThread extends MasterDaemon {
@Override
protected void runAfterCatalogReady() {
LOG.info("begin publish topic info");
LOG.info("[topic_publish]begin publish topic info");
// step 1: get all publish topic info
TPublishTopicRequest request = new TPublishTopicRequest();
for (TopicPublisher topicPublisher : topicPublisherList) {
@ -106,16 +109,24 @@ public class TopicPublisherThread extends MasterDaemon {
BackendService.Client client = null;
TNetworkAddress address = null;
boolean ok = false;
String logStr = "";
try {
for (Map.Entry<TTopicInfoType, List<TopicInfo>> entry : request.getTopicMap().entrySet()) {
logStr += " " + entry.getKey() + "=" + entry.getValue().size() + " ";
}
} catch (Exception e) {
LOG.warn("[topic_publish]make log detail for publish failed:", e);
}
try {
address = new TNetworkAddress(be.getHost(), be.getBePort());
client = ClientPool.backendPool.borrowObject(address);
client.publishTopicInfo(request);
ok = true;
LOG.info("publish topic info to be {} success, time cost={} ms",
be.getHost(), (System.currentTimeMillis() - beginTime));
LOG.info("[topic_publish]publish topic info to be {} success, time cost={} ms, details:{}",
be.getHost(), (System.currentTimeMillis() - beginTime), logStr);
} catch (Exception e) {
LOG.warn("publish topic info to be {} error happens: , time cost={} ms",
be.getHost(), (System.currentTimeMillis() - beginTime), e);
LOG.warn("[topic_publish]publish topic info to be {} error happens: , time cost={} ms, details:{}",
be.getHost(), (System.currentTimeMillis() - beginTime), logStr, e);
} finally {
try {
if (ok) {
@ -124,7 +135,8 @@ public class TopicPublisherThread extends MasterDaemon {
ClientPool.backendPool.invalidateObject(address, client);
}
} catch (Throwable e) {
LOG.warn("recycle topic publish client failed. related backend[{}]", be.getHost(), e);
LOG.warn("[topic_publish]recycle topic publish client failed. related backend[{}]", be.getHost(),
e);
}
handler.onResponse(be);
}

View File

@ -18,14 +18,20 @@
package org.apache.doris.common.publish;
import org.apache.doris.catalog.Env;
import org.apache.doris.resource.workloadgroup.WorkloadGroupMgr;
import org.apache.doris.thrift.TPublishTopicRequest;
import org.apache.doris.thrift.TTopicInfoType;
import org.apache.doris.thrift.TopicInfo;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.util.List;
public class WorkloadGroupPublisher implements TopicPublisher {
private static final Logger LOG = LogManager.getLogger(WorkloadGroupPublisher.class);
private Env env;
public WorkloadGroupPublisher(Env env) {
@ -35,6 +41,12 @@ public class WorkloadGroupPublisher implements TopicPublisher {
@Override
public void getTopicInfo(TPublishTopicRequest req) {
List<TopicInfo> list = env.getWorkloadGroupMgr().getPublishTopicInfo();
req.putToTopicMap(TTopicInfoType.WORKLOAD_GROUP, list);
if (list.size() == 0) {
LOG.warn("[topic_publish]currently, doris at least has one workload group named "
+ WorkloadGroupMgr.DEFAULT_GROUP_NAME
+ ", so get a size 0 here is an error, should check it.");
} else {
req.putToTopicMap(TTopicInfoType.WORKLOAD_GROUP, list);
}
}
}