[Fix](Txn) Fix transaction write to sequence column error (#26748)

This commit is contained in:
abmdocrt
2023-11-14 10:30:10 +08:00
committed by GitHub
parent 44f49c687d
commit fef627c0ba
8 changed files with 199 additions and 0 deletions

View File

@ -241,6 +241,10 @@ public class NativeInsertStmt extends InsertStmt {
return tblName.getTbl();
}
public List<String> getTargetColumnNames() {
return targetColumnNames;
}
public void getTables(Analyzer analyzer, Map<Long, TableIf> tableMap, Set<String> parentViewNameSet)
throws AnalysisException {
if (tableId != -1) {

View File

@ -1704,6 +1704,18 @@ public class StmtExecutor {
if (selectStmt.getValueList() != null) {
Table tbl = txnEntry.getTable();
int schemaSize = tbl.getBaseSchema(false).size();
if (parsedStmt instanceof NativeInsertStmt
&& ((NativeInsertStmt) parsedStmt).getTargetColumnNames() != null) {
if (((NativeInsertStmt) parsedStmt).getTargetColumnNames()
.contains(Column.SEQUENCE_COL)) {
schemaSize++;
}
if (((NativeInsertStmt) parsedStmt).getTargetColumnNames()
.contains(Column.DELETE_SIGN)) {
schemaSize++;
}
}
for (List<Expr> row : selectStmt.getValueList().getRows()) {
// the value columns are columns which are visible to user, so here we use
// getBaseSchema(), not getFullSchema()
@ -1777,6 +1789,22 @@ public class StmtExecutor {
.setMergeType(TMergeType.APPEND).setThriftRpcTimeoutMs(5000).setLoadId(context.queryId())
.setExecMemLimit(maxExecMemByte).setTimeout((int) timeoutSecond)
.setTimezone(timeZone).setSendBatchParallelism(sendBatchParallelism);
if (parsedStmt instanceof NativeInsertStmt && ((NativeInsertStmt) parsedStmt).getTargetColumnNames() != null) {
List<String> targetColumnNames = ((NativeInsertStmt) parsedStmt).getTargetColumnNames();
if (targetColumnNames.contains(Column.SEQUENCE_COL) || targetColumnNames.contains(Column.DELETE_SIGN)) {
if (targetColumnNames.contains(Column.SEQUENCE_COL)) {
request.setSequenceCol(Column.SEQUENCE_COL);
}
StringBuilder allCols = new StringBuilder();
for (String col : ((NativeInsertStmt) parsedStmt).getTargetColumnNames()) {
allCols.append(col);
allCols.append(",");
}
allCols.deleteCharAt(allCols.length() - 1);
request.setColumns(String.valueOf(allCols));
request.setColumnSeparator(",");
}
}
// execute begin txn
InsertStreamTxnExecutor executor = new InsertStreamTxnExecutor(txnEntry);