[fix](planner) separate table's isPartitioned() method (#28163)

This PR #27515 change the logic if Table's `isPartitioned()` method.
But this method has 2 usages:

1. To check whether a table is range or list partitioned, for some DML operation such as Alter, Export.

    For this case, it should return true if the table is range or list partitioned. even if it has only
    one partition and one buckets.

2. To check whether the data is distributed (either by partitions or by buckets), for query planner.

    For this case, it should return true if table has more than one bucket. Even if this table is not
    range or list partitioned, if it has more than one bucket, it should return true.

So we should separate this method into 2, for different usages.
Otherwise, it may cause some unreasonable plan shape
This commit is contained in:
Mingyu Chen
2023-12-08 23:15:45 +08:00
committed by GitHub
parent baf85547ae
commit 8eed760704
9 changed files with 62 additions and 10 deletions

View File

@ -1939,7 +1939,7 @@ public class SchemaChangeHandler extends AlterHandler {
BuildIndexClause buildIndexClause = (BuildIndexClause) alterClause;
IndexDef indexDef = buildIndexClause.getIndexDef();
Index index = buildIndexClause.getIndex();
if (!olapTable.isPartitioned()) {
if (!olapTable.isPartitionedTable()) {
List<String> specifiedPartitions = indexDef.getPartitionNames();
if (!specifiedPartitions.isEmpty()) {
throw new DdlException("table " + olapTable.getName()

View File

@ -261,7 +261,7 @@ public class ExportStmt extends StatementBase {
table.readLock();
try {
// check table
if (!table.isPartitioned()) {
if (!table.isPartitionedTable()) {
throw new AnalysisException("Table[" + tblName.getTbl() + "] is not partitioned.");
}
Table.TableType tblType = table.getType();

View File

@ -878,7 +878,7 @@ public class NativeInsertStmt extends InsertStmt {
}
for (String hint : planHints) {
if (SHUFFLE_HINT.equalsIgnoreCase(hint)) {
if (!targetTable.isPartitioned()) {
if (!targetTable.isPartitionedTable()) {
ErrorReport.reportAnalysisException(ErrorCode.ERR_INSERT_HINT_NOT_SUPPORT);
}
if (isRepartition != null && !isRepartition) {
@ -886,7 +886,7 @@ public class NativeInsertStmt extends InsertStmt {
}
isRepartition = Boolean.TRUE;
} else if (NOSHUFFLE_HINT.equalsIgnoreCase(hint)) {
if (!targetTable.isPartitioned()) {
if (!targetTable.isPartitionedTable()) {
ErrorReport.reportAnalysisException(ErrorCode.ERR_INSERT_HINT_NOT_SUPPORT);
}
if (isRepartition != null && isRepartition) {

View File

@ -1307,10 +1307,23 @@ public class OlapTable extends Table {
}
@Override
public boolean isPartitioned() {
public boolean isPartitionedTable() {
return !PartitionType.UNPARTITIONED.equals(partitionInfo.getType());
}
// Return true if data is distributed by one more partitions or buckets.
@Override
public boolean isPartitionDistributed() {
int numSegs = 0;
for (Partition part : getPartitions()) {
numSegs += part.getDistributionInfo().getBucketNum();
if (numSegs > 1) {
return true;
}
}
return false;
}
@Override
public void write(DataOutput out) throws IOException {
super.write(out);

View File

@ -496,8 +496,14 @@ public abstract class Table extends MetaObject implements Writable, TableIf {
}
// return if this table is partitioned.
// For OlapTable, return true only if its partition type is RANGE or HASH
public boolean isPartitionedTable() {
return false;
}
// return if this table is partitioned, for planner.
// For OlapTable ture when is partitioned, or distributed by hash when no partition
public boolean isPartitioned() {
public boolean isPartitionDistributed() {
return false;
}

View File

@ -203,7 +203,7 @@ public class ExportCommand extends Command implements ForwardWithSync {
+ tblType + " type, do not support EXPORT.");
}
// check table
if (!table.isPartitioned()) {
if (!table.isPartitionedTable()) {
throw new AnalysisException("Table[" + tblName.getTbl() + "] is not partitioned.");
}
for (String partitionName : this.partitionsNames) {

View File

@ -214,7 +214,7 @@ public class PhysicalOlapTableSink<CHILD_TYPE extends Plan> extends PhysicalSink
* get output physical properties
*/
public PhysicalProperties getRequirePhysicalProperties() {
if (targetTable.isPartitioned()) {
if (targetTable.isPartitionDistributed()) {
DistributionInfo distributionInfo = targetTable.getDefaultDistributionInfo();
if (distributionInfo instanceof HashDistributionInfo) {
HashDistributionInfo hashDistributionInfo

View File

@ -125,7 +125,7 @@ public class DistributedPlanner {
boolean needRepartition = false;
boolean needMerge = false;
if (isFragmentPartitioned(inputFragment)) {
if (targetTable.isPartitioned()) {
if (targetTable.isPartitionDistributed()) {
if (stmt.getDataPartition().getType() == TPartitionType.RANDOM) {
return inputFragment;
}
@ -138,7 +138,7 @@ public class DistributedPlanner {
needMerge = true;
}
} else {
if (targetTable.isPartitioned()) {
if (targetTable.isPartitionDistributed()) {
if (isRepart != null && isRepart) {
needRepartition = true;
} else {

View File

@ -21,6 +21,7 @@ import org.apache.doris.analysis.ExplainOptions;
import org.apache.doris.analysis.Expr;
import org.apache.doris.analysis.UserIdentity;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.FeConstants;
import org.apache.doris.qe.QueryState;
import org.apache.doris.qe.QueryState.MysqlStateType;
import org.apache.doris.qe.StmtExecutor;
@ -682,4 +683,36 @@ public class PlannerTest extends TestWithFeService {
Assertions.assertFalse(plan1.contains("order by:"));
}
}
@Test
public void testInsertPlan() throws Exception {
FeConstants.runningUnitTest = true;
// 1. should not contains exchange node in old planner
boolean v = connectContext.getSessionVariable().isEnableNereidsPlanner();
try {
connectContext.getSessionVariable().setEnableNereidsPlanner(false);
String sql1 = "explain insert into db1.tbl1 select * from db1.tbl1";
StmtExecutor stmtExecutor1 = new StmtExecutor(connectContext, sql1);
stmtExecutor1.execute();
Planner planner1 = stmtExecutor1.planner();
String plan1 = planner1.getExplainString(new ExplainOptions(false, false, false));
Assertions.assertFalse(plan1.contains("VEXCHANGE"));
} finally {
connectContext.getSessionVariable().setEnableNereidsPlanner(v);
}
// 1. should not contains exchange node in new planner
v = connectContext.getSessionVariable().isEnableNereidsPlanner();
try {
connectContext.getSessionVariable().setEnableNereidsPlanner(true);
String sql1 = "explain insert into db1.tbl1 select * from db1.tbl1";
StmtExecutor stmtExecutor1 = new StmtExecutor(connectContext, sql1);
stmtExecutor1.execute();
Planner planner1 = stmtExecutor1.planner();
String plan1 = planner1.getExplainString(new ExplainOptions(false, false, false));
Assertions.assertFalse(plan1.contains("VEXCHANGE"));
} finally {
connectContext.getSessionVariable().setEnableNereidsPlanner(v);
}
}
}