[feature] add table valued function framework and numbers table valued function (#10214)

This commit is contained in:
Tiewei Fang
2022-06-28 14:01:57 +08:00
committed by GitHub
parent 2f30c7cf03
commit 17eb8c00d3
28 changed files with 1924 additions and 12 deletions

View File

@ -404,6 +404,7 @@ nonterminal WithClause opt_with_clause;
nonterminal ArrayList<View> with_view_def_list;
nonterminal View with_view_def;
nonterminal Subquery subquery;
nonterminal TableValuedFunctionRef table_valued_function_ref;
nonterminal InlineViewRef inline_view_ref;
nonterminal JoinOperator join_operator;
nonterminal ArrayList<String> opt_plan_hints;
@ -4271,6 +4272,17 @@ table_ref ::=
s.setLateralViewRefs(lateralViewRefList);
RESULT = s;
:}
| table_valued_function_ref:f
{:
RESULT = f;
:}
;
table_valued_function_ref ::=
ident:func_name LPAREN string_list:param_list RPAREN opt_table_alias:alias
{:
RESULT = new TableValuedFunctionRef(func_name, alias, param_list);
:}
;
inline_view_ref ::=

View File

@ -299,6 +299,10 @@ public class SelectStmt extends QueryStmt {
// Inline view reference
QueryStmt inlineStmt = ((InlineViewRef) tblRef).getViewStmt();
inlineStmt.getTables(analyzer, tableMap, parentViewNameSet);
} else if (tblRef instanceof TableValuedFunctionRef) {
TableValuedFunctionRef tblFuncRef = (TableValuedFunctionRef) tblRef;
tableMap.put(tblFuncRef.getTableFunction().getTable().getId(),
tblFuncRef.getTableFunction().getTable());
} else {
String dbName = tblRef.getName().getDb();
String tableName = tblRef.getName().getTbl();

View File

@ -0,0 +1,77 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.analysis;
import org.apache.doris.catalog.Table;
import org.apache.doris.common.UserException;
import org.apache.doris.tablefunction.TableValuedFunctionInf;
import java.util.List;
public class TableValuedFunctionRef extends TableRef {
private Table table;
private TableValuedFunctionInf tableFunction;
public TableValuedFunctionRef(String funcName, String alias, List<String> params) throws UserException {
super(new TableName(null, "_table_valued_function_" + funcName), alias);
this.tableFunction = TableValuedFunctionInf.getTableFunction(funcName, params);
if (hasExplicitAlias()) {
return;
}
aliases = new String[] { "_table_valued_function_" + funcName };
}
public TableValuedFunctionRef(TableValuedFunctionRef other) {
super(other);
this.tableFunction = other.tableFunction;
}
@Override
public TableRef clone() {
return new TableValuedFunctionRef(this);
}
@Override
public TupleDescriptor createTupleDescriptor(Analyzer analyzer) {
TupleDescriptor result = analyzer.getDescTbl().createTupleDescriptor();
result.setTable(table);
return result;
}
/**
* Register this table ref and then analyze the Join clause.
*/
@Override
public void analyze(Analyzer analyzer) throws UserException {
if (isAnalyzed) {
return;
}
// Table function could generate a table which will has columns
// Maybe will call be during this process
this.table = tableFunction.getTable();
desc = analyzer.registerTableRef(this);
isAnalyzed = true; // true that we have assigned desc
analyzeJoin(analyzer);
}
public TableValuedFunctionInf getTableFunction() {
return tableFunction;
}
}

View File

@ -96,7 +96,7 @@ public interface TableIf {
* Doris table type.
*/
public enum TableType {
MYSQL, ODBC, OLAP, SCHEMA, INLINE_VIEW, VIEW, BROKER, ELASTICSEARCH, HIVE, ICEBERG, HUDI;
MYSQL, ODBC, OLAP, SCHEMA, INLINE_VIEW, VIEW, BROKER, ELASTICSEARCH, HIVE, ICEBERG, HUDI, TABLE_VALUED_FUNCTION;
public String toEngineName() {
switch (this) {
@ -120,6 +120,8 @@ public interface TableIf {
return "Hive";
case HUDI:
return "Hudi";
case TABLE_VALUED_FUNCTION:
return "Table_Valued_Function";
default:
return null;
}
@ -140,6 +142,7 @@ public interface TableIf {
case ELASTICSEARCH:
case HIVE:
case HUDI:
case TABLE_VALUED_FUNCTION:
return "EXTERNAL TABLE";
default:
return null;

View File

@ -236,7 +236,6 @@ public class DistributedPlanner {
// move 'result' to end, it depends on all of its children
fragments.remove(result);
fragments.add(result);
if (!isPartitioned && result.isPartitioned() && result.getPlanRoot().getNumInstances() > 1) {
result = createMergeFragment(result);
fragments.add(result);
@ -276,6 +275,8 @@ public class DistributedPlanner {
return new PlanFragment(ctx.getNextFragmentId(), node, DataPartition.UNPARTITIONED);
} else if (node instanceof SchemaScanNode) {
return new PlanFragment(ctx.getNextFragmentId(), node, DataPartition.UNPARTITIONED);
} else if (node instanceof TableValuedFunctionScanNode) {
return new PlanFragment(ctx.getNextFragmentId(), node, DataPartition.RANDOM);
} else if (node instanceof OlapScanNode) {
// olap scan node
OlapScanNode olapScanNode = (OlapScanNode) node;

View File

@ -47,6 +47,7 @@ import org.apache.doris.analysis.SlotDescriptor;
import org.apache.doris.analysis.SlotId;
import org.apache.doris.analysis.SlotRef;
import org.apache.doris.analysis.TableRef;
import org.apache.doris.analysis.TableValuedFunctionRef;
import org.apache.doris.analysis.TupleDescriptor;
import org.apache.doris.analysis.TupleId;
import org.apache.doris.analysis.TupleIsNullPredicate;
@ -1721,6 +1722,10 @@ public class SingleNodePlanner {
scanNode = new HudiScanNode(ctx.getNextNodeId(), tblRef.getDesc(), "HudiScanNode",
null, -1);
break;
case TABLE_VALUED_FUNCTION:
scanNode = new TableValuedFunctionScanNode(ctx.getNextNodeId(), tblRef.getDesc(),
"TableValuedFunctionScanNode", ((TableValuedFunctionRef) tblRef).getTableFunction());
break;
default:
break;
}
@ -1892,7 +1897,7 @@ public class SingleNodePlanner {
private PlanNode createTableRefNode(Analyzer analyzer, TableRef tblRef, SelectStmt selectStmt)
throws UserException {
PlanNode scanNode = null;
if (tblRef instanceof BaseTableRef) {
if (tblRef instanceof BaseTableRef || tblRef instanceof TableValuedFunctionRef) {
scanNode = createScanNode(analyzer, tblRef, selectStmt);
}
if (tblRef instanceof InlineViewRef) {
@ -2176,8 +2181,8 @@ public class SingleNodePlanner {
* @param analyzer
*/
private void materializeTableResultForCrossJoinOrCountStar(TableRef tblRef, Analyzer analyzer) {
if (tblRef instanceof BaseTableRef) {
materializeSlotForEmptyMaterializedTableRef((BaseTableRef) tblRef, analyzer);
if (tblRef instanceof BaseTableRef || tblRef instanceof TableValuedFunctionRef) {
materializeSlotForEmptyMaterializedTableRef(tblRef, analyzer);
} else if (tblRef instanceof InlineViewRef) {
materializeInlineViewResultExprForCrossJoinOrCountStar((InlineViewRef) tblRef, analyzer);
} else {
@ -2203,7 +2208,7 @@ public class SingleNodePlanner {
* @param tblRef
* @param analyzer
*/
private void materializeSlotForEmptyMaterializedTableRef(BaseTableRef tblRef, Analyzer analyzer) {
private void materializeSlotForEmptyMaterializedTableRef(TableRef tblRef, Analyzer analyzer) {
if (tblRef.getDesc().getMaterializedSlots().isEmpty()) {
Column minimuColumn = null;
for (Column col : tblRef.getTable().getBaseSchema()) {

View File

@ -0,0 +1,108 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.planner;
import org.apache.doris.analysis.Analyzer;
import org.apache.doris.analysis.TupleDescriptor;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.UserException;
import org.apache.doris.statistics.StatisticalType;
import org.apache.doris.tablefunction.TableValuedFunctionInf;
import org.apache.doris.tablefunction.TableValuedFunctionTask;
import org.apache.doris.thrift.TNetworkAddress;
import org.apache.doris.thrift.TPlanNode;
import org.apache.doris.thrift.TPlanNodeType;
import org.apache.doris.thrift.TScanRangeLocation;
import org.apache.doris.thrift.TScanRangeLocations;
import org.apache.doris.thrift.TTableValuedFunctionScanNode;
import com.google.common.collect.Lists;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.util.List;
/**
* This scan node is used for table valued function.
*/
public class TableValuedFunctionScanNode extends ScanNode {
private static final Logger LOG = LogManager.getLogger(TableValuedFunctionScanNode.class.getName());
private List<TScanRangeLocations> shardScanRanges;
private TableValuedFunctionInf tvf;
private boolean isFinalized = false;
public TableValuedFunctionScanNode(PlanNodeId id, TupleDescriptor desc,
String planNodeName, TableValuedFunctionInf tvf) {
super(id, desc, planNodeName, StatisticalType.TABLE_VALUED_FUNCTION_NODE);
this.tvf = tvf;
}
@Override
public void init(Analyzer analyzer) throws UserException {
super.init(analyzer);
computeStats(analyzer);
}
@Override
public int getNumInstances() {
return shardScanRanges.size();
}
@Override
public List<TScanRangeLocations> getScanRangeLocations(long maxScanRangeLength) {
return shardScanRanges;
}
@Override
public void finalize(Analyzer analyzer) throws UserException {
if (isFinalized) {
return;
}
try {
shardScanRanges = getShardLocations();
} catch (AnalysisException e) {
throw new UserException(e.getMessage());
}
isFinalized = true;
}
@Override
protected void toThrift(TPlanNode msg) {
msg.node_type = TPlanNodeType.TABLE_VALUED_FUNCTION_SCAN_NODE;
TTableValuedFunctionScanNode tvfScanNode = new TTableValuedFunctionScanNode();
tvfScanNode.setTupleId(desc.getId().asInt());
tvfScanNode.setFuncName(tvf.getFuncName());
msg.table_valued_func_scan_node = tvfScanNode;
}
private List<TScanRangeLocations> getShardLocations() throws AnalysisException {
List<TScanRangeLocations> result = Lists.newArrayList();
for (TableValuedFunctionTask task : tvf.getTasks()) {
TScanRangeLocations locations = new TScanRangeLocations();
TScanRangeLocation location = new TScanRangeLocation();
location.setBackendId(task.getBackend().getId());
location.setServer(new TNetworkAddress(task.getBackend().getHost(), task.getBackend().getBePort()));
locations.addToLocations(location);
locations.setScanRange(task.getExecParams());
result.add(locations);
}
return result;
}
}

View File

@ -577,9 +577,13 @@ public class Coordinator {
int profileFragmentId = 0;
long memoryLimit = queryOptions.getMemLimit();
Map<Long, BackendExecStates> beToExecStates = Maps.newHashMap();
// If #fragments >=3, use twoPhaseExecution with exec_plan_fragments_prepare and exec_plan_fragments_start,
// If #fragments >=2, use twoPhaseExecution with exec_plan_fragments_prepare and exec_plan_fragments_start,
// else use exec_plan_fragments directly.
boolean twoPhaseExecution = fragments.size() >= 3;
// we choose #fragments >=2 because in some cases
// we need ensure that A fragment is already prepared to receive data before B fragment sends data.
// For example: select * from numbers("10","w") will generate ExchangeNode and TableValuedFunctionScanNode,
// we should ensure TableValuedFunctionScanNode does not send data until ExchangeNode is ready to receive.
boolean twoPhaseExecution = fragments.size() >= 2;
for (PlanFragment fragment : fragments) {
FragmentExecParams params = fragmentExecParamsMap.get(fragment.getFragmentId());

View File

@ -44,4 +44,5 @@ public enum StatisticalType {
STREAM_LOAD_SCAN_NODE,
TABLE_FUNCTION_NODE,
UNION_NODE,
TABLE_VALUED_FUNCTION_NODE,
}

View File

@ -0,0 +1,114 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.tablefunction;
import org.apache.doris.catalog.Catalog;
import org.apache.doris.catalog.Column;
import org.apache.doris.catalog.PrimitiveType;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.UserException;
import org.apache.doris.planner.PlanNode;
import org.apache.doris.system.Backend;
import org.apache.doris.thrift.TScanRange;
import org.apache.doris.thrift.TTVFNumbersScanRange;
import org.apache.doris.thrift.TTVFScanRange;
import org.apache.doris.thrift.TTVFunctionName;
import com.google.common.collect.Lists;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
// Table function that generate int64 numbers
// have a single column number
/**
* The Implement of table valued function——numbers(N,M).
*/
public class NumbersTableValuedFunction extends TableValuedFunctionInf {
public static final String NAME = "numbers";
private static final Logger LOG = LogManager.getLogger(PlanNode.class);
// The total numbers will be generated.
private long totalNumbers;
// The total backends will server it.
private int tabletsNum;
/**
* Constructor.
* @param params params from user
* @throws UserException exception
*/
public NumbersTableValuedFunction(List<String> params) throws UserException {
if (params.size() < 1 || params.size() > 2) {
throw new UserException(
"numbers table function only support numbers(10000 /*total numbers*/)"
+ "or numbers(10000, 2 /*number of tablets to run*/)");
}
totalNumbers = Long.parseLong(params.get(0));
// default tabletsNum is 1.
tabletsNum = 1;
if (params.size() == 2) {
tabletsNum = Integer.parseInt(params.get(1));
}
}
@Override
public TTVFunctionName getFuncName() {
return TTVFunctionName.NUMBERS;
}
@Override
public String getTableName() {
return "NumbersTableValuedFunction";
}
@Override
public List<Column> getTableColumns() {
List<Column> resColumns = new ArrayList<>();
resColumns.add(new Column("number", PrimitiveType.BIGINT, false));
return resColumns;
}
@Override
public List<TableValuedFunctionTask> getTasks() throws AnalysisException {
List<Backend> backendList = Lists.newArrayList();
for (Backend be : Catalog.getCurrentSystemInfo().getIdToBackend().values()) {
if (be.isAlive()) {
backendList.add(be);
}
}
if (backendList.isEmpty()) {
throw new AnalysisException("No Alive backends");
}
Collections.shuffle(backendList);
List<TableValuedFunctionTask> res = Lists.newArrayList();
for (int i = 0; i < tabletsNum; ++i) {
TScanRange scanRange = new TScanRange();
TTVFScanRange tvfScanRange = new TTVFScanRange();
TTVFNumbersScanRange tvfNumbersScanRange = new TTVFNumbersScanRange();
tvfNumbersScanRange.setTotalNumbers(totalNumbers);
tvfScanRange.setNumbersParams(tvfNumbersScanRange);
scanRange.setTvfScanRange(tvfScanRange);
res.add(new TableValuedFunctionTask(backendList.get(i % backendList.size()), scanRange));
}
return res;
}
}

View File

@ -0,0 +1,51 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.tablefunction;
import org.apache.doris.catalog.Column;
import org.apache.doris.catalog.Table;
import org.apache.doris.catalog.TableIf;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.UserException;
import org.apache.doris.thrift.TTVFunctionName;
import java.util.List;
public abstract class TableValuedFunctionInf {
public abstract TTVFunctionName getFuncName();
public Table getTable() {
Table table = new Table(-1, getTableName(), TableIf.TableType.TABLE_VALUED_FUNCTION, getTableColumns());
return table;
}
// All table functions should be registered here
public static TableValuedFunctionInf getTableFunction(String funcName, List<String> params) throws UserException {
if (funcName.equalsIgnoreCase(NumbersTableValuedFunction.NAME)) {
return new NumbersTableValuedFunction(params);
}
throw new UserException("Could not find table function " + funcName);
}
public abstract String getTableName();
public abstract List<Column> getTableColumns();
public abstract List<TableValuedFunctionTask> getTasks() throws AnalysisException;
}

View File

@ -0,0 +1,41 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.tablefunction;
import org.apache.doris.system.Backend;
import org.apache.doris.thrift.TScanRange;
public class TableValuedFunctionTask {
// Expected running backend
private Backend backend;
// Function running parameters
private TScanRange execParams;
public TableValuedFunctionTask(Backend backend, TScanRange execParams) {
this.backend = backend;
this.execParams = execParams;
}
public Backend getBackend() {
return backend;
}
public TScanRange getExecParams() {
return execParams;
}
}