From 1645f2e0a77f1a2804a29f71103c6a196e3f03df Mon Sep 17 00:00:00 2001 From: Mingyu Chen Date: Sun, 17 Mar 2024 20:52:44 +0800 Subject: [PATCH] [feature](insert)add hive table sink definition (#31662) (#32347) bp #31662 Co-authored-by: slothever <18522955+wsjz@users.noreply.github.com> --- .../translator/PhysicalPlanTranslator.java | 8 + .../TurnOffPageCacheForInsertIntoSelect.java | 7 + .../apache/doris/nereids/rules/RuleSet.java | 2 + .../apache/doris/nereids/rules/RuleType.java | 2 + .../nereids/rules/analysis/BindSink.java | 10 +- ...lHiveTableSinkToPhysicalHiveTableSink.java | 49 ++++++ .../doris/nereids/trees/plans/PlanType.java | 7 +- .../commands/insert/HiveInsertExecutor.java | 100 +++++++++++ .../insert/InsertIntoTableCommand.java | 5 + .../plans/commands/insert/InsertUtils.java | 9 +- .../plans/logical/LogicalHiveTableSink.java | 160 ++++++++++++++++++ .../plans/physical/PhysicalHiveTableSink.java | 119 +++++++++++++ .../trees/plans/visitor/SinkVisitor.java | 10 ++ .../org/apache/doris/planner/DataSink.java | 7 +- .../apache/doris/planner/HiveTableSink.java | 67 ++++++++ gensrc/thrift/DataSinks.thrift | 63 ++++++- 16 files changed, 617 insertions(+), 8 deletions(-) create mode 100644 fe/fe-core/src/main/java/org/apache/doris/nereids/rules/implementation/LogicalHiveTableSinkToPhysicalHiveTableSink.java create mode 100644 fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/HiveInsertExecutor.java create mode 100644 fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/logical/LogicalHiveTableSink.java create mode 100644 fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalHiveTableSink.java create mode 100644 fe/fe-core/src/main/java/org/apache/doris/planner/HiveTableSink.java diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java index 9d24e22934..37d0e25f5a 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java @@ -119,6 +119,7 @@ import org.apache.doris.nereids.trees.plans.physical.PhysicalFilter; import org.apache.doris.nereids.trees.plans.physical.PhysicalGenerate; import org.apache.doris.nereids.trees.plans.physical.PhysicalHashAggregate; import org.apache.doris.nereids.trees.plans.physical.PhysicalHashJoin; +import org.apache.doris.nereids.trees.plans.physical.PhysicalHiveTableSink; import org.apache.doris.nereids.trees.plans.physical.PhysicalIntersect; import org.apache.doris.nereids.trees.plans.physical.PhysicalJdbcScan; import org.apache.doris.nereids.trees.plans.physical.PhysicalLimit; @@ -427,6 +428,13 @@ public class PhysicalPlanTranslator extends DefaultPlanVisitor hiveTableSink, + PlanTranslatorContext context) { + PlanFragment rootFragment = hiveTableSink.child().accept(this, context); + return rootFragment; + } + @Override public PlanFragment visitPhysicalFileSink(PhysicalFileSink fileSink, PlanTranslatorContext context) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/pre/TurnOffPageCacheForInsertIntoSelect.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/pre/TurnOffPageCacheForInsertIntoSelect.java index 77955a9411..67f0c1c3ba 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/pre/TurnOffPageCacheForInsertIntoSelect.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/pre/TurnOffPageCacheForInsertIntoSelect.java @@ -24,6 +24,7 @@ import org.apache.doris.nereids.analyzer.UnboundTableSink; import org.apache.doris.nereids.exceptions.AnalysisException; import org.apache.doris.nereids.trees.plans.Plan; import org.apache.doris.nereids.trees.plans.logical.LogicalFileSink; +import org.apache.doris.nereids.trees.plans.logical.LogicalHiveTableSink; import org.apache.doris.nereids.trees.plans.logical.LogicalOlapTableSink; import org.apache.doris.qe.SessionVariable; import org.apache.doris.qe.VariableMgr; @@ -52,6 +53,12 @@ public class TurnOffPageCacheForInsertIntoSelect extends PlanPreprocessor { return tableSink; } + @Override + public Plan visitLogicalHiveTableSink(LogicalHiveTableSink tableSink, StatementContext context) { + turnOffPageCache(context); + return tableSink; + } + private void turnOffPageCache(StatementContext context) { SessionVariable sessionVariable = context.getConnectContext().getSessionVariable(); // set temporary session value, and then revert value in the 'finally block' of StmtExecutor#execute diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/RuleSet.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/RuleSet.java index f2b5091fd3..408b0d7355 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/RuleSet.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/RuleSet.java @@ -64,6 +64,7 @@ import org.apache.doris.nereids.rules.implementation.LogicalFileScanToPhysicalFi import org.apache.doris.nereids.rules.implementation.LogicalFileSinkToPhysicalFileSink; import org.apache.doris.nereids.rules.implementation.LogicalFilterToPhysicalFilter; import org.apache.doris.nereids.rules.implementation.LogicalGenerateToPhysicalGenerate; +import org.apache.doris.nereids.rules.implementation.LogicalHiveTableSinkToPhysicalHiveTableSink; import org.apache.doris.nereids.rules.implementation.LogicalIntersectToPhysicalIntersect; import org.apache.doris.nereids.rules.implementation.LogicalJdbcScanToPhysicalJdbcScan; import org.apache.doris.nereids.rules.implementation.LogicalJoinToHashJoin; @@ -187,6 +188,7 @@ public class RuleSet { .add(new LogicalIntersectToPhysicalIntersect()) .add(new LogicalGenerateToPhysicalGenerate()) .add(new LogicalOlapTableSinkToPhysicalOlapTableSink()) + .add(new LogicalHiveTableSinkToPhysicalHiveTableSink()) .add(new LogicalFileSinkToPhysicalFileSink()) .add(new LogicalResultSinkToPhysicalResultSink()) .add(new LogicalDeferMaterializeResultSinkToPhysicalDeferMaterializeResultSink()) diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/RuleType.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/RuleType.java index 4515aaf55f..9650a9147b 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/RuleType.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/RuleType.java @@ -30,6 +30,7 @@ public enum RuleType { // **** make sure BINDING_UNBOUND_LOGICAL_PLAN is the lowest priority in the rewrite rules. **** BINDING_RESULT_SINK(RuleTypeClass.REWRITE), + BINDING_INSERT_TARGET_EXTERNAL_TABLE(RuleTypeClass.REWRITE), BINDING_INSERT_TARGET_TABLE(RuleTypeClass.REWRITE), BINDING_INSERT_FILE(RuleTypeClass.REWRITE), BINDING_ONE_ROW_RELATION_SLOT(RuleTypeClass.REWRITE), @@ -386,6 +387,7 @@ public enum RuleType { LOGICAL_ODBC_SCAN_TO_PHYSICAL_ODBC_SCAN_RULE(RuleTypeClass.IMPLEMENTATION), LOGICAL_ES_SCAN_TO_PHYSICAL_ES_SCAN_RULE(RuleTypeClass.IMPLEMENTATION), LOGICAL_OLAP_TABLE_SINK_TO_PHYSICAL_OLAP_TABLE_SINK_RULE(RuleTypeClass.IMPLEMENTATION), + LOGICAL_HIVE_TABLE_SINK_TO_PHYSICAL_HIVE_TABLE_SINK_RULE(RuleTypeClass.IMPLEMENTATION), LOGICAL_RESULT_SINK_TO_PHYSICAL_RESULT_SINK_RULE(RuleTypeClass.IMPLEMENTATION), LOGICAL_DEFER_MATERIALIZE_RESULT_SINK_TO_PHYSICAL_DEFER_MATERIALIZE_RESULT_SINK_RULE(RuleTypeClass.IMPLEMENTATION), LOGICAL_FILE_SINK_TO_PHYSICAL_FILE_SINK_RULE(RuleTypeClass.IMPLEMENTATION), diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/analysis/BindSink.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/analysis/BindSink.java index 5c8a74f00e..06ad6921de 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/analysis/BindSink.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/analysis/BindSink.java @@ -68,7 +68,7 @@ import java.util.Optional; import java.util.stream.Collectors; /** - * bind an unbound logicalOlapTableSink represent the target table of an insert command + * bind an unbound logicalTableSink represent the target table of an insert command */ public class BindSink implements AnalysisRuleFactory { @@ -340,6 +340,14 @@ public class BindSink implements AnalysisRuleFactory { fileSink.child().getOutput().stream() .map(NamedExpression.class::cast) .collect(ImmutableList.toImmutableList()))) + ), + // TODO: bind hive taget table + RuleType.BINDING_INSERT_TARGET_EXTERNAL_TABLE.build( + logicalHiveTableSink().when(s -> s.getOutputExprs().isEmpty()) + .then(hiveTableSink -> hiveTableSink.withOutputExprs( + hiveTableSink.child().getOutput().stream() + .map(NamedExpression.class::cast) + .collect(ImmutableList.toImmutableList()))) ) ); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/implementation/LogicalHiveTableSinkToPhysicalHiveTableSink.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/implementation/LogicalHiveTableSinkToPhysicalHiveTableSink.java new file mode 100644 index 0000000000..13329a5d55 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/implementation/LogicalHiveTableSinkToPhysicalHiveTableSink.java @@ -0,0 +1,49 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.nereids.rules.implementation; + +import org.apache.doris.nereids.rules.Rule; +import org.apache.doris.nereids.rules.RuleType; +import org.apache.doris.nereids.trees.plans.Plan; +import org.apache.doris.nereids.trees.plans.logical.LogicalHiveTableSink; +import org.apache.doris.nereids.trees.plans.physical.PhysicalHiveTableSink; + +import java.util.Optional; + +/** + * Implementation rule that convert logical HiveTableSink to physical HiveTableSink. + */ +public class LogicalHiveTableSinkToPhysicalHiveTableSink extends OneImplementationRuleFactory { + @Override + public Rule build() { + return logicalHiveTableSink().thenApply(ctx -> { + LogicalHiveTableSink sink = ctx.root; + return new PhysicalHiveTableSink<>( + sink.getDatabase(), + sink.getTargetTable(), + sink.getCols(), + sink.getPartitionIds(), + sink.getOutputExprs(), + Optional.empty(), + sink.getLogicalProperties(), + null, + null, + sink.child()); + }).toRule(RuleType.LOGICAL_HIVE_TABLE_SINK_TO_PHYSICAL_HIVE_TABLE_SINK_RULE); + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/PlanType.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/PlanType.java index af27885fce..11a6a7b568 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/PlanType.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/PlanType.java @@ -80,7 +80,7 @@ public enum PlanType { LOGICAL_WINDOW, // physical plans - // logical relations + // physical relations PHYSICAL_CTE_CONSUMER, PHYSICAL_EMPTY_RELATION, PHYSICAL_ES_SCAN, @@ -92,12 +92,13 @@ public enum PlanType { PHYSICAL_SCHEMA_SCAN, PHYSICAL_TVF_RELATION, - // logical sinks + // physical sinks PHYSICAL_FILE_SINK, PHYSICAL_OLAP_TABLE_SINK, + PHYSICAL_HIVE_TABLE_SINK, PHYSICAL_RESULT_SINK, - // logical others + // physical others PHYSICAL_HASH_AGGREGATE, PHYSICAL_ASSERT_NUM_ROWS, PHYSICAL_CTE_PRODUCER, diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/HiveInsertExecutor.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/HiveInsertExecutor.java new file mode 100644 index 0000000000..f8bc8f2db4 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/HiveInsertExecutor.java @@ -0,0 +1,100 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.nereids.trees.plans.commands.insert; + +import org.apache.doris.analysis.Analyzer; +import org.apache.doris.catalog.Env; +import org.apache.doris.common.UserException; +import org.apache.doris.datasource.hive.HMSExternalTable; +import org.apache.doris.nereids.NereidsPlanner; +import org.apache.doris.nereids.exceptions.AnalysisException; +import org.apache.doris.nereids.trees.plans.physical.PhysicalSink; +import org.apache.doris.planner.DataSink; +import org.apache.doris.planner.HiveTableSink; +import org.apache.doris.planner.PlanFragment; +import org.apache.doris.qe.ConnectContext; +import org.apache.doris.qe.StmtExecutor; +import org.apache.doris.transaction.TransactionState; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + +import java.util.Optional; + +/** + * Insert executor for olap table + */ +public class HiveInsertExecutor extends AbstractInsertExecutor { + private static final Logger LOG = LogManager.getLogger(HiveInsertExecutor.class); + private static final long INVALID_TXN_ID = -1L; + private long txnId = INVALID_TXN_ID; + + /** + * constructor + */ + public HiveInsertExecutor(ConnectContext ctx, HMSExternalTable table, + String labelName, NereidsPlanner planner, + Optional insertCtx) { + super(ctx, table, labelName, planner, insertCtx); + } + + public long getTxnId() { + return txnId; + } + + @Override + public void beginTransaction() { + + } + + @Override + protected void finalizeSink(PlanFragment fragment, DataSink sink, PhysicalSink physicalSink) { + HiveTableSink hiveTableSink = (HiveTableSink) sink; + // PhysicalHiveTableSink physicalHiveTableSink = (PhysicalHiveTableSink) physicalSink; + try { + hiveTableSink.init(); + hiveTableSink.complete(new Analyzer(Env.getCurrentEnv(), ctx)); + TransactionState state = Env.getCurrentGlobalTransactionMgr().getTransactionState(database.getId(), txnId); + if (state == null) { + throw new AnalysisException("txn does not exist: " + txnId); + } + } catch (Exception e) { + throw new AnalysisException(e.getMessage(), e); + } + } + + @Override + protected void beforeExec() { + + } + + @Override + protected void onComplete() throws UserException { + + } + + @Override + protected void onFail(Throwable t) { + + } + + @Override + protected void afterExec(StmtExecutor executor) { + + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertIntoTableCommand.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertIntoTableCommand.java index 06cd427568..29d96ae4ad 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertIntoTableCommand.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertIntoTableCommand.java @@ -23,6 +23,7 @@ import org.apache.doris.catalog.TableIf; import org.apache.doris.common.ErrorCode; import org.apache.doris.common.ErrorReport; import org.apache.doris.common.util.ProfileManager.ProfileType; +import org.apache.doris.datasource.hive.HMSExternalTable; import org.apache.doris.load.loadv2.LoadStatistic; import org.apache.doris.mysql.privilege.PrivPredicate; import org.apache.doris.nereids.NereidsPlanner; @@ -34,6 +35,7 @@ import org.apache.doris.nereids.trees.plans.PlanType; import org.apache.doris.nereids.trees.plans.commands.Command; import org.apache.doris.nereids.trees.plans.commands.ForwardWithSync; import org.apache.doris.nereids.trees.plans.logical.LogicalPlan; +import org.apache.doris.nereids.trees.plans.physical.PhysicalHiveTableSink; import org.apache.doris.nereids.trees.plans.physical.PhysicalOlapTableSink; import org.apache.doris.nereids.trees.plans.physical.PhysicalSink; import org.apache.doris.nereids.trees.plans.visitor.PlanVisitor; @@ -160,6 +162,9 @@ public class InsertIntoTableCommand extends Command implements ForwardWithSync, : false; insertExecutor.getCoordinator().getQueryOptions() .setEnableMemtableOnSinkNode(isEnableMemtableOnSinkNode); + } else if (physicalSink instanceof PhysicalHiveTableSink) { + HMSExternalTable hiveExternalTable = (HMSExternalTable) targetTableIf; + insertExecutor = new HiveInsertExecutor(ctx, hiveExternalTable, label, planner, insertCtx); } else { // TODO: support other table types throw new AnalysisException("insert into command only support olap table"); diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertUtils.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertUtils.java index ab65b065e3..007fd48ccf 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertUtils.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertUtils.java @@ -25,6 +25,7 @@ import org.apache.doris.catalog.OlapTable; import org.apache.doris.catalog.Table; import org.apache.doris.catalog.TableIf; import org.apache.doris.common.UserException; +import org.apache.doris.datasource.hive.HMSExternalTable; import org.apache.doris.nereids.analyzer.UnboundAlias; import org.apache.doris.nereids.analyzer.UnboundOneRowRelation; import org.apache.doris.nereids.analyzer.UnboundTableSink; @@ -252,7 +253,13 @@ public class InsertUtils { } } } - + if (table instanceof HMSExternalTable) { + // TODO: check HMSExternalTable + HMSExternalTable hiveTable = (HMSExternalTable) table; + if (hiveTable.isView()) { + throw new AnalysisException("View is not support in hive external table."); + } + } Plan query = unboundTableSink.child(); if (!(query instanceof LogicalInlineTable)) { return plan; diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/logical/LogicalHiveTableSink.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/logical/LogicalHiveTableSink.java new file mode 100644 index 0000000000..9d31a39b3e --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/logical/LogicalHiveTableSink.java @@ -0,0 +1,160 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.nereids.trees.plans.logical; + +import org.apache.doris.catalog.Column; +import org.apache.doris.datasource.hive.HMSExternalDatabase; +import org.apache.doris.datasource.hive.HMSExternalTable; +import org.apache.doris.nereids.memo.GroupExpression; +import org.apache.doris.nereids.properties.LogicalProperties; +import org.apache.doris.nereids.trees.expressions.NamedExpression; +import org.apache.doris.nereids.trees.plans.Plan; +import org.apache.doris.nereids.trees.plans.PlanType; +import org.apache.doris.nereids.trees.plans.PropagateFuncDeps; +import org.apache.doris.nereids.trees.plans.algebra.Sink; +import org.apache.doris.nereids.trees.plans.commands.info.DMLCommandType; +import org.apache.doris.nereids.trees.plans.visitor.PlanVisitor; +import org.apache.doris.nereids.util.Utils; + +import com.google.common.base.Preconditions; +import com.google.common.collect.ImmutableList; + +import java.util.List; +import java.util.Objects; +import java.util.Optional; + +/** + * logical hive table sink for insert command + */ +public class LogicalHiveTableSink extends LogicalSink + implements Sink, PropagateFuncDeps { + // bound data sink + private final HMSExternalDatabase database; + private final HMSExternalTable targetTable; + private final List cols; + private final List partitionIds; + private final DMLCommandType dmlCommandType; + + /** + * constructor + */ + public LogicalHiveTableSink(HMSExternalDatabase database, HMSExternalTable targetTable, List cols, + List partitionIds, List outputExprs, + DMLCommandType dmlCommandType, Optional groupExpression, + Optional logicalProperties, CHILD_TYPE child) { + super(PlanType.LOGICAL_OLAP_TABLE_SINK, outputExprs, groupExpression, logicalProperties, child); + this.database = Objects.requireNonNull(database, "database != null in LogicalHiveTableSink"); + this.targetTable = Objects.requireNonNull(targetTable, "targetTable != null in LogicalHiveTableSink"); + this.cols = Utils.copyRequiredList(cols); + this.dmlCommandType = dmlCommandType; + this.partitionIds = Utils.copyRequiredList(partitionIds); + } + + public Plan withChildAndUpdateOutput(Plan child) { + List output = child.getOutput().stream() + .map(NamedExpression.class::cast) + .collect(ImmutableList.toImmutableList()); + return new LogicalHiveTableSink<>(database, targetTable, cols, partitionIds, output, + dmlCommandType, Optional.empty(), Optional.empty(), child); + } + + @Override + public Plan withChildren(List children) { + Preconditions.checkArgument(children.size() == 1, "LogicalHiveTableSink only accepts one child"); + return new LogicalHiveTableSink<>(database, targetTable, cols, partitionIds, outputExprs, + dmlCommandType, Optional.empty(), Optional.empty(), children.get(0)); + } + + public LogicalHiveTableSink withOutputExprs(List outputExprs) { + return new LogicalHiveTableSink<>(database, targetTable, cols, partitionIds, outputExprs, + dmlCommandType, Optional.empty(), Optional.empty(), child()); + } + + public HMSExternalDatabase getDatabase() { + return database; + } + + public HMSExternalTable getTargetTable() { + return targetTable; + } + + public List getCols() { + return cols; + } + + public List getPartitionIds() { + return partitionIds; + } + + public DMLCommandType getDmlCommandType() { + return dmlCommandType; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + if (!super.equals(o)) { + return false; + } + LogicalHiveTableSink that = (LogicalHiveTableSink) o; + return dmlCommandType == that.dmlCommandType + && Objects.equals(database, that.database) + && Objects.equals(targetTable, that.targetTable) && Objects.equals(cols, that.cols) + && Objects.equals(partitionIds, that.partitionIds); + } + + @Override + public int hashCode() { + return Objects.hash(super.hashCode(), database, targetTable, cols, partitionIds, dmlCommandType); + } + + @Override + public String toString() { + return Utils.toSqlString("LogicalHiveTableSink[" + id.asInt() + "]", + "outputExprs", outputExprs, + "database", database.getFullName(), + "targetTable", targetTable.getName(), + "cols", cols, + "partitionIds", partitionIds, + "dmlCommandType", dmlCommandType + ); + } + + @Override + public R accept(PlanVisitor visitor, C context) { + return visitor.visitLogicalHiveTableSink(this, context); + } + + @Override + public Plan withGroupExpression(Optional groupExpression) { + return new LogicalHiveTableSink<>(database, targetTable, cols, partitionIds, outputExprs, + dmlCommandType, groupExpression, Optional.of(getLogicalProperties()), child()); + } + + @Override + public Plan withGroupExprLogicalPropChildren(Optional groupExpression, + Optional logicalProperties, List children) { + return new LogicalHiveTableSink<>(database, targetTable, cols, partitionIds, outputExprs, + dmlCommandType, groupExpression, logicalProperties, children.get(0)); + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalHiveTableSink.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalHiveTableSink.java new file mode 100644 index 0000000000..eee55e3c28 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalHiveTableSink.java @@ -0,0 +1,119 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.nereids.trees.plans.physical; + +import org.apache.doris.catalog.Column; +import org.apache.doris.datasource.hive.HMSExternalDatabase; +import org.apache.doris.datasource.hive.HMSExternalTable; +import org.apache.doris.nereids.memo.GroupExpression; +import org.apache.doris.nereids.properties.LogicalProperties; +import org.apache.doris.nereids.properties.PhysicalProperties; +import org.apache.doris.nereids.trees.expressions.Expression; +import org.apache.doris.nereids.trees.expressions.NamedExpression; +import org.apache.doris.nereids.trees.plans.Plan; +import org.apache.doris.nereids.trees.plans.PlanType; +import org.apache.doris.nereids.trees.plans.algebra.Sink; +import org.apache.doris.nereids.trees.plans.visitor.PlanVisitor; +import org.apache.doris.nereids.util.Utils; +import org.apache.doris.statistics.Statistics; + +import com.google.common.collect.ImmutableList; + +import java.util.List; +import java.util.Objects; +import java.util.Optional; + +/** abstract physical hive sink */ +public class PhysicalHiveTableSink extends PhysicalSink implements Sink { + + private final HMSExternalDatabase database; + private final HMSExternalTable targetTable; + private final List cols; + private final List partitionIds; + + /** + * constructor + */ + public PhysicalHiveTableSink(HMSExternalDatabase database, + HMSExternalTable targetTable, + List cols, + List partitionIds, + List outputExprs, + Optional groupExpression, + LogicalProperties logicalProperties, + CHILD_TYPE child) { + this(database, targetTable, cols, partitionIds, outputExprs, groupExpression, logicalProperties, + PhysicalProperties.GATHER, null, child); + } + + /** + * constructor + */ + public PhysicalHiveTableSink(HMSExternalDatabase database, + HMSExternalTable targetTable, + List cols, + List partitionIds, + List outputExprs, + Optional groupExpression, + LogicalProperties logicalProperties, + PhysicalProperties physicalProperties, + Statistics statistics, + CHILD_TYPE child) { + super(PlanType.PHYSICAL_HIVE_TABLE_SINK, outputExprs, groupExpression, + logicalProperties, physicalProperties, statistics, child); + this.database = Objects.requireNonNull(database, "database != null in PhysicalHiveTableSink"); + this.targetTable = Objects.requireNonNull(targetTable, "targetTable != null in PhysicalHiveTableSink"); + this.cols = Utils.copyRequiredList(cols); + this.partitionIds = Utils.copyRequiredList(partitionIds); + } + + @Override + public Plan withChildren(List children) { + return new PhysicalHiveTableSink<>(database, targetTable, cols, partitionIds, outputExprs, groupExpression, + getLogicalProperties(), physicalProperties, statistics, children.get(0)); + } + + @Override + public R accept(PlanVisitor visitor, C context) { + return visitor.visitPhysicalHiveTableSink(this, context); + } + + @Override + public List getExpressions() { + return ImmutableList.of(); + } + + @Override + public Plan withGroupExpression(Optional groupExpression) { + return new PhysicalHiveTableSink<>(database, targetTable, cols, partitionIds, outputExprs, + groupExpression, getLogicalProperties(), child()); + } + + @Override + public Plan withGroupExprLogicalPropChildren(Optional groupExpression, + Optional logicalProperties, List children) { + return new PhysicalHiveTableSink<>(database, targetTable, cols, partitionIds, outputExprs, + groupExpression, logicalProperties.get(), children.get(0)); + } + + @Override + public PhysicalPlan withPhysicalPropertiesAndStats(PhysicalProperties physicalProperties, Statistics statistics) { + return new PhysicalHiveTableSink<>(database, targetTable, cols, partitionIds, outputExprs, + groupExpression, getLogicalProperties(), physicalProperties, statistics, child()); + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/visitor/SinkVisitor.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/visitor/SinkVisitor.java index fcb0b474e4..b88cd910a3 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/visitor/SinkVisitor.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/visitor/SinkVisitor.java @@ -22,12 +22,14 @@ import org.apache.doris.nereids.analyzer.UnboundTableSink; import org.apache.doris.nereids.trees.plans.Plan; import org.apache.doris.nereids.trees.plans.logical.LogicalDeferMaterializeResultSink; import org.apache.doris.nereids.trees.plans.logical.LogicalFileSink; +import org.apache.doris.nereids.trees.plans.logical.LogicalHiveTableSink; import org.apache.doris.nereids.trees.plans.logical.LogicalOlapTableSink; import org.apache.doris.nereids.trees.plans.logical.LogicalResultSink; import org.apache.doris.nereids.trees.plans.logical.LogicalSink; import org.apache.doris.nereids.trees.plans.logical.LogicalTableSink; import org.apache.doris.nereids.trees.plans.physical.PhysicalDeferMaterializeResultSink; import org.apache.doris.nereids.trees.plans.physical.PhysicalFileSink; +import org.apache.doris.nereids.trees.plans.physical.PhysicalHiveTableSink; import org.apache.doris.nereids.trees.plans.physical.PhysicalOlapTableSink; import org.apache.doris.nereids.trees.plans.physical.PhysicalResultSink; import org.apache.doris.nereids.trees.plans.physical.PhysicalSink; @@ -74,6 +76,10 @@ public interface SinkVisitor { return visitLogicalSink(olapTableSink, context); } + default R visitLogicalHiveTableSink(LogicalHiveTableSink hiveTableSink, C context) { + return visitLogicalSink(hiveTableSink, context); + } + default R visitLogicalResultSink(LogicalResultSink logicalResultSink, C context) { return visitLogicalSink(logicalResultSink, context); } @@ -99,6 +105,10 @@ public interface SinkVisitor { return visitPhysicalSink(olapTableSink, context); } + default R visitPhysicalHiveTableSink(PhysicalHiveTableSink hiveTableSink, C context) { + return visitPhysicalSink(hiveTableSink, context); + } + default R visitPhysicalResultSink(PhysicalResultSink physicalResultSink, C context) { return visitPhysicalSink(physicalResultSink, context); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/DataSink.java b/fe/fe-core/src/main/java/org/apache/doris/planner/DataSink.java index b813093f78..c769bbea78 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/DataSink.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/DataSink.java @@ -22,8 +22,9 @@ package org.apache.doris.planner; import org.apache.doris.catalog.MysqlTable; import org.apache.doris.catalog.OdbcTable; -import org.apache.doris.catalog.Table; +import org.apache.doris.catalog.TableIf; import org.apache.doris.common.AnalysisException; +import org.apache.doris.datasource.hive.HMSExternalTable; import org.apache.doris.datasource.odbc.sink.OdbcTableSink; import org.apache.doris.thrift.TDataSink; import org.apache.doris.thrift.TExplainLevel; @@ -62,11 +63,13 @@ public abstract class DataSink { public abstract DataPartition getOutputPartition(); - public static DataSink createDataSink(Table table) throws AnalysisException { + public static DataSink createDataSink(TableIf table) throws AnalysisException { if (table instanceof MysqlTable) { return new MysqlTableSink((MysqlTable) table); } else if (table instanceof OdbcTable) { return new OdbcTableSink((OdbcTable) table); + } else if (table instanceof HMSExternalTable) { + return new HiveTableSink((HMSExternalTable) table); } else { throw new AnalysisException("Unknown table type " + table.getType()); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/HiveTableSink.java b/fe/fe-core/src/main/java/org/apache/doris/planner/HiveTableSink.java new file mode 100644 index 0000000000..99d0c6b1b0 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/HiveTableSink.java @@ -0,0 +1,67 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. +// This file is copied from +// https://github.com/apache/impala/blob/branch-2.9.0/fe/src/main/java/org/apache/impala/DataSink.java +// and modified by Doris + +package org.apache.doris.planner; + +import org.apache.doris.analysis.Analyzer; +import org.apache.doris.datasource.hive.HMSExternalTable; +import org.apache.doris.thrift.TDataSink; +import org.apache.doris.thrift.TExplainLevel; + +public class HiveTableSink extends DataSink { + + protected TDataSink tDataSink; + + public HiveTableSink(HMSExternalTable table) { + super(); + } + + @Override + public String getExplainString(String prefix, TExplainLevel explainLevel) { + StringBuilder strBuilder = new StringBuilder(); + strBuilder.append(prefix + "HIVE TABLE SINK\n"); + if (explainLevel == TExplainLevel.BRIEF) { + return strBuilder.toString(); + } + // TODO: explain partitions + return strBuilder.toString(); + } + + @Override + protected TDataSink toThrift() { + return tDataSink; + } + + @Override + public PlanNodeId getExchNodeId() { + return null; + } + + @Override + public DataPartition getOutputPartition() { + return DataPartition.RANDOM; + } + + public void init() { + } + + public void complete(Analyzer analyzer) { + } +} diff --git a/gensrc/thrift/DataSinks.thrift b/gensrc/thrift/DataSinks.thrift index 602943b420..7c9d5e8f8c 100644 --- a/gensrc/thrift/DataSinks.thrift +++ b/gensrc/thrift/DataSinks.thrift @@ -277,6 +277,67 @@ struct TOlapTableSink { 23: optional double max_filter_ratio } +struct THiveLocationParams { + 1: optional string write_path + 2: optional string target_path +} + +struct TSortedColumn { + 1: optional string sort_column_name + 2: optional i32 order // asc(1) or desc(0) +} + +struct TBucketingMode { + 1: optional i32 bucket_version +} + +struct THiveBucket { + 1: optional list bucketed_by + 2: optional TBucketingMode bucket_mode + 3: optional i32 bucket_count + 4: optional list sorted_by +} + +enum THiveCompressionType { + SNAPPY = 3, + LZ4 = 4, + ZLIB = 6, + ZSTD = 7, +} + +struct THivePartition { + 1: optional list values + 2: optional THiveLocationParams location + 3: optional PlanNodes.TFileFormatType file_format +} + +struct THiveTableSink { + 1: optional string db_name + 2: optional string table_name + 3: optional list data_column_names + 4: optional list partition_column_names + 5: optional list partitions + 6: optional list buckets + 7: optional PlanNodes.TFileFormatType file_format + 8: optional THiveCompressionType compression_type + 9: optional THiveLocationParams location +} + +enum TUpdateMode { + NEW = 0, // add partition + APPEND = 1, // alter partition + OVERWRITE = 2 // insert overwrite +} + +struct THivePartitionUpdate { + 1: optional string name + 2: optional TUpdateMode update_mode + 3: optional THiveLocationParams location + 4: optional list file_names + 5: optional i64 row_count + 6: optional i64 file_size +} + struct TDataSink { 1: required TDataSinkType type 2: optional TDataStreamSink stream_sink @@ -289,5 +350,5 @@ struct TDataSink { 10: optional TResultFileSink result_file_sink 11: optional TJdbcTableSink jdbc_table_sink 12: optional TMultiCastDataStreamSink multi_cast_stream_sink + 13: optional THiveTableSink hive_table_sink } -