From a8139d9f39b057ea823d5e501a193541bbfe992c Mon Sep 17 00:00:00 2001 From: Gabriel Date: Thu, 25 Jul 2024 14:43:52 +0800 Subject: [PATCH] [Improvement](profile) Provide more info for schedule time (#38290) (#38338) ## Proposed changes Cherry-pick of #38290: each BE now reports, in PExecPlanFragmentResult, the wall-clock times (ms since epoch) at which it received, started executing and finished executing a fragment-scheduling RPC, and the FE shows the derived per-BE latencies in the query profile under "Schedule Time Of BE". --- be/src/service/internal_service.cpp | 21 +++++++ .../doris/common/profile/SummaryProfile.java | 63 +++++++++++++++++++ .../java/org/apache/doris/qe/Coordinator.java | 39 +++++++++--- gensrc/proto/internal_service.proto | 6 ++ 4 files changed, 120 insertions(+), 9 deletions(-) diff --git a/be/src/service/internal_service.cpp b/be/src/service/internal_service.cpp index 3c944ac27d..bd88c432b5 100644 --- a/be/src/service/internal_service.cpp +++ b/be/src/service/internal_service.cpp @@ -301,6 +301,9 @@ void PInternalServiceImpl::exec_plan_fragment(google::protobuf::RpcController* c const PExecPlanFragmentRequest* request, PExecPlanFragmentResult* response, google::protobuf::Closure* done) { + timeval tv {}; + gettimeofday(&tv, nullptr); + response->set_received_time(tv.tv_sec * 1000LL + tv.tv_usec / 1000); bool ret = _light_work_pool.try_offer([this, controller, request, response, done]() { _exec_plan_fragment_in_pthread(controller, request, response, done); }); @@ -313,6 +316,9 @@ void PInternalServiceImpl::exec_plan_fragment(google::protobuf::RpcController* c void PInternalServiceImpl::_exec_plan_fragment_in_pthread( google::protobuf::RpcController* controller, const PExecPlanFragmentRequest* request, PExecPlanFragmentResult* response, google::protobuf::Closure* done) { + timeval tv1 {}; + gettimeofday(&tv1, nullptr); + response->set_execution_time(tv1.tv_sec * 1000LL + tv1.tv_usec / 1000); brpc::ClosureGuard closure_guard(done); auto st = Status::OK(); bool compact = request->has_compact() ? 
request->compact() : false; @@ -330,12 +336,18 @@ void PInternalServiceImpl::_exec_plan_fragment_in_pthread( LOG(WARNING) << "exec plan fragment failed, errmsg=" << st; } st.to_protobuf(response->mutable_status()); + timeval tv2 {}; + gettimeofday(&tv2, nullptr); + response->set_execution_done_time(tv2.tv_sec * 1000LL + tv2.tv_usec / 1000); } void PInternalServiceImpl::exec_plan_fragment_prepare(google::protobuf::RpcController* controller, const PExecPlanFragmentRequest* request, PExecPlanFragmentResult* response, google::protobuf::Closure* done) { + timeval tv {}; + gettimeofday(&tv, nullptr); + response->set_received_time(tv.tv_sec * 1000LL + tv.tv_usec / 1000); bool ret = _light_work_pool.try_offer([this, controller, request, response, done]() { _exec_plan_fragment_in_pthread(controller, request, response, done); }); @@ -349,10 +361,19 @@ void PInternalServiceImpl::exec_plan_fragment_start(google::protobuf::RpcControl const PExecPlanFragmentStartRequest* request, PExecPlanFragmentResult* result, google::protobuf::Closure* done) { + timeval tv {}; + gettimeofday(&tv, nullptr); + result->set_received_time(tv.tv_sec * 1000LL + tv.tv_usec / 1000); bool ret = _light_work_pool.try_offer([this, request, result, done]() { + timeval tv1 {}; + gettimeofday(&tv1, nullptr); + result->set_execution_time(tv1.tv_sec * 1000LL + tv1.tv_usec / 1000); brpc::ClosureGuard closure_guard(done); auto st = _exec_env->fragment_mgr()->start_query_execution(request); st.to_protobuf(result->mutable_status()); + timeval tv2 {}; + gettimeofday(&tv2, nullptr); + result->set_execution_done_time(tv2.tv_sec * 1000LL + tv2.tv_usec / 1000); }); if (!ret) { offer_failed(result, done, _light_work_pool); diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/profile/SummaryProfile.java b/fe/fe-core/src/main/java/org/apache/doris/common/profile/SummaryProfile.java index 822b46272a..8d32b5bd21 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/common/profile/SummaryProfile.java +++ 
b/fe/fe-core/src/main/java/org/apache/doris/common/profile/SummaryProfile.java @@ -19,13 +19,18 @@ package org.apache.doris.common.profile; import org.apache.doris.common.util.RuntimeProfile; import org.apache.doris.common.util.TimeUtils; +import org.apache.doris.thrift.TNetworkAddress; import org.apache.doris.thrift.TUnit; import org.apache.doris.transaction.TransactionType; +import com.google.common.base.Preconditions; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import com.google.common.collect.Maps; +import com.google.gson.Gson; +import java.util.HashMap; +import java.util.List; import java.util.Map; /** @@ -51,6 +56,7 @@ public class SummaryProfile { public static final String IS_PIPELINE = "Is Pipeline"; public static final String TOTAL_INSTANCES_NUM = "Total Instances Num"; public static final String INSTANCES_NUM_PER_BE = "Instances Num Per BE"; + public static final String SCHEDULE_TIME_PER_BE = "Schedule Time Of BE"; public static final String PARALLEL_FRAGMENT_EXEC_INSTANCE = "Parallel Fragment Exec Instance Num"; public static final String TRACE_ID = "Trace ID"; public static final String WORKLOAD_GROUP = "Workload Group"; @@ -96,6 +102,10 @@ public class SummaryProfile { public static final String HMS_ADD_PARTITION_CNT = "HMS Add Partition Count"; public static final String HMS_UPDATE_PARTITION_TIME = "HMS Update Partition Time"; public static final String HMS_UPDATE_PARTITION_CNT = "HMS Update Partition Count"; + public static final String LATENCY_FROM_FE_TO_BE = "RPC Latency From FE To BE"; + public static final String RPC_QUEUE_TIME = "RPC Work Queue Time"; + public static final String RPC_WORK_TIME = "RPC Work Time"; + public static final String LATENCY_FROM_BE_TO_FE = "RPC Latency From BE To FE"; // These info will display on FE's web ui table, every one will be displayed as // a column, so that should not @@ -129,6 +139,7 @@ public class SummaryProfile { SEND_FRAGMENT_PHASE2_TIME, 
FRAGMENT_COMPRESSED_SIZE, FRAGMENT_RPC_COUNT, + SCHEDULE_TIME_PER_BE, WAIT_FETCH_RESULT_TIME, FETCH_RESULT_TIME, WRITE_RESULT_TIME, @@ -231,6 +242,10 @@ public class SummaryProfile { private long filesystemDeleteFileCnt = 0; private long filesystemDeleteDirCnt = 0; private TransactionType transactionType = TransactionType.UNKNOWN; + // BE -> (RPC latency from FE to BE, Execution latency on bthread, Duration of doing work, RPC latency from BE + // to FE) + private Map> rpcPhase1Latency; + private Map> rpcPhase2Latency; public SummaryProfile() { summaryProfile = new RuntimeProfile(SUMMARY_PROFILE_NAME); @@ -319,6 +334,7 @@ public class SummaryProfile { getPrettyTime(createScanRangeFinishTime, getSplitsFinishTime, TUnit.TIME_MS)); executionSummaryProfile.addInfoString(SCHEDULE_TIME, getPrettyTime(queryScheduleFinishTime, queryPlanFinishTime, TUnit.TIME_MS)); + executionSummaryProfile.addInfoString(SCHEDULE_TIME_PER_BE, getRpcLatency()); executionSummaryProfile.addInfoString(ASSIGN_FRAGMENT_TIME, getPrettyTime(assignFragmentTime, queryPlanFinishTime, TUnit.TIME_MS)); executionSummaryProfile.addInfoString(FRAGMENT_SERIALIZE_TIME, @@ -494,6 +510,14 @@ public class SummaryProfile { this.fragmentRpcCount += count; } + public void setRpcPhase1Latency(Map> rpcPhase1Latency) { + this.rpcPhase1Latency = rpcPhase1Latency; + } + + public void setRpcPhase2Latency(Map> rpcPhase2Latency) { + this.rpcPhase2Latency = rpcPhase2Latency; + } + public static class SummaryBuilder { private Map map = Maps.newHashMap(); @@ -683,4 +707,43 @@ public class SummaryProfile { public void incDeleteFileCnt() { this.filesystemDeleteFileCnt += 1; } + + private String getRpcLatency() { + Map>> jsonObject = new HashMap<>(); + if (rpcPhase1Latency != null) { + Map> latencyForPhase1 = new HashMap<>(); + for (TNetworkAddress key : rpcPhase1Latency.keySet()) { + Preconditions.checkState(rpcPhase1Latency.get(key).size() == 4, "rpc latency should have 4 elements"); + Map latency = new HashMap<>(); + 
latency.put(LATENCY_FROM_FE_TO_BE, RuntimeProfile.printCounter(rpcPhase1Latency.get(key).get(0), + TUnit.TIME_MS)); + latency.put(RPC_QUEUE_TIME, RuntimeProfile.printCounter(rpcPhase1Latency.get(key).get(1), + TUnit.TIME_MS)); + latency.put(RPC_WORK_TIME, RuntimeProfile.printCounter(rpcPhase1Latency.get(key).get(2), + TUnit.TIME_MS)); + latency.put(LATENCY_FROM_BE_TO_FE, RuntimeProfile.printCounter(rpcPhase1Latency.get(key).get(3), + TUnit.TIME_MS)); + latencyForPhase1.put(key.getHostname() + ": " + key.getPort(), latency); + } + jsonObject.put("phase1", latencyForPhase1); + } + if (rpcPhase2Latency != null) { + Map> latencyForPhase2 = new HashMap<>(); + for (TNetworkAddress key : rpcPhase2Latency.keySet()) { + Preconditions.checkState(rpcPhase2Latency.get(key).size() == 4, "rpc latency should have 4 elements"); + Map latency = new HashMap<>(); + latency.put(LATENCY_FROM_FE_TO_BE, RuntimeProfile.printCounter(rpcPhase2Latency.get(key).get(0), + TUnit.TIME_MS)); + latency.put(RPC_QUEUE_TIME, RuntimeProfile.printCounter(rpcPhase2Latency.get(key).get(1), + TUnit.TIME_MS)); + latency.put(RPC_WORK_TIME, RuntimeProfile.printCounter(rpcPhase2Latency.get(key).get(2), + TUnit.TIME_MS)); + latency.put(LATENCY_FROM_BE_TO_FE, RuntimeProfile.printCounter(rpcPhase2Latency.get(key).get(3), + TUnit.TIME_MS)); + latencyForPhase2.put(key.getHostname() + ": " + key.getPort(), latency); + } + jsonObject.put("phase2", latencyForPhase2); + } + return new Gson().toJson(jsonObject); + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java index 2904cfdb5e..a4c2678658 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java @@ -140,6 +140,7 @@ import org.apache.thrift.TException; import org.apache.thrift.TSerializer; import org.apache.thrift.protocol.TCompactProtocol; import org.jetbrains.annotations.NotNull; +import 
org.joda.time.DateTime; import java.security.SecureRandom; import java.time.LocalDateTime; @@ -1045,28 +1046,35 @@ public class Coordinator implements CoordInterface { updateProfileIfPresent(profile -> profile.setFragmentSerializeTime()); // 4.2 send fragments rpc - List>> - futures = Lists.newArrayList(); + List>>> futures = Lists.newArrayList(); BackendServiceProxy proxy = BackendServiceProxy.getInstance(); for (PipelineExecContexts ctxs : beToPipelineExecCtxs.values()) { if (LOG.isDebugEnabled()) { LOG.debug(ctxs.debugInfo()); } - futures.add(ImmutableTriple.of(ctxs, proxy, ctxs.execRemoteFragmentsAsync(proxy))); + futures.add(Pair.of(DateTime.now().getMillis(), + ImmutableTriple.of(ctxs, proxy, ctxs.execRemoteFragmentsAsync(proxy)))); } - waitPipelineRpc(futures, this.timeoutDeadline - System.currentTimeMillis(), "send fragments"); + Map> rpcPhase1Latency = + waitPipelineRpc(futures, this.timeoutDeadline - System.currentTimeMillis(), "send fragments"); updateProfileIfPresent(profile -> profile.updateFragmentRpcCount(futures.size())); updateProfileIfPresent(profile -> profile.setFragmentSendPhase1Time()); + updateProfileIfPresent(profile -> profile.setRpcPhase1Latency(rpcPhase1Latency)); if (twoPhaseExecution) { // 5. 
send and wait execution start rpc futures.clear(); for (PipelineExecContexts ctxs : beToPipelineExecCtxs.values()) { - futures.add(ImmutableTriple.of(ctxs, proxy, ctxs.execPlanFragmentStartAsync(proxy))); + futures.add(Pair.of(DateTime.now().getMillis(), + ImmutableTriple.of(ctxs, proxy, ctxs.execPlanFragmentStartAsync(proxy)))); } - waitPipelineRpc(futures, this.timeoutDeadline - System.currentTimeMillis(), "send execution start"); + Map> rpcPhase2Latency = + waitPipelineRpc(futures, this.timeoutDeadline - System.currentTimeMillis(), + "send execution start"); updateProfileIfPresent(profile -> profile.updateFragmentRpcCount(futures.size())); updateProfileIfPresent(profile -> profile.setFragmentSendPhase2Time()); + updateProfileIfPresent(profile -> profile.setRpcPhase2Latency(rpcPhase2Latency)); } } finally { unlock(); @@ -1150,8 +1158,8 @@ public class Coordinator implements CoordInterface { } } - private void waitPipelineRpc(List>> futures, long leftTimeMs, + private Map> waitPipelineRpc(List>>> futures, long leftTimeMs, String operation) throws RpcException, UserException { if (leftTimeMs <= 0) { long currentTimeMillis = System.currentTimeMillis(); @@ -1172,14 +1180,26 @@ public class Coordinator implements CoordInterface { throw new UserException(msg); } + // BE -> (RPC latency from FE to BE, Execution latency on bthread, Duration of doing work, RPC latency from BE + // to FE) + Map> beToPrepareLatency = new HashMap<>(); long timeoutMs = Math.min(leftTimeMs, Config.remote_fragment_exec_timeout_ms); - for (Triple> triple : futures) { + for (Pair>> pair : futures) { + Triple> triple = pair.second; TStatusCode code; String errMsg = null; Exception exception = null; try { PExecPlanFragmentResult result = triple.getRight().get(timeoutMs, TimeUnit.MILLISECONDS); + long rpcDone = DateTime.now().getMillis(); + beToPrepareLatency.put(triple.getLeft().brpcAddr, + Lists.newArrayList(result.getReceivedTime() - pair.first, + result.getExecutionTime() - 
result.getReceivedTime(), + result.getExecutionDoneTime() - result.getExecutionTime(), + rpcDone - result.getExecutionDoneTime())); code = TStatusCode.findByValue(result.getStatus().getStatusCode()); if (code == null) { code = TStatusCode.INTERNAL_ERROR; @@ -1229,6 +1249,7 @@ public class Coordinator implements CoordInterface { } } } + return beToPrepareLatency; } public List getExportFiles() { diff --git a/gensrc/proto/internal_service.proto b/gensrc/proto/internal_service.proto index 14a165a3b9..a0b0aac6e5 100644 --- a/gensrc/proto/internal_service.proto +++ b/gensrc/proto/internal_service.proto @@ -228,6 +228,12 @@ message PExecPlanFragmentStartRequest { message PExecPlanFragmentResult { required PStatus status = 1; + // BE wall-clock time (ms since epoch) at which the RPC handler received the request + optional int64 received_time = 2; + // BE wall-clock time (ms since epoch) at which the light work pool started executing the request + optional int64 execution_time = 3; + // BE wall-clock time (ms since epoch) at which the light work pool finished executing the request + optional int64 execution_done_time = 4; }; message PCancelPlanFragmentRequest {