[bugfix](profile) should use backend ip:heartbeat port as key during merge profile (#33368)
This commit is contained in:
@ -17,12 +17,14 @@
|
||||
|
||||
package org.apache.doris.common.profile;
|
||||
|
||||
import org.apache.doris.catalog.Env;
|
||||
import org.apache.doris.common.Pair;
|
||||
import org.apache.doris.common.util.DebugUtil;
|
||||
import org.apache.doris.common.util.RuntimeProfile;
|
||||
import org.apache.doris.common.util.TimeUtils;
|
||||
import org.apache.doris.planner.PlanFragment;
|
||||
import org.apache.doris.planner.PlanFragmentId;
|
||||
import org.apache.doris.system.Backend;
|
||||
import org.apache.doris.thrift.TDetailedReportParams;
|
||||
import org.apache.doris.thrift.TNetworkAddress;
|
||||
import org.apache.doris.thrift.TReportExecStatusParams;
|
||||
@ -243,13 +245,24 @@ public class ExecutionProfile {
|
||||
}
|
||||
}
|
||||
|
||||
public void updateProfile(TReportExecStatusParams params, TNetworkAddress address) {
|
||||
public void updateProfile(TReportExecStatusParams params) {
|
||||
Backend backend = null;
|
||||
if (params.isSetBackendId()) {
|
||||
backend = Env.getCurrentSystemInfo().getBackend(params.getBackendId());
|
||||
if (backend == null) {
|
||||
LOG.warn("could not find backend with id {}", params.getBackendId());
|
||||
return;
|
||||
}
|
||||
} else {
|
||||
LOG.warn("backend id is not set in report profile request, bad message");
|
||||
return;
|
||||
}
|
||||
if (isPipelineXProfile) {
|
||||
int pipelineIdx = 0;
|
||||
List<RuntimeProfile> taskProfile = Lists.newArrayList();
|
||||
for (TDetailedReportParams param : params.detailed_report) {
|
||||
String name = "Pipeline :" + pipelineIdx + " "
|
||||
+ " (host=" + address + ")";
|
||||
+ " (host=" + backend.getHeartbeatAddress() + ")";
|
||||
RuntimeProfile profile = new RuntimeProfile(name);
|
||||
taskProfile.add(profile);
|
||||
if (param.isSetProfile()) {
|
||||
@ -266,7 +279,7 @@ public class ExecutionProfile {
|
||||
if (params.isSetLoadChannelProfile()) {
|
||||
loadChannelProfile.update(params.loadChannelProfile);
|
||||
}
|
||||
multiBeProfile.get(params.fragment_id).put(address, taskProfile);
|
||||
multiBeProfile.get(params.fragment_id).put(backend.getHeartbeatAddress(), taskProfile);
|
||||
} else {
|
||||
PlanFragmentId fragmentId = instanceIdToFragmentId.get(params.fragment_instance_id);
|
||||
if (fragmentId == null) {
|
||||
|
||||
@ -3065,7 +3065,8 @@ public class Coordinator implements CoordInterface {
|
||||
this.backend = idToBackend.get(addressToBackendID.get(address));
|
||||
this.brpcAddress = new TNetworkAddress(backend.getHost(), backend.getBrpcPort());
|
||||
this.lastMissingHeartbeatTime = backend.getLastMissingHeartbeatTime();
|
||||
String profileName = "Instance " + DebugUtil.printId(fi.instanceId) + " (host=" + address + ")";
|
||||
String profileName = "Instance " + DebugUtil.printId(
|
||||
fi.instanceId) + " (host=" + this.backend.getHeartbeatAddress() + ")";
|
||||
RuntimeProfile instanceProfile = new RuntimeProfile(profileName);
|
||||
executionProfile.addInstanceProfile(fragmentId, fi.instanceId, instanceProfile);
|
||||
}
|
||||
|
||||
@ -219,7 +219,7 @@ public final class QeProcessorImpl implements QeProcessor {
|
||||
writeProfileExecutor.submit(new Runnable() {
|
||||
@Override
|
||||
public void run() {
|
||||
executionProfile.updateProfile(params, beAddr);
|
||||
executionProfile.updateProfile(params);
|
||||
}
|
||||
});
|
||||
} else {
|
||||
|
||||
@ -178,6 +178,11 @@ public class Backend implements Writable {
|
||||
return id;
|
||||
}
|
||||
|
||||
// Return ip:heartbeat port
|
||||
public String getAddress() {
|
||||
return host + ":" + heartbeatPort;
|
||||
}
|
||||
|
||||
public String getHost() {
|
||||
return host;
|
||||
}
|
||||
@ -817,6 +822,10 @@ public class Backend implements Writable {
|
||||
return new TNetworkAddress(getHost(), getBrpcPort());
|
||||
}
|
||||
|
||||
public TNetworkAddress getHeartbeatAddress() {
|
||||
return new TNetworkAddress(getHost(), getHeartbeatPort());
|
||||
}
|
||||
|
||||
public TNetworkAddress getArrowFlightAddress() {
|
||||
return new TNetworkAddress(getHost(), getArrowFlightSqlPort());
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user