From a38b97fbdd5769386591c82e84cbb665341524c9 Mon Sep 17 00:00:00 2001 From: yiguolei <676222867@qq.com> Date: Mon, 8 Apr 2024 22:25:24 +0800 Subject: [PATCH] [bugfix](profile) should use backend ip:heartbeat port as key during merge profile (#33368) --- .../common/profile/ExecutionProfile.java | 19 ++++++++++++++++--- .../java/org/apache/doris/qe/Coordinator.java | 3 ++- .../org/apache/doris/qe/QeProcessorImpl.java | 2 +- .../java/org/apache/doris/system/Backend.java | 9 +++++++++ 4 files changed, 28 insertions(+), 5 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/profile/ExecutionProfile.java b/fe/fe-core/src/main/java/org/apache/doris/common/profile/ExecutionProfile.java index f339da8292..d1db6ff43e 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/common/profile/ExecutionProfile.java +++ b/fe/fe-core/src/main/java/org/apache/doris/common/profile/ExecutionProfile.java @@ -17,12 +17,14 @@ package org.apache.doris.common.profile; +import org.apache.doris.catalog.Env; import org.apache.doris.common.Pair; import org.apache.doris.common.util.DebugUtil; import org.apache.doris.common.util.RuntimeProfile; import org.apache.doris.common.util.TimeUtils; import org.apache.doris.planner.PlanFragment; import org.apache.doris.planner.PlanFragmentId; +import org.apache.doris.system.Backend; import org.apache.doris.thrift.TDetailedReportParams; import org.apache.doris.thrift.TNetworkAddress; import org.apache.doris.thrift.TReportExecStatusParams; @@ -243,13 +245,24 @@ public class ExecutionProfile { } } - public void updateProfile(TReportExecStatusParams params, TNetworkAddress address) { + public void updateProfile(TReportExecStatusParams params) { + Backend backend = null; + if (params.isSetBackendId()) { + backend = Env.getCurrentSystemInfo().getBackend(params.getBackendId()); + if (backend == null) { + LOG.warn("could not find backend with id {}", params.getBackendId()); + return; + } + } else { + LOG.warn("backend id is not set in report profile request, bad message"); + return; + } if (isPipelineXProfile) { int pipelineIdx = 0; List taskProfile = Lists.newArrayList(); for (TDetailedReportParams param : params.detailed_report) { String name = "Pipeline :" + pipelineIdx + " " - + " (host=" + address + ")"; + + " (host=" + backend.getHeartbeatAddress() + ")"; RuntimeProfile profile = new RuntimeProfile(name); taskProfile.add(profile); if (param.isSetProfile()) { @@ -266,7 +279,7 @@ public class ExecutionProfile { if (params.isSetLoadChannelProfile()) { loadChannelProfile.update(params.loadChannelProfile); } - multiBeProfile.get(params.fragment_id).put(address, taskProfile); + multiBeProfile.get(params.fragment_id).put(backend.getHeartbeatAddress(), taskProfile); } else { PlanFragmentId fragmentId = instanceIdToFragmentId.get(params.fragment_instance_id); if (fragmentId == null) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java index e05eff2c4e..9ac96c27c8 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java @@ -3065,7 +3065,8 @@ public class Coordinator implements CoordInterface { this.backend = idToBackend.get(addressToBackendID.get(address)); this.brpcAddress = new TNetworkAddress(backend.getHost(), backend.getBrpcPort()); this.lastMissingHeartbeatTime = backend.getLastMissingHeartbeatTime(); - String profileName = "Instance " + DebugUtil.printId(fi.instanceId) + " (host=" + address + ")"; + String profileName = "Instance " + DebugUtil.printId( + fi.instanceId) + " (host=" + this.backend.getHeartbeatAddress() + ")"; RuntimeProfile instanceProfile = new RuntimeProfile(profileName); executionProfile.addInstanceProfile(fragmentId, fi.instanceId, instanceProfile); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/QeProcessorImpl.java b/fe/fe-core/src/main/java/org/apache/doris/qe/QeProcessorImpl.java index a62f1b66f0..a4cd867a31 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/QeProcessorImpl.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/QeProcessorImpl.java @@ -219,7 +219,7 @@ public final class QeProcessorImpl implements QeProcessor { writeProfileExecutor.submit(new Runnable() { @Override public void run() { - executionProfile.updateProfile(params, beAddr); + executionProfile.updateProfile(params); } }); } else { diff --git a/fe/fe-core/src/main/java/org/apache/doris/system/Backend.java b/fe/fe-core/src/main/java/org/apache/doris/system/Backend.java index 1194204c94..1e84273678 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/system/Backend.java +++ b/fe/fe-core/src/main/java/org/apache/doris/system/Backend.java @@ -178,6 +178,11 @@ public class Backend implements Writable { return id; } + // Return ip:heartbeat port + public String getAddress() { + return host + ":" + heartbeatPort; + } + public String getHost() { return host; } @@ -817,6 +822,10 @@ public class Backend implements Writable { return new TNetworkAddress(getHost(), getBrpcPort()); } + public TNetworkAddress getHeartbeatAddress() { + return new TNetworkAddress(getHost(), getHeartbeatPort()); + } + public TNetworkAddress getArrowFlightAddress() { return new TNetworkAddress(getHost(), getArrowFlightSqlPort()); }