[feature](sink) support partition tablet sink shuffle (#30914)

Co-authored-by: morrySnow <morrysnow@126.com>
Author: zhangstar333
Date: 2024-02-29 20:09:24 +08:00
Committed by: yiguolei
Parent: 0aa7108ee2
Commit: 819ab6fc00
14 changed files with 310 additions and 57 deletions

PhysicalPlanTranslator.java

@ -70,6 +70,7 @@ import org.apache.doris.nereids.properties.DistributionSpecHash;
import org.apache.doris.nereids.properties.DistributionSpecReplicated;
import org.apache.doris.nereids.properties.DistributionSpecStorageAny;
import org.apache.doris.nereids.properties.DistributionSpecStorageGather;
import org.apache.doris.nereids.properties.DistributionSpecTabletIdShuffle;
import org.apache.doris.nereids.properties.OrderKey;
import org.apache.doris.nereids.properties.PhysicalProperties;
import org.apache.doris.nereids.rules.implementation.LogicalWindowToPhysicalWindow.WindowFrameGroup;
@ -2458,6 +2459,8 @@ public class PhysicalPlanTranslator extends DefaultPlanVisitor<PlanFragment, Pla
+ distributionSpecHash.getShuffleType());
}
return new DataPartition(partitionType, partitionExprs);
} else if (distributionSpec instanceof DistributionSpecTabletIdShuffle) {
return DataPartition.TABLET_ID;
} else {
throw new RuntimeException("Unknown DistributionSpec: " + distributionSpec);
}

DistributionSpecTabletIdShuffle.java (new file)

@ -0,0 +1,35 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.nereids.properties;
/**
* Used to shuffle data by tablet id before the sink.
*/
public class DistributionSpecTabletIdShuffle extends DistributionSpec {
public static final DistributionSpecTabletIdShuffle INSTANCE = new DistributionSpecTabletIdShuffle();
private DistributionSpecTabletIdShuffle() {
super();
}
@Override
public boolean satisfy(DistributionSpec other) {
return other instanceof DistributionSpecTabletIdShuffle;
}
}
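
A quick sketch of the satisfy contract of this new singleton; the snippet is illustrative only and is not part of the patch:

    // The requirement is met only by another tablet-id-shuffle spec, because
    // satisfy() is a plain instanceof test.
    DistributionSpec required = DistributionSpecTabletIdShuffle.INSTANCE;
    boolean ok = DistributionSpecTabletIdShuffle.INSTANCE.satisfy(required);  // true
    // Passing any other DistributionSpec subtype (gather, replicated, hash, ...)
    // returns false.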

PhysicalProperties.java

@ -46,6 +46,9 @@ public class PhysicalProperties {
public static PhysicalProperties MUST_SHUFFLE = new PhysicalProperties(DistributionSpecMustShuffle.INSTANCE);
public static PhysicalProperties TABLET_ID_SHUFFLE
= new PhysicalProperties(DistributionSpecTabletIdShuffle.INSTANCE);
private final OrderSpec orderSpec;
private final DistributionSpec distributionSpec;

InsertExecutor.java

@ -60,7 +60,10 @@ import org.apache.doris.nereids.types.DataType;
import org.apache.doris.nereids.util.RelationUtil;
import org.apache.doris.nereids.util.TypeCoercionUtils;
import org.apache.doris.planner.DataSink;
import org.apache.doris.planner.DataStreamSink;
import org.apache.doris.planner.ExchangeNode;
import org.apache.doris.planner.OlapTableSink;
import org.apache.doris.planner.PlanFragment;
import org.apache.doris.proto.InternalService;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.qe.Coordinator;
@ -74,6 +77,8 @@ import org.apache.doris.task.LoadEtlTask;
import org.apache.doris.thrift.TFileFormatType;
import org.apache.doris.thrift.TFileType;
import org.apache.doris.thrift.TMergeType;
import org.apache.doris.thrift.TOlapTableLocationParam;
import org.apache.doris.thrift.TPartitionType;
import org.apache.doris.thrift.TQueryType;
import org.apache.doris.thrift.TStreamLoadPutRequest;
import org.apache.doris.thrift.TTxnParams;
@ -153,8 +158,8 @@ public class InsertExecutor {
/**
* finalize the sink so that it carries enough info for sink execution
*/
public void finalizeSink(DataSink sink, boolean isPartialUpdate, boolean isFromInsert,
boolean allowAutoPartition) {
public void finalizeSink(PlanFragment fragment, DataSink sink,
boolean isPartialUpdate, boolean isFromInsert, boolean allowAutoPartition) {
if (!(sink instanceof OlapTableSink)) {
return;
}
@ -175,6 +180,23 @@ public class InsertExecutor {
if (!allowAutoPartition) {
olapTableSink.setAutoPartition(false);
}
// set schema and partition info for the tablet-id shuffle exchange
if (fragment.getPlanRoot() instanceof ExchangeNode
&& fragment.getDataPartition().getType() == TPartitionType.TABLET_SINK_SHUFFLE_PARTITIONED) {
DataStreamSink dataStreamSink = (DataStreamSink) (fragment.getChild(0).getSink());
Analyzer analyzer = new Analyzer(Env.getCurrentEnv(), ConnectContext.get());
dataStreamSink.setTabletSinkSchemaParam(olapTableSink.createSchema(
database.getId(), olapTableSink.getDstTable(), analyzer));
dataStreamSink.setTabletSinkPartitionParam(olapTableSink.createPartition(
database.getId(), olapTableSink.getDstTable(), analyzer));
dataStreamSink.setTabletSinkTupleDesc(olapTableSink.getTupleDescriptor());
List<TOlapTableLocationParam> locationParams = olapTableSink
.createLocation(olapTableSink.getDstTable());
dataStreamSink.setTabletSinkLocationParam(locationParams.get(0));
dataStreamSink.setTabletSinkTxnId(olapTableSink.getTxnId());
}
} catch (Exception e) {
throw new AnalysisException(e.getMessage(), e);
}
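
The extra PlanFragment parameter is what lets finalizeSink inspect the exchange node and its DataPartition. The call site, in InsertIntoTableCommand.java (the next file), now passes the root fragment first, roughly:

    // Caller shape after the signature change (condensed from the next file).
    insertExecutor.finalizeSink(planner.getFragments().get(0), sink,
            physicalOlapTableSink.isPartialUpdate(),
            physicalOlapTableSink.getDmlCommandType() == DMLCommandType.INSERT,
            allowAutoPartition);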

InsertIntoTableCommand.java

@ -37,9 +37,12 @@ import org.apache.doris.nereids.trees.expressions.Slot;
import org.apache.doris.nereids.trees.plans.Explainable;
import org.apache.doris.nereids.trees.plans.Plan;
import org.apache.doris.nereids.trees.plans.PlanType;
import org.apache.doris.nereids.trees.plans.algebra.OneRowRelation;
import org.apache.doris.nereids.trees.plans.commands.info.DMLCommandType;
import org.apache.doris.nereids.trees.plans.logical.LogicalPlan;
import org.apache.doris.nereids.trees.plans.physical.PhysicalDistribute;
import org.apache.doris.nereids.trees.plans.physical.PhysicalOlapTableSink;
import org.apache.doris.nereids.trees.plans.physical.PhysicalUnion;
import org.apache.doris.nereids.trees.plans.visitor.PlanVisitor;
import org.apache.doris.planner.DataSink;
import org.apache.doris.planner.GroupCommitPlanner;
@ -183,7 +186,7 @@ public class InsertIntoTableCommand extends Command implements ForwardWithSync,
physicalOlapTableSink.getDatabase(),
physicalOlapTableSink.getTargetTable(), label, planner);
insertExecutor.beginTransaction();
insertExecutor.finalizeSink(sink, physicalOlapTableSink.isPartialUpdate(),
insertExecutor.finalizeSink(planner.getFragments().get(0), sink, physicalOlapTableSink.isPartialUpdate(),
physicalOlapTableSink.getDmlCommandType() == DMLCommandType.INSERT, this.allowAutoPartition);
} finally {
targetTableIf.readUnlock();
@ -254,12 +257,20 @@ public class InsertIntoTableCommand extends Command implements ForwardWithSync,
}
OlapTable targetTable = physicalOlapTableSink.getTargetTable();
return ctx.getSessionVariable().getSqlMode() != SqlModeHelper.MODE_NO_BACKSLASH_ESCAPES
&& !ctx.isTxnModel() && sink.getFragment().getPlanRoot() instanceof UnionNode
&& !ctx.isTxnModel() && isGroupCommitAvailablePlan(physicalOlapTableSink)
&& physicalOlapTableSink.getPartitionIds().isEmpty() && targetTable.getTableProperty()
.getUseSchemaLightChange() && !targetTable.getQualifiedDbName()
.equalsIgnoreCase(FeConstants.INTERNAL_DB_NAME);
}
private boolean isGroupCommitAvailablePlan(PhysicalOlapTableSink<? extends Plan> sink) {
Plan child = sink.child();
if (child instanceof PhysicalDistribute) {
child = child.child(0);
}
return child instanceof OneRowRelation || (child instanceof PhysicalUnion && child.arity() == 0);
}
@Override
public Plan getExplainPlan(ConnectContext ctx) {
if (!ctx.getSessionVariable().isEnableNereidsDML()) {
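
In rough terms, the new isGroupCommitAvailablePlan accepts only constant-only inserts, replacing the previous check that the fragment root is a UnionNode. The SQL shapes below illustrate that reading; they are an interpretation, not taken from the patch:

    // Child of the sink (looking through one PhysicalDistribute, if present):
    //   OneRowRelation                e.g. INSERT INTO t VALUES (1, 'a')
    //   PhysicalUnion with arity 0    e.g. INSERT INTO t VALUES (1, 'a'), (2, 'b')
    //                                 (all rows are constant expression lists)
    // Anything else, such as INSERT INTO t SELECT ..., does not qualify for
    // group commit under this check.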

PhysicalOlapTableSink.java

@ -25,7 +25,6 @@ import org.apache.doris.catalog.OlapTable;
import org.apache.doris.catalog.RandomDistributionInfo;
import org.apache.doris.nereids.exceptions.AnalysisException;
import org.apache.doris.nereids.memo.GroupExpression;
import org.apache.doris.nereids.properties.DistributionSpecHash.ShuffleType;
import org.apache.doris.nereids.properties.LogicalProperties;
import org.apache.doris.nereids.properties.PhysicalProperties;
import org.apache.doris.nereids.trees.expressions.Expression;
@ -42,13 +41,11 @@ import org.apache.doris.statistics.Statistics;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Lists;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;
/**
* physical olap table sink for insert command
@ -218,23 +215,7 @@ public class PhysicalOlapTableSink<CHILD_TYPE extends Plan> extends PhysicalSink
if (targetTable.isPartitionDistributed()) {
DistributionInfo distributionInfo = targetTable.getDefaultDistributionInfo();
if (distributionInfo instanceof HashDistributionInfo) {
HashDistributionInfo hashDistributionInfo
= ((HashDistributionInfo) targetTable.getDefaultDistributionInfo());
List<Column> distributedColumns = hashDistributionInfo.getDistributionColumns();
List<Integer> columnIndexes = Lists.newArrayList();
int idx = 0;
for (int i = 0; i < targetTable.getFullSchema().size(); ++i) {
if (targetTable.getFullSchema().get(i).equals(distributedColumns.get(idx))) {
columnIndexes.add(i);
idx++;
if (idx == distributedColumns.size()) {
break;
}
}
}
return PhysicalProperties.createHash(columnIndexes.stream()
.map(colIdx -> child().getOutput().get(colIdx).getExprId())
.collect(Collectors.toList()), ShuffleType.NATURAL);
return PhysicalProperties.TABLET_ID_SHUFFLE;
} else if (distributionInfo instanceof RandomDistributionInfo) {
return PhysicalProperties.ANY;
} else {
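
The net effect of this hunk: when the target table is hash-distributed, the sink no longer asks its child for a NATURAL hash shuffle on the distribution columns but for the new tablet-id shuffle. In summary (my condensation; the trailing else branch is truncated above):

    // Required child property, derived from the target table's default distribution:
    //   HashDistributionInfo   -> PhysicalProperties.TABLET_ID_SHUFFLE  (was createHash(..., ShuffleType.NATURAL))
    //   RandomDistributionInfo -> PhysicalProperties.ANY
    //   other distributions    -> handled by the truncated else branch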

DataPartition.java

@ -32,8 +32,6 @@ import com.google.common.base.Joiner;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.util.List;
@ -46,16 +44,14 @@ import java.util.List;
* TODO: better name? just Partitioning?
*/
public class DataPartition {
private static final Logger LOG = LogManager.getLogger(DataPartition.class);
public static final DataPartition UNPARTITIONED = new DataPartition(TPartitionType.UNPARTITIONED);
public static final DataPartition RANDOM = new DataPartition(TPartitionType.RANDOM);
public static final DataPartition TABLET_ID = new DataPartition(TPartitionType.TABLET_SINK_SHUFFLE_PARTITIONED);
private final TPartitionType type;
// for hash partition: exprs used to compute hash value
private ImmutableList<Expr> partitionExprs = ImmutableList.of();
private ImmutableList<Expr> partitionExprs;
public DataPartition(TPartitionType type, List<Expr> exprs) {
Preconditions.checkNotNull(exprs);
@ -67,13 +63,10 @@ public class DataPartition {
this.partitionExprs = ImmutableList.copyOf(exprs);
}
public void substitute(ExprSubstitutionMap smap, Analyzer analyzer) throws AnalysisException {
List<Expr> list = Expr.trySubstituteList(partitionExprs, smap, analyzer, false);
partitionExprs = ImmutableList.copyOf(list);
}
public DataPartition(TPartitionType type) {
Preconditions.checkState(type == TPartitionType.UNPARTITIONED || type == TPartitionType.RANDOM);
Preconditions.checkState(type == TPartitionType.UNPARTITIONED
|| type == TPartitionType.RANDOM
|| type == TPartitionType.TABLET_SINK_SHUFFLE_PARTITIONED);
this.type = type;
this.partitionExprs = ImmutableList.of();
}
@ -82,6 +75,11 @@ public class DataPartition {
return new DataPartition(TPartitionType.HASH_PARTITIONED, exprs);
}
public void substitute(ExprSubstitutionMap smap, Analyzer analyzer) throws AnalysisException {
List<Expr> list = Expr.trySubstituteList(partitionExprs, smap, analyzer, false);
partitionExprs = ImmutableList.copyOf(list);
}
public boolean isPartitioned() {
return type != TPartitionType.UNPARTITIONED;
}
@ -106,16 +104,6 @@ public class DataPartition {
return result;
}
/**
* Returns true if 'this' is a partition that is compatible with the
* requirements of 's'.
* TODO: specify more clearly and implement
*/
public boolean isCompatible(DataPartition s) {
// TODO: implement
return true;
}
public String getExplainString(TExplainLevel explainLevel) {
StringBuilder str = new StringBuilder();
str.append(type.toString());
@ -127,7 +115,7 @@ public class DataPartition {
for (Expr expr : partitionExprs) {
strings.add(expr.toSql());
}
str.append(": " + Joiner.on(", ").join(strings));
str.append(": ").append(Joiner.on(", ").join(strings));
}
str.append("\n");
return str.toString();
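
A small usage sketch of the new singleton, illustrative only; it follows directly from the relaxed constructor precondition above:

    DataPartition p = DataPartition.TABLET_ID;
    // type is TPartitionType.TABLET_SINK_SHUFFLE_PARTITIONED, partitionExprs is empty
    boolean partitioned = p.isPartitioned();  // true: only UNPARTITIONED reports false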

DataStreamSink.java

@ -27,8 +27,12 @@ import org.apache.doris.thrift.TDataSink;
import org.apache.doris.thrift.TDataSinkType;
import org.apache.doris.thrift.TDataStreamSink;
import org.apache.doris.thrift.TExplainLevel;
import org.apache.doris.thrift.TOlapTableLocationParam;
import org.apache.doris.thrift.TOlapTablePartitionParam;
import org.apache.doris.thrift.TOlapTableSchemaParam;
import com.google.common.base.Joiner;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import org.springframework.util.CollectionUtils;
@ -52,6 +56,13 @@ public class DataStreamSink extends DataSink {
protected List<RuntimeFilter> runtimeFilters = Lists.newArrayList();
// used for the tablet-id shuffle sink only
protected TOlapTableSchemaParam tabletSinkSchemaParam = null;
protected TOlapTablePartitionParam tabletSinkPartitionParam = null;
protected TOlapTableLocationParam tabletSinkLocationParam = null;
protected TupleDescriptor tabletSinkTupleDesc = null;
protected long tabletSinkTxnId = -1;
public DataStreamSink() {
}
@ -118,6 +129,26 @@ public class DataStreamSink extends DataSink {
this.runtimeFilters.add(runtimeFilter);
}
public void setTabletSinkSchemaParam(TOlapTableSchemaParam schemaParam) {
this.tabletSinkSchemaParam = schemaParam;
}
public void setTabletSinkPartitionParam(TOlapTablePartitionParam partitionParam) {
this.tabletSinkPartitionParam = partitionParam;
}
public void setTabletSinkTupleDesc(TupleDescriptor tupleDesc) {
this.tabletSinkTupleDesc = tupleDesc;
}
public void setTabletSinkLocationParam(TOlapTableLocationParam locationParam) {
this.tabletSinkLocationParam = locationParam;
}
public void setTabletSinkTxnId(long txnId) {
this.tabletSinkTxnId = txnId;
}
@Override
public String getExplainString(String prefix, TExplainLevel explainLevel) {
StringBuilder strBuilder = new StringBuilder();
@ -179,6 +210,21 @@ public class DataStreamSink extends DataSink {
tStreamSink.addToRuntimeFilters(rf.toThrift());
}
}
Preconditions.checkState((tabletSinkSchemaParam != null) == (tabletSinkPartitionParam != null),
"schemaParam and partitionParam should be set together.");
if (tabletSinkSchemaParam != null) {
tStreamSink.setTabletSinkSchema(tabletSinkSchemaParam);
}
if (tabletSinkPartitionParam != null) {
tStreamSink.setTabletSinkPartition(tabletSinkPartitionParam);
}
if (tabletSinkTupleDesc != null) {
tStreamSink.setTabletSinkTupleId(tabletSinkTupleDesc.getId().asInt());
}
if (tabletSinkLocationParam != null) {
tStreamSink.setTabletSinkLocation(tabletSinkLocationParam);
}
tStreamSink.setTabletSinkTxnId(tabletSinkTxnId);
result.setStreamSink(tStreamSink);
return result;
}
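
For context, these new fields are populated by InsertExecutor.finalizeSink earlier in this commit; the producer side looks roughly like the following, and toThrift then forwards whatever was set, with the precondition that schema and partition arrive together (variable names here are placeholders, condensed from that method):

    dataStreamSink.setTabletSinkSchemaParam(schemaParam);        // TOlapTableSchemaParam
    dataStreamSink.setTabletSinkPartitionParam(partitionParam);  // TOlapTablePartitionParam
    dataStreamSink.setTabletSinkTupleDesc(olapTableSink.getTupleDescriptor());
    dataStreamSink.setTabletSinkLocationParam(locationParams.get(0));
    dataStreamSink.setTabletSinkTxnId(olapTableSink.getTxnId());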

OlapTableSink.java

@ -110,6 +110,7 @@ public class OlapTableSink extends DataSink {
private boolean singleReplicaLoad;
private boolean isStrictMode = false;
private long txnId = -1;
public OlapTableSink(OlapTable dstTable, TupleDescriptor tupleDescriptor, List<Long> partitionIds,
boolean singleReplicaLoad) {
@ -129,6 +130,7 @@ public class OlapTableSink extends DataSink {
tSink.setLoadChannelTimeoutS(loadChannelTimeoutS);
tSink.setSendBatchParallelism(sendBatchParallelism);
this.isStrictMode = isStrictMode;
this.txnId = txnId;
if (loadToSingleTablet && !(dstTable.getDefaultDistributionInfo() instanceof RandomDistributionInfo)) {
throw new AnalysisException(
"if load_to_single_tablet set to true," + " the olap table must be with random distribution");
@ -237,7 +239,7 @@ public class OlapTableSink extends DataSink {
return tDataSink;
}
private TOlapTableSchemaParam createSchema(long dbId, OlapTable table, Analyzer analyzer) throws AnalysisException {
public TOlapTableSchemaParam createSchema(long dbId, OlapTable table, Analyzer analyzer) throws AnalysisException {
TOlapTableSchemaParam schemaParam = new TOlapTableSchemaParam();
schemaParam.setDbId(dbId);
schemaParam.setTableId(table.getId());
@ -321,7 +323,7 @@ public class OlapTableSink extends DataSink {
return distColumns;
}
private TOlapTablePartitionParam createPartition(long dbId, OlapTable table, Analyzer analyzer)
public TOlapTablePartitionParam createPartition(long dbId, OlapTable table, Analyzer analyzer)
throws UserException {
TOlapTablePartitionParam partitionParam = new TOlapTablePartitionParam();
partitionParam.setDbId(dbId);
@ -479,7 +481,7 @@ public class OlapTableSink extends DataSink {
}
}
private List<TOlapTableLocationParam> createLocation(OlapTable table) throws UserException {
public List<TOlapTableLocationParam> createLocation(OlapTable table) throws UserException {
TOlapTableLocationParam locationParam = new TOlapTableLocationParam();
TOlapTableLocationParam slaveLocationParam = new TOlapTableLocationParam();
// BE id -> path hash
@ -571,7 +573,7 @@ public class OlapTableSink extends DataSink {
bePathsMap.putAll(result);
}
private TPaloNodesInfo createPaloNodesInfo() {
public TPaloNodesInfo createPaloNodesInfo() {
TPaloNodesInfo nodesInfo = new TPaloNodesInfo();
SystemInfoService systemInfoService = Env.getCurrentSystemInfo();
for (Long id : systemInfoService.getAllBackendIds(false)) {
@ -584,4 +586,16 @@ public class OlapTableSink extends DataSink {
protected TDataSinkType getDataSinkType() {
return TDataSinkType.OLAP_TABLE_SINK;
}
public OlapTable getDstTable() {
return dstTable;
}
public TupleDescriptor getTupleDescriptor() {
return tupleDescriptor;
}
public long getTxnId() {
return txnId;
}
}

SessionVariable.java

@ -3206,6 +3206,15 @@ public class SessionVariable implements Serializable, Writable {
this.dumpNereidsMemo = dumpNereidsMemo;
}
public void disableStrictConsistencyDmlOnce() throws DdlException {
if (!enableStrictConsistencyDml) {
return;
}
setIsSingleSetVar(true);
VariableMgr.setVar(this,
new SetVar(SessionVariable.ENABLE_STRICT_CONSISTENCY_DML, new StringLiteral("false")));
}
public void enableFallbackToOriginalPlannerOnce() throws DdlException {
if (enableFallbackToOriginalPlanner) {
return;
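
The new disableStrictConsistencyDmlOnce follows the same pattern as the existing enableFallbackToOriginalPlannerOnce shown above: it is a no-op when the variable is already off, otherwise it issues a single-statement SET of ENABLE_STRICT_CONSISTENCY_DML to false. A hypothetical call would be:

    // Hypothetical usage; actual call sites are not part of this excerpt.
    ConnectContext ctx = ConnectContext.get();
    ctx.getSessionVariable().disableStrictConsistencyDmlOnce();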