[branch-2.1](opt)(profile) parallel serialize fragment and add detail schedule profile #33376 #33379

This commit is contained in:
morningman
2024-04-08 13:45:53 +08:00
parent e841d82ffb
commit ee36b2f70d
6 changed files with 217 additions and 168 deletions

View File

@ -62,14 +62,18 @@ public class SummaryProfile {
public static final String INIT_SCAN_NODE_TIME = "Init Scan Node Time";
public static final String FINALIZE_SCAN_NODE_TIME = "Finalize Scan Node Time";
public static final String GET_SPLITS_TIME = "Get Splits Time";
public static final String GET_PARTITIONS_TIME = "Get PARTITIONS Time";
public static final String GET_PARTITION_FILES_TIME = "Get PARTITION FILES Time";
public static final String GET_PARTITIONS_TIME = "Get Partitions Time";
public static final String GET_PARTITION_FILES_TIME = "Get Partition Files Time";
public static final String CREATE_SCAN_RANGE_TIME = "Create Scan Range Time";
public static final String PLAN_TIME = "Plan Time";
public static final String SCHEDULE_TIME = "Schedule Time";
public static final String ASSIGN_FRAGMENT_TIME = "Fragment Assign Time";
public static final String FRAGMENT_SERIALIZE_TIME = "Fragment Serialize Time";
public static final String SEND_FRAGMENT_PHASE1_TIME = "Fragment RPC Phase1 Time";
public static final String SEND_FRAGMENT_PHASE2_TIME = "Fragment RPC Phase2 Time";
public static final String WAIT_FETCH_RESULT_TIME = "Wait and Fetch Result Time";
public static final String FETCH_RESULT_TIME = "Fetch Result Time";
public static final String WRITE_RESULT_TIME = "Write Result Time";
public static final String WAIT_FETCH_RESULT_TIME = "Wait and Fetch Result Time";
public static final String PARSE_SQL_TIME = "Parse SQL Time";
public static final String NEREIDS_ANALYSIS_TIME = "Nereids Analysis Time";
@ -77,34 +81,75 @@ public class SummaryProfile {
public static final String NEREIDS_OPTIMIZE_TIME = "Nereids Optimize Time";
public static final String NEREIDS_TRANSLATE_TIME = "Nereids Translate Time";
public static final String FRAGMENT_COMPRESSED_SIZE = "Fragment Compressed Size";
public static final String FRAGMENT_RPC_COUNT = "Fragment RPC Count";
// These info will display on FE's web ui table, every one will be displayed as
// a column, so that should not
// add many columns here. Add to ExcecutionSummary list.
public static final ImmutableList<String> SUMMARY_KEYS = ImmutableList.of(PROFILE_ID, TASK_TYPE,
START_TIME, END_TIME, TOTAL_TIME, TASK_STATE, USER, DEFAULT_DB, SQL_STATEMENT);
// The display order of execution summary items.
public static final ImmutableList<String> EXECUTION_SUMMARY_KEYS = ImmutableList.of(
PARSE_SQL_TIME, NEREIDS_ANALYSIS_TIME, NEREIDS_REWRITE_TIME, NEREIDS_OPTIMIZE_TIME, NEREIDS_TRANSLATE_TIME,
WORKLOAD_GROUP, ANALYSIS_TIME,
PLAN_TIME, JOIN_REORDER_TIME, CREATE_SINGLE_NODE_TIME, QUERY_DISTRIBUTED_TIME,
INIT_SCAN_NODE_TIME, FINALIZE_SCAN_NODE_TIME, GET_SPLITS_TIME, GET_PARTITIONS_TIME,
GET_PARTITION_FILES_TIME, CREATE_SCAN_RANGE_TIME, SCHEDULE_TIME, FETCH_RESULT_TIME,
WRITE_RESULT_TIME, WAIT_FETCH_RESULT_TIME, DORIS_VERSION, IS_NEREIDS, IS_PIPELINE,
IS_CACHED, TOTAL_INSTANCES_NUM, INSTANCES_NUM_PER_BE, PARALLEL_FRAGMENT_EXEC_INSTANCE, TRACE_ID);
PARSE_SQL_TIME,
NEREIDS_ANALYSIS_TIME,
NEREIDS_REWRITE_TIME,
NEREIDS_OPTIMIZE_TIME,
NEREIDS_TRANSLATE_TIME,
WORKLOAD_GROUP,
ANALYSIS_TIME,
PLAN_TIME,
JOIN_REORDER_TIME,
CREATE_SINGLE_NODE_TIME,
QUERY_DISTRIBUTED_TIME,
INIT_SCAN_NODE_TIME,
FINALIZE_SCAN_NODE_TIME,
GET_SPLITS_TIME,
GET_PARTITIONS_TIME,
GET_PARTITION_FILES_TIME,
CREATE_SCAN_RANGE_TIME,
SCHEDULE_TIME,
ASSIGN_FRAGMENT_TIME,
FRAGMENT_SERIALIZE_TIME,
SEND_FRAGMENT_PHASE1_TIME,
SEND_FRAGMENT_PHASE2_TIME,
FRAGMENT_COMPRESSED_SIZE,
FRAGMENT_RPC_COUNT,
WAIT_FETCH_RESULT_TIME,
FETCH_RESULT_TIME,
WRITE_RESULT_TIME,
DORIS_VERSION,
IS_NEREIDS,
IS_PIPELINE,
IS_CACHED,
TOTAL_INSTANCES_NUM,
INSTANCES_NUM_PER_BE,
PARALLEL_FRAGMENT_EXEC_INSTANCE,
TRACE_ID);
// Ident of each item. Default is 0, which doesn't need to present in this Map.
// Please set this map for new profile items if they need ident.
public static ImmutableMap<String, Integer> EXECUTION_SUMMARY_KEYS_IDENTATION = ImmutableMap.of(
JOIN_REORDER_TIME, 1,
CREATE_SINGLE_NODE_TIME, 1,
QUERY_DISTRIBUTED_TIME, 1,
INIT_SCAN_NODE_TIME, 1,
FINALIZE_SCAN_NODE_TIME, 1,
GET_SPLITS_TIME, 2,
GET_PARTITIONS_TIME, 3,
GET_PARTITION_FILES_TIME, 3,
CREATE_SCAN_RANGE_TIME, 2
);
public static ImmutableMap<String, Integer> EXECUTION_SUMMARY_KEYS_IDENTATION
= ImmutableMap.<String, Integer>builder()
.put(JOIN_REORDER_TIME, 1)
.put(CREATE_SINGLE_NODE_TIME, 1)
.put(QUERY_DISTRIBUTED_TIME, 1)
.put(INIT_SCAN_NODE_TIME, 1)
.put(FINALIZE_SCAN_NODE_TIME, 1)
.put(GET_SPLITS_TIME, 2)
.put(GET_PARTITIONS_TIME, 3)
.put(GET_PARTITION_FILES_TIME, 3)
.put(CREATE_SCAN_RANGE_TIME, 2)
.put(FETCH_RESULT_TIME, 1)
.put(WRITE_RESULT_TIME, 1)
.put(ASSIGN_FRAGMENT_TIME, 1)
.put(FRAGMENT_SERIALIZE_TIME, 1)
.put(SEND_FRAGMENT_PHASE1_TIME, 1)
.put(SEND_FRAGMENT_PHASE2_TIME, 1)
.put(FRAGMENT_COMPRESSED_SIZE, 1)
.put(FRAGMENT_RPC_COUNT, 1)
.build();
private RuntimeProfile summaryProfile;
private RuntimeProfile executionSummaryProfile;
@ -136,6 +181,12 @@ public class SummaryProfile {
private long createScanRangeFinishTime = -1;
// Plan end time
private long queryPlanFinishTime = -1;
private long assignFragmentTime = -1;
private long fragmentSerializeTime = -1;
private long fragmentSendPhase1Time = -1;
private long fragmentSendPhase2Time = -1;
private long fragmentCompressedSize = 0;
private long fragmentRpcCount = 0;
// Fragment schedule and send end time
private long queryScheduleFinishTime = -1;
// Query result fetch end time
@ -207,23 +258,47 @@ public class SummaryProfile {
executionSummaryProfile.addInfoString(NEREIDS_REWRITE_TIME, getPrettyNereidsRewriteTime());
executionSummaryProfile.addInfoString(NEREIDS_OPTIMIZE_TIME, getPrettyNereidsOptimizeTime());
executionSummaryProfile.addInfoString(NEREIDS_TRANSLATE_TIME, getPrettyNereidsTranslateTime());
executionSummaryProfile.addInfoString(ANALYSIS_TIME, getPrettyQueryAnalysisFinishTime());
executionSummaryProfile.addInfoString(PLAN_TIME, getPrettyQueryPlanFinishTime());
executionSummaryProfile.addInfoString(JOIN_REORDER_TIME, getPrettyQueryJoinReorderFinishTime());
executionSummaryProfile.addInfoString(CREATE_SINGLE_NODE_TIME, getPrettyCreateSingleNodeFinishTime());
executionSummaryProfile.addInfoString(QUERY_DISTRIBUTED_TIME, getPrettyQueryDistributedFinishTime());
executionSummaryProfile.addInfoString(INIT_SCAN_NODE_TIME, getPrettyInitScanNodeTime());
executionSummaryProfile.addInfoString(FINALIZE_SCAN_NODE_TIME, getPrettyFinalizeScanNodeTime());
executionSummaryProfile.addInfoString(GET_SPLITS_TIME, getPrettyGetSplitsTime());
executionSummaryProfile.addInfoString(GET_PARTITIONS_TIME, getPrettyGetPartitionsTime());
executionSummaryProfile.addInfoString(GET_PARTITION_FILES_TIME, getPrettyGetPartitionFilesTime());
executionSummaryProfile.addInfoString(CREATE_SCAN_RANGE_TIME, getPrettyCreateScanRangeTime());
executionSummaryProfile.addInfoString(SCHEDULE_TIME, getPrettyQueryScheduleFinishTime());
executionSummaryProfile.addInfoString(ANALYSIS_TIME,
getPrettyTime(queryAnalysisFinishTime, queryBeginTime, TUnit.TIME_MS));
executionSummaryProfile.addInfoString(PLAN_TIME,
getPrettyTime(queryPlanFinishTime, queryAnalysisFinishTime, TUnit.TIME_MS));
executionSummaryProfile.addInfoString(JOIN_REORDER_TIME,
getPrettyTime(queryJoinReorderFinishTime, queryAnalysisFinishTime, TUnit.TIME_MS));
executionSummaryProfile.addInfoString(CREATE_SINGLE_NODE_TIME,
getPrettyTime(queryCreateSingleNodeFinishTime, queryJoinReorderFinishTime, TUnit.TIME_MS));
executionSummaryProfile.addInfoString(QUERY_DISTRIBUTED_TIME,
getPrettyTime(queryDistributedFinishTime, queryCreateSingleNodeFinishTime, TUnit.TIME_MS));
executionSummaryProfile.addInfoString(INIT_SCAN_NODE_TIME,
getPrettyTime(initScanNodeFinishTime, initScanNodeStartTime, TUnit.TIME_MS));
executionSummaryProfile.addInfoString(FINALIZE_SCAN_NODE_TIME,
getPrettyTime(finalizeScanNodeFinishTime, finalizeScanNodeStartTime, TUnit.TIME_MS));
executionSummaryProfile.addInfoString(GET_SPLITS_TIME,
getPrettyTime(getSplitsFinishTime, getSplitsStartTime, TUnit.TIME_MS));
executionSummaryProfile.addInfoString(GET_PARTITIONS_TIME,
getPrettyTime(getPartitionsFinishTime, getSplitsStartTime, TUnit.TIME_MS));
executionSummaryProfile.addInfoString(GET_PARTITION_FILES_TIME,
getPrettyTime(getPartitionFilesFinishTime, getPartitionsFinishTime, TUnit.TIME_MS));
executionSummaryProfile.addInfoString(CREATE_SCAN_RANGE_TIME,
getPrettyTime(createScanRangeFinishTime, getSplitsFinishTime, TUnit.TIME_MS));
executionSummaryProfile.addInfoString(SCHEDULE_TIME,
getPrettyTime(queryScheduleFinishTime, queryPlanFinishTime, TUnit.TIME_MS));
executionSummaryProfile.addInfoString(ASSIGN_FRAGMENT_TIME,
getPrettyTime(assignFragmentTime, queryPlanFinishTime, TUnit.TIME_MS));
executionSummaryProfile.addInfoString(FRAGMENT_SERIALIZE_TIME,
getPrettyTime(fragmentSerializeTime, assignFragmentTime, TUnit.TIME_MS));
executionSummaryProfile.addInfoString(SEND_FRAGMENT_PHASE1_TIME,
getPrettyTime(fragmentSendPhase1Time, fragmentSerializeTime, TUnit.TIME_MS));
executionSummaryProfile.addInfoString(SEND_FRAGMENT_PHASE2_TIME,
getPrettyTime(fragmentSendPhase2Time, fragmentSendPhase1Time, TUnit.TIME_MS));
executionSummaryProfile.addInfoString(FRAGMENT_COMPRESSED_SIZE,
RuntimeProfile.printCounter(fragmentCompressedSize, TUnit.BYTES));
executionSummaryProfile.addInfoString(FRAGMENT_RPC_COUNT, "" + fragmentRpcCount);
executionSummaryProfile.addInfoString(WAIT_FETCH_RESULT_TIME,
getPrettyTime(queryFetchResultFinishTime, queryScheduleFinishTime, TUnit.TIME_MS));
executionSummaryProfile.addInfoString(FETCH_RESULT_TIME,
RuntimeProfile.printCounter(queryFetchResultConsumeTime, TUnit.TIME_MS));
executionSummaryProfile.addInfoString(WRITE_RESULT_TIME,
RuntimeProfile.printCounter(queryWriteResultConsumeTime, TUnit.TIME_MS));
executionSummaryProfile.addInfoString(WAIT_FETCH_RESULT_TIME, getPrettyQueryFetchResultFinishTime());
}
public void setParseSqlStartTime(long parseSqlStartTime) {
@ -330,8 +405,28 @@ public class SummaryProfile {
this.queryWriteResultConsumeTime += TimeUtils.getStartTimeMs() - tempStarTime;
}
public long getQueryBeginTime() {
return queryBeginTime;
public void setAssignFragmentTime() {
this.assignFragmentTime = TimeUtils.getStartTimeMs();
}
public void setFragmentSerializeTime() {
this.fragmentSerializeTime = TimeUtils.getStartTimeMs();
}
public void setFragmentSendPhase1Time() {
this.fragmentSendPhase1Time = TimeUtils.getStartTimeMs();
}
public void setFragmentSendPhase2Time() {
this.fragmentSendPhase2Time = TimeUtils.getStartTimeMs();
}
public void updateFragmentCompressedSize(long size) {
this.fragmentCompressedSize += size;
}
public void updateFragmentRpcCount(long count) {
this.fragmentRpcCount += count;
}
public static class SummaryBuilder {
@ -433,128 +528,29 @@ public class SummaryProfile {
}
public String getPrettyParseSqlTime() {
if (parseSqlStartTime == -1 || parseSqlFinishTime == -1) {
return "N/A";
}
return RuntimeProfile.printCounter(parseSqlFinishTime - parseSqlStartTime, TUnit.TIME_MS);
return getPrettyTime(parseSqlStartTime, parseSqlFinishTime, TUnit.TIME_MS);
}
public String getPrettyNereidsAnalysisTime() {
if (nereidsAnalysisFinishTime == -1 || queryAnalysisFinishTime == -1) {
return "N/A";
}
return RuntimeProfile.printCounter(nereidsAnalysisFinishTime - queryBeginTime, TUnit.TIME_MS);
return getPrettyTime(nereidsAnalysisFinishTime, queryBeginTime, TUnit.TIME_MS);
}
public String getPrettyNereidsRewriteTime() {
if (nereidsRewriteFinishTime == -1 || nereidsAnalysisFinishTime == -1) {
return "N/A";
}
return RuntimeProfile.printCounter(nereidsRewriteFinishTime - nereidsAnalysisFinishTime, TUnit.TIME_MS);
return getPrettyTime(nereidsRewriteFinishTime, nereidsAnalysisFinishTime, TUnit.TIME_MS);
}
public String getPrettyNereidsOptimizeTime() {
if (nereidsOptimizeFinishTime == -1 || nereidsRewriteFinishTime == -1) {
return "N/A";
}
return RuntimeProfile.printCounter(nereidsOptimizeFinishTime - nereidsRewriteFinishTime, TUnit.TIME_MS);
return getPrettyTime(nereidsOptimizeFinishTime, nereidsRewriteFinishTime, TUnit.TIME_MS);
}
public String getPrettyNereidsTranslateTime() {
if (nereidsTranslateFinishTime == -1 || nereidsOptimizeFinishTime == -1) {
return "N/A";
}
return RuntimeProfile.printCounter(nereidsTranslateFinishTime - nereidsOptimizeFinishTime, TUnit.TIME_MS);
return getPrettyTime(nereidsTranslateFinishTime, nereidsOptimizeFinishTime, TUnit.TIME_MS);
}
private String getPrettyQueryAnalysisFinishTime() {
if (queryBeginTime == -1 || queryAnalysisFinishTime == -1) {
private String getPrettyTime(long end, long start, TUnit unit) {
if (start == -1 || end == -1) {
return "N/A";
}
return RuntimeProfile.printCounter(queryAnalysisFinishTime - queryBeginTime, TUnit.TIME_MS);
}
private String getPrettyQueryJoinReorderFinishTime() {
if (queryAnalysisFinishTime == -1 || queryJoinReorderFinishTime == -1) {
return "N/A";
}
return RuntimeProfile.printCounter(queryJoinReorderFinishTime - queryAnalysisFinishTime, TUnit.TIME_MS);
}
private String getPrettyCreateSingleNodeFinishTime() {
if (queryJoinReorderFinishTime == -1 || queryCreateSingleNodeFinishTime == -1) {
return "N/A";
}
return RuntimeProfile.printCounter(queryCreateSingleNodeFinishTime - queryJoinReorderFinishTime, TUnit.TIME_MS);
}
private String getPrettyQueryDistributedFinishTime() {
if (queryCreateSingleNodeFinishTime == -1 || queryDistributedFinishTime == -1) {
return "N/A";
}
return RuntimeProfile.printCounter(queryDistributedFinishTime - queryCreateSingleNodeFinishTime, TUnit.TIME_MS);
}
private String getPrettyInitScanNodeTime() {
if (initScanNodeStartTime == -1 || initScanNodeFinishTime == -1) {
return "N/A";
}
return RuntimeProfile.printCounter(initScanNodeFinishTime - initScanNodeStartTime, TUnit.TIME_MS);
}
private String getPrettyFinalizeScanNodeTime() {
if (finalizeScanNodeFinishTime == -1 || finalizeScanNodeStartTime == -1) {
return "N/A";
}
return RuntimeProfile.printCounter(finalizeScanNodeFinishTime - finalizeScanNodeStartTime, TUnit.TIME_MS);
}
private String getPrettyGetSplitsTime() {
if (getSplitsFinishTime == -1 || getSplitsStartTime == -1) {
return "N/A";
}
return RuntimeProfile.printCounter(getSplitsFinishTime - getSplitsStartTime, TUnit.TIME_MS);
}
private String getPrettyGetPartitionsTime() {
if (getSplitsStartTime == -1 || getPartitionsFinishTime == -1) {
return "N/A";
}
return RuntimeProfile.printCounter(getPartitionsFinishTime - getSplitsStartTime, TUnit.TIME_MS);
}
private String getPrettyGetPartitionFilesTime() {
if (getPartitionsFinishTime == -1 || getPartitionFilesFinishTime == -1) {
return "N/A";
}
return RuntimeProfile.printCounter(getPartitionFilesFinishTime - getPartitionsFinishTime, TUnit.TIME_MS);
}
private String getPrettyCreateScanRangeTime() {
if (getSplitsFinishTime == -1 || createScanRangeFinishTime == -1) {
return "N/A";
}
return RuntimeProfile.printCounter(createScanRangeFinishTime - getSplitsFinishTime, TUnit.TIME_MS);
}
private String getPrettyQueryPlanFinishTime() {
if (queryAnalysisFinishTime == -1 || queryPlanFinishTime == -1) {
return "N/A";
}
return RuntimeProfile.printCounter(queryPlanFinishTime - queryAnalysisFinishTime, TUnit.TIME_MS);
}
private String getPrettyQueryScheduleFinishTime() {
if (queryPlanFinishTime == -1 || queryScheduleFinishTime == -1) {
return "N/A";
}
return RuntimeProfile.printCounter(queryScheduleFinishTime - queryPlanFinishTime, TUnit.TIME_MS);
}
private String getPrettyQueryFetchResultFinishTime() {
if (queryScheduleFinishTime == -1 || queryFetchResultFinishTime == -1) {
return "N/A";
}
return RuntimeProfile.printCounter(queryFetchResultFinishTime - queryScheduleFinishTime, TUnit.TIME_MS);
return RuntimeProfile.printCounter(end - start, unit);
}
}

View File

@ -90,8 +90,7 @@ public class BrokerUtil {
brokerDesc.getName(), brokerDesc.getStorageType(), brokerDesc.getProperties());
Status st = fileSystem.list(path, rfiles, false);
if (!st.ok()) {
throw new UserException(brokerDesc.getName() + " list path failed. path=" + path
+ ",msg=" + st.getErrMsg());
throw new UserException(st.getErrMsg());
}
} catch (Exception e) {
LOG.warn("{} list path exception, path={}", brokerDesc.getName(), path, e);

View File

@ -30,6 +30,7 @@ import org.apache.doris.common.Status;
import org.apache.doris.common.ThreadPoolManager;
import org.apache.doris.common.UserException;
import org.apache.doris.common.profile.ExecutionProfile;
import org.apache.doris.common.profile.SummaryProfile;
import org.apache.doris.common.util.DebugUtil;
import org.apache.doris.common.util.ListUtil;
import org.apache.doris.common.util.RuntimeProfile;
@ -126,11 +127,14 @@ import com.google.common.collect.Sets;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.protobuf.ByteString;
import org.apache.commons.lang3.tuple.ImmutableTriple;
import org.apache.commons.lang3.tuple.Triple;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.thrift.TException;
import org.apache.thrift.TSerializer;
import org.apache.thrift.protocol.TCompactProtocol;
import org.jetbrains.annotations.NotNull;
import java.security.SecureRandom;
@ -150,6 +154,7 @@ import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Consumer;
@ -715,6 +720,7 @@ public class Coordinator implements CoordInterface {
LOG.info("dispatch load job: {} to {}", DebugUtil.printId(queryId), addressToBackendID.keySet());
}
updateProfileIfPresent(profile -> profile.setAssignFragmentTime());
if (enablePipelineEngine) {
sendPipelineCtx();
} else {
@ -992,35 +998,44 @@ public class Coordinator implements CoordInterface {
}
// 4. send and wait fragments rpc
// 4.1 serialize fragment
// unsetFields() must be called serially.
beToPipelineExecCtxs.values().stream().forEach(ctxs -> ctxs.unsetFields());
// serializeFragments() can be called in parallel.
final AtomicLong compressedSize = new AtomicLong(0);
beToPipelineExecCtxs.values().parallelStream().forEach(ctxs -> {
try {
compressedSize.addAndGet(ctxs.serializeFragments());
} catch (TException e) {
throw new RuntimeException(e);
}
});
updateProfileIfPresent(profile -> profile.updateFragmentCompressedSize(compressedSize.get()));
updateProfileIfPresent(profile -> profile.setFragmentSerializeTime());
// 4.2 send fragments rpc
List<Triple<PipelineExecContexts, BackendServiceProxy, Future<InternalService.PExecPlanFragmentResult>>>
futures = Lists.newArrayList();
BackendServiceProxy proxy = BackendServiceProxy.getInstance();
for (PipelineExecContexts ctxs : beToPipelineExecCtxs.values()) {
if (LOG.isDebugEnabled()) {
String infos = "";
for (PipelineExecContext pec : ctxs.ctxs) {
infos += pec.fragmentId + " ";
}
if (LOG.isDebugEnabled()) {
LOG.debug("query {}, sending pipeline fragments: {} to be {} bprc address {}",
DebugUtil.printId(queryId), infos, ctxs.beId, ctxs.brpcAddr.toString());
}
LOG.debug(ctxs.debugInfo());
}
ctxs.unsetFields();
BackendServiceProxy proxy = BackendServiceProxy.getInstance();
futures.add(ImmutableTriple.of(ctxs, proxy, ctxs.execRemoteFragmentsAsync(proxy)));
}
waitPipelineRpc(futures, this.timeoutDeadline - System.currentTimeMillis(), "send fragments");
updateProfileIfPresent(profile -> profile.updateFragmentRpcCount(futures.size()));
updateProfileIfPresent(profile -> profile.setFragmentSendPhase1Time());
if (twoPhaseExecution) {
// 5. send and wait execution start rpc
futures.clear();
for (PipelineExecContexts ctxs : beToPipelineExecCtxs.values()) {
BackendServiceProxy proxy = BackendServiceProxy.getInstance();
futures.add(ImmutableTriple.of(ctxs, proxy, ctxs.execPlanFragmentStartAsync(proxy)));
}
waitPipelineRpc(futures, this.timeoutDeadline - System.currentTimeMillis(), "send execution start");
updateProfileIfPresent(profile -> profile.updateFragmentRpcCount(futures.size()));
updateProfileIfPresent(profile -> profile.setFragmentSendPhase2Time());
}
} finally {
unlock();
@ -3552,6 +3567,7 @@ public class Coordinator implements CoordInterface {
List<PipelineExecContext> ctxs = Lists.newArrayList();
boolean twoPhaseExecution = false;
int instanceNumber;
ByteString serializedFragments = null;
public PipelineExecContexts(long beId, TNetworkAddress brpcAddr, boolean twoPhaseExecution,
int instanceNumber) {
@ -3585,15 +3601,10 @@ public class Coordinator implements CoordInterface {
}
}
public Future<InternalService.PExecPlanFragmentResult> execRemoteFragmentsAsync(BackendServiceProxy proxy)
throws TException {
public Future<InternalService.PExecPlanFragmentResult> execRemoteFragmentsAsync(BackendServiceProxy proxy) {
Preconditions.checkNotNull(serializedFragments);
try {
TPipelineFragmentParamsList paramsList = new TPipelineFragmentParamsList();
for (PipelineExecContext cts : ctxs) {
cts.initiated = true;
paramsList.addToParamsList(cts.rpcParams);
}
return proxy.execPlanFragmentsAsync(brpcAddr, paramsList, twoPhaseExecution);
return proxy.execPlanFragmentsAsync(brpcAddr, serializedFragments, twoPhaseExecution);
} catch (RpcException e) {
// DO NOT throw exception here, return a complete future with error code,
// so that the following logic will cancel the fragment.
@ -3647,6 +3658,26 @@ public class Coordinator implements CoordInterface {
}
};
}
public long serializeFragments() throws TException {
TPipelineFragmentParamsList paramsList = new TPipelineFragmentParamsList();
for (PipelineExecContext cts : ctxs) {
cts.initiated = true;
paramsList.addToParamsList(cts.rpcParams);
}
serializedFragments = ByteString.copyFrom(
new TSerializer(new TCompactProtocol.Factory()).serialize(paramsList));
return serializedFragments.size();
}
public String debugInfo() {
String infos = "";
for (PipelineExecContext pec : ctxs) {
infos += pec.fragmentId + " ";
}
return String.format("query %s, sending pipeline fragments: %s to be %s bprc address %s",
DebugUtil.printId(queryId), infos, beId, brpcAddr.toString());
}
}
// execution parameters for a single fragment,
@ -4063,5 +4094,12 @@ public class Coordinator implements CoordInterface {
this.targetFragmentInstanceAddr = host;
}
}
private void updateProfileIfPresent(Consumer<SummaryProfile> profileAction) {
Optional.ofNullable(context)
.map(ConnectContext::getExecutor)
.map(StmtExecutor::getSummaryProfile)
.ifPresent(profileAction);
}
}

View File

@ -252,6 +252,7 @@ public class StmtExecutor {
public StmtExecutor(ConnectContext context, OriginStatement originStmt, boolean isProxy) {
Preconditions.checkState(context.getConnectType().equals(ConnectType.MYSQL));
this.context = context;
this.context.setExecutor(this);
this.originStmt = originStmt;
this.serializer = context.getMysqlChannel().getSerializer();
this.isProxy = isProxy;

View File

@ -198,8 +198,23 @@ public class BackendServiceProxy {
}
// VERSION 3 means we send TPipelineFragmentParamsList
builder.setVersion(InternalService.PFragmentRequestVersion.VERSION_3);
return execPlanFragmentsAsync(address, builder.build(), twoPhaseExecution);
}
final InternalService.PExecPlanFragmentRequest pRequest = builder.build();
public Future<InternalService.PExecPlanFragmentResult> execPlanFragmentsAsync(TNetworkAddress address,
ByteString serializedFragments, boolean twoPhaseExecution) throws RpcException {
InternalService.PExecPlanFragmentRequest.Builder builder =
InternalService.PExecPlanFragmentRequest.newBuilder();
builder.setRequest(serializedFragments);
builder.setCompact(true);
// VERSION 3 means we send TPipelineFragmentParamsList
builder.setVersion(InternalService.PFragmentRequestVersion.VERSION_3);
return execPlanFragmentsAsync(address, builder.build(), twoPhaseExecution);
}
public Future<InternalService.PExecPlanFragmentResult> execPlanFragmentsAsync(TNetworkAddress address,
InternalService.PExecPlanFragmentRequest pRequest, boolean twoPhaseExecution)
throws RpcException {
MetricRepo.BE_COUNTER_QUERY_RPC_ALL.getOrAdd(address.hostname).increase(1L);
MetricRepo.BE_COUNTER_QUERY_RPC_SIZE.getOrAdd(address.hostname).increase((long) pRequest.getSerializedSize());
try {

View File

@ -168,7 +168,7 @@ public abstract class ExternalFileTableValuedFunction extends TableValuedFunctio
try {
BrokerUtil.parseFile(path, brokerDesc, fileStatuses);
} catch (UserException e) {
throw new AnalysisException("parse file failed, path = " + path, e);
throw new AnalysisException("parse file failed, err: " + e.getMessage(), e);
}
}