[fix](jdbc catalog) fix insert into jdbc table column order (#27855)

This commit is contained in:
zy-kkk
2023-12-01 20:46:48 +08:00
committed by GitHub
parent 7e3d6bc9f1
commit 8749e5208f
4 changed files with 41 additions and 5 deletions

View File

@ -998,12 +998,18 @@ public class NativeInsertStmt extends InsertStmt {
brokerDesc);
dataPartition = dataSink.getOutputPartition();
} else if (targetTable instanceof JdbcTable) {
//for JdbcTable,we need to pass the currently written column to `JdbcTableSink`
//to generate the prepare insert statment
List<String> insertCols = Lists.newArrayList();
for (Column column : targetColumns) {
insertCols.add(column.getName());
// For JdbcTable, reorder targetColumns to match the order in targetTable.getFullSchema()
List<String> insertCols = new ArrayList<>();
Set<String> targetColumnNames = targetColumns.stream()
.map(Column::getName)
.collect(Collectors.toSet());
for (Column column : targetTable.getFullSchema()) {
if (targetColumnNames.contains(column.getName())) {
insertCols.add(column.getName());
}
}
dataSink = new JdbcTableSink((JdbcTable) targetTable, insertCols);
dataPartition = DataPartition.UNPARTITIONED;
} else {

View File

@ -73,6 +73,7 @@ public class JdbcTableSink extends DataSink {
strBuilder.append(prefix + "TABLE TYPE: ").append(jdbcType.toString()).append("\n");
strBuilder.append(prefix + "TABLENAME OF EXTERNAL TABLE: ").append(externalTableName).append("\n");
strBuilder.append(prefix + "EnableTransaction: ").append(useTransaction ? "true" : "false").append("\n");
strBuilder.append(prefix + "PreparedStatement SQL: ").append(insertSql).append("\n");
return strBuilder.toString();
}