[feature](insert)add hive table sink definition (#31662) (#32347)

bp #31662
Co-authored-by: slothever <18522955+wsjz@users.noreply.github.com>
Author: Mingyu Chen
Date: 2024-03-17 20:52:44 +08:00
Committed by: GitHub
Parent: 4732aae628
Commit: 1645f2e0a7
16 changed files with 617 additions and 8 deletions
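At a glance, this backport threads a Hive sink through the Nereids planner: a LogicalHiveTableSink plan node, a BindSink rule that fills in its output, an implementation rule producing PhysicalHiveTableSink, visitor hooks, a planner-side HiveTableSink, and a stub HiveInsertExecutor. As a hedged illustration only, the sketch below builds the logical node using the constructor from the file added further down; the helper class, parameter names, and values are illustrative and not part of this commit, and DMLCommandType.INSERT is assumed from the existing enum.

import java.util.List;
import java.util.Optional;

import com.google.common.collect.ImmutableList;

import org.apache.doris.catalog.Column;
import org.apache.doris.datasource.hive.HMSExternalDatabase;
import org.apache.doris.datasource.hive.HMSExternalTable;
import org.apache.doris.nereids.trees.plans.Plan;
import org.apache.doris.nereids.trees.plans.commands.info.DMLCommandType;
import org.apache.doris.nereids.trees.plans.logical.LogicalHiveTableSink;

// Illustrative only: builds the unbound-output form that BindSink later completes.
class HiveSinkSketch {
    static LogicalHiveTableSink<Plan> sinkFor(HMSExternalDatabase db, HMSExternalTable table,
            List<Column> cols, Plan child) {
        return new LogicalHiveTableSink<>(
                db, table, cols,
                ImmutableList.of(),   // partitionIds: none resolved in this sketch
                ImmutableList.of(),   // outputExprs: BindSink copies the child's output in later
                DMLCommandType.INSERT,
                Optional.empty(),     // groupExpression
                Optional.empty(),     // logicalProperties: derived once the child is bound
                child);
    }
}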

View File

@@ -119,6 +119,7 @@ import org.apache.doris.nereids.trees.plans.physical.PhysicalFilter;
import org.apache.doris.nereids.trees.plans.physical.PhysicalGenerate;
import org.apache.doris.nereids.trees.plans.physical.PhysicalHashAggregate;
import org.apache.doris.nereids.trees.plans.physical.PhysicalHashJoin;
import org.apache.doris.nereids.trees.plans.physical.PhysicalHiveTableSink;
import org.apache.doris.nereids.trees.plans.physical.PhysicalIntersect;
import org.apache.doris.nereids.trees.plans.physical.PhysicalJdbcScan;
import org.apache.doris.nereids.trees.plans.physical.PhysicalLimit;
@@ -427,6 +428,13 @@ public class PhysicalPlanTranslator extends DefaultPlanVisitor<PlanFragment, Pla
return rootFragment;
}
@Override
public PlanFragment visitPhysicalHiveTableSink(PhysicalHiveTableSink<? extends Plan> hiveTableSink,
PlanTranslatorContext context) {
PlanFragment rootFragment = hiveTableSink.child().accept(this, context);
return rootFragment;
}
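// A minimal sketch (assumption, not part of this change): the stub above only forwards
// the child's fragment and attaches no data sink yet. A follow-up translation step would
// presumably build the planner-side HiveTableSink added later in this diff and hang it
// on the fragment, roughly:
//     HiveTableSink sink = new HiveTableSink(hiveTableSink.getTargetTable());
//     rootFragment.setSink(sink);
// Both a getTargetTable() accessor on the physical node and PlanFragment#setSink are
// assumed here, mirroring the logical node's accessors and the existing olap sink
// translation.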
@Override
public PlanFragment visitPhysicalFileSink(PhysicalFileSink<? extends Plan> fileSink,
PlanTranslatorContext context) {

View File

@@ -24,6 +24,7 @@ import org.apache.doris.nereids.analyzer.UnboundTableSink;
import org.apache.doris.nereids.exceptions.AnalysisException;
import org.apache.doris.nereids.trees.plans.Plan;
import org.apache.doris.nereids.trees.plans.logical.LogicalFileSink;
import org.apache.doris.nereids.trees.plans.logical.LogicalHiveTableSink;
import org.apache.doris.nereids.trees.plans.logical.LogicalOlapTableSink;
import org.apache.doris.qe.SessionVariable;
import org.apache.doris.qe.VariableMgr;
@@ -52,6 +53,12 @@ public class TurnOffPageCacheForInsertIntoSelect extends PlanPreprocessor {
return tableSink;
}
@Override
public Plan visitLogicalHiveTableSink(LogicalHiveTableSink<? extends Plan> tableSink, StatementContext context) {
turnOffPageCache(context);
return tableSink;
}
private void turnOffPageCache(StatementContext context) {
SessionVariable sessionVariable = context.getConnectContext().getSessionVariable();
// set temporary session value, and then revert value in the 'finally block' of StmtExecutor#execute

View File

@@ -64,6 +64,7 @@ import org.apache.doris.nereids.rules.implementation.LogicalFileScanToPhysicalFi
import org.apache.doris.nereids.rules.implementation.LogicalFileSinkToPhysicalFileSink;
import org.apache.doris.nereids.rules.implementation.LogicalFilterToPhysicalFilter;
import org.apache.doris.nereids.rules.implementation.LogicalGenerateToPhysicalGenerate;
import org.apache.doris.nereids.rules.implementation.LogicalHiveTableSinkToPhysicalHiveTableSink;
import org.apache.doris.nereids.rules.implementation.LogicalIntersectToPhysicalIntersect;
import org.apache.doris.nereids.rules.implementation.LogicalJdbcScanToPhysicalJdbcScan;
import org.apache.doris.nereids.rules.implementation.LogicalJoinToHashJoin;
@@ -187,6 +188,7 @@ public class RuleSet {
.add(new LogicalIntersectToPhysicalIntersect())
.add(new LogicalGenerateToPhysicalGenerate())
.add(new LogicalOlapTableSinkToPhysicalOlapTableSink())
.add(new LogicalHiveTableSinkToPhysicalHiveTableSink())
.add(new LogicalFileSinkToPhysicalFileSink())
.add(new LogicalResultSinkToPhysicalResultSink())
.add(new LogicalDeferMaterializeResultSinkToPhysicalDeferMaterializeResultSink())

View File

@@ -30,6 +30,7 @@ public enum RuleType {
// **** make sure BINDING_UNBOUND_LOGICAL_PLAN is the lowest priority in the rewrite rules. ****
BINDING_RESULT_SINK(RuleTypeClass.REWRITE),
BINDING_INSERT_TARGET_EXTERNAL_TABLE(RuleTypeClass.REWRITE),
BINDING_INSERT_TARGET_TABLE(RuleTypeClass.REWRITE),
BINDING_INSERT_FILE(RuleTypeClass.REWRITE),
BINDING_ONE_ROW_RELATION_SLOT(RuleTypeClass.REWRITE),
@@ -386,6 +387,7 @@
LOGICAL_ODBC_SCAN_TO_PHYSICAL_ODBC_SCAN_RULE(RuleTypeClass.IMPLEMENTATION),
LOGICAL_ES_SCAN_TO_PHYSICAL_ES_SCAN_RULE(RuleTypeClass.IMPLEMENTATION),
LOGICAL_OLAP_TABLE_SINK_TO_PHYSICAL_OLAP_TABLE_SINK_RULE(RuleTypeClass.IMPLEMENTATION),
LOGICAL_HIVE_TABLE_SINK_TO_PHYSICAL_HIVE_TABLE_SINK_RULE(RuleTypeClass.IMPLEMENTATION),
LOGICAL_RESULT_SINK_TO_PHYSICAL_RESULT_SINK_RULE(RuleTypeClass.IMPLEMENTATION),
LOGICAL_DEFER_MATERIALIZE_RESULT_SINK_TO_PHYSICAL_DEFER_MATERIALIZE_RESULT_SINK_RULE(RuleTypeClass.IMPLEMENTATION),
LOGICAL_FILE_SINK_TO_PHYSICAL_FILE_SINK_RULE(RuleTypeClass.IMPLEMENTATION),

View File

@@ -68,7 +68,7 @@ import java.util.Optional;
import java.util.stream.Collectors;
/**
* bind an unbound logicalOlapTableSink represent the target table of an insert command
* bind an unbound logicalTableSink representing the target table of an insert command
*/
public class BindSink implements AnalysisRuleFactory {
@@ -340,6 +340,14 @@ public class BindSink implements AnalysisRuleFactory {
fileSink.child().getOutput().stream()
.map(NamedExpression.class::cast)
.collect(ImmutableList.toImmutableList())))
),
// TODO: bind hive target table
RuleType.BINDING_INSERT_TARGET_EXTERNAL_TABLE.build(
logicalHiveTableSink().when(s -> s.getOutputExprs().isEmpty())
.then(hiveTableSink -> hiveTableSink.withOutputExprs(
hiveTableSink.child().getOutput().stream()
.map(NamedExpression.class::cast)
.collect(ImmutableList.toImmutableList())))
)
);
}

View File

@@ -0,0 +1,49 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.nereids.rules.implementation;
import org.apache.doris.nereids.rules.Rule;
import org.apache.doris.nereids.rules.RuleType;
import org.apache.doris.nereids.trees.plans.Plan;
import org.apache.doris.nereids.trees.plans.logical.LogicalHiveTableSink;
import org.apache.doris.nereids.trees.plans.physical.PhysicalHiveTableSink;
import java.util.Optional;
/**
* Implementation rule that converts logical HiveTableSink to physical HiveTableSink.
*/
public class LogicalHiveTableSinkToPhysicalHiveTableSink extends OneImplementationRuleFactory {
@Override
public Rule build() {
return logicalHiveTableSink().thenApply(ctx -> {
LogicalHiveTableSink<? extends Plan> sink = ctx.root;
return new PhysicalHiveTableSink<>(
sink.getDatabase(),
sink.getTargetTable(),
sink.getCols(),
sink.getPartitionIds(),
sink.getOutputExprs(),
Optional.empty(),
sink.getLogicalProperties(),
null,
null,
sink.child());
}).toRule(RuleType.LOGICAL_HIVE_TABLE_SINK_TO_PHYSICAL_HIVE_TABLE_SINK_RULE);
}
}

View File

@@ -80,7 +80,7 @@ public enum PlanType {
LOGICAL_WINDOW,
// physical plans
// logical relations
// physical relations
PHYSICAL_CTE_CONSUMER,
PHYSICAL_EMPTY_RELATION,
PHYSICAL_ES_SCAN,
@@ -92,12 +92,13 @@
PHYSICAL_SCHEMA_SCAN,
PHYSICAL_TVF_RELATION,
// logical sinks
// physical sinks
PHYSICAL_FILE_SINK,
PHYSICAL_OLAP_TABLE_SINK,
PHYSICAL_HIVE_TABLE_SINK,
PHYSICAL_RESULT_SINK,
// logical others
// physical others
PHYSICAL_HASH_AGGREGATE,
PHYSICAL_ASSERT_NUM_ROWS,
PHYSICAL_CTE_PRODUCER,

View File

@@ -0,0 +1,100 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.nereids.trees.plans.commands.insert;
import org.apache.doris.analysis.Analyzer;
import org.apache.doris.catalog.Env;
import org.apache.doris.common.UserException;
import org.apache.doris.datasource.hive.HMSExternalTable;
import org.apache.doris.nereids.NereidsPlanner;
import org.apache.doris.nereids.exceptions.AnalysisException;
import org.apache.doris.nereids.trees.plans.physical.PhysicalSink;
import org.apache.doris.planner.DataSink;
import org.apache.doris.planner.HiveTableSink;
import org.apache.doris.planner.PlanFragment;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.qe.StmtExecutor;
import org.apache.doris.transaction.TransactionState;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.util.Optional;
/**
* Insert executor for hive table
*/
public class HiveInsertExecutor extends AbstractInsertExecutor {
private static final Logger LOG = LogManager.getLogger(HiveInsertExecutor.class);
private static final long INVALID_TXN_ID = -1L;
private long txnId = INVALID_TXN_ID;
/**
* constructor
*/
public HiveInsertExecutor(ConnectContext ctx, HMSExternalTable table,
String labelName, NereidsPlanner planner,
Optional<InsertCommandContext> insertCtx) {
super(ctx, table, labelName, planner, insertCtx);
}
public long getTxnId() {
return txnId;
}
@Override
public void beginTransaction() {
}
@Override
protected void finalizeSink(PlanFragment fragment, DataSink sink, PhysicalSink physicalSink) {
HiveTableSink hiveTableSink = (HiveTableSink) sink;
// PhysicalHiveTableSink physicalHiveTableSink = (PhysicalHiveTableSink) physicalSink;
try {
hiveTableSink.init();
hiveTableSink.complete(new Analyzer(Env.getCurrentEnv(), ctx));
TransactionState state = Env.getCurrentGlobalTransactionMgr().getTransactionState(database.getId(), txnId);
if (state == null) {
throw new AnalysisException("txn does not exist: " + txnId);
}
} catch (Exception e) {
throw new AnalysisException(e.getMessage(), e);
}
}
@Override
protected void beforeExec() {
}
@Override
protected void onComplete() throws UserException {
}
@Override
protected void onFail(Throwable t) {
}
@Override
protected void afterExec(StmtExecutor executor) {
}
}

View File

@@ -23,6 +23,7 @@ import org.apache.doris.catalog.TableIf;
import org.apache.doris.common.ErrorCode;
import org.apache.doris.common.ErrorReport;
import org.apache.doris.common.util.ProfileManager.ProfileType;
import org.apache.doris.datasource.hive.HMSExternalTable;
import org.apache.doris.load.loadv2.LoadStatistic;
import org.apache.doris.mysql.privilege.PrivPredicate;
import org.apache.doris.nereids.NereidsPlanner;
@@ -34,6 +35,7 @@ import org.apache.doris.nereids.trees.plans.PlanType;
import org.apache.doris.nereids.trees.plans.commands.Command;
import org.apache.doris.nereids.trees.plans.commands.ForwardWithSync;
import org.apache.doris.nereids.trees.plans.logical.LogicalPlan;
import org.apache.doris.nereids.trees.plans.physical.PhysicalHiveTableSink;
import org.apache.doris.nereids.trees.plans.physical.PhysicalOlapTableSink;
import org.apache.doris.nereids.trees.plans.physical.PhysicalSink;
import org.apache.doris.nereids.trees.plans.visitor.PlanVisitor;
@@ -160,6 +162,9 @@ public class InsertIntoTableCommand extends Command implements ForwardWithSync,
: false;
insertExecutor.getCoordinator().getQueryOptions()
.setEnableMemtableOnSinkNode(isEnableMemtableOnSinkNode);
} else if (physicalSink instanceof PhysicalHiveTableSink) {
HMSExternalTable hiveExternalTable = (HMSExternalTable) targetTableIf;
insertExecutor = new HiveInsertExecutor(ctx, hiveExternalTable, label, planner, insertCtx);
} else {
// TODO: support other table types
throw new AnalysisException("insert into command only support olap table");

View File

@@ -25,6 +25,7 @@ import org.apache.doris.catalog.OlapTable;
import org.apache.doris.catalog.Table;
import org.apache.doris.catalog.TableIf;
import org.apache.doris.common.UserException;
import org.apache.doris.datasource.hive.HMSExternalTable;
import org.apache.doris.nereids.analyzer.UnboundAlias;
import org.apache.doris.nereids.analyzer.UnboundOneRowRelation;
import org.apache.doris.nereids.analyzer.UnboundTableSink;
@@ -252,7 +253,13 @@ public class InsertUtils {
}
}
}
if (table instanceof HMSExternalTable) {
// TODO: check HMSExternalTable
HMSExternalTable hiveTable = (HMSExternalTable) table;
if (hiveTable.isView()) {
throw new AnalysisException("View is not support in hive external table.");
}
}
Plan query = unboundTableSink.child();
if (!(query instanceof LogicalInlineTable)) {
return plan;

View File

@@ -0,0 +1,160 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.nereids.trees.plans.logical;
import org.apache.doris.catalog.Column;
import org.apache.doris.datasource.hive.HMSExternalDatabase;
import org.apache.doris.datasource.hive.HMSExternalTable;
import org.apache.doris.nereids.memo.GroupExpression;
import org.apache.doris.nereids.properties.LogicalProperties;
import org.apache.doris.nereids.trees.expressions.NamedExpression;
import org.apache.doris.nereids.trees.plans.Plan;
import org.apache.doris.nereids.trees.plans.PlanType;
import org.apache.doris.nereids.trees.plans.PropagateFuncDeps;
import org.apache.doris.nereids.trees.plans.algebra.Sink;
import org.apache.doris.nereids.trees.plans.commands.info.DMLCommandType;
import org.apache.doris.nereids.trees.plans.visitor.PlanVisitor;
import org.apache.doris.nereids.util.Utils;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
/**
* logical hive table sink for insert command
*/
public class LogicalHiveTableSink<CHILD_TYPE extends Plan> extends LogicalSink<CHILD_TYPE>
implements Sink, PropagateFuncDeps {
// bound data sink
private final HMSExternalDatabase database;
private final HMSExternalTable targetTable;
private final List<Column> cols;
private final List<Long> partitionIds;
private final DMLCommandType dmlCommandType;
/**
* constructor
*/
public LogicalHiveTableSink(HMSExternalDatabase database, HMSExternalTable targetTable, List<Column> cols,
List<Long> partitionIds, List<NamedExpression> outputExprs,
DMLCommandType dmlCommandType, Optional<GroupExpression> groupExpression,
Optional<LogicalProperties> logicalProperties, CHILD_TYPE child) {
super(PlanType.LOGICAL_OLAP_TABLE_SINK, outputExprs, groupExpression, logicalProperties, child);
this.database = Objects.requireNonNull(database, "database != null in LogicalHiveTableSink");
this.targetTable = Objects.requireNonNull(targetTable, "targetTable != null in LogicalHiveTableSink");
this.cols = Utils.copyRequiredList(cols);
this.dmlCommandType = dmlCommandType;
this.partitionIds = Utils.copyRequiredList(partitionIds);
}
public Plan withChildAndUpdateOutput(Plan child) {
List<NamedExpression> output = child.getOutput().stream()
.map(NamedExpression.class::cast)
.collect(ImmutableList.toImmutableList());
return new LogicalHiveTableSink<>(database, targetTable, cols, partitionIds, output,
dmlCommandType, Optional.empty(), Optional.empty(), child);
}
@Override
public Plan withChildren(List<Plan> children) {
Preconditions.checkArgument(children.size() == 1, "LogicalHiveTableSink only accepts one child");
return new LogicalHiveTableSink<>(database, targetTable, cols, partitionIds, outputExprs,
dmlCommandType, Optional.empty(), Optional.empty(), children.get(0));
}
public LogicalHiveTableSink<CHILD_TYPE> withOutputExprs(List<NamedExpression> outputExprs) {
return new LogicalHiveTableSink<>(database, targetTable, cols, partitionIds, outputExprs,
dmlCommandType, Optional.empty(), Optional.empty(), child());
}
public HMSExternalDatabase getDatabase() {
return database;
}
public HMSExternalTable getTargetTable() {
return targetTable;
}
public List<Column> getCols() {
return cols;
}
public List<Long> getPartitionIds() {
return partitionIds;
}
public DMLCommandType getDmlCommandType() {
return dmlCommandType;
}
@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
if (!super.equals(o)) {
return false;
}
LogicalHiveTableSink<?> that = (LogicalHiveTableSink<?>) o;
return dmlCommandType == that.dmlCommandType
&& Objects.equals(database, that.database)
&& Objects.equals(targetTable, that.targetTable) && Objects.equals(cols, that.cols)
&& Objects.equals(partitionIds, that.partitionIds);
}
@Override
public int hashCode() {
return Objects.hash(super.hashCode(), database, targetTable, cols, partitionIds, dmlCommandType);
}
@Override
public String toString() {
return Utils.toSqlString("LogicalHiveTableSink[" + id.asInt() + "]",
"outputExprs", outputExprs,
"database", database.getFullName(),
"targetTable", targetTable.getName(),
"cols", cols,
"partitionIds", partitionIds,
"dmlCommandType", dmlCommandType
);
}
@Override
public <R, C> R accept(PlanVisitor<R, C> visitor, C context) {
return visitor.visitLogicalHiveTableSink(this, context);
}
@Override
public Plan withGroupExpression(Optional<GroupExpression> groupExpression) {
return new LogicalHiveTableSink<>(database, targetTable, cols, partitionIds, outputExprs,
dmlCommandType, groupExpression, Optional.of(getLogicalProperties()), child());
}
@Override
public Plan withGroupExprLogicalPropChildren(Optional<GroupExpression> groupExpression,
Optional<LogicalProperties> logicalProperties, List<Plan> children) {
return new LogicalHiveTableSink<>(database, targetTable, cols, partitionIds, outputExprs,
dmlCommandType, groupExpression, logicalProperties, children.get(0));
}
}

View File

@@ -0,0 +1,119 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.nereids.trees.plans.physical;
import org.apache.doris.catalog.Column;
import org.apache.doris.datasource.hive.HMSExternalDatabase;
import org.apache.doris.datasource.hive.HMSExternalTable;
import org.apache.doris.nereids.memo.GroupExpression;
import org.apache.doris.nereids.properties.LogicalProperties;
import org.apache.doris.nereids.properties.PhysicalProperties;
import org.apache.doris.nereids.trees.expressions.Expression;
import org.apache.doris.nereids.trees.expressions.NamedExpression;
import org.apache.doris.nereids.trees.plans.Plan;
import org.apache.doris.nereids.trees.plans.PlanType;
import org.apache.doris.nereids.trees.plans.algebra.Sink;
import org.apache.doris.nereids.trees.plans.visitor.PlanVisitor;
import org.apache.doris.nereids.util.Utils;
import org.apache.doris.statistics.Statistics;
import com.google.common.collect.ImmutableList;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
/** physical hive table sink */
public class PhysicalHiveTableSink<CHILD_TYPE extends Plan> extends PhysicalSink<CHILD_TYPE> implements Sink {
private final HMSExternalDatabase database;
private final HMSExternalTable targetTable;
private final List<Column> cols;
private final List<Long> partitionIds;
/**
* constructor
*/
public PhysicalHiveTableSink(HMSExternalDatabase database,
HMSExternalTable targetTable,
List<Column> cols,
List<Long> partitionIds,
List<NamedExpression> outputExprs,
Optional<GroupExpression> groupExpression,
LogicalProperties logicalProperties,
CHILD_TYPE child) {
this(database, targetTable, cols, partitionIds, outputExprs, groupExpression, logicalProperties,
PhysicalProperties.GATHER, null, child);
}
/**
* constructor
*/
public PhysicalHiveTableSink(HMSExternalDatabase database,
HMSExternalTable targetTable,
List<Column> cols,
List<Long> partitionIds,
List<NamedExpression> outputExprs,
Optional<GroupExpression> groupExpression,
LogicalProperties logicalProperties,
PhysicalProperties physicalProperties,
Statistics statistics,
CHILD_TYPE child) {
super(PlanType.PHYSICAL_HIVE_TABLE_SINK, outputExprs, groupExpression,
logicalProperties, physicalProperties, statistics, child);
this.database = Objects.requireNonNull(database, "database != null in PhysicalHiveTableSink");
this.targetTable = Objects.requireNonNull(targetTable, "targetTable != null in PhysicalHiveTableSink");
this.cols = Utils.copyRequiredList(cols);
this.partitionIds = Utils.copyRequiredList(partitionIds);
}
@Override
public Plan withChildren(List<Plan> children) {
return new PhysicalHiveTableSink<>(database, targetTable, cols, partitionIds, outputExprs, groupExpression,
getLogicalProperties(), physicalProperties, statistics, children.get(0));
}
@Override
public <R, C> R accept(PlanVisitor<R, C> visitor, C context) {
return visitor.visitPhysicalHiveTableSink(this, context);
}
@Override
public List<? extends Expression> getExpressions() {
return ImmutableList.of();
}
@Override
public Plan withGroupExpression(Optional<GroupExpression> groupExpression) {
return new PhysicalHiveTableSink<>(database, targetTable, cols, partitionIds, outputExprs,
groupExpression, getLogicalProperties(), child());
}
@Override
public Plan withGroupExprLogicalPropChildren(Optional<GroupExpression> groupExpression,
Optional<LogicalProperties> logicalProperties, List<Plan> children) {
return new PhysicalHiveTableSink<>(database, targetTable, cols, partitionIds, outputExprs,
groupExpression, logicalProperties.get(), children.get(0));
}
@Override
public PhysicalPlan withPhysicalPropertiesAndStats(PhysicalProperties physicalProperties, Statistics statistics) {
return new PhysicalHiveTableSink<>(database, targetTable, cols, partitionIds, outputExprs,
groupExpression, getLogicalProperties(), physicalProperties, statistics, child());
}
}

View File

@@ -22,12 +22,14 @@ import org.apache.doris.nereids.analyzer.UnboundTableSink;
import org.apache.doris.nereids.trees.plans.Plan;
import org.apache.doris.nereids.trees.plans.logical.LogicalDeferMaterializeResultSink;
import org.apache.doris.nereids.trees.plans.logical.LogicalFileSink;
import org.apache.doris.nereids.trees.plans.logical.LogicalHiveTableSink;
import org.apache.doris.nereids.trees.plans.logical.LogicalOlapTableSink;
import org.apache.doris.nereids.trees.plans.logical.LogicalResultSink;
import org.apache.doris.nereids.trees.plans.logical.LogicalSink;
import org.apache.doris.nereids.trees.plans.logical.LogicalTableSink;
import org.apache.doris.nereids.trees.plans.physical.PhysicalDeferMaterializeResultSink;
import org.apache.doris.nereids.trees.plans.physical.PhysicalFileSink;
import org.apache.doris.nereids.trees.plans.physical.PhysicalHiveTableSink;
import org.apache.doris.nereids.trees.plans.physical.PhysicalOlapTableSink;
import org.apache.doris.nereids.trees.plans.physical.PhysicalResultSink;
import org.apache.doris.nereids.trees.plans.physical.PhysicalSink;
@@ -74,6 +76,10 @@ public interface SinkVisitor<R, C> {
return visitLogicalSink(olapTableSink, context);
}
default R visitLogicalHiveTableSink(LogicalHiveTableSink<? extends Plan> hiveTableSink, C context) {
return visitLogicalSink(hiveTableSink, context);
}
default R visitLogicalResultSink(LogicalResultSink<? extends Plan> logicalResultSink, C context) {
return visitLogicalSink(logicalResultSink, context);
}
@@ -99,6 +105,10 @@
return visitPhysicalSink(olapTableSink, context);
}
default R visitPhysicalHiveTableSink(PhysicalHiveTableSink<? extends Plan> hiveTableSink, C context) {
return visitPhysicalSink(hiveTableSink, context);
}
default R visitPhysicalResultSink(PhysicalResultSink<? extends Plan> physicalResultSink, C context) {
return visitPhysicalSink(physicalResultSink, context);
}
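The two defaults above keep every existing visitor compiling: a visitor that does not care about hive sinks silently falls back to visitLogicalSink / visitPhysicalSink. Below is a minimal opt-in sketch, assuming DefaultPlanVisitor lives in the same visitor package; the detector class and its use are illustrative, not part of this commit.

import org.apache.doris.nereids.trees.plans.Plan;
import org.apache.doris.nereids.trees.plans.physical.PhysicalHiveTableSink;
import org.apache.doris.nereids.trees.plans.visitor.DefaultPlanVisitor;

// Returns Boolean.TRUE only when the visited node itself is the new hive sink; every other
// node keeps the behaviour inherited from DefaultPlanVisitor.
class HiveSinkDetector extends DefaultPlanVisitor<Boolean, Void> {
    @Override
    public Boolean visitPhysicalHiveTableSink(PhysicalHiveTableSink<? extends Plan> sink, Void ctx) {
        return true;
    }
}

Used as physicalSink.accept(new HiveSinkDetector(), null) on a hive sink node this yields TRUE; on any other node it returns whatever DefaultPlanVisitor's default produces.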

View File

@@ -22,8 +22,9 @@ package org.apache.doris.planner;
import org.apache.doris.catalog.MysqlTable;
import org.apache.doris.catalog.OdbcTable;
import org.apache.doris.catalog.Table;
import org.apache.doris.catalog.TableIf;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.datasource.hive.HMSExternalTable;
import org.apache.doris.datasource.odbc.sink.OdbcTableSink;
import org.apache.doris.thrift.TDataSink;
import org.apache.doris.thrift.TExplainLevel;
@@ -62,11 +63,13 @@
public abstract DataPartition getOutputPartition();
public static DataSink createDataSink(Table table) throws AnalysisException {
public static DataSink createDataSink(TableIf table) throws AnalysisException {
if (table instanceof MysqlTable) {
return new MysqlTableSink((MysqlTable) table);
} else if (table instanceof OdbcTable) {
return new OdbcTableSink((OdbcTable) table);
} else if (table instanceof HMSExternalTable) {
return new HiveTableSink((HMSExternalTable) table);
} else {
throw new AnalysisException("Unknown table type " + table.getType());
}
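With the parameter widened from Table to TableIf, the factory above now also accepts HMS tables. A hedged usage sketch follows; the wrapper class and method are hypothetical, and only DataSink.createDataSink and HiveTableSink come from this diff.

import org.apache.doris.common.AnalysisException;
import org.apache.doris.datasource.hive.HMSExternalTable;
import org.apache.doris.planner.DataSink;

class SinkFactorySketch {
    // For an HMSExternalTable the new branch above yields a HiveTableSink.
    static DataSink sinkFor(HMSExternalTable hmsTable) throws AnalysisException {
        return DataSink.createDataSink(hmsTable);
    }
}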

View File

@@ -0,0 +1,67 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
// This file is copied from
// https://github.com/apache/impala/blob/branch-2.9.0/fe/src/main/java/org/apache/impala/DataSink.java
// and modified by Doris
package org.apache.doris.planner;
import org.apache.doris.analysis.Analyzer;
import org.apache.doris.datasource.hive.HMSExternalTable;
import org.apache.doris.thrift.TDataSink;
import org.apache.doris.thrift.TExplainLevel;
public class HiveTableSink extends DataSink {
protected TDataSink tDataSink;
public HiveTableSink(HMSExternalTable table) {
super();
}
@Override
public String getExplainString(String prefix, TExplainLevel explainLevel) {
StringBuilder strBuilder = new StringBuilder();
strBuilder.append(prefix + "HIVE TABLE SINK\n");
if (explainLevel == TExplainLevel.BRIEF) {
return strBuilder.toString();
}
// TODO: explain partitions
return strBuilder.toString();
}
@Override
protected TDataSink toThrift() {
return tDataSink;
}
@Override
public PlanNodeId getExchNodeId() {
return null;
}
@Override
public DataPartition getOutputPartition() {
return DataPartition.RANDOM;
}
public void init() {
}
public void complete(Analyzer analyzer) {
}
}
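For reference, a hedged sketch of exercising the new sink's explain output; the wrapper class below is hypothetical. Because partition explain is still a TODO, every explain level currently yields only the header line.

import org.apache.doris.datasource.hive.HMSExternalTable;
import org.apache.doris.planner.HiveTableSink;
import org.apache.doris.thrift.TExplainLevel;

class HiveSinkExplainSketch {
    static String explain(HMSExternalTable table, TExplainLevel level) {
        HiveTableSink sink = new HiveTableSink(table);
        // Currently returns "  HIVE TABLE SINK\n" for every level, BRIEF included.
        return sink.getExplainString("  ", level);
    }
}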