[branch-2.1][improvement](jdbc catalog) support jdbc external catalog insert stmt in nereids (#40902)

pick  (#39813)
This commit is contained in:
zy-kkk
2024-09-18 14:02:20 +08:00
committed by GitHub
parent 7d23a7c587
commit f3a42faf40
21 changed files with 757 additions and 13 deletions

View File

@ -0,0 +1,84 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.nereids.analyzer;
import org.apache.doris.nereids.memo.GroupExpression;
import org.apache.doris.nereids.properties.LogicalProperties;
import org.apache.doris.nereids.trees.plans.Plan;
import org.apache.doris.nereids.trees.plans.PlanType;
import org.apache.doris.nereids.trees.plans.commands.info.DMLCommandType;
import org.apache.doris.nereids.trees.plans.visitor.PlanVisitor;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import java.util.List;
import java.util.Optional;
/**
* Represent an jdbc table sink plan node that has not been bound.
*/
public class UnboundJdbcTableSink<CHILD_TYPE extends Plan> extends UnboundBaseExternalTableSink<CHILD_TYPE> {
public UnboundJdbcTableSink(List<String> nameParts, List<String> colNames, List<String> hints,
List<String> partitions, CHILD_TYPE child) {
this(nameParts, colNames, hints, partitions, DMLCommandType.NONE,
Optional.empty(), Optional.empty(), child);
}
/**
* constructor
*/
public UnboundJdbcTableSink(List<String> nameParts,
List<String> colNames,
List<String> hints,
List<String> partitions,
DMLCommandType dmlCommandType,
Optional<GroupExpression> groupExpression,
Optional<LogicalProperties> logicalProperties,
CHILD_TYPE child) {
super(nameParts, PlanType.LOGICAL_UNBOUND_JDBC_TABLE_SINK, ImmutableList.of(), groupExpression,
logicalProperties, colNames, dmlCommandType, child, hints, partitions);
}
@Override
public Plan withChildren(List<Plan> children) {
Preconditions.checkArgument(children.size() == 1,
"UnboundJdbcTableSink should have exactly one child");
return new UnboundJdbcTableSink<>(nameParts, colNames, hints, partitions,
dmlCommandType, Optional.empty(), Optional.empty(), children.get(0));
}
@Override
public <R, C> R accept(PlanVisitor<R, C> visitor, C context) {
return visitor.visitUnboundJdbcTableSink(this, context);
}
@Override
public Plan withGroupExpression(Optional<GroupExpression> groupExpression) {
return new UnboundJdbcTableSink<>(nameParts, colNames, hints, partitions,
dmlCommandType, groupExpression, Optional.of(getLogicalProperties()), child());
}
@Override
public Plan withGroupExprLogicalPropChildren(Optional<GroupExpression> groupExpression,
Optional<LogicalProperties> logicalProperties, List<Plan> children) {
return new UnboundJdbcTableSink<>(nameParts, colNames, hints, partitions,
dmlCommandType, groupExpression, logicalProperties, children.get(0));
}
}

View File

@ -23,6 +23,7 @@ import org.apache.doris.datasource.CatalogIf;
import org.apache.doris.datasource.InternalCatalog;
import org.apache.doris.datasource.hive.HMSExternalCatalog;
import org.apache.doris.datasource.iceberg.IcebergExternalCatalog;
import org.apache.doris.datasource.jdbc.JdbcExternalCatalog;
import org.apache.doris.nereids.exceptions.AnalysisException;
import org.apache.doris.nereids.exceptions.ParseException;
import org.apache.doris.nereids.trees.plans.Plan;
@ -78,6 +79,9 @@ public class UnboundTableSinkCreator {
} else if (curCatalog instanceof IcebergExternalCatalog) {
return new UnboundIcebergTableSink<>(nameParts, colNames, hints, partitions,
dmlCommandType, Optional.empty(), Optional.empty(), plan);
} else if (curCatalog instanceof JdbcExternalCatalog) {
return new UnboundJdbcTableSink<>(nameParts, colNames, hints, partitions,
dmlCommandType, Optional.empty(), Optional.empty(), plan);
}
throw new RuntimeException("Load data to " + curCatalog.getClass().getSimpleName() + " is not supported.");
}
@ -109,16 +113,12 @@ public class UnboundTableSinkCreator {
dmlCommandType, Optional.empty(), Optional.empty(), plan);
} else if (curCatalog instanceof IcebergExternalCatalog && !isAutoDetectPartition) {
return new UnboundIcebergTableSink<>(nameParts, colNames, hints, partitions,
dmlCommandType, Optional.empty(), Optional.empty(), plan);
}
// TODO: we need to support insert into other catalog
try {
if (ConnectContext.get() != null) {
ConnectContext.get().getSessionVariable().enableFallbackToOriginalPlannerOnce();
}
} catch (Exception e) {
// ignore this.
dmlCommandType, Optional.empty(), Optional.empty(), plan);
} else if (curCatalog instanceof JdbcExternalCatalog) {
return new UnboundJdbcTableSink<>(nameParts, colNames, hints, partitions,
dmlCommandType, Optional.empty(), Optional.empty(), plan);
}
throw new AnalysisException(
(isOverwrite ? "insert overwrite" : "insert") + " data to " + curCatalog.getClass().getSimpleName()
+ " is not supported."

View File

@ -57,6 +57,7 @@ import org.apache.doris.datasource.hudi.source.HudiScanNode;
import org.apache.doris.datasource.iceberg.IcebergExternalTable;
import org.apache.doris.datasource.iceberg.source.IcebergScanNode;
import org.apache.doris.datasource.jdbc.JdbcExternalTable;
import org.apache.doris.datasource.jdbc.sink.JdbcTableSink;
import org.apache.doris.datasource.jdbc.source.JdbcScanNode;
import org.apache.doris.datasource.maxcompute.MaxComputeExternalTable;
import org.apache.doris.datasource.maxcompute.source.MaxComputeScanNode;
@ -125,6 +126,7 @@ import org.apache.doris.nereids.trees.plans.physical.PhysicalHudiScan;
import org.apache.doris.nereids.trees.plans.physical.PhysicalIcebergTableSink;
import org.apache.doris.nereids.trees.plans.physical.PhysicalIntersect;
import org.apache.doris.nereids.trees.plans.physical.PhysicalJdbcScan;
import org.apache.doris.nereids.trees.plans.physical.PhysicalJdbcTableSink;
import org.apache.doris.nereids.trees.plans.physical.PhysicalLimit;
import org.apache.doris.nereids.trees.plans.physical.PhysicalNestedLoopJoin;
import org.apache.doris.nereids.trees.plans.physical.PhysicalOdbcScan;
@ -489,6 +491,24 @@ public class PhysicalPlanTranslator extends DefaultPlanVisitor<PlanFragment, Pla
return rootFragment;
}
@Override
public PlanFragment visitPhysicalJdbcTableSink(PhysicalJdbcTableSink<? extends Plan> jdbcTableSink,
PlanTranslatorContext context) {
PlanFragment rootFragment = jdbcTableSink.child().accept(this, context);
rootFragment.setOutputPartition(DataPartition.UNPARTITIONED);
List<Column> targetTableColumns = jdbcTableSink.getCols();
List<String> insertCols = targetTableColumns.stream()
.map(Column::getName)
.collect(Collectors.toList());
JdbcTableSink sink = new JdbcTableSink(
((JdbcExternalTable) jdbcTableSink.getTargetTable()).getJdbcTable(),
insertCols
);
rootFragment.setSink(sink);
return rootFragment;
}
@Override
public PlanFragment visitPhysicalFileSink(PhysicalFileSink<? extends Plan> fileSink,
PlanTranslatorContext context) {

View File

@ -26,6 +26,7 @@ import org.apache.doris.nereids.trees.plans.Plan;
import org.apache.doris.nereids.trees.plans.logical.LogicalFileSink;
import org.apache.doris.nereids.trees.plans.logical.LogicalHiveTableSink;
import org.apache.doris.nereids.trees.plans.logical.LogicalIcebergTableSink;
import org.apache.doris.nereids.trees.plans.logical.LogicalJdbcTableSink;
import org.apache.doris.nereids.trees.plans.logical.LogicalOlapTableSink;
import org.apache.doris.qe.SessionVariable;
import org.apache.doris.qe.VariableMgr;
@ -67,6 +68,13 @@ public class TurnOffPageCacheForInsertIntoSelect extends PlanPreprocessor {
return tableSink;
}
@Override
public Plan visitLogicalJdbcTableSink(
LogicalJdbcTableSink<? extends Plan> tableSink, StatementContext context) {
turnOffPageCache(context);
return tableSink;
}
private void turnOffPageCache(StatementContext context) {
SessionVariable sessionVariable = context.getConnectContext().getSessionVariable();
// set temporary session value, and then revert value in the 'finally block' of StmtExecutor#execute

View File

@ -40,6 +40,7 @@ import org.apache.doris.nereids.trees.plans.physical.PhysicalFilter;
import org.apache.doris.nereids.trees.plans.physical.PhysicalHashJoin;
import org.apache.doris.nereids.trees.plans.physical.PhysicalHiveTableSink;
import org.apache.doris.nereids.trees.plans.physical.PhysicalIcebergTableSink;
import org.apache.doris.nereids.trees.plans.physical.PhysicalJdbcTableSink;
import org.apache.doris.nereids.trees.plans.physical.PhysicalLimit;
import org.apache.doris.nereids.trees.plans.physical.PhysicalNestedLoopJoin;
import org.apache.doris.nereids.trees.plans.physical.PhysicalOlapTableSink;
@ -152,6 +153,14 @@ public class RequestPropertyDeriver extends PlanVisitor<Void, PlanContext> {
return null;
}
@Override
public Void visitPhysicalJdbcTableSink(
PhysicalJdbcTableSink<? extends Plan> jdbcTableSink, PlanContext context) {
// Always use gather properties for jdbcTableSink
addRequestPropertyToChildren(PhysicalProperties.GATHER);
return null;
}
@Override
public Void visitPhysicalResultSink(PhysicalResultSink<? extends Plan> physicalResultSink, PlanContext context) {
addRequestPropertyToChildren(PhysicalProperties.GATHER);

View File

@ -76,6 +76,7 @@ import org.apache.doris.nereids.rules.implementation.LogicalHudiScanToPhysicalHu
import org.apache.doris.nereids.rules.implementation.LogicalIcebergTableSinkToPhysicalIcebergTableSink;
import org.apache.doris.nereids.rules.implementation.LogicalIntersectToPhysicalIntersect;
import org.apache.doris.nereids.rules.implementation.LogicalJdbcScanToPhysicalJdbcScan;
import org.apache.doris.nereids.rules.implementation.LogicalJdbcTableSinkToPhysicalJdbcTableSink;
import org.apache.doris.nereids.rules.implementation.LogicalJoinToHashJoin;
import org.apache.doris.nereids.rules.implementation.LogicalJoinToNestedLoopJoin;
import org.apache.doris.nereids.rules.implementation.LogicalLimitToPhysicalLimit;
@ -202,6 +203,7 @@ public class RuleSet {
.add(new LogicalOlapTableSinkToPhysicalOlapTableSink())
.add(new LogicalHiveTableSinkToPhysicalHiveTableSink())
.add(new LogicalIcebergTableSinkToPhysicalIcebergTableSink())
.add(new LogicalJdbcTableSinkToPhysicalJdbcTableSink())
.add(new LogicalFileSinkToPhysicalFileSink())
.add(new LogicalResultSinkToPhysicalResultSink())
.add(new LogicalDeferMaterializeResultSinkToPhysicalDeferMaterializeResultSink())

View File

@ -32,6 +32,7 @@ public enum RuleType {
BINDING_RESULT_SINK(RuleTypeClass.REWRITE),
BINDING_INSERT_HIVE_TABLE(RuleTypeClass.REWRITE),
BINDING_INSERT_ICEBERG_TABLE(RuleTypeClass.REWRITE),
BINDING_INSERT_JDBC_TABLE(RuleTypeClass.REWRITE),
BINDING_INSERT_TARGET_TABLE(RuleTypeClass.REWRITE),
INIT_MATERIALIZATION_HOOK_FOR_FILE_SINK(RuleTypeClass.REWRITE),
INIT_MATERIALIZATION_HOOK_FOR_TABLE_SINK(RuleTypeClass.REWRITE),
@ -428,6 +429,7 @@ public enum RuleType {
LOGICAL_OLAP_TABLE_SINK_TO_PHYSICAL_OLAP_TABLE_SINK_RULE(RuleTypeClass.IMPLEMENTATION),
LOGICAL_HIVE_TABLE_SINK_TO_PHYSICAL_HIVE_TABLE_SINK_RULE(RuleTypeClass.IMPLEMENTATION),
LOGICAL_ICEBERG_TABLE_SINK_TO_PHYSICAL_ICEBERG_TABLE_SINK_RULE(RuleTypeClass.IMPLEMENTATION),
LOGICAL_JDBC_TABLE_SINK_TO_PHYSICAL_JDBC_TABLE_SINK_RULE(RuleTypeClass.IMPLEMENTATION),
LOGICAL_RESULT_SINK_TO_PHYSICAL_RESULT_SINK_RULE(RuleTypeClass.IMPLEMENTATION),
LOGICAL_DEFER_MATERIALIZE_RESULT_SINK_TO_PHYSICAL_DEFER_MATERIALIZE_RESULT_SINK_RULE(RuleTypeClass.IMPLEMENTATION),
LOGICAL_FILE_SINK_TO_PHYSICAL_FILE_SINK_RULE(RuleTypeClass.IMPLEMENTATION),

View File

@ -31,10 +31,13 @@ import org.apache.doris.datasource.hive.HMSExternalDatabase;
import org.apache.doris.datasource.hive.HMSExternalTable;
import org.apache.doris.datasource.iceberg.IcebergExternalDatabase;
import org.apache.doris.datasource.iceberg.IcebergExternalTable;
import org.apache.doris.datasource.jdbc.JdbcExternalDatabase;
import org.apache.doris.datasource.jdbc.JdbcExternalTable;
import org.apache.doris.nereids.CascadesContext;
import org.apache.doris.nereids.analyzer.Scope;
import org.apache.doris.nereids.analyzer.UnboundHiveTableSink;
import org.apache.doris.nereids.analyzer.UnboundIcebergTableSink;
import org.apache.doris.nereids.analyzer.UnboundJdbcTableSink;
import org.apache.doris.nereids.analyzer.UnboundSlot;
import org.apache.doris.nereids.analyzer.UnboundTableSink;
import org.apache.doris.nereids.exceptions.AnalysisException;
@ -57,6 +60,7 @@ import org.apache.doris.nereids.trees.plans.Plan;
import org.apache.doris.nereids.trees.plans.commands.info.DMLCommandType;
import org.apache.doris.nereids.trees.plans.logical.LogicalHiveTableSink;
import org.apache.doris.nereids.trees.plans.logical.LogicalIcebergTableSink;
import org.apache.doris.nereids.trees.plans.logical.LogicalJdbcTableSink;
import org.apache.doris.nereids.trees.plans.logical.LogicalOlapTableSink;
import org.apache.doris.nereids.trees.plans.logical.LogicalOneRowRelation;
import org.apache.doris.nereids.trees.plans.logical.LogicalPlan;
@ -108,7 +112,8 @@ public class BindSink implements AnalysisRuleFactory {
// TODO: bind hive taget table
RuleType.BINDING_INSERT_HIVE_TABLE.build(unboundHiveTableSink().thenApply(this::bindHiveTableSink)),
RuleType.BINDING_INSERT_ICEBERG_TABLE.build(
unboundIcebergTableSink().thenApply(this::bindIcebergTableSink))
unboundIcebergTableSink().thenApply(this::bindIcebergTableSink)),
RuleType.BINDING_INSERT_JDBC_TABLE.build(unboundJdbcTableSink().thenApply(this::bindJdbcTableSink))
);
}
@ -502,6 +507,64 @@ public class BindSink implements AnalysisRuleFactory {
return boundSink.withChildAndUpdateOutput(fullOutputProject);
}
private Plan bindJdbcTableSink(MatchingContext<UnboundJdbcTableSink<Plan>> ctx) {
UnboundJdbcTableSink<?> sink = ctx.root;
Pair<JdbcExternalDatabase, JdbcExternalTable> pair = bind(ctx.cascadesContext, sink);
JdbcExternalDatabase database = pair.first;
JdbcExternalTable table = pair.second;
LogicalPlan child = ((LogicalPlan) sink.child());
List<Column> bindColumns;
if (sink.getColNames().isEmpty()) {
bindColumns = table.getBaseSchema(true).stream().collect(ImmutableList.toImmutableList());
} else {
bindColumns = sink.getColNames().stream().map(cn -> {
Column column = table.getColumn(cn);
if (column == null) {
throw new AnalysisException(String.format("column %s is not found in table %s",
cn, table.getName()));
}
return column;
}).collect(ImmutableList.toImmutableList());
}
LogicalJdbcTableSink<?> boundSink = new LogicalJdbcTableSink<>(
database,
table,
bindColumns,
child.getOutput().stream()
.map(NamedExpression.class::cast)
.collect(ImmutableList.toImmutableList()),
sink.getDMLCommandType(),
Optional.empty(),
Optional.empty(),
child);
// we need to insert all the columns of the target table
if (boundSink.getCols().size() != child.getOutput().size()) {
throw new AnalysisException("insert into cols should be corresponding to the query output");
}
Map<String, NamedExpression> columnToOutput = getJdbcColumnToOutput(bindColumns, child);
// We don't need to insert unmentioned columns, only user specified columns
LogicalProject<?> outputProject = getOutputProjectByCoercion(bindColumns, child, columnToOutput);
return boundSink.withChildAndUpdateOutput(outputProject);
}
private static Map<String, NamedExpression> getJdbcColumnToOutput(
List<Column> bindColumns, LogicalPlan child) {
Map<String, NamedExpression> columnToOutput = Maps.newTreeMap(String.CASE_INSENSITIVE_ORDER);
for (int i = 0; i < bindColumns.size(); i++) {
Column column = bindColumns.get(i);
NamedExpression outputExpr = child.getOutput().get(i);
Alias output = new Alias(
TypeCoercionUtils.castIfNotSameType(outputExpr, DataType.fromCatalogType(column.getType())),
column.getName()
);
columnToOutput.put(column.getName(), output);
}
return columnToOutput;
}
private Pair<Database, OlapTable> bind(CascadesContext cascadesContext, UnboundTableSink<? extends Plan> sink) {
List<String> tableQualifier = RelationUtil.getQualifierName(cascadesContext.getConnectContext(),
sink.getNameParts());
@ -545,6 +608,18 @@ public class BindSink implements AnalysisRuleFactory {
throw new AnalysisException("the target table of insert into is not an iceberg table");
}
private Pair<JdbcExternalDatabase, JdbcExternalTable> bind(CascadesContext cascadesContext,
UnboundJdbcTableSink<? extends Plan> sink) {
List<String> tableQualifier = RelationUtil.getQualifierName(cascadesContext.getConnectContext(),
sink.getNameParts());
Pair<DatabaseIf<?>, TableIf> pair = RelationUtil.getDbAndTable(tableQualifier,
cascadesContext.getConnectContext().getEnv());
if (pair.second instanceof JdbcExternalTable) {
return Pair.of(((JdbcExternalDatabase) pair.first), (JdbcExternalTable) pair.second);
}
throw new AnalysisException("the target table of insert into is not an jdbc table");
}
private List<Long> bindPartitionIds(OlapTable table, List<String> partitions, boolean temp) {
return partitions.isEmpty()
? ImmutableList.of()

View File

@ -0,0 +1,48 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.nereids.rules.implementation;
import org.apache.doris.nereids.rules.Rule;
import org.apache.doris.nereids.rules.RuleType;
import org.apache.doris.nereids.trees.plans.Plan;
import org.apache.doris.nereids.trees.plans.logical.LogicalJdbcTableSink;
import org.apache.doris.nereids.trees.plans.physical.PhysicalJdbcTableSink;
import java.util.Optional;
/**
* Implementation rule that convert logical JdbcTableSink to physical JdbcTableSink.
*/
public class LogicalJdbcTableSinkToPhysicalJdbcTableSink extends OneImplementationRuleFactory {
@Override
public Rule build() {
return logicalJdbcTableSink().thenApply(ctx -> {
LogicalJdbcTableSink<? extends Plan> sink = ctx.root;
return new PhysicalJdbcTableSink<>(
sink.getDatabase(),
sink.getTargetTable(),
sink.getCols(),
sink.getOutputExprs(),
Optional.empty(),
sink.getLogicalProperties(),
null,
null,
sink.child());
}).toRule(RuleType.LOGICAL_JDBC_TABLE_SINK_TO_PHYSICAL_JDBC_TABLE_SINK_RULE);
}
}

View File

@ -49,9 +49,11 @@ public enum PlanType {
LOGICAL_OLAP_TABLE_SINK,
LOGICAL_HIVE_TABLE_SINK,
LOGICAL_ICEBERG_TABLE_SINK,
LOGICAL_JDBC_TABLE_SINK,
LOGICAL_RESULT_SINK,
LOGICAL_UNBOUND_OLAP_TABLE_SINK,
LOGICAL_UNBOUND_HIVE_TABLE_SINK,
LOGICAL_UNBOUND_JDBC_TABLE_SINK,
LOGICAL_UNBOUND_RESULT_SINK,
// logical others
@ -103,6 +105,7 @@ public enum PlanType {
PHYSICAL_OLAP_TABLE_SINK,
PHYSICAL_HIVE_TABLE_SINK,
PHYSICAL_ICEBERG_TABLE_SINK,
PHYSICAL_JDBC_TABLE_SINK,
PHYSICAL_RESULT_SINK,
// physical others

View File

@ -46,8 +46,8 @@ import java.util.Optional;
* Insert executor for base external table
*/
public abstract class BaseExternalTableInsertExecutor extends AbstractInsertExecutor {
protected static final long INVALID_TXN_ID = -1L;
private static final Logger LOG = LogManager.getLogger(BaseExternalTableInsertExecutor.class);
private static final long INVALID_TXN_ID = -1L;
protected long txnId = INVALID_TXN_ID;
protected TransactionStatus txnStatus = TransactionStatus.ABORTED;
protected final TransactionManager transactionManager;

View File

@ -17,6 +17,7 @@
package org.apache.doris.nereids.trees.plans.commands.insert;
import org.apache.doris.catalog.Column;
import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.OlapTable;
import org.apache.doris.catalog.TableIf;
@ -25,12 +26,14 @@ import org.apache.doris.common.ErrorReport;
import org.apache.doris.common.util.ProfileManager.ProfileType;
import org.apache.doris.datasource.hive.HMSExternalTable;
import org.apache.doris.datasource.iceberg.IcebergExternalTable;
import org.apache.doris.datasource.jdbc.JdbcExternalTable;
import org.apache.doris.load.loadv2.LoadStatistic;
import org.apache.doris.mysql.privilege.PrivPredicate;
import org.apache.doris.nereids.NereidsPlanner;
import org.apache.doris.nereids.analyzer.UnboundTableSink;
import org.apache.doris.nereids.exceptions.AnalysisException;
import org.apache.doris.nereids.glue.LogicalPlanAdapter;
import org.apache.doris.nereids.trees.expressions.Slot;
import org.apache.doris.nereids.trees.plans.Explainable;
import org.apache.doris.nereids.trees.plans.Plan;
import org.apache.doris.nereids.trees.plans.PlanType;
@ -40,8 +43,11 @@ import org.apache.doris.nereids.trees.plans.logical.LogicalPlan;
import org.apache.doris.nereids.trees.plans.physical.PhysicalEmptyRelation;
import org.apache.doris.nereids.trees.plans.physical.PhysicalHiveTableSink;
import org.apache.doris.nereids.trees.plans.physical.PhysicalIcebergTableSink;
import org.apache.doris.nereids.trees.plans.physical.PhysicalJdbcTableSink;
import org.apache.doris.nereids.trees.plans.physical.PhysicalOlapTableSink;
import org.apache.doris.nereids.trees.plans.physical.PhysicalOneRowRelation;
import org.apache.doris.nereids.trees.plans.physical.PhysicalSink;
import org.apache.doris.nereids.trees.plans.physical.PhysicalUnion;
import org.apache.doris.nereids.trees.plans.visitor.PlanVisitor;
import org.apache.doris.planner.DataSink;
import org.apache.doris.qe.ConnectContext;
@ -52,6 +58,7 @@ import com.google.common.base.Preconditions;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
@ -192,9 +199,27 @@ public class InsertIntoTableCommand extends Command implements ForwardWithSync,
IcebergExternalTable icebergExternalTable = (IcebergExternalTable) targetTableIf;
insertExecutor = new IcebergInsertExecutor(ctx, icebergExternalTable, label, planner,
Optional.of(insertCtx.orElse((new BaseExternalTableInsertCommandContext()))), emptyInsert);
} else if (physicalSink instanceof PhysicalJdbcTableSink) {
boolean emptyInsert = childIsEmptyRelation(physicalSink);
List<Column> cols = ((PhysicalJdbcTableSink<?>) physicalSink).getCols();
List<Slot> slots = ((PhysicalJdbcTableSink<?>) physicalSink).getOutput();
if (physicalSink.children().size() == 1) {
if (physicalSink.child(0) instanceof PhysicalOneRowRelation
|| physicalSink.child(0) instanceof PhysicalUnion) {
for (int i = 0; i < cols.size(); i++) {
if (!(cols.get(i).isAllowNull()) && slots.get(i).nullable()) {
throw new AnalysisException("Column `" + cols.get(i).getName()
+ "` is not nullable, but the inserted value is nullable.");
}
}
}
}
JdbcExternalTable jdbcExternalTable = (JdbcExternalTable) targetTableIf;
insertExecutor = new JdbcInsertExecutor(ctx, jdbcExternalTable, label, planner,
Optional.of(insertCtx.orElse((new JdbcInsertCommandContext()))), emptyInsert);
} else {
// TODO: support other table types
throw new AnalysisException("insert into command only support [olap, hive, iceberg] table");
throw new AnalysisException("insert into command only support [olap, hive, iceberg, jdbc] table");
}
if (!insertExecutor.isEmptyInsert()) {
insertExecutor.beginTransaction();

View File

@ -27,9 +27,11 @@ import org.apache.doris.catalog.TableIf;
import org.apache.doris.common.Config;
import org.apache.doris.common.FormatOptions;
import org.apache.doris.datasource.hive.HMSExternalTable;
import org.apache.doris.datasource.jdbc.JdbcExternalTable;
import org.apache.doris.nereids.analyzer.UnboundAlias;
import org.apache.doris.nereids.analyzer.UnboundHiveTableSink;
import org.apache.doris.nereids.analyzer.UnboundIcebergTableSink;
import org.apache.doris.nereids.analyzer.UnboundJdbcTableSink;
import org.apache.doris.nereids.analyzer.UnboundOneRowRelation;
import org.apache.doris.nereids.analyzer.UnboundTableSink;
import org.apache.doris.nereids.exceptions.AnalysisException;
@ -260,6 +262,11 @@ public class InsertUtils {
throw new AnalysisException("View is not support in hive external table.");
}
}
if (table instanceof JdbcExternalTable) {
// todo:
// For JDBC External Table, we always allow certain columns to be missing during insertion
// Specific check for non-nullable columns only if insertion is direct VALUES or SELECT constants
}
if (table instanceof OlapTable && ((OlapTable) table).getKeysType() == KeysType.UNIQUE_KEYS) {
if (unboundLogicalSink instanceof UnboundTableSink
&& ((UnboundTableSink<? extends Plan>) unboundLogicalSink).isPartialUpdate()) {
@ -383,6 +390,8 @@ public class InsertUtils {
unboundTableSink = (UnboundHiveTableSink<? extends Plan>) plan;
} else if (plan instanceof UnboundIcebergTableSink) {
unboundTableSink = (UnboundIcebergTableSink<? extends Plan>) plan;
} else if (plan instanceof UnboundJdbcTableSink) {
unboundTableSink = (UnboundJdbcTableSink<? extends Plan>) plan;
} else {
throw new AnalysisException("the root of plan should be"
+ " [UnboundTableSink, UnboundHiveTableSink, UnboundIcebergTableSink],"

View File

@ -0,0 +1,24 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.nereids.trees.plans.commands.insert;
/**
* For iceberg External Table
*/
public class JdbcInsertCommandContext extends BaseExternalTableInsertCommandContext {
}

View File

@ -0,0 +1,113 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.nereids.trees.plans.commands.insert;
import org.apache.doris.common.ErrorCode;
import org.apache.doris.common.UserException;
import org.apache.doris.common.profile.SummaryProfile;
import org.apache.doris.common.util.DebugUtil;
import org.apache.doris.datasource.jdbc.JdbcExternalTable;
import org.apache.doris.nereids.NereidsPlanner;
import org.apache.doris.nereids.trees.plans.physical.PhysicalSink;
import org.apache.doris.planner.DataSink;
import org.apache.doris.planner.PlanFragment;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.qe.QueryState;
import org.apache.doris.transaction.TransactionStatus;
import org.apache.doris.transaction.TransactionType;
import com.google.common.base.Strings;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.util.Optional;
/**
* Insert executor for jdbc table
*/
public class JdbcInsertExecutor extends BaseExternalTableInsertExecutor {
private static final Logger LOG = LogManager.getLogger(JdbcInsertExecutor.class);
/**
* constructor
*/
public JdbcInsertExecutor(ConnectContext ctx, JdbcExternalTable table,
String labelName, NereidsPlanner planner,
Optional<InsertCommandContext> insertCtx,
boolean emptyInsert) {
super(ctx, table, labelName, planner, insertCtx, emptyInsert);
}
@Override
public void beginTransaction() {
// do nothing
}
@Override
protected void onComplete() throws UserException {
if (ctx.getState().getStateType() == QueryState.MysqlStateType.ERR) {
LOG.warn("errors when abort txn. {}", ctx.getQueryIdentifier());
} else {
summaryProfile.ifPresent(profile -> profile.setTransactionBeginTime(transactionType()));
summaryProfile.ifPresent(SummaryProfile::setTransactionEndTime);
txnStatus = TransactionStatus.COMMITTED;
}
}
@Override
protected void onFail(Throwable t) {
errMsg = t.getMessage() == null ? "unknown reason" : t.getMessage();
String queryId = DebugUtil.printId(ctx.queryId());
// if any throwable being thrown during insert operation, first we should abort this txn
LOG.warn("insert [{}] with query id {} failed", labelName, queryId, t);
StringBuilder sb = new StringBuilder(t.getMessage());
if (txnId != INVALID_TXN_ID) {
LOG.warn("insert [{}] with query id {} abort txn {} failed", labelName, queryId, txnId);
if (!Strings.isNullOrEmpty(coordinator.getTrackingUrl())) {
sb.append(". url: ").append(coordinator.getTrackingUrl());
}
}
ctx.getState().setError(ErrorCode.ERR_UNKNOWN_ERROR, t.getMessage());
}
@Override
protected void finalizeSink(PlanFragment fragment, DataSink sink, PhysicalSink physicalSink) {
// do nothing
}
@Override
protected void setCollectCommitInfoFunc() {
// do nothing
}
@Override
protected void doBeforeCommit() throws UserException {
// do nothing
}
@Override
protected TransactionType transactionType() {
return TransactionType.JDBC;
}
@Override
protected void beforeExec() {
String queryId = DebugUtil.printId(ctx.queryId());
LOG.info("start insert [{}] with query id {} and txn id {}", labelName, queryId, txnId);
}
}

View File

@ -0,0 +1,151 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.nereids.trees.plans.logical;
import org.apache.doris.catalog.Column;
import org.apache.doris.datasource.jdbc.JdbcExternalDatabase;
import org.apache.doris.datasource.jdbc.JdbcExternalTable;
import org.apache.doris.nereids.memo.GroupExpression;
import org.apache.doris.nereids.properties.LogicalProperties;
import org.apache.doris.nereids.trees.expressions.NamedExpression;
import org.apache.doris.nereids.trees.plans.Plan;
import org.apache.doris.nereids.trees.plans.PlanType;
import org.apache.doris.nereids.trees.plans.PropagateFuncDeps;
import org.apache.doris.nereids.trees.plans.algebra.Sink;
import org.apache.doris.nereids.trees.plans.commands.info.DMLCommandType;
import org.apache.doris.nereids.trees.plans.visitor.PlanVisitor;
import org.apache.doris.nereids.util.Utils;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
/**
* logical jdbc table sink for insert command
*/
public class LogicalJdbcTableSink<CHILD_TYPE extends Plan> extends LogicalTableSink<CHILD_TYPE>
implements Sink, PropagateFuncDeps {
// bound data sink
private final JdbcExternalDatabase database;
private final JdbcExternalTable targetTable;
private final DMLCommandType dmlCommandType;
/**
* constructor
*/
public LogicalJdbcTableSink(JdbcExternalDatabase database,
JdbcExternalTable targetTable,
List<Column> cols,
List<NamedExpression> outputExprs,
DMLCommandType dmlCommandType,
Optional<GroupExpression> groupExpression,
Optional<LogicalProperties> logicalProperties,
CHILD_TYPE child) {
super(PlanType.LOGICAL_JDBC_TABLE_SINK, outputExprs, groupExpression, logicalProperties, cols, child);
this.database = Objects.requireNonNull(database, "database != null in LogicalJdbcTableSink");
this.targetTable = Objects.requireNonNull(targetTable, "targetTable != null in LogicalJdbcTableSink");
this.dmlCommandType = dmlCommandType;
}
public Plan withChildAndUpdateOutput(Plan child) {
List<NamedExpression> output = child.getOutput().stream()
.map(NamedExpression.class::cast)
.collect(ImmutableList.toImmutableList());
return new LogicalJdbcTableSink<>(database, targetTable, cols, output,
dmlCommandType, Optional.empty(), Optional.empty(), child);
}
@Override
public Plan withChildren(List<Plan> children) {
Preconditions.checkArgument(children.size() == 1, "LogicalJdbcTableSink only accepts one child");
return new LogicalJdbcTableSink<>(database, targetTable, cols, outputExprs,
dmlCommandType, Optional.empty(), Optional.empty(), children.get(0));
}
@Override
public LogicalSink<CHILD_TYPE> withOutputExprs(List<NamedExpression> outputExprs) {
return new LogicalJdbcTableSink<>(database, targetTable, cols, outputExprs,
dmlCommandType, Optional.empty(), Optional.empty(), child());
}
public JdbcExternalDatabase getDatabase() {
return database;
}
public JdbcExternalTable getTargetTable() {
return targetTable;
}
public DMLCommandType getDmlCommandType() {
return dmlCommandType;
}
@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (!(o instanceof LogicalJdbcTableSink)) {
return false;
}
if (!super.equals(o)) {
return false;
}
LogicalJdbcTableSink<?> that = (LogicalJdbcTableSink<?>) o;
return dmlCommandType == that.dmlCommandType
&& Objects.equals(database, that.database)
&& Objects.equals(targetTable, that.targetTable) && Objects.equals(cols, that.cols);
}
@Override
public int hashCode() {
return Objects.hash(super.hashCode(), database, targetTable, dmlCommandType);
}
@Override
public String toString() {
return Utils.toSqlString("LogicalJdbcTableSink[" + id.asInt() + "]",
"outputExprs", outputExprs,
"database", database.getFullName(),
"targetTable", targetTable.getName(),
"cols", cols,
"dmlCommandType", dmlCommandType
);
}
@Override
public <R, C> R accept(PlanVisitor<R, C> visitor, C context) {
return visitor.visitLogicalJdbcTableSink(this, context);
}
@Override
public Plan withGroupExpression(Optional<GroupExpression> groupExpression) {
return new LogicalJdbcTableSink<>(database, targetTable, cols, outputExprs,
dmlCommandType, groupExpression, Optional.of(getLogicalProperties()), child());
}
@Override
public Plan withGroupExprLogicalPropChildren(Optional<GroupExpression> groupExpression,
Optional<LogicalProperties> logicalProperties, List<Plan> children) {
return new LogicalJdbcTableSink<>(database, targetTable, cols, outputExprs,
dmlCommandType, groupExpression, logicalProperties, children.get(0));
}
}

View File

@ -71,6 +71,10 @@ public abstract class PhysicalBaseExternalTableSink<CHILD_TYPE extends Plan> ext
return targetTable;
}
public List<Column> getCols() {
return cols;
}
@Override
public List<? extends Expression> getExpressions() {
return ImmutableList.of();

View File

@ -0,0 +1,109 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.nereids.trees.plans.physical;
import org.apache.doris.catalog.Column;
import org.apache.doris.datasource.jdbc.JdbcExternalDatabase;
import org.apache.doris.datasource.jdbc.JdbcExternalTable;
import org.apache.doris.nereids.memo.GroupExpression;
import org.apache.doris.nereids.properties.LogicalProperties;
import org.apache.doris.nereids.properties.PhysicalProperties;
import org.apache.doris.nereids.trees.expressions.NamedExpression;
import org.apache.doris.nereids.trees.plans.Plan;
import org.apache.doris.nereids.trees.plans.PlanType;
import org.apache.doris.nereids.trees.plans.visitor.PlanVisitor;
import org.apache.doris.statistics.Statistics;
import java.util.List;
import java.util.Optional;
/** physical jdbc sink */
public class PhysicalJdbcTableSink<CHILD_TYPE extends Plan> extends PhysicalBaseExternalTableSink<CHILD_TYPE> {
/**
* constructor
*/
public PhysicalJdbcTableSink(JdbcExternalDatabase database,
JdbcExternalTable targetTable,
List<Column> cols,
List<NamedExpression> outputExprs,
Optional<GroupExpression> groupExpression,
LogicalProperties logicalProperties,
CHILD_TYPE child) {
this(database, targetTable, cols, outputExprs, groupExpression, logicalProperties,
PhysicalProperties.GATHER, null, child);
}
/**
* constructor
*/
public PhysicalJdbcTableSink(JdbcExternalDatabase database,
JdbcExternalTable targetTable,
List<Column> cols,
List<NamedExpression> outputExprs,
Optional<GroupExpression> groupExpression,
LogicalProperties logicalProperties,
PhysicalProperties physicalProperties,
Statistics statistics,
CHILD_TYPE child) {
super(PlanType.PHYSICAL_JDBC_TABLE_SINK, database, targetTable, cols, outputExprs, groupExpression,
logicalProperties, physicalProperties, statistics, child);
}
@Override
public Plan withChildren(List<Plan> children) {
return new PhysicalJdbcTableSink<>(
(JdbcExternalDatabase) database, (JdbcExternalTable) targetTable,
cols, outputExprs, groupExpression,
getLogicalProperties(), physicalProperties, statistics, children.get(0));
}
@Override
public <R, C> R accept(PlanVisitor<R, C> visitor, C context) {
return visitor.visitPhysicalJdbcTableSink(this, context);
}
@Override
public Plan withGroupExpression(Optional<GroupExpression> groupExpression) {
return new PhysicalJdbcTableSink<>(
(JdbcExternalDatabase) database, (JdbcExternalTable) targetTable, cols, outputExprs,
groupExpression, getLogicalProperties(), child());
}
@Override
public Plan withGroupExprLogicalPropChildren(Optional<GroupExpression> groupExpression,
Optional<LogicalProperties> logicalProperties, List<Plan> children) {
return new PhysicalJdbcTableSink<>(
(JdbcExternalDatabase) database, (JdbcExternalTable) targetTable, cols, outputExprs,
groupExpression, logicalProperties.get(), children.get(0));
}
@Override
public PhysicalPlan withPhysicalPropertiesAndStats(PhysicalProperties physicalProperties, Statistics statistics) {
return new PhysicalJdbcTableSink<>(
(JdbcExternalDatabase) database, (JdbcExternalTable) targetTable, cols, outputExprs,
groupExpression, getLogicalProperties(), physicalProperties, statistics, child());
}
@Override
public PhysicalProperties getRequirePhysicalProperties() {
// Since JDBC tables do not have partitioning, return a default physical property.
// GATHER implies that all data is gathered to a single location, which is a common requirement for JDBC sinks.
return PhysicalProperties.GATHER;
}
}

View File

@ -19,6 +19,7 @@ package org.apache.doris.nereids.trees.plans.visitor;
import org.apache.doris.nereids.analyzer.UnboundHiveTableSink;
import org.apache.doris.nereids.analyzer.UnboundIcebergTableSink;
import org.apache.doris.nereids.analyzer.UnboundJdbcTableSink;
import org.apache.doris.nereids.analyzer.UnboundResultSink;
import org.apache.doris.nereids.analyzer.UnboundTableSink;
import org.apache.doris.nereids.trees.plans.Plan;
@ -26,6 +27,7 @@ import org.apache.doris.nereids.trees.plans.logical.LogicalDeferMaterializeResul
import org.apache.doris.nereids.trees.plans.logical.LogicalFileSink;
import org.apache.doris.nereids.trees.plans.logical.LogicalHiveTableSink;
import org.apache.doris.nereids.trees.plans.logical.LogicalIcebergTableSink;
import org.apache.doris.nereids.trees.plans.logical.LogicalJdbcTableSink;
import org.apache.doris.nereids.trees.plans.logical.LogicalOlapTableSink;
import org.apache.doris.nereids.trees.plans.logical.LogicalResultSink;
import org.apache.doris.nereids.trees.plans.logical.LogicalSink;
@ -34,6 +36,7 @@ import org.apache.doris.nereids.trees.plans.physical.PhysicalDeferMaterializeRes
import org.apache.doris.nereids.trees.plans.physical.PhysicalFileSink;
import org.apache.doris.nereids.trees.plans.physical.PhysicalHiveTableSink;
import org.apache.doris.nereids.trees.plans.physical.PhysicalIcebergTableSink;
import org.apache.doris.nereids.trees.plans.physical.PhysicalJdbcTableSink;
import org.apache.doris.nereids.trees.plans.physical.PhysicalOlapTableSink;
import org.apache.doris.nereids.trees.plans.physical.PhysicalResultSink;
import org.apache.doris.nereids.trees.plans.physical.PhysicalSink;
@ -68,6 +71,10 @@ public interface SinkVisitor<R, C> {
return visitLogicalSink(unboundTableSink, context);
}
default R visitUnboundJdbcTableSink(UnboundJdbcTableSink<? extends Plan> unboundTableSink, C context) {
return visitLogicalSink(unboundTableSink, context);
}
default R visitUnboundResultSink(UnboundResultSink<? extends Plan> unboundResultSink, C context) {
return visitLogicalSink(unboundResultSink, context);
}
@ -96,6 +103,10 @@ public interface SinkVisitor<R, C> {
return visitLogicalTableSink(icebergTableSink, context);
}
default R visitLogicalJdbcTableSink(LogicalJdbcTableSink<? extends Plan> jdbcTableSink, C context) {
return visitLogicalTableSink(jdbcTableSink, context);
}
default R visitLogicalResultSink(LogicalResultSink<? extends Plan> logicalResultSink, C context) {
return visitLogicalSink(logicalResultSink, context);
}
@ -129,6 +140,10 @@ public interface SinkVisitor<R, C> {
return visitPhysicalTableSink(icebergTableSink, context);
}
default R visitPhysicalJdbcTableSink(PhysicalJdbcTableSink<? extends Plan> jdbcTableSink, C context) {
return visitPhysicalTableSink(jdbcTableSink, context);
}
default R visitPhysicalResultSink(PhysicalResultSink<? extends Plan> physicalResultSink, C context) {
return visitPhysicalSink(physicalResultSink, context);
}

View File

@ -0,0 +1,42 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.transaction;
import org.apache.doris.common.UserException;
public class JdbcTransactionManager implements TransactionManager {
@Override
public long begin() {
return 0;
}
@Override
public void commit(long id) throws UserException {
}
@Override
public void rollback(long id) {
}
@Override
public Transaction getTransaction(long id) {
return null;
}
}

View File

@ -20,5 +20,6 @@ package org.apache.doris.transaction;
public enum TransactionType {
UNKNOWN,
HMS,
ICEBERG
ICEBERG,
JDBC
}