## Proposed changes: pick #38290
This commit is contained in:
@ -19,13 +19,18 @@ package org.apache.doris.common.profile;
|
||||
|
||||
import org.apache.doris.common.util.RuntimeProfile;
|
||||
import org.apache.doris.common.util.TimeUtils;
|
||||
import org.apache.doris.thrift.TNetworkAddress;
|
||||
import org.apache.doris.thrift.TUnit;
|
||||
import org.apache.doris.transaction.TransactionType;
|
||||
|
||||
import com.google.common.base.Preconditions;
|
||||
import com.google.common.collect.ImmutableList;
|
||||
import com.google.common.collect.ImmutableMap;
|
||||
import com.google.common.collect.Maps;
|
||||
import com.google.gson.Gson;
|
||||
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
/**
|
||||
@ -51,6 +56,7 @@ public class SummaryProfile {
|
||||
public static final String IS_PIPELINE = "Is Pipeline";
|
||||
public static final String TOTAL_INSTANCES_NUM = "Total Instances Num";
|
||||
public static final String INSTANCES_NUM_PER_BE = "Instances Num Per BE";
|
||||
public static final String SCHEDULE_TIME_PER_BE = "Schedule Time Of BE";
|
||||
public static final String PARALLEL_FRAGMENT_EXEC_INSTANCE = "Parallel Fragment Exec Instance Num";
|
||||
public static final String TRACE_ID = "Trace ID";
|
||||
public static final String WORKLOAD_GROUP = "Workload Group";
|
||||
@ -96,6 +102,10 @@ public class SummaryProfile {
|
||||
public static final String HMS_ADD_PARTITION_CNT = "HMS Add Partition Count";
|
||||
public static final String HMS_UPDATE_PARTITION_TIME = "HMS Update Partition Time";
|
||||
public static final String HMS_UPDATE_PARTITION_CNT = "HMS Update Partition Count";
|
||||
public static final String LATENCY_FROM_FE_TO_BE = "RPC Latency From FE To BE";
|
||||
public static final String RPC_QUEUE_TIME = "RPC Work Queue Time";
|
||||
public static final String RPC_WORK_TIME = "RPC Work Time";
|
||||
public static final String LATENCY_FROM_BE_TO_FE = "RPC Latency From BE To FE";
|
||||
|
||||
// These info will display on FE's web ui table, every one will be displayed as
|
||||
// a column, so that should not
|
||||
@ -129,6 +139,7 @@ public class SummaryProfile {
|
||||
SEND_FRAGMENT_PHASE2_TIME,
|
||||
FRAGMENT_COMPRESSED_SIZE,
|
||||
FRAGMENT_RPC_COUNT,
|
||||
SCHEDULE_TIME_PER_BE,
|
||||
WAIT_FETCH_RESULT_TIME,
|
||||
FETCH_RESULT_TIME,
|
||||
WRITE_RESULT_TIME,
|
||||
@ -231,6 +242,10 @@ public class SummaryProfile {
|
||||
private long filesystemDeleteFileCnt = 0;
|
||||
private long filesystemDeleteDirCnt = 0;
|
||||
private TransactionType transactionType = TransactionType.UNKNOWN;
|
||||
// BE -> (RPC latency from FE to BE, Execution latency on bthread, Duration of doing work, RPC latency from BE
|
||||
// to FE)
|
||||
private Map<TNetworkAddress, List<Long>> rpcPhase1Latency;
|
||||
private Map<TNetworkAddress, List<Long>> rpcPhase2Latency;
|
||||
|
||||
public SummaryProfile() {
|
||||
summaryProfile = new RuntimeProfile(SUMMARY_PROFILE_NAME);
|
||||
@ -319,6 +334,7 @@ public class SummaryProfile {
|
||||
getPrettyTime(createScanRangeFinishTime, getSplitsFinishTime, TUnit.TIME_MS));
|
||||
executionSummaryProfile.addInfoString(SCHEDULE_TIME,
|
||||
getPrettyTime(queryScheduleFinishTime, queryPlanFinishTime, TUnit.TIME_MS));
|
||||
executionSummaryProfile.addInfoString(SCHEDULE_TIME_PER_BE, getRpcLatency());
|
||||
executionSummaryProfile.addInfoString(ASSIGN_FRAGMENT_TIME,
|
||||
getPrettyTime(assignFragmentTime, queryPlanFinishTime, TUnit.TIME_MS));
|
||||
executionSummaryProfile.addInfoString(FRAGMENT_SERIALIZE_TIME,
|
||||
@ -494,6 +510,14 @@ public class SummaryProfile {
|
||||
this.fragmentRpcCount += count;
|
||||
}
|
||||
|
||||
/**
 * Records the per-backend latency breakdown of the phase-1 (send fragments) RPCs.
 *
 * @param rpcPhase1Latency map of BE address to a 4-element list:
 *        (RPC latency from FE to BE, execution latency on bthread,
 *        duration of doing work, RPC latency from BE to FE)
 */
public void setRpcPhase1Latency(Map<TNetworkAddress, List<Long>> rpcPhase1Latency) {
    this.rpcPhase1Latency = rpcPhase1Latency;
}
|
||||
|
||||
/**
 * Records the per-backend latency breakdown of the phase-2 (execution start) RPCs.
 *
 * @param rpcPhase2Latency map of BE address to a 4-element list:
 *        (RPC latency from FE to BE, execution latency on bthread,
 *        duration of doing work, RPC latency from BE to FE)
 */
public void setRpcPhase2Latency(Map<TNetworkAddress, List<Long>> rpcPhase2Latency) {
    this.rpcPhase2Latency = rpcPhase2Latency;
}
|
||||
|
||||
public static class SummaryBuilder {
|
||||
private Map<String, String> map = Maps.newHashMap();
|
||||
|
||||
@ -683,4 +707,43 @@ public class SummaryProfile {
|
||||
/** Increments the count of files deleted on the filesystem for this query's profile. */
public void incDeleteFileCnt() {
    this.filesystemDeleteFileCnt += 1;
}
|
||||
|
||||
private String getRpcLatency() {
|
||||
Map<String, Map<String, Map<String, String>>> jsonObject = new HashMap<>();
|
||||
if (rpcPhase1Latency != null) {
|
||||
Map<String, Map<String, String>> latencyForPhase1 = new HashMap<>();
|
||||
for (TNetworkAddress key : rpcPhase1Latency.keySet()) {
|
||||
Preconditions.checkState(rpcPhase1Latency.get(key).size() == 4, "rpc latency should have 4 elements");
|
||||
Map<String, String> latency = new HashMap<>();
|
||||
latency.put(LATENCY_FROM_FE_TO_BE, RuntimeProfile.printCounter(rpcPhase1Latency.get(key).get(0),
|
||||
TUnit.TIME_MS));
|
||||
latency.put(RPC_QUEUE_TIME, RuntimeProfile.printCounter(rpcPhase1Latency.get(key).get(1),
|
||||
TUnit.TIME_MS));
|
||||
latency.put(RPC_WORK_TIME, RuntimeProfile.printCounter(rpcPhase1Latency.get(key).get(2),
|
||||
TUnit.TIME_MS));
|
||||
latency.put(LATENCY_FROM_BE_TO_FE, RuntimeProfile.printCounter(rpcPhase1Latency.get(key).get(3),
|
||||
TUnit.TIME_MS));
|
||||
latencyForPhase1.put(key.getHostname() + ": " + key.getPort(), latency);
|
||||
}
|
||||
jsonObject.put("phase1", latencyForPhase1);
|
||||
}
|
||||
if (rpcPhase2Latency != null) {
|
||||
Map<String, Map<String, String>> latencyForPhase2 = new HashMap<>();
|
||||
for (TNetworkAddress key : rpcPhase2Latency.keySet()) {
|
||||
Preconditions.checkState(rpcPhase2Latency.get(key).size() == 4, "rpc latency should have 4 elements");
|
||||
Map<String, String> latency = new HashMap<>();
|
||||
latency.put(LATENCY_FROM_FE_TO_BE, RuntimeProfile.printCounter(rpcPhase2Latency.get(key).get(0),
|
||||
TUnit.TIME_MS));
|
||||
latency.put(RPC_QUEUE_TIME, RuntimeProfile.printCounter(rpcPhase2Latency.get(key).get(1),
|
||||
TUnit.TIME_MS));
|
||||
latency.put(RPC_WORK_TIME, RuntimeProfile.printCounter(rpcPhase2Latency.get(key).get(2),
|
||||
TUnit.TIME_MS));
|
||||
latency.put(LATENCY_FROM_BE_TO_FE, RuntimeProfile.printCounter(rpcPhase2Latency.get(key).get(3),
|
||||
TUnit.TIME_MS));
|
||||
latencyForPhase2.put(key.getHostname() + ": " + key.getPort(), latency);
|
||||
}
|
||||
jsonObject.put("phase2", latencyForPhase2);
|
||||
}
|
||||
return new Gson().toJson(jsonObject);
|
||||
}
|
||||
}
|
||||
|
||||
@ -140,6 +140,7 @@ import org.apache.thrift.TException;
|
||||
import org.apache.thrift.TSerializer;
|
||||
import org.apache.thrift.protocol.TCompactProtocol;
|
||||
import org.jetbrains.annotations.NotNull;
|
||||
import org.joda.time.DateTime;
|
||||
|
||||
import java.security.SecureRandom;
|
||||
import java.time.LocalDateTime;
|
||||
@ -1045,28 +1046,35 @@ public class Coordinator implements CoordInterface {
|
||||
updateProfileIfPresent(profile -> profile.setFragmentSerializeTime());
|
||||
|
||||
// 4.2 send fragments rpc
|
||||
List<Triple<PipelineExecContexts, BackendServiceProxy, Future<InternalService.PExecPlanFragmentResult>>>
|
||||
futures = Lists.newArrayList();
|
||||
List<Pair<Long, Triple<PipelineExecContexts, BackendServiceProxy,
|
||||
Future<InternalService.PExecPlanFragmentResult>>>> futures = Lists.newArrayList();
|
||||
BackendServiceProxy proxy = BackendServiceProxy.getInstance();
|
||||
for (PipelineExecContexts ctxs : beToPipelineExecCtxs.values()) {
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug(ctxs.debugInfo());
|
||||
}
|
||||
futures.add(ImmutableTriple.of(ctxs, proxy, ctxs.execRemoteFragmentsAsync(proxy)));
|
||||
futures.add(Pair.of(DateTime.now().getMillis(),
|
||||
ImmutableTriple.of(ctxs, proxy, ctxs.execRemoteFragmentsAsync(proxy))));
|
||||
}
|
||||
waitPipelineRpc(futures, this.timeoutDeadline - System.currentTimeMillis(), "send fragments");
|
||||
Map<TNetworkAddress, List<Long>> rpcPhase1Latency =
|
||||
waitPipelineRpc(futures, this.timeoutDeadline - System.currentTimeMillis(), "send fragments");
|
||||
updateProfileIfPresent(profile -> profile.updateFragmentRpcCount(futures.size()));
|
||||
updateProfileIfPresent(profile -> profile.setFragmentSendPhase1Time());
|
||||
updateProfileIfPresent(profile -> profile.setRpcPhase1Latency(rpcPhase1Latency));
|
||||
|
||||
if (twoPhaseExecution) {
|
||||
// 5. send and wait execution start rpc
|
||||
futures.clear();
|
||||
for (PipelineExecContexts ctxs : beToPipelineExecCtxs.values()) {
|
||||
futures.add(ImmutableTriple.of(ctxs, proxy, ctxs.execPlanFragmentStartAsync(proxy)));
|
||||
futures.add(Pair.of(DateTime.now().getMillis(),
|
||||
ImmutableTriple.of(ctxs, proxy, ctxs.execPlanFragmentStartAsync(proxy))));
|
||||
}
|
||||
waitPipelineRpc(futures, this.timeoutDeadline - System.currentTimeMillis(), "send execution start");
|
||||
Map<TNetworkAddress, List<Long>> rpcPhase2Latency =
|
||||
waitPipelineRpc(futures, this.timeoutDeadline - System.currentTimeMillis(),
|
||||
"send execution start");
|
||||
updateProfileIfPresent(profile -> profile.updateFragmentRpcCount(futures.size()));
|
||||
updateProfileIfPresent(profile -> profile.setFragmentSendPhase2Time());
|
||||
updateProfileIfPresent(profile -> profile.setRpcPhase2Latency(rpcPhase2Latency));
|
||||
}
|
||||
} finally {
|
||||
unlock();
|
||||
@ -1150,8 +1158,8 @@ public class Coordinator implements CoordInterface {
|
||||
}
|
||||
}
|
||||
|
||||
private void waitPipelineRpc(List<Triple<PipelineExecContexts, BackendServiceProxy,
|
||||
Future<PExecPlanFragmentResult>>> futures, long leftTimeMs,
|
||||
private Map<TNetworkAddress, List<Long>> waitPipelineRpc(List<Pair<Long, Triple<PipelineExecContexts,
|
||||
BackendServiceProxy, Future<InternalService.PExecPlanFragmentResult>>>> futures, long leftTimeMs,
|
||||
String operation) throws RpcException, UserException {
|
||||
if (leftTimeMs <= 0) {
|
||||
long currentTimeMillis = System.currentTimeMillis();
|
||||
@ -1172,14 +1180,26 @@ public class Coordinator implements CoordInterface {
|
||||
throw new UserException(msg);
|
||||
}
|
||||
|
||||
// BE -> (RPC latency from FE to BE, Execution latency on bthread, Duration of doing work, RPC latency from BE
|
||||
// to FE)
|
||||
Map<TNetworkAddress, List<Long>> beToPrepareLatency = new HashMap<>();
|
||||
long timeoutMs = Math.min(leftTimeMs, Config.remote_fragment_exec_timeout_ms);
|
||||
for (Triple<PipelineExecContexts, BackendServiceProxy, Future<PExecPlanFragmentResult>> triple : futures) {
|
||||
for (Pair<Long, Triple<PipelineExecContexts, BackendServiceProxy,
|
||||
Future<InternalService.PExecPlanFragmentResult>>> pair : futures) {
|
||||
Triple<PipelineExecContexts, BackendServiceProxy,
|
||||
Future<InternalService.PExecPlanFragmentResult>> triple = pair.second;
|
||||
TStatusCode code;
|
||||
String errMsg = null;
|
||||
Exception exception = null;
|
||||
|
||||
try {
|
||||
PExecPlanFragmentResult result = triple.getRight().get(timeoutMs, TimeUnit.MILLISECONDS);
|
||||
long rpcDone = DateTime.now().getMillis();
|
||||
beToPrepareLatency.put(triple.getLeft().brpcAddr,
|
||||
Lists.newArrayList(result.getReceivedTime() - pair.first,
|
||||
result.getExecutionTime() - result.getReceivedTime(),
|
||||
result.getExecutionDoneTime() - result.getExecutionTime(),
|
||||
rpcDone - result.getExecutionDoneTime()));
|
||||
code = TStatusCode.findByValue(result.getStatus().getStatusCode());
|
||||
if (code == null) {
|
||||
code = TStatusCode.INTERNAL_ERROR;
|
||||
@ -1229,6 +1249,7 @@ public class Coordinator implements CoordInterface {
|
||||
}
|
||||
}
|
||||
}
|
||||
return beToPrepareLatency;
|
||||
}
|
||||
|
||||
public List<String> getExportFiles() {
|
||||
|
||||
Reference in New Issue
Block a user