[Refactor](insert) refactor insert command to support other type of table (#31610) (#32345)

bp #31610
This commit is contained in:
Mingyu Chen
2024-03-17 20:46:07 +08:00
committed by GitHub
parent 47019133c0
commit 4732aae628
25 changed files with 930 additions and 796 deletions

View File

@ -55,35 +55,59 @@ import java.util.stream.Collectors;
public interface TableIf {
Logger LOG = LogManager.getLogger(TableIf.class);
void readLock();
default void readLock() {
}
boolean tryReadLock(long timeout, TimeUnit unit);
default boolean tryReadLock(long timeout, TimeUnit unit) {
return true;
}
void readUnlock();
default void readUnlock() {
}
void writeLock();
;
boolean writeLockIfExist();
default void writeLock() {
}
boolean tryWriteLock(long timeout, TimeUnit unit);
default boolean writeLockIfExist() {
return true;
}
void writeUnlock();
default boolean tryWriteLock(long timeout, TimeUnit unit) {
return true;
}
boolean isWriteLockHeldByCurrentThread();
default void writeUnlock() {
}
<E extends Exception> void writeLockOrException(E e) throws E;
default boolean isWriteLockHeldByCurrentThread() {
return true;
}
void writeLockOrDdlException() throws DdlException;
default <E extends Exception> void writeLockOrException(E e) throws E {
}
void writeLockOrMetaException() throws MetaNotFoundException;
default void writeLockOrDdlException() throws DdlException {
}
void writeLockOrAlterCancelException() throws AlterCancelException;
default void writeLockOrMetaException() throws MetaNotFoundException {
}
boolean tryWriteLockOrMetaException(long timeout, TimeUnit unit) throws MetaNotFoundException;
default void writeLockOrAlterCancelException() throws AlterCancelException {
}
<E extends Exception> boolean tryWriteLockOrException(long timeout, TimeUnit unit, E e) throws E;
default boolean tryWriteLockOrMetaException(long timeout, TimeUnit unit) throws MetaNotFoundException {
return true;
}
boolean tryWriteLockIfExist(long timeout, TimeUnit unit);
default <E extends Exception> boolean tryWriteLockOrException(long timeout, TimeUnit unit, E e) throws E {
return true;
}
default boolean tryWriteLockIfExist(long timeout, TimeUnit unit) {
return true;
}
long getId();

View File

@ -17,7 +17,6 @@
package org.apache.doris.datasource;
import org.apache.doris.alter.AlterCancelException;
import org.apache.doris.catalog.Column;
import org.apache.doris.catalog.DatabaseIf;
import org.apache.doris.catalog.Env;
@ -25,9 +24,6 @@ import org.apache.doris.catalog.TableAttributes;
import org.apache.doris.catalog.TableIf;
import org.apache.doris.catalog.constraint.Constraint;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.DdlException;
import org.apache.doris.common.ErrorCode;
import org.apache.doris.common.MetaNotFoundException;
import org.apache.doris.common.io.Text;
import org.apache.doris.common.io.Writable;
import org.apache.doris.common.util.Util;
@ -55,8 +51,6 @@ import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.stream.Collectors;
/**
@ -86,7 +80,6 @@ public class ExternalTable implements TableIf, Writable, GsonPostProcessable {
protected long dbId;
protected boolean objectCreated;
protected ExternalCatalog catalog;
protected ReentrantReadWriteLock rwLock = new ReentrantReadWriteLock(true);
/**
* No args constructor for persist.
@ -132,102 +125,6 @@ public class ExternalTable implements TableIf, Writable, GsonPostProcessable {
}
}
@Override
public void readLock() {
this.rwLock.readLock().lock();
}
@Override
public boolean tryReadLock(long timeout, TimeUnit unit) {
try {
return this.rwLock.readLock().tryLock(timeout, unit);
} catch (InterruptedException e) {
LOG.warn("failed to try read lock at table[" + name + "]", e);
return false;
}
}
@Override
public void readUnlock() {
this.rwLock.readLock().unlock();
}
@Override
public void writeLock() {
this.rwLock.writeLock().lock();
}
@Override
public boolean writeLockIfExist() {
writeLock();
return true;
}
@Override
public boolean tryWriteLock(long timeout, TimeUnit unit) {
try {
return this.rwLock.writeLock().tryLock(timeout, unit);
} catch (InterruptedException e) {
LOG.warn("failed to try write lock at table[" + name + "]", e);
return false;
}
}
@Override
public void writeUnlock() {
this.rwLock.writeLock().unlock();
}
@Override
public boolean isWriteLockHeldByCurrentThread() {
return this.rwLock.writeLock().isHeldByCurrentThread();
}
@Override
public <E extends Exception> void writeLockOrException(E e) throws E {
writeLock();
}
@Override
public void writeLockOrDdlException() throws DdlException {
writeLockOrException(new DdlException("unknown table, tableName=" + name,
ErrorCode.ERR_BAD_TABLE_ERROR));
}
@Override
public void writeLockOrMetaException() throws MetaNotFoundException {
writeLockOrException(new MetaNotFoundException("unknown table, tableName=" + name,
ErrorCode.ERR_BAD_TABLE_ERROR));
}
@Override
public void writeLockOrAlterCancelException() throws AlterCancelException {
writeLockOrException(new AlterCancelException("unknown table, tableName=" + name,
ErrorCode.ERR_BAD_TABLE_ERROR));
}
@Override
public boolean tryWriteLockOrMetaException(long timeout, TimeUnit unit) throws MetaNotFoundException {
return tryWriteLockOrException(timeout, unit, new MetaNotFoundException("unknown table, tableName=" + name,
ErrorCode.ERR_BAD_TABLE_ERROR));
}
@Override
public <E extends Exception> boolean tryWriteLockOrException(long timeout, TimeUnit unit, E e) throws E {
if (tryWriteLock(timeout, unit)) {
return true;
}
return false;
}
@Override
public boolean tryWriteLockIfExist(long timeout, TimeUnit unit) {
if (tryWriteLock(timeout, unit)) {
return true;
}
return false;
}
@Override
public long getId() {
return id;
@ -417,7 +314,6 @@ public class ExternalTable implements TableIf, Writable, GsonPostProcessable {
@Override
public void gsonPostProcess() throws IOException {
rwLock = new ReentrantReadWriteLock(true);
objectCreated = false;
}

View File

@ -47,7 +47,7 @@ import org.apache.doris.load.loadv2.LoadJob;
import org.apache.doris.load.loadv2.LoadStatistic;
import org.apache.doris.mysql.privilege.PrivPredicate;
import org.apache.doris.mysql.privilege.Privilege;
import org.apache.doris.nereids.trees.plans.commands.InsertIntoTableCommand;
import org.apache.doris.nereids.trees.plans.commands.insert.InsertIntoTableCommand;
import org.apache.doris.persist.gson.GsonPostProcessable;
import org.apache.doris.persist.gson.GsonUtils;
import org.apache.doris.qe.ConnectContext;

View File

@ -28,7 +28,7 @@ import org.apache.doris.load.FailMsg;
import org.apache.doris.load.loadv2.LoadJob;
import org.apache.doris.load.loadv2.LoadStatistic;
import org.apache.doris.nereids.parser.NereidsParser;
import org.apache.doris.nereids.trees.plans.commands.InsertIntoTableCommand;
import org.apache.doris.nereids.trees.plans.commands.insert.InsertIntoTableCommand;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.qe.StmtExecutor;
import org.apache.doris.thrift.TCell;

View File

@ -351,7 +351,6 @@ import org.apache.doris.nereids.trees.plans.algebra.Aggregate;
import org.apache.doris.nereids.trees.plans.algebra.SetOperation.Qualifier;
import org.apache.doris.nereids.trees.plans.commands.AddConstraintCommand;
import org.apache.doris.nereids.trees.plans.commands.AlterMTMVCommand;
import org.apache.doris.nereids.trees.plans.commands.BatchInsertIntoTableCommand;
import org.apache.doris.nereids.trees.plans.commands.CallCommand;
import org.apache.doris.nereids.trees.plans.commands.CancelMTMVTaskCommand;
import org.apache.doris.nereids.trees.plans.commands.Command;
@ -368,8 +367,6 @@ import org.apache.doris.nereids.trees.plans.commands.DropProcedureCommand;
import org.apache.doris.nereids.trees.plans.commands.ExplainCommand;
import org.apache.doris.nereids.trees.plans.commands.ExplainCommand.ExplainLevel;
import org.apache.doris.nereids.trees.plans.commands.ExportCommand;
import org.apache.doris.nereids.trees.plans.commands.InsertIntoTableCommand;
import org.apache.doris.nereids.trees.plans.commands.InsertOverwriteTableCommand;
import org.apache.doris.nereids.trees.plans.commands.LoadCommand;
import org.apache.doris.nereids.trees.plans.commands.PauseMTMVCommand;
import org.apache.doris.nereids.trees.plans.commands.RefreshMTMVCommand;
@ -406,6 +403,9 @@ import org.apache.doris.nereids.trees.plans.commands.info.RollupDefinition;
import org.apache.doris.nereids.trees.plans.commands.info.SimpleColumnDefinition;
import org.apache.doris.nereids.trees.plans.commands.info.StepPartition;
import org.apache.doris.nereids.trees.plans.commands.info.TableNameInfo;
import org.apache.doris.nereids.trees.plans.commands.insert.BatchInsertIntoTableCommand;
import org.apache.doris.nereids.trees.plans.commands.insert.InsertIntoTableCommand;
import org.apache.doris.nereids.trees.plans.commands.insert.InsertOverwriteTableCommand;
import org.apache.doris.nereids.trees.plans.logical.LogicalAggregate;
import org.apache.doris.nereids.trees.plans.logical.LogicalCTE;
import org.apache.doris.nereids.trees.plans.logical.LogicalExcept;
@ -542,7 +542,7 @@ public class LogicalPlanBuilder extends DorisParserBaseVisitor<Object> {
if (ConnectContext.get() != null && ConnectContext.get().isTxnModel()) {
command = new BatchInsertIntoTableCommand(sink);
} else {
command = new InsertIntoTableCommand(sink, labelName);
command = new InsertIntoTableCommand(sink, labelName, Optional.empty());
}
}
if (ctx.explain() != null) {

View File

@ -37,6 +37,7 @@ import org.apache.doris.nereids.trees.plans.PlanType;
import org.apache.doris.nereids.trees.plans.commands.ExplainCommand.ExplainLevel;
import org.apache.doris.nereids.trees.plans.commands.info.ColumnDefinition;
import org.apache.doris.nereids.trees.plans.commands.info.CreateTableInfo;
import org.apache.doris.nereids.trees.plans.commands.insert.InsertIntoTableCommand;
import org.apache.doris.nereids.trees.plans.logical.LogicalPlan;
import org.apache.doris.nereids.trees.plans.visitor.PlanVisitor;
import org.apache.doris.nereids.types.CharType;
@ -155,7 +156,7 @@ public class CreateTableCommand extends Command implements ForwardWithSync {
query = new UnboundTableSink<>(createTableInfo.getTableNameParts(), ImmutableList.of(), ImmutableList.of(),
ImmutableList.of(), query);
try {
new InsertIntoTableCommand(query, Optional.empty()).run(ctx, executor);
new InsertIntoTableCommand(query, Optional.empty(), Optional.empty()).run(ctx, executor);
if (ctx.getState().getStateType() == MysqlStateType.ERR) {
handleFallbackFailedCtas(ctx);
}

View File

@ -29,6 +29,7 @@ import org.apache.doris.nereids.trees.plans.Explainable;
import org.apache.doris.nereids.trees.plans.Plan;
import org.apache.doris.nereids.trees.plans.PlanType;
import org.apache.doris.nereids.trees.plans.commands.info.DMLCommandType;
import org.apache.doris.nereids.trees.plans.commands.insert.InsertIntoTableCommand;
import org.apache.doris.nereids.trees.plans.logical.LogicalPlan;
import org.apache.doris.nereids.trees.plans.logical.LogicalProject;
import org.apache.doris.nereids.trees.plans.visitor.PlanVisitor;
@ -76,7 +77,8 @@ public class DeleteFromUsingCommand extends Command implements ForwardWithSync,
+ " Please check the following session variables: "
+ String.join(", ", SessionVariable.DEBUG_VARIABLES));
}
new InsertIntoTableCommand(completeQueryPlan(ctx, logicalQuery), Optional.empty()).run(ctx, executor);
new InsertIntoTableCommand(completeQueryPlan(ctx, logicalQuery), Optional.empty(), Optional.empty()).run(ctx,
executor);
}
/**

View File

@ -1,291 +0,0 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.nereids.trees.plans.commands;
import org.apache.doris.analysis.Expr;
import org.apache.doris.catalog.Column;
import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.OlapTable;
import org.apache.doris.catalog.Table;
import org.apache.doris.catalog.TableIf;
import org.apache.doris.common.ErrorCode;
import org.apache.doris.common.ErrorReport;
import org.apache.doris.common.FeConstants;
import org.apache.doris.common.UserException;
import org.apache.doris.common.util.ProfileManager.ProfileType;
import org.apache.doris.load.loadv2.LoadStatistic;
import org.apache.doris.mysql.privilege.PrivPredicate;
import org.apache.doris.nereids.NereidsPlanner;
import org.apache.doris.nereids.exceptions.AnalysisException;
import org.apache.doris.nereids.glue.LogicalPlanAdapter;
import org.apache.doris.nereids.trees.expressions.Slot;
import org.apache.doris.nereids.trees.plans.Explainable;
import org.apache.doris.nereids.trees.plans.Plan;
import org.apache.doris.nereids.trees.plans.PlanType;
import org.apache.doris.nereids.trees.plans.algebra.OneRowRelation;
import org.apache.doris.nereids.trees.plans.commands.info.DMLCommandType;
import org.apache.doris.nereids.trees.plans.logical.LogicalPlan;
import org.apache.doris.nereids.trees.plans.physical.PhysicalDistribute;
import org.apache.doris.nereids.trees.plans.physical.PhysicalOlapTableSink;
import org.apache.doris.nereids.trees.plans.physical.PhysicalUnion;
import org.apache.doris.nereids.trees.plans.visitor.PlanVisitor;
import org.apache.doris.planner.DataSink;
import org.apache.doris.planner.GroupCommitPlanner;
import org.apache.doris.planner.OlapTableSink;
import org.apache.doris.planner.UnionNode;
import org.apache.doris.proto.InternalService;
import org.apache.doris.proto.InternalService.PGroupCommitInsertResponse;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.qe.SqlModeHelper;
import org.apache.doris.qe.StmtExecutor;
import org.apache.doris.rpc.RpcException;
import org.apache.doris.thrift.TStatusCode;
import org.apache.doris.transaction.TransactionStatus;
import com.google.common.base.Preconditions;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.thrift.TException;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ExecutionException;
/**
* insert into select command implementation
* insert into select command support the grammar: explain? insert into table columns? partitions? hints? query
* InsertIntoTableCommand is a command to represent insert the answer of a query into a table.
* class structure's:
* InsertIntoTableCommand(Query())
* ExplainCommand(Query())
*/
public class InsertIntoTableCommand extends Command implements ForwardWithSync, Explainable {
public static final Logger LOG = LogManager.getLogger(InsertIntoTableCommand.class);
private LogicalPlan logicalQuery;
private Optional<String> labelName;
/**
* When source it's from job scheduler,it will be set.
*/
private long jobId;
private boolean allowAutoPartition;
/**
* constructor
*/
public InsertIntoTableCommand(LogicalPlan logicalQuery, Optional<String> labelName) {
super(PlanType.INSERT_INTO_TABLE_COMMAND);
this.logicalQuery = Objects.requireNonNull(logicalQuery, "logicalQuery should not be null");
this.labelName = Objects.requireNonNull(labelName, "labelName should not be null");
// only insert overwrite will disable it.
this.allowAutoPartition = true;
}
public Optional<String> getLabelName() {
return labelName;
}
public void setLabelName(Optional<String> labelName) {
this.labelName = labelName;
}
public void setJobId(long jobId) {
this.jobId = jobId;
}
public void setAllowAutoPartition(boolean allowAutoPartition) {
this.allowAutoPartition = allowAutoPartition;
}
@Override
public void run(ConnectContext ctx, StmtExecutor executor) throws Exception {
runInternal(ctx, executor);
}
public void runWithUpdateInfo(ConnectContext ctx, StmtExecutor executor,
LoadStatistic loadStatistic) throws Exception {
// TODO: add coordinator statistic
runInternal(ctx, executor);
}
private void runInternal(ConnectContext ctx, StmtExecutor executor) throws Exception {
if (!ctx.getSessionVariable().isEnableNereidsDML()) {
try {
ctx.getSessionVariable().enableFallbackToOriginalPlannerOnce();
} catch (Exception e) {
throw new AnalysisException("failed to set fallback to original planner to true", e);
}
throw new AnalysisException("Nereids DML is disabled, will try to fall back to the original planner");
}
PhysicalOlapTableSink<?> physicalOlapTableSink;
DataSink sink;
InsertExecutor insertExecutor;
Table targetTable;
TableIf targetTableIf = InsertExecutor.getTargetTable(logicalQuery, ctx);
// should lock target table until we begin transaction.
targetTableIf.readLock();
try {
// 1. process inline table (default values, empty values)
this.logicalQuery = (LogicalPlan) InsertExecutor.normalizePlan(logicalQuery, targetTableIf);
LogicalPlanAdapter logicalPlanAdapter = new LogicalPlanAdapter(logicalQuery, ctx.getStatementContext());
NereidsPlanner planner = new NereidsPlanner(ctx.getStatementContext());
planner.plan(logicalPlanAdapter, ctx.getSessionVariable().toThrift());
executor.setPlanner(planner);
executor.checkBlockRules();
if (ctx.getMysqlChannel() != null) {
ctx.getMysqlChannel().reset();
}
// TODO: support other type table insert into
Optional<PhysicalOlapTableSink<?>> plan = (planner.getPhysicalPlan()
.<Set<PhysicalOlapTableSink<?>>>collect(PhysicalOlapTableSink.class::isInstance)).stream()
.findAny();
Preconditions.checkArgument(plan.isPresent(), "insert into command must contain OlapTableSinkNode");
physicalOlapTableSink = plan.get();
targetTable = physicalOlapTableSink.getTargetTable();
// check auth
if (!Env.getCurrentEnv().getAccessManager()
.checkTblPriv(ConnectContext.get(), targetTable.getQualifiedDbName(), targetTable.getName(),
PrivPredicate.LOAD)) {
ErrorReport.reportAnalysisException(ErrorCode.ERR_TABLEACCESS_DENIED_ERROR, "LOAD",
ConnectContext.get().getQualifiedUser(), ConnectContext.get().getRemoteIP(),
targetTable.getQualifiedDbName() + ": " + targetTable.getName());
}
sink = planner.getFragments().get(0).getSink();
// group commit
if (analyzeGroupCommit(ctx, sink, physicalOlapTableSink)) {
// handleGroupCommit(ctx, sink, physicalOlapTableSink);
// return;
throw new AnalysisException("group commit is not supported in nereids now");
}
String label = this.labelName.orElse(String.format("label_%x_%x", ctx.queryId().hi, ctx.queryId().lo));
insertExecutor = new InsertExecutor(ctx,
physicalOlapTableSink.getDatabase(),
physicalOlapTableSink.getTargetTable(), label, planner);
insertExecutor.beginTransaction();
insertExecutor.finalizeSink(planner.getFragments().get(0), sink, physicalOlapTableSink.isPartialUpdate(),
physicalOlapTableSink.getDmlCommandType() == DMLCommandType.INSERT, this.allowAutoPartition);
} finally {
targetTableIf.readUnlock();
}
boolean isEnableMemtableOnSinkNode =
((OlapTable) targetTable).getTableProperty().getUseSchemaLightChange()
? insertExecutor.getCoordinator().getQueryOptions().isEnableMemtableOnSinkNode() : false;
insertExecutor.getCoordinator().getQueryOptions().setEnableMemtableOnSinkNode(isEnableMemtableOnSinkNode);
executor.setProfileType(ProfileType.LOAD);
// We exposed @StmtExecutor#cancel as a unified entry point for statement interruption
// so we need to set this here
executor.setCoord(insertExecutor.getCoordinator());
insertExecutor.executeSingleInsertTransaction(executor, jobId);
}
private void handleGroupCommit(ConnectContext ctx, DataSink sink,
PhysicalOlapTableSink<?> physicalOlapTableSink)
throws UserException, RpcException, TException, ExecutionException, InterruptedException {
// TODO we should refactor this to remove rely on UnionNode
List<InternalService.PDataRow> rows = new ArrayList<>();
List<List<Expr>> materializedConstExprLists = ((UnionNode) sink.getFragment()
.getPlanRoot()).getMaterializedConstExprLists();
int filterSize = 0;
for (Slot slot : physicalOlapTableSink.getOutput()) {
if (slot.getName().contains(Column.DELETE_SIGN)
|| slot.getName().contains(Column.VERSION_COL)) {
filterSize += 1;
}
}
for (List<Expr> list : materializedConstExprLists) {
rows.add(GroupCommitPlanner.getRowStringValue(list, filterSize));
}
GroupCommitPlanner groupCommitPlanner = new GroupCommitPlanner(physicalOlapTableSink.getDatabase(),
physicalOlapTableSink.getTargetTable(), null, ctx.queryId(),
ConnectContext.get().getSessionVariable().getGroupCommit());
PGroupCommitInsertResponse response = groupCommitPlanner.executeGroupCommitInsert(ctx, rows);
TStatusCode code = TStatusCode.findByValue(response.getStatus().getStatusCode());
if (code == TStatusCode.DATA_QUALITY_ERROR) {
LOG.info("group commit insert failed. query id: {}, backend id: {}, status: {}, "
+ "schema version: {}", ctx.queryId(),
groupCommitPlanner.getBackend(), response.getStatus(),
physicalOlapTableSink.getTargetTable().getBaseSchemaVersion());
} else if (code != TStatusCode.OK) {
String errMsg = "group commit insert failed. backend id: "
+ groupCommitPlanner.getBackend().getId() + ", status: "
+ response.getStatus();
ErrorReport.reportDdlException(errMsg, ErrorCode.ERR_FAILED_WHEN_INSERT);
}
TransactionStatus txnStatus = TransactionStatus.PREPARE;
String sb = "{'label':'" + response.getLabel() + "', 'status':'" + txnStatus.name()
+ "', 'txnId':'" + response.getTxnId() + "'"
+ "', 'optimizer':'" + "nereids" + "'"
+ "}";
ctx.getState().setOk(response.getLoadedRows(), (int) response.getFilteredRows(), sb);
ctx.setOrUpdateInsertResult(response.getTxnId(), response.getLabel(),
physicalOlapTableSink.getDatabase().getFullName(), physicalOlapTableSink.getTargetTable().getName(),
txnStatus, response.getLoadedRows(), (int) response.getFilteredRows());
// update it, so that user can get loaded rows in fe.audit.log
ctx.updateReturnRows((int) response.getLoadedRows());
}
private boolean analyzeGroupCommit(ConnectContext ctx, DataSink sink,
PhysicalOlapTableSink<?> physicalOlapTableSink) {
if (!(sink instanceof OlapTableSink) || !ctx.getSessionVariable().isEnableInsertGroupCommit()
|| ctx.getSessionVariable().isEnableUniqueKeyPartialUpdate()) {
return false;
}
OlapTable targetTable = physicalOlapTableSink.getTargetTable();
return ctx.getSessionVariable().getSqlMode() != SqlModeHelper.MODE_NO_BACKSLASH_ESCAPES
&& !ctx.isTxnModel() && isGroupCommitAvailablePlan(physicalOlapTableSink)
&& physicalOlapTableSink.getPartitionIds().isEmpty() && targetTable.getTableProperty()
.getUseSchemaLightChange() && !targetTable.getQualifiedDbName()
.equalsIgnoreCase(FeConstants.INTERNAL_DB_NAME);
}
private boolean isGroupCommitAvailablePlan(PhysicalOlapTableSink<? extends Plan> sink) {
Plan child = sink.child();
if (child instanceof PhysicalDistribute) {
child = child.child(0);
}
return child instanceof OneRowRelation || (child instanceof PhysicalUnion && child.arity() == 0);
}
@Override
public Plan getExplainPlan(ConnectContext ctx) {
if (!ctx.getSessionVariable().isEnableNereidsDML()) {
try {
ctx.getSessionVariable().enableFallbackToOriginalPlannerOnce();
} catch (Exception e) {
throw new AnalysisException("failed to set fallback to original planner to true", e);
}
throw new AnalysisException("Nereids DML is disabled, will try to fall back to the original planner");
}
return InsertExecutor.normalizePlan(this.logicalQuery, InsertExecutor.getTargetTable(this.logicalQuery, ctx));
}
@Override
public <R, C> R accept(PlanVisitor<R, C> visitor, C context) {
return visitor.visitInsertIntoTableCommand(this, context);
}
}

View File

@ -49,6 +49,7 @@ import org.apache.doris.nereids.trees.plans.PlanType;
import org.apache.doris.nereids.trees.plans.commands.info.BulkLoadDataDesc;
import org.apache.doris.nereids.trees.plans.commands.info.BulkStorageDesc;
import org.apache.doris.nereids.trees.plans.commands.info.DMLCommandType;
import org.apache.doris.nereids.trees.plans.commands.insert.InsertIntoTableCommand;
import org.apache.doris.nereids.trees.plans.logical.LogicalCheckPolicy;
import org.apache.doris.nereids.trees.plans.logical.LogicalFilter;
import org.apache.doris.nereids.trees.plans.logical.LogicalPlan;
@ -130,7 +131,7 @@ public class LoadCommand extends Command implements ForwardWithSync {
profile.getSummaryProfile().setQueryBeginTime();
if (sourceInfos.size() == 1) {
plans = ImmutableList.of(new InsertIntoTableCommand(completeQueryPlan(ctx, sourceInfos.get(0)),
Optional.of(labelName)));
Optional.of(labelName), Optional.empty()));
} else {
throw new AnalysisException("Multi insert into statements are unsupported.");
}

View File

@ -35,6 +35,7 @@ import org.apache.doris.nereids.trees.plans.Explainable;
import org.apache.doris.nereids.trees.plans.Plan;
import org.apache.doris.nereids.trees.plans.PlanType;
import org.apache.doris.nereids.trees.plans.commands.info.DMLCommandType;
import org.apache.doris.nereids.trees.plans.commands.insert.InsertIntoTableCommand;
import org.apache.doris.nereids.trees.plans.logical.LogicalPlan;
import org.apache.doris.nereids.trees.plans.logical.LogicalProject;
import org.apache.doris.nereids.trees.plans.visitor.PlanVisitor;
@ -93,7 +94,8 @@ public class UpdateCommand extends Command implements ForwardWithSync, Explainab
@Override
public void run(ConnectContext ctx, StmtExecutor executor) throws Exception {
new InsertIntoTableCommand(completeQueryPlan(ctx, logicalQuery), Optional.empty()).run(ctx, executor);
new InsertIntoTableCommand(completeQueryPlan(ctx, logicalQuery), Optional.empty(), Optional.empty()).run(ctx,
executor);
}
/**

View File

@ -39,6 +39,7 @@ import org.apache.doris.nereids.trees.expressions.literal.Literal;
import org.apache.doris.nereids.trees.expressions.literal.NullLiteral;
import org.apache.doris.nereids.trees.plans.Plan;
import org.apache.doris.nereids.trees.plans.algebra.Sink;
import org.apache.doris.nereids.trees.plans.commands.insert.InsertOverwriteTableCommand;
import org.apache.doris.nereids.trees.plans.logical.LogicalFilter;
import org.apache.doris.nereids.trees.plans.logical.LogicalPlan;
import org.apache.doris.nereids.trees.plans.visitor.DefaultPlanRewriter;

View File

@ -0,0 +1,180 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.nereids.trees.plans.commands.insert;
import org.apache.doris.catalog.DatabaseIf;
import org.apache.doris.catalog.TableIf;
import org.apache.doris.common.ErrorCode;
import org.apache.doris.common.ErrorReport;
import org.apache.doris.common.UserException;
import org.apache.doris.common.util.DebugUtil;
import org.apache.doris.nereids.NereidsPlanner;
import org.apache.doris.nereids.trees.plans.physical.PhysicalSink;
import org.apache.doris.planner.DataSink;
import org.apache.doris.planner.PlanFragment;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.qe.Coordinator;
import org.apache.doris.qe.QeProcessorImpl;
import org.apache.doris.qe.QeProcessorImpl.QueryInfo;
import org.apache.doris.qe.StmtExecutor;
import org.apache.doris.task.LoadEtlTask;
import org.apache.doris.thrift.TQueryType;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.util.Optional;
/**
* Abstract insert executor.
* The derived class should implement the abstract method for certain type of target table
*/
public abstract class AbstractInsertExecutor {
private static final Logger LOG = LogManager.getLogger(AbstractInsertExecutor.class);
protected long jobId;
protected final ConnectContext ctx;
protected final Coordinator coordinator;
protected final String labelName;
protected final DatabaseIf database;
protected final TableIf table;
protected final long createTime = System.currentTimeMillis();
protected long loadedRows = 0;
protected int filteredRows = 0;
protected String errMsg = "";
protected Optional<InsertCommandContext> insertCtx;
/**
* Constructor
*/
public AbstractInsertExecutor(ConnectContext ctx, TableIf table, String labelName, NereidsPlanner planner,
Optional<InsertCommandContext> insertCtx) {
this.ctx = ctx;
this.coordinator = new Coordinator(ctx, null, planner, ctx.getStatsErrorEstimator());
this.labelName = labelName;
this.table = table;
this.database = table.getDatabase();
this.insertCtx = insertCtx;
}
public Coordinator getCoordinator() {
return coordinator;
}
/**
* begin transaction if necessary
*/
public abstract void beginTransaction();
/**
* finalize sink to complete enough info for sink execution
*/
protected abstract void finalizeSink(PlanFragment fragment, DataSink sink, PhysicalSink physicalSink);
/**
* Do something before exec
*/
protected abstract void beforeExec();
/**
* Do something after exec finished
*/
protected abstract void onComplete() throws UserException;
/**
* Do something when exec throw exception
*/
protected abstract void onFail(Throwable t);
/**
* Do something after exec
*/
protected abstract void afterExec(StmtExecutor executor);
protected final void execImpl(StmtExecutor executor, long jobId) throws Exception {
String queryId = DebugUtil.printId(ctx.queryId());
coordinator.setLoadZeroTolerance(ctx.getSessionVariable().getEnableInsertStrict());
coordinator.setQueryType(TQueryType.LOAD);
executor.getProfile().addExecutionProfile(coordinator.getExecutionProfile());
QueryInfo queryInfo = new QueryInfo(ConnectContext.get(), executor.getOriginStmtInString(), coordinator);
QeProcessorImpl.INSTANCE.registerQuery(ctx.queryId(), queryInfo);
coordinator.exec();
int execTimeout = ctx.getExecTimeout();
if (LOG.isDebugEnabled()) {
LOG.debug("insert [{}] with query id {} execution timeout is {}", labelName, queryId, execTimeout);
}
boolean notTimeout = coordinator.join(execTimeout);
if (!coordinator.isDone()) {
coordinator.cancel();
if (notTimeout) {
errMsg = coordinator.getExecStatus().getErrorMsg();
ErrorReport.reportDdlException("there exists unhealthy backend. "
+ errMsg, ErrorCode.ERR_FAILED_WHEN_INSERT);
} else {
ErrorReport.reportDdlException(ErrorCode.ERR_EXECUTE_TIMEOUT);
}
}
if (!coordinator.getExecStatus().ok()) {
errMsg = coordinator.getExecStatus().getErrorMsg();
LOG.warn("insert [{}] with query id {} failed, {}", labelName, queryId, errMsg);
ErrorReport.reportDdlException(errMsg, ErrorCode.ERR_FAILED_WHEN_INSERT);
}
if (LOG.isDebugEnabled()) {
LOG.debug("insert [{}] with query id {} delta files is {}",
labelName, queryId, coordinator.getDeltaUrls());
}
if (coordinator.getLoadCounters().get(LoadEtlTask.DPP_NORMAL_ALL) != null) {
loadedRows = Long.parseLong(coordinator.getLoadCounters().get(LoadEtlTask.DPP_NORMAL_ALL));
}
if (coordinator.getLoadCounters().get(LoadEtlTask.DPP_ABNORMAL_ALL) != null) {
filteredRows = Integer.parseInt(coordinator.getLoadCounters().get(LoadEtlTask.DPP_ABNORMAL_ALL));
}
}
private boolean checkStrictMode() {
// if in strict mode, insert will fail if there are filtered rows
if (ctx.getSessionVariable().getEnableInsertStrict()) {
if (filteredRows > 0) {
ctx.getState().setError(ErrorCode.ERR_FAILED_WHEN_INSERT,
"Insert has filtered data in strict mode, tracking_url=" + coordinator.getTrackingUrl());
return false;
}
}
return true;
}
/**
* execute insert txn for insert into select command.
*/
public void executeSingleInsert(StmtExecutor executor, long jobId) {
beforeExec();
try {
execImpl(executor, jobId);
if (!checkStrictMode()) {
return;
}
onComplete();
} catch (Throwable t) {
onFail(t);
return;
} finally {
executor.updateProfile(true);
QeProcessorImpl.INSTANCE.unregisterQuery(ctx.queryId());
}
afterExec(executor);
}
}

View File

@ -15,7 +15,7 @@
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.nereids.trees.plans.commands;
package org.apache.doris.nereids.trees.plans.commands.insert;
import org.apache.doris.catalog.Column;
import org.apache.doris.catalog.Env;
@ -32,6 +32,8 @@ import org.apache.doris.nereids.trees.TreeNode;
import org.apache.doris.nereids.trees.plans.Explainable;
import org.apache.doris.nereids.trees.plans.Plan;
import org.apache.doris.nereids.trees.plans.PlanType;
import org.apache.doris.nereids.trees.plans.commands.Command;
import org.apache.doris.nereids.trees.plans.commands.ForwardWithSync;
import org.apache.doris.nereids.trees.plans.logical.LogicalInlineTable;
import org.apache.doris.nereids.trees.plans.logical.LogicalPlan;
import org.apache.doris.nereids.trees.plans.physical.PhysicalOlapTableSink;
@ -69,15 +71,7 @@ public class BatchInsertIntoTableCommand extends Command implements ForwardWithS
@Override
public Plan getExplainPlan(ConnectContext ctx) throws Exception {
if (!ctx.getSessionVariable().isEnableNereidsDML()) {
try {
ctx.getSessionVariable().enableFallbackToOriginalPlannerOnce();
} catch (Exception e) {
throw new AnalysisException("failed to set fallback to original planner to true", e);
}
throw new AnalysisException("Nereids DML is disabled, will try to fall back to the original planner");
}
return InsertExecutor.normalizePlan(this.logicalQuery, InsertExecutor.getTargetTable(this.logicalQuery, ctx));
return InsertUtils.getPlanForExplain(ctx, this.logicalQuery);
}
@Override
@ -103,10 +97,10 @@ public class BatchInsertIntoTableCommand extends Command implements ForwardWithS
}
PhysicalOlapTableSink<?> sink;
TableIf targetTableIf = InsertExecutor.getTargetTable(logicalQuery, ctx);
TableIf targetTableIf = InsertUtils.getTargetTable(logicalQuery, ctx);
targetTableIf.readLock();
try {
this.logicalQuery = (LogicalPlan) InsertExecutor.normalizePlan(logicalQuery, targetTableIf);
this.logicalQuery = (LogicalPlan) InsertUtils.normalizePlan(logicalQuery, targetTableIf);
LogicalPlanAdapter logicalPlanAdapter = new LogicalPlanAdapter(logicalQuery, ctx.getStatementContext());
NereidsPlanner planner = new NereidsPlanner(ctx.getStatementContext());
planner.plan(logicalPlanAdapter, ctx.getSessionVariable().toThrift());
@ -147,14 +141,14 @@ public class BatchInsertIntoTableCommand extends Command implements ForwardWithS
Optional<PhysicalUnion> union = planner.getPhysicalPlan()
.<Set<PhysicalUnion>>collect(PhysicalUnion.class::isInstance).stream().findAny();
if (union.isPresent()) {
InsertExecutor.executeBatchInsertTransaction(ctx, targetTable.getQualifiedDbName(),
InsertUtils.executeBatchInsertTransaction(ctx, targetTable.getQualifiedDbName(),
targetTable.getName(), targetSchema, union.get().getConstantExprsList());
return;
}
Optional<PhysicalOneRowRelation> oneRowRelation = planner.getPhysicalPlan()
.<Set<PhysicalOneRowRelation>>collect(PhysicalOneRowRelation.class::isInstance).stream().findAny();
if (oneRowRelation.isPresent()) {
InsertExecutor.executeBatchInsertTransaction(ctx, targetTable.getQualifiedDbName(),
InsertUtils.executeBatchInsertTransaction(ctx, targetTable.getQualifiedDbName(),
targetTable.getName(), targetSchema, ImmutableList.of(oneRowRelation.get().getProjects()));
return;
}

View File

@ -0,0 +1,142 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.nereids.trees.plans.commands.insert;
import org.apache.doris.analysis.Expr;
import org.apache.doris.catalog.Column;
import org.apache.doris.catalog.OlapTable;
import org.apache.doris.common.ErrorCode;
import org.apache.doris.common.ErrorReport;
import org.apache.doris.common.FeConstants;
import org.apache.doris.common.UserException;
import org.apache.doris.nereids.exceptions.AnalysisException;
import org.apache.doris.nereids.trees.expressions.Slot;
import org.apache.doris.nereids.trees.plans.Plan;
import org.apache.doris.nereids.trees.plans.algebra.OneRowRelation;
import org.apache.doris.nereids.trees.plans.physical.PhysicalDistribute;
import org.apache.doris.nereids.trees.plans.physical.PhysicalOlapTableSink;
import org.apache.doris.nereids.trees.plans.physical.PhysicalSink;
import org.apache.doris.nereids.trees.plans.physical.PhysicalUnion;
import org.apache.doris.planner.DataSink;
import org.apache.doris.planner.GroupCommitPlanner;
import org.apache.doris.planner.OlapTableSink;
import org.apache.doris.planner.UnionNode;
import org.apache.doris.proto.InternalService;
import org.apache.doris.proto.InternalService.PGroupCommitInsertResponse;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.qe.SqlModeHelper;
import org.apache.doris.rpc.RpcException;
import org.apache.doris.thrift.TStatusCode;
import org.apache.doris.transaction.TransactionStatus;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.thrift.TException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
/**
* Handle group commit
*/
public class GroupCommitInserter {
public static final Logger LOG = LogManager.getLogger(GroupCommitInserter.class);
/**
* Handle group commit
*/
public static boolean groupCommit(ConnectContext ctx, DataSink sink, PhysicalSink physicalSink) {
PhysicalOlapTableSink<?> olapSink = (PhysicalOlapTableSink<?>) physicalSink;
// TODO: implement group commit
if (canGroupCommit(ctx, sink, olapSink)) {
// handleGroupCommit(ctx, sink, physicalOlapTableSink);
// return;
throw new AnalysisException("group commit is not supported in Nereids now");
}
return false;
}
private static boolean canGroupCommit(ConnectContext ctx, DataSink sink,
PhysicalOlapTableSink<?> physicalOlapTableSink) {
if (!(sink instanceof OlapTableSink) || !ctx.getSessionVariable().isEnableInsertGroupCommit()
|| ctx.getSessionVariable().isEnableUniqueKeyPartialUpdate()) {
return false;
}
OlapTable targetTable = physicalOlapTableSink.getTargetTable();
return ctx.getSessionVariable().getSqlMode() != SqlModeHelper.MODE_NO_BACKSLASH_ESCAPES
&& !ctx.isTxnModel() && isGroupCommitAvailablePlan(physicalOlapTableSink)
&& physicalOlapTableSink.getPartitionIds().isEmpty() && targetTable.getTableProperty()
.getUseSchemaLightChange() && !targetTable.getQualifiedDbName()
.equalsIgnoreCase(FeConstants.INTERNAL_DB_NAME);
}
private static boolean isGroupCommitAvailablePlan(PhysicalOlapTableSink<? extends Plan> sink) {
Plan child = sink.child();
if (child instanceof PhysicalDistribute) {
child = child.child(0);
}
return child instanceof OneRowRelation || (child instanceof PhysicalUnion && child.arity() == 0);
}
private void handleGroupCommit(ConnectContext ctx, DataSink sink,
PhysicalOlapTableSink<?> physicalOlapTableSink)
throws UserException, RpcException, TException, ExecutionException, InterruptedException {
// TODO we should refactor this to remove rely on UnionNode
List<InternalService.PDataRow> rows = new ArrayList<>();
List<List<Expr>> materializedConstExprLists = ((UnionNode) sink.getFragment()
.getPlanRoot()).getMaterializedConstExprLists();
int filterSize = 0;
for (Slot slot : physicalOlapTableSink.getOutput()) {
if (slot.getName().contains(Column.DELETE_SIGN)
|| slot.getName().contains(Column.VERSION_COL)) {
filterSize += 1;
}
}
for (List<Expr> list : materializedConstExprLists) {
rows.add(GroupCommitPlanner.getRowStringValue(list, filterSize));
}
GroupCommitPlanner groupCommitPlanner = new GroupCommitPlanner(physicalOlapTableSink.getDatabase(),
physicalOlapTableSink.getTargetTable(), null, ctx.queryId(),
ConnectContext.get().getSessionVariable().getGroupCommit());
PGroupCommitInsertResponse response = groupCommitPlanner.executeGroupCommitInsert(ctx, rows);
TStatusCode code = TStatusCode.findByValue(response.getStatus().getStatusCode());
if (code == TStatusCode.DATA_QUALITY_ERROR) {
LOG.info("group commit insert failed. query id: {}, backend id: {}, status: {}, "
+ "schema version: {}", ctx.queryId(),
groupCommitPlanner.getBackend(), response.getStatus(),
physicalOlapTableSink.getTargetTable().getBaseSchemaVersion());
} else if (code != TStatusCode.OK) {
String errMsg = "group commit insert failed. backend id: "
+ groupCommitPlanner.getBackend().getId() + ", status: "
+ response.getStatus();
ErrorReport.reportDdlException(errMsg, ErrorCode.ERR_FAILED_WHEN_INSERT);
}
TransactionStatus txnStatus = TransactionStatus.PREPARE;
String sb = "{'label':'" + response.getLabel() + "', 'status':'" + txnStatus.name()
+ "', 'txnId':'" + response.getTxnId() + "'"
+ "', 'optimizer':'" + "nereids" + "'"
+ "}";
ctx.getState().setOk(response.getLoadedRows(), (int) response.getFilteredRows(), sb);
ctx.setOrUpdateInsertResult(response.getTxnId(), response.getLabel(),
physicalOlapTableSink.getDatabase().getFullName(), physicalOlapTableSink.getTargetTable().getName(),
txnStatus, response.getLoadedRows(), (int) response.getFilteredRows());
// update it, so that user can get loaded rows in fe.audit.log
ctx.updateReturnRows((int) response.getLoadedRows());
}
}

View File

@ -0,0 +1,26 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.nereids.trees.plans.commands.insert;
/**
* The context of insert command.
* You can add some fields or methods here if you need in derived classed
*/
public abstract class InsertCommandContext {
}

View File

@ -0,0 +1,190 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.nereids.trees.plans.commands.insert;
import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.OlapTable;
import org.apache.doris.catalog.TableIf;
import org.apache.doris.common.ErrorCode;
import org.apache.doris.common.ErrorReport;
import org.apache.doris.common.util.ProfileManager.ProfileType;
import org.apache.doris.load.loadv2.LoadStatistic;
import org.apache.doris.mysql.privilege.PrivPredicate;
import org.apache.doris.nereids.NereidsPlanner;
import org.apache.doris.nereids.exceptions.AnalysisException;
import org.apache.doris.nereids.glue.LogicalPlanAdapter;
import org.apache.doris.nereids.trees.plans.Explainable;
import org.apache.doris.nereids.trees.plans.Plan;
import org.apache.doris.nereids.trees.plans.PlanType;
import org.apache.doris.nereids.trees.plans.commands.Command;
import org.apache.doris.nereids.trees.plans.commands.ForwardWithSync;
import org.apache.doris.nereids.trees.plans.logical.LogicalPlan;
import org.apache.doris.nereids.trees.plans.physical.PhysicalOlapTableSink;
import org.apache.doris.nereids.trees.plans.physical.PhysicalSink;
import org.apache.doris.nereids.trees.plans.visitor.PlanVisitor;
import org.apache.doris.planner.DataSink;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.qe.StmtExecutor;
import com.google.common.base.Preconditions;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
/**
* insert into select command implementation
* insert into select command support the grammar: explain? insert into table columns? partitions? hints? query
* InsertIntoTableCommand is a command to represent insert the answer of a query into a table.
* class structure's:
* InsertIntoTableCommand(Query())
* ExplainCommand(Query())
*/
public class InsertIntoTableCommand extends Command implements ForwardWithSync, Explainable {
public static final Logger LOG = LogManager.getLogger(InsertIntoTableCommand.class);
private LogicalPlan logicalQuery;
private Optional<String> labelName;
/**
* When source it's from job scheduler,it will be set.
*/
private long jobId;
private Optional<InsertCommandContext> insertCtx;
/**
* constructor
*/
public InsertIntoTableCommand(LogicalPlan logicalQuery, Optional<String> labelName,
Optional<InsertCommandContext> insertCtx) {
super(PlanType.INSERT_INTO_TABLE_COMMAND);
this.logicalQuery = Objects.requireNonNull(logicalQuery, "logicalQuery should not be null");
this.labelName = Objects.requireNonNull(labelName, "labelName should not be null");
this.insertCtx = insertCtx;
}
public Optional<String> getLabelName() {
return labelName;
}
public void setLabelName(Optional<String> labelName) {
this.labelName = labelName;
}
public void setJobId(long jobId) {
this.jobId = jobId;
}
@Override
public void run(ConnectContext ctx, StmtExecutor executor) throws Exception {
runInternal(ctx, executor);
}
public void runWithUpdateInfo(ConnectContext ctx, StmtExecutor executor,
LoadStatistic loadStatistic) throws Exception {
// TODO: add coordinator statistic
runInternal(ctx, executor);
}
private void runInternal(ConnectContext ctx, StmtExecutor executor) throws Exception {
if (!ctx.getSessionVariable().isEnableNereidsDML()) {
try {
ctx.getSessionVariable().enableFallbackToOriginalPlannerOnce();
} catch (Exception e) {
throw new AnalysisException("failed to set fallback to original planner to true", e);
}
throw new AnalysisException("Nereids DML is disabled, will try to fall back to the original planner");
}
TableIf targetTableIf = InsertUtils.getTargetTable(logicalQuery, ctx);
// check auth
if (!Env.getCurrentEnv().getAccessManager()
.checkTblPriv(ConnectContext.get(), targetTableIf.getDatabase().getCatalog().getName(),
targetTableIf.getDatabase().getFullName(), targetTableIf.getName(),
PrivPredicate.LOAD)) {
ErrorReport.reportAnalysisException(ErrorCode.ERR_TABLEACCESS_DENIED_ERROR, "LOAD",
ConnectContext.get().getQualifiedUser(), ConnectContext.get().getRemoteIP(),
targetTableIf.getDatabase().getFullName() + "." + targetTableIf.getName());
}
AbstractInsertExecutor insertExecutor;
// should lock target table until we begin transaction.
targetTableIf.readLock();
try {
// 1. process inline table (default values, empty values)
this.logicalQuery = (LogicalPlan) InsertUtils.normalizePlan(logicalQuery, targetTableIf);
LogicalPlanAdapter logicalPlanAdapter = new LogicalPlanAdapter(logicalQuery, ctx.getStatementContext());
NereidsPlanner planner = new NereidsPlanner(ctx.getStatementContext());
planner.plan(logicalPlanAdapter, ctx.getSessionVariable().toThrift());
executor.setPlanner(planner);
executor.checkBlockRules();
if (ctx.getMysqlChannel() != null) {
ctx.getMysqlChannel().reset();
}
Optional<PhysicalOlapTableSink<?>> plan = (planner.getPhysicalPlan()
.<Set<PhysicalOlapTableSink<?>>>collect(PhysicalSink.class::isInstance)).stream()
.findAny();
Preconditions.checkArgument(plan.isPresent(), "insert into command must contain target table");
PhysicalSink physicalSink = plan.get();
DataSink sink = planner.getFragments().get(0).getSink();
String label = this.labelName.orElse(String.format("label_%x_%x", ctx.queryId().hi, ctx.queryId().lo));
if (physicalSink instanceof PhysicalOlapTableSink) {
if (GroupCommitInserter.groupCommit(ctx, sink, physicalSink)) {
return;
}
OlapTable olapTable = (OlapTable) targetTableIf;
insertExecutor = new OlapInsertExecutor(ctx, olapTable, label, planner, insertCtx);
boolean isEnableMemtableOnSinkNode =
olapTable.getTableProperty().getUseSchemaLightChange()
? insertExecutor.getCoordinator().getQueryOptions().isEnableMemtableOnSinkNode()
: false;
insertExecutor.getCoordinator().getQueryOptions()
.setEnableMemtableOnSinkNode(isEnableMemtableOnSinkNode);
} else {
// TODO: support other table types
throw new AnalysisException("insert into command only support olap table");
}
insertExecutor.beginTransaction();
insertExecutor.finalizeSink(planner.getFragments().get(0), sink, physicalSink);
} finally {
targetTableIf.readUnlock();
}
executor.setProfileType(ProfileType.LOAD);
// We exposed @StmtExecutor#cancel as a unified entry point for statement interruption,
// so we need to set this here
executor.setCoord(insertExecutor.getCoordinator());
insertExecutor.executeSingleInsert(executor, jobId);
}
@Override
public Plan getExplainPlan(ConnectContext ctx) {
return InsertUtils.getPlanForExplain(ctx, this.logicalQuery);
}
@Override
public <R, C> R accept(PlanVisitor<R, C> visitor, C context) {
return visitor.visitInsertIntoTableCommand(this, context);
}
}

View File

@ -15,7 +15,7 @@
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.nereids.trees.plans.commands;
package org.apache.doris.nereids.trees.plans.commands.insert;
import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.OlapTable;
@ -34,6 +34,8 @@ import org.apache.doris.nereids.trees.TreeNode;
import org.apache.doris.nereids.trees.plans.Explainable;
import org.apache.doris.nereids.trees.plans.Plan;
import org.apache.doris.nereids.trees.plans.PlanType;
import org.apache.doris.nereids.trees.plans.commands.Command;
import org.apache.doris.nereids.trees.plans.commands.ForwardWithSync;
import org.apache.doris.nereids.trees.plans.logical.LogicalPlan;
import org.apache.doris.nereids.trees.plans.physical.PhysicalOlapTableSink;
import org.apache.doris.nereids.trees.plans.visitor.PlanVisitor;
@ -91,13 +93,13 @@ public class InsertOverwriteTableCommand extends Command implements ForwardWithS
}
throw new AnalysisException("Nereids DML is disabled, will try to fall back to the original planner");
}
// insert overwrite only support
TableIf targetTableIf = InsertExecutor.getTargetTable(logicalQuery, ctx);
TableIf targetTableIf = InsertUtils.getTargetTable(logicalQuery, ctx);
if (!(targetTableIf instanceof OlapTable)) {
throw new AnalysisException("insert into overwrite only support OLAP table."
+ " But current table type is " + targetTableIf.getType());
}
this.logicalQuery = (LogicalPlan) InsertExecutor.normalizePlan(logicalQuery, targetTableIf);
this.logicalQuery = (LogicalPlan) InsertUtils.normalizePlan(logicalQuery, targetTableIf);
LogicalPlanAdapter logicalPlanAdapter = new LogicalPlanAdapter(logicalQuery, ctx.getStatementContext());
NereidsPlanner planner = new NereidsPlanner(ctx.getStatementContext());
@ -165,8 +167,9 @@ public class InsertOverwriteTableCommand extends Command implements ForwardWithS
sink.getDMLCommandType(),
(LogicalPlan) (sink.child(0)));
// for overwrite situation, we disable auto create partition.
InsertIntoTableCommand insertCommand = new InsertIntoTableCommand(copySink, labelName);
insertCommand.setAllowAutoPartition(false);
OlapInsertCommandContext insertCtx = new OlapInsertCommandContext();
insertCtx.setAllowAutoPartition(false);
InsertIntoTableCommand insertCommand = new InsertIntoTableCommand(copySink, labelName, Optional.of(insertCtx));
insertCommand.run(ctx, executor);
if (ctx.getState().getStateType() == MysqlStateType.ERR) {
String errMsg = Strings.emptyToNull(ctx.getState().getErrorMessage());
@ -177,15 +180,7 @@ public class InsertOverwriteTableCommand extends Command implements ForwardWithS
@Override
public Plan getExplainPlan(ConnectContext ctx) {
if (!ctx.getSessionVariable().isEnableNereidsDML()) {
try {
ctx.getSessionVariable().enableFallbackToOriginalPlannerOnce();
} catch (Exception e) {
throw new AnalysisException("failed to set fallback to original planner to true", e);
}
throw new AnalysisException("Nereids DML is disabled, will try to fall back to the original planner");
}
return InsertExecutor.normalizePlan(this.logicalQuery, InsertExecutor.getTargetTable(this.logicalQuery, ctx));
return InsertUtils.getPlanForExplain(ctx, this.logicalQuery);
}
@Override

View File

@ -15,11 +15,8 @@
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.nereids.trees.plans.commands;
package org.apache.doris.nereids.trees.plans.commands.insert;
import org.apache.doris.analysis.Analyzer;
import org.apache.doris.analysis.StatementBase;
import org.apache.doris.analysis.UserIdentity;
import org.apache.doris.catalog.Column;
import org.apache.doris.catalog.Database;
import org.apache.doris.catalog.Env;
@ -27,15 +24,7 @@ import org.apache.doris.catalog.KeysType;
import org.apache.doris.catalog.OlapTable;
import org.apache.doris.catalog.Table;
import org.apache.doris.catalog.TableIf;
import org.apache.doris.catalog.TableIf.TableType;
import org.apache.doris.common.Config;
import org.apache.doris.common.ErrorCode;
import org.apache.doris.common.ErrorReport;
import org.apache.doris.common.MetaNotFoundException;
import org.apache.doris.common.UserException;
import org.apache.doris.common.util.DebugUtil;
import org.apache.doris.load.EtlJobType;
import org.apache.doris.nereids.NereidsPlanner;
import org.apache.doris.nereids.analyzer.UnboundAlias;
import org.apache.doris.nereids.analyzer.UnboundOneRowRelation;
import org.apache.doris.nereids.analyzer.UnboundTableSink;
@ -59,328 +48,33 @@ import org.apache.doris.nereids.trees.plans.logical.LogicalPlan;
import org.apache.doris.nereids.types.DataType;
import org.apache.doris.nereids.util.RelationUtil;
import org.apache.doris.nereids.util.TypeCoercionUtils;
import org.apache.doris.planner.DataSink;
import org.apache.doris.planner.DataStreamSink;
import org.apache.doris.planner.ExchangeNode;
import org.apache.doris.planner.OlapTableSink;
import org.apache.doris.planner.PlanFragment;
import org.apache.doris.proto.InternalService;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.qe.Coordinator;
import org.apache.doris.qe.InsertStreamTxnExecutor;
import org.apache.doris.qe.QeProcessorImpl;
import org.apache.doris.qe.QeProcessorImpl.QueryInfo;
import org.apache.doris.qe.QueryState.MysqlStateType;
import org.apache.doris.qe.SessionVariable;
import org.apache.doris.qe.StmtExecutor;
import org.apache.doris.service.FrontendOptions;
import org.apache.doris.task.LoadEtlTask;
import org.apache.doris.thrift.TFileFormatType;
import org.apache.doris.thrift.TFileType;
import org.apache.doris.thrift.TMergeType;
import org.apache.doris.thrift.TOlapTableLocationParam;
import org.apache.doris.thrift.TPartitionType;
import org.apache.doris.thrift.TQueryType;
import org.apache.doris.thrift.TStreamLoadPutRequest;
import org.apache.doris.thrift.TTxnParams;
import org.apache.doris.transaction.TabletCommitInfo;
import org.apache.doris.transaction.TransactionEntry;
import org.apache.doris.transaction.TransactionState;
import org.apache.doris.transaction.TransactionState.LoadJobSourceType;
import org.apache.doris.transaction.TransactionState.TxnCoordinator;
import org.apache.doris.transaction.TransactionState.TxnSourceType;
import org.apache.doris.transaction.TransactionStatus;
import com.google.common.base.Preconditions;
import com.google.common.base.Strings;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;
import org.apache.commons.collections.CollectionUtils;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.util.List;
import java.util.Optional;
import java.util.stream.Collectors;
/**
* transaction wrapper for Nereids
* The helper class for insert operation.
*/
public class InsertExecutor {
private static final Logger LOG = LogManager.getLogger(InsertExecutor.class);
private static final long INVALID_TXN_ID = -1L;
private final ConnectContext ctx;
private final Coordinator coordinator;
private final String labelName;
private final Database database;
private final Table table;
private final long createAt = System.currentTimeMillis();
private long loadedRows = 0;
private int filteredRows = 0;
private long txnId = INVALID_TXN_ID;
private TransactionStatus txnStatus = TransactionStatus.ABORTED;
private String errMsg = "";
/**
* constructor
*/
public InsertExecutor(ConnectContext ctx, Database database, Table table,
String labelName, NereidsPlanner planner) {
this.ctx = ctx;
this.coordinator = new Coordinator(ctx, null, planner, ctx.getStatsErrorEstimator());
this.labelName = labelName;
this.database = database;
this.table = table;
}
public long getTxnId() {
return txnId;
}
/**
* begin transaction if necessary
*/
public void beginTransaction() {
if (!(table instanceof OlapTable)) {
return;
}
try {
this.txnId = Env.getCurrentGlobalTransactionMgr().beginTransaction(
database.getId(), ImmutableList.of(table.getId()), labelName,
new TxnCoordinator(TxnSourceType.FE, FrontendOptions.getLocalHostAddress()),
LoadJobSourceType.INSERT_STREAMING, ctx.getExecTimeout());
} catch (Exception e) {
throw new AnalysisException("begin transaction failed. " + e.getMessage(), e);
}
}
/**
* finalize sink to complete enough info for sink execution
*/
public void finalizeSink(PlanFragment fragment, DataSink sink,
boolean isPartialUpdate, boolean isFromInsert, boolean allowAutoPartition) {
if (!(sink instanceof OlapTableSink)) {
return;
}
Preconditions.checkState(table instanceof OlapTable,
"sink is OlapTableSink, but table type is " + table.getType());
OlapTableSink olapTableSink = (OlapTableSink) sink;
boolean isStrictMode = ctx.getSessionVariable().getEnableInsertStrict()
&& isPartialUpdate
&& isFromInsert;
try {
// TODO refactor this to avoid call legacy planner's function
olapTableSink.init(ctx.queryId(), txnId, database.getId(),
ctx.getExecTimeout(),
ctx.getSessionVariable().getSendBatchParallelism(),
false,
isStrictMode);
olapTableSink.complete(new Analyzer(Env.getCurrentEnv(), ctx));
if (!allowAutoPartition) {
olapTableSink.setAutoPartition(false);
}
// update
// set schema and partition info for tablet id shuffle exchange
if (fragment.getPlanRoot() instanceof ExchangeNode
&& fragment.getDataPartition().getType() == TPartitionType.TABLET_SINK_SHUFFLE_PARTITIONED) {
DataStreamSink dataStreamSink = (DataStreamSink) (fragment.getChild(0).getSink());
Analyzer analyzer = new Analyzer(Env.getCurrentEnv(), ConnectContext.get());
dataStreamSink.setTabletSinkSchemaParam(olapTableSink.createSchema(
database.getId(), olapTableSink.getDstTable(), analyzer));
dataStreamSink.setTabletSinkPartitionParam(olapTableSink.createPartition(
database.getId(), olapTableSink.getDstTable(), analyzer));
dataStreamSink.setTabletSinkTupleDesc(olapTableSink.getTupleDescriptor());
List<TOlapTableLocationParam> locationParams = olapTableSink
.createLocation(olapTableSink.getDstTable());
dataStreamSink.setTabletSinkLocationParam(locationParams.get(0));
dataStreamSink.setTabletSinkTxnId(olapTableSink.getTxnId());
}
} catch (Exception e) {
throw new AnalysisException(e.getMessage(), e);
}
TransactionState state = Env.getCurrentGlobalTransactionMgr().getTransactionState(database.getId(), txnId);
if (state == null) {
throw new AnalysisException("txn does not exist: " + txnId);
}
state.addTableIndexes((OlapTable) table);
if (isPartialUpdate) {
state.setSchemaForPartialUpdate((OlapTable) table);
}
}
/**
* execute insert txn for insert into select command.
*/
public void executeSingleInsertTransaction(StmtExecutor executor, long jobId) {
String queryId = DebugUtil.printId(ctx.queryId());
LOG.info("start insert [{}] with query id {} and txn id {}", labelName, queryId, txnId);
Throwable throwable = null;
try {
coordinator.setLoadZeroTolerance(ctx.getSessionVariable().getEnableInsertStrict());
coordinator.setQueryType(TQueryType.LOAD);
executor.getProfile().addExecutionProfile(coordinator.getExecutionProfile());
QueryInfo queryInfo = new QueryInfo(ConnectContext.get(), executor.getOriginStmtInString(), coordinator);
QeProcessorImpl.INSTANCE.registerQuery(ctx.queryId(), queryInfo);
coordinator.exec();
int execTimeout = ctx.getExecTimeout();
if (LOG.isDebugEnabled()) {
LOG.debug("insert [{}] with query id {} execution timeout is {}", labelName, queryId, execTimeout);
}
boolean notTimeout = coordinator.join(execTimeout);
if (!coordinator.isDone()) {
coordinator.cancel();
if (notTimeout) {
errMsg = coordinator.getExecStatus().getErrorMsg();
ErrorReport.reportDdlException("there exists unhealthy backend. "
+ errMsg, ErrorCode.ERR_FAILED_WHEN_INSERT);
} else {
ErrorReport.reportDdlException(ErrorCode.ERR_EXECUTE_TIMEOUT);
}
}
if (!coordinator.getExecStatus().ok()) {
errMsg = coordinator.getExecStatus().getErrorMsg();
LOG.warn("insert [{}] with query id {} failed, {}", labelName, queryId, errMsg);
ErrorReport.reportDdlException(errMsg, ErrorCode.ERR_FAILED_WHEN_INSERT);
}
if (LOG.isDebugEnabled()) {
LOG.debug("insert [{}] with query id {} delta files is {}",
labelName, queryId, coordinator.getDeltaUrls());
}
if (coordinator.getLoadCounters().get(LoadEtlTask.DPP_NORMAL_ALL) != null) {
loadedRows = Long.parseLong(coordinator.getLoadCounters().get(LoadEtlTask.DPP_NORMAL_ALL));
}
if (coordinator.getLoadCounters().get(LoadEtlTask.DPP_ABNORMAL_ALL) != null) {
filteredRows = Integer.parseInt(coordinator.getLoadCounters().get(LoadEtlTask.DPP_ABNORMAL_ALL));
}
// if in strict mode, insert will fail if there are filtered rows
if (ctx.getSessionVariable().getEnableInsertStrict()) {
if (filteredRows > 0) {
ctx.getState().setError(ErrorCode.ERR_FAILED_WHEN_INSERT,
"Insert has filtered data in strict mode, tracking_url=" + coordinator.getTrackingUrl());
return;
}
}
if (table.getType() != TableType.OLAP && table.getType() != TableType.MATERIALIZED_VIEW) {
// no need to add load job.
// MySQL table is already being inserted.
ctx.getState().setOk(loadedRows, filteredRows, null);
return;
}
if (ctx.getState().getStateType() == MysqlStateType.ERR) {
try {
String errMsg = Strings.emptyToNull(ctx.getState().getErrorMessage());
Env.getCurrentGlobalTransactionMgr().abortTransaction(
database.getId(), txnId,
(errMsg == null ? "unknown reason" : errMsg));
} catch (Exception abortTxnException) {
LOG.warn("errors when abort txn. {}", ctx.getQueryIdentifier(), abortTxnException);
}
} else if (Env.getCurrentGlobalTransactionMgr().commitAndPublishTransaction(
database, Lists.newArrayList(table),
txnId,
TabletCommitInfo.fromThrift(coordinator.getCommitInfos()),
ctx.getSessionVariable().getInsertVisibleTimeoutMs())) {
txnStatus = TransactionStatus.VISIBLE;
} else {
txnStatus = TransactionStatus.COMMITTED;
}
} catch (Throwable t) {
// if any throwable being thrown during insert operation, first we should abort this txn
LOG.warn("insert [{}] with query id {} failed", labelName, queryId, t);
if (txnId != INVALID_TXN_ID) {
try {
Env.getCurrentGlobalTransactionMgr().abortTransaction(
database.getId(), txnId,
t.getMessage() == null ? "unknown reason" : t.getMessage());
} catch (Exception abortTxnException) {
// just print a log if abort txn failed. This failure do not need to pass to user.
// user only concern abort how txn failed.
LOG.warn("insert [{}] with query id {} abort txn {} failed",
labelName, queryId, txnId, abortTxnException);
}
}
if (!Config.using_old_load_usage_pattern) {
// if not using old load usage pattern, error will be returned directly to user
StringBuilder sb = new StringBuilder(t.getMessage());
if (!Strings.isNullOrEmpty(coordinator.getTrackingUrl())) {
sb.append(". url: ").append(coordinator.getTrackingUrl());
}
ctx.getState().setError(ErrorCode.ERR_UNKNOWN_ERROR, sb.toString());
return;
}
/*
* If config 'using_old_load_usage_pattern' is true.
* Doris will return a label to user, and user can use this label to check load job's status,
* which exactly like the old insert stmt usage pattern.
*/
throwable = t;
} finally {
executor.updateProfile(true);
QeProcessorImpl.INSTANCE.unregisterQuery(ctx.queryId());
}
// Go here, which means:
// 1. transaction is finished successfully (COMMITTED or VISIBLE), or
// 2. transaction failed but Config.using_old_load_usage_pattern is true.
// we will record the load job info for these 2 cases
try {
// the statement parsed by Nereids is saved at executor::parsedStmt.
StatementBase statement = executor.getParsedStmt();
UserIdentity userIdentity;
//if we use job scheduler, parse statement will not set user identity,so we need to get it from context
if (null == statement) {
userIdentity = ctx.getCurrentUserIdentity();
} else {
userIdentity = statement.getUserInfo();
}
EtlJobType etlJobType = EtlJobType.INSERT;
if (0 != jobId) {
etlJobType = EtlJobType.INSERT_JOB;
}
if (!Config.enable_nereids_load) {
// just record for loadv2 here
ctx.getEnv().getLoadManager()
.recordFinishedLoadJob(labelName, txnId, database.getFullName(),
table.getId(),
etlJobType, createAt, throwable == null ? "" : throwable.getMessage(),
coordinator.getTrackingUrl(), userIdentity, jobId);
}
} catch (MetaNotFoundException e) {
LOG.warn("Record info of insert load with error {}", e.getMessage(), e);
errMsg = "Record info of insert load with error " + e.getMessage();
}
// {'label':'my_label1', 'status':'visible', 'txnId':'123'}
// {'label':'my_label1', 'status':'visible', 'txnId':'123' 'err':'error messages'}
StringBuilder sb = new StringBuilder();
sb.append("{'label':'").append(labelName).append("', 'status':'").append(txnStatus.name());
sb.append("', 'txnId':'").append(txnId).append("'");
if (table.getType() == TableType.MATERIALIZED_VIEW) {
sb.append("', 'rows':'").append(loadedRows).append("'");
}
if (!Strings.isNullOrEmpty(errMsg)) {
sb.append(", 'err':'").append(errMsg).append("'");
}
sb.append("}");
ctx.getState().setOk(loadedRows, filteredRows, sb.toString());
// set insert result in connection context,
// so that user can use `show insert result` to get info of the last insert operation.
ctx.setOrUpdateInsertResult(txnId, labelName, database.getFullName(), table.getName(),
txnStatus, loadedRows, filteredRows);
// update it, so that user can get loaded rows in fe.audit.log
ctx.updateReturnRows((int) loadedRows);
}
public class InsertUtils {
/**
* execute insert values in transaction.
@ -670,7 +364,18 @@ public class InsertExecutor {
}
}
public Coordinator getCoordinator() {
return coordinator;
/**
* get plan for explain.
*/
public static Plan getPlanForExplain(ConnectContext ctx, LogicalPlan logicalQuery) {
if (!ctx.getSessionVariable().isEnableNereidsDML()) {
try {
ctx.getSessionVariable().enableFallbackToOriginalPlannerOnce();
} catch (Exception e) {
throw new AnalysisException("failed to set fallback to original planner to true", e);
}
throw new AnalysisException("Nereids DML is disabled, will try to fall back to the original planner");
}
return InsertUtils.normalizePlan(logicalQuery, InsertUtils.getTargetTable(logicalQuery, ctx));
}
}

View File

@ -0,0 +1,33 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.nereids.trees.plans.commands.insert;
/**
* For Olap Table
*/
public class OlapInsertCommandContext extends InsertCommandContext {
private boolean allowAutoPartition = true;
public boolean isAllowAutoPartition() {
return allowAutoPartition;
}
public void setAllowAutoPartition(boolean allowAutoPartition) {
this.allowAutoPartition = allowAutoPartition;
}
}

View File

@ -0,0 +1,257 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.nereids.trees.plans.commands.insert;
import org.apache.doris.analysis.Analyzer;
import org.apache.doris.analysis.StatementBase;
import org.apache.doris.analysis.UserIdentity;
import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.OlapTable;
import org.apache.doris.catalog.Table;
import org.apache.doris.catalog.TableIf.TableType;
import org.apache.doris.common.Config;
import org.apache.doris.common.ErrorCode;
import org.apache.doris.common.MetaNotFoundException;
import org.apache.doris.common.UserException;
import org.apache.doris.common.util.DebugUtil;
import org.apache.doris.load.EtlJobType;
import org.apache.doris.nereids.NereidsPlanner;
import org.apache.doris.nereids.exceptions.AnalysisException;
import org.apache.doris.nereids.trees.plans.commands.info.DMLCommandType;
import org.apache.doris.nereids.trees.plans.physical.PhysicalOlapTableSink;
import org.apache.doris.nereids.trees.plans.physical.PhysicalSink;
import org.apache.doris.planner.DataSink;
import org.apache.doris.planner.DataStreamSink;
import org.apache.doris.planner.ExchangeNode;
import org.apache.doris.planner.OlapTableSink;
import org.apache.doris.planner.PlanFragment;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.qe.QueryState.MysqlStateType;
import org.apache.doris.qe.StmtExecutor;
import org.apache.doris.service.FrontendOptions;
import org.apache.doris.thrift.TOlapTableLocationParam;
import org.apache.doris.thrift.TPartitionType;
import org.apache.doris.transaction.TabletCommitInfo;
import org.apache.doris.transaction.TransactionState;
import org.apache.doris.transaction.TransactionState.LoadJobSourceType;
import org.apache.doris.transaction.TransactionState.TxnCoordinator;
import org.apache.doris.transaction.TransactionState.TxnSourceType;
import org.apache.doris.transaction.TransactionStatus;
import com.google.common.base.Strings;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.util.List;
import java.util.Optional;
/**
* Insert executor for olap table
*/
public class OlapInsertExecutor extends AbstractInsertExecutor {
private static final Logger LOG = LogManager.getLogger(OlapInsertExecutor.class);
private static final long INVALID_TXN_ID = -1L;
private long txnId = INVALID_TXN_ID;
private TransactionStatus txnStatus = TransactionStatus.ABORTED;
/**
* constructor
*/
public OlapInsertExecutor(ConnectContext ctx, Table table,
String labelName, NereidsPlanner planner, Optional<InsertCommandContext> insertCtx) {
super(ctx, table, labelName, planner, insertCtx);
}
public long getTxnId() {
return txnId;
}
@Override
public void beginTransaction() {
try {
this.txnId = Env.getCurrentGlobalTransactionMgr().beginTransaction(
database.getId(), ImmutableList.of(table.getId()), labelName,
new TxnCoordinator(TxnSourceType.FE, FrontendOptions.getLocalHostAddress()),
LoadJobSourceType.INSERT_STREAMING, ctx.getExecTimeout());
} catch (Exception e) {
throw new AnalysisException("begin transaction failed. " + e.getMessage(), e);
}
}
@Override
public void finalizeSink(PlanFragment fragment, DataSink sink, PhysicalSink physicalSink) {
OlapTableSink olapTableSink = (OlapTableSink) sink;
PhysicalOlapTableSink physicalOlapTableSink = (PhysicalOlapTableSink) physicalSink;
OlapInsertCommandContext olapInsertCtx = (OlapInsertCommandContext) insertCtx.orElse(
new OlapInsertCommandContext());
boolean isStrictMode = ctx.getSessionVariable().getEnableInsertStrict()
&& physicalOlapTableSink.isPartialUpdate()
&& physicalOlapTableSink.getDmlCommandType() == DMLCommandType.INSERT;
try {
// TODO refactor this to avoid call legacy planner's function
int timeout = ctx.getExecTimeout();
olapTableSink.init(ctx.queryId(), txnId, database.getId(),
timeout,
ctx.getSessionVariable().getSendBatchParallelism(),
false,
isStrictMode);
olapTableSink.complete(new Analyzer(Env.getCurrentEnv(), ctx));
if (!olapInsertCtx.isAllowAutoPartition()) {
olapTableSink.setAutoPartition(false);
}
// update
// set schema and partition info for tablet id shuffle exchange
if (fragment.getPlanRoot() instanceof ExchangeNode
&& fragment.getDataPartition().getType() == TPartitionType.TABLET_SINK_SHUFFLE_PARTITIONED) {
DataStreamSink dataStreamSink = (DataStreamSink) (fragment.getChild(0).getSink());
Analyzer analyzer = new Analyzer(Env.getCurrentEnv(), ConnectContext.get());
dataStreamSink.setTabletSinkSchemaParam(olapTableSink.createSchema(
database.getId(), olapTableSink.getDstTable(), analyzer));
dataStreamSink.setTabletSinkPartitionParam(olapTableSink.createPartition(
database.getId(), olapTableSink.getDstTable(), analyzer));
dataStreamSink.setTabletSinkTupleDesc(olapTableSink.getTupleDescriptor());
List<TOlapTableLocationParam> locationParams = olapTableSink
.createLocation(olapTableSink.getDstTable());
dataStreamSink.setTabletSinkLocationParam(locationParams.get(0));
dataStreamSink.setTabletSinkTxnId(olapTableSink.getTxnId());
}
} catch (Exception e) {
throw new AnalysisException(e.getMessage(), e);
}
TransactionState state = Env.getCurrentGlobalTransactionMgr().getTransactionState(database.getId(), txnId);
if (state == null) {
throw new AnalysisException("txn does not exist: " + txnId);
}
state.addTableIndexes((OlapTable) table);
if (physicalOlapTableSink.isPartialUpdate()) {
state.setSchemaForPartialUpdate((OlapTable) table);
}
}
@Override
protected void beforeExec() {
String queryId = DebugUtil.printId(ctx.queryId());
LOG.info("start insert [{}] with query id {} and txn id {}", labelName, queryId, txnId);
}
@Override
protected void onComplete() throws UserException {
if (ctx.getState().getStateType() == MysqlStateType.ERR) {
try {
String errMsg = Strings.emptyToNull(ctx.getState().getErrorMessage());
Env.getCurrentGlobalTransactionMgr().abortTransaction(
database.getId(), txnId,
(errMsg == null ? "unknown reason" : errMsg));
} catch (Exception abortTxnException) {
LOG.warn("errors when abort txn. {}", ctx.getQueryIdentifier(), abortTxnException);
}
} else if (Env.getCurrentGlobalTransactionMgr().commitAndPublishTransaction(
database, Lists.newArrayList((Table) table),
txnId,
TabletCommitInfo.fromThrift(coordinator.getCommitInfos()),
ctx.getSessionVariable().getInsertVisibleTimeoutMs())) {
txnStatus = TransactionStatus.VISIBLE;
} else {
txnStatus = TransactionStatus.COMMITTED;
}
}
@Override
protected void onFail(Throwable t) {
errMsg = t.getMessage() == null ? "unknown reason" : t.getMessage();
String queryId = DebugUtil.printId(ctx.queryId());
// if any throwable being thrown during insert operation, first we should abort this txn
LOG.warn("insert [{}] with query id {} failed", labelName, queryId, t);
if (txnId != INVALID_TXN_ID) {
try {
Env.getCurrentGlobalTransactionMgr().abortTransaction(
database.getId(), txnId, errMsg);
} catch (Exception abortTxnException) {
// just print a log if abort txn failed. This failure do not need to pass to user.
// user only concern abort how txn failed.
LOG.warn("insert [{}] with query id {} abort txn {} failed",
labelName, queryId, txnId, abortTxnException);
}
}
StringBuilder sb = new StringBuilder(t.getMessage());
if (!Strings.isNullOrEmpty(coordinator.getTrackingUrl())) {
sb.append(". url: ").append(coordinator.getTrackingUrl());
}
ctx.getState().setError(ErrorCode.ERR_UNKNOWN_ERROR, sb.toString());
}
@Override
protected void afterExec(StmtExecutor executor) {
// Go here, which means:
// 1. transaction is finished successfully (COMMITTED or VISIBLE), or
// 2. transaction failed but Config.using_old_load_usage_pattern is true.
// we will record the load job info for these 2 cases
try {
// the statement parsed by Nereids is saved at executor::parsedStmt.
StatementBase statement = executor.getParsedStmt();
UserIdentity userIdentity;
// if we use job scheduler, parse statement will not set user identity,so we need to get it from context
if (null == statement) {
userIdentity = ctx.getCurrentUserIdentity();
} else {
userIdentity = statement.getUserInfo();
}
EtlJobType etlJobType = EtlJobType.INSERT;
if (0 != jobId) {
etlJobType = EtlJobType.INSERT_JOB;
}
if (!Config.enable_nereids_load) {
// just record for loadv2 here
ctx.getEnv().getLoadManager()
.recordFinishedLoadJob(labelName, txnId, database.getFullName(),
table.getId(),
etlJobType, createTime, errMsg,
coordinator.getTrackingUrl(), userIdentity, jobId);
}
} catch (MetaNotFoundException e) {
LOG.warn("Record info of insert load with error {}", e.getMessage(), e);
errMsg = "Record info of insert load with error " + e.getMessage();
}
// {'label':'my_label1', 'status':'visible', 'txnId':'123'}
// {'label':'my_label1', 'status':'visible', 'txnId':'123' 'err':'error messages'}
StringBuilder sb = new StringBuilder();
sb.append("{'label':'").append(labelName).append("', 'status':'").append(txnStatus.name());
sb.append("', 'txnId':'").append(txnId).append("'");
if (table.getType() == TableType.MATERIALIZED_VIEW) {
sb.append("', 'rows':'").append(loadedRows).append("'");
}
if (!Strings.isNullOrEmpty(errMsg)) {
sb.append(", 'err':'").append(errMsg).append("'");
}
sb.append("}");
ctx.getState().setOk(loadedRows, filteredRows, sb.toString());
// set insert result in connection context,
// so that user can use `show insert result` to get info of the last insert operation.
ctx.setOrUpdateInsertResult(txnId, labelName, database.getFullName(), table.getName(),
txnStatus, loadedRows, filteredRows);
// update it, so that user can get loaded rows in fe.audit.log
ctx.updateReturnRows((int) loadedRows);
}
}

View File

@ -19,7 +19,6 @@ package org.apache.doris.nereids.trees.plans.visitor;
import org.apache.doris.nereids.trees.plans.commands.AddConstraintCommand;
import org.apache.doris.nereids.trees.plans.commands.AlterMTMVCommand;
import org.apache.doris.nereids.trees.plans.commands.BatchInsertIntoTableCommand;
import org.apache.doris.nereids.trees.plans.commands.CallCommand;
import org.apache.doris.nereids.trees.plans.commands.CancelMTMVTaskCommand;
import org.apache.doris.nereids.trees.plans.commands.Command;
@ -34,8 +33,6 @@ import org.apache.doris.nereids.trees.plans.commands.DropMTMVCommand;
import org.apache.doris.nereids.trees.plans.commands.DropProcedureCommand;
import org.apache.doris.nereids.trees.plans.commands.ExplainCommand;
import org.apache.doris.nereids.trees.plans.commands.ExportCommand;
import org.apache.doris.nereids.trees.plans.commands.InsertIntoTableCommand;
import org.apache.doris.nereids.trees.plans.commands.InsertOverwriteTableCommand;
import org.apache.doris.nereids.trees.plans.commands.LoadCommand;
import org.apache.doris.nereids.trees.plans.commands.PauseMTMVCommand;
import org.apache.doris.nereids.trees.plans.commands.RefreshMTMVCommand;
@ -44,6 +41,9 @@ import org.apache.doris.nereids.trees.plans.commands.ShowConstraintsCommand;
import org.apache.doris.nereids.trees.plans.commands.ShowCreateProcedureCommand;
import org.apache.doris.nereids.trees.plans.commands.ShowProcedureStatusCommand;
import org.apache.doris.nereids.trees.plans.commands.UpdateCommand;
import org.apache.doris.nereids.trees.plans.commands.insert.BatchInsertIntoTableCommand;
import org.apache.doris.nereids.trees.plans.commands.insert.InsertIntoTableCommand;
import org.apache.doris.nereids.trees.plans.commands.insert.InsertOverwriteTableCommand;
/** CommandVisitor. */
public interface CommandVisitor<R, C> {

View File

@ -128,13 +128,13 @@ import org.apache.doris.nereids.glue.LogicalPlanAdapter;
import org.apache.doris.nereids.minidump.MinidumpUtils;
import org.apache.doris.nereids.parser.NereidsParser;
import org.apache.doris.nereids.rules.exploration.mv.InitMaterializationContextHook;
import org.apache.doris.nereids.trees.plans.commands.BatchInsertIntoTableCommand;
import org.apache.doris.nereids.trees.plans.commands.Command;
import org.apache.doris.nereids.trees.plans.commands.CreateTableCommand;
import org.apache.doris.nereids.trees.plans.commands.Forward;
import org.apache.doris.nereids.trees.plans.commands.InsertIntoTableCommand;
import org.apache.doris.nereids.trees.plans.commands.InsertOverwriteTableCommand;
import org.apache.doris.nereids.trees.plans.commands.NotAllowFallback;
import org.apache.doris.nereids.trees.plans.commands.insert.BatchInsertIntoTableCommand;
import org.apache.doris.nereids.trees.plans.commands.insert.InsertIntoTableCommand;
import org.apache.doris.nereids.trees.plans.commands.insert.InsertOverwriteTableCommand;
import org.apache.doris.nereids.trees.plans.logical.LogicalPlan;
import org.apache.doris.planner.GroupCommitPlanner;
import org.apache.doris.planner.OlapScanNode;
@ -2137,22 +2137,12 @@ public class StmtExecutor {
LOG.warn("errors when abort txn", abortTxnException);
}
if (!Config.using_old_load_usage_pattern) {
// if not using old load usage pattern, error will be returned directly to user
StringBuilder sb = new StringBuilder(t.getMessage());
if (!Strings.isNullOrEmpty(coord.getTrackingUrl())) {
sb.append(". url: " + coord.getTrackingUrl());
}
context.getState().setError(ErrorCode.ERR_UNKNOWN_ERROR, sb.toString());
return;
StringBuilder sb = new StringBuilder(t.getMessage());
if (!Strings.isNullOrEmpty(coord.getTrackingUrl())) {
sb.append(". url: " + coord.getTrackingUrl());
}
/*
* If config 'using_old_load_usage_pattern' is true.
* Doris will return a label to user, and user can use this label to check load job's status,
* which exactly like the old insert stmt usage pattern.
*/
throwable = t;
context.getState().setError(ErrorCode.ERR_UNKNOWN_ERROR, sb.toString());
return;
} finally {
if (coord != null) {
coord.close();