[Fix](partial update) Fix core when successfully schema change and load during a partial update (#26210)

This commit is contained in:
bobhan1
2023-11-06 23:16:05 +08:00
committed by GitHub
parent b99afcc7b5
commit 6983736cce
11 changed files with 408 additions and 0 deletions

View File

@ -999,6 +999,9 @@ public class NativeInsertStmt extends InsertStmt {
throw new DdlException("txn does not exist: " + transactionId);
}
txnState.addTableIndexes((OlapTable) targetTable);
if (!isFromDeleteOrUpdateStmt && isPartialUpdate) {
txnState.setSchemaForPartialUpdate((OlapTable) targetTable);
}
}
}

View File

@ -230,6 +230,9 @@ public class BrokerLoadJob extends BulkLoadJob {
throw new UserException("txn does not exist: " + transactionId);
}
txnState.addTableIndexes(table);
if (isPartialUpdate()) {
txnState.setSchemaForPartialUpdate(table);
}
}
} finally {
MetaLockUtils.readUnlockTables(tableList);

View File

@ -899,6 +899,9 @@ public abstract class RoutineLoadJob extends AbstractTxnStateChangeCallback impl
throw new MetaNotFoundException("txn does not exist: " + txnId);
}
txnState.addTableIndexes(planner.getDestTable());
if (isPartialUpdate) {
txnState.setSchemaForPartialUpdate((OlapTable) table);
}
return planParams;
} finally {
@ -919,6 +922,9 @@ public abstract class RoutineLoadJob extends AbstractTxnStateChangeCallback impl
throw new MetaNotFoundException("txn does not exist: " + txnId);
}
txnState.addTableIndexes(planner.getDestTable());
if (isPartialUpdate) {
txnState.setSchemaForPartialUpdate((OlapTable) table);
}
return planParams;
} finally {

View File

@ -176,6 +176,9 @@ public class InsertIntoTableCommand extends Command implements ForwardWithSync,
throw new DdlException("txn does not exist: " + txn.getTxnId());
}
state.addTableIndexes(physicalOlapTableSink.getTargetTable());
if (physicalOlapTableSink.isFromNativeInsertStmt() && physicalOlapTableSink.isPartialUpdate()) {
state.setSchemaForPartialUpdate(physicalOlapTableSink.getTargetTable());
}
executor.setProfileType(ProfileType.LOAD);

View File

@ -2221,6 +2221,9 @@ public class FrontendServiceImpl implements FrontendService.Iface {
throw new UserException("txn does not exist: " + request.getTxnId());
}
txnState.addTableIndexes(table);
if (request.isPartialUpdate()) {
txnState.setSchemaForPartialUpdate(table);
}
}
plan.setTableName(table.getName());
plan.query_options.setFeProcessUuid(ExecuteEnv.getInstance().getProcessUUID());
@ -2284,6 +2287,9 @@ public class FrontendServiceImpl implements FrontendService.Iface {
throw new UserException("txn does not exist: " + request.getTxnId());
}
txnState.addTableIndexes(table);
if (request.isPartialUpdate()) {
txnState.setSchemaForPartialUpdate(table);
}
}
return plan;
} finally {

View File

@ -674,6 +674,35 @@ public class DatabaseTransactionMgr {
+ "] is prepare, not pre-committed.");
}
if (transactionState.isPartialUpdate()) {
if (is2PC) {
Iterator<TableCommitInfo> tableCommitInfoIterator
= transactionState.getIdToTableCommitInfos().values().iterator();
while (tableCommitInfoIterator.hasNext()) {
TableCommitInfo tableCommitInfo = tableCommitInfoIterator.next();
long tableId = tableCommitInfo.getTableId();
OlapTable table = (OlapTable) db.getTableNullable(tableId);
if (table != null && table instanceof OlapTable) {
if (!transactionState.checkSchemaCompatibility((OlapTable) table)) {
throw new TransactionCommitFailedException("transaction [" + transactionId
+ "] check schema compatibility failed, partial update can't commit with"
+ " old schema sucessfully .");
}
}
}
} else {
for (Table table : tableList) {
if (table instanceof OlapTable) {
if (!transactionState.checkSchemaCompatibility((OlapTable) table)) {
throw new TransactionCommitFailedException("transaction [" + transactionId
+ "] check schema compatibility failed, partial update can't commit with"
+ " old schema sucessfully .");
}
}
}
}
}
Set<Long> errorReplicaIds = Sets.newHashSet();
Set<Long> totalInvolvedBackends = Sets.newHashSet();
Map<Long, Set<Long>> tableToPartition = new HashMap<>();

View File

@ -17,7 +17,9 @@
package org.apache.doris.transaction;
import org.apache.doris.catalog.Column;
import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.MaterializedIndexMeta;
import org.apache.doris.catalog.OlapTable;
import org.apache.doris.common.Config;
import org.apache.doris.common.FeMetaVersion;
@ -45,8 +47,10 @@ import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
@ -265,6 +269,24 @@ public class TransactionState implements Writable {
// no need to persist.
private String errMsg = "";
public class SchemaInfo {
public List<Column> schema;
public int schemaVersion;
public SchemaInfo(OlapTable olapTable) {
Map<Long, MaterializedIndexMeta> indexIdToMeta = olapTable.getIndexIdToMeta();
for (MaterializedIndexMeta indexMeta : indexIdToMeta.values()) {
schema = indexMeta.getSchema();
schemaVersion = indexMeta.getSchemaVersion();
break;
}
}
}
private boolean isPartialUpdate = false;
// table id -> schema info
private Map<Long, SchemaInfo> txnSchemas = new HashMap<>();
public TransactionState() {
this.dbId = -1;
this.tableIdList = Lists.newArrayList();
@ -725,4 +747,47 @@ public class TransactionState implements Writable {
public String getErrMsg() {
return this.errMsg;
}
public void setSchemaForPartialUpdate(OlapTable olapTable) {
// the caller should hold the read lock of the table
isPartialUpdate = true;
txnSchemas.put(olapTable.getId(), new SchemaInfo(olapTable));
}
public boolean isPartialUpdate() {
return isPartialUpdate;
}
public SchemaInfo getTxnSchema(long id) {
return txnSchemas.get(id);
}
public boolean checkSchemaCompatibility(OlapTable olapTable) {
SchemaInfo currentSchemaInfo = new SchemaInfo(olapTable);
SchemaInfo txnSchemaInfo = txnSchemas.get(olapTable.getId());
if (txnSchemaInfo == null) {
return true;
}
if (txnSchemaInfo.schemaVersion >= currentSchemaInfo.schemaVersion) {
return true;
}
for (Column txnCol : txnSchemaInfo.schema) {
if (!txnCol.isVisible() || !txnCol.getType().isStringType()) {
continue;
}
int uniqueId = txnCol.getUniqueId();
Optional<Column> currentCol = currentSchemaInfo.schema.stream()
.filter(col -> col.getUniqueId() == uniqueId).findFirst();
// for now Doris's light schema change only supports adding columns,
// dropping columns, and type conversions that increase the varchar length
if (currentCol.isPresent() && currentCol.get().getType().isStringType()) {
if (currentCol.get().getStrLen() != txnCol.getStrLen()) {
LOG.warn("Check schema compatibility failed, txnId={}, table={}",
transactionId, olapTable.getName());
return false;
}
}
}
return true;
}
}