[fix](group_commit) group_commit is not support on table with property light_schema_change=false (#29244)

This commit is contained in:
huanghaibin
2023-12-29 00:26:38 +08:00
committed by GitHub
parent 9be0f04506
commit 99a1e066b5
4 changed files with 18 additions and 6 deletions

View File

@ -1109,7 +1109,12 @@ public class NativeInsertStmt extends InsertStmt {
}
}
public void analyzeGroupCommit(Analyzer analyzer) {
public void analyzeGroupCommit(Analyzer analyzer) throws AnalysisException {
if (isGroupCommitStreamLoadSql && (targetTable instanceof OlapTable)
&& !((OlapTable) targetTable).getTableProperty().getUseSchemaLightChange()) {
throw new AnalysisException(
"table light_schema_change is false, can't do http_stream with group commit mode");
}
if (isGroupCommit) {
return;
}
@ -1124,6 +1129,7 @@ public class NativeInsertStmt extends InsertStmt {
if (!partialUpdate && ConnectContext.get().getSessionVariable().isEnableInsertGroupCommit()
&& ConnectContext.get().getSessionVariable().getSqlMode() != SqlModeHelper.MODE_NO_BACKSLASH_ESCAPES
&& targetTable instanceof OlapTable
&& ((OlapTable) targetTable).getTableProperty().getUseSchemaLightChange()
&& !ConnectContext.get().isTxnModel()
&& getQueryStmt() instanceof SelectStmt
&& ((SelectStmt) getQueryStmt()).getTableRefs().isEmpty() && targetPartitionNames == null

View File

@ -249,7 +249,7 @@ public class InsertIntoTableCommand extends Command implements ForwardWithSync,
return ConnectContext.get().getSessionVariable().getSqlMode() != SqlModeHelper.MODE_NO_BACKSLASH_ESCAPES
&& physicalOlapTableSink.getTargetTable() instanceof OlapTable && !ConnectContext.get().isTxnModel()
&& sink.getFragment().getPlanRoot() instanceof UnionNode && physicalOlapTableSink.getPartitionIds()
.isEmpty();
.isEmpty() && physicalOlapTableSink.getTargetTable().getTableProperty().getUseSchemaLightChange();
}
@Override

View File

@ -2001,6 +2001,10 @@ public class FrontendServiceImpl implements FrontendService.Iface {
}
long timeoutMs = request.isSetThriftRpcTimeoutMs() ? request.getThriftRpcTimeoutMs() : 5000;
Table table = db.getTableOrMetaException(request.getTbl(), TableType.OLAP);
if (!((OlapTable) table).getTableProperty().getUseSchemaLightChange() && (request.getGroupCommitMode() != null
&& !request.getGroupCommitMode().equals("off_mode"))) {
throw new UserException("table light_schema_change is false, can't do stream load with group commit mode");
}
result.setDbId(db.getId());
result.setTableId(table.getId());
result.setBaseSchemaVersion(((OlapTable) table).getBaseSchemaVersion());
@ -2062,6 +2066,10 @@ public class FrontendServiceImpl implements FrontendService.Iface {
}
long timeoutMs = request.isSetThriftRpcTimeoutMs() ? request.getThriftRpcTimeoutMs() : 5000;
Table table = db.getTableOrMetaException(request.getTbl(), TableType.OLAP);
if (!((OlapTable) table).getTableProperty().getUseSchemaLightChange() && (request.getGroupCommitMode() != null
&& !request.getGroupCommitMode().equals("off_mode"))) {
throw new UserException("table light_schema_change is false, can't do stream load with group commit mode");
}
return this.generatePipelineStreamLoadPut(request, db, fullDbName, (OlapTable) table, timeoutMs,
1, false);
}