[feature](Export) support export with nereids (#23319)

This commit is contained in:
Tiewei Fang
2023-08-29 19:36:19 +08:00
committed by GitHub
parent 94a8fa6bc9
commit 103fa4eb55
13 changed files with 582 additions and 52 deletions

View File

@ -407,6 +407,9 @@ WITHIN: 'WITHIN';
YEAR: 'YEAR';
ZONE: 'ZONE';
DATEV2: 'DATEV2';
S3: 'S3';
HDFS: 'HDFS';
BROKER: 'BROKER';
//--DORIS-KEYWORD-LIST-END
//============================
// End of the keywords list

View File

@ -53,11 +53,14 @@ statement
(PARTITION partition=identifierList)?
(USING relation (COMMA relation)*)
whereClause #delete
| EXPORT TABLE tableName=multipartIdentifier
(PARTITION partition=identifierList)?
(whereClause)?
TO filePath=constant
(propertyClause)?
(withRemoteStorageSystem)? #export
;
propertiesStatment
: properties+=property (COMMA properties+=property)*
;
// -----------------Command accessories-----------------
@ -88,6 +91,22 @@ planType
| ALL // default type
;
// Storage target that may follow an EXPORT statement (referenced as an optional
// element of the #export alternative). Every alternative labels its key/value
// list as brokerProperties.
withRemoteStorageSystem
// WITH S3 ( key = value, ... )
: WITH S3 LEFT_PAREN
brokerProperties=propertyItemList
RIGHT_PAREN
// WITH HDFS ( key = value, ... )
| WITH HDFS LEFT_PAREN
brokerProperties=propertyItemList
RIGHT_PAREN
// WITH LOCAL ( key = value, ... )
| WITH LOCAL LEFT_PAREN
brokerProperties=propertyItemList
RIGHT_PAREN
// WITH BROKER name [ ( key = value, ... ) ]
// The parenthesised list is optional here, so brokerProperties is null in the
// parse tree when it is omitted.
| WITH BROKER brokerName=identifierOrText
(LEFT_PAREN
brokerProperties=propertyItemList
RIGHT_PAREN)?
;
// -----------------Query-----------------
// add queryOrganization for parse (q1) union (q2) union (q3) order by keys, otherwise 'order' will be recognized to be
// identifier.
@ -95,7 +114,7 @@ planType
outFileClause
: INTO OUTFILE filePath=constant
(FORMAT AS format=identifier)?
(PROPERTIES LEFT_PAREN properties+=property (COMMA properties+=property)* RIGHT_PAREN)?
(propertyClause)?
;
query
@ -271,15 +290,25 @@ relationPrimary
: multipartIdentifier specifiedPartition? tabletList? tableAlias relationHint? lateralView* #tableName
| LEFT_PAREN query RIGHT_PAREN tableAlias lateralView* #aliasedQuery
| tvfName=identifier LEFT_PAREN
(properties+=property (COMMA properties+=property)*)?
(properties=propertyItemList)?
RIGHT_PAREN tableAlias #tableValuedFunction
;
property
: key=propertyItem EQ value=propertyItem
propertyClause
: PROPERTIES LEFT_PAREN fileProperties=propertyItemList RIGHT_PAREN
;
propertyItem : identifier | constant ;
propertyItemList
: properties+=propertyItem (COMMA properties+=propertyItem)*
;
propertyItem
: key=propertyKey EQ value=propertyValue
;
propertyKey : identifier | constant ;
propertyValue : identifier | constant ;
tableAlias
: (AS? strictIdentifier identifierList?)?
@ -798,5 +827,8 @@ nonReserved
| WITHIN
| YEAR
| ZONE
| S3
| HDFS
| BROKER
//--DEFAULT-NON-RESERVED-END
;

View File

@ -24,6 +24,7 @@ import org.apache.doris.catalog.Partition;
import org.apache.doris.catalog.Table;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.Config;
import org.apache.doris.common.DdlException;
import org.apache.doris.common.ErrorCode;
import org.apache.doris.common.ErrorReport;
import org.apache.doris.common.FeNameFormat;
@ -50,6 +51,7 @@ import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.UUID;
import java.util.stream.Collectors;
// EXPORT statement, export data to dirs by broker.
//
@ -65,7 +67,6 @@ public class ExportStmt extends StatementBase {
private static final String DEFAULT_COLUMN_SEPARATOR = "\t";
private static final String DEFAULT_LINE_DELIMITER = "\n";
private static final String DEFAULT_COLUMNS = "";
private static final String DEFAULT_PARALLELISM = "1";
private static final ImmutableSet<String> PROPERTIES_SET = new ImmutableSet.Builder<String>()
@ -85,6 +86,7 @@ public class ExportStmt extends StatementBase {
private TableName tblName;
private List<String> partitionStringNames;
private Expr whereExpr;
private String whereSql;
private String path;
private BrokerDesc brokerDesc;
private Map<String, String> properties = Maps.newHashMap();
@ -121,13 +123,21 @@ public class ExportStmt extends StatementBase {
this.brokerDesc = brokerDesc;
this.columnSeparator = DEFAULT_COLUMN_SEPARATOR;
this.lineDelimiter = DEFAULT_LINE_DELIMITER;
this.columns = DEFAULT_COLUMNS;
Optional<SessionVariable> optionalSessionVariable = Optional.ofNullable(
ConnectContext.get().getSessionVariable());
this.sessionVariables = optionalSessionVariable.orElse(VariableMgr.getDefaultSessionVariable());
}
/**
 * Constructor used by the Nereids planner.
 *
 * <p>Delegates to the {@code Expr}-based constructor with a null where expression and then
 * stores the where clause as raw SQL text instead.
 *
 * @param tableRef table (and optional partitions) to export
 * @param whereSql original text of the where clause; null when the statement has none
 * @param path export destination path
 * @param properties export properties supplied by the user
 * @param brokerDesc remote storage description
 */
public ExportStmt(TableRef tableRef, String whereSql, String path,
Map<String, String> properties, BrokerDesc brokerDesc) {
// The cast picks the Expr overload; a bare null would also match this String overload.
this(tableRef, (Expr) null, path, properties, brokerDesc);
this.whereSql = whereSql;
}
@Override
public boolean needAuditEncryption() {
return brokerDesc != null;
@ -209,6 +219,7 @@ public class ExportStmt extends StatementBase {
// set where expr
exportJob.setWhereExpr(this.whereExpr);
exportJob.setWhereSql(this.whereSql);
// set path
exportJob.setExportPath(this.path);
@ -223,7 +234,7 @@ public class ExportStmt extends StatementBase {
exportJob.setMaxFileSize(this.maxFileSize);
exportJob.setDeleteExistingFiles(this.deleteExistingFiles);
if (!Strings.isNullOrEmpty(this.columns)) {
if (columns != null) {
Splitter split = Splitter.on(',').trimResults().omitEmptyStrings();
exportJob.setExportColumns(split.splitToList(this.columns.toLowerCase()));
}
@ -343,7 +354,14 @@ public class ExportStmt extends StatementBase {
properties, ExportStmt.DEFAULT_COLUMN_SEPARATOR));
this.lineDelimiter = Separator.convertSeparator(PropertyAnalyzer.analyzeLineDelimiter(
properties, ExportStmt.DEFAULT_LINE_DELIMITER));
this.columns = properties.getOrDefault(LoadStmt.KEY_IN_PARAM_COLUMNS, DEFAULT_COLUMNS);
// null means not specified
// "" means user specified zero columns
this.columns = properties.getOrDefault(LoadStmt.KEY_IN_PARAM_COLUMNS, null);
// check that the specified columns exist
if (columns != null) {
checkColumns();
}
// format
this.format = properties.getOrDefault(LoadStmt.KEY_IN_PARAM_FORMAT_TYPE, "csv").toLowerCase();
@ -370,6 +388,24 @@ public class ExportStmt extends StatementBase {
}
}
/**
 * Validates the user-specified {@code columns} property against the base schema of the
 * exported table.
 *
 * <p>The property is a comma separated list; entries are trimmed, empty entries are dropped
 * and names are compared case-insensitively.
 *
 * @throws DdlException if the property is an empty string, if the database or table cannot be
 *         resolved, or if a listed column is not part of the table's base schema
 */
private void checkColumns() throws DdlException {
    if (this.columns.isEmpty()) {
        throw new DdlException("columns can not be empty");
    }
    Database db = Env.getCurrentInternalCatalog().getDbOrDdlException(this.tblName.getDb());
    Table table = db.getTableOrDdlException(this.tblName.getTbl());
    // The requested names are lower-cased below, so the schema names must be lower-cased too.
    // Otherwise every column whose declared name contains an upper-case letter would be
    // rejected as unknown.
    List<String> tableColumns = table.getBaseSchema().stream()
            .map(column -> column.getName().toLowerCase())
            .collect(Collectors.toList());
    Splitter split = Splitter.on(',').trimResults().omitEmptyStrings();

    List<String> columnsSpecified = split.splitToList(this.columns.toLowerCase());
    for (String columnName : columnsSpecified) {
        if (!tableColumns.contains(columnName)) {
            throw new DdlException("unknown column [" + columnName + "] in table [" + this.tblName.getTbl() + "]");
        }
    }
}
@Override
public String toSql() {
StringBuilder sb = new StringBuilder();

View File

@ -156,6 +156,7 @@ public class ExportJob implements Writable {
private TableRef tableRef;
private Expr whereExpr;
private String whereSql;
private String sql = "";

View File

@ -18,6 +18,9 @@
package org.apache.doris.nereids.parser;
import org.apache.doris.analysis.ArithmeticExpr.Operator;
import org.apache.doris.analysis.BrokerDesc;
import org.apache.doris.analysis.StorageBackend;
import org.apache.doris.analysis.StorageBackend.StorageType;
import org.apache.doris.analysis.UserIdentity;
import org.apache.doris.common.Config;
import org.apache.doris.common.Pair;
@ -50,6 +53,7 @@ import org.apache.doris.nereids.DorisParser.DereferenceContext;
import org.apache.doris.nereids.DorisParser.ElementAtContext;
import org.apache.doris.nereids.DorisParser.ExistContext;
import org.apache.doris.nereids.DorisParser.ExplainContext;
import org.apache.doris.nereids.DorisParser.ExportContext;
import org.apache.doris.nereids.DorisParser.FromClauseContext;
import org.apache.doris.nereids.DorisParser.GroupingElementContext;
import org.apache.doris.nereids.DorisParser.GroupingSetContext;
@ -81,9 +85,11 @@ import org.apache.doris.nereids.DorisParser.PlanTypeContext;
import org.apache.doris.nereids.DorisParser.PredicateContext;
import org.apache.doris.nereids.DorisParser.PredicatedContext;
import org.apache.doris.nereids.DorisParser.PrimitiveDataTypeContext;
import org.apache.doris.nereids.DorisParser.PropertiesStatmentContext;
import org.apache.doris.nereids.DorisParser.PropertyContext;
import org.apache.doris.nereids.DorisParser.PropertyClauseContext;
import org.apache.doris.nereids.DorisParser.PropertyItemContext;
import org.apache.doris.nereids.DorisParser.PropertyItemListContext;
import org.apache.doris.nereids.DorisParser.PropertyKeyContext;
import org.apache.doris.nereids.DorisParser.PropertyValueContext;
import org.apache.doris.nereids.DorisParser.QualifiedNameContext;
import org.apache.doris.nereids.DorisParser.QueryContext;
import org.apache.doris.nereids.DorisParser.QueryOrganizationContext;
@ -117,6 +123,7 @@ import org.apache.doris.nereids.DorisParser.UserVariableContext;
import org.apache.doris.nereids.DorisParser.WhereClauseContext;
import org.apache.doris.nereids.DorisParser.WindowFrameContext;
import org.apache.doris.nereids.DorisParser.WindowSpecContext;
import org.apache.doris.nereids.DorisParser.WithRemoteStorageSystemContext;
import org.apache.doris.nereids.DorisParserBaseVisitor;
import org.apache.doris.nereids.StatementContext;
import org.apache.doris.nereids.analyzer.UnboundAlias;
@ -232,6 +239,7 @@ import org.apache.doris.nereids.trees.plans.commands.CreatePolicyCommand;
import org.apache.doris.nereids.trees.plans.commands.DeleteCommand;
import org.apache.doris.nereids.trees.plans.commands.ExplainCommand;
import org.apache.doris.nereids.trees.plans.commands.ExplainCommand.ExplainLevel;
import org.apache.doris.nereids.trees.plans.commands.ExportCommand;
import org.apache.doris.nereids.trees.plans.commands.InsertIntoTableCommand;
import org.apache.doris.nereids.trees.plans.commands.UpdateCommand;
import org.apache.doris.nereids.trees.plans.logical.LogicalAggregate;
@ -380,6 +388,68 @@ public class LogicalPlanBuilder extends DorisParserBaseVisitor<Object> {
ctx.explain());
}
/**
 * Builds an {@link ExportCommand} from an {@code EXPORT TABLE} parse tree.
 *
 * <p>The where clause is not turned into an expression here; its original text is sliced out
 * of the input stream and handed on as a string. Absent optional clauses are passed on as an
 * empty partition list, a null where text, a null property map and a null broker description.
 */
@Override
public LogicalPlan visitExport(ExportContext ctx) {
    List<String> tableName = visitMultipartIdentifier(ctx.tableName);
    List<String> partitions = ImmutableList.of();
    if (ctx.partition != null) {
        partitions = visitIdentifierList(ctx.partition);
    }
    String path = parseConstant(ctx.filePath);

    // Copy the where clause verbatim, from the first character of its first token to the
    // last character of its last token.
    String whereSql = null;
    WhereClauseContext where = ctx.whereClause();
    if (where != null) {
        org.antlr.v4.runtime.misc.Interval span = new org.antlr.v4.runtime.misc.Interval(
                where.start.getStartIndex(), where.stop.getStopIndex());
        whereSql = where.start.getInputStream().getText(span);
    }

    Map<String, String> filePropertiesMap = ctx.propertyClause() == null
            ? null
            : visitPropertyClause(ctx.propertyClause());

    BrokerDesc brokerDesc = ctx.withRemoteStorageSystem() == null
            ? null
            : visitWithRemoteStorageSystem(ctx.withRemoteStorageSystem());

    return new ExportCommand(tableName, partitions, whereSql, path, filePropertiesMap, brokerDesc);
}
/**
 * Converts a {@code PROPERTIES (...)} clause into a key/value map by delegating to
 * {@link #visitPropertyItemList} on the clause's item list.
 */
@Override
public Map<String, String> visitPropertyClause(PropertyClauseContext ctx) {
    PropertyItemListContext items = ctx.fileProperties;
    return visitPropertyItemList(items);
}
/**
 * Converts a comma separated {@code key = value} list into an immutable map.
 *
 * <p>Callers pass labels that the grammar declares optional (the argument list of a
 * table-valued function, the parenthesised list after {@code WITH BROKER name}), so the
 * context may be null. A null context yields an empty map instead of a
 * {@link NullPointerException}.
 *
 * @param ctx the parsed item list, or null when the optional list was omitted
 * @return an immutable map holding one entry per item, in source order
 */
@Override
public Map<String, String> visitPropertyItemList(PropertyItemListContext ctx) {
    if (ctx == null || ctx.properties == null) {
        return ImmutableMap.of();
    }
    Builder<String, String> propertiesMap = ImmutableMap.builder();
    for (PropertyItemContext argument : ctx.properties) {
        String key = parsePropertyKey(argument.key);
        String value = parsePropertyValue(argument.value);
        propertiesMap.put(key, value);
    }
    return propertiesMap.build();
}
/**
 * Builds the {@link BrokerDesc} for a {@code WITH S3 | HDFS | LOCAL | BROKER name} clause.
 *
 * <p>For {@code WITH BROKER name} the parenthesised property list is optional; when it is
 * omitted the descriptor is created with an empty property map.
 *
 * @param ctx the parsed storage clause
 * @return the broker description, or null if none of the known storage keywords is present
 */
@Override
public BrokerDesc visitWithRemoteStorageSystem(WithRemoteStorageSystemContext ctx) {
    // brokerProperties is null for "WITH BROKER name" without parentheses; do not visit it then.
    Map<String, String> brokerPropertiesMap = ctx.brokerProperties == null
            ? ImmutableMap.<String, String>of()
            : visitPropertyItemList(ctx.brokerProperties);

    BrokerDesc brokerDesc = null;
    if (ctx.S3() != null) {
        brokerDesc = new BrokerDesc("S3", StorageBackend.StorageType.S3, brokerPropertiesMap);
    } else if (ctx.HDFS() != null) {
        brokerDesc = new BrokerDesc("HDFS", StorageBackend.StorageType.HDFS, brokerPropertiesMap);
    } else if (ctx.LOCAL() != null) {
        // Name the descriptor after its own storage type; it was previously labelled "HDFS".
        brokerDesc = new BrokerDesc("LOCAL", StorageType.LOCAL, brokerPropertiesMap);
    } else if (ctx.BROKER() != null) {
        brokerDesc = new BrokerDesc(visitIdentifierOrText(ctx.brokerName), brokerPropertiesMap);
    }
    return brokerDesc;
}
/**
* Visit multi-statements.
*/
@ -399,17 +469,6 @@ public class LogicalPlanBuilder extends DorisParserBaseVisitor<Object> {
return logicalPlans;
}
/**
 * Collects every {@code key = value} pair of a properties statement into a
 * {@link Properties} object.
 */
@Override
public Properties visitPropertiesStatment(PropertiesStatmentContext ctx) {
    Builder<String, String> entries = ImmutableMap.builder();
    ctx.properties.forEach(
            pair -> entries.put(parsePropertyItem(pair.key), parsePropertyItem(pair.value)));
    return new Properties(entries.build());
}
/* ********************************************************************************************
* Plan parsing
* ******************************************************************************************** */
@ -641,14 +700,9 @@ public class LogicalPlanBuilder extends DorisParserBaseVisitor<Object> {
return ParserUtils.withOrigin(ctx, () -> {
String functionName = ctx.tvfName.getText();
Builder<String, String> map = ImmutableMap.builder();
for (PropertyContext argument : ctx.properties) {
String key = parsePropertyItem(argument.key);
String value = parsePropertyItem(argument.value);
map.put(key, value);
}
Map<String, String> map = visitPropertyItemList(ctx.properties);
LogicalPlan relation = new UnboundTVFRelation(StatementScopeIdGenerator.newRelationId(),
functionName, new Properties(map.build()));
functionName, new Properties(map));
return withTableAlias(relation, ctx.tableAlias());
});
}
@ -1523,11 +1577,10 @@ public class LogicalPlanBuilder extends DorisParserBaseVisitor<Object> {
if (ctx.format != null) {
format = ctx.format.getText();
}
Map<String, String> properties = Maps.newHashMap();
for (PropertyContext argument : ctx.properties) {
String key = parseConstant(argument.key.constant());
String value = parseConstant(argument.value.constant());
properties.put(key, value);
Map<String, String> properties = ImmutableMap.of();
if (ctx.propertyClause() != null) {
properties = visitPropertyClause(ctx.propertyClause());
}
Literal filePath = (Literal) visit(ctx.filePath);
return new LogicalFileSink<>(filePath.getStringValue(), format, properties, ImmutableList.of(), plan);
@ -1973,7 +2026,14 @@ public class LogicalPlanBuilder extends DorisParserBaseVisitor<Object> {
}
}
private String parsePropertyItem(PropertyItemContext item) {
/**
 * Returns the text of a property key: the parsed constant when the key is written as a
 * constant, otherwise the raw text of the key.
 */
private String parsePropertyKey(PropertyKeyContext item) {
    return item.constant() != null
            ? parseConstant(item.constant())
            : item.getText();
}
private String parsePropertyValue(PropertyValueContext item) {
if (item.constant() != null) {
return parseConstant(item.constant());
}

View File

@ -81,7 +81,7 @@ public class NereidsParser {
}
public Properties parseProperties(String properties) {
return parse(properties, DorisParser::propertiesStatment);
return parse(properties, DorisParser::propertyItemList);
}
private <T> T parse(String sql, Function<DorisParser, ParserRuleContext> parseFunction) {

View File

@ -120,5 +120,6 @@ public enum PlanType {
EXPLAIN_COMMAND,
INSERT_INTO_TABLE_COMMAND,
SELECT_INTO_OUTFILE_COMMAND,
UPDATE_COMMAND
UPDATE_COMMAND,
EXPORT_COMMAND
}

View File

@ -0,0 +1,87 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.nereids.trees.plans.commands;
import org.apache.doris.analysis.Analyzer;
import org.apache.doris.analysis.BrokerDesc;
import org.apache.doris.analysis.ExportStmt;
import org.apache.doris.analysis.PartitionNames;
import org.apache.doris.analysis.TableName;
import org.apache.doris.analysis.TableRef;
import org.apache.doris.nereids.trees.plans.PlanType;
import org.apache.doris.nereids.trees.plans.visitor.PlanVisitor;
import org.apache.doris.nereids.util.Utils;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.qe.StmtExecutor;
import java.util.List;
import java.util.Map;
/**
 * Nereids command for {@code EXPORT TABLE}.
 *
 * <p>The command only carries the parsed pieces of the statement. When run, it builds an
 * {@link ExportStmt} from them, analyzes it and registers the export job with the export
 * manager.
 */
public class ExportCommand extends Command implements ForwardWithSync {
    private final List<String> nameParts;
    private final List<String> partitionsNameList;
    private final String whereSql;
    private final String path;
    private final Map<String, String> fileProperties;
    private final BrokerDesc brokerDesc;

    /**
     * constructor of ExportCommand
     *
     * @param nameParts parts of the (possibly qualified) table name
     * @param partitions partitions to export; empty when none were specified
     * @param whereSql raw text of the where clause, may be null
     * @param path export destination; surrounding whitespace is trimmed
     * @param fileProperties entries of the PROPERTIES clause, may be null
     * @param brokerDesc remote storage description, may be null
     */
    public ExportCommand(List<String> nameParts, List<String> partitions, String whereSql, String path,
            Map<String, String> fileProperties, BrokerDesc brokerDesc) {
        super(PlanType.EXPORT_COMMAND);
        this.nameParts = nameParts;
        this.partitionsNameList = partitions;
        this.whereSql = whereSql;
        this.path = path.trim();
        this.fileProperties = fileProperties;
        this.brokerDesc = brokerDesc;
    }

    @Override
    public void run(ConnectContext ctx, StmtExecutor executor) throws Exception {
        ExportStmt stmt = generateExportStmt();
        stmt.analyze(new Analyzer(ctx.getEnv(), ctx));
        ctx.getEnv().getExportMgr().addExportJobAndRegisterTask(stmt);
    }

    // Translates the parsed pieces into the ExportStmt expected by the export manager.
    private ExportStmt generateExportStmt() {
        // A partition clause is only attached when at least one partition was named.
        PartitionNames partitionNames = this.partitionsNameList.isEmpty()
                ? null
                : new PartitionNames(false, this.partitionsNameList);
        TableName tableName = new TableName(getTableName());
        TableRef tableRef = new TableRef(tableName, null, partitionNames, null, null, null);
        return new ExportStmt(tableRef, whereSql, path, fileProperties, brokerDesc);
    }

    /**
     * Returns the table name with every part quoted if needed and the parts joined by dots;
     * an empty string when there are no parts.
     */
    public String getTableName() {
        StringBuilder qualified = new StringBuilder();
        for (int i = 0; i < nameParts.size(); i++) {
            if (i > 0) {
                qualified.append('.');
            }
            qualified.append(Utils.quoteIfNeeded(nameParts.get(i)));
        }
        return qualified.toString();
    }

    @Override
    public <R, C> R accept(PlanVisitor<R, C> visitor, C context) {
        return visitor.visitExportCommand(this, context);
    }
}

View File

@ -21,6 +21,7 @@ import org.apache.doris.nereids.trees.plans.commands.Command;
import org.apache.doris.nereids.trees.plans.commands.CreatePolicyCommand;
import org.apache.doris.nereids.trees.plans.commands.DeleteCommand;
import org.apache.doris.nereids.trees.plans.commands.ExplainCommand;
import org.apache.doris.nereids.trees.plans.commands.ExportCommand;
import org.apache.doris.nereids.trees.plans.commands.InsertIntoTableCommand;
import org.apache.doris.nereids.trees.plans.commands.UpdateCommand;
@ -49,4 +50,8 @@ public interface CommandVisitor<R, C> {
/** Visits a {@link DeleteCommand}; the default implementation forwards to {@code visitCommand}. */
default R visitDeleteCommand(DeleteCommand deleteCommand, C context) {
return visitCommand(deleteCommand, context);
}
/** Visits an {@link ExportCommand}; the default implementation forwards to {@code visitCommand}. */
default R visitExportCommand(ExportCommand exportCommand, C context) {
return visitCommand(exportCommand, context);
}
}