[feature](nereids) support bind external relation out of Doris fe environment (#21123)
support bind external relation out of Doris fe environment, for example, analyze sql in other java application. see BindRelationTest.bindExternalRelation.
This commit is contained in:
@ -41,6 +41,7 @@ import org.apache.doris.nereids.rules.Rule;
|
||||
import org.apache.doris.nereids.rules.RuleFactory;
|
||||
import org.apache.doris.nereids.rules.RuleSet;
|
||||
import org.apache.doris.nereids.rules.RuleType;
|
||||
import org.apache.doris.nereids.rules.analysis.BindRelation.CustomTableResolver;
|
||||
import org.apache.doris.nereids.trees.expressions.CTEId;
|
||||
import org.apache.doris.nereids.trees.expressions.Expression;
|
||||
import org.apache.doris.nereids.trees.expressions.SubqueryExpr;
|
||||
@ -180,6 +181,14 @@ public class CascadesContext implements ScheduleContext {
|
||||
return new Analyzer(this);
|
||||
}
|
||||
|
||||
public Analyzer newAnalyzer(Optional<CustomTableResolver> customTableResolver) {
|
||||
return new Analyzer(this, customTableResolver);
|
||||
}
|
||||
|
||||
public Analyzer newCustomAnalyzer(Optional<CustomTableResolver> customTableResolver) {
|
||||
return new Analyzer(this, customTableResolver);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void pushJob(Job job) {
|
||||
jobPool.push(job);
|
||||
|
||||
@ -23,6 +23,7 @@ import org.apache.doris.nereids.rules.analysis.AdjustAggregateNullableForEmptySe
|
||||
import org.apache.doris.nereids.rules.analysis.BindExpression;
|
||||
import org.apache.doris.nereids.rules.analysis.BindInsertTargetTable;
|
||||
import org.apache.doris.nereids.rules.analysis.BindRelation;
|
||||
import org.apache.doris.nereids.rules.analysis.BindRelation.CustomTableResolver;
|
||||
import org.apache.doris.nereids.rules.analysis.CheckAnalysis;
|
||||
import org.apache.doris.nereids.rules.analysis.CheckBound;
|
||||
import org.apache.doris.nereids.rules.analysis.CheckPolicy;
|
||||
@ -37,6 +38,8 @@ import org.apache.doris.nereids.rules.analysis.SubqueryToApply;
|
||||
import org.apache.doris.nereids.rules.analysis.UserAuthentication;
|
||||
|
||||
import java.util.List;
|
||||
import java.util.Objects;
|
||||
import java.util.Optional;
|
||||
|
||||
/**
|
||||
* Bind symbols according to metadata in the catalog, perform semantic analysis, etc.
|
||||
@ -44,12 +47,45 @@ import java.util.List;
|
||||
*/
|
||||
public class Analyzer extends AbstractBatchJobExecutor {
|
||||
|
||||
public static final List<RewriteJob> ANALYZE_JOBS = jobs(
|
||||
public static final List<RewriteJob> DEFAULT_ANALYZE_JOBS = buildAnalyzeJobs(Optional.empty());
|
||||
|
||||
private Optional<CustomTableResolver> customTableResolver;
|
||||
|
||||
private List<RewriteJob> jobs;
|
||||
|
||||
/**
|
||||
* Execute the analysis job with scope.
|
||||
* @param cascadesContext planner context for execute job
|
||||
*/
|
||||
public Analyzer(CascadesContext cascadesContext) {
|
||||
this(cascadesContext, Optional.empty());
|
||||
}
|
||||
|
||||
public Analyzer(CascadesContext cascadesContext, Optional<CustomTableResolver> customTableResolver) {
|
||||
super(cascadesContext);
|
||||
this.customTableResolver = Objects.requireNonNull(customTableResolver, "customTableResolver cannot be null");
|
||||
this.jobs = !customTableResolver.isPresent() ? DEFAULT_ANALYZE_JOBS : buildAnalyzeJobs(customTableResolver);
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<RewriteJob> getJobs() {
|
||||
return jobs;
|
||||
}
|
||||
|
||||
/**
|
||||
* nereids analyze sql.
|
||||
*/
|
||||
public void analyze() {
|
||||
execute();
|
||||
}
|
||||
|
||||
private static List<RewriteJob> buildAnalyzeJobs(Optional<CustomTableResolver> customTableResolver) {
|
||||
return jobs(
|
||||
topDown(
|
||||
new RegisterCTE()
|
||||
),
|
||||
bottomUp(
|
||||
new BindRelation(),
|
||||
new BindRelation(customTableResolver.orElse(null)),
|
||||
new CheckPolicy(),
|
||||
new UserAuthentication(),
|
||||
new BindExpression()
|
||||
@ -81,25 +117,6 @@ public class Analyzer extends AbstractBatchJobExecutor {
|
||||
bottomUp(new SubqueryToApply()),
|
||||
bottomUp(new AdjustAggregateNullableForEmptySet()),
|
||||
bottomUp(new CheckAnalysis())
|
||||
);
|
||||
|
||||
/**
|
||||
* Execute the analysis job with scope.
|
||||
* @param cascadesContext planner context for execute job
|
||||
*/
|
||||
public Analyzer(CascadesContext cascadesContext) {
|
||||
super(cascadesContext);
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<RewriteJob> getJobs() {
|
||||
return ANALYZE_JOBS;
|
||||
}
|
||||
|
||||
/**
|
||||
* nereids analyze sql.
|
||||
*/
|
||||
public void analyze() {
|
||||
execute();
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
@ -61,12 +61,24 @@ import org.apache.commons.collections.CollectionUtils;
|
||||
|
||||
import java.util.List;
|
||||
import java.util.Optional;
|
||||
import java.util.function.Function;
|
||||
import javax.annotation.Nullable;
|
||||
|
||||
/**
|
||||
* Rule to bind relations in query plan.
|
||||
*/
|
||||
public class BindRelation extends OneAnalysisRuleFactory {
|
||||
|
||||
private CustomTableResolver customTableResolver;
|
||||
|
||||
public BindRelation() {
|
||||
this(null);
|
||||
}
|
||||
|
||||
public BindRelation(@Nullable CustomTableResolver customTableResolver) {
|
||||
this.customTableResolver = customTableResolver;
|
||||
}
|
||||
|
||||
// TODO: cte will be copied to a sub-query with different names but the id of the unbound relation in them
|
||||
// are the same, so we use new relation id when binding relation, and will fix this bug later.
|
||||
@Override
|
||||
@ -123,6 +135,9 @@ public class BindRelation extends OneAnalysisRuleFactory {
|
||||
if (cascadesContext.getTables() != null) {
|
||||
table = cascadesContext.getTableByName(tableName);
|
||||
}
|
||||
if (customTableResolver != null) {
|
||||
table = customTableResolver.apply(tableQualifier);
|
||||
}
|
||||
if (table == null) {
|
||||
// In some cases even if we have already called the "cascadesContext.getTableByName",
|
||||
// it also gets the null. So, we just check it in the catalog again for safety.
|
||||
@ -136,7 +151,13 @@ public class BindRelation extends OneAnalysisRuleFactory {
|
||||
private LogicalPlan bind(CascadesContext cascadesContext, UnboundRelation unboundRelation) {
|
||||
List<String> tableQualifier = RelationUtil.getQualifierName(cascadesContext.getConnectContext(),
|
||||
unboundRelation.getNameParts());
|
||||
TableIf table = RelationUtil.getTable(tableQualifier, cascadesContext.getConnectContext().getEnv());
|
||||
TableIf table = null;
|
||||
if (customTableResolver != null) {
|
||||
table = customTableResolver.apply(tableQualifier);
|
||||
}
|
||||
if (table == null) {
|
||||
table = RelationUtil.getTable(tableQualifier, cascadesContext.getConnectContext().getEnv());
|
||||
}
|
||||
return getLogicalPlan(table, unboundRelation, tableQualifier, cascadesContext);
|
||||
}
|
||||
|
||||
@ -225,4 +246,7 @@ public class BindRelation extends OneAnalysisRuleFactory {
|
||||
return part.getId();
|
||||
}).collect(ImmutableList.toImmutableList());
|
||||
}
|
||||
|
||||
/** CustomTableResolver */
|
||||
public interface CustomTableResolver extends Function<List<String>, TableIf> {}
|
||||
}
|
||||
|
||||
@ -17,9 +17,19 @@
|
||||
|
||||
package org.apache.doris.nereids.rules.analysis;
|
||||
|
||||
import org.apache.doris.catalog.Column;
|
||||
import org.apache.doris.catalog.KeysType;
|
||||
import org.apache.doris.catalog.OlapTable;
|
||||
import org.apache.doris.catalog.PartitionInfo;
|
||||
import org.apache.doris.catalog.RandomDistributionInfo;
|
||||
import org.apache.doris.catalog.Type;
|
||||
import org.apache.doris.nereids.analyzer.UnboundRelation;
|
||||
import org.apache.doris.nereids.pattern.GeneratedPlanPatterns;
|
||||
import org.apache.doris.nereids.rules.RulePromise;
|
||||
import org.apache.doris.nereids.rules.analysis.BindRelation.CustomTableResolver;
|
||||
import org.apache.doris.nereids.trees.plans.Plan;
|
||||
import org.apache.doris.nereids.trees.plans.logical.LogicalOlapScan;
|
||||
import org.apache.doris.nereids.util.PlanChecker;
|
||||
import org.apache.doris.nereids.util.PlanRewriter;
|
||||
import org.apache.doris.nereids.util.RelationUtil;
|
||||
import org.apache.doris.utframe.TestWithFeService;
|
||||
@ -28,7 +38,10 @@ import com.google.common.collect.ImmutableList;
|
||||
import org.junit.jupiter.api.Assertions;
|
||||
import org.junit.jupiter.api.Test;
|
||||
|
||||
class BindRelationTest extends TestWithFeService {
|
||||
import java.util.List;
|
||||
import java.util.Optional;
|
||||
|
||||
class BindRelationTest extends TestWithFeService implements GeneratedPlanPatterns {
|
||||
private static final String DB1 = "db1";
|
||||
private static final String DB2 = "db2";
|
||||
|
||||
@ -66,4 +79,54 @@ class BindRelationTest extends TestWithFeService {
|
||||
ImmutableList.of(DEFAULT_CLUSTER_PREFIX + DB1, "t"),
|
||||
((LogicalOlapScan) plan).qualified());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void bindExternalRelation() {
|
||||
connectContext.setDatabase(DEFAULT_CLUSTER_PREFIX + DB1);
|
||||
String tableName = "external_table";
|
||||
|
||||
List<Column> externalTableColumns = ImmutableList.of(
|
||||
new Column("id", Type.INT),
|
||||
new Column("name", Type.VARCHAR)
|
||||
);
|
||||
|
||||
OlapTable externalOlapTable = new OlapTable(1, tableName, externalTableColumns, KeysType.DUP_KEYS,
|
||||
new PartitionInfo(), new RandomDistributionInfo(10)) {
|
||||
@Override
|
||||
public List<Column> getBaseSchema() {
|
||||
return externalTableColumns;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean hasDeleteSign() {
|
||||
return false;
|
||||
}
|
||||
};
|
||||
|
||||
CustomTableResolver customTableResolver = qualifiedTable -> {
|
||||
if (qualifiedTable.get(2).equals(tableName)) {
|
||||
return externalOlapTable;
|
||||
} else {
|
||||
return null;
|
||||
}
|
||||
};
|
||||
|
||||
PlanChecker.from(connectContext)
|
||||
.parse("select * from " + tableName + " as et join db1.t on et.id = t.a")
|
||||
.customAnalyzer(Optional.of(customTableResolver)) // analyze internal relation
|
||||
.rewrite()
|
||||
.matchesFromRoot(
|
||||
logicalProject(
|
||||
logicalJoin(
|
||||
logicalOlapScan().when(r -> r.getTable() == externalOlapTable),
|
||||
logicalOlapScan().when(r -> r.getTable().getName().equals("t"))
|
||||
)
|
||||
)
|
||||
);
|
||||
}
|
||||
|
||||
@Override
|
||||
public RulePromise defaultPromise() {
|
||||
return RulePromise.REWRITE;
|
||||
}
|
||||
}
|
||||
|
||||
@ -47,6 +47,7 @@ import org.apache.doris.nereids.rules.Rule;
|
||||
import org.apache.doris.nereids.rules.RuleFactory;
|
||||
import org.apache.doris.nereids.rules.RuleSet;
|
||||
import org.apache.doris.nereids.rules.RuleType;
|
||||
import org.apache.doris.nereids.rules.analysis.BindRelation.CustomTableResolver;
|
||||
import org.apache.doris.nereids.rules.rewrite.OneRewriteRuleFactory;
|
||||
import org.apache.doris.nereids.trees.expressions.Slot;
|
||||
import org.apache.doris.nereids.trees.plans.GroupPlan;
|
||||
@ -68,6 +69,7 @@ import org.junit.jupiter.api.Assertions;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.Optional;
|
||||
import java.util.function.Consumer;
|
||||
|
||||
/**
|
||||
@ -110,8 +112,13 @@ public class PlanChecker {
|
||||
return this;
|
||||
}
|
||||
|
||||
public PlanChecker analyze(String sql) {
|
||||
public PlanChecker parse(String sql) {
|
||||
this.cascadesContext = MemoTestUtils.createCascadesContext(connectContext, sql);
|
||||
this.cascadesContext.toMemo();
|
||||
return this;
|
||||
}
|
||||
|
||||
public PlanChecker analyze() {
|
||||
this.cascadesContext.newAnalyzer().analyze();
|
||||
this.cascadesContext.toMemo();
|
||||
return this;
|
||||
@ -125,6 +132,24 @@ public class PlanChecker {
|
||||
return this;
|
||||
}
|
||||
|
||||
public PlanChecker analyze(String sql) {
|
||||
this.cascadesContext = MemoTestUtils.createCascadesContext(connectContext, sql);
|
||||
this.cascadesContext.newAnalyzer().analyze();
|
||||
this.cascadesContext.toMemo();
|
||||
return this;
|
||||
}
|
||||
|
||||
public PlanChecker customAnalyzer(Optional<CustomTableResolver> customTableResolver) {
|
||||
this.cascadesContext.newAnalyzer(customTableResolver).analyze();
|
||||
this.cascadesContext.toMemo();
|
||||
return this;
|
||||
}
|
||||
|
||||
public PlanChecker setRewritePlanFromMemo() {
|
||||
this.cascadesContext.setRewritePlan(this.cascadesContext.getMemo().copyOut());
|
||||
return this;
|
||||
}
|
||||
|
||||
public PlanChecker customRewrite(CustomRewriter customRewriter) {
|
||||
new CustomRewriteJob(() -> customRewriter, RuleType.TEST_REWRITE).execute(cascadesContext.getCurrentJobContext());
|
||||
cascadesContext.toMemo();
|
||||
|
||||
Reference in New Issue
Block a user