From 8eed760704cf8f707cda839dfc570eef875a7a6a Mon Sep 17 00:00:00 2001 From: Mingyu Chen Date: Fri, 8 Dec 2023 23:15:45 +0800 Subject: [PATCH] [fix](planner) separate table's isPartitioned() method (#28163) This PR #27515 change the logic if Table's `isPartitioned()` method. But this method has 2 usages: 1. To check whether a table is range or list partitioned, for some DML operation such as Alter, Export. For this case, it should return true if the table is range or list partitioned. even if it has only one partition and one buckets. 2. To check whether the data is distributed (either by partitions or by buckets), for query planner. For this case, it should return true if table has more than one bucket. Even if this table is not range or list partitioned, if it has more than one bucket, it should return true. So we should separate this method into 2, for different usages. Otherwise, it may cause some unreasonable plan shape --- .../doris/alter/SchemaChangeHandler.java | 2 +- .../org/apache/doris/analysis/ExportStmt.java | 2 +- .../doris/analysis/NativeInsertStmt.java | 4 +-- .../org/apache/doris/catalog/OlapTable.java | 15 ++++++++- .../java/org/apache/doris/catalog/Table.java | 8 ++++- .../trees/plans/commands/ExportCommand.java | 2 +- .../plans/physical/PhysicalOlapTableSink.java | 2 +- .../doris/planner/DistributedPlanner.java | 4 +-- .../org/apache/doris/planner/PlannerTest.java | 33 +++++++++++++++++++ 9 files changed, 62 insertions(+), 10 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeHandler.java b/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeHandler.java index dc238044eb..d6ab6a972e 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeHandler.java +++ b/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeHandler.java @@ -1939,7 +1939,7 @@ public class SchemaChangeHandler extends AlterHandler { BuildIndexClause buildIndexClause = (BuildIndexClause) alterClause; IndexDef indexDef = buildIndexClause.getIndexDef(); Index index = buildIndexClause.getIndex(); - if (!olapTable.isPartitioned()) { + if (!olapTable.isPartitionedTable()) { List specifiedPartitions = indexDef.getPartitionNames(); if (!specifiedPartitions.isEmpty()) { throw new DdlException("table " + olapTable.getName() diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/ExportStmt.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/ExportStmt.java index 5034d01be1..686dfcc6fc 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/analysis/ExportStmt.java +++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/ExportStmt.java @@ -261,7 +261,7 @@ public class ExportStmt extends StatementBase { table.readLock(); try { // check table - if (!table.isPartitioned()) { + if (!table.isPartitionedTable()) { throw new AnalysisException("Table[" + tblName.getTbl() + "] is not partitioned."); } Table.TableType tblType = table.getType(); diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/NativeInsertStmt.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/NativeInsertStmt.java index b69ed837b8..0c70189bec 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/analysis/NativeInsertStmt.java +++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/NativeInsertStmt.java @@ -878,7 +878,7 @@ public class NativeInsertStmt extends InsertStmt { } for (String hint : planHints) { if (SHUFFLE_HINT.equalsIgnoreCase(hint)) { - if (!targetTable.isPartitioned()) { + if (!targetTable.isPartitionedTable()) { ErrorReport.reportAnalysisException(ErrorCode.ERR_INSERT_HINT_NOT_SUPPORT); } if (isRepartition != null && !isRepartition) { @@ -886,7 +886,7 @@ public class NativeInsertStmt extends InsertStmt { } isRepartition = Boolean.TRUE; } else if (NOSHUFFLE_HINT.equalsIgnoreCase(hint)) { - if (!targetTable.isPartitioned()) { + if (!targetTable.isPartitionedTable()) { ErrorReport.reportAnalysisException(ErrorCode.ERR_INSERT_HINT_NOT_SUPPORT); } if (isRepartition != null && isRepartition) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java index adfa20fe1c..9195fa1f4e 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java @@ -1307,10 +1307,23 @@ public class OlapTable extends Table { } @Override - public boolean isPartitioned() { + public boolean isPartitionedTable() { return !PartitionType.UNPARTITIONED.equals(partitionInfo.getType()); } + // Return true if data is distributed by one more partitions or buckets. + @Override + public boolean isPartitionDistributed() { + int numSegs = 0; + for (Partition part : getPartitions()) { + numSegs += part.getDistributionInfo().getBucketNum(); + if (numSegs > 1) { + return true; + } + } + return false; + } + @Override public void write(DataOutput out) throws IOException { super.write(out); diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Table.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/Table.java index bb9a9cd28e..f6481aea2d 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Table.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Table.java @@ -496,8 +496,14 @@ public abstract class Table extends MetaObject implements Writable, TableIf { } // return if this table is partitioned. + // For OlapTable, return true only if its partition type is RANGE or HASH + public boolean isPartitionedTable() { + return false; + } + + // return if this table is partitioned, for planner. // For OlapTable ture when is partitioned, or distributed by hash when no partition - public boolean isPartitioned() { + public boolean isPartitionDistributed() { return false; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/ExportCommand.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/ExportCommand.java index 0aa34a0428..6d4fb22282 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/ExportCommand.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/ExportCommand.java @@ -203,7 +203,7 @@ public class ExportCommand extends Command implements ForwardWithSync { + tblType + " type, do not support EXPORT."); } // check table - if (!table.isPartitioned()) { + if (!table.isPartitionedTable()) { throw new AnalysisException("Table[" + tblName.getTbl() + "] is not partitioned."); } for (String partitionName : this.partitionsNames) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalOlapTableSink.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalOlapTableSink.java index 093c87281e..8a4daba8cd 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalOlapTableSink.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalOlapTableSink.java @@ -214,7 +214,7 @@ public class PhysicalOlapTableSink extends PhysicalSink * get output physical properties */ public PhysicalProperties getRequirePhysicalProperties() { - if (targetTable.isPartitioned()) { + if (targetTable.isPartitionDistributed()) { DistributionInfo distributionInfo = targetTable.getDefaultDistributionInfo(); if (distributionInfo instanceof HashDistributionInfo) { HashDistributionInfo hashDistributionInfo diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/DistributedPlanner.java b/fe/fe-core/src/main/java/org/apache/doris/planner/DistributedPlanner.java index a719081496..d3dd27ee73 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/DistributedPlanner.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/DistributedPlanner.java @@ -125,7 +125,7 @@ public class DistributedPlanner { boolean needRepartition = false; boolean needMerge = false; if (isFragmentPartitioned(inputFragment)) { - if (targetTable.isPartitioned()) { + if (targetTable.isPartitionDistributed()) { if (stmt.getDataPartition().getType() == TPartitionType.RANDOM) { return inputFragment; } @@ -138,7 +138,7 @@ public class DistributedPlanner { needMerge = true; } } else { - if (targetTable.isPartitioned()) { + if (targetTable.isPartitionDistributed()) { if (isRepart != null && isRepart) { needRepartition = true; } else { diff --git a/fe/fe-core/src/test/java/org/apache/doris/planner/PlannerTest.java b/fe/fe-core/src/test/java/org/apache/doris/planner/PlannerTest.java index f2349733c1..493053b3b1 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/planner/PlannerTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/planner/PlannerTest.java @@ -21,6 +21,7 @@ import org.apache.doris.analysis.ExplainOptions; import org.apache.doris.analysis.Expr; import org.apache.doris.analysis.UserIdentity; import org.apache.doris.common.AnalysisException; +import org.apache.doris.common.FeConstants; import org.apache.doris.qe.QueryState; import org.apache.doris.qe.QueryState.MysqlStateType; import org.apache.doris.qe.StmtExecutor; @@ -682,4 +683,36 @@ public class PlannerTest extends TestWithFeService { Assertions.assertFalse(plan1.contains("order by:")); } } + + @Test + public void testInsertPlan() throws Exception { + FeConstants.runningUnitTest = true; + // 1. should not contains exchange node in old planner + boolean v = connectContext.getSessionVariable().isEnableNereidsPlanner(); + try { + connectContext.getSessionVariable().setEnableNereidsPlanner(false); + String sql1 = "explain insert into db1.tbl1 select * from db1.tbl1"; + StmtExecutor stmtExecutor1 = new StmtExecutor(connectContext, sql1); + stmtExecutor1.execute(); + Planner planner1 = stmtExecutor1.planner(); + String plan1 = planner1.getExplainString(new ExplainOptions(false, false, false)); + Assertions.assertFalse(plan1.contains("VEXCHANGE")); + } finally { + connectContext.getSessionVariable().setEnableNereidsPlanner(v); + } + + // 1. should not contains exchange node in new planner + v = connectContext.getSessionVariable().isEnableNereidsPlanner(); + try { + connectContext.getSessionVariable().setEnableNereidsPlanner(true); + String sql1 = "explain insert into db1.tbl1 select * from db1.tbl1"; + StmtExecutor stmtExecutor1 = new StmtExecutor(connectContext, sql1); + stmtExecutor1.execute(); + Planner planner1 = stmtExecutor1.planner(); + String plan1 = planner1.getExplainString(new ExplainOptions(false, false, false)); + Assertions.assertFalse(plan1.contains("VEXCHANGE")); + } finally { + connectContext.getSessionVariable().setEnableNereidsPlanner(v); + } + } }