[fix](publish) Pick Catch exception in genPublishTask to make one failed txn does not block the other txns (#37724) (#38042)
Pick https://github.com/apache/doris/pull/37724
This commit is contained in:
@ -108,60 +108,70 @@ public class PublishVersionDaemon extends MasterDaemon {
|
||||
if (transactionState.hasSendTask()) {
|
||||
continue;
|
||||
}
|
||||
List<PartitionCommitInfo> partitionCommitInfos = new ArrayList<>();
|
||||
for (TableCommitInfo tableCommitInfo : transactionState.getIdToTableCommitInfos().values()) {
|
||||
partitionCommitInfos.addAll(tableCommitInfo.getIdToPartitionCommitInfo().values());
|
||||
|
||||
try {
|
||||
beIdToBaseTabletIds.putAll(getBaseTabletIdsForEachBe(transactionState, tableCommitInfo));
|
||||
} catch (MetaNotFoundException e) {
|
||||
LOG.warn("exception occur when trying to get rollup tablets info", e);
|
||||
}
|
||||
try {
|
||||
genPublishTask(allBackends, transactionState, createPublishVersionTaskTime, beIdToBaseTabletIds,
|
||||
batchTask);
|
||||
} catch (Throwable t) {
|
||||
LOG.error("errors while generate publish task for transaction: {}", transactionState, t);
|
||||
}
|
||||
|
||||
List<TPartitionVersionInfo> partitionVersionInfos = new ArrayList<>(partitionCommitInfos.size());
|
||||
for (PartitionCommitInfo commitInfo : partitionCommitInfos) {
|
||||
TPartitionVersionInfo versionInfo = new TPartitionVersionInfo(commitInfo.getPartitionId(),
|
||||
commitInfo.getVersion(), 0);
|
||||
partitionVersionInfos.add(versionInfo);
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("try to publish version info partitionid [{}], version [{}]",
|
||||
commitInfo.getPartitionId(),
|
||||
commitInfo.getVersion());
|
||||
}
|
||||
}
|
||||
|
||||
Set<Long> publishBackends = transactionState.getPublishVersionTasks().keySet();
|
||||
// public version tasks are not persisted in catalog, so publishBackends may be empty.
|
||||
// so we have to try publish to all backends;
|
||||
if (publishBackends.isEmpty()) {
|
||||
// could not just add to it, should new a new object, or the back map will destroyed
|
||||
publishBackends = Sets.newHashSet();
|
||||
publishBackends.addAll(allBackends);
|
||||
}
|
||||
|
||||
for (long backendId : publishBackends) {
|
||||
PublishVersionTask task = new PublishVersionTask(backendId,
|
||||
transactionState.getTransactionId(),
|
||||
transactionState.getDbId(),
|
||||
partitionVersionInfos,
|
||||
createPublishVersionTaskTime);
|
||||
task.setBaseTabletsIds(beIdToBaseTabletIds.getOrDefault(backendId, Collections.emptySet()));
|
||||
// add to AgentTaskQueue for handling finish report.
|
||||
// not check return value, because the add will success
|
||||
AgentTaskQueue.addTask(task);
|
||||
batchTask.addTask(task);
|
||||
transactionState.addPublishVersionTask(backendId, task);
|
||||
}
|
||||
transactionState.setSendedTask();
|
||||
LOG.info("send publish tasks for transaction: {}, db: {}", transactionState.getTransactionId(),
|
||||
transactionState.getDbId());
|
||||
}
|
||||
if (!batchTask.getAllTasks().isEmpty()) {
|
||||
AgentTaskExecutor.submit(batchTask);
|
||||
}
|
||||
}
|
||||
|
||||
private void genPublishTask(List<Long> allBackends, TransactionState transactionState,
|
||||
long createPublishVersionTaskTime, Map<Long, Set<Long>> beIdToBaseTabletIds, AgentBatchTask batchTask) {
|
||||
List<PartitionCommitInfo> partitionCommitInfos = new ArrayList<>();
|
||||
for (TableCommitInfo tableCommitInfo : transactionState.getIdToTableCommitInfos().values()) {
|
||||
partitionCommitInfos.addAll(tableCommitInfo.getIdToPartitionCommitInfo().values());
|
||||
|
||||
try {
|
||||
beIdToBaseTabletIds.putAll(getBaseTabletIdsForEachBe(transactionState, tableCommitInfo));
|
||||
} catch (MetaNotFoundException e) {
|
||||
LOG.warn("exception occur when trying to get rollup tablets info", e);
|
||||
}
|
||||
}
|
||||
|
||||
List<TPartitionVersionInfo> partitionVersionInfos = new ArrayList<>(partitionCommitInfos.size());
|
||||
for (PartitionCommitInfo commitInfo : partitionCommitInfos) {
|
||||
TPartitionVersionInfo versionInfo = new TPartitionVersionInfo(commitInfo.getPartitionId(),
|
||||
commitInfo.getVersion(), 0);
|
||||
partitionVersionInfos.add(versionInfo);
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("try to publish version info partitionid [{}], version [{}]",
|
||||
commitInfo.getPartitionId(),
|
||||
commitInfo.getVersion());
|
||||
}
|
||||
}
|
||||
|
||||
Set<Long> publishBackends = transactionState.getPublishVersionTasks().keySet();
|
||||
// public version tasks are not persisted in catalog, so publishBackends may be empty.
|
||||
// so we have to try publish to all backends;
|
||||
if (publishBackends.isEmpty()) {
|
||||
// could not just add to it, should new a new object, or the back map will destroyed
|
||||
publishBackends = Sets.newHashSet();
|
||||
publishBackends.addAll(allBackends);
|
||||
}
|
||||
|
||||
for (long backendId : publishBackends) {
|
||||
PublishVersionTask task = new PublishVersionTask(backendId,
|
||||
transactionState.getTransactionId(),
|
||||
transactionState.getDbId(),
|
||||
partitionVersionInfos,
|
||||
createPublishVersionTaskTime);
|
||||
task.setBaseTabletsIds(beIdToBaseTabletIds.getOrDefault(backendId, Collections.emptySet()));
|
||||
// add to AgentTaskQueue for handling finish report.
|
||||
// not check return value, because the add will success
|
||||
AgentTaskQueue.addTask(task);
|
||||
batchTask.addTask(task);
|
||||
transactionState.addPublishVersionTask(backendId, task);
|
||||
}
|
||||
transactionState.setSendedTask();
|
||||
LOG.info("send publish tasks for transaction: {}, db: {}", transactionState.getTransactionId(),
|
||||
transactionState.getDbId());
|
||||
}
|
||||
|
||||
private void tryFinishTxn(List<TransactionState> readyTransactionStates,
|
||||
SystemInfoService infoService, GlobalTransactionMgr globalTransactionMgr,
|
||||
Map<Long, Long> partitionVisibleVersions, Map<Long, Set<Long>> backendPartitions) {
|
||||
|
||||
Reference in New Issue
Block a user