[improve](group commit) Add a swicth to wait internal group commit lo… (#26734)

* [improve](group commit) Add a swicth to make internal group commit load finish

* modify group commit tvf plan
This commit is contained in:
meiyi
2023-11-13 10:35:35 +08:00
committed by GitHub
parent 7332b1b371
commit c0fda8c5c2
16 changed files with 62 additions and 62 deletions

View File

@ -25,6 +25,7 @@ import org.apache.doris.catalog.Database;
import org.apache.doris.catalog.DatabaseIf;
import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.JdbcTable;
import org.apache.doris.catalog.KeysType;
import org.apache.doris.catalog.MysqlTable;
import org.apache.doris.catalog.OdbcTable;
import org.apache.doris.catalog.OlapTable;
@ -248,21 +249,8 @@ public class NativeInsertStmt extends InsertStmt {
OlapTable olapTable = (OlapTable) table;
tblName.setDb(olapTable.getDatabase().getFullName());
tblName.setTbl(olapTable.getName());
if (olapTable.getDeleteSignColumn() != null) {
List<Column> columns = Lists.newArrayList(olapTable.getBaseSchema(false));
// The same order as GroupCommitTableValuedFunction#getTableColumns
// delete sign col
columns.add(olapTable.getDeleteSignColumn());
// version col
Column versionColumn = olapTable.getFullSchema().stream().filter(Column::isVersionColumn).findFirst()
.orElse(null);
if (versionColumn != null) {
columns.add(versionColumn);
}
// sequence col
if (olapTable.hasSequenceCol() && olapTable.getSequenceMapCol() == null) {
columns.add(olapTable.getSequenceCol());
}
if (olapTable.getKeysType() == KeysType.UNIQUE_KEYS) {
List<Column> columns = Lists.newArrayList(olapTable.getBaseSchema(true));
targetColumnNames = columns.stream().map(c -> c.getName()).collect(Collectors.toList());
}
}
@ -1086,7 +1074,7 @@ public class NativeInsertStmt extends InsertStmt {
LOG.warn("analyze group commit failed", e);
return;
}
if (ConnectContext.get().getSessionVariable().enableInsertGroupCommit
if (ConnectContext.get().getSessionVariable().isEnableInsertGroupCommit()
&& targetTable instanceof OlapTable
&& !ConnectContext.get().isTxnModel()
&& getQueryStmt() instanceof SelectStmt

View File

@ -165,7 +165,7 @@ public class InsertIntoTableCommand extends Command implements ForwardWithSync,
}
OlapTableSink sink = ((OlapTableSink) planner.getFragments().get(0).getSink());
if (ctx.getSessionVariable().enableInsertGroupCommit) {
if (ctx.getSessionVariable().isEnableInsertGroupCommit()) {
// group commit
if (analyzeGroupCommit(sink, physicalOlapTableSink)) {
handleGroupCommit(ctx, sink, physicalOlapTableSink);
@ -421,7 +421,7 @@ public class InsertIntoTableCommand extends Command implements ForwardWithSync,
}
private boolean analyzeGroupCommit(OlapTableSink sink, PhysicalOlapTableSink<?> physicalOlapTableSink) {
return ConnectContext.get().getSessionVariable().enableInsertGroupCommit
return ConnectContext.get().getSessionVariable().isEnableInsertGroupCommit()
&& physicalOlapTableSink.getTargetTable() instanceof OlapTable
&& !ConnectContext.get().isTxnModel()
&& sink.getFragment().getPlanRoot() instanceof UnionNode

View File

@ -2918,4 +2918,8 @@ public class SessionVariable implements Serializable, Writable {
throw new UnsupportedOperationException("sqlDialect value is invalid, the invalid value is " + sqlDialect);
}
}
public boolean isEnableInsertGroupCommit() {
return enableInsertGroupCommit || Config.wait_internal_group_commit_finish;
}
}

View File

@ -1084,7 +1084,7 @@ public class StmtExecutor {
analyzeVariablesInStmt();
}
if (context.getSessionVariable().enableInsertGroupCommit && parsedStmt instanceof NativeInsertStmt) {
if (context.getSessionVariable().isEnableInsertGroupCommit() && parsedStmt instanceof NativeInsertStmt) {
NativeInsertStmt nativeInsertStmt = (NativeInsertStmt) parsedStmt;
nativeInsertStmt.analyzeGroupCommit(new Analyzer(context.getEnv(), context));
}

View File

@ -2169,6 +2169,7 @@ public class FrontendServiceImpl implements FrontendService.Iface {
result.setDbId(parsedStmt.getTargetTable().getDatabase().getId());
result.setTableId(parsedStmt.getTargetTable().getId());
result.setBaseSchemaVersion(((OlapTable) parsedStmt.getTargetTable()).getBaseSchemaVersion());
result.setWaitInternalGroupCommitFinish(Config.wait_internal_group_commit_finish);
} catch (UserException e) {
LOG.warn("exec sql error", e);
throw new UserException("exec sql error" + e);

View File

@ -66,27 +66,10 @@ public class GroupCommitTableValuedFunction extends ExternalFileTableValuedFunct
throw new AnalysisException("Only support OLAP table, but table type of table_id "
+ tableId + " is " + table.getType());
}
List<Column> tableColumns = table.getBaseSchema(false);
List<Column> tableColumns = table.getBaseSchema(true);
for (int i = 1; i <= tableColumns.size(); i++) {
fileColumns.add(new Column("c" + i, tableColumns.get(i - 1).getType(), true));
}
OlapTable olapTable = (OlapTable) table;
// delete sign column
Column deleteSignColumn = olapTable.getDeleteSignColumn();
if (deleteSignColumn != null) {
fileColumns.add(new Column("c" + (fileColumns.size() + 1), deleteSignColumn.getType(), true));
}
// version column
Column versionColumn = olapTable.getFullSchema().stream().filter(Column::isVersionColumn).findFirst()
.orElse(null);
if (versionColumn != null) {
fileColumns.add(new Column("c" + (fileColumns.size() + 1), deleteSignColumn.getType(), true));
}
// sequence column
if (olapTable.hasSequenceCol() && olapTable.getSequenceMapCol() == null) {
Column sequenceCol = olapTable.getSequenceCol();
fileColumns.add(new Column("c" + (fileColumns.size() + 1), sequenceCol.getType(), true));
}
return fileColumns;
}