[feature](nereids) Support group commit insert (#26075)
This commit is contained in:
@ -50,24 +50,14 @@ import org.apache.doris.planner.DataSink;
|
||||
import org.apache.doris.planner.ExportSink;
|
||||
import org.apache.doris.planner.GroupCommitBlockSink;
|
||||
import org.apache.doris.planner.GroupCommitOlapTableSink;
|
||||
import org.apache.doris.planner.GroupCommitPlanner;
|
||||
import org.apache.doris.planner.OlapTableSink;
|
||||
import org.apache.doris.planner.StreamLoadPlanner;
|
||||
import org.apache.doris.planner.external.jdbc.JdbcTableSink;
|
||||
import org.apache.doris.proto.InternalService;
|
||||
import org.apache.doris.qe.ConnectContext;
|
||||
import org.apache.doris.rewrite.ExprRewriter;
|
||||
import org.apache.doris.service.FrontendOptions;
|
||||
import org.apache.doris.tablefunction.GroupCommitTableValuedFunction;
|
||||
import org.apache.doris.task.StreamLoadTask;
|
||||
import org.apache.doris.thrift.TExecPlanFragmentParams;
|
||||
import org.apache.doris.thrift.TExecPlanFragmentParamsList;
|
||||
import org.apache.doris.thrift.TFileCompressType;
|
||||
import org.apache.doris.thrift.TFileFormatType;
|
||||
import org.apache.doris.thrift.TFileType;
|
||||
import org.apache.doris.thrift.TMergeType;
|
||||
import org.apache.doris.thrift.TQueryOptions;
|
||||
import org.apache.doris.thrift.TScanRangeParams;
|
||||
import org.apache.doris.thrift.TStreamLoadPutRequest;
|
||||
import org.apache.doris.thrift.TUniqueId;
|
||||
import org.apache.doris.transaction.TransactionState;
|
||||
import org.apache.doris.transaction.TransactionState.LoadJobSourceType;
|
||||
@ -84,10 +74,8 @@ import org.apache.commons.collections.CollectionUtils;
|
||||
import org.apache.logging.log4j.LogManager;
|
||||
import org.apache.logging.log4j.Logger;
|
||||
import org.apache.thrift.TException;
|
||||
import org.apache.thrift.TSerializer;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collection;
|
||||
import java.util.HashSet;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
@ -167,6 +155,7 @@ public class NativeInsertStmt extends InsertStmt {
|
||||
// true if the BE generates an insert statement from the group commit TVF and executes it to load data
|
||||
public boolean isGroupCommitTvf = false;
|
||||
public boolean isGroupCommitStreamLoadSql = false;
|
||||
private GroupCommitPlanner groupCommitPlanner;
|
||||
|
||||
private boolean isFromDeleteOrUpdateStmt = false;
|
||||
|
||||
@ -1134,10 +1123,10 @@ public class NativeInsertStmt extends InsertStmt {
|
||||
return isGroupCommit;
|
||||
}
|
||||
|
||||
public void planForGroupCommit(TUniqueId queryId) throws UserException, TException {
|
||||
public GroupCommitPlanner planForGroupCommit(TUniqueId queryId) throws UserException, TException {
|
||||
OlapTable olapTable = (OlapTable) getTargetTable();
|
||||
if (execPlanFragmentParamsBytes != null && olapTable.getBaseSchemaVersion() == baseSchemaVersion) {
|
||||
return;
|
||||
return groupCommitPlanner;
|
||||
}
|
||||
if (!targetColumns.isEmpty()) {
|
||||
Analyzer analyzerTmp = analyzer;
|
||||
@ -1145,45 +1134,11 @@ public class NativeInsertStmt extends InsertStmt {
|
||||
this.analyzer = analyzerTmp;
|
||||
}
|
||||
analyzeSubquery(analyzer, true);
|
||||
TStreamLoadPutRequest streamLoadPutRequest = new TStreamLoadPutRequest();
|
||||
if (targetColumnNames != null) {
|
||||
streamLoadPutRequest.setColumns(String.join(",", targetColumnNames));
|
||||
if (targetColumnNames.stream().anyMatch(col -> col.equalsIgnoreCase(Column.SEQUENCE_COL))) {
|
||||
streamLoadPutRequest.setSequenceCol(Column.SEQUENCE_COL);
|
||||
}
|
||||
}
|
||||
streamLoadPutRequest.setDb(db.getFullName()).setMaxFilterRatio(1)
|
||||
.setTbl(getTbl())
|
||||
.setFileType(TFileType.FILE_STREAM).setFormatType(TFileFormatType.FORMAT_CSV_PLAIN)
|
||||
.setMergeType(TMergeType.APPEND).setThriftRpcTimeoutMs(5000).setLoadId(queryId)
|
||||
.setGroupCommit(true);
|
||||
StreamLoadTask streamLoadTask = StreamLoadTask.fromTStreamLoadPutRequest(streamLoadPutRequest);
|
||||
StreamLoadPlanner planner = new StreamLoadPlanner((Database) getDbObj(), olapTable, streamLoadTask);
|
||||
// Will using load id as query id in fragment
|
||||
TExecPlanFragmentParams tRequest = planner.plan(streamLoadTask.getId());
|
||||
for (Map.Entry<Integer, List<TScanRangeParams>> entry : tRequest.params.per_node_scan_ranges.entrySet()) {
|
||||
for (TScanRangeParams scanRangeParams : entry.getValue()) {
|
||||
scanRangeParams.scan_range.ext_scan_range.file_scan_range.params.setFormatType(
|
||||
TFileFormatType.FORMAT_PROTO);
|
||||
scanRangeParams.scan_range.ext_scan_range.file_scan_range.params.setCompressType(
|
||||
TFileCompressType.PLAIN);
|
||||
}
|
||||
}
|
||||
List<TScanRangeParams> scanRangeParams = tRequest.params.per_node_scan_ranges.values().stream()
|
||||
.flatMap(Collection::stream).collect(Collectors.toList());
|
||||
Preconditions.checkState(scanRangeParams.size() == 1);
|
||||
groupCommitPlanner = new GroupCommitPlanner((Database) db, olapTable, targetColumnNames, queryId);
|
||||
// save plan message to be reused for prepare stmt
|
||||
loadId = queryId;
|
||||
baseSchemaVersion = olapTable.getBaseSchemaVersion();
|
||||
// see BackendServiceProxy#execPlanFragmentsAsync
|
||||
TExecPlanFragmentParamsList paramsList = new TExecPlanFragmentParamsList();
|
||||
paramsList.addToParamsList(tRequest);
|
||||
execPlanFragmentParamsBytes = ByteString.copyFrom(new TSerializer().serialize(paramsList));
|
||||
}
|
||||
|
||||
public InternalService.PExecPlanFragmentRequest getExecPlanFragmentRequest() {
|
||||
return InternalService.PExecPlanFragmentRequest.newBuilder().setRequest(execPlanFragmentParamsBytes)
|
||||
.setCompact(false).setVersion(InternalService.PFragmentRequestVersion.VERSION_2).build();
|
||||
return groupCommitPlanner;
|
||||
}
|
||||
|
||||
public TUniqueId getLoadId() {
|
||||
|
||||
@ -28,7 +28,6 @@ import org.apache.doris.nereids.rules.analysis.CheckAfterRewrite;
|
||||
import org.apache.doris.nereids.rules.analysis.EliminateGroupByConstant;
|
||||
import org.apache.doris.nereids.rules.analysis.LogicalSubQueryAliasToLogicalProject;
|
||||
import org.apache.doris.nereids.rules.analysis.NormalizeAggregate;
|
||||
import org.apache.doris.nereids.rules.analysis.RejectGroupCommitInsert;
|
||||
import org.apache.doris.nereids.rules.expression.CheckLegalityAfterRewrite;
|
||||
import org.apache.doris.nereids.rules.expression.ExpressionNormalization;
|
||||
import org.apache.doris.nereids.rules.expression.ExpressionOptimization;
|
||||
@ -269,9 +268,6 @@ public class Rewriter extends AbstractBatchJobExecutor {
|
||||
topDown(new BuildAggForUnion())
|
||||
),
|
||||
|
||||
// TODO remove it after Nereids support group commit insert
|
||||
topDown(new RejectGroupCommitInsert()),
|
||||
|
||||
// topic("Distinct",
|
||||
// costBased(custom(RuleType.PUSH_DOWN_DISTINCT_THROUGH_JOIN, PushdownDistinctThroughJoin::new))
|
||||
// ),
|
||||
|
||||
@ -77,8 +77,6 @@ public enum RuleType {
|
||||
ANALYZE_CTE(RuleTypeClass.REWRITE),
|
||||
RELATION_AUTHENTICATION(RuleTypeClass.VALIDATION),
|
||||
|
||||
REJECT_GROUP_COMMIT_INSERT(RuleTypeClass.REWRITE),
|
||||
|
||||
ADJUST_NULLABLE_FOR_PROJECT_SLOT(RuleTypeClass.REWRITE),
|
||||
ADJUST_NULLABLE_FOR_AGGREGATE_SLOT(RuleTypeClass.REWRITE),
|
||||
ADJUST_NULLABLE_FOR_HAVING_SLOT(RuleTypeClass.REWRITE),
|
||||
|
||||
@ -1,53 +0,0 @@
|
||||
// Licensed to the Apache Software Foundation (ASF) under one
|
||||
// or more contributor license agreements. See the NOTICE file
|
||||
// distributed with this work for additional information
|
||||
// regarding copyright ownership. The ASF licenses this file
|
||||
// to you under the Apache License, Version 2.0 (the
|
||||
// "License"); you may not use this file except in compliance
|
||||
// with the License. You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing,
|
||||
// software distributed under the License is distributed on an
|
||||
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
// KIND, either express or implied. See the License for the
|
||||
// specific language governing permissions and limitations
|
||||
// under the License.
|
||||
|
||||
package org.apache.doris.nereids.rules.analysis;
|
||||
|
||||
import org.apache.doris.nereids.exceptions.AnalysisException;
|
||||
import org.apache.doris.nereids.rules.Rule;
|
||||
import org.apache.doris.nereids.rules.RuleType;
|
||||
import org.apache.doris.nereids.rules.rewrite.RewriteRuleFactory;
|
||||
|
||||
import com.google.common.collect.ImmutableList;
|
||||
|
||||
import java.util.List;
|
||||
|
||||
/**
|
||||
* reject group commit insert added by PR <a href="https://github.com/apache/doris/pull/22829/files">#22829</a>
|
||||
*/
|
||||
public class RejectGroupCommitInsert implements RewriteRuleFactory {
|
||||
|
||||
@Override
|
||||
public List<Rule> buildRules() {
|
||||
return ImmutableList.of(
|
||||
logicalOlapTableSink(logicalOneRowRelation())
|
||||
.thenApply(ctx -> {
|
||||
if (ctx.connectContext.getSessionVariable().enableInsertGroupCommit) {
|
||||
throw new AnalysisException("Nereids do not support group commit now.");
|
||||
}
|
||||
return null;
|
||||
}).toRule(RuleType.REJECT_GROUP_COMMIT_INSERT),
|
||||
logicalOlapTableSink(logicalUnion().when(u -> u.arity() == 0))
|
||||
.thenApply(ctx -> {
|
||||
if (ctx.connectContext.getSessionVariable().enableInsertGroupCommit) {
|
||||
throw new AnalysisException("Nereids do not support group commit now.");
|
||||
}
|
||||
return null;
|
||||
}).toRule(RuleType.REJECT_GROUP_COMMIT_INSERT)
|
||||
);
|
||||
}
|
||||
}
|
||||
@ -22,9 +22,11 @@ import org.apache.doris.analysis.AlterClause;
|
||||
import org.apache.doris.analysis.AlterTableStmt;
|
||||
import org.apache.doris.analysis.Analyzer;
|
||||
import org.apache.doris.analysis.DropPartitionClause;
|
||||
import org.apache.doris.analysis.Expr;
|
||||
import org.apache.doris.analysis.PartitionNames;
|
||||
import org.apache.doris.analysis.ReplacePartitionClause;
|
||||
import org.apache.doris.analysis.TableName;
|
||||
import org.apache.doris.catalog.Column;
|
||||
import org.apache.doris.catalog.Env;
|
||||
import org.apache.doris.catalog.OlapTable;
|
||||
import org.apache.doris.common.DdlException;
|
||||
@ -39,6 +41,7 @@ import org.apache.doris.nereids.analyzer.UnboundOlapTableSink;
|
||||
import org.apache.doris.nereids.exceptions.AnalysisException;
|
||||
import org.apache.doris.nereids.glue.LogicalPlanAdapter;
|
||||
import org.apache.doris.nereids.trees.TreeNode;
|
||||
import org.apache.doris.nereids.trees.expressions.Slot;
|
||||
import org.apache.doris.nereids.trees.plans.Explainable;
|
||||
import org.apache.doris.nereids.trees.plans.Plan;
|
||||
import org.apache.doris.nereids.trees.plans.PlanType;
|
||||
@ -46,12 +49,19 @@ import org.apache.doris.nereids.trees.plans.logical.LogicalPlan;
|
||||
import org.apache.doris.nereids.trees.plans.physical.PhysicalOlapTableSink;
|
||||
import org.apache.doris.nereids.trees.plans.visitor.PlanVisitor;
|
||||
import org.apache.doris.nereids.txn.Transaction;
|
||||
import org.apache.doris.planner.GroupCommitPlanner;
|
||||
import org.apache.doris.planner.OlapTableSink;
|
||||
import org.apache.doris.planner.UnionNode;
|
||||
import org.apache.doris.proto.InternalService;
|
||||
import org.apache.doris.proto.InternalService.PGroupCommitInsertResponse;
|
||||
import org.apache.doris.qe.ConnectContext;
|
||||
import org.apache.doris.qe.DdlExecutor;
|
||||
import org.apache.doris.qe.QueryState.MysqlStateType;
|
||||
import org.apache.doris.qe.StmtExecutor;
|
||||
import org.apache.doris.rpc.RpcException;
|
||||
import org.apache.doris.thrift.TStatusCode;
|
||||
import org.apache.doris.transaction.TransactionState;
|
||||
import org.apache.doris.transaction.TransactionStatus;
|
||||
|
||||
import com.google.common.base.Preconditions;
|
||||
import com.google.common.base.Strings;
|
||||
@ -59,6 +69,7 @@ import com.google.common.collect.Lists;
|
||||
import org.apache.commons.collections.CollectionUtils;
|
||||
import org.apache.logging.log4j.LogManager;
|
||||
import org.apache.logging.log4j.Logger;
|
||||
import org.apache.thrift.TException;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.HashMap;
|
||||
@ -68,6 +79,8 @@ import java.util.Objects;
|
||||
import java.util.Optional;
|
||||
import java.util.Set;
|
||||
import java.util.UUID;
|
||||
import java.util.concurrent.ExecutionException;
|
||||
import java.util.concurrent.Future;
|
||||
|
||||
/**
|
||||
* insert into select command implementation
|
||||
@ -152,7 +165,13 @@ public class InsertIntoTableCommand extends Command implements ForwardWithSync,
|
||||
}
|
||||
|
||||
OlapTableSink sink = ((OlapTableSink) planner.getFragments().get(0).getSink());
|
||||
|
||||
if (ctx.getSessionVariable().enableInsertGroupCommit) {
|
||||
// group commit
|
||||
if (analyzeGroupCommit(sink, physicalOlapTableSink)) {
|
||||
handleGroupCommit(ctx, sink, physicalOlapTableSink);
|
||||
return;
|
||||
}
|
||||
}
|
||||
Preconditions.checkArgument(!isTxnBegin, "an insert command cannot create more than one txn");
|
||||
Transaction txn = new Transaction(ctx,
|
||||
physicalOlapTableSink.getDatabase(),
|
||||
@ -352,6 +371,63 @@ public class InsertIntoTableCommand extends Command implements ForwardWithSync,
|
||||
}
|
||||
}
|
||||
|
||||
private void handleGroupCommit(ConnectContext ctx, OlapTableSink sink,
|
||||
PhysicalOlapTableSink<?> physicalOlapTableSink)
|
||||
throws UserException, RpcException, TException, ExecutionException, InterruptedException {
|
||||
|
||||
List<InternalService.PDataRow> rows = new ArrayList<>();
|
||||
List<List<Expr>> materializedConstExprLists = ((UnionNode) sink.getFragment().getPlanRoot())
|
||||
.getMaterializedConstExprLists();
|
||||
|
||||
int filterSize = 0;
|
||||
for (Slot slot : physicalOlapTableSink.getOutput()) {
|
||||
if (slot.getName().contains(Column.DELETE_SIGN)
|
||||
|| slot.getName().contains(Column.VERSION_COL)) {
|
||||
filterSize += 1;
|
||||
}
|
||||
}
|
||||
for (List<Expr> list : materializedConstExprLists) {
|
||||
rows.add(GroupCommitPlanner.getRowStringValue(list, filterSize));
|
||||
}
|
||||
GroupCommitPlanner groupCommitPlanner = new GroupCommitPlanner(physicalOlapTableSink.getDatabase(),
|
||||
physicalOlapTableSink.getTargetTable(), null, ctx.queryId());
|
||||
Future<PGroupCommitInsertResponse> future = groupCommitPlanner.executeGroupCommitInsert(ctx, rows);
|
||||
PGroupCommitInsertResponse response = future.get();
|
||||
TStatusCode code = TStatusCode.findByValue(response.getStatus().getStatusCode());
|
||||
if (code == TStatusCode.DATA_QUALITY_ERROR) {
|
||||
LOG.info("group commit insert failed. query id: {}, backend id: {}, status: {}, "
|
||||
+ "schema version: {}", ctx.queryId(),
|
||||
groupCommitPlanner.getBackend(), response.getStatus(),
|
||||
physicalOlapTableSink.getTargetTable().getBaseSchemaVersion());
|
||||
} else if (code != TStatusCode.OK) {
|
||||
String errMsg = "group commit insert failed. backend id: "
|
||||
+ groupCommitPlanner.getBackend().getId() + ", status: "
|
||||
+ response.getStatus();
|
||||
ErrorReport.reportDdlException(errMsg, ErrorCode.ERR_FAILED_WHEN_INSERT);
|
||||
}
|
||||
TransactionStatus txnStatus = TransactionStatus.PREPARE;
|
||||
StringBuilder sb = new StringBuilder();
|
||||
sb.append("{'label':'").append(response.getLabel()).append("', 'status':'").append(txnStatus.name());
|
||||
sb.append("', 'txnId':'").append(response.getTxnId()).append("'");
|
||||
sb.append("', 'optimizer':'").append("nereids").append("'");
|
||||
sb.append("}");
|
||||
|
||||
ctx.getState().setOk(response.getLoadedRows(), (int) response.getFilteredRows(), sb.toString());
|
||||
ctx.setOrUpdateInsertResult(response.getTxnId(), response.getLabel(),
|
||||
physicalOlapTableSink.getDatabase().getFullName(), physicalOlapTableSink.getTargetTable().getName(),
|
||||
txnStatus, response.getLoadedRows(), (int) response.getFilteredRows());
|
||||
// update it, so that user can get loaded rows in fe.audit.log
|
||||
ctx.updateReturnRows((int) response.getLoadedRows());
|
||||
}
|
||||
|
||||
private boolean analyzeGroupCommit(OlapTableSink sink, PhysicalOlapTableSink<?> physicalOlapTableSink) {
|
||||
return ConnectContext.get().getSessionVariable().enableInsertGroupCommit
|
||||
&& physicalOlapTableSink.getTargetTable() instanceof OlapTable
|
||||
&& !ConnectContext.get().isTxnModel()
|
||||
&& sink.getFragment().getPlanRoot() instanceof UnionNode
|
||||
&& physicalOlapTableSink.getPartitionIds().isEmpty();
|
||||
}
|
||||
|
||||
@Override
|
||||
public Plan getExplainPlan(ConnectContext ctx) {
|
||||
return this.logicalQuery;
|
||||
|
||||
@ -0,0 +1,196 @@
|
||||
// Licensed to the Apache Software Foundation (ASF) under one
|
||||
// or more contributor license agreements. See the NOTICE file
|
||||
// distributed with this work for additional information
|
||||
// regarding copyright ownership. The ASF licenses this file
|
||||
// to you under the Apache License, Version 2.0 (the
|
||||
// "License"); you may not use this file except in compliance
|
||||
// with the License. You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing,
|
||||
// software distributed under the License is distributed on an
|
||||
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
// KIND, either express or implied. See the License for the
|
||||
// specific language governing permissions and limitations
|
||||
// under the License.
|
||||
|
||||
package org.apache.doris.planner;
|
||||
|
||||
|
||||
import org.apache.doris.analysis.CastExpr;
|
||||
import org.apache.doris.analysis.Expr;
|
||||
import org.apache.doris.analysis.NullLiteral;
|
||||
import org.apache.doris.catalog.ArrayType;
|
||||
import org.apache.doris.catalog.Column;
|
||||
import org.apache.doris.catalog.Database;
|
||||
import org.apache.doris.catalog.Env;
|
||||
import org.apache.doris.catalog.OlapTable;
|
||||
import org.apache.doris.common.DdlException;
|
||||
import org.apache.doris.common.UserException;
|
||||
import org.apache.doris.proto.InternalService;
|
||||
import org.apache.doris.proto.InternalService.PGroupCommitInsertRequest;
|
||||
import org.apache.doris.proto.InternalService.PGroupCommitInsertResponse;
|
||||
import org.apache.doris.proto.Types;
|
||||
import org.apache.doris.qe.ConnectContext;
|
||||
import org.apache.doris.rpc.BackendServiceProxy;
|
||||
import org.apache.doris.rpc.RpcException;
|
||||
import org.apache.doris.system.Backend;
|
||||
import org.apache.doris.task.StreamLoadTask;
|
||||
import org.apache.doris.thrift.TExecPlanFragmentParams;
|
||||
import org.apache.doris.thrift.TExecPlanFragmentParamsList;
|
||||
import org.apache.doris.thrift.TFileCompressType;
|
||||
import org.apache.doris.thrift.TFileFormatType;
|
||||
import org.apache.doris.thrift.TFileType;
|
||||
import org.apache.doris.thrift.TMergeType;
|
||||
import org.apache.doris.thrift.TNetworkAddress;
|
||||
import org.apache.doris.thrift.TScanRangeParams;
|
||||
import org.apache.doris.thrift.TStreamLoadPutRequest;
|
||||
import org.apache.doris.thrift.TUniqueId;
|
||||
|
||||
import com.google.common.base.Preconditions;
|
||||
import com.google.protobuf.ByteString;
|
||||
import org.apache.logging.log4j.LogManager;
|
||||
import org.apache.logging.log4j.Logger;
|
||||
import org.apache.thrift.TException;
|
||||
import org.apache.thrift.TSerializer;
|
||||
|
||||
import java.util.Collection;
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.Future;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
// Used to generate a plan fragment for a group commit
|
||||
// we only support OlapTable now.
|
||||
public class GroupCommitPlanner {
|
||||
private static final Logger LOG = LogManager.getLogger(GroupCommitPlanner.class);
|
||||
|
||||
private Database db;
|
||||
private OlapTable table;
|
||||
private TUniqueId loadId;
|
||||
private Backend backend;
|
||||
private TExecPlanFragmentParamsList paramsList;
|
||||
private ByteString execPlanFragmentParamsBytes;
|
||||
|
||||
public GroupCommitPlanner(Database db, OlapTable table, List<String> targetColumnNames, TUniqueId queryId)
|
||||
throws UserException, TException {
|
||||
this.db = db;
|
||||
this.table = table;
|
||||
TStreamLoadPutRequest streamLoadPutRequest = new TStreamLoadPutRequest();
|
||||
if (targetColumnNames != null) {
|
||||
streamLoadPutRequest.setColumns(String.join(",", targetColumnNames));
|
||||
if (targetColumnNames.stream().anyMatch(col -> col.equalsIgnoreCase(Column.SEQUENCE_COL))) {
|
||||
streamLoadPutRequest.setSequenceCol(Column.SEQUENCE_COL);
|
||||
}
|
||||
}
|
||||
streamLoadPutRequest
|
||||
.setDb(db.getFullName())
|
||||
.setMaxFilterRatio(1)
|
||||
.setTbl(table.getName())
|
||||
.setFileType(TFileType.FILE_STREAM).setFormatType(TFileFormatType.FORMAT_CSV_PLAIN)
|
||||
.setMergeType(TMergeType.APPEND).setThriftRpcTimeoutMs(5000).setLoadId(queryId)
|
||||
.setGroupCommit(true);
|
||||
StreamLoadTask streamLoadTask = StreamLoadTask.fromTStreamLoadPutRequest(streamLoadPutRequest);
|
||||
StreamLoadPlanner planner = new StreamLoadPlanner(db, table, streamLoadTask);
|
||||
// Will using load id as query id in fragment
|
||||
TExecPlanFragmentParams tRequest = planner.plan(streamLoadTask.getId());
|
||||
for (Map.Entry<Integer, List<TScanRangeParams>> entry : tRequest.params.per_node_scan_ranges.entrySet()) {
|
||||
for (TScanRangeParams scanRangeParams : entry.getValue()) {
|
||||
scanRangeParams.scan_range.ext_scan_range.file_scan_range.params.setFormatType(
|
||||
TFileFormatType.FORMAT_PROTO);
|
||||
scanRangeParams.scan_range.ext_scan_range.file_scan_range.params.setCompressType(
|
||||
TFileCompressType.PLAIN);
|
||||
}
|
||||
}
|
||||
List<TScanRangeParams> scanRangeParams = tRequest.params.per_node_scan_ranges.values().stream()
|
||||
.flatMap(Collection::stream).collect(Collectors.toList());
|
||||
Preconditions.checkState(scanRangeParams.size() == 1);
|
||||
loadId = queryId;
|
||||
// see BackendServiceProxy#execPlanFragmentsAsync
|
||||
paramsList = new TExecPlanFragmentParamsList();
|
||||
paramsList.addToParamsList(tRequest);
|
||||
execPlanFragmentParamsBytes = ByteString.copyFrom(new TSerializer().serialize(paramsList));
|
||||
}
|
||||
|
||||
public Future<PGroupCommitInsertResponse> executeGroupCommitInsert(ConnectContext ctx,
|
||||
List<InternalService.PDataRow> rows) throws TException, DdlException, RpcException {
|
||||
backend = ctx.getInsertGroupCommit(this.table.getId());
|
||||
if (backend == null || !backend.isAlive()) {
|
||||
List<Long> allBackendIds = Env.getCurrentSystemInfo().getAllBackendIds(true);
|
||||
if (allBackendIds.isEmpty()) {
|
||||
throw new DdlException("No alive backend");
|
||||
}
|
||||
Collections.shuffle(allBackendIds);
|
||||
backend = Env.getCurrentSystemInfo().getBackend(allBackendIds.get(0));
|
||||
ctx.setInsertGroupCommit(this.table.getId(), backend);
|
||||
}
|
||||
PGroupCommitInsertRequest request = PGroupCommitInsertRequest.newBuilder()
|
||||
.setDbId(db.getId())
|
||||
.setTableId(table.getId())
|
||||
.setBaseSchemaVersion(table.getBaseSchemaVersion())
|
||||
.setExecPlanFragmentRequest(InternalService.PExecPlanFragmentRequest.newBuilder()
|
||||
.setRequest(execPlanFragmentParamsBytes)
|
||||
.setCompact(false).setVersion(InternalService.PFragmentRequestVersion.VERSION_2).build())
|
||||
.setLoadId(Types.PUniqueId.newBuilder().setHi(loadId.hi).setLo(loadId.lo)
|
||||
.build()).addAllData(rows)
|
||||
.build();
|
||||
Future<PGroupCommitInsertResponse> future = BackendServiceProxy.getInstance()
|
||||
.groupCommitInsert(new TNetworkAddress(backend.getHost(), backend.getBrpcPort()), request);
|
||||
return future;
|
||||
}
|
||||
|
||||
// only for nereids use
|
||||
public static InternalService.PDataRow getRowStringValue(List<Expr> cols, int filterSize) throws UserException {
|
||||
if (cols.isEmpty()) {
|
||||
return null;
|
||||
}
|
||||
InternalService.PDataRow.Builder row = InternalService.PDataRow.newBuilder();
|
||||
try {
|
||||
List<Expr> exprs = cols.subList(0, cols.size() - filterSize);
|
||||
for (Expr expr : exprs) {
|
||||
if (!expr.isLiteralOrCastExpr() && !(expr instanceof CastExpr)) {
|
||||
if (expr.getChildren().get(0) instanceof NullLiteral) {
|
||||
row.addColBuilder().setValue("\\N");
|
||||
continue;
|
||||
}
|
||||
throw new UserException(
|
||||
"do not support non-literal expr in transactional insert operation: " + expr.toSql());
|
||||
}
|
||||
if (expr instanceof NullLiteral) {
|
||||
row.addColBuilder().setValue("\\N");
|
||||
} else if (expr.getType() instanceof ArrayType) {
|
||||
row.addColBuilder().setValue(expr.getStringValueForArray());
|
||||
} else if (!expr.getChildren().isEmpty()) {
|
||||
expr.getChildren().forEach(child -> processExprVal(child, row));
|
||||
} else {
|
||||
row.addColBuilder().setValue(expr.getStringValue());
|
||||
}
|
||||
}
|
||||
} catch (UserException e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
return row.build();
|
||||
}
|
||||
|
||||
private static void processExprVal(Expr expr, InternalService.PDataRow.Builder row) {
|
||||
if (expr.getChildren().isEmpty()) {
|
||||
row.addColBuilder().setValue(expr.getStringValue());
|
||||
return;
|
||||
}
|
||||
for (Expr child : expr.getChildren()) {
|
||||
processExprVal(child, row);
|
||||
}
|
||||
}
|
||||
|
||||
public Backend getBackend() {
|
||||
return backend;
|
||||
}
|
||||
|
||||
public TExecPlanFragmentParamsList getParamsList() {
|
||||
return paramsList;
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@ -126,13 +126,13 @@ import org.apache.doris.nereids.trees.plans.commands.CreateTableCommand;
|
||||
import org.apache.doris.nereids.trees.plans.commands.Forward;
|
||||
import org.apache.doris.nereids.trees.plans.commands.InsertIntoTableCommand;
|
||||
import org.apache.doris.nereids.trees.plans.logical.LogicalPlan;
|
||||
import org.apache.doris.planner.GroupCommitPlanner;
|
||||
import org.apache.doris.planner.OlapScanNode;
|
||||
import org.apache.doris.planner.OriginalPlanner;
|
||||
import org.apache.doris.planner.Planner;
|
||||
import org.apache.doris.planner.ScanNode;
|
||||
import org.apache.doris.proto.Data;
|
||||
import org.apache.doris.proto.InternalService;
|
||||
import org.apache.doris.proto.InternalService.PGroupCommitInsertRequest;
|
||||
import org.apache.doris.proto.InternalService.PGroupCommitInsertResponse;
|
||||
import org.apache.doris.proto.Types;
|
||||
import org.apache.doris.qe.CommonResultSet.CommonResultSetMetaData;
|
||||
@ -145,19 +145,16 @@ import org.apache.doris.resource.workloadgroup.QueryQueue;
|
||||
import org.apache.doris.resource.workloadgroup.QueueOfferToken;
|
||||
import org.apache.doris.rewrite.ExprRewriter;
|
||||
import org.apache.doris.rewrite.mvrewrite.MVSelectFailedException;
|
||||
import org.apache.doris.rpc.BackendServiceProxy;
|
||||
import org.apache.doris.rpc.RpcException;
|
||||
import org.apache.doris.service.FrontendOptions;
|
||||
import org.apache.doris.statistics.ResultRow;
|
||||
import org.apache.doris.statistics.util.InternalQueryBuffer;
|
||||
import org.apache.doris.system.Backend;
|
||||
import org.apache.doris.task.LoadEtlTask;
|
||||
import org.apache.doris.thrift.TFileFormatType;
|
||||
import org.apache.doris.thrift.TFileType;
|
||||
import org.apache.doris.thrift.TLoadTxnBeginRequest;
|
||||
import org.apache.doris.thrift.TLoadTxnBeginResult;
|
||||
import org.apache.doris.thrift.TMergeType;
|
||||
import org.apache.doris.thrift.TNetworkAddress;
|
||||
import org.apache.doris.thrift.TQueryOptions;
|
||||
import org.apache.doris.thrift.TQueryType;
|
||||
import org.apache.doris.thrift.TResultBatch;
|
||||
@ -1828,19 +1825,9 @@ public class StmtExecutor {
|
||||
txnId = context.getTxnEntry().getTxnConf().getTxnId();
|
||||
} else if (insertStmt instanceof NativeInsertStmt && ((NativeInsertStmt) insertStmt).isGroupCommit()) {
|
||||
NativeInsertStmt nativeInsertStmt = (NativeInsertStmt) insertStmt;
|
||||
Backend backend = context.getInsertGroupCommit(insertStmt.getTargetTable().getId());
|
||||
if (backend == null || !backend.isAlive()) {
|
||||
List<Long> allBackendIds = Env.getCurrentSystemInfo().getAllBackendIds(true);
|
||||
if (allBackendIds.isEmpty()) {
|
||||
throw new DdlException("No alive backend");
|
||||
}
|
||||
Collections.shuffle(allBackendIds);
|
||||
backend = Env.getCurrentSystemInfo().getBackend(allBackendIds.get(0));
|
||||
context.setInsertGroupCommit(insertStmt.getTargetTable().getId(), backend);
|
||||
}
|
||||
int maxRetry = 3;
|
||||
for (int i = 0; i < maxRetry; i++) {
|
||||
nativeInsertStmt.planForGroupCommit(context.queryId);
|
||||
GroupCommitPlanner groupCommitPlanner = nativeInsertStmt.planForGroupCommit(context.queryId);
|
||||
// handle rows
|
||||
List<InternalService.PDataRow> rows = new ArrayList<>();
|
||||
SelectStmt selectStmt = (SelectStmt) insertStmt.getQueryStmt();
|
||||
@ -1861,23 +1848,15 @@ public class StmtExecutor {
|
||||
InternalService.PDataRow data = getRowStringValue(exprList);
|
||||
rows.add(data);
|
||||
}
|
||||
TUniqueId loadId = nativeInsertStmt.getLoadId();
|
||||
PGroupCommitInsertRequest request = PGroupCommitInsertRequest.newBuilder()
|
||||
.setDbId(insertStmt.getTargetTable().getDatabase().getId())
|
||||
.setTableId(insertStmt.getTargetTable().getId())
|
||||
.setBaseSchemaVersion(nativeInsertStmt.getBaseSchemaVersion())
|
||||
.setExecPlanFragmentRequest(nativeInsertStmt.getExecPlanFragmentRequest())
|
||||
.setLoadId(Types.PUniqueId.newBuilder().setHi(loadId.hi).setLo(loadId.lo)
|
||||
.build()).addAllData(rows)
|
||||
.build();
|
||||
Future<PGroupCommitInsertResponse> future = BackendServiceProxy.getInstance()
|
||||
.groupCommitInsert(new TNetworkAddress(backend.getHost(), backend.getBrpcPort()), request);
|
||||
Future<PGroupCommitInsertResponse> future = groupCommitPlanner
|
||||
.executeGroupCommitInsert(context, rows);
|
||||
PGroupCommitInsertResponse response = future.get();
|
||||
TStatusCode code = TStatusCode.findByValue(response.getStatus().getStatusCode());
|
||||
if (code == TStatusCode.DATA_QUALITY_ERROR) {
|
||||
LOG.info("group commit insert failed. stmt: {}, backend id: {}, status: {}, "
|
||||
+ "schema version: {}, retry: {}", insertStmt.getOrigStmt().originStmt,
|
||||
backend.getId(), response.getStatus(), nativeInsertStmt.getBaseSchemaVersion(), i);
|
||||
groupCommitPlanner.getBackend().getId(),
|
||||
response.getStatus(), nativeInsertStmt.getBaseSchemaVersion(), i);
|
||||
if (i < maxRetry) {
|
||||
List<TableIf> tables = Lists.newArrayList(insertStmt.getTargetTable());
|
||||
MetaLockUtils.readLockTables(tables);
|
||||
@ -1890,12 +1869,12 @@ public class StmtExecutor {
|
||||
}
|
||||
continue;
|
||||
} else {
|
||||
errMsg = "group commit insert failed. backend id: " + backend.getId() + ", status: "
|
||||
+ response.getStatus();
|
||||
errMsg = "group commit insert failed. backend id: "
|
||||
+ groupCommitPlanner.getBackend().getId() + ", status: " + response.getStatus();
|
||||
}
|
||||
} else if (code != TStatusCode.OK) {
|
||||
errMsg = "group commit insert failed. backend id: " + backend.getId() + ", status: "
|
||||
+ response.getStatus();
|
||||
errMsg = "group commit insert failed. backend id: " + groupCommitPlanner.getBackend().getId()
|
||||
+ ", status: " + response.getStatus();
|
||||
ErrorReport.reportDdlException(errMsg, ErrorCode.ERR_FAILED_WHEN_INSERT);
|
||||
}
|
||||
label = response.getLabel();
|
||||
|
||||
Reference in New Issue
Block a user