Parallel fragment exec instance (#851)
fe/src/main/java/org/apache/doris/common/util/ListUtil.java (new file, 58 lines)
@@ -0,0 +1,58 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

package org.apache.doris.common.util;

import com.google.common.base.Preconditions;

import java.util.ArrayList;
import java.util.List;

public class ListUtil {
    /**
     * Splits a list into the expected number of sublists, distributing
     * elements round-robin.
     * For example:
     *
     * list is: [1, 2, 3, 4, 5, 6, 7]
     * expectedSize is: 3
     *
     * returns:
     * [1, 4, 7]
     * [2, 5]
     * [3, 6]
     */
    public static <T> List<List<T>> splitBySize(List<T> list, int expectedSize)
            throws NullPointerException, IllegalArgumentException {
        Preconditions.checkNotNull(list, "list must not be null");
        Preconditions.checkArgument(expectedSize > 0, "expectedSize must be larger than 0");

        int splitSize = Math.min(expectedSize, list.size());

        List<List<T>> result = new ArrayList<List<T>>(splitSize);
        for (int i = 0; i < splitSize; i++) {
            result.add(new ArrayList<>());
        }

        int index = 0;
        for (T t : list) {
            result.get(index).add(t);
            index = (index + 1) % splitSize;
        }

        return result;
    }
}
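
A brief usage sketch of the round-robin split above (standalone; the Integer values are hypothetical stand-ins for the scan-range objects this commit splits):

    List<Integer> tablets = java.util.Arrays.asList(1, 2, 3, 4, 5, 6, 7);
    List<List<Integer>> buckets = ListUtil.splitBySize(tablets, 3);
    // elements are dealt round-robin, so bucket sizes differ by at most one:
    // buckets.get(0) -> [1, 4, 7]
    // buckets.get(1) -> [2, 5]
    // buckets.get(2) -> [3, 6]
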
@@ -294,6 +294,10 @@ public class HashJoinNode extends PlanNode {
        return Math.max(children.get(0).getNumInstances(), children.get(1).getNumInstances());
    }

    public boolean isShuffleJoin() {
        return distrMode == DistributionMode.PARTITIONED;
    }

    enum DistributionMode {
        NONE("NONE"),
        BROADCAST("BROADCAST"),
@@ -145,15 +145,8 @@ public class Planner {
        singleNodePlanner = new SingleNodePlanner(plannerContext);
        PlanNode singleNodePlan = singleNodePlanner.createSingleNodePlan();

        singleNodePlanner.validatePlan(singleNodePlan);

        List<Expr> resultExprs = queryStmt.getResultExprs();
        if (statment instanceof InsertStmt) {
            if (queryOptions.isSetMt_dop() && queryOptions.mt_dop > 0) {
                throw new NotImplementedException(
                        "MT_DOP not supported for plans with insert.");
            }

            InsertStmt insertStmt = (InsertStmt) statment;
            if (insertStmt.getOlapTuple() != null && !insertStmt.isStreaming()) {
                singleNodePlan = new OlapRewriteNode(plannerContext.getNextNodeId(), singleNodePlan, insertStmt);
@@ -116,31 +116,6 @@ public class SingleNodePlanner {
        return singleNodePlan;
    }

    /**
     * Checks that the given single-node plan is executable:
     * - It may not contain right or full outer joins with no equi-join conjuncts that
     *   are not inside the right child of a SubplanNode.
     * - MT_DOP > 0 is not supported for plans with base table joins or table sinks.
     * Throws a NotImplementedException if plan validation fails.
     */
    public void validatePlan(PlanNode planNode) throws NotImplementedException {
        if (ctx_.getQueryOptions().isSetMt_dop() && ctx_.getQueryOptions().mt_dop > 0
                && (planNode instanceof HashJoinNode || planNode instanceof CrossJoinNode)) {
            throw new NotImplementedException(
                    "MT_DOP not supported for plans with base table joins or table sinks.");
        }

        // As long as MT_DOP is unset or 0, any join can run in a single-node plan.
        if (ctx_.isSingleNodeExec() &&
                (!ctx_.getQueryOptions().isSetMt_dop() || ctx_.getQueryOptions().mt_dop == 0)) {
            return;
        }

        for (PlanNode child : planNode.getChildren()) {
            validatePlan(child);
        }
    }

    /**
     * Creates an EmptyNode that 'materializes' the tuples of the given stmt.
     * Marks all collection-typed slots referenced in stmt as non-materialized because
@@ -26,6 +26,7 @@ import org.apache.doris.common.Reference;
import org.apache.doris.common.Status;
import org.apache.doris.common.UserException;
import org.apache.doris.common.util.DebugUtil;
import org.apache.doris.common.util.ListUtil;
import org.apache.doris.common.util.RuntimeProfile;
import org.apache.doris.load.LoadErrorHub;
import org.apache.doris.planner.DataPartition;
@@ -33,7 +34,6 @@ import org.apache.doris.planner.DataSink;
import org.apache.doris.planner.DataStreamSink;
import org.apache.doris.planner.ExchangeNode;
import org.apache.doris.planner.HashJoinNode;
import org.apache.doris.planner.MysqlScanNode;
import org.apache.doris.planner.OlapScanNode;
import org.apache.doris.planner.PlanFragment;
import org.apache.doris.planner.PlanFragmentId;
@@ -56,7 +56,6 @@ import org.apache.doris.thrift.TExecPlanFragmentParams;
import org.apache.doris.thrift.TLoadErrorHubInfo;
import org.apache.doris.thrift.TNetworkAddress;
import org.apache.doris.thrift.TPaloScanRange;
import org.apache.doris.thrift.TPartitionType;
import org.apache.doris.thrift.TPlanFragmentDestination;
import org.apache.doris.thrift.TPlanFragmentExecParams;
import org.apache.doris.thrift.TQueryGlobals;
@@ -78,7 +77,6 @@ import com.google.common.base.Strings;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
@@ -87,7 +85,6 @@ import org.apache.thrift.TException;
import java.text.DateFormat;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Date;
import java.util.HashMap;
import java.util.HashSet;
@@ -365,13 +362,7 @@ public class Coordinator {
        // compute Fragment Instance
        computeScanRangeAssignment();

        // if mt_dop <= 1
        if (queryOptions.mt_dop <= 1) {
            computeFragmentExecParams();
        } else {
            computeFragmentExecParamsForParallelExec();
            validate();
        }
        computeFragmentExecParams();

        traceInstance();
@@ -701,7 +692,7 @@ public class Coordinator {
        // assign instance ids
        numBackends = 0;
        for (FragmentExecParams params : fragmentExecParamsMap.values()) {
            LOG.debug("parameter has instances.{}", params.instanceExecParams.size());
            LOG.debug("fragment {} has instances {}", params.fragment.getFragmentId(), params.instanceExecParams.size());
            for (int j = 0; j < params.instanceExecParams.size(); ++j) {
                // we add instance_num to query_id.lo to create a
                // globally-unique instance id
@@ -856,15 +847,28 @@ public class Coordinator {
        } else {
            // normal fragment
            Iterator iter = fragmentExecParamsMap.get(fragment.getFragmentId()).scanRangeAssignment.entrySet().iterator();
            int parallelExecInstanceNum = ConnectContext.get().getSessionVariable().getParallelExecInstanceNum();
            while (iter.hasNext()) {
                Map.Entry entry = (Map.Entry) iter.next();
                TNetworkAddress key = (TNetworkAddress) entry.getKey();
                Map<Integer, List<TScanRangeParams>> value = (Map<Integer, List<TScanRangeParams>>) entry.getValue();
                FInstanceExecParam instanceParam = new FInstanceExecParam(null, key, 0, params);

                for (Integer planNodeId : value.keySet()) {
                    instanceParam.perNodeScanRanges.put(planNodeId, value.get(planNodeId));
                    List<TScanRangeParams> perNodeScanRanges = value.get(planNodeId);
                    int expectedInstanceNum = 1;
                    if (parallelExecInstanceNum > 1) {
                        // the scan instance num should not be larger than the tablet num
                        expectedInstanceNum = Math.min(perNodeScanRanges.size(), parallelExecInstanceNum);
                    }
                    List<List<TScanRangeParams>> perInstanceScanRanges = ListUtil.splitBySize(perNodeScanRanges,
                            expectedInstanceNum);

                    for (List<TScanRangeParams> scanRangeParams : perInstanceScanRanges) {
                        FInstanceExecParam instanceParam = new FInstanceExecParam(null, key, 0, params);
                        instanceParam.perNodeScanRanges.put(planNodeId, scanRangeParams);
                        params.instanceExecParams.add(instanceParam);
                    }
                }
                params.instanceExecParams.add(instanceParam);
            }
        }
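
The hunk above is the core of this change: for each backend host, the scan ranges of each plan node are split with ListUtil.splitBySize into at most parallel_fragment_exec_instance_num per-instance lists, capped by the tablet count, and one FInstanceExecParam is created per list. A minimal standalone sketch of the sizing rule, with hypothetical counts (the variable values are assumptions, not from the diff):

    int parallelExecInstanceNum = 4;  // assumed session variable value
    int tabletNum = 3;                // assumed scan ranges on this host for one plan node
    int expectedInstanceNum = 1;      // default: a single instance per host
    if (parallelExecInstanceNum > 1) {
        // never create more instances than there are tablets to scan
        expectedInstanceNum = Math.min(tabletNum, parallelExecInstanceNum);
    }
    // expectedInstanceNum == 3 here: each fragment instance on this host scans one
    // tablet, and ListUtil.splitBySize deals the scan ranges round-robin among them
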
@@ -912,296 +916,6 @@ public class Coordinator {
        return false;
    }

    private void computeFragmentExecParamsForParallelExec() throws Exception {
        // create exec params and set instance_id, host, per_node_scan_ranges
        computeFragmentInstances(fragmentExecParamsMap.get(fragments.get(0).getFragmentId()));

        // Set destinations, per_exch_num_senders, sender_id.
        for (PlanFragment srcFragment : fragments) {
            if (!(srcFragment.getSink() instanceof DataStreamSink)) {
                continue;
            }
            final PlanFragmentId desFragmentId = srcFragment.getDestFragment().getFragmentId();
            final FragmentExecParams srcParams = fragmentExecParamsMap.get(srcFragment.getFragmentId());
            final FragmentExecParams destParams = fragmentExecParamsMap.get(desFragmentId);

            // populate src_params->destinations
            for (int i = 0; i < destParams.instanceExecParams.size(); i++) {
                TPlanFragmentDestination dest = new TPlanFragmentDestination();
                dest.setFragment_instance_id(destParams.instanceExecParams.get(i).instanceId);
                dest.setServer(toRpcHost(destParams.instanceExecParams.get(i).host));
                dest.setBrpc_server(toBrpcHost(destParams.instanceExecParams.get(i).host));
                srcParams.destinations.add(dest);
            }

            final DataSink sinker = srcFragment.getSink();
            Preconditions.checkState(
                    sinker.getOutputPartition().getType() == TPartitionType.HASH_PARTITIONED
                            || sinker.getOutputPartition().getType() == TPartitionType.UNPARTITIONED
                            || sinker.getOutputPartition().getType() == TPartitionType.RANDOM);

            PlanNodeId exchId = sinker.getExchNodeId();
            Integer senderIdBase = destParams.perExchNumSenders.get(exchId);
            if (senderIdBase == null) {
                destParams.perExchNumSenders.put(exchId.asInt(), srcParams.instanceExecParams.size());
                senderIdBase = 0;
            } else {
                destParams.perExchNumSenders.put(exchId.asInt(),
                        senderIdBase + srcParams.instanceExecParams.size());
            }

            for (int i = 0; i < srcParams.instanceExecParams.size(); i++) {
                FInstanceExecParam srcInstanceParam = srcParams.instanceExecParams.get(i);
                srcInstanceParam.senderId = senderIdBase + i;
            }
        }
    }

    // compute instances from fragment
    private void computeFragmentInstances(FragmentExecParams params) throws Exception {
        // traverse input fragments
        for (PlanFragmentId fragmentId : params.inputFragments) {
            computeFragmentInstances(fragmentExecParamsMap.get(fragmentId));
        }

        // case 1: single instance executed at coordinator
        final PlanFragment fragment = params.fragment;
        if (fragment.getDataPartition() == DataPartition.UNPARTITIONED) {
            Reference<Long> backendIdRef = new Reference<Long>();
            TNetworkAddress execHostport = SimpleScheduler.getHost(this.idToBackend, backendIdRef);
            if (execHostport == null) {
                LOG.warn("DataPartition UNPARTITIONED, no scanNode Backend");
                throw new UserException("there is no scanNode Backend");
            }
            TUniqueId instanceId = getNextInstanceId();
            FInstanceExecParam instanceParam = new FInstanceExecParam(instanceId, execHostport,
                    0, params);
            params.instanceExecParams.add(instanceParam);
            this.addressToBackendID.put(execHostport, backendIdRef.getRef());
            return;
        }

        if (containsUnionNode(fragment.getPlanRoot())) {
            createUnionInstance(params);
            return;
        }

        PlanNode leftPlanNode = findLeftmostNode(fragment.getPlanRoot());
        if (leftPlanNode instanceof MysqlScanNode
                || leftPlanNode instanceof OlapScanNode) {
            // case 2: leaf fragment with leftmost scan
            // TODO: check that there's only one scan in this fragment
            createScanInstance(leftPlanNode.getId(), params);
        } else {
            // case 3: interior fragment without leftmost scan
            // we assign the same hosts as those of our leftmost input fragment (so that a
            // merge aggregation fragment runs on the hosts that provide the input data)
            createCollocatedInstance(params);
        }
    }

    private List<PlanNodeId> findScanNodes(PlanNode plan) {
        List<PlanNodeId> result = Lists.newArrayList();
        List<PlanNode> nodeList = Lists.newArrayList();
        getAllNodes(plan, nodeList);
        for (PlanNode node : nodeList) {
            if (node instanceof MysqlScanNode
                    || node instanceof OlapScanNode) {
                result.add(node.getId());
            }
        }
        return result;
    }

    private void getAllNodes(PlanNode plan, List<PlanNode> nodeList) {
        if (plan.getChildren().size() > 0) {
            nodeList.addAll(plan.getChildren());
            for (PlanNode child : plan.getChildren()) {
                getAllNodes(child, nodeList);
            }
        }
        nodeList.add(plan);
    }

    private Set<TNetworkAddress> getScanHosts(PlanNodeId id, FragmentExecParams fragmentExecParams) {
        Set<TNetworkAddress> result = Sets.newHashSet();
        for (TNetworkAddress host : fragmentExecParams.scanRangeAssignment.keySet()) {
            Map<Integer, List<TScanRangeParams>> planNodeToScanRangeParams
                    = fragmentExecParams.scanRangeAssignment.get(host);
            for (Integer planNodeId : planNodeToScanRangeParams.keySet()) {
                if (id.asInt() == planNodeId) {
                    result.add(host);
                }
            }
        }

        return result;
    }

    private void createScanInstance(PlanNodeId leftMostScanId, FragmentExecParams fragmentExecParams)
            throws UserException {
        int maxNumInstance = queryOptions.mt_dop;
        if (maxNumInstance == 0) {
            maxNumInstance = 1;
        }

        if (fragmentExecParams.scanRangeAssignment.isEmpty()) {
            // this scan doesn't have any scan ranges: run a single instance on a random backend
            Reference<Long> backendIdRef = new Reference<Long>();
            TNetworkAddress execHostport = SimpleScheduler.getHost(this.idToBackend, backendIdRef);
            if (execHostport == null) {
                throw new UserException("there is no scanNode Backend");
            }
            FInstanceExecParam instanceParam = new FInstanceExecParam(getNextInstanceId(), execHostport, 0,
                    fragmentExecParams);
            fragmentExecParams.instanceExecParams.add(instanceParam);
            return;
        }

        final int leftMostScanIdInt = leftMostScanId.asInt();
        int perFragmentInstanceIdx = 0;
        for (TNetworkAddress host : fragmentExecParams.scanRangeAssignment.keySet()) {
            // evenly divide up the scan ranges of the leftmost scan between at most
            // <dop> instances
            final Map<Integer, List<TScanRangeParams>> scanMap = fragmentExecParams.scanRangeAssignment.get(host);
            final List<TScanRangeParams> scanRangesList = scanMap.get(leftMostScanIdInt);
            Preconditions.checkState(scanRangesList != null);
            // try to load-balance scan ranges by assigning just beyond the average number of
            // bytes to each instance
            // TODO: fix shortcomings introduced by uneven split sizes,
            // this could end up assigning 0 scan ranges to an instance
            final int numInstance = Math.min(maxNumInstance, scanRangesList.size());
            Preconditions.checkState(numInstance != 0);
            final List<FInstanceExecParam> perHostInstanceExecParams = Lists.newArrayList();
            // create the FInstanceExecParams on one host
            for (int i = 0; i < numInstance; i++) {
                final FInstanceExecParam instanceParam = new FInstanceExecParam(getNextInstanceId(),
                        host, perFragmentInstanceIdx++, fragmentExecParams);
                fragmentExecParams.instanceExecParams.add(instanceParam);
                perHostInstanceExecParams.add(instanceParam);
                List<TScanRangeParams> paramList = instanceParam.perNodeScanRanges.get(leftMostScanIdInt);
                if (paramList == null) {
                    paramList = Lists.newArrayList();
                    instanceParam.perNodeScanRanges.put(leftMostScanIdInt, paramList);
                }
            }

            // assign tablets round-robin
            Collections.shuffle(scanRangesList);
            for (int i = 0; i < scanRangesList.size(); i++) {
                final TScanRangeParams scanRangeParams = scanRangesList.get(i);
                final int position = i % numInstance;
                perHostInstanceExecParams.get(position).perNodeScanRanges.get(leftMostScanIdInt).add(scanRangeParams);
            }
        }
    }

    private void validate() {
        int numFragments = 0;
        for (PlanFragment fragment : fragments) {
            // TODO chenhao: a fragment id produced in palo may be larger than the fragment count,
            // need to update this after merging the latest impala plan codes
            // Preconditions.checkState(fragment.getFragmentId().asInt() <= fragments.size());
            Preconditions.checkState(fragment.getFragmentId()
                    == fragmentExecParamsMap.get(fragment.getFragmentId()).fragment.getFragmentId());
            ++numFragments;
        }

        Preconditions.checkState(numFragments == fragmentExecParamsMap.size());

        // check that we assigned the correct number of scan ranges per (host, node id):
        // assemble a map from host -> (map from node id -> #scan ranges)
        Map<TNetworkAddress, Map<Integer, Integer>> countMap = Maps.newHashMap();
        for (FragmentExecParams fragmentExecParams : fragmentExecParamsMap.values()) {
            for (FInstanceExecParam instanceExecParam : fragmentExecParams.instanceExecParams) {
                Map<Integer, Integer> planNodeIdToCount = countMap.get(instanceExecParam.host);
                if (planNodeIdToCount == null) {
                    planNodeIdToCount = Maps.newHashMap();
                    countMap.put(instanceExecParam.host, planNodeIdToCount);
                }

                for (Integer planNodeId : instanceExecParam.perNodeScanRanges.keySet()) {
                    Integer count = planNodeIdToCount.get(planNodeId);
                    if (count == null) {
                        planNodeIdToCount.put(planNodeId, 0);
                        count = 0;
                    }
                    int lastCount = planNodeIdToCount.get(planNodeId);
                    planNodeIdToCount.put(planNodeId, lastCount +
                            instanceExecParam.perNodeScanRanges.get(planNodeId).size());
                }
            }
        }

        for (FragmentExecParams fragmentExecParams : fragmentExecParamsMap.values()) {
            for (TNetworkAddress host : fragmentExecParams.scanRangeAssignment.keySet()) {
                Preconditions.checkState(countMap.get(host).size() != 0);
                final Map<Integer, Integer> nodeCountMap = countMap.get(host);
                Map<Integer, List<TScanRangeParams>> planNodeIdToScanRangeList
                        = fragmentExecParams.scanRangeAssignment.get(host);
                for (Integer planNodeId : planNodeIdToScanRangeList.keySet()) {
                    Preconditions.checkState(nodeCountMap.get(planNodeId) > 0);
                    Preconditions.checkState(nodeCountMap.get(planNodeId)
                            == planNodeIdToScanRangeList.get(planNodeId).size());
                }
            }
        }
        // TODO: add validation for BackendExecParams
    }

    // create collocated instances according to inputFragments
    private void createCollocatedInstance(FragmentExecParams fragmentExecParams) {
        Preconditions.checkState(fragmentExecParams.inputFragments.size() >= 1);
        final FragmentExecParams inputFragmentParams = fragmentExecParamsMap.get(fragmentExecParams.
                inputFragments.get(0));
        int perFragmentInstanceIdx = 0;
        for (FInstanceExecParam inputInstanceParams : inputFragmentParams.instanceExecParams) {
            FInstanceExecParam instanceParam = new FInstanceExecParam(getNextInstanceId(),
                    inputInstanceParams.host, perFragmentInstanceIdx++, fragmentExecParams);
            fragmentExecParams.instanceExecParams.add(instanceParam);
        }
    }

    private TUniqueId getNextInstanceId() {
        TUniqueId result = nextInstanceId.deepCopy();
        nextInstanceId.lo++;
        return result;
    }

    private void createUnionInstance(FragmentExecParams fragmentExecParams) {
        final PlanFragment fragment = fragmentExecParams.fragment;
        // Add hosts of scan nodes
        List<PlanNodeId> scanNodeIds = findScanNodes(fragment.getPlanRoot());

        Set<TNetworkAddress> hostsSets = Sets.newHashSet();
        for (PlanNodeId id : scanNodeIds) {
            hostsSets.addAll(getScanHosts(id, fragmentExecParams));
        }

        // UnionNode's child is not a ScanNode
        for (PlanFragmentId inputFragmentId : fragmentExecParams.inputFragments) {
            FragmentExecParams inputeExecParams = fragmentExecParamsMap.get(inputFragmentId);
            for (FInstanceExecParam instanceParam : inputeExecParams.instanceExecParams) {
                hostsSets.add(instanceParam.host);
            }
        }

        // create a single instance per host
        // TODO-MT: figure out how to parallelize Union
        int perFragmentIdx = 0;
        for (TNetworkAddress host : hostsSets) {
            FInstanceExecParam instanceParam = new FInstanceExecParam(getNextInstanceId(), host,
                    perFragmentIdx++, fragmentExecParams);
            // assign all scan ranges
            fragmentExecParams.instanceExecParams.add(instanceParam);
            if (fragmentExecParams.scanRangeAssignment.get(host) != null
                    && fragmentExecParams.scanRangeAssignment.get(host).size() > 0) {
                instanceParam.perNodeScanRanges = fragmentExecParams.scanRangeAssignment.get(host);
            }
        }
    }

    // Returns the id of the leftmost node of any of the given types in 'plan_root',
    // or INVALID_PLAN_NODE_ID if no such node is present.
    private PlanNode findLeftmostNode(PlanNode plan) {
@@ -67,7 +67,9 @@ public class SessionVariable implements Serializable, Writable {
    public static final String BATCH_SIZE = "batch_size";
    public static final String DISABLE_STREAMING_PREAGGREGATIONS = "disable_streaming_preaggregations";
    public static final String DISABLE_COLOCATE_JOIN = "disable_colocate_join";
    public static final String MT_DOP = "mt_dop";
    public static final String PARALLEL_FRAGMENT_EXEC_INSTANCE_NUM = "parallel_fragment_exec_instance_num";
    public static final int MIN_EXEC_INSTANCE_NUM = 1;
    public static final int MAX_EXEC_INSTANCE_NUM = 32;

    // max memory used on every backend.
    @VariableMgr.VarAttr(name = EXEC_MEM_LIMIT)
@@ -163,11 +165,7 @@ public class SessionVariable implements Serializable, Writable {

    // if true, need report to coordinator when plan fragment execute successfully.
    @VariableMgr.VarAttr(name = CODEGEN_LEVEL)
    private int codegenLevel = 0;

    // multithreaded degree of intra-node parallelism
    @VariableMgr.VarAttr(name = MT_DOP)
    private int mtDop = 0;
    private int codegenLevel = 0;

    @VariableMgr.VarAttr(name = BATCH_SIZE)
    private int batchSize = 1024;
@@ -178,6 +176,13 @@ public class SessionVariable implements Serializable, Writable {
    @VariableMgr.VarAttr(name = DISABLE_COLOCATE_JOIN)
    private boolean disableColocateJoin = false;

    /*
     * the parallel exec instance num for one Fragment in one BE
     * 1 means disable this feature
     */
    @VariableMgr.VarAttr(name = PARALLEL_FRAGMENT_EXEC_INSTANCE_NUM)
    private int parallelExecInstanceNum = 1;

    public long getMaxExecMemByte() {
        return maxExecMemByte;
    }
@@ -402,14 +407,6 @@ public class SessionVariable implements Serializable, Writable {
        this.resourceGroup = resourceGroup;
    }

    public int getMtDop() {
        return this.mtDop;
    }

    public void setMtDop(int mtDop) {
        this.mtDop = mtDop;
    }

    public boolean isDisableColocateJoin() {
        return disableColocateJoin;
    }
@@ -418,7 +415,21 @@ public class SessionVariable implements Serializable, Writable {
        this.disableColocateJoin = disableColocateJoin;
    }

    // Serialize to thrift object
    public int getParallelExecInstanceNum() {
        return parallelExecInstanceNum;
    }

    public void setParallelExecInstanceNum(int parallelExecInstanceNum) {
        if (parallelExecInstanceNum < MIN_EXEC_INSTANCE_NUM) {
            this.parallelExecInstanceNum = MIN_EXEC_INSTANCE_NUM;
        } else if (parallelExecInstanceNum > MAX_EXEC_INSTANCE_NUM) {
            this.parallelExecInstanceNum = MAX_EXEC_INSTANCE_NUM;
        } else {
            this.parallelExecInstanceNum = parallelExecInstanceNum;
        }
    }
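
A hedged sketch of the clamping behavior above (the driver code is hypothetical, including the no-arg constructor; it is not part of this commit):

    SessionVariable sessionVariable = new SessionVariable();
    sessionVariable.setParallelExecInstanceNum(0);    // below MIN_EXEC_INSTANCE_NUM, stored as 1
    sessionVariable.setParallelExecInstanceNum(100);  // above MAX_EXEC_INSTANCE_NUM, stored as 32
    sessionVariable.setParallelExecInstanceNum(8);    // within [1, 32], stored as-is
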

    // Serialize to thrift object
    TQueryOptions toThrift() {
        TQueryOptions tResult = new TQueryOptions();
        tResult.setMem_limit(maxExecMemByte);
@@ -435,7 +446,6 @@ public class SessionVariable implements Serializable, Writable {

        tResult.setBatch_size(batchSize);
        tResult.setDisable_stream_preaggregations(disableStreamPreaggregations);
        tResult.setMt_dop(mtDop);
        return tResult;
    }

@@ -470,7 +480,7 @@ public class SessionVariable implements Serializable, Writable {
        Text.writeString(out, collationServer);
        out.writeInt(batchSize);
        out.writeBoolean(disableStreamPreaggregations);
        out.writeInt(mtDop);
        out.writeInt(parallelExecInstanceNum);
    }

    @Override
@@ -507,7 +517,7 @@ public class SessionVariable implements Serializable, Writable {
        if (Catalog.getCurrentCatalogJournalVersion() >= FeMetaVersion.VERSION_38) {
            batchSize = in.readInt();
            disableStreamPreaggregations = in.readBoolean();
            mtDop = in.readInt();
            parallelExecInstanceNum = in.readInt();
        }
    }
}
fe/src/test/java/org/apache/doris/common/util/ListUtilTest.java (new file, 101 lines)
@@ -0,0 +1,101 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

package org.apache.doris.common.util;

import com.google.common.collect.Lists;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;

import java.util.List;

public class ListUtilTest {

    @Rule
    public ExpectedException expectedEx = ExpectedException.none();

    @Test
    public void testSplitBySizeNormal() {
        List<Integer> lists = Lists.newArrayList(1, 2, 3, 4, 5, 6, 7);
        int expectSize = 3;

        List<List<Integer>> splitLists = ListUtil.splitBySize(lists, expectSize);

        Assert.assertEquals(splitLists.size(), 3);
        Assert.assertEquals(splitLists.get(0).size(), 3);
        Assert.assertEquals(splitLists.get(1).size(), 2);
        Assert.assertEquals(splitLists.get(2).size(), 2);
    }

    @Test
    public void testSplitBySizeNormal2() {
        List<Integer> lists = Lists.newArrayList(1, 2, 3, 4, 5, 6, 7);
        int expectSize = 1;

        List<List<Integer>> splitLists = ListUtil.splitBySize(lists, expectSize);

        Assert.assertEquals(splitLists.size(), 1);
        Assert.assertEquals(lists, splitLists.get(0));
    }

    @Test
    public void testSplitBySizeWithLargeExpectSize() {
        List<Integer> lists = Lists.newArrayList(1, 2, 3);
        int expectSize = 10;

        List<List<Integer>> splitLists = ListUtil.splitBySize(lists, expectSize);

        Assert.assertEquals(splitLists.size(), lists.size());
        Assert.assertTrue(splitLists.get(0).get(0) == 1);
        Assert.assertTrue(splitLists.get(1).get(0) == 2);
        Assert.assertTrue(splitLists.get(2).get(0) == 3);
    }

    @Test
    public void testSplitBySizeWithEmptyList() {
        List<Integer> lists = Lists.newArrayList();
        int expectSize = 10;

        List<List<Integer>> splitLists = ListUtil.splitBySize(lists, expectSize);

        Assert.assertEquals(splitLists.size(), lists.size());
    }

    @Test
    public void testSplitBySizeWithNullList() {
        List<Integer> lists = null;
        int expectSize = 10;

        expectedEx.expect(NullPointerException.class);
        expectedEx.expectMessage("list must not be null");

        ListUtil.splitBySize(lists, expectSize);
    }

    @Test
    public void testSplitBySizeWithNegativeSize() {
        List<Integer> lists = Lists.newArrayList(1, 2, 3);
        int expectSize = -1;

        expectedEx.expect(IllegalArgumentException.class);
        expectedEx.expectMessage("expectedSize must be larger than 0");

        ListUtil.splitBySize(lists, expectSize);
    }
}