[feature](Nereids) parallel output file (#31623)

legacy planner impl PR: #6539
This commit is contained in:
morrySnow
2024-03-04 12:14:07 +08:00
committed by yiguolei
parent a5b9127656
commit e2ebf9d566
13 changed files with 141 additions and 70 deletions

View File

@ -129,7 +129,7 @@ public class OutFileClause {
private static final String HADOOP_FS_PROP_PREFIX = "dfs.";
private static final String HADOOP_PROP_PREFIX = "hadoop.";
private static final String BROKER_PROP_PREFIX = "broker.";
private static final String PROP_BROKER_NAME = "broker.name";
public static final String PROP_BROKER_NAME = "broker.name";
public static final String PROP_COLUMN_SEPARATOR = "column_separator";
public static final String PROP_LINE_DELIMITER = "line_delimiter";
public static final String PROP_MAX_FILE_SIZE = "max_file_size";

View File

@ -94,7 +94,7 @@ public class SlotRef extends Expr {
this.desc = desc;
this.type = desc.getType();
// TODO(zc): label is meaningful
this.label = null;
this.label = desc.getLabel();
if (this.type.equals(Type.CHAR)) {
this.type = Type.VARCHAR;
}

View File

@ -245,10 +245,12 @@ public class PhysicalPlanTranslator extends DefaultPlanVisitor<PlanFragment, Pla
*/
public PlanFragment translatePlan(PhysicalPlan physicalPlan) {
PlanFragment rootFragment = physicalPlan.accept(this, context);
List<Expr> outputExprs = Lists.newArrayList();
physicalPlan.getOutput().stream().map(Slot::getExprId)
.forEach(exprId -> outputExprs.add(context.findSlotRef(exprId)));
rootFragment.setOutputExprs(outputExprs);
if (CollectionUtils.isEmpty(rootFragment.getOutputExprs())) {
List<Expr> outputExprs = Lists.newArrayList();
physicalPlan.getOutput().stream().map(Slot::getExprId)
.forEach(exprId -> outputExprs.add(context.findSlotRef(exprId)));
rootFragment.setOutputExprs(outputExprs);
}
Collections.reverse(context.getPlanFragments());
// TODO: maybe we need to trans nullable directly? and then we could remove call computeMemLayout
context.getDescTable().computeMemLayout();
@ -369,8 +371,8 @@ public class PhysicalPlanTranslator extends DefaultPlanVisitor<PlanFragment, Pla
public PlanFragment visitPhysicalResultSink(PhysicalResultSink<? extends Plan> physicalResultSink,
PlanTranslatorContext context) {
PlanFragment planFragment = physicalResultSink.child().accept(this, context);
TResultSinkType resultSinkType = context.getConnectContext() != null ? context.getConnectContext()
.getResultSinkType() : null;
TResultSinkType resultSinkType = context.getConnectContext() != null
? context.getConnectContext().getResultSinkType() : null;
planFragment.setSink(new ResultSink(planFragment.getPlanRoot().getId(), resultSinkType));
return planFragment;
}
@ -426,7 +428,7 @@ public class PhysicalPlanTranslator extends DefaultPlanVisitor<PlanFragment, Pla
@Override
public PlanFragment visitPhysicalFileSink(PhysicalFileSink<? extends Plan> fileSink,
PlanTranslatorContext context) {
PlanFragment rootFragment = fileSink.child().accept(this, context);
PlanFragment sinkFragment = fileSink.child().accept(this, context);
OutFileClause outFile = new OutFileClause(
fileSink.getFilePath(),
fileSink.getFormat(),
@ -436,7 +438,7 @@ public class PhysicalPlanTranslator extends DefaultPlanVisitor<PlanFragment, Pla
List<Expr> outputExprs = Lists.newArrayList();
fileSink.getOutput().stream().map(Slot::getExprId)
.forEach(exprId -> outputExprs.add(context.findSlotRef(exprId)));
rootFragment.setOutputExprs(outputExprs);
sinkFragment.setOutputExprs(outputExprs);
// generate colLabels
List<String> labels = fileSink.getOutput().stream().map(NamedExpression::getName).collect(Collectors.toList());
@ -447,11 +449,49 @@ public class PhysicalPlanTranslator extends DefaultPlanVisitor<PlanFragment, Pla
} catch (Exception e) {
throw new AnalysisException(e.getMessage(), e.getCause());
}
ResultFileSink sink = new ResultFileSink(rootFragment.getPlanRoot().getId(), outFile,
ResultFileSink resultFileSink = new ResultFileSink(sinkFragment.getPlanRoot().getId(), outFile,
(ArrayList<String>) labels);
rootFragment.setSink(sink);
return rootFragment;
sinkFragment.setSink(resultFileSink);
// TODO: do parallel sink, we should do it in Nereids, but now we impl here temporarily
// because impl in Nereids affect too many things
if (fileSink.requestProperties(context.getConnectContext()).equals(PhysicalProperties.GATHER)) {
return sinkFragment;
} else {
// create output tuple
TupleDescriptor fileStatusDesc = ResultFileSink.constructFileStatusTupleDesc(context.getDescTable());
// create exchange node
ExchangeNode exchangeNode = new ExchangeNode(context.nextPlanNodeId(), sinkFragment.getPlanRoot());
exchangeNode.setPartitionType(TPartitionType.UNPARTITIONED);
exchangeNode.setNumInstances(1);
// create final result sink
TResultSinkType resultSinkType = context.getConnectContext() != null
? context.getConnectContext().getResultSinkType() : null;
ResultSink resultSink = new ResultSink(exchangeNode.getId(), resultSinkType);
// create top fragment
PlanFragment topFragment = new PlanFragment(context.nextFragmentId(), exchangeNode,
DataPartition.UNPARTITIONED);
topFragment.addChild(sinkFragment);
topFragment.setSink(resultSink);
context.addPlanFragment(topFragment);
// update sink fragment and result file sink
DataStreamSink streamSink = new DataStreamSink(exchangeNode.getId());
streamSink.setOutputPartition(DataPartition.UNPARTITIONED);
resultFileSink.resetByDataStreamSink(streamSink);
resultFileSink.setOutputTupleId(fileStatusDesc.getId());
sinkFragment.setDestination(exchangeNode);
// set out expr and tuple correct
exchangeNode.resetTupleIds(Lists.newArrayList(fileStatusDesc.getId()));
topFragment.resetOutputExprs(fileStatusDesc);
return topFragment;
}
}
/* ********************************************************************************************

View File

@ -331,7 +331,7 @@ public class RequestPropertyDeriver extends PlanVisitor<Void, PlanContext> {
@Override
public Void visitPhysicalFileSink(PhysicalFileSink<? extends Plan> fileSink, PlanContext context) {
addRequestPropertyToChildren(PhysicalProperties.GATHER);
addRequestPropertyToChildren(fileSink.requestProperties(connectContext));
return null;
}

View File

@ -17,6 +17,7 @@
package org.apache.doris.nereids.trees.plans.physical;
import org.apache.doris.analysis.OutFileClause;
import org.apache.doris.nereids.memo.GroupExpression;
import org.apache.doris.nereids.properties.LogicalProperties;
import org.apache.doris.nereids.properties.PhysicalProperties;
@ -27,6 +28,7 @@ import org.apache.doris.nereids.trees.plans.PlanType;
import org.apache.doris.nereids.trees.plans.algebra.Sink;
import org.apache.doris.nereids.trees.plans.visitor.PlanVisitor;
import org.apache.doris.nereids.util.Utils;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.statistics.Statistics;
import com.google.common.base.Preconditions;
@ -83,6 +85,21 @@ public class PhysicalFileSink<CHILD_TYPE extends Plan> extends PhysicalSink<CHIL
return properties;
}
/**
 * Decide which physical properties this file sink requests from its child.
 *
 * <p>If parallel outfile is enabled and this is not a broker export, request ANY so
 * every instance can write its own output file; PhysicalPlanTranslator then adds a
 * top fragment that summarizes the export result. Otherwise request GATHER so a
 * single instance writes the file (the pre-parallel behavior).
 *
 * @param ctx current connect context; may be null for internal callers — callers in
 *            PhysicalPlanTranslator already treat getConnectContext() as nullable
 * @return PhysicalProperties.ANY when parallel outfile takes effect, otherwise GATHER
 */
public PhysicalProperties requestProperties(ConnectContext ctx) {
    // No session context: fall back to the safe single-writer plan instead of NPE-ing.
    if (ctx == null || ctx.getSessionVariable() == null) {
        return PhysicalProperties.GATHER;
    }
    if (!ctx.getSessionVariable().enableParallelOutfile
            // NOTE(review): pipeline/pipelineX engines appear to force single-writer
            // output here — confirm this restriction is still required.
            || ctx.getSessionVariable().getEnablePipelineEngine()
            || ctx.getSessionVariable().getEnablePipelineXEngine()
            // broker export must go through one writer, so parallel output is disabled
            || properties.containsKey(OutFileClause.PROP_BROKER_NAME)) {
        return PhysicalProperties.GATHER;
    }
    // reaching here means parallel outfile is in effect
    return PhysicalProperties.ANY;
}
@Override
public Plan withChildren(List<Plan> children) {
Preconditions.checkArgument(children.size() == 1, "PhysicalFileSink only accepts one child");

View File

@ -37,8 +37,6 @@ import org.apache.doris.analysis.TupleDescriptor;
import org.apache.doris.catalog.Column;
import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.OlapTable;
import org.apache.doris.catalog.PrimitiveType;
import org.apache.doris.catalog.ScalarType;
import org.apache.doris.catalog.Type;
import org.apache.doris.common.Config;
import org.apache.doris.common.UserException;
@ -385,7 +383,7 @@ public class OriginalPlanner extends Planner {
return;
}
// create result file sink desc
TupleDescriptor fileStatusDesc = constructFileStatusTupleDesc(analyzer);
TupleDescriptor fileStatusDesc = ResultFileSink.constructFileStatusTupleDesc(analyzer.getDescTbl());
resultFileSink.resetByDataStreamSink((DataStreamSink) secondPlanFragment.getSink());
resultFileSink.setOutputTupleId(fileStatusDesc.getId());
secondPlanFragment.setOutputExprs(topPlanFragment.getOutputExprs());
@ -554,41 +552,6 @@ public class OriginalPlanner extends Planner {
}
}
/**
* Construct a tuple for file status, the tuple schema as following:
* | FileNumber | Int |
* | TotalRows | Bigint |
* | FileSize | Bigint |
* | URL | Varchar |
*/
private TupleDescriptor constructFileStatusTupleDesc(Analyzer analyzer) {
TupleDescriptor resultFileStatusTupleDesc =
analyzer.getDescTbl().createTupleDescriptor("result_file_status");
resultFileStatusTupleDesc.setIsMaterialized(true);
SlotDescriptor fileNumber = analyzer.getDescTbl().addSlotDescriptor(resultFileStatusTupleDesc);
fileNumber.setLabel("FileNumber");
fileNumber.setType(ScalarType.createType(PrimitiveType.INT));
fileNumber.setIsMaterialized(true);
fileNumber.setIsNullable(false);
SlotDescriptor totalRows = analyzer.getDescTbl().addSlotDescriptor(resultFileStatusTupleDesc);
totalRows.setLabel("TotalRows");
totalRows.setType(ScalarType.createType(PrimitiveType.BIGINT));
totalRows.setIsMaterialized(true);
totalRows.setIsNullable(false);
SlotDescriptor fileSize = analyzer.getDescTbl().addSlotDescriptor(resultFileStatusTupleDesc);
fileSize.setLabel("FileSize");
fileSize.setType(ScalarType.createType(PrimitiveType.BIGINT));
fileSize.setIsMaterialized(true);
fileSize.setIsNullable(false);
SlotDescriptor url = analyzer.getDescTbl().addSlotDescriptor(resultFileStatusTupleDesc);
url.setLabel("URL");
url.setType(ScalarType.createType(PrimitiveType.VARCHAR));
url.setIsMaterialized(true);
url.setIsNullable(false);
resultFileStatusTupleDesc.computeStatAndMemLayout();
return resultFileStatusTupleDesc;
}
private static class QueryStatisticsTransferOptimizer {
private final PlanFragment root;

View File

@ -17,9 +17,15 @@
package org.apache.doris.planner;
import org.apache.doris.analysis.DescriptorTable;
import org.apache.doris.analysis.OutFileClause;
import org.apache.doris.analysis.SlotDescriptor;
import org.apache.doris.analysis.StorageBackend;
import org.apache.doris.analysis.TupleDescriptor;
import org.apache.doris.analysis.TupleId;
import org.apache.doris.catalog.Column;
import org.apache.doris.catalog.PrimitiveType;
import org.apache.doris.catalog.ScalarType;
import org.apache.doris.common.util.FileFormatConstants;
import org.apache.doris.thrift.TDataSink;
import org.apache.doris.thrift.TDataSinkType;
@ -136,4 +142,43 @@ public class ResultFileSink extends DataSink {
public DataPartition getOutputPartition() {
return outputPartition;
}
/**
 * Construct the tuple descriptor for the outfile status result set, with schema:
 * <pre>
 * | FileNumber | Int     |
 * | TotalRows  | Bigint  |
 * | FileSize   | Bigint  |
 * | URL        | Varchar |
 * </pre>
 *
 * @param descriptorTable descriptor table the tuple and its slots are registered in
 * @return a materialized tuple descriptor with stats and memory layout computed
 */
public static TupleDescriptor constructFileStatusTupleDesc(DescriptorTable descriptorTable) {
    TupleDescriptor resultFileStatusTupleDesc =
            descriptorTable.createTupleDescriptor("result_file_status");
    resultFileStatusTupleDesc.setIsMaterialized(true);
    // Slot creation order defines the column order of the returned status row.
    addFileStatusSlot(descriptorTable, resultFileStatusTupleDesc, "FileNumber", PrimitiveType.INT);
    addFileStatusSlot(descriptorTable, resultFileStatusTupleDesc, "TotalRows", PrimitiveType.BIGINT);
    addFileStatusSlot(descriptorTable, resultFileStatusTupleDesc, "FileSize", PrimitiveType.BIGINT);
    addFileStatusSlot(descriptorTable, resultFileStatusTupleDesc, "URL", PrimitiveType.VARCHAR);
    resultFileStatusTupleDesc.computeStatAndMemLayout();
    return resultFileStatusTupleDesc;
}

/**
 * Add one materialized, non-nullable slot to the file status tuple, using the same
 * name for both the slot label and its backing column.
 */
private static void addFileStatusSlot(DescriptorTable descriptorTable, TupleDescriptor tupleDesc,
        String label, PrimitiveType primitiveType) {
    SlotDescriptor slot = descriptorTable.addSlotDescriptor(tupleDesc);
    slot.setLabel(label);
    slot.setType(ScalarType.createType(primitiveType));
    slot.setColumn(new Column(label, ScalarType.createType(primitiveType)));
    slot.setIsMaterialized(true);
    slot.setIsNullable(false);
}
}