[branch-2.1](insert-overwrite) Fix insert overwrite auto detect transaction safe (#38103) (#38442)

pick https://github.com/apache/doris/pull/38103
This commit is contained in:
zclllhhjj
2024-07-29 10:21:03 +08:00
committed by GitHub
parent 87cf2d1fb4
commit dab0138567
6 changed files with 142 additions and 52 deletions

View File

@ -1112,10 +1112,13 @@ public class OlapTable extends Table implements MTMVRelatedTableIf {
return Sets.newHashSet(nameToPartition.keySet());
}
public List<String> uncheckedGetPartNamesById(List<Long> partitionIds) {
// For each index at which both partition id lists hold the same id, collect that partition's name.
public List<String> getEqualPartitionNames(List<Long> partitionIds1, List<Long> partitionIds2) {
List<String> names = new ArrayList<String>();
for (Long id : partitionIds) {
names.add(idToPartition.get(id).getName());
for (int i = 0; i < partitionIds1.size(); i++) {
if (partitionIds1.get(i).equals(partitionIds2.get(i))) {
names.add(getPartition(partitionIds1.get(i)).getName());
}
}
return names;
}

View File

@ -58,7 +58,7 @@ public class InsertOverwriteManager extends MasterDaemon implements Writable {
// but we only change one time and save the relations in partitionPairs. they're protected by taskLocks
@SerializedName(value = "taskLocks")
private Map<Long, ReentrantLock> taskLocks = Maps.newConcurrentMap();
// <groupId, <oldPartId, newPartId>>
// <groupId, <oldPartId, newPartId>>. There is no need to track which task a pair belongs to.
@SerializedName(value = "partitionPairs")
private Map<Long, Map<Long, Long>> partitionPairs = Maps.newConcurrentMap();
@ -91,7 +91,7 @@ public class InsertOverwriteManager extends MasterDaemon implements Writable {
*
* @return group id, like a transaction id.
*/
public long preRegisterTask() {
public long registerTaskGroup() {
long groupId = Env.getCurrentEnv().getNextId();
taskGroups.put(groupId, new ArrayList<Long>());
taskLocks.put(groupId, new ReentrantLock());
@ -107,44 +107,81 @@ public class InsertOverwriteManager extends MasterDaemon implements Writable {
taskGroups.get(groupId).add(taskId);
}
public List<Long> tryReplacePartitionIds(long groupId, List<Long> oldPartitionIds) {
/**
* this func should in lock scope of getLock(groupId)
*
* @param newIds if have replaced, replace with new. otherwise itself.
*/
public boolean tryReplacePartitionIds(long groupId, List<Long> oldPartitionIds, List<Long> newIds) {
Map<Long, Long> relations = partitionPairs.get(groupId);
List<Long> newIds = new ArrayList<Long>();
for (Long id : oldPartitionIds) {
boolean needReplace = false;
for (int i = 0; i < oldPartitionIds.size(); i++) {
long id = oldPartitionIds.get(i);
if (relations.containsKey(id)) {
// if we replaced it. then return new one.
newIds.add(relations.get(id));
} else {
// otherwise itself. we will deal it soon.
newIds.add(id);
needReplace = true;
}
}
return newIds;
return needReplace;
}
/**
 * Records, for one task group, which new partition id stands in for each old partition id.
 * The two lists are matched by position: {@code oldIds.get(i)} maps to {@code newIds.get(i)}.
 * This method must be called while holding the lock returned by {@code getLock(groupId)}.
 *
 * @param groupId id of the task group whose relation map is updated
 * @param oldIds  ids of the partitions being replaced
 * @param newIds  ids of the replacing partitions, in the same order as {@code oldIds}
 * @throws IllegalArgumentException if the two lists differ in size
 */
public void recordPartitionPairs(long groupId, List<Long> oldIds, List<Long> newIds) {
    final Map<Long, Long> oldToNew = partitionPairs.get(groupId);
    Preconditions.checkArgument(oldIds.size() == newIds.size());
    final int pairCount = oldIds.size();
    for (int idx = 0; idx < pairCount; idx++) {
        final Long oldId = oldIds.get(idx);
        final Long newId = newIds.get(idx);
        oldToNew.put(oldId, newId);
        if (!LOG.isDebugEnabled()) {
            continue;
        }
        LOG.debug("recorded partition pairs: [" + oldId + ", " + newId + "]");
    }
}
/**
 * Looks up the lock registered for a task group.
 * The lock also serves as the marker that the task group exists; a missing lock is taken to mean
 * the group has already failed.
 *
 * @param groupId id of the task group
 * @return the group's lock, or {@code null} when no lock is registered for {@code groupId}
 */
public ReentrantLock getLock(long groupId) {
    final ReentrantLock groupLock = taskLocks.get(groupId);
    return groupLock;
}
// When the task group fails, some BEs may not know it yet and may still send new requests,
// which would cause a ConcurrentModificationException or a NullPointerException.
public void taskGroupFail(long groupId) {
LOG.info("insert overwrite auto detect partition task group [" + groupId + "] failed");
for (Long taskId : taskGroups.get(groupId)) {
taskFail(taskId);
ReentrantLock lock = getLock(groupId);
lock.lock();
try {
// will rollback temp partitions in `taskFail`
for (Long taskId : taskGroups.get(groupId)) {
taskFail(taskId);
}
cleanTaskGroup(groupId);
} finally {
lock.unlock();
}
cleanTaskGroup(groupId);
}
public void taskGroupSuccess(long groupId) {
// Here we make all replacements of this group visible. If any of them fails, nothing happens.
public void taskGroupSuccess(long groupId, OlapTable targetTable) throws DdlException {
try {
Map<Long, Long> relations = partitionPairs.get(groupId);
ArrayList<String> oldNames = new ArrayList<>();
ArrayList<String> newNames = new ArrayList<>();
for (Entry<Long, Long> partitionPair : relations.entrySet()) {
oldNames.add(targetTable.getPartition(partitionPair.getKey()).getName());
newNames.add(targetTable.getPartition(partitionPair.getValue()).getName());
}
InsertOverwriteUtil.replacePartition(targetTable, oldNames, newNames);
} catch (Exception e) {
LOG.warn("insert overwrite task making replacement failed because " + e.getMessage()
+ "all new partition will not be visible and will be recycled by partition GC.");
throw e;
}
LOG.info("insert overwrite auto detect partition task group [" + groupId + "] succeed");
for (Long taskId : taskGroups.get(groupId)) {
Env.getCurrentEnv().getEditLog()
.logInsertOverwrite(new InsertOverwriteLog(taskId, tasks.get(taskId), InsertOverwriteOpType.ADD));
taskSuccess(taskId);
}
cleanTaskGroup(groupId);
@ -164,6 +201,9 @@ public class InsertOverwriteManager extends MasterDaemon implements Writable {
public void taskFail(long taskId) {
LOG.info("insert overwrite task [" + taskId + "] failed");
boolean rollback = rollback(taskId);
if (!rollback) {
LOG.warn("roll back task [" + taskId + "] failed");
}
if (rollback) {
removeTask(taskId);
} else {
@ -192,6 +232,7 @@ public class InsertOverwriteManager extends MasterDaemon implements Writable {
}
}
// cancel it. should try to remove them after.
private void cancelTask(long taskId) {
if (tasks.containsKey(taskId)) {
LOG.info("cancel insert overwrite task: {}", tasks.get(taskId));
@ -201,6 +242,7 @@ public class InsertOverwriteManager extends MasterDaemon implements Writable {
}
}
// The task and its partitions have been removed, so it is safe to remove the task.
private void removeTask(long taskId) {
if (tasks.containsKey(taskId)) {
LOG.info("remove insert overwrite task: {}", tasks.get(taskId));
@ -222,7 +264,7 @@ public class InsertOverwriteManager extends MasterDaemon implements Writable {
try {
olapTable = task.getTable();
} catch (DdlException e) {
LOG.warn("can not get table, task: {}", task);
LOG.warn("can not get table, task: {}, reason: {}", task, e.getMessage());
return true;
}
return InsertOverwriteUtil.dropPartitions(olapTable, task.getTempPartitionNames());

View File

@ -54,6 +54,7 @@ public class InsertOverwriteUtil {
for (int i = 0; i < partitionNames.size(); i++) {
Env.getCurrentEnv().addPartitionLike((Database) tableIf.getDatabase(), tableIf.getName(),
new AddPartitionLikeClause(tempPartitionNames.get(i), partitionNames.get(i), true));
LOG.info("successfully add temp partition [{}] for [{}]", tempPartitionNames.get(i), tableIf.getName());
}
}
}

View File

@ -169,11 +169,12 @@ public class InsertOverwriteTableCommand extends Command implements ForwardWithS
try {
if (isAutoDetectOverwrite()) {
// taskId here is a group id. it contains all replace tasks made and registered in rpc process.
taskId = Env.getCurrentEnv().getInsertOverwriteManager().preRegisterTask();
// When inserting, BE will call to replace partition by FrontendService. FE do the real
// add&replacement and return replace result. So there's no need to do anything else.
taskId = Env.getCurrentEnv().getInsertOverwriteManager().registerTaskGroup();
// When inserting, BE will call to replace partition by FrontendService. FE will register new temp
// partitions and return. for transactional, the replacement will really occur when insert successed,
// i.e. `insertInto` finished. then we call taskGroupSuccess to make replacement.
insertInto(ctx, executor, taskId);
Env.getCurrentEnv().getInsertOverwriteManager().taskGroupSuccess(taskId);
Env.getCurrentEnv().getInsertOverwriteManager().taskGroupSuccess(taskId, (OlapTable) targetTable);
} else {
List<String> tempPartitionNames = InsertOverwriteUtil.generateTempPartitionNames(partitionNames);
taskId = Env.getCurrentEnv().getInsertOverwriteManager()
@ -184,7 +185,7 @@ public class InsertOverwriteTableCommand extends Command implements ForwardWithS
Env.getCurrentEnv().getInsertOverwriteManager().taskSuccess(taskId);
}
} catch (Exception e) {
LOG.warn("insert into overwrite failed");
LOG.warn("insert into overwrite failed with task(or group) id " + taskId);
if (isAutoDetectOverwrite()) {
Env.getCurrentEnv().getInsertOverwriteManager().taskGroupFail(taskId);
} else {

View File

@ -276,7 +276,6 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.locks.ReentrantLock;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
// Frontend service used to serve all request for this frontend through
// thrift protocol
@ -3568,7 +3567,7 @@ public class FrontendServiceImpl implements FrontendService.Iface {
LOG.info("Receive replace partition request: {}", request);
long dbId = request.getDbId();
long tableId = request.getTableId();
List<Long> partitionIds = request.getPartitionIds();
List<Long> reqPartitionIds = request.getPartitionIds();
long taskGroupId = request.getOverwriteGroupId();
TReplacePartitionResult result = new TReplacePartitionResult();
TStatus errorStatus = new TStatus(TStatusCode.RUNTIME_ERROR);
@ -3607,41 +3606,60 @@ public class FrontendServiceImpl implements FrontendService.Iface {
OlapTable olapTable = (OlapTable) table;
InsertOverwriteManager overwriteManager = Env.getCurrentEnv().getInsertOverwriteManager();
ReentrantLock taskLock = overwriteManager.getLock(taskGroupId);
List<String> allReqPartNames; // all request partitions
if (taskLock == null) {
errorStatus.setErrorMsgs(Lists
.newArrayList(new String("cannot find task group " + taskGroupId + ", maybe already failed.")));
result.setStatus(errorStatus);
LOG.warn("send create partition error status: {}", result);
return result;
}
ArrayList<Long> resultPartitionIds = new ArrayList<>(); // [1 2 5 6] -> [7 8 5 6]
ArrayList<Long> pendingPartitionIds = new ArrayList<>(); // pending: [1 2]
ArrayList<Long> newPartitionIds = new ArrayList<>(); // requested temp partition ids. for [7 8]
boolean needReplace = false;
try {
taskLock.lock();
// double check lock. maybe taskLock is not null, but has been removed from the Map. means the task failed.
if (overwriteManager.getLock(taskGroupId) == null) {
errorStatus.setErrorMsgs(Lists
.newArrayList(new String("cannot find task group " + taskGroupId + ", maybe already failed.")));
result.setStatus(errorStatus);
LOG.warn("send create partition error status: {}", result);
return result;
}
// we dont lock the table. other thread in this txn will be controled by taskLock.
// if we have already replaced. dont do it again, but acquire the recorded new partition directly.
// if we have already replaced, dont do it again, but acquire the recorded new partition directly.
// if not by this txn, just let it fail naturally is ok.
List<Long> replacedPartIds = overwriteManager.tryReplacePartitionIds(taskGroupId, partitionIds);
// here if replacedPartIds still have null. this will throw exception.
allReqPartNames = olapTable.uncheckedGetPartNamesById(replacedPartIds);
needReplace = overwriteManager.tryReplacePartitionIds(taskGroupId, reqPartitionIds, resultPartitionIds);
// request: [1 2 3 4] result: [1 2 5 6] means ONLY 1 and 2 need replace.
if (needReplace) {
// names for [1 2]
List<String> pendingPartitionNames = olapTable.getEqualPartitionNames(reqPartitionIds,
resultPartitionIds);
for (String name : pendingPartitionNames) {
pendingPartitionIds.add(olapTable.getPartition(name).getId()); // put [1 2]
}
List<Long> pendingPartitionIds = IntStream.range(0, partitionIds.size())
.filter(i -> partitionIds.get(i) == replacedPartIds.get(i)) // equal means not replaced
.mapToObj(partitionIds::get)
.collect(Collectors.toList());
// from here we ONLY deal the pending partitions. not include the dealed(by others).
if (!pendingPartitionIds.isEmpty()) {
// below two must have same order inner.
List<String> pendingPartitionNames = olapTable.uncheckedGetPartNamesById(pendingPartitionIds);
List<String> tempPartitionNames = InsertOverwriteUtil
// names for [7 8]
List<String> newTempNames = InsertOverwriteUtil
.generateTempPartitionNames(pendingPartitionNames);
long taskId = overwriteManager.registerTask(dbId, tableId, tempPartitionNames);
// a task means one time insert overwrite
long taskId = overwriteManager.registerTask(dbId, tableId, newTempNames);
overwriteManager.registerTaskInGroup(taskGroupId, taskId);
InsertOverwriteUtil.addTempPartitions(olapTable, pendingPartitionNames, tempPartitionNames);
InsertOverwriteUtil.replacePartition(olapTable, pendingPartitionNames, tempPartitionNames);
InsertOverwriteUtil.addTempPartitions(olapTable, pendingPartitionNames, newTempNames);
// now temp partitions are bumped up and use new names. we get their ids and record them.
List<Long> newPartitionIds = new ArrayList<Long>();
for (String newPartName : pendingPartitionNames) {
newPartitionIds.add(olapTable.getPartition(newPartName).getId());
for (String newPartName : newTempNames) {
newPartitionIds.add(olapTable.getPartition(newPartName).getId()); // put [7 8]
}
overwriteManager.recordPartitionPairs(taskGroupId, pendingPartitionIds, newPartitionIds);
if (LOG.isDebugEnabled()) {
LOG.debug("partition replacement: ");
for (int i = 0; i < pendingPartitionIds.size(); i++) {
LOG.debug("[" + pendingPartitionIds.get(i) + ", " + newPartitionIds.get(i) + "], ");
LOG.debug("[" + pendingPartitionIds.get(i) + " - " + pendingPartitionNames.get(i) + ", "
+ newPartitionIds.get(i) + " - " + newTempNames.get(i) + "], ");
}
}
}
@ -3654,15 +3672,38 @@ public class FrontendServiceImpl implements FrontendService.Iface {
taskLock.unlock();
}
// build partition & tablets. now all partitions in allReqPartNames are replaced
// an recorded.
// so they won't be changed again. if other transaction changing it. just let it
// fail.
List<TOlapTablePartition> partitions = Lists.newArrayList();
List<TTabletLocation> tablets = Lists.newArrayList();
// result: [1 2 5 6], make it [7 8 5 6]
int idx = 0;
if (needReplace) {
for (int i = 0; i < reqPartitionIds.size(); i++) {
if (reqPartitionIds.get(i).equals(resultPartitionIds.get(i))) {
resultPartitionIds.set(i, newPartitionIds.get(idx++));
}
}
}
if (idx != newPartitionIds.size()) {
errorStatus.addToErrorMsgs("changed partition number " + idx + " is not correct");
result.setStatus(errorStatus);
LOG.warn("send create partition error status: {}", result);
return result;
}
if (LOG.isDebugEnabled()) {
LOG.debug("replace partition origin ids: ["
+ String.join(", ", reqPartitionIds.stream().map(String::valueOf).collect(Collectors.toList()))
+ ']');
LOG.debug("replace partition result ids: ["
+ String.join(", ", resultPartitionIds.stream().map(String::valueOf).collect(Collectors.toList()))
+ ']');
}
// build partition & tablets. now all partitions in allReqPartNames are replaced an recorded.
// so they won't be changed again. if other transaction changing it. just let it fail.
List<TOlapTablePartition> partitions = new ArrayList<>();
List<TTabletLocation> tablets = new ArrayList<>();
PartitionInfo partitionInfo = olapTable.getPartitionInfo();
for (String partitionName : allReqPartNames) {
Partition partition = table.getPartition(partitionName);
for (long partitionId : resultPartitionIds) {
Partition partition = olapTable.getPartition(partitionId);
TOlapTablePartition tPartition = new TOlapTablePartition();
tPartition.setId(partition.getId());