From 0435b286fb56a2d61e277ee954e9ddd01d7a0a87 Mon Sep 17 00:00:00 2001 From: morrySnow <101034200+morrySnow@users.noreply.github.com> Date: Tue, 10 Oct 2023 18:20:19 +0800 Subject: [PATCH] [feature](Nereids) support metadata tvf and fix bugs in group_commit() (#25224) metadata tvf list: - backends - catalogs - frontends - frontends_disks - group_commit - iceberg_meta - workload_groups fix group_commit bugs - throw NPE when properties do not contain 'table_id' - throw NPE when table_id's table do not exist - throw class Cast failed when table_id's table's type is not OLAP --- .../catalog/BuiltinTableValuedFunctions.java | 18 +++++- .../expressions/functions/table/Backends.java | 56 +++++++++++++++++++ .../expressions/functions/table/Catalogs.java | 56 +++++++++++++++++++ .../functions/table/Frontends.java | 56 +++++++++++++++++++ .../functions/table/FrontendsDisks.java | 56 +++++++++++++++++++ .../functions/table/GroupCommit.java | 56 +++++++++++++++++++ .../functions/table/IcebergMeta.java | 56 +++++++++++++++++++ .../expressions/functions/table/Numbers.java | 2 +- .../functions/table/WorkloadGroups.java | 56 +++++++++++++++++++ .../visitor/TableValuedFunctionVisitor.java | 39 ++++++++++++- .../planner/external/MetadataScanNode.java | 32 +++-------- .../GroupCommitTableValuedFunction.java | 24 +++++--- 12 files changed, 472 insertions(+), 35 deletions(-) create mode 100644 fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/functions/table/Backends.java create mode 100644 fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/functions/table/Catalogs.java create mode 100644 fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/functions/table/Frontends.java create mode 100644 fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/functions/table/FrontendsDisks.java create mode 100644 fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/functions/table/GroupCommit.java create mode 100644 fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/functions/table/IcebergMeta.java create mode 100644 fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/functions/table/WorkloadGroups.java diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/BuiltinTableValuedFunctions.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/BuiltinTableValuedFunctions.java index edf5ce47fa..8f227bc77c 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/BuiltinTableValuedFunctions.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/BuiltinTableValuedFunctions.java @@ -17,10 +17,17 @@ package org.apache.doris.catalog; +import org.apache.doris.nereids.trees.expressions.functions.table.Backends; +import org.apache.doris.nereids.trees.expressions.functions.table.Catalogs; +import org.apache.doris.nereids.trees.expressions.functions.table.Frontends; +import org.apache.doris.nereids.trees.expressions.functions.table.FrontendsDisks; +import org.apache.doris.nereids.trees.expressions.functions.table.GroupCommit; import org.apache.doris.nereids.trees.expressions.functions.table.Hdfs; +import org.apache.doris.nereids.trees.expressions.functions.table.IcebergMeta; import org.apache.doris.nereids.trees.expressions.functions.table.Local; import org.apache.doris.nereids.trees.expressions.functions.table.Numbers; import org.apache.doris.nereids.trees.expressions.functions.table.S3; +import org.apache.doris.nereids.trees.expressions.functions.table.WorkloadGroups; import com.google.common.collect.ImmutableList; @@ -32,10 +39,17 @@ import com.google.common.collect.ImmutableList; */ public class BuiltinTableValuedFunctions implements FunctionHelper { public final ImmutableList tableValuedFunctions = ImmutableList.of( - tableValued(Numbers.class, "numbers"), + tableValued(Backends.class, "backends"), + tableValued(Catalogs.class, "catalogs"), + tableValued(Frontends.class, "frontends"), + tableValued(FrontendsDisks.class, "frontends_disks"), + tableValued(GroupCommit.class, "group_commit"), + tableValued(Local.class, "local"), + tableValued(IcebergMeta.class, "iceberg_meta"), tableValued(Hdfs.class, "hdfs"), + tableValued(Numbers.class, "numbers"), tableValued(S3.class, "s3"), - tableValued(Local.class, "local") + tableValued(WorkloadGroups.class, "workload_groups") ); public static final BuiltinTableValuedFunctions INSTANCE = new BuiltinTableValuedFunctions(); diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/functions/table/Backends.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/functions/table/Backends.java new file mode 100644 index 0000000000..14784a1df0 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/functions/table/Backends.java @@ -0,0 +1,56 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.nereids.trees.expressions.functions.table; + +import org.apache.doris.catalog.FunctionSignature; +import org.apache.doris.nereids.exceptions.AnalysisException; +import org.apache.doris.nereids.trees.expressions.Properties; +import org.apache.doris.nereids.trees.expressions.visitor.ExpressionVisitor; +import org.apache.doris.nereids.types.coercion.AnyDataType; +import org.apache.doris.tablefunction.BackendsTableValuedFunction; +import org.apache.doris.tablefunction.TableValuedFunctionIf; + +import java.util.Map; + +/** backends */ +public class Backends extends TableValuedFunction { + public Backends(Properties properties) { + super("backends", properties); + } + + @Override + public FunctionSignature customSignature() { + return FunctionSignature.of(AnyDataType.INSTANCE_WITHOUT_INDEX, getArgumentsTypes()); + } + + @Override + protected TableValuedFunctionIf toCatalogFunction() { + try { + Map arguments = getTVFProperties().getMap(); + return new BackendsTableValuedFunction(arguments); + } catch (Throwable t) { + throw new AnalysisException("Can not build BackendsTableValuedFunction by " + + this + ": " + t.getMessage(), t); + } + } + + @Override + public R accept(ExpressionVisitor visitor, C context) { + return visitor.visitBackends(this, context); + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/functions/table/Catalogs.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/functions/table/Catalogs.java new file mode 100644 index 0000000000..cfe2dcf64c --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/functions/table/Catalogs.java @@ -0,0 +1,56 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.nereids.trees.expressions.functions.table; + +import org.apache.doris.catalog.FunctionSignature; +import org.apache.doris.nereids.exceptions.AnalysisException; +import org.apache.doris.nereids.trees.expressions.Properties; +import org.apache.doris.nereids.trees.expressions.visitor.ExpressionVisitor; +import org.apache.doris.nereids.types.coercion.AnyDataType; +import org.apache.doris.tablefunction.CatalogsTableValuedFunction; +import org.apache.doris.tablefunction.TableValuedFunctionIf; + +import java.util.Map; + +/** catalogs */ +public class Catalogs extends TableValuedFunction { + public Catalogs(Properties properties) { + super("catalogs", properties); + } + + @Override + public FunctionSignature customSignature() { + return FunctionSignature.of(AnyDataType.INSTANCE_WITHOUT_INDEX, getArgumentsTypes()); + } + + @Override + protected TableValuedFunctionIf toCatalogFunction() { + try { + Map arguments = getTVFProperties().getMap(); + return new CatalogsTableValuedFunction(arguments); + } catch (Throwable t) { + throw new AnalysisException("Can not build CatalogsTableValuedFunction by " + + this + ": " + t.getMessage(), t); + } + } + + @Override + public R accept(ExpressionVisitor visitor, C context) { + return visitor.visitCatalogs(this, context); + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/functions/table/Frontends.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/functions/table/Frontends.java new file mode 100644 index 0000000000..0af943b92d --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/functions/table/Frontends.java @@ -0,0 +1,56 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.nereids.trees.expressions.functions.table; + +import org.apache.doris.catalog.FunctionSignature; +import org.apache.doris.nereids.exceptions.AnalysisException; +import org.apache.doris.nereids.trees.expressions.Properties; +import org.apache.doris.nereids.trees.expressions.visitor.ExpressionVisitor; +import org.apache.doris.nereids.types.coercion.AnyDataType; +import org.apache.doris.tablefunction.FrontendsTableValuedFunction; +import org.apache.doris.tablefunction.TableValuedFunctionIf; + +import java.util.Map; + +/** frontends */ +public class Frontends extends TableValuedFunction { + public Frontends(Properties properties) { + super("frontends", properties); + } + + @Override + public FunctionSignature customSignature() { + return FunctionSignature.of(AnyDataType.INSTANCE_WITHOUT_INDEX, getArgumentsTypes()); + } + + @Override + protected TableValuedFunctionIf toCatalogFunction() { + try { + Map arguments = getTVFProperties().getMap(); + return new FrontendsTableValuedFunction(arguments); + } catch (Throwable t) { + throw new AnalysisException("Can not build FrontendsTableValuedFunction by " + + this + ": " + t.getMessage(), t); + } + } + + @Override + public R accept(ExpressionVisitor visitor, C context) { + return visitor.visitFrontends(this, context); + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/functions/table/FrontendsDisks.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/functions/table/FrontendsDisks.java new file mode 100644 index 0000000000..c81981d60a --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/functions/table/FrontendsDisks.java @@ -0,0 +1,56 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.nereids.trees.expressions.functions.table; + +import org.apache.doris.catalog.FunctionSignature; +import org.apache.doris.nereids.exceptions.AnalysisException; +import org.apache.doris.nereids.trees.expressions.Properties; +import org.apache.doris.nereids.trees.expressions.visitor.ExpressionVisitor; +import org.apache.doris.nereids.types.coercion.AnyDataType; +import org.apache.doris.tablefunction.FrontendsDisksTableValuedFunction; +import org.apache.doris.tablefunction.TableValuedFunctionIf; + +import java.util.Map; + +/** frontends_disks */ +public class FrontendsDisks extends TableValuedFunction { + public FrontendsDisks(Properties properties) { + super("frontends_disks", properties); + } + + @Override + public FunctionSignature customSignature() { + return FunctionSignature.of(AnyDataType.INSTANCE_WITHOUT_INDEX, getArgumentsTypes()); + } + + @Override + protected TableValuedFunctionIf toCatalogFunction() { + try { + Map arguments = getTVFProperties().getMap(); + return new FrontendsDisksTableValuedFunction(arguments); + } catch (Throwable t) { + throw new AnalysisException("Can not build FrontendsDisksTableValuedFunction by " + + this + ": " + t.getMessage(), t); + } + } + + @Override + public R accept(ExpressionVisitor visitor, C context) { + return visitor.visitFrontendsDisks(this, context); + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/functions/table/GroupCommit.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/functions/table/GroupCommit.java new file mode 100644 index 0000000000..88096adee0 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/functions/table/GroupCommit.java @@ -0,0 +1,56 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.nereids.trees.expressions.functions.table; + +import org.apache.doris.catalog.FunctionSignature; +import org.apache.doris.nereids.exceptions.AnalysisException; +import org.apache.doris.nereids.trees.expressions.Properties; +import org.apache.doris.nereids.trees.expressions.visitor.ExpressionVisitor; +import org.apache.doris.nereids.types.coercion.AnyDataType; +import org.apache.doris.tablefunction.GroupCommitTableValuedFunction; +import org.apache.doris.tablefunction.TableValuedFunctionIf; + +import java.util.Map; + +/** group_commit */ +public class GroupCommit extends TableValuedFunction { + public GroupCommit(Properties properties) { + super("group_commit", properties); + } + + @Override + public FunctionSignature customSignature() { + return FunctionSignature.of(AnyDataType.INSTANCE_WITHOUT_INDEX, getArgumentsTypes()); + } + + @Override + protected TableValuedFunctionIf toCatalogFunction() { + try { + Map arguments = getTVFProperties().getMap(); + return new GroupCommitTableValuedFunction(arguments); + } catch (Throwable t) { + throw new AnalysisException("Can not build GroupCommitTableValuedFunction by " + + this + ": " + t.getMessage(), t); + } + } + + @Override + public R accept(ExpressionVisitor visitor, C context) { + return visitor.visitGroupCommit(this, context); + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/functions/table/IcebergMeta.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/functions/table/IcebergMeta.java new file mode 100644 index 0000000000..fe7654b730 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/functions/table/IcebergMeta.java @@ -0,0 +1,56 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.nereids.trees.expressions.functions.table; + +import org.apache.doris.catalog.FunctionSignature; +import org.apache.doris.nereids.exceptions.AnalysisException; +import org.apache.doris.nereids.trees.expressions.Properties; +import org.apache.doris.nereids.trees.expressions.visitor.ExpressionVisitor; +import org.apache.doris.nereids.types.coercion.AnyDataType; +import org.apache.doris.tablefunction.IcebergTableValuedFunction; +import org.apache.doris.tablefunction.TableValuedFunctionIf; + +import java.util.Map; + +/** iceberg_meta */ +public class IcebergMeta extends TableValuedFunction { + public IcebergMeta(Properties properties) { + super("iceberg_meta", properties); + } + + @Override + public FunctionSignature customSignature() { + return FunctionSignature.of(AnyDataType.INSTANCE_WITHOUT_INDEX, getArgumentsTypes()); + } + + @Override + protected TableValuedFunctionIf toCatalogFunction() { + try { + Map arguments = getTVFProperties().getMap(); + return new IcebergTableValuedFunction(arguments); + } catch (Throwable t) { + throw new AnalysisException("Can not build IcebergTableValuedFunction by " + + this + ": " + t.getMessage(), t); + } + } + + @Override + public R accept(ExpressionVisitor visitor, C context) { + return visitor.visitIcebergMeta(this, context); + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/functions/table/Numbers.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/functions/table/Numbers.java index c5febcf974..b7c28b9eff 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/functions/table/Numbers.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/functions/table/Numbers.java @@ -48,7 +48,7 @@ public class Numbers extends TableValuedFunction { @Override public FunctionSignature customSignature() { - return FunctionSignature.of(BigIntType.INSTANCE, (List) getArgumentsTypes()); + return FunctionSignature.of(BigIntType.INSTANCE, getArgumentsTypes()); } @Override diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/functions/table/WorkloadGroups.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/functions/table/WorkloadGroups.java new file mode 100644 index 0000000000..84aa4de697 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/functions/table/WorkloadGroups.java @@ -0,0 +1,56 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.nereids.trees.expressions.functions.table; + +import org.apache.doris.catalog.FunctionSignature; +import org.apache.doris.nereids.exceptions.AnalysisException; +import org.apache.doris.nereids.trees.expressions.Properties; +import org.apache.doris.nereids.trees.expressions.visitor.ExpressionVisitor; +import org.apache.doris.nereids.types.coercion.AnyDataType; +import org.apache.doris.tablefunction.TableValuedFunctionIf; +import org.apache.doris.tablefunction.WorkloadGroupsTableValuedFunction; + +import java.util.Map; + +/** workload_groups */ +public class WorkloadGroups extends TableValuedFunction { + public WorkloadGroups(Properties properties) { + super("workload_groups", properties); + } + + @Override + public FunctionSignature customSignature() { + return FunctionSignature.of(AnyDataType.INSTANCE_WITHOUT_INDEX, getArgumentsTypes()); + } + + @Override + protected TableValuedFunctionIf toCatalogFunction() { + try { + Map arguments = getTVFProperties().getMap(); + return new WorkloadGroupsTableValuedFunction(arguments); + } catch (Throwable t) { + throw new AnalysisException("Can not build WorkloadGroupsTableValuedFunction by " + + this + ": " + t.getMessage(), t); + } + } + + @Override + public R accept(ExpressionVisitor visitor, C context) { + return visitor.visitWorkloadGroups(this, context); + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/visitor/TableValuedFunctionVisitor.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/visitor/TableValuedFunctionVisitor.java index cbf980282a..ab3f30a094 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/visitor/TableValuedFunctionVisitor.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/visitor/TableValuedFunctionVisitor.java @@ -17,29 +17,64 @@ package org.apache.doris.nereids.trees.expressions.visitor; +import org.apache.doris.nereids.trees.expressions.functions.table.Backends; +import org.apache.doris.nereids.trees.expressions.functions.table.Catalogs; +import org.apache.doris.nereids.trees.expressions.functions.table.Frontends; +import org.apache.doris.nereids.trees.expressions.functions.table.FrontendsDisks; +import org.apache.doris.nereids.trees.expressions.functions.table.GroupCommit; import org.apache.doris.nereids.trees.expressions.functions.table.Hdfs; +import org.apache.doris.nereids.trees.expressions.functions.table.IcebergMeta; import org.apache.doris.nereids.trees.expressions.functions.table.Local; import org.apache.doris.nereids.trees.expressions.functions.table.Numbers; import org.apache.doris.nereids.trees.expressions.functions.table.S3; import org.apache.doris.nereids.trees.expressions.functions.table.TableValuedFunction; +import org.apache.doris.nereids.trees.expressions.functions.table.WorkloadGroups; /** TableValuedFunctionVisitor */ public interface TableValuedFunctionVisitor { R visitTableValuedFunction(TableValuedFunction tableValuedFunction, C context); - default R visitNumbers(Numbers numbers, C context) { - return visitTableValuedFunction(numbers, context); + default R visitBackends(Backends backends, C context) { + return visitTableValuedFunction(backends, context); + } + + default R visitCatalogs(Catalogs catalogs, C context) { + return visitTableValuedFunction(catalogs, context); + } + + default R visitFrontends(Frontends frontends, C context) { + return visitTableValuedFunction(frontends, context); + } + + default R visitFrontendsDisks(FrontendsDisks frontendsDisks, C context) { + return visitTableValuedFunction(frontendsDisks, context); + } + + default R visitGroupCommit(GroupCommit groupCommit, C context) { + return visitTableValuedFunction(groupCommit, context); } default R visitHdfs(Hdfs hdfs, C context) { return visitTableValuedFunction(hdfs, context); } + default R visitIcebergMeta(IcebergMeta icebergMeta, C context) { + return visitTableValuedFunction(icebergMeta, context); + } + default R visitLocal(Local local, C context) { return visitTableValuedFunction(local, context); } + default R visitNumbers(Numbers numbers, C context) { + return visitTableValuedFunction(numbers, context); + } + default R visitS3(S3 s3, C context) { return visitTableValuedFunction(s3, context); } + + default R visitWorkloadGroups(WorkloadGroups workloadGroups, C context) { + return visitTableValuedFunction(workloadGroups, context); + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/external/MetadataScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/external/MetadataScanNode.java index 9952d9d783..abfd5ac063 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/external/MetadataScanNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/external/MetadataScanNode.java @@ -40,15 +40,22 @@ import java.util.List; public class MetadataScanNode extends ExternalScanNode { - private MetadataTableValuedFunction tvf; + private final MetadataTableValuedFunction tvf; - private List scanRangeLocations = Lists.newArrayList(); + private final List scanRangeLocations = Lists.newArrayList(); public MetadataScanNode(PlanNodeId id, TupleDescriptor desc, MetadataTableValuedFunction tvf) { super(id, desc, "METADATA_SCAN_NODE", StatisticalType.METADATA_SCAN_NODE, false); this.tvf = tvf; } + // for Nereids + @Override + public void init() throws UserException { + super.init(); + createScanRangeLocations(); + } + @Override protected void toThrift(TPlanNode planNode) { planNode.setNodeType(TPlanNodeType.META_SCAN_NODE); @@ -91,25 +98,4 @@ public class MetadataScanNode extends ExternalScanNode { public boolean needToCheckColumnPriv() { return false; } - - private void buildScanRanges() { - // todo: split - TScanRangeLocations locations = createMetaDataTvfLocations(); - scanRangeLocations.add(locations); - } - - private TScanRangeLocations createMetaDataTvfLocations() { - TScanRange scanRange = new TScanRange(); - scanRange.setMetaScanRange(tvf.getMetaScanRange()); - // set location - TScanRangeLocation location = new TScanRangeLocation(); - Backend backend = backendPolicy.getNextBe(); - location.setBackendId(backend.getId()); - location.setServer(new TNetworkAddress(backend.getHost(), backend.getBePort())); - - TScanRangeLocations result = new TScanRangeLocations(); - result.addToLocations(location); - result.setScanRange(scanRange); - return result; - } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/GroupCommitTableValuedFunction.java b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/GroupCommitTableValuedFunction.java index 3f7902d736..3abcbee06e 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/GroupCommitTableValuedFunction.java +++ b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/GroupCommitTableValuedFunction.java @@ -30,9 +30,6 @@ import org.apache.doris.planner.PlanNodeId; import org.apache.doris.planner.ScanNode; import org.apache.doris.thrift.TFileType; -import org.apache.logging.log4j.LogManager; -import org.apache.logging.log4j.Logger; - import java.util.ArrayList; import java.util.List; import java.util.Map; @@ -42,12 +39,18 @@ import java.util.Map; * group_commit(). */ public class GroupCommitTableValuedFunction extends ExternalFileTableValuedFunction { - private static final Logger LOG = LogManager.getLogger(GroupCommitTableValuedFunction.class); + public static final String NAME = "group_commit"; - private long tableId = -1; + + private static final String TABLE_ID_KEY = "table_id"; + + private final long tableId; public GroupCommitTableValuedFunction(Map params) throws AnalysisException { - tableId = Long.parseLong(params.get("table_id")); + if (params == null || !params.containsKey(TABLE_ID_KEY)) { + throw new AnalysisException(NAME + " should contains property " + TABLE_ID_KEY); + } + tableId = Long.parseLong(params.get(TABLE_ID_KEY)); } // =========== implement abstract methods of ExternalFileTableValuedFunction ================= @@ -56,11 +59,18 @@ public class GroupCommitTableValuedFunction extends ExternalFileTableValuedFunct public List getTableColumns() throws AnalysisException { List fileColumns = new ArrayList<>(); Table table = Env.getCurrentInternalCatalog().getTableByTableId(tableId); + if (table == null) { + throw new AnalysisException("table with table_id " + tableId + " is not exists"); + } + if (!(table instanceof OlapTable)) { + throw new AnalysisException("Only support OLAP table, but table type of table_id " + + tableId + " is " + table.getType()); + } + Column deleteSignColumn = ((OlapTable) table).getDeleteSignColumn(); List tableColumns = table.getBaseSchema(false); for (int i = 1; i <= tableColumns.size(); i++) { fileColumns.add(new Column("c" + i, tableColumns.get(i - 1).getDataType(), true)); } - Column deleteSignColumn = ((OlapTable) table).getDeleteSignColumn(); if (deleteSignColumn != null) { fileColumns.add(new Column("c" + (tableColumns.size() + 1), deleteSignColumn.getDataType(), true)); }