[fix](group commit) Group commit support sequence column for unique table (#26652)
This commit is contained in:
@ -260,8 +260,20 @@ public class NativeInsertStmt extends InsertStmt {
|
||||
tblName.setDb(olapTable.getDatabase().getFullName());
|
||||
tblName.setTbl(olapTable.getName());
|
||||
if (olapTable.getDeleteSignColumn() != null) {
|
||||
List<Column> columns = olapTable.getBaseSchema(false);
|
||||
List<Column> columns = Lists.newArrayList(olapTable.getBaseSchema(false));
|
||||
// The same order as GroupCommitTableValuedFunction#getTableColumns
|
||||
// delete sign col
|
||||
columns.add(olapTable.getDeleteSignColumn());
|
||||
// version col
|
||||
Column versionColumn = olapTable.getFullSchema().stream().filter(Column::isVersionColumn).findFirst()
|
||||
.orElse(null);
|
||||
if (versionColumn != null) {
|
||||
columns.add(versionColumn);
|
||||
}
|
||||
// sequence col
|
||||
if (olapTable.hasSequenceCol() && olapTable.getSequenceMapCol() == null) {
|
||||
columns.add(olapTable.getSequenceCol());
|
||||
}
|
||||
targetColumnNames = columns.stream().map(c -> c.getName()).collect(Collectors.toList());
|
||||
}
|
||||
}
|
||||
@ -1136,6 +1148,9 @@ public class NativeInsertStmt extends InsertStmt {
|
||||
TStreamLoadPutRequest streamLoadPutRequest = new TStreamLoadPutRequest();
|
||||
if (targetColumnNames != null) {
|
||||
streamLoadPutRequest.setColumns(String.join(",", targetColumnNames));
|
||||
if (targetColumnNames.stream().anyMatch(col -> col.equalsIgnoreCase(Column.SEQUENCE_COL))) {
|
||||
streamLoadPutRequest.setSequenceCol(Column.SEQUENCE_COL);
|
||||
}
|
||||
}
|
||||
streamLoadPutRequest.setDb(db.getFullName()).setMaxFilterRatio(1)
|
||||
.setTbl(getTbl())
|
||||
|
||||
@ -66,13 +66,26 @@ public class GroupCommitTableValuedFunction extends ExternalFileTableValuedFunct
|
||||
throw new AnalysisException("Only support OLAP table, but table type of table_id "
|
||||
+ tableId + " is " + table.getType());
|
||||
}
|
||||
Column deleteSignColumn = ((OlapTable) table).getDeleteSignColumn();
|
||||
List<Column> tableColumns = table.getBaseSchema(false);
|
||||
for (int i = 1; i <= tableColumns.size(); i++) {
|
||||
fileColumns.add(new Column("c" + i, tableColumns.get(i - 1).getType(), true));
|
||||
}
|
||||
OlapTable olapTable = (OlapTable) table;
|
||||
// delete sign column
|
||||
Column deleteSignColumn = olapTable.getDeleteSignColumn();
|
||||
if (deleteSignColumn != null) {
|
||||
fileColumns.add(new Column("c" + (tableColumns.size() + 1), deleteSignColumn.getType(), true));
|
||||
fileColumns.add(new Column("c" + (fileColumns.size() + 1), deleteSignColumn.getType(), true));
|
||||
}
|
||||
// version column
|
||||
Column versionColumn = olapTable.getFullSchema().stream().filter(Column::isVersionColumn).findFirst()
|
||||
.orElse(null);
|
||||
if (versionColumn != null) {
|
||||
fileColumns.add(new Column("c" + (fileColumns.size() + 1), deleteSignColumn.getType(), true));
|
||||
}
|
||||
// sequence column
|
||||
if (olapTable.hasSequenceCol() && olapTable.getSequenceMapCol() == null) {
|
||||
Column sequenceCol = olapTable.getSequenceCol();
|
||||
fileColumns.add(new Column("c" + (fileColumns.size() + 1), sequenceCol.getType(), true));
|
||||
}
|
||||
return fileColumns;
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user