[feature](Nereids) support metadata tvf and fix bugs in group_commit() (#25224)

metadata tvf list:
- backends
- catalogs
- frontends
- frontends_disks
- group_commit
- iceberg_meta
- workload_groups

fix group_commit bugs
- throw NPE when properties do not contain 'table_id'
- throw NPE when table_id's table do not exist
- throw class Cast failed when table_id's table's type is not OLAP
This commit is contained in:
morrySnow
2023-10-10 18:20:19 +08:00
committed by GitHub
parent 691889419f
commit 0435b286fb
12 changed files with 472 additions and 35 deletions

View File

@ -17,10 +17,17 @@
package org.apache.doris.catalog;
import org.apache.doris.nereids.trees.expressions.functions.table.Backends;
import org.apache.doris.nereids.trees.expressions.functions.table.Catalogs;
import org.apache.doris.nereids.trees.expressions.functions.table.Frontends;
import org.apache.doris.nereids.trees.expressions.functions.table.FrontendsDisks;
import org.apache.doris.nereids.trees.expressions.functions.table.GroupCommit;
import org.apache.doris.nereids.trees.expressions.functions.table.Hdfs;
import org.apache.doris.nereids.trees.expressions.functions.table.IcebergMeta;
import org.apache.doris.nereids.trees.expressions.functions.table.Local;
import org.apache.doris.nereids.trees.expressions.functions.table.Numbers;
import org.apache.doris.nereids.trees.expressions.functions.table.S3;
import org.apache.doris.nereids.trees.expressions.functions.table.WorkloadGroups;
import com.google.common.collect.ImmutableList;
@ -32,10 +39,17 @@ import com.google.common.collect.ImmutableList;
*/
public class BuiltinTableValuedFunctions implements FunctionHelper {
public final ImmutableList<TableValuedFunc> tableValuedFunctions = ImmutableList.of(
tableValued(Numbers.class, "numbers"),
tableValued(Backends.class, "backends"),
tableValued(Catalogs.class, "catalogs"),
tableValued(Frontends.class, "frontends"),
tableValued(FrontendsDisks.class, "frontends_disks"),
tableValued(GroupCommit.class, "group_commit"),
tableValued(Local.class, "local"),
tableValued(IcebergMeta.class, "iceberg_meta"),
tableValued(Hdfs.class, "hdfs"),
tableValued(Numbers.class, "numbers"),
tableValued(S3.class, "s3"),
tableValued(Local.class, "local")
tableValued(WorkloadGroups.class, "workload_groups")
);
public static final BuiltinTableValuedFunctions INSTANCE = new BuiltinTableValuedFunctions();

View File

@ -0,0 +1,56 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.nereids.trees.expressions.functions.table;
import org.apache.doris.catalog.FunctionSignature;
import org.apache.doris.nereids.exceptions.AnalysisException;
import org.apache.doris.nereids.trees.expressions.Properties;
import org.apache.doris.nereids.trees.expressions.visitor.ExpressionVisitor;
import org.apache.doris.nereids.types.coercion.AnyDataType;
import org.apache.doris.tablefunction.BackendsTableValuedFunction;
import org.apache.doris.tablefunction.TableValuedFunctionIf;
import java.util.Map;
/** backends */
public class Backends extends TableValuedFunction {
public Backends(Properties properties) {
super("backends", properties);
}
@Override
public FunctionSignature customSignature() {
return FunctionSignature.of(AnyDataType.INSTANCE_WITHOUT_INDEX, getArgumentsTypes());
}
@Override
protected TableValuedFunctionIf toCatalogFunction() {
try {
Map<String, String> arguments = getTVFProperties().getMap();
return new BackendsTableValuedFunction(arguments);
} catch (Throwable t) {
throw new AnalysisException("Can not build BackendsTableValuedFunction by "
+ this + ": " + t.getMessage(), t);
}
}
@Override
public <R, C> R accept(ExpressionVisitor<R, C> visitor, C context) {
return visitor.visitBackends(this, context);
}
}

View File

@ -0,0 +1,56 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.nereids.trees.expressions.functions.table;
import org.apache.doris.catalog.FunctionSignature;
import org.apache.doris.nereids.exceptions.AnalysisException;
import org.apache.doris.nereids.trees.expressions.Properties;
import org.apache.doris.nereids.trees.expressions.visitor.ExpressionVisitor;
import org.apache.doris.nereids.types.coercion.AnyDataType;
import org.apache.doris.tablefunction.CatalogsTableValuedFunction;
import org.apache.doris.tablefunction.TableValuedFunctionIf;
import java.util.Map;
/** catalogs */
public class Catalogs extends TableValuedFunction {
public Catalogs(Properties properties) {
super("catalogs", properties);
}
@Override
public FunctionSignature customSignature() {
return FunctionSignature.of(AnyDataType.INSTANCE_WITHOUT_INDEX, getArgumentsTypes());
}
@Override
protected TableValuedFunctionIf toCatalogFunction() {
try {
Map<String, String> arguments = getTVFProperties().getMap();
return new CatalogsTableValuedFunction(arguments);
} catch (Throwable t) {
throw new AnalysisException("Can not build CatalogsTableValuedFunction by "
+ this + ": " + t.getMessage(), t);
}
}
@Override
public <R, C> R accept(ExpressionVisitor<R, C> visitor, C context) {
return visitor.visitCatalogs(this, context);
}
}

View File

@ -0,0 +1,56 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.nereids.trees.expressions.functions.table;
import org.apache.doris.catalog.FunctionSignature;
import org.apache.doris.nereids.exceptions.AnalysisException;
import org.apache.doris.nereids.trees.expressions.Properties;
import org.apache.doris.nereids.trees.expressions.visitor.ExpressionVisitor;
import org.apache.doris.nereids.types.coercion.AnyDataType;
import org.apache.doris.tablefunction.FrontendsTableValuedFunction;
import org.apache.doris.tablefunction.TableValuedFunctionIf;
import java.util.Map;
/** frontends */
public class Frontends extends TableValuedFunction {
public Frontends(Properties properties) {
super("frontends", properties);
}
@Override
public FunctionSignature customSignature() {
return FunctionSignature.of(AnyDataType.INSTANCE_WITHOUT_INDEX, getArgumentsTypes());
}
@Override
protected TableValuedFunctionIf toCatalogFunction() {
try {
Map<String, String> arguments = getTVFProperties().getMap();
return new FrontendsTableValuedFunction(arguments);
} catch (Throwable t) {
throw new AnalysisException("Can not build FrontendsTableValuedFunction by "
+ this + ": " + t.getMessage(), t);
}
}
@Override
public <R, C> R accept(ExpressionVisitor<R, C> visitor, C context) {
return visitor.visitFrontends(this, context);
}
}

View File

@ -0,0 +1,56 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.nereids.trees.expressions.functions.table;
import org.apache.doris.catalog.FunctionSignature;
import org.apache.doris.nereids.exceptions.AnalysisException;
import org.apache.doris.nereids.trees.expressions.Properties;
import org.apache.doris.nereids.trees.expressions.visitor.ExpressionVisitor;
import org.apache.doris.nereids.types.coercion.AnyDataType;
import org.apache.doris.tablefunction.FrontendsDisksTableValuedFunction;
import org.apache.doris.tablefunction.TableValuedFunctionIf;
import java.util.Map;
/** frontends_disks */
public class FrontendsDisks extends TableValuedFunction {
public FrontendsDisks(Properties properties) {
super("frontends_disks", properties);
}
@Override
public FunctionSignature customSignature() {
return FunctionSignature.of(AnyDataType.INSTANCE_WITHOUT_INDEX, getArgumentsTypes());
}
@Override
protected TableValuedFunctionIf toCatalogFunction() {
try {
Map<String, String> arguments = getTVFProperties().getMap();
return new FrontendsDisksTableValuedFunction(arguments);
} catch (Throwable t) {
throw new AnalysisException("Can not build FrontendsDisksTableValuedFunction by "
+ this + ": " + t.getMessage(), t);
}
}
@Override
public <R, C> R accept(ExpressionVisitor<R, C> visitor, C context) {
return visitor.visitFrontendsDisks(this, context);
}
}

View File

@ -0,0 +1,56 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.nereids.trees.expressions.functions.table;
import org.apache.doris.catalog.FunctionSignature;
import org.apache.doris.nereids.exceptions.AnalysisException;
import org.apache.doris.nereids.trees.expressions.Properties;
import org.apache.doris.nereids.trees.expressions.visitor.ExpressionVisitor;
import org.apache.doris.nereids.types.coercion.AnyDataType;
import org.apache.doris.tablefunction.GroupCommitTableValuedFunction;
import org.apache.doris.tablefunction.TableValuedFunctionIf;
import java.util.Map;
/** group_commit */
public class GroupCommit extends TableValuedFunction {
public GroupCommit(Properties properties) {
super("group_commit", properties);
}
@Override
public FunctionSignature customSignature() {
return FunctionSignature.of(AnyDataType.INSTANCE_WITHOUT_INDEX, getArgumentsTypes());
}
@Override
protected TableValuedFunctionIf toCatalogFunction() {
try {
Map<String, String> arguments = getTVFProperties().getMap();
return new GroupCommitTableValuedFunction(arguments);
} catch (Throwable t) {
throw new AnalysisException("Can not build GroupCommitTableValuedFunction by "
+ this + ": " + t.getMessage(), t);
}
}
@Override
public <R, C> R accept(ExpressionVisitor<R, C> visitor, C context) {
return visitor.visitGroupCommit(this, context);
}
}

View File

@ -0,0 +1,56 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.nereids.trees.expressions.functions.table;
import org.apache.doris.catalog.FunctionSignature;
import org.apache.doris.nereids.exceptions.AnalysisException;
import org.apache.doris.nereids.trees.expressions.Properties;
import org.apache.doris.nereids.trees.expressions.visitor.ExpressionVisitor;
import org.apache.doris.nereids.types.coercion.AnyDataType;
import org.apache.doris.tablefunction.IcebergTableValuedFunction;
import org.apache.doris.tablefunction.TableValuedFunctionIf;
import java.util.Map;
/** iceberg_meta */
public class IcebergMeta extends TableValuedFunction {
public IcebergMeta(Properties properties) {
super("iceberg_meta", properties);
}
@Override
public FunctionSignature customSignature() {
return FunctionSignature.of(AnyDataType.INSTANCE_WITHOUT_INDEX, getArgumentsTypes());
}
@Override
protected TableValuedFunctionIf toCatalogFunction() {
try {
Map<String, String> arguments = getTVFProperties().getMap();
return new IcebergTableValuedFunction(arguments);
} catch (Throwable t) {
throw new AnalysisException("Can not build IcebergTableValuedFunction by "
+ this + ": " + t.getMessage(), t);
}
}
@Override
public <R, C> R accept(ExpressionVisitor<R, C> visitor, C context) {
return visitor.visitIcebergMeta(this, context);
}
}

View File

@ -48,7 +48,7 @@ public class Numbers extends TableValuedFunction {
@Override
public FunctionSignature customSignature() {
return FunctionSignature.of(BigIntType.INSTANCE, (List) getArgumentsTypes());
return FunctionSignature.of(BigIntType.INSTANCE, getArgumentsTypes());
}
@Override

View File

@ -0,0 +1,56 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.nereids.trees.expressions.functions.table;
import org.apache.doris.catalog.FunctionSignature;
import org.apache.doris.nereids.exceptions.AnalysisException;
import org.apache.doris.nereids.trees.expressions.Properties;
import org.apache.doris.nereids.trees.expressions.visitor.ExpressionVisitor;
import org.apache.doris.nereids.types.coercion.AnyDataType;
import org.apache.doris.tablefunction.TableValuedFunctionIf;
import org.apache.doris.tablefunction.WorkloadGroupsTableValuedFunction;
import java.util.Map;
/** workload_groups */
public class WorkloadGroups extends TableValuedFunction {
public WorkloadGroups(Properties properties) {
super("workload_groups", properties);
}
@Override
public FunctionSignature customSignature() {
return FunctionSignature.of(AnyDataType.INSTANCE_WITHOUT_INDEX, getArgumentsTypes());
}
@Override
protected TableValuedFunctionIf toCatalogFunction() {
try {
Map<String, String> arguments = getTVFProperties().getMap();
return new WorkloadGroupsTableValuedFunction(arguments);
} catch (Throwable t) {
throw new AnalysisException("Can not build WorkloadGroupsTableValuedFunction by "
+ this + ": " + t.getMessage(), t);
}
}
@Override
public <R, C> R accept(ExpressionVisitor<R, C> visitor, C context) {
return visitor.visitWorkloadGroups(this, context);
}
}

View File

@ -17,29 +17,64 @@
package org.apache.doris.nereids.trees.expressions.visitor;
import org.apache.doris.nereids.trees.expressions.functions.table.Backends;
import org.apache.doris.nereids.trees.expressions.functions.table.Catalogs;
import org.apache.doris.nereids.trees.expressions.functions.table.Frontends;
import org.apache.doris.nereids.trees.expressions.functions.table.FrontendsDisks;
import org.apache.doris.nereids.trees.expressions.functions.table.GroupCommit;
import org.apache.doris.nereids.trees.expressions.functions.table.Hdfs;
import org.apache.doris.nereids.trees.expressions.functions.table.IcebergMeta;
import org.apache.doris.nereids.trees.expressions.functions.table.Local;
import org.apache.doris.nereids.trees.expressions.functions.table.Numbers;
import org.apache.doris.nereids.trees.expressions.functions.table.S3;
import org.apache.doris.nereids.trees.expressions.functions.table.TableValuedFunction;
import org.apache.doris.nereids.trees.expressions.functions.table.WorkloadGroups;
/** TableValuedFunctionVisitor */
public interface TableValuedFunctionVisitor<R, C> {
R visitTableValuedFunction(TableValuedFunction tableValuedFunction, C context);
default R visitNumbers(Numbers numbers, C context) {
return visitTableValuedFunction(numbers, context);
default R visitBackends(Backends backends, C context) {
return visitTableValuedFunction(backends, context);
}
default R visitCatalogs(Catalogs catalogs, C context) {
return visitTableValuedFunction(catalogs, context);
}
default R visitFrontends(Frontends frontends, C context) {
return visitTableValuedFunction(frontends, context);
}
default R visitFrontendsDisks(FrontendsDisks frontendsDisks, C context) {
return visitTableValuedFunction(frontendsDisks, context);
}
default R visitGroupCommit(GroupCommit groupCommit, C context) {
return visitTableValuedFunction(groupCommit, context);
}
default R visitHdfs(Hdfs hdfs, C context) {
return visitTableValuedFunction(hdfs, context);
}
default R visitIcebergMeta(IcebergMeta icebergMeta, C context) {
return visitTableValuedFunction(icebergMeta, context);
}
default R visitLocal(Local local, C context) {
return visitTableValuedFunction(local, context);
}
default R visitNumbers(Numbers numbers, C context) {
return visitTableValuedFunction(numbers, context);
}
default R visitS3(S3 s3, C context) {
return visitTableValuedFunction(s3, context);
}
default R visitWorkloadGroups(WorkloadGroups workloadGroups, C context) {
return visitTableValuedFunction(workloadGroups, context);
}
}

View File

@ -40,15 +40,22 @@ import java.util.List;
public class MetadataScanNode extends ExternalScanNode {
private MetadataTableValuedFunction tvf;
private final MetadataTableValuedFunction tvf;
private List<TScanRangeLocations> scanRangeLocations = Lists.newArrayList();
private final List<TScanRangeLocations> scanRangeLocations = Lists.newArrayList();
public MetadataScanNode(PlanNodeId id, TupleDescriptor desc, MetadataTableValuedFunction tvf) {
super(id, desc, "METADATA_SCAN_NODE", StatisticalType.METADATA_SCAN_NODE, false);
this.tvf = tvf;
}
// for Nereids
@Override
public void init() throws UserException {
super.init();
createScanRangeLocations();
}
@Override
protected void toThrift(TPlanNode planNode) {
planNode.setNodeType(TPlanNodeType.META_SCAN_NODE);
@ -91,25 +98,4 @@ public class MetadataScanNode extends ExternalScanNode {
public boolean needToCheckColumnPriv() {
return false;
}
private void buildScanRanges() {
// todo: split
TScanRangeLocations locations = createMetaDataTvfLocations();
scanRangeLocations.add(locations);
}
private TScanRangeLocations createMetaDataTvfLocations() {
TScanRange scanRange = new TScanRange();
scanRange.setMetaScanRange(tvf.getMetaScanRange());
// set location
TScanRangeLocation location = new TScanRangeLocation();
Backend backend = backendPolicy.getNextBe();
location.setBackendId(backend.getId());
location.setServer(new TNetworkAddress(backend.getHost(), backend.getBePort()));
TScanRangeLocations result = new TScanRangeLocations();
result.addToLocations(location);
result.setScanRange(scanRange);
return result;
}
}

View File

@ -30,9 +30,6 @@ import org.apache.doris.planner.PlanNodeId;
import org.apache.doris.planner.ScanNode;
import org.apache.doris.thrift.TFileType;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
@ -42,12 +39,18 @@ import java.util.Map;
* group_commit().
*/
public class GroupCommitTableValuedFunction extends ExternalFileTableValuedFunction {
private static final Logger LOG = LogManager.getLogger(GroupCommitTableValuedFunction.class);
public static final String NAME = "group_commit";
private long tableId = -1;
private static final String TABLE_ID_KEY = "table_id";
private final long tableId;
public GroupCommitTableValuedFunction(Map<String, String> params) throws AnalysisException {
tableId = Long.parseLong(params.get("table_id"));
if (params == null || !params.containsKey(TABLE_ID_KEY)) {
throw new AnalysisException(NAME + " should contains property " + TABLE_ID_KEY);
}
tableId = Long.parseLong(params.get(TABLE_ID_KEY));
}
// =========== implement abstract methods of ExternalFileTableValuedFunction =================
@ -56,11 +59,18 @@ public class GroupCommitTableValuedFunction extends ExternalFileTableValuedFunct
public List<Column> getTableColumns() throws AnalysisException {
List<Column> fileColumns = new ArrayList<>();
Table table = Env.getCurrentInternalCatalog().getTableByTableId(tableId);
if (table == null) {
throw new AnalysisException("table with table_id " + tableId + " is not exists");
}
if (!(table instanceof OlapTable)) {
throw new AnalysisException("Only support OLAP table, but table type of table_id "
+ tableId + " is " + table.getType());
}
Column deleteSignColumn = ((OlapTable) table).getDeleteSignColumn();
List<Column> tableColumns = table.getBaseSchema(false);
for (int i = 1; i <= tableColumns.size(); i++) {
fileColumns.add(new Column("c" + i, tableColumns.get(i - 1).getDataType(), true));
}
Column deleteSignColumn = ((OlapTable) table).getDeleteSignColumn();
if (deleteSignColumn != null) {
fileColumns.add(new Column("c" + (tableColumns.size() + 1), deleteSignColumn.getDataType(), true));
}