[Feature](Nereids) support select into outfile (#21197)

This commit is contained in:
mch_ucchi
2023-07-13 17:01:47 +08:00
committed by GitHub
parent b72e0d9172
commit d4bdd6768c
25 changed files with 3179 additions and 8 deletions

View File

@ -272,6 +272,7 @@ OR: 'OR';
ORDER: 'ORDER';
OUT: 'OUT';
OUTER: 'OUTER';
OUTFILE: 'OUTFILE';
OUTPUTFORMAT: 'OUTPUTFORMAT';
OVER: 'OVER';
OVERLAPS: 'OVERLAPS';

View File

@ -34,7 +34,7 @@ singleStatement
;
statement
: explain? query #statementDefault
: explain? query outFileClause? #statementDefault
| CREATE ROW POLICY (IF NOT EXISTS)? name=identifier
ON table=multipartIdentifier
AS type=(RESTRICTIVE | PERMISSIVE)
@ -84,6 +84,13 @@ planType
// -----------------Query-----------------
// add queryOrganization for parse (q1) union (q2) union (q3) order by keys, otherwise 'order' will be recognized to be
// identifier.
outFileClause
: INTO OUTFILE filePath=constant
(FORMAT AS format=identifier)?
(PROPERTIES LEFT_PAREN properties+=tvfProperty (COMMA properties+=tvfProperty)* RIGHT_PAREN)?
;
query
: {!doris_legacy_SQL_syntax}? cte? queryTerm queryOrganization
| {doris_legacy_SQL_syntax}? queryTerm queryOrganization

View File

@ -26,6 +26,7 @@ import org.apache.doris.analysis.StatementBase;
import org.apache.doris.nereids.StatementContext;
import org.apache.doris.nereids.trees.plans.Plan;
import org.apache.doris.nereids.trees.plans.commands.ExplainCommand;
import org.apache.doris.nereids.trees.plans.logical.LogicalFileSink;
import org.apache.doris.nereids.trees.plans.logical.LogicalPlan;
import java.util.ArrayList;
@ -59,7 +60,7 @@ public class LogicalPlanAdapter extends StatementBase implements Queriable {
@Override
public boolean hasOutFileClause() {
return false;
return logicalPlan instanceof LogicalFileSink;
}
@Override

View File

@ -29,6 +29,7 @@ import org.apache.doris.analysis.GroupByClause.GroupingType;
import org.apache.doris.analysis.GroupingInfo;
import org.apache.doris.analysis.IsNullPredicate;
import org.apache.doris.analysis.OrderByElement;
import org.apache.doris.analysis.OutFileClause;
import org.apache.doris.analysis.SlotDescriptor;
import org.apache.doris.analysis.SlotId;
import org.apache.doris.analysis.SlotRef;
@ -95,6 +96,7 @@ import org.apache.doris.nereids.trees.plans.physical.PhysicalEmptyRelation;
import org.apache.doris.nereids.trees.plans.physical.PhysicalEsScan;
import org.apache.doris.nereids.trees.plans.physical.PhysicalExcept;
import org.apache.doris.nereids.trees.plans.physical.PhysicalFileScan;
import org.apache.doris.nereids.trees.plans.physical.PhysicalFileSink;
import org.apache.doris.nereids.trees.plans.physical.PhysicalFilter;
import org.apache.doris.nereids.trees.plans.physical.PhysicalGenerate;
import org.apache.doris.nereids.trees.plans.physical.PhysicalHashAggregate;
@ -146,6 +148,7 @@ import org.apache.doris.planner.PartitionSortNode;
import org.apache.doris.planner.PlanFragment;
import org.apache.doris.planner.PlanNode;
import org.apache.doris.planner.RepeatNode;
import org.apache.doris.planner.ResultFileSink;
import org.apache.doris.planner.ResultSink;
import org.apache.doris.planner.ScanNode;
import org.apache.doris.planner.SchemaScanNode;
@ -329,7 +332,36 @@ public class PhysicalPlanTranslator extends DefaultPlanVisitor<PlanFragment, Pla
);
rootFragment.setSink(sink);
rootFragment.setOutputPartition(DataPartition.UNPARTITIONED);
return rootFragment;
}
@Override
public PlanFragment visitPhysicalFileSink(PhysicalFileSink<? extends Plan> fileSink,
PlanTranslatorContext context) {
PlanFragment rootFragment = fileSink.child().accept(this, context);
OutFileClause outFile = new OutFileClause(
fileSink.getFilePath(),
fileSink.getFormat(),
fileSink.getProperties()
);
List<Expr> outputExprs = Lists.newArrayList();
fileSink.getOutput().stream().map(Slot::getExprId)
.forEach(exprId -> outputExprs.add(context.findSlotRef(exprId)));
rootFragment.setOutputExprs(outputExprs);
try {
outFile.analyze(null, outputExprs,
fileSink.getOutput().stream().map(NamedExpression::getName).collect(Collectors.toList()));
} catch (Exception e) {
throw new AnalysisException(e.getMessage(), e.getCause());
}
ResultFileSink sink = new ResultFileSink(rootFragment.getPlanRoot().getId(), outFile);
rootFragment.setSink(sink);
return rootFragment;
}

View File

@ -37,6 +37,7 @@ import org.apache.doris.nereids.DorisParser.ColumnReferenceContext;
import org.apache.doris.nereids.DorisParser.CommentJoinHintContext;
import org.apache.doris.nereids.DorisParser.CommentRelationHintContext;
import org.apache.doris.nereids.DorisParser.ComparisonContext;
import org.apache.doris.nereids.DorisParser.ConstantContext;
import org.apache.doris.nereids.DorisParser.CreateRowPolicyContext;
import org.apache.doris.nereids.DorisParser.CteContext;
import org.apache.doris.nereids.DorisParser.Date_addContext;
@ -71,6 +72,7 @@ import org.apache.doris.nereids.DorisParser.MultipartIdentifierContext;
import org.apache.doris.nereids.DorisParser.NamedExpressionContext;
import org.apache.doris.nereids.DorisParser.NamedExpressionSeqContext;
import org.apache.doris.nereids.DorisParser.NullLiteralContext;
import org.apache.doris.nereids.DorisParser.OutFileClauseContext;
import org.apache.doris.nereids.DorisParser.ParenthesizedExpressionContext;
import org.apache.doris.nereids.DorisParser.PlanTypeContext;
import org.apache.doris.nereids.DorisParser.PredicateContext;
@ -224,6 +226,7 @@ import org.apache.doris.nereids.trees.plans.logical.LogicalAggregate;
import org.apache.doris.nereids.trees.plans.logical.LogicalCTE;
import org.apache.doris.nereids.trees.plans.logical.LogicalCheckPolicy;
import org.apache.doris.nereids.trees.plans.logical.LogicalExcept;
import org.apache.doris.nereids.trees.plans.logical.LogicalFileSink;
import org.apache.doris.nereids.trees.plans.logical.LogicalFilter;
import org.apache.doris.nereids.trees.plans.logical.LogicalGenerate;
import org.apache.doris.nereids.trees.plans.logical.LogicalHaving;
@ -308,7 +311,7 @@ public class LogicalPlanBuilder extends DorisParserBaseVisitor<Object> {
@Override
public LogicalPlan visitStatementDefault(StatementDefaultContext ctx) {
LogicalPlan plan = plan(ctx.query());
return withExplain(plan, ctx.explain());
return withExplain(withOutFile(plan, ctx.outFileClause()), ctx.explain());
}
@Override
@ -1427,6 +1430,24 @@ public class LogicalPlanBuilder extends DorisParserBaseVisitor<Object> {
});
}
private LogicalPlan withOutFile(LogicalPlan plan, OutFileClauseContext ctx) {
if (ctx == null) {
return plan;
}
String format = "csv";
if (ctx.format != null) {
format = ctx.format.getText();
}
Map<String, String> properties = Maps.newHashMap();
for (TvfPropertyContext argument : ctx.properties) {
String key = parseConstant(argument.key.constant());
String value = parseConstant(argument.value.constant());
properties.put(key, value);
}
Literal filePath = (Literal) visit(ctx.filePath);
return new LogicalFileSink<>(filePath.getStringValue(), format, properties, plan);
}
private LogicalPlan withQueryOrganization(LogicalPlan inputPlan, QueryOrganizationContext ctx) {
if (ctx == null) {
return inputPlan;
@ -1856,10 +1877,7 @@ public class LogicalPlanBuilder extends DorisParserBaseVisitor<Object> {
private String parseTVFPropertyItem(TvfPropertyItemContext item) {
if (item.constant() != null) {
Object constant = visit(item.constant());
if (constant instanceof Literal && ((Literal) constant).isStringLikeLiteral()) {
return ((Literal) constant).getStringValue();
}
return parseConstant(item.constant());
}
return item.getText();
}
@ -1910,4 +1928,12 @@ public class LogicalPlanBuilder extends DorisParserBaseVisitor<Object> {
}
throw new ParseException("Unsupported function with order expressions" + ctx.getText(), ctx);
}
private String parseConstant(ConstantContext context) {
Object constant = visit(context);
if (constant instanceof Literal && ((Literal) constant).isStringLikeLiteral()) {
return ((Literal) constant).getStringValue();
}
return context.getText();
}
}

View File

@ -38,6 +38,7 @@ import org.apache.doris.nereids.trees.plans.physical.PhysicalDistribute;
import org.apache.doris.nereids.trees.plans.physical.PhysicalEmptyRelation;
import org.apache.doris.nereids.trees.plans.physical.PhysicalEsScan;
import org.apache.doris.nereids.trees.plans.physical.PhysicalFileScan;
import org.apache.doris.nereids.trees.plans.physical.PhysicalFileSink;
import org.apache.doris.nereids.trees.plans.physical.PhysicalFilter;
import org.apache.doris.nereids.trees.plans.physical.PhysicalGenerate;
import org.apache.doris.nereids.trees.plans.physical.PhysicalHashAggregate;
@ -95,6 +96,11 @@ public class ChildOutputPropertyDeriver extends PlanVisitor<PhysicalProperties,
* sink Node, in lexicographical order
* ******************************************************************************************** */
@Override
public PhysicalProperties visitPhysicalFileSink(PhysicalFileSink<? extends Plan> fileSink, PlanContext context) {
return PhysicalProperties.GATHER;
}
@Override
public PhysicalProperties visitPhysicalOlapTableSink(PhysicalOlapTableSink<? extends Plan> olapTableSink,
PlanContext context) {

View File

@ -29,6 +29,7 @@ import org.apache.doris.nereids.trees.plans.Plan;
import org.apache.doris.nereids.trees.plans.physical.AbstractPhysicalSort;
import org.apache.doris.nereids.trees.plans.physical.PhysicalAssertNumRows;
import org.apache.doris.nereids.trees.plans.physical.PhysicalCTEAnchor;
import org.apache.doris.nereids.trees.plans.physical.PhysicalFileSink;
import org.apache.doris.nereids.trees.plans.physical.PhysicalHashJoin;
import org.apache.doris.nereids.trees.plans.physical.PhysicalLimit;
import org.apache.doris.nereids.trees.plans.physical.PhysicalNestedLoopJoin;
@ -219,6 +220,12 @@ public class RequestPropertyDeriver extends PlanVisitor<Void, PlanContext> {
return null;
}
@Override
public Void visitPhysicalFileSink(PhysicalFileSink<? extends Plan> fileSink, PlanContext context) {
addRequestPropertyToChildren(PhysicalProperties.GATHER);
return null;
}
private List<PhysicalProperties> createHashRequestAccordingToParent(
Plan plan, DistributionSpecHash distributionRequestFromParent, PlanContext context) {
List<PhysicalProperties> requiredPropertyList =

View File

@ -48,6 +48,7 @@ import org.apache.doris.nereids.rules.implementation.LogicalEmptyRelationToPhysi
import org.apache.doris.nereids.rules.implementation.LogicalEsScanToPhysicalEsScan;
import org.apache.doris.nereids.rules.implementation.LogicalExceptToPhysicalExcept;
import org.apache.doris.nereids.rules.implementation.LogicalFileScanToPhysicalFileScan;
import org.apache.doris.nereids.rules.implementation.LogicalFileSinkToPhysicalFileSink;
import org.apache.doris.nereids.rules.implementation.LogicalFilterToPhysicalFilter;
import org.apache.doris.nereids.rules.implementation.LogicalGenerateToPhysicalGenerate;
import org.apache.doris.nereids.rules.implementation.LogicalIntersectToPhysicalIntersect;
@ -167,6 +168,7 @@ public class RuleSet {
.add(new LogicalIntersectToPhysicalIntersect())
.add(new LogicalGenerateToPhysicalGenerate())
.add(new LogicalOlapTableSinkToPhysicalOlapTableSink())
.add(new LogicalFileSinkToPhysicalFileSink())
.build();
public static final List<Rule> ZIG_ZAG_TREE_JOIN_REORDER = planRuleFactories()

View File

@ -301,6 +301,7 @@ public enum RuleType {
LOGICAL_JDBC_SCAN_TO_PHYSICAL_JDBC_SCAN_RULE(RuleTypeClass.IMPLEMENTATION),
LOGICAL_ES_SCAN_TO_PHYSICAL_ES_SCAN_RULE(RuleTypeClass.IMPLEMENTATION),
LOGICAL_OLAP_TABLE_SINK_TO_PHYSICAL_OLAP_TABLE_SINK_RULE(RuleTypeClass.IMPLEMENTATION),
LOGICAL_FILE_SINK_TO_PHYSICAL_FILE_SINK_RULE(RuleTypeClass.IMPLEMENTATION),
LOGICAL_ASSERT_NUM_ROWS_TO_PHYSICAL_ASSERT_NUM_ROWS(RuleTypeClass.IMPLEMENTATION),
STORAGE_LAYER_AGGREGATE_WITHOUT_PROJECT(RuleTypeClass.IMPLEMENTATION),
STORAGE_LAYER_AGGREGATE_WITH_PROJECT(RuleTypeClass.IMPLEMENTATION),

View File

@ -0,0 +1,45 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.nereids.rules.implementation;
import org.apache.doris.nereids.rules.Rule;
import org.apache.doris.nereids.rules.RuleType;
import org.apache.doris.nereids.trees.plans.Plan;
import org.apache.doris.nereids.trees.plans.logical.LogicalFileSink;
import org.apache.doris.nereids.trees.plans.physical.PhysicalFileSink;
import java.util.Optional;
/**
* logical file sink to physical file sink
*/
public class LogicalFileSinkToPhysicalFileSink extends OneImplementationRuleFactory {
@Override
public Rule build() {
return logicalFileSink().thenApply(ctx -> {
LogicalFileSink<? extends Plan> sink = ctx.root;
return new PhysicalFileSink<>(
sink.getFilePath(),
sink.getFormat(),
sink.getProperties(),
Optional.empty(),
sink.getLogicalProperties(),
sink.child());
}).toRule(RuleType.LOGICAL_FILE_SINK_TO_PHYSICAL_FILE_SINK_RULE);
}
}

View File

@ -60,6 +60,7 @@ import org.apache.doris.nereids.trees.plans.logical.LogicalEmptyRelation;
import org.apache.doris.nereids.trees.plans.logical.LogicalEsScan;
import org.apache.doris.nereids.trees.plans.logical.LogicalExcept;
import org.apache.doris.nereids.trees.plans.logical.LogicalFileScan;
import org.apache.doris.nereids.trees.plans.logical.LogicalFileSink;
import org.apache.doris.nereids.trees.plans.logical.LogicalFilter;
import org.apache.doris.nereids.trees.plans.logical.LogicalGenerate;
import org.apache.doris.nereids.trees.plans.logical.LogicalIntersect;
@ -87,6 +88,7 @@ import org.apache.doris.nereids.trees.plans.physical.PhysicalEmptyRelation;
import org.apache.doris.nereids.trees.plans.physical.PhysicalEsScan;
import org.apache.doris.nereids.trees.plans.physical.PhysicalExcept;
import org.apache.doris.nereids.trees.plans.physical.PhysicalFileScan;
import org.apache.doris.nereids.trees.plans.physical.PhysicalFileSink;
import org.apache.doris.nereids.trees.plans.physical.PhysicalFilter;
import org.apache.doris.nereids.trees.plans.physical.PhysicalGenerate;
import org.apache.doris.nereids.trees.plans.physical.PhysicalHashAggregate;
@ -243,6 +245,11 @@ public class StatsCalculator extends DefaultPlanVisitor<Statistics, Void> {
return groupExpression.childStatistics(0);
}
@Override
public Statistics visitLogicalFileSink(LogicalFileSink<? extends Plan> olapTableSink, Void context) {
return groupExpression.childStatistics(0);
}
@Override
public Statistics visitLogicalEmptyRelation(LogicalEmptyRelation emptyRelation, Void context) {
return computeEmptyRelation(emptyRelation);
@ -375,6 +382,11 @@ public class StatsCalculator extends DefaultPlanVisitor<Statistics, Void> {
return groupExpression.childStatistics(0);
}
@Override
public Statistics visitPhysicalFileSink(PhysicalFileSink<? extends Plan> olapTableSink, Void context) {
return groupExpression.childStatistics(0);
}
@Override
public Statistics visitPhysicalWindow(PhysicalWindow window, Void context) {
return computeWindow(window);

View File

@ -65,6 +65,7 @@ public enum PlanType {
LOGICAL_CTE_ANCHOR,
LOGICAL_CTE_PRODUCER,
LOGICAL_CTE_CONSUMER,
LOGICAL_FILE_SINK,
GROUP_PLAN,
@ -101,11 +102,13 @@ public enum PlanType {
PHYSICAL_UNION,
PHYSICAL_EXCEPT,
PHYSICAL_INTERSECT,
PHYSICAL_FILE_SINK,
COMMAND,
EXPLAIN_COMMAND,
CREATE_POLICY_COMMAND,
INSERT_INTO_TABLE_COMMAND,
UPDATE_COMMAND,
DELETE_COMMAND
DELETE_COMMAND,
SELECT_INTO_OUTFILE_COMMAND
}

View File

@ -0,0 +1,122 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.nereids.trees.plans.logical;
import org.apache.doris.nereids.memo.GroupExpression;
import org.apache.doris.nereids.properties.LogicalProperties;
import org.apache.doris.nereids.trees.expressions.Expression;
import org.apache.doris.nereids.trees.expressions.Slot;
import org.apache.doris.nereids.trees.plans.Plan;
import org.apache.doris.nereids.trees.plans.PlanType;
import org.apache.doris.nereids.trees.plans.visitor.PlanVisitor;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
/**
* logicalFileSink for select into outfile
*/
public class LogicalFileSink<CHILD_TYPE extends Plan> extends LogicalUnary<CHILD_TYPE> {
private final String filePath;
private final String format;
private final Map<String, String> properties;
public LogicalFileSink(String filePath, String format, Map<String, String> properties, CHILD_TYPE child) {
this(filePath, format, properties, Optional.empty(), Optional.empty(), child);
}
public LogicalFileSink(String filePath, String format, Map<String, String> properties,
Optional<GroupExpression> groupExpression, Optional<LogicalProperties> logicalProperties,
CHILD_TYPE child) {
super(PlanType.LOGICAL_FILE_SINK, groupExpression, logicalProperties, child);
this.filePath = Objects.requireNonNull(filePath);
this.format = Objects.requireNonNull(format);
this.properties = ImmutableMap.copyOf(Objects.requireNonNull(properties));
}
@Override
public Plan withChildren(List<Plan> children) {
Preconditions.checkArgument(children.size() == 1);
return new LogicalFileSink<>(filePath, format, properties, children.get(0));
}
@Override
public <R, C> R accept(PlanVisitor<R, C> visitor, C context) {
return visitor.visitLogicalFileSink(this, context);
}
@Override
public List<? extends Expression> getExpressions() {
return ImmutableList.of();
}
@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
LogicalFileSink<?> that = (LogicalFileSink<?>) o;
return Objects.equals(filePath, that.filePath)
&& Objects.equals(format, that.format)
&& Objects.equals(properties, that.properties);
}
@Override
public int hashCode() {
return Objects.hash(super.hashCode(), filePath, format, properties);
}
@Override
public Plan withGroupExpression(Optional<GroupExpression> groupExpression) {
return new LogicalFileSink<>(filePath, format, properties, groupExpression,
Optional.of(getLogicalProperties()), child());
}
@Override
public Plan withGroupExprLogicalPropChildren(Optional<GroupExpression> groupExpression,
Optional<LogicalProperties> logicalProperties, List<Plan> children) {
Preconditions.checkArgument(children.size() == 1);
return new LogicalFileSink<>(filePath, format, properties, groupExpression, logicalProperties, children.get(0));
}
@Override
public List<Slot> computeOutput() {
return child().getOutput();
}
public String getFilePath() {
return filePath;
}
public String getFormat() {
return format;
}
public Map<String, String> getProperties() {
return properties;
}
}

View File

@ -0,0 +1,132 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.nereids.trees.plans.physical;
import org.apache.doris.nereids.memo.GroupExpression;
import org.apache.doris.nereids.properties.LogicalProperties;
import org.apache.doris.nereids.properties.PhysicalProperties;
import org.apache.doris.nereids.trees.expressions.Expression;
import org.apache.doris.nereids.trees.plans.Plan;
import org.apache.doris.nereids.trees.plans.PlanType;
import org.apache.doris.nereids.trees.plans.visitor.PlanVisitor;
import org.apache.doris.statistics.Statistics;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
/**
* physicalFileSink for select into outfile
*/
public class PhysicalFileSink<CHILD_TYPE extends Plan> extends PhysicalUnary<CHILD_TYPE> {
private final String filePath;
private final String format;
private final Map<String, String> properties;
public PhysicalFileSink(String filePath, String format, Map<String, String> properties,
LogicalProperties logicalProperties, CHILD_TYPE child) {
this(filePath, format, properties, Optional.empty(), logicalProperties, child);
}
public PhysicalFileSink(String filePath, String format, Map<String, String> properties,
Optional<GroupExpression> groupExpression, LogicalProperties logicalProperties,
CHILD_TYPE child) {
super(PlanType.PHYSICAL_FILE_SINK, groupExpression, logicalProperties, child);
this.filePath = filePath;
this.format = format;
this.properties = properties;
}
public PhysicalFileSink(String filePath, String format, Map<String, String> properties,
Optional<GroupExpression> groupExpression, LogicalProperties logicalProperties,
PhysicalProperties physicalProperties, Statistics statistics, CHILD_TYPE child) {
super(PlanType.PHYSICAL_FILE_SINK, groupExpression, logicalProperties, physicalProperties, statistics, child);
this.filePath = filePath;
this.format = format;
this.properties = properties;
}
public String getFilePath() {
return filePath;
}
public String getFormat() {
return format;
}
public Map<String, String> getProperties() {
return properties;
}
@Override
public Plan withChildren(List<Plan> children) {
Preconditions.checkArgument(children.size() == 1, "PhysicalFileSink only accepts one child");
return new PhysicalFileSink<>(filePath, format, properties, getLogicalProperties(), children.get(0));
}
@Override
public <R, C> R accept(PlanVisitor<R, C> visitor, C context) {
return visitor.visitPhysicalFileSink(this, context);
}
@Override
public List<? extends Expression> getExpressions() {
return ImmutableList.of();
}
@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
PhysicalFileSink<?> that = (PhysicalFileSink<?>) o;
return Objects.equals(filePath, that.filePath)
&& Objects.equals(format, that.format)
&& Objects.equals(properties, that.properties);
}
@Override
public int hashCode() {
return Objects.hash(super.hashCode(), filePath, format, properties);
}
@Override
public Plan withGroupExpression(Optional<GroupExpression> groupExpression) {
return new PhysicalFileSink<>(filePath, format, properties, groupExpression, getLogicalProperties(), child());
}
@Override
public Plan withGroupExprLogicalPropChildren(Optional<GroupExpression> groupExpression,
Optional<LogicalProperties> logicalProperties, List<Plan> children) {
return new PhysicalFileSink<>(filePath, format, properties, groupExpression, logicalProperties.get(),
children.get(0));
}
@Override
public PhysicalPlan withPhysicalPropertiesAndStats(PhysicalProperties physicalProperties, Statistics statistics) {
return new PhysicalFileSink<>(filePath, format, properties, groupExpression, getLogicalProperties(),
physicalProperties, statistics, child());
}
}

View File

@ -36,6 +36,7 @@ import org.apache.doris.nereids.trees.plans.logical.LogicalEmptyRelation;
import org.apache.doris.nereids.trees.plans.logical.LogicalEsScan;
import org.apache.doris.nereids.trees.plans.logical.LogicalExcept;
import org.apache.doris.nereids.trees.plans.logical.LogicalFileScan;
import org.apache.doris.nereids.trees.plans.logical.LogicalFileSink;
import org.apache.doris.nereids.trees.plans.logical.LogicalFilter;
import org.apache.doris.nereids.trees.plans.logical.LogicalGenerate;
import org.apache.doris.nereids.trees.plans.logical.LogicalHaving;
@ -70,6 +71,7 @@ import org.apache.doris.nereids.trees.plans.physical.PhysicalEmptyRelation;
import org.apache.doris.nereids.trees.plans.physical.PhysicalEsScan;
import org.apache.doris.nereids.trees.plans.physical.PhysicalExcept;
import org.apache.doris.nereids.trees.plans.physical.PhysicalFileScan;
import org.apache.doris.nereids.trees.plans.physical.PhysicalFileSink;
import org.apache.doris.nereids.trees.plans.physical.PhysicalFilter;
import org.apache.doris.nereids.trees.plans.physical.PhysicalGenerate;
import org.apache.doris.nereids.trees.plans.physical.PhysicalHashAggregate;
@ -421,4 +423,12 @@ public abstract class PlanVisitor<R, C> implements CommandVisitor<R, C> {
public R visitLogicalCTEAnchor(LogicalCTEAnchor<? extends Plan, ? extends Plan> cteAnchor, C context) {
return visit(cteAnchor, context);
}
public R visitLogicalFileSink(LogicalFileSink<? extends Plan> fileSink, C context) {
return visit(fileSink, context);
}
public R visitPhysicalFileSink(PhysicalFileSink<? extends Plan> fileSink, C context) {
return visit(fileSink, context);
}
}

View File

@ -0,0 +1,91 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.nereids.trees.plans;
import org.apache.doris.nereids.NereidsPlanner;
import org.apache.doris.nereids.StatementContext;
import org.apache.doris.nereids.glue.translator.PhysicalPlanTranslator;
import org.apache.doris.nereids.glue.translator.PlanTranslatorContext;
import org.apache.doris.nereids.parser.NereidsParser;
import org.apache.doris.nereids.properties.PhysicalProperties;
import org.apache.doris.nereids.trees.expressions.StatementScopeIdGenerator;
import org.apache.doris.nereids.trees.plans.physical.PhysicalPlan;
import org.apache.doris.nereids.util.MemoTestUtils;
import org.apache.doris.nereids.util.PlanPatternMatchSupported;
import org.apache.doris.planner.PlanFragment;
import org.apache.doris.thrift.TExplainLevel;
import org.apache.doris.utframe.TestWithFeService;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
public class OutFileTest extends TestWithFeService implements PlanPatternMatchSupported {
private final NereidsParser parser = new NereidsParser();
@Override
protected void runBeforeAll() throws Exception {
createDatabase("test");
connectContext.setDatabase("default_cluster:test");
createTables(
"CREATE TABLE IF NOT EXISTS T1 (\n"
+ " id bigint,\n"
+ " score bigint\n"
+ ")\n"
+ "DUPLICATE KEY(id)\n"
+ "DISTRIBUTED BY HASH(id) BUCKETS 1\n"
+ "PROPERTIES (\n"
+ " \"replication_num\" = \"1\"\n"
+ ")\n",
"CREATE TABLE IF NOT EXISTS T2 (\n"
+ " id bigint,\n"
+ " score bigint\n"
+ ")\n"
+ "DUPLICATE KEY(id)\n"
+ "DISTRIBUTED BY HASH(id) BUCKETS 1\n"
+ "PROPERTIES (\n"
+ " \"replication_num\" = \"1\"\n"
+ ")\n"
);
}
@Test
public void testWriteOutFile() throws Exception {
String sql = "select * from T1 join T2 on T1.id = T2.id into outfile 'file://~/result.csv'\n"
+ " format as csv\n"
+ " properties (\n"
+ " \"column_separator\" = \",\",\n"
+ " \"line_delimiter\" = \"\\n\",\n"
+ " \"broker.name\" = \"my_broker\",\n"
+ " \"max_file_size\" = \"100MB\"\n"
+ ")";
Assertions.assertTrue(getOutputFragment(sql).getExplainString(TExplainLevel.BRIEF)
.contains("FILE SINK"));
}
private PlanFragment getOutputFragment(String sql) throws Exception {
StatementScopeIdGenerator.clear();
StatementContext statementContext = MemoTestUtils.createStatementContext(connectContext, sql);
NereidsPlanner planner = new NereidsPlanner(statementContext);
PhysicalPlan plan = planner.plan(
parser.parseSingle(sql),
PhysicalProperties.ANY
);
return new PhysicalPlanTranslator(new PlanTranslatorContext(planner.getCascadesContext())).translatePlan(plan);
}
}

View File

@ -65,6 +65,7 @@ public class QueryPlanTest extends TestWithFeService {
// create database
createDatabase("test");
connectContext.getSessionVariable().setEnableNereidsPlanner(false);
createTable("create table test.test1\n"
+ "(\n"

File diff suppressed because it is too large Load Diff

File diff suppressed because it is too large Load Diff

View File

@ -0,0 +1,25 @@
-- This file is automatically generated. You should know what you did if you want to edit this
-- !select_default --
1 2017-10-01 2017-10-01T00:00 2017-10-01 2017-10-01T00:00 2017-10-01T00:00:00.111 2017-10-01T00:00:00.111111 Beijing 1 1 true 1 1 1 1.1 1.1 char1 1
2 2017-10-01 2017-10-01T00:00 2017-10-01 2017-10-01T00:00 2017-10-01T00:00:00.111 2017-10-01T00:00:00.111111 Beijing 2 2 true 2 2 2 2.2 2.2 char2 2
3 2017-10-01 2017-10-01T00:00 2017-10-01 2017-10-01T00:00 2017-10-01T00:00:00.111 2017-10-01T00:00:00.111111 Beijing 3 3 true 3 3 3 3.3 3.3 char3 3
4 2017-10-01 2017-10-01T00:00 2017-10-01 2017-10-01T00:00 2017-10-01T00:00:00.111 2017-10-01T00:00:00.111111 Beijing 4 4 true 4 4 4 4.4 4.4 char4 4
5 2017-10-01 2017-10-01T00:00 2017-10-01 2017-10-01T00:00 2017-10-01T00:00:00.111 2017-10-01T00:00:00.111111 Beijing 5 5 true 5 5 5 5.5 5.5 char5 5
6 2017-10-01 2017-10-01T00:00 2017-10-01 2017-10-01T00:00 2017-10-01T00:00:00.111 2017-10-01T00:00:00.111111 Beijing 6 6 true 6 6 6 6.6 6.6 char6 6
7 2017-10-01 2017-10-01T00:00 2017-10-01 2017-10-01T00:00 2017-10-01T00:00:00.111 2017-10-01T00:00:00.111111 Beijing 7 7 true 7 7 7 7.7 7.7 char7 7
8 2017-10-01 2017-10-01T00:00 2017-10-01 2017-10-01T00:00 2017-10-01T00:00:00.111 2017-10-01T00:00:00.111111 Beijing 8 8 true 8 8 8 8.8 8.8 char8 8
9 2017-10-01 2017-10-01T00:00 2017-10-01 2017-10-01T00:00 2017-10-01T00:00:00.111 2017-10-01T00:00:00.111111 Beijing 9 9 true 9 9 9 9.9 9.9 char9 9
10 2017-10-01 2017-10-01T00:00 2017-10-01 2017-10-01T00:00 2017-10-01T00:00:00.111 2017-10-01T00:00:00.111111 \N \N \N \N \N \N \N \N \N \N \N
-- !select_default --
1 2017-10-01 2017-10-01T00:00 2017-10-01 2017-10-01T00:00 2017-10-01T00:00:00.111 2017-10-01T00:00:00.111111 Beijing 1 1 true 1 1 1 1.1 1.1 char1 1
2 2017-10-01 2017-10-01T00:00 2017-10-01 2017-10-01T00:00 2017-10-01T00:00:00.111 2017-10-01T00:00:00.111111 Beijing 2 2 true 2 2 2 2.2 2.2 char2 2
3 2017-10-01 2017-10-01T00:00 2017-10-01 2017-10-01T00:00 2017-10-01T00:00:00.111 2017-10-01T00:00:00.111111 Beijing 3 3 true 3 3 3 3.3 3.3 char3 3
4 2017-10-01 2017-10-01T00:00 2017-10-01 2017-10-01T00:00 2017-10-01T00:00:00.111 2017-10-01T00:00:00.111111 Beijing 4 4 true 4 4 4 4.4 4.4 char4 4
5 2017-10-01 2017-10-01T00:00 2017-10-01 2017-10-01T00:00 2017-10-01T00:00:00.111 2017-10-01T00:00:00.111111 Beijing 5 5 true 5 5 5 5.5 5.5 char5 5
6 2017-10-01 2017-10-01T00:00 2017-10-01 2017-10-01T00:00 2017-10-01T00:00:00.111 2017-10-01T00:00:00.111111 Beijing 6 6 true 6 6 6 6.6 6.6 char6 6
7 2017-10-01 2017-10-01T00:00 2017-10-01 2017-10-01T00:00 2017-10-01T00:00:00.111 2017-10-01T00:00:00.111111 Beijing 7 7 true 7 7 7 7.7 7.7 char7 7
8 2017-10-01 2017-10-01T00:00 2017-10-01 2017-10-01T00:00 2017-10-01T00:00:00.111 2017-10-01T00:00:00.111111 Beijing 8 8 true 8 8 8 8.8 8.8 char8 8
9 2017-10-01 2017-10-01T00:00 2017-10-01 2017-10-01T00:00 2017-10-01T00:00:00.111 2017-10-01T00:00:00.111111 Beijing 9 9 true 9 9 9 9.9 9.9 char9 9
10 2017-10-01 2017-10-01T00:00 2017-10-01 2017-10-01T00:00 2017-10-01T00:00:00.111 2017-10-01T00:00:00.111111 \N \N \N \N \N \N \N \N \N \N \N

View File

@ -0,0 +1,11 @@
-- This file is automatically generated. You should know what you did if you want to edit this
-- !select_1 --
1 abc
2 def
-- !select_2 --
1 abc
1 abc
2 def
2 def

View File

@ -0,0 +1,226 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
import org.codehaus.groovy.runtime.IOGroovyMethods
import java.nio.charset.StandardCharsets
import java.nio.file.Files
import java.nio.file.Paths
suite("test_outfile") {
sql 'set enable_nereids_planner=true'
sql 'set enable_fallback_to_original_planner=false'
StringBuilder strBuilder = new StringBuilder()
strBuilder.append("curl --location-trusted -u " + context.config.jdbcUser + ":" + context.config.jdbcPassword)
strBuilder.append(" http://" + context.config.feHttpAddress + "/rest/v1/config/fe")
String command = strBuilder.toString()
def process = command.toString().execute()
def code = process.waitFor()
def err = IOGroovyMethods.getText(new BufferedReader(new InputStreamReader(process.getErrorStream())));
def out = process.getText()
logger.info("Request FE Config: code=" + code + ", out=" + out + ", err=" + err)
assertEquals(code, 0)
def response = parseJson(out.trim())
assertEquals(response.code, 0)
assertEquals(response.msg, "success")
def configJson = response.data.rows
boolean enableOutfileToLocal = false
for (Object conf: configJson) {
assert conf instanceof Map
if (((Map<String, String>) conf).get("Name").toLowerCase() == "enable_outfile_to_local") {
enableOutfileToLocal = ((Map<String, String>) conf).get("Value").toLowerCase() == "true"
}
}
if (!enableOutfileToLocal) {
logger.warn("Please set enable_outfile_to_local to true to run test_outfile")
return
}
def tableName = "outfile_test"
def uuid = UUID.randomUUID().toString()
def outFilePath = """/tmp/test_outfile_${uuid}"""
try {
sql """ DROP TABLE IF EXISTS ${tableName} """
sql """
CREATE TABLE IF NOT EXISTS ${tableName} (
`user_id` LARGEINT NOT NULL COMMENT "用户id",
`date` DATE NOT NULL COMMENT "数据灌入日期时间",
`datetime` DATETIME NOT NULL COMMENT "数据灌入日期时间",
`date_1` DATEV2 NOT NULL COMMENT "",
`datetime_1` DATETIMEV2 NOT NULL COMMENT "",
`datetime_2` DATETIMEV2(3) NOT NULL COMMENT "",
`datetime_3` DATETIMEV2(6) NOT NULL COMMENT "",
`city` VARCHAR(20) COMMENT "用户所在城市",
`age` SMALLINT COMMENT "用户年龄",
`sex` TINYINT COMMENT "用户性别",
`bool_col` boolean COMMENT "",
`int_col` int COMMENT "",
`bigint_col` bigint COMMENT "",
`largeint_col` largeint COMMENT "",
`float_col` float COMMENT "",
`double_col` double COMMENT "",
`char_col` CHAR(10) COMMENT "",
`decimal_col` decimal COMMENT ""
)
DISTRIBUTED BY HASH(user_id) PROPERTIES("replication_num" = "1");
"""
StringBuilder sb = new StringBuilder()
int i = 1
for (; i < 1000; i ++) {
sb.append("""
(${i}, '2017-10-01', '2017-10-01 00:00:00', '2017-10-01', '2017-10-01 00:00:00.111111', '2017-10-01 00:00:00.111111', '2017-10-01 00:00:00.111111', 'Beijing', ${i}, ${i % 128}, true, ${i}, ${i}, ${i}, ${i}.${i}, ${i}.${i}, 'char${i}', ${i}),
""")
}
sb.append("""
(${i}, '2017-10-01', '2017-10-01 00:00:00', '2017-10-01', '2017-10-01 00:00:00.111111', '2017-10-01 00:00:00.111111', '2017-10-01 00:00:00.111111', NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL)
""")
sql """ INSERT INTO ${tableName} VALUES
${sb.toString()}
"""
qt_select_default """ SELECT * FROM ${tableName} t ORDER BY user_id; """
// check outfile
File path = new File(outFilePath)
if (!path.exists()) {
assert path.mkdirs()
} else {
throw new IllegalStateException("""${outFilePath} already exists! """)
}
sql """
SELECT * FROM ${tableName} t ORDER BY user_id INTO OUTFILE "file://${outFilePath}/";
"""
File[] files = path.listFiles()
assert files.length == 1
List<String> outLines = Files.readAllLines(Paths.get(files[0].getAbsolutePath()), StandardCharsets.UTF_8);
List<String> baseLines = Files.readAllLines(Paths.get("""${context.config.dataPath}/export_p0/test_outfile.out"""), StandardCharsets.UTF_8)
for (int j = 0; j < outLines.size(); j ++) {
String[] outLine = outLines.get(j).split("\t")
String[] baseLine = baseLines.get(j + 2).split("\t")
for (int slotId = 0; slotId < outLine.size(); slotId ++) {
// FIXME: Correctness validation for Datetime doesn't work in a right way so we just skip it now
if (outLine[slotId].contains("00:00:00")) {
continue
}
if (baseLine[slotId] == "false") {
assert outLine[slotId] == "0"
} else if (baseLine[slotId] == "true") {
assert outLine[slotId] == "1"
} else {
assert outLine[slotId] == baseLine[slotId]
}
}
}
} finally {
try_sql("DROP TABLE IF EXISTS ${tableName}")
File path = new File(outFilePath)
if (path.exists()) {
for (File f: path.listFiles()) {
f.delete();
}
path.delete();
}
}
// test hll column outfile
try {
sql """ DROP TABLE IF EXISTS ${tableName} """
sql """
CREATE TABLE IF NOT EXISTS ${tableName} (
`k1` int(11) NOT NULL,
`v1` hll HLL_UNION NOT NULL,
`v2` int(11) SUM NOT NULL
) ENGINE=OLAP
AGGREGATE KEY(`k1`)
COMMENT 'OLAP'
DISTRIBUTED BY HASH(`k1`) BUCKETS 2
PROPERTIES (
"replication_allocation" = "tag.location.default: 1"
);
"""
sql """
insert into ${tableName} values (7483648, hll_hash(7483648), 1), (7483648, hll_hash(7483648), 1),
(706432 , hll_hash(706432 ), 1), (706432 , hll_hash(706432 ), 1)
"""
File path = new File(outFilePath)
if (!path.exists()) {
assert path.mkdirs()
} else {
throw new IllegalStateException("""${outFilePath} already exists! """)
}
sql "set return_object_data_as_binary = false"
sql """
SELECT * FROM ${tableName} t ORDER BY k1, v2 INTO OUTFILE "file://${outFilePath}/" properties("success_file_name" = "SUCCESS")
"""
File[] files = path.listFiles()
assert files.length == 2 // one is outfile, the other is SUCCESS file
File dataFile = files[0].getName().contains("SUCCESS") ? files[1] : files[0];
List<String> outLines = Files.readAllLines(Paths.get(dataFile.getAbsolutePath()), StandardCharsets.UTF_8);
assertEquals(2, outLines.size())
String[] outLine1 = outLines.get(0).split("\t")
assertEquals(3, outLine1.size())
assert outLine1[1] == "\\N"
assert outLines.get(1).split("\t")[1] == "\\N"
} finally {
try_sql("DROP TABLE IF EXISTS ${tableName}")
File path = new File(outFilePath)
if (path.exists()) {
for (File f: path.listFiles()) {
f.delete();
}
path.delete();
}
}
// test parallel output
try {
File path = new File(outFilePath)
if (!path.exists()) {
assert path.mkdirs()
} else {
throw new IllegalStateException("""${outFilePath} already exists! """)
}
sql """drop table if exists select_into_file"""
sql """CREATE TABLE `select_into_file` (
`id` int,
`name` varchar(30)
) ENGINE=OLAP
DUPLICATE KEY(`id`)
DISTRIBUTED BY HASH(`id`) BUCKETS 2
PROPERTIES (
"replication_allocation" = "tag.location.default: 1"
);"""
sql """insert into select_into_file values(1, "b"),(2, "z"),(3, "a"),
(4, "c"), (5, "睿"), (6, "多"), (7, "丝"), (8, "test"),
(100, "aa"), (111, "bb"), (123, "cc"), (222, "dd");"""
sql "set enable_parallel_outfile = true;"
sql """select * from select_into_file into outfile "file://${outFilePath}/" properties("success_file_name" = "SUCCESS");"""
} finally {
try_sql("DROP TABLE IF EXISTS select_into_file")
File path = new File(outFilePath)
if (path.exists()) {
for (File f: path.listFiles()) {
f.delete();
}
path.delete();
}
}
}

View File

@ -0,0 +1,135 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
import org.codehaus.groovy.runtime.IOGroovyMethods
import java.nio.charset.StandardCharsets
import java.nio.file.Files
import java.nio.file.Paths
suite("test_outfile_expr") {
sql 'set enable_nereids_planner=true'
sql 'set enable_fallback_to_original_planner=false'
StringBuilder strBuilder = new StringBuilder()
strBuilder.append("curl --location-trusted -u " + context.config.jdbcUser + ":" + context.config.jdbcPassword)
strBuilder.append(" http://" + context.config.feHttpAddress + "/rest/v1/config/fe")
String command = strBuilder.toString()
def process = command.toString().execute()
def code = process.waitFor()
def err = IOGroovyMethods.getText(new BufferedReader(new InputStreamReader(process.getErrorStream())));
def out = process.getText()
logger.info("Request FE Config: code=" + code + ", out=" + out + ", err=" + err)
assertEquals(code, 0)
def response = parseJson(out.trim())
assertEquals(response.code, 0)
assertEquals(response.msg, "success")
def configJson = response.data.rows
boolean enableOutfileToLocal = false
for (Object conf: configJson) {
assert conf instanceof Map
if (((Map<String, String>) conf).get("Name").toLowerCase() == "enable_outfile_to_local") {
enableOutfileToLocal = ((Map<String, String>) conf).get("Value").toLowerCase() == "true"
}
}
if (!enableOutfileToLocal) {
logger.warn("Please set enable_outfile_to_local to true to run test_outfile_expr")
return
}
def tableName = "outfile_test_expr"
def uuid = UUID.randomUUID().toString()
def outFilePath = """/tmp/test_outfile_expr_${uuid}"""
try {
sql """ DROP TABLE IF EXISTS ${tableName} """
sql """
CREATE TABLE IF NOT EXISTS ${tableName} (
`user_id` LARGEINT NOT NULL COMMENT "用户id",
`date` DATE NOT NULL COMMENT "数据灌入日期时间",
`datetime` DATETIME NOT NULL COMMENT "数据灌入日期时间",
`city` VARCHAR(20) COMMENT "用户所在城市",
`age` SMALLINT COMMENT "用户年龄",
`sex` TINYINT COMMENT "用户性别",
`bool_col` boolean COMMENT "",
`int_col` int COMMENT "",
`bigint_col` bigint COMMENT "",
`largeint_col` largeint COMMENT "",
`float_col` float COMMENT "",
`double_col` double COMMENT "",
`char_col` CHAR(10) COMMENT "",
`decimal_col` decimal COMMENT ""
)
DISTRIBUTED BY HASH(user_id) PROPERTIES("replication_num" = "1");
"""
StringBuilder sb = new StringBuilder()
int i = 1
for (; i < 1000; i ++) {
sb.append("""
(${i}, '2017-10-01', '2017-10-01 00:00:00', 'Beijing', ${i}, ${i % 128}, true, ${i}, ${i}, ${i}, ${i}.${i}, ${i}.${i}, 'char${i}', ${i}),
""")
}
sb.append("""
(${i}, '2017-10-01', '2017-10-01 00:00:00', NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL)
""")
sql """ INSERT INTO ${tableName} VALUES
${sb.toString()}
"""
qt_select_default """ SELECT user_id+1, age+sex, repeat(char_col, 10) FROM ${tableName} t ORDER BY user_id; """
// check outfile
File path = new File(outFilePath)
if (!path.exists()) {
assert path.mkdirs()
} else {
throw new IllegalStateException("""${outFilePath} already exists! """)
}
sql """
SELECT user_id+1, age+sex, repeat(char_col, 10) FROM ${tableName} t ORDER BY user_id INTO OUTFILE "file://${outFilePath}/";
"""
File[] files = path.listFiles()
assert files.length == 1
List<String> outLines = Files.readAllLines(Paths.get(files[0].getAbsolutePath()), StandardCharsets.UTF_8);
List<String> baseLines = Files.readAllLines(Paths.get("""${context.config.dataPath}/export_p0/test_outfile_expr.out"""), StandardCharsets.UTF_8)
for (int j = 0; j < outLines.size(); j ++) {
String[] outLine = outLines.get(j).split("\t")
String[] baseLine = baseLines.get(j + 2).split("\t")
for (int slotId = 0; slotId < outLine.size(); slotId ++) {
// FIXME: Correctness validation for Datetime doesn't work in a right way so we just skip it now
if (outLine[slotId].contains("00:00:00")) {
continue
}
if (baseLine[slotId] == "false") {
assert outLine[slotId] == "0"
} else if (baseLine[slotId] == "true") {
assert outLine[slotId] == "1"
} else {
assert outLine[slotId] == baseLine[slotId]
}
}
}
} finally {
try_sql("DROP TABLE IF EXISTS ${tableName}")
File path = new File(outFilePath)
if (path.exists()) {
for (File f: path.listFiles()) {
f.delete();
}
path.delete();
}
}
}

View File

@ -0,0 +1,162 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
import org.codehaus.groovy.runtime.IOGroovyMethods
import java.nio.charset.StandardCharsets
import java.nio.file.Files
import java.nio.file.Paths
suite("test_outfile_parquet") {
sql 'set enable_nereids_planner=true'
sql 'set enable_fallback_to_original_planner=false'
def dbName = "test_outfile_parquet"
sql "CREATE DATABASE IF NOT EXISTS ${dbName}"
sql "USE $dbName"
StringBuilder strBuilder = new StringBuilder()
strBuilder.append("curl --location-trusted -u " + context.config.jdbcUser + ":" + context.config.jdbcPassword)
strBuilder.append(" http://" + context.config.feHttpAddress + "/rest/v1/config/fe")
String command = strBuilder.toString()
def process = command.toString().execute()
def code = process.waitFor()
def err = IOGroovyMethods.getText(new BufferedReader(new InputStreamReader(process.getErrorStream())));
def out = process.getText()
logger.info("Request FE Config: code=" + code + ", out=" + out + ", err=" + err)
assertEquals(code, 0)
def response = parseJson(out.trim())
assertEquals(response.code, 0)
assertEquals(response.msg, "success")
def configJson = response.data.rows
boolean enableOutfileToLocal = false
for (Object conf: configJson) {
assert conf instanceof Map
if (((Map<String, String>) conf).get("Name").toLowerCase() == "enable_outfile_to_local") {
enableOutfileToLocal = ((Map<String, String>) conf).get("Value").toLowerCase() == "true"
}
}
if (!enableOutfileToLocal) {
logger.warn("Please set enable_outfile_to_local to true to run test_outfile")
return
}
def tableName = "outfile_parquet_test"
def tableName2 = "outfile_parquet_test2"
def uuid = UUID.randomUUID().toString()
def outFilePath = """/tmp/test_outfile_parquet_${uuid}"""
try {
sql """ DROP TABLE IF EXISTS ${tableName} """
sql """
CREATE TABLE IF NOT EXISTS ${tableName} (
`user_id` INT NOT NULL COMMENT "用户id",
`date` DATE NOT NULL COMMENT "数据灌入日期时间",
`datetime` DATETIME NOT NULL COMMENT "数据灌入日期时间",
`date_1` DATEV2 NOT NULL COMMENT "",
`datetime_1` DATETIMEV2 NOT NULL COMMENT "",
`datetime_2` DATETIMEV2(3) NOT NULL COMMENT "",
`datetime_3` DATETIMEV2(6) NOT NULL COMMENT "",
`city` VARCHAR(20) COMMENT "用户所在城市",
`age` SMALLINT COMMENT "用户年龄",
`sex` TINYINT COMMENT "用户性别",
`bool_col` boolean COMMENT "",
`int_col` int COMMENT "",
`bigint_col` bigint COMMENT "",
`largeint_col` int COMMENT "",
`float_col` float COMMENT "",
`double_col` double COMMENT "",
`char_col` CHAR(10) COMMENT "",
`decimal_col` decimal COMMENT ""
)
DISTRIBUTED BY HASH(user_id) PROPERTIES("replication_num" = "1");
"""
StringBuilder sb = new StringBuilder()
int i = 1
for (; i < 10; i ++) {
sb.append("""
(${i}, '2017-10-01', '2017-10-01 00:00:00', '2017-10-01', '2017-10-01 00:00:00.111111', '2017-10-01 00:00:00.111111', '2017-10-01 00:00:00.111111', 'Beijing', ${i}, ${i % 128}, true, ${i}, ${i}, ${i}, ${i}.${i}, ${i}.${i}, 'char${i}', ${i}),
""")
}
sb.append("""
(${i}, '2017-10-01', '2017-10-01 00:00:00', '2017-10-01', '2017-10-01 00:00:00.111111', '2017-10-01 00:00:00.111111', '2017-10-01 00:00:00.111111', NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL)
""")
sql """ INSERT INTO ${tableName} VALUES
${sb.toString()}
"""
qt_select_default """ SELECT * FROM ${tableName} t ORDER BY user_id; """
// check outfile
File path = new File(outFilePath)
if (!path.exists()) {
assert path.mkdirs()
} else {
throw new IllegalStateException("""${outFilePath} already exists! """)
}
sql """
SELECT * FROM ${tableName} t ORDER BY user_id INTO OUTFILE "file://${outFilePath}/" FORMAT AS PARQUET;
"""
File[] files = path.listFiles()
assert files.length == 1
sql """ DROP TABLE IF EXISTS ${tableName2} """
sql """
CREATE TABLE IF NOT EXISTS ${tableName2} (
`user_id` INT NOT NULL COMMENT "用户id",
`date` DATE NOT NULL COMMENT "数据灌入日期时间",
`datetime` DATETIME NOT NULL COMMENT "数据灌入日期时间",
`date_1` DATEV2 NOT NULL COMMENT "",
`datetime_1` DATETIMEV2 NOT NULL COMMENT "",
`datetime_2` DATETIMEV2(3) NOT NULL COMMENT "",
`datetime_3` DATETIMEV2(6) NOT NULL COMMENT "",
`city` VARCHAR(20) COMMENT "用户所在城市",
`age` SMALLINT COMMENT "用户年龄",
`sex` TINYINT COMMENT "用户性别",
`bool_col` boolean COMMENT "",
`int_col` int COMMENT "",
`bigint_col` bigint COMMENT "",
`largeint_col` int COMMENT "",
`float_col` float COMMENT "",
`double_col` double COMMENT "",
`char_col` CHAR(10) COMMENT "",
`decimal_col` decimal COMMENT ""
)
DISTRIBUTED BY HASH(user_id) PROPERTIES("replication_num" = "1");
"""
StringBuilder commandBuilder = new StringBuilder()
commandBuilder.append("""curl -v --location-trusted -u ${context.config.feHttpUser}:${context.config.feHttpPassword}""")
commandBuilder.append(""" -H format:parquet -T """ + files[0].getAbsolutePath() + """ http://${context.config.feHttpAddress}/api/""" + dbName + "/" + tableName2 + "/_stream_load")
command = commandBuilder.toString()
process = command.execute()
code = process.waitFor()
err = IOGroovyMethods.getText(new BufferedReader(new InputStreamReader(process.getErrorStream())))
out = process.getText()
logger.info("Run command: command=" + command + ",code=" + code + ", out=" + out + ", err=" + err)
assertEquals(code, 0)
qt_select_default """ SELECT * FROM ${tableName2} t ORDER BY user_id; """
} finally {
try_sql("DROP TABLE IF EXISTS ${tableName}")
try_sql("DROP TABLE IF EXISTS ${tableName2}")
File path = new File(outFilePath)
if (path.exists()) {
for (File f: path.listFiles()) {
f.delete();
}
path.delete();
}
}
}

View File

@ -0,0 +1,107 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
import org.codehaus.groovy.runtime.IOGroovyMethods
import java.nio.charset.StandardCharsets
import java.nio.file.Files
import java.nio.file.Paths
suite("test_outfile_separator") {
sql 'set enable_nereids_planner=true'
sql 'set enable_fallback_to_original_planner=false'
StringBuilder strBuilder = new StringBuilder()
strBuilder.append("curl --location-trusted -u " + context.config.jdbcUser + ":" + context.config.jdbcPassword)
strBuilder.append(" http://" + context.config.feHttpAddress + "/rest/v1/config/fe")
String command = strBuilder.toString()
def process = command.toString().execute()
def code = process.waitFor()
def err = IOGroovyMethods.getText(new BufferedReader(new InputStreamReader(process.getErrorStream())));
def out = process.getText()
logger.info("Request FE Config: code=" + code + ", out=" + out + ", err=" + err)
assertEquals(code, 0)
def response = parseJson(out.trim())
assertEquals(response.code, 0)
assertEquals(response.msg, "success")
def configJson = response.data.rows
boolean enableOutfileToLocal = false
for (Object conf: configJson) {
assert conf instanceof Map
if (((Map<String, String>) conf).get("Name").toLowerCase() == "enable_outfile_to_local") {
enableOutfileToLocal = ((Map<String, String>) conf).get("Value").toLowerCase() == "true"
}
}
if (!enableOutfileToLocal) {
logger.warn("Please set enable_outfile_to_local to true to run test_outfile_separator")
return
}
def dbName = context.config.getDbNameByFile(context.file)
def tableName = "outfile_test_separator"
def uuid = UUID.randomUUID().toString()
def outFilePath = """/tmp/test_outfile_separator_${uuid}"""
try {
sql """ DROP TABLE IF EXISTS ${tableName} """
sql """
CREATE TABLE IF NOT EXISTS ${tableName} (
k1 int, k2 varchar(100)
)
DISTRIBUTED BY HASH(k1) PROPERTIES("replication_num" = "1");
"""
sql """ INSERT INTO ${tableName} VALUES
(1, "abc"), (2, "def");
"""
qt_select_1 """ SELECT * FROM ${tableName} t ORDER BY k1; """
// check outfile
File path = new File(outFilePath)
if (!path.exists()) {
assert path.mkdirs()
} else {
throw new IllegalStateException("""${outFilePath} already exists! """)
}
sql """
SELECT * FROM ${tableName} t INTO OUTFILE "file://${outFilePath}/" properties("column_separator" = "\\\\x01");
"""
File[] files = path.listFiles()
assert files.length == 1
List<String> outLines = Files.readAllLines(Paths.get(files[0].getAbsolutePath()), StandardCharsets.UTF_8);
assert outLines.size() == 2
streamLoad {
db """${dbName}"""
table 'outfile_test_separator'
set 'column_separator', '\\x01'
file files[0].getAbsolutePath()
time 10000 // limit inflight 10s
}
qt_select_2 """ SELECT * FROM ${tableName} t ORDER BY k1; """
} finally {
try_sql("DROP TABLE IF EXISTS ${tableName}")
File path = new File(outFilePath)
if (path.exists()) {
for (File f: path.listFiles()) {
f.delete();
}
path.delete();
}
}
}