[fix](nereids)Add catalog/db/table filter info in SchemaScanNode (#46864) (#47759)

backport: https://github.com/apache/doris/pull/46864
This commit is contained in:
James
2025-02-11 19:05:27 +08:00
committed by GitHub
parent 964c940131
commit e2f9b436a4
15 changed files with 347 additions and 21 deletions

View File

@ -112,6 +112,9 @@ Status SchemaTablesScanner::_get_new_table() {
if (nullptr != _param->common_param->wild) {
table_params.__set_pattern(*(_param->common_param->wild));
}
if (nullptr != _param->common_param->table) {
table_params.__set_table(*(_param->common_param->table));
}
if (nullptr != _param->common_param->current_user_ident) {
table_params.__set_current_user_ident(*(_param->common_param->current_user_ident));
} else {

View File

@ -910,9 +910,13 @@ public class PhysicalPlanTranslator extends DefaultPlanVisitor<PlanFragment, Pla
SchemaScanNode scanNode = null;
if (BackendPartitionedSchemaScanNode.isBackendPartitionedSchemaTable(
table.getName())) {
scanNode = new BackendPartitionedSchemaScanNode(context.nextPlanNodeId(), tupleDescriptor);
scanNode = new BackendPartitionedSchemaScanNode(context.nextPlanNodeId(), tupleDescriptor,
schemaScan.getSchemaCatalog().orElse(null), schemaScan.getSchemaDatabase().orElse(null),
schemaScan.getSchemaTable().orElse(null));
} else {
scanNode = new SchemaScanNode(context.nextPlanNodeId(), tupleDescriptor);
scanNode = new SchemaScanNode(context.nextPlanNodeId(), tupleDescriptor,
schemaScan.getSchemaCatalog().orElse(null), schemaScan.getSchemaDatabase().orElse(null),
schemaScan.getSchemaTable().orElse(null));
}
scanNode.setNereidsId(schemaScan.getId());
SchemaScanNode finalScanNode = scanNode;

View File

@ -106,6 +106,7 @@ import org.apache.doris.nereids.rules.rewrite.PullUpProjectUnderTopN;
import org.apache.doris.nereids.rules.rewrite.PushDownAggThroughJoin;
import org.apache.doris.nereids.rules.rewrite.PushDownAggThroughJoinOneSide;
import org.apache.doris.nereids.rules.rewrite.PushDownDistinctThroughJoin;
import org.apache.doris.nereids.rules.rewrite.PushDownFilterIntoSchemaScan;
import org.apache.doris.nereids.rules.rewrite.PushDownFilterThroughProject;
import org.apache.doris.nereids.rules.rewrite.PushDownLimit;
import org.apache.doris.nereids.rules.rewrite.PushDownLimitDistinctThroughJoin;
@ -388,7 +389,8 @@ public class Rewriter extends AbstractBatchJobExecutor {
topDown(
new PruneOlapScanPartition(),
new PruneEmptyPartition(),
new PruneFileScanPartition()
new PruneFileScanPartition(),
new PushDownFilterIntoSchemaScan()
)
),
topic("MV optimization",

View File

@ -280,6 +280,7 @@ public enum RuleType {
OLAP_SCAN_PARTITION_PRUNE(RuleTypeClass.REWRITE),
FILE_SCAN_PARTITION_PRUNE(RuleTypeClass.REWRITE),
PUSH_FILTER_INTO_SCHEMA_SCAN(RuleTypeClass.REWRITE),
PUSH_CONJUNCTS_INTO_JDBC_SCAN(RuleTypeClass.REWRITE),
PUSH_CONJUNCTS_INTO_ODBC_SCAN(RuleTypeClass.REWRITE),
PUSH_CONJUNCTS_INTO_ES_SCAN(RuleTypeClass.REWRITE),

View File

@ -34,7 +34,10 @@ public class LogicalSchemaScanToPhysicalSchemaScan extends OneImplementationRule
scan.getTable(),
scan.getQualifier(),
Optional.empty(),
scan.getLogicalProperties())
scan.getLogicalProperties(),
scan.getSchemaCatalog(),
scan.getSchemaDatabase(),
scan.getSchemaTable())
).toRule(RuleType.LOGICAL_SCHEMA_SCAN_TO_PHYSICAL_SCHEMA_SCAN_RULE);
}
}

View File

@ -0,0 +1,77 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.nereids.rules.rewrite;
import org.apache.doris.catalog.Column;
import org.apache.doris.nereids.rules.Rule;
import org.apache.doris.nereids.rules.RuleType;
import org.apache.doris.nereids.trees.expressions.EqualTo;
import org.apache.doris.nereids.trees.expressions.Expression;
import org.apache.doris.nereids.trees.expressions.SlotReference;
import org.apache.doris.nereids.trees.expressions.literal.VarcharLiteral;
import org.apache.doris.nereids.trees.plans.logical.LogicalFilter;
import org.apache.doris.nereids.trees.plans.logical.LogicalSchemaScan;
import com.google.common.collect.ImmutableList;
import java.util.Optional;
/**
 * Rewrite rule that records catalog / database / table names on a schema scan.
 *
 * <p>It matches a filter sitting directly on top of a {@link LogicalSchemaScan} whose
 * {@code isFilterPushed()} flag is still false. Every conjunct of the shape
 * {@code slot = varchar-literal} whose slot is bound to the column TABLE_CATALOG,
 * TABLE_SCHEMA or TABLE_NAME contributes its literal value to the rewritten scan.
 * The filter itself is kept above the scan; only the scan child is replaced.
 */
public class PushDownFilterIntoSchemaScan extends OneRewriteRuleFactory {
    @Override
    public Rule build() {
        return logicalFilter(logicalSchemaScan())
                .when(filterPlan -> !filterPlan.child().isFilterPushed())
                .thenApply(ctx -> {
                    LogicalFilter<LogicalSchemaScan> filterPlan = ctx.root;
                    Optional<String> catalogName = Optional.empty();
                    Optional<String> databaseName = Optional.empty();
                    Optional<String> tableName = Optional.empty();

                    for (Expression conjunct : filterPlan.getConjuncts()) {
                        // Only equality predicates with a slot on the left side are considered.
                        if (!(conjunct instanceof EqualTo) || !(conjunct.child(0) instanceof SlotReference)) {
                            continue;
                        }
                        // The slot must be bound to a column and compared against a varchar literal.
                        Optional<Column> boundColumn = ((SlotReference) conjunct.child(0)).getColumn();
                        if (!boundColumn.isPresent() || !(conjunct.child(1) instanceof VarcharLiteral)) {
                            continue;
                        }
                        String name = boundColumn.get().getName();
                        String literalValue = ((VarcharLiteral) conjunct.child(1)).getValue();
                        // A later conjunct on the same column overwrites an earlier one.
                        if ("TABLE_CATALOG".equals(name)) {
                            catalogName = Optional.of(literalValue);
                        } else if ("TABLE_SCHEMA".equals(name)) {
                            databaseName = Optional.of(literalValue);
                        } else if ("TABLE_NAME".equals(name)) {
                            tableName = Optional.of(literalValue);
                        }
                    }

                    // withSchemaIdentifier also marks the scan as "filter pushed", so this rule
                    // does not match the same plan again.
                    LogicalSchemaScan scanWithIdentifier = filterPlan.child()
                            .withSchemaIdentifier(catalogName, databaseName, tableName);
                    return filterPlan.withChildren(ImmutableList.of(scanWithIdentifier));
                })
                .toRule(RuleType.PUSH_FILTER_INTO_SCHEMA_SCAN);
    }
}

View File

@ -27,6 +27,7 @@ import org.apache.doris.nereids.trees.plans.visitor.PlanVisitor;
import org.apache.doris.nereids.util.Utils;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
/**
@ -34,13 +35,44 @@ import java.util.Optional;
*/
public class LogicalSchemaScan extends LogicalCatalogRelation {
private final boolean filterPushed;
private final Optional<String> schemaCatalog;
private final Optional<String> schemaDatabase;
private final Optional<String> schemaTable;
public LogicalSchemaScan(RelationId id, TableIf table, List<String> qualifier) {
super(id, PlanType.LOGICAL_SCHEMA_SCAN, table, qualifier);
this.filterPushed = false;
this.schemaCatalog = Optional.empty();
this.schemaDatabase = Optional.empty();
this.schemaTable = Optional.empty();
}
public LogicalSchemaScan(RelationId id, TableIf table, List<String> qualifier,
Optional<GroupExpression> groupExpression, Optional<LogicalProperties> logicalProperties) {
Optional<GroupExpression> groupExpression, Optional<LogicalProperties> logicalProperties,
boolean filterPushed, Optional<String> schemaCatalog, Optional<String> schemaDatabase,
Optional<String> schemaTable) {
super(id, PlanType.LOGICAL_SCHEMA_SCAN, table, qualifier, groupExpression, logicalProperties);
this.filterPushed = filterPushed;
this.schemaCatalog = schemaCatalog;
this.schemaDatabase = schemaDatabase;
this.schemaTable = schemaTable;
}
public boolean isFilterPushed() {
return filterPushed;
}
public Optional<String> getSchemaCatalog() {
return schemaCatalog;
}
public Optional<String> getSchemaDatabase() {
return schemaDatabase;
}
public Optional<String> getSchemaTable() {
return schemaTable;
}
@Override
@ -56,22 +88,54 @@ public class LogicalSchemaScan extends LogicalCatalogRelation {
@Override
public Plan withGroupExpression(Optional<GroupExpression> groupExpression) {
return new LogicalSchemaScan(relationId, table, qualifier,
groupExpression, Optional.of(getLogicalProperties()));
groupExpression, Optional.of(getLogicalProperties()), filterPushed,
schemaCatalog, schemaDatabase, schemaTable);
}
@Override
public Plan withGroupExprLogicalPropChildren(Optional<GroupExpression> groupExpression,
Optional<LogicalProperties> logicalProperties, List<Plan> children) {
return new LogicalSchemaScan(relationId, table, qualifier, groupExpression, logicalProperties);
return new LogicalSchemaScan(relationId, table, qualifier, groupExpression, logicalProperties, filterPushed,
schemaCatalog, schemaDatabase, schemaTable);
}
@Override
public LogicalSchemaScan withRelationId(RelationId relationId) {
return new LogicalSchemaScan(relationId, table, qualifier, Optional.empty(), Optional.empty());
return new LogicalSchemaScan(relationId, table, qualifier, Optional.empty(), Optional.empty(), filterPushed,
schemaCatalog, schemaDatabase, schemaTable);
}
public LogicalSchemaScan withSchemaIdentifier(Optional<String> schemaCatalog, Optional<String> schemaDatabase,
Optional<String> schemaTable) {
return new LogicalSchemaScan(relationId, table, qualifier, Optional.empty(),
Optional.of(getLogicalProperties()), true, schemaCatalog, schemaDatabase, schemaTable);
}
@Override
public String toString() {
return Utils.toSqlString("LogicalSchemaScan");
}
@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
if (!super.equals(o)) {
return false;
}
LogicalSchemaScan that = (LogicalSchemaScan) o;
return Objects.equals(schemaCatalog, that.schemaCatalog)
&& Objects.equals(schemaDatabase, that.schemaDatabase)
&& Objects.equals(schemaTable, that.schemaTable)
&& filterPushed == that.filterPushed;
}
@Override
public int hashCode() {
return Objects.hash(super.hashCode(), schemaCatalog, schemaDatabase, schemaTable, filterPushed);
}
}

View File

@ -29,6 +29,7 @@ import org.apache.doris.nereids.util.Utils;
import org.apache.doris.statistics.Statistics;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
/**
@ -36,16 +37,40 @@ import java.util.Optional;
*/
public class PhysicalSchemaScan extends PhysicalCatalogRelation {
private final Optional<String> schemaCatalog;
private final Optional<String> schemaDatabase;
private final Optional<String> schemaTable;
public PhysicalSchemaScan(RelationId id, TableIf table, List<String> qualifier,
Optional<GroupExpression> groupExpression, LogicalProperties logicalProperties) {
Optional<GroupExpression> groupExpression, LogicalProperties logicalProperties,
Optional<String> schemaCatalog, Optional<String> schemaDatabase, Optional<String> schemaTable) {
super(id, PlanType.PHYSICAL_SCHEMA_SCAN, table, qualifier, groupExpression, logicalProperties);
this.schemaCatalog = schemaCatalog;
this.schemaDatabase = schemaDatabase;
this.schemaTable = schemaTable;
}
public PhysicalSchemaScan(RelationId id, TableIf table, List<String> qualifier,
Optional<GroupExpression> groupExpression, LogicalProperties logicalProperties,
PhysicalProperties physicalProperties, Statistics statistics) {
PhysicalProperties physicalProperties, Statistics statistics,
Optional<String> schemaCatalog, Optional<String> schemaDatabase, Optional<String> schemaTable) {
super(id, PlanType.PHYSICAL_SCHEMA_SCAN, table, qualifier, groupExpression,
logicalProperties, physicalProperties, statistics);
this.schemaCatalog = schemaCatalog;
this.schemaDatabase = schemaDatabase;
this.schemaTable = schemaTable;
}
public Optional<String> getSchemaCatalog() {
return schemaCatalog;
}
public Optional<String> getSchemaDatabase() {
return schemaDatabase;
}
public Optional<String> getSchemaTable() {
return schemaTable;
}
@Override
@ -61,21 +86,24 @@ public class PhysicalSchemaScan extends PhysicalCatalogRelation {
@Override
public Plan withGroupExpression(Optional<GroupExpression> groupExpression) {
return new PhysicalSchemaScan(relationId, getTable(), qualifier,
groupExpression, getLogicalProperties(), physicalProperties, statistics);
groupExpression, getLogicalProperties(), physicalProperties, statistics,
schemaCatalog, schemaDatabase, schemaTable);
}
@Override
public Plan withGroupExprLogicalPropChildren(Optional<GroupExpression> groupExpression,
Optional<LogicalProperties> logicalProperties, List<Plan> children) {
return new PhysicalSchemaScan(relationId, getTable(), qualifier,
groupExpression, logicalProperties.get(), physicalProperties, statistics);
groupExpression, logicalProperties.get(), physicalProperties, statistics,
schemaCatalog, schemaDatabase, schemaTable);
}
@Override
public PhysicalPlan withPhysicalPropertiesAndStats(PhysicalProperties physicalProperties,
Statistics statistics) {
return new PhysicalSchemaScan(relationId, getTable(), qualifier,
groupExpression, getLogicalProperties(), physicalProperties, statistics);
groupExpression, getLogicalProperties(), physicalProperties, statistics,
schemaCatalog, schemaDatabase, schemaTable);
}
@Override
@ -83,6 +111,28 @@ public class PhysicalSchemaScan extends PhysicalCatalogRelation {
return Utils.toSqlString("PhysicalSchemaScan");
}
@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
if (!super.equals(o)) {
return false;
}
PhysicalSchemaScan that = (PhysicalSchemaScan) o;
return Objects.equals(schemaCatalog, that.schemaCatalog)
&& Objects.equals(schemaDatabase, that.schemaDatabase)
&& Objects.equals(schemaTable, that.schemaTable);
}
@Override
public int hashCode() {
return Objects.hash(super.hashCode(), schemaCatalog, schemaDatabase, schemaTable);
}
@Override
public boolean canPushDownRuntimeFilter() {
// currently be doesn't support schema scan rf

View File

@ -90,8 +90,9 @@ public class BackendPartitionedSchemaScanNode extends SchemaScanNode {
private Map<Long, Long> partitionIDToBackendID;
private Collection<Long> selectedPartitionIds = Lists.newArrayList();
public BackendPartitionedSchemaScanNode(PlanNodeId id, TupleDescriptor desc) {
super(id, desc);
public BackendPartitionedSchemaScanNode(PlanNodeId id, TupleDescriptor desc,
String schemaCatalog, String schemaDatabase, String schemaTable) {
super(id, desc, schemaCatalog, schemaDatabase, schemaTable);
}
@Override

View File

@ -61,9 +61,13 @@ public class SchemaScanNode extends ScanNode {
/**
* Constructs node to scan given data files of table 'tbl'.
*/
public SchemaScanNode(PlanNodeId id, TupleDescriptor desc) {
public SchemaScanNode(PlanNodeId id, TupleDescriptor desc,
String schemaCatalog, String schemaDb, String schemaTable) {
super(id, desc, "SCAN SCHEMA", StatisticalType.SCHEMA_SCAN_NODE);
this.tableName = desc.getTable().getName();
this.schemaCatalog = schemaCatalog;
this.schemaDb = schemaDb;
this.schemaTable = schemaTable;
}
@Override

View File

@ -1939,9 +1939,10 @@ public class SingleNodePlanner {
case SCHEMA:
if (BackendPartitionedSchemaScanNode.isBackendPartitionedSchemaTable(
tblRef.getDesc().getTable().getName())) {
scanNode = new BackendPartitionedSchemaScanNode(ctx.getNextNodeId(), tblRef.getDesc());
scanNode = new BackendPartitionedSchemaScanNode(ctx.getNextNodeId(), tblRef.getDesc(),
null, null, null);
} else {
scanNode = new SchemaScanNode(ctx.getNextNodeId(), tblRef.getDesc());
scanNode = new SchemaScanNode(ctx.getNextNodeId(), tblRef.getDesc(), null, null, null);
}
break;
case BROKER:

View File

@ -566,6 +566,7 @@ public class FrontendServiceImpl implements FrontendService.Iface {
List<TTableStatus> tablesResult = Lists.newArrayList();
result.setTables(tablesResult);
PatternMatcher matcher = null;
String specifiedTable = null;
if (params.isSetPattern()) {
try {
matcher = PatternMatcher.createMysqlPattern(params.getPattern(),
@ -574,6 +575,9 @@ public class FrontendServiceImpl implements FrontendService.Iface {
throw new TException("Pattern is in bad format " + params.getPattern());
}
}
if (params.isSetTable()) {
specifiedTable = params.getTable();
}
// database privs should be checked in analysis phase
UserIdentity currentUser;
@ -611,11 +615,14 @@ public class FrontendServiceImpl implements FrontendService.Iface {
table.getName(), PrivPredicate.SHOW)) {
continue;
}
if (matcher != null && !matcher.match(table.getName())) {
continue;
}
if (specifiedTable != null && !specifiedTable.equals(table.getName())) {
continue;
}
table.readLock();
try {
if (matcher != null && !matcher.match(table.getName())) {
continue;
}
long lastCheckTime = table.getLastCheckTime() <= 0 ? 0 : table.getLastCheckTime();
TTableStatus status = new TTableStatus();
status.setName(table.getName());

View File

@ -335,6 +335,7 @@ struct TGetTablesParams {
5: optional Types.TUserIdentity current_user_ident // to replace the user and user ip
6: optional string type
7: optional string catalog
8: optional string table
}
struct TTableStatus {

View File

@ -0,0 +1,39 @@
-- This file is automatically generated. You should know what you did if you want to edit this
-- !test1 --
internal information_schema_p0 table1
-- !test2 --
internal information_schema_p0 Order2
internal information_schema_p0 order1
internal information_schema_p0 table2
-- !test3 --
-- !test4 --
internal information_schema_p0 Order2
internal information_schema_p0 order1
internal information_schema_p0 table1
internal information_schema_p0 table2
-- !test5 --
internal information_schema_p0 order1
-- !test6 --
-- !test7 --
internal information_schema_p0 Order2
-- !test8 --
internal information_schema_p0 table1
internal information_schema_p0 table2
-- !test9 --
internal information_schema_p0 Order2
internal information_schema_p0 order1
-- !test10 --
internal information_schema_p0 order1
-- !test11 --
internal information_schema_p0 Order2

View File

@ -0,0 +1,69 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
// Regression suite for filtering information_schema.tables by TABLE_SCHEMA / TABLE_NAME.
// It creates a dedicated database with four tables (table1, table2, order1, Order2) and
// then runs equality, inequality, LIKE and NOT LIKE predicates on TABLE_NAME. Each qt_*
// result is compared against the recorded expected output for the tag of the same name.
suite("test_information_schema") {
// Start from a clean database so only the tables created below are listed.
sql """drop database if exists information_schema_p0"""
sql """create database information_schema_p0"""
sql """use information_schema_p0"""
// Two tables sharing the "table" prefix.
sql """
CREATE TABLE IF NOT EXISTS table1 (
col1 integer not null
)
DUPLICATE KEY(col1)
DISTRIBUTED BY HASH(col1) BUCKETS 3
PROPERTIES ("replication_num" = "1");
"""
sql """
CREATE TABLE IF NOT EXISTS table2 (
col1 integer not null
)
DUPLICATE KEY(col1)
DISTRIBUTED BY HASH(col1) BUCKETS 3
PROPERTIES ("replication_num" = "1");
"""
// Two tables whose names differ in prefix case ("order1" vs "Order2").
sql """
CREATE TABLE IF NOT EXISTS order1 (
col1 integer not null
)
DUPLICATE KEY(col1)
DISTRIBUTED BY HASH(col1) BUCKETS 3
PROPERTIES ("replication_num" = "1");
"""
sql """
CREATE TABLE IF NOT EXISTS Order2 (
col1 integer not null
)
DUPLICATE KEY(col1)
DISTRIBUTED BY HASH(col1) BUCKETS 3
PROPERTIES ("replication_num" = "1");
"""
// test1: exact TABLE_NAME match; expected output is the single row for table1.
qt_test1 """select TABLE_CATALOG, TABLE_SCHEMA, TABLE_NAME from information_schema.tables where TABLE_SCHEMA = "information_schema_p0" and TABLE_NAME = "table1" order by TABLE_NAME;"""
// test2: TABLE_NAME inequality; expected output is the other three tables.
qt_test2 """select TABLE_CATALOG, TABLE_SCHEMA, TABLE_NAME from information_schema.tables where TABLE_SCHEMA = "information_schema_p0" and TABLE_NAME != "table1" order by TABLE_NAME;"""
// test3: "=" against a value containing '%'; expected output is empty (no table is literally named "table%").
qt_test3 """select TABLE_CATALOG, TABLE_SCHEMA, TABLE_NAME from information_schema.tables where TABLE_SCHEMA = "information_schema_p0" and TABLE_NAME = "table%" order by TABLE_NAME;"""
// test4: "!=" against the same value; expected output is all four tables.
qt_test4 """select TABLE_CATALOG, TABLE_SCHEMA, TABLE_NAME from information_schema.tables where TABLE_SCHEMA = "information_schema_p0" and TABLE_NAME != "table%" order by TABLE_NAME;"""
// test5-test7: case handling of equality. "order1" matches, lowercase "order2" is expected
// to return nothing, and "Order2" is expected to return the Order2 row.
qt_test5 """select TABLE_CATALOG, TABLE_SCHEMA, TABLE_NAME from information_schema.tables where TABLE_SCHEMA = "information_schema_p0" and TABLE_NAME = "order1" order by TABLE_NAME;"""
qt_test6 """select TABLE_CATALOG, TABLE_SCHEMA, TABLE_NAME from information_schema.tables where TABLE_SCHEMA = "information_schema_p0" and TABLE_NAME = "order2" order by TABLE_NAME;"""
qt_test7 """select TABLE_CATALOG, TABLE_SCHEMA, TABLE_NAME from information_schema.tables where TABLE_SCHEMA = "information_schema_p0" and TABLE_NAME = "Order2" order by TABLE_NAME;"""
// test8-test9: LIKE / NOT LIKE with the "table%" pattern; expected output splits the four
// tables into {table1, table2} and {Order2, order1}.
qt_test8 """select TABLE_CATALOG, TABLE_SCHEMA, TABLE_NAME from information_schema.tables where TABLE_SCHEMA = "information_schema_p0" and TABLE_NAME like "table%" order by TABLE_NAME;"""
qt_test9 """select TABLE_CATALOG, TABLE_SCHEMA, TABLE_NAME from information_schema.tables where TABLE_SCHEMA = "information_schema_p0" and TABLE_NAME not like "table%" order by TABLE_NAME;"""
// test10-test11: LIKE patterns differing only in prefix case; expected output is order1 for
// "order%" and Order2 for "Order%".
qt_test10 """select TABLE_CATALOG, TABLE_SCHEMA, TABLE_NAME from information_schema.tables where TABLE_SCHEMA = "information_schema_p0" and TABLE_NAME like "order%" order by TABLE_NAME;"""
qt_test11 """select TABLE_CATALOG, TABLE_SCHEMA, TABLE_NAME from information_schema.tables where TABLE_SCHEMA = "information_schema_p0" and TABLE_NAME like "Order%" order by TABLE_NAME;"""
}