[improve](group commit) Group commit support skip wal (#27957)

Author: meiyi
Committed by: GitHub
Date: 2023-12-11 19:38:32 +08:00
Parent: 9b8de017df
Commit: d4f89389e3
41 changed files with 770 additions and 147 deletions

View File

@@ -969,7 +969,8 @@ public class NativeInsertStmt extends InsertStmt {
         OlapTableSink sink;
         if (isGroupCommitStreamLoadSql) {
             sink = new GroupCommitBlockSink((OlapTable) targetTable, olapTuple,
-                    targetPartitionIds, analyzer.getContext().getSessionVariable().isEnableSingleReplicaInsert());
+                    targetPartitionIds, analyzer.getContext().getSessionVariable().isEnableSingleReplicaInsert(),
+                    ConnectContext.get().getSessionVariable().getGroupCommit());
         } else {
             sink = new OlapTableSink((OlapTable) targetTable, olapTuple, targetPartitionIds,
                     analyzer.getContext().getSessionVariable().isEnableSingleReplicaInsert());
@@ -1155,7 +1156,8 @@ public class NativeInsertStmt extends InsertStmt {
             this.analyzer = analyzerTmp;
         }
         analyzeSubquery(analyzer, true);
-        groupCommitPlanner = new GroupCommitPlanner((Database) db, olapTable, targetColumnNames, queryId);
+        groupCommitPlanner = new GroupCommitPlanner((Database) db, olapTable, targetColumnNames, queryId,
+                ConnectContext.get().getSessionVariable().getGroupCommit());
         // save plan message to be reused for prepare stmt
         loadId = queryId;
         baseSchemaVersion = olapTable.getBaseSchemaVersion();

View File

@@ -186,7 +186,8 @@ public class InsertIntoTableCommand extends Command implements ForwardWithSync,
                 rows.add(GroupCommitPlanner.getRowStringValue(list, filterSize));
             }
             GroupCommitPlanner groupCommitPlanner = new GroupCommitPlanner(physicalOlapTableSink.getDatabase(),
-                    physicalOlapTableSink.getTargetTable(), null, ctx.queryId());
+                    physicalOlapTableSink.getTargetTable(), null, ctx.queryId(),
+                    ConnectContext.get().getSessionVariable().getGroupCommit());
             PGroupCommitInsertResponse response = groupCommitPlanner.executeGroupCommitInsert(ctx, rows);
             TStatusCode code = TStatusCode.findByValue(response.getStatus().getStatusCode());
             if (code == TStatusCode.DATA_QUALITY_ERROR) {

View File

@@ -19,18 +19,44 @@ package org.apache.doris.planner;
 import org.apache.doris.analysis.TupleDescriptor;
 import org.apache.doris.catalog.OlapTable;
+import org.apache.doris.thrift.TDataSink;
 import org.apache.doris.thrift.TDataSinkType;
+import org.apache.doris.thrift.TGroupCommitMode;

+import com.google.common.base.Preconditions;
+
 import java.util.List;

 public class GroupCommitBlockSink extends OlapTableSink {
+    private String groupCommit;
+
     public GroupCommitBlockSink(OlapTable dstTable, TupleDescriptor tupleDescriptor, List<Long> partitionIds,
-            boolean singleReplicaLoad) {
+            boolean singleReplicaLoad, String groupCommit) {
         super(dstTable, tupleDescriptor, partitionIds, singleReplicaLoad);
+        this.groupCommit = groupCommit;
     }

     protected TDataSinkType getDataSinkType() {
         return TDataSinkType.GROUP_COMMIT_BLOCK_SINK;
     }
+
+    @Override
+    protected TDataSink toThrift() {
+        TGroupCommitMode groupCommitMode = parseGroupCommit(groupCommit);
+        Preconditions.checkNotNull(groupCommitMode, "Group commit is: " + groupCommit);
+        tDataSink.olap_table_sink.setGroupCommitMode(groupCommitMode);
+        return tDataSink;
+    }
+
+    public static TGroupCommitMode parseGroupCommit(String groupCommit) {
+        if (groupCommit == null) {
+            return null;
+        } else if (groupCommit.equalsIgnoreCase("async_mode")) {
+            return TGroupCommitMode.ASYNC_MODE;
+        } else if (groupCommit.equalsIgnoreCase("sync_mode")) {
+            return TGroupCommitMode.SYNC_MODE;
+        } else {
+            return null;
+        }
+    }
 }
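For context, a minimal sketch (not part of this patch) of what the branches above imply: only "async_mode" and "sync_mode" (case-insensitive) map to a thrift enum value, while the default "off_mode", any unknown string, and null all come back as null, which toThrift() then rejects through Preconditions.checkNotNull.

    import org.apache.doris.planner.GroupCommitBlockSink;
    import org.apache.doris.thrift.TGroupCommitMode;

    public class ParseGroupCommitSketch {
        public static void main(String[] args) {
            // The two supported modes resolve case-insensitively.
            assert GroupCommitBlockSink.parseGroupCommit("ASYNC_MODE") == TGroupCommitMode.ASYNC_MODE;
            assert GroupCommitBlockSink.parseGroupCommit("sync_mode") == TGroupCommitMode.SYNC_MODE;
            // The default "off_mode", unknown strings, and null all map to null,
            // so GroupCommitBlockSink.toThrift() would fail its checkNotNull for them.
            assert GroupCommitBlockSink.parseGroupCommit("off_mode") == null;
            assert GroupCommitBlockSink.parseGroupCommit(null) == null;
        }
    }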

View File

@@ -79,7 +79,8 @@ public class GroupCommitPlanner {
     private TExecPlanFragmentParamsList paramsList;
     private ByteString execPlanFragmentParamsBytes;

-    public GroupCommitPlanner(Database db, OlapTable table, List<String> targetColumnNames, TUniqueId queryId)
+    public GroupCommitPlanner(Database db, OlapTable table, List<String> targetColumnNames, TUniqueId queryId,
+            String groupCommit)
             throws UserException, TException {
         this.db = db;
         this.table = table;
@@ -101,7 +102,7 @@ public class GroupCommitPlanner {
                 .setTbl(table.getName())
                 .setFileType(TFileType.FILE_STREAM).setFormatType(TFileFormatType.FORMAT_CSV_PLAIN)
                 .setMergeType(TMergeType.APPEND).setThriftRpcTimeoutMs(5000).setLoadId(queryId)
-                .setGroupCommit(true).setTrimDoubleQuotes(true);
+                .setTrimDoubleQuotes(true).setGroupCommitMode(groupCommit);
         StreamLoadTask streamLoadTask = StreamLoadTask.fromTStreamLoadPutRequest(streamLoadPutRequest);
         StreamLoadPlanner planner = new StreamLoadPlanner(db, table, streamLoadTask);
         // Will using load id as query id in fragment
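A hedged usage sketch assembled only from calls visible in this commit (db, olapTable, targetColumnNames, queryId, ctx and rows are assumed to be in scope as in the callers above): the planner now receives the session's mode string and forwards it to the backend through setGroupCommitMode instead of the old setGroupCommit(true).

    GroupCommitPlanner planner = new GroupCommitPlanner(db, olapTable, targetColumnNames,
            queryId, ConnectContext.get().getSessionVariable().getGroupCommit());
    PGroupCommitInsertResponse response = planner.executeGroupCommitInsert(ctx, rows);
    TStatusCode code = TStatusCode.findByValue(response.getStatus().getStatusCode());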

View File

@@ -105,7 +105,7 @@ public class OlapTableSink extends DataSink {
     private HashSet<String> partialUpdateInputColumns;

     // set after init called
-    private TDataSink tDataSink;
+    protected TDataSink tDataSink;

     private boolean singleReplicaLoad;

View File

@@ -259,9 +259,9 @@ public class StreamLoadPlanner {
         // create dest sink
         List<Long> partitionIds = getAllPartitionIds();
         OlapTableSink olapTableSink;
-        if (taskInfo instanceof StreamLoadTask && ((StreamLoadTask) taskInfo).isGroupCommit()) {
+        if (taskInfo instanceof StreamLoadTask && ((StreamLoadTask) taskInfo).getGroupCommit() != null) {
             olapTableSink = new GroupCommitBlockSink(destTable, tupleDesc, partitionIds,
-                    Config.enable_single_replica_load);
+                    Config.enable_single_replica_load, ((StreamLoadTask) taskInfo).getGroupCommit());
         } else {
             olapTableSink = new OlapTableSink(destTable, tupleDesc, partitionIds, Config.enable_single_replica_load);
         }
@@ -479,9 +479,9 @@ public class StreamLoadPlanner {
         // create dest sink
         List<Long> partitionIds = getAllPartitionIds();
         OlapTableSink olapTableSink;
-        if (taskInfo instanceof StreamLoadTask && ((StreamLoadTask) taskInfo).isGroupCommit()) {
+        if (taskInfo instanceof StreamLoadTask && ((StreamLoadTask) taskInfo).getGroupCommit() != null) {
             olapTableSink = new GroupCommitBlockSink(destTable, tupleDesc, partitionIds,
-                    Config.enable_single_replica_load);
+                    Config.enable_single_replica_load, ((StreamLoadTask) taskInfo).getGroupCommit());
         } else {
             olapTableSink = new OlapTableSink(destTable, tupleDesc, partitionIds, Config.enable_single_replica_load);
         }
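Both plan() and planForPipeline() apply the same selection, so here is a condensed sketch of the shared logic (names as in the hunks above; this is not a separate method added by the patch):

    String mode = (taskInfo instanceof StreamLoadTask)
            ? ((StreamLoadTask) taskInfo).getGroupCommit() : null;
    OlapTableSink olapTableSink = (mode != null)
            ? new GroupCommitBlockSink(destTable, tupleDesc, partitionIds,
                    Config.enable_single_replica_load, mode)
            : new OlapTableSink(destTable, tupleDesc, partitionIds, Config.enable_single_replica_load);
    // An unparseable mode string is caught later, when GroupCommitBlockSink.toThrift()
    // requires parseGroupCommit(mode) to be non-null.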

View File

@@ -31,6 +31,7 @@ import org.apache.doris.nereids.metrics.EventSwitchParser;
 import org.apache.doris.nereids.parser.ParseDialect;
 import org.apache.doris.nereids.parser.ParseDialect.Dialect;
 import org.apache.doris.nereids.rules.RuleType;
+import org.apache.doris.planner.GroupCommitBlockSink;
 import org.apache.doris.qe.VariableMgr.VarAttr;
 import org.apache.doris.thrift.TQueryOptions;
 import org.apache.doris.thrift.TResourceLimit;
@@ -398,7 +399,7 @@ public class SessionVariable implements Serializable, Writable {
     public static final String EXTERNAL_TABLE_ANALYZE_PART_NUM = "external_table_analyze_part_num";

     public static final String ENABLE_STRONG_CONSISTENCY = "enable_strong_consistency_read";

-    public static final String ENABLE_INSERT_GROUP_COMMIT = "enable_insert_group_commit";
+    public static final String GROUP_COMMIT = "group_commit";

     public static final String PARALLEL_SYNC_ANALYZE_TASK_NUM = "parallel_sync_analyze_task_num";
@@ -1323,8 +1324,8 @@ public class SessionVariable implements Serializable, Writable {
     @VariableMgr.VarAttr(name = LOAD_STREAM_PER_NODE)
     public int loadStreamPerNode = 20;

-    @VariableMgr.VarAttr(name = ENABLE_INSERT_GROUP_COMMIT)
-    public boolean enableInsertGroupCommit = false;
+    @VariableMgr.VarAttr(name = GROUP_COMMIT)
+    public String groupCommit = "off_mode";

     @VariableMgr.VarAttr(name = INVERTED_INDEX_CONJUNCTION_OPT_THRESHOLD,
             description = {"在match_all中求取多个倒排索引的交集时,如果最大的倒排索引中的总数是最小倒排索引中的总数的整数倍,"
@@ -3095,7 +3096,14 @@ public class SessionVariable implements Serializable, Writable {
     }

     public boolean isEnableInsertGroupCommit() {
-        return enableInsertGroupCommit || Config.wait_internal_group_commit_finish;
+        return Config.wait_internal_group_commit_finish || GroupCommitBlockSink.parseGroupCommit(groupCommit) != null;
     }

+    public String getGroupCommit() {
+        if (Config.wait_internal_group_commit_finish) {
+            return "sync_mode";
+        }
+        return groupCommit;
+    }
+
     public boolean isEnableMaterializedViewRewrite() {
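A minimal sketch of how the new variable resolves, assuming an active ConnectContext: the FE config wait_internal_group_commit_finish forces sync_mode, otherwise the per-session value (default "off_mode") is used, and only the two parseable modes count as enabled.

    SessionVariable sv = ConnectContext.get().getSessionVariable(); // assumes a live session
    String mode = sv.getGroupCommit();                // "off_mode" unless the group_commit variable was changed
    boolean enabled = sv.isEnableInsertGroupCommit(); // true only for async_mode/sync_mode,
                                                      // or when the config forces sync_mode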

View File

@@ -227,6 +227,7 @@ import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import com.google.common.collect.Multimap;
 import org.apache.commons.collections.CollectionUtils;
+import org.apache.commons.lang3.StringUtils;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 import org.apache.thrift.TException;
@@ -1992,10 +1993,11 @@ public class FrontendServiceImpl implements FrontendService.Iface {
             NativeInsertStmt parsedStmt = (NativeInsertStmt) SqlParserUtils.getFirstStmt(parser);
             parsedStmt.setOrigStmt(new OriginStatement(originStmt, 0));
             parsedStmt.setUserInfo(ctx.getCurrentUserIdentity());
-            if (request.isGroupCommit()) {
+            if (!StringUtils.isEmpty(request.getGroupCommitMode())) {
                 if (parsedStmt.getLabel() != null) {
                     throw new AnalysisException("label and group_commit can't be set at the same time");
                 }
+                ctx.getSessionVariable().groupCommit = request.getGroupCommitMode();
                 parsedStmt.isGroupCommitStreamLoadSql = true;
             }
             StmtExecutor executor = new StmtExecutor(ctx, parsedStmt);
@@ -2083,7 +2085,7 @@ public class FrontendServiceImpl implements FrontendService.Iface {
         }
         StreamLoadPlanner planner = new StreamLoadPlanner(db, table, streamLoadTask);
         TExecPlanFragmentParams plan = planner.plan(streamLoadTask.getId(), multiTableFragmentInstanceIdIndex);
-        if (!request.isGroupCommit()) {
+        if (StringUtils.isEmpty(streamLoadTask.getGroupCommit())) {
             // add table indexes to transaction state
             TransactionState txnState = Env.getCurrentGlobalTransactionMgr()
                     .getTransactionState(db.getId(), request.getTxnId());
@@ -2149,7 +2151,7 @@ public class FrontendServiceImpl implements FrontendService.Iface {
         StreamLoadPlanner planner = new StreamLoadPlanner(db, table, streamLoadTask);
         TPipelineFragmentParams plan = planner.planForPipeline(streamLoadTask.getId(),
                 multiTableFragmentInstanceIdIndex);
-        if (!request.isGroupCommit()) {
+        if (StringUtils.isEmpty(streamLoadTask.getGroupCommit())) {
             // add table indexes to transaction state
             TransactionState txnState = Env.getCurrentGlobalTransactionMgr()
                     .getTransactionState(db.getId(), request.getTxnId());
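On the stream load RPC path the old boolean flag is replaced by the mode string carried in the put request. A hypothetical client/FE round trip, where only setGroupCommitMode and getGroupCommitMode are taken from this patch and the rest is illustrative:

    TStreamLoadPutRequest request = new TStreamLoadPutRequest();
    request.setGroupCommitMode("async_mode"); // unset or empty keeps the regular transactional path
    // FE side, mirroring the hunks above:
    if (!StringUtils.isEmpty(request.getGroupCommitMode())) {
        // group commit branch: a user-supplied label is rejected and the session's
        // group_commit variable is overridden with the requested mode
    }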

View File

@@ -95,7 +95,7 @@ public class StreamLoadTask implements LoadTaskInfo {
     private byte escape = 0;

-    private boolean groupCommit = false;
+    private String groupCommit;

     public StreamLoadTask(TUniqueId id, long txnId, TFileType fileType, TFileFormatType formatType,
             TFileCompressType compressType) {
@@ -324,7 +324,7 @@ public class StreamLoadTask implements LoadTaskInfo {
                 request.getFileType(), request.getFormatType(),
                 request.getCompressType());
         streamLoadTask.setOptionalFromTSLPutRequest(request);
-        streamLoadTask.setGroupCommit(request.isGroupCommit());
+        streamLoadTask.setGroupCommit(request.getGroupCommitMode());
         if (request.isSetFileSize()) {
             streamLoadTask.fileSize = request.getFileSize();
         }
@@ -538,11 +538,11 @@ public class StreamLoadTask implements LoadTaskInfo {
         return maxFilterRatio;
     }

-    public void setGroupCommit(boolean groupCommit) {
+    public void setGroupCommit(String groupCommit) {
         this.groupCommit = groupCommit;
     }

-    public boolean isGroupCommit() {
+    public String getGroupCommit() {
         return groupCommit;
     }
 }
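A hedged sketch of how the mode travels through the task (the helper name is hypothetical, not part of the patch): a null string simply means group commit is off, so the old boolean flag is no longer needed.

    // Hypothetical helper: copy the mode from the thrift request and report
    // whether the group commit sink should be used.
    static boolean applyGroupCommit(StreamLoadTask task, TStreamLoadPutRequest request) {
        task.setGroupCommit(request.getGroupCommitMode()); // null when the client did not ask for group commit
        return task.getGroupCommit() != null;              // matches the check in StreamLoadPlanner
    }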