From 7dd01b1fa9b3e3def95f8a62b67fe452b16e40d3 Mon Sep 17 00:00:00 2001 From: meiyi Date: Thu, 18 Jul 2024 14:15:49 +0800 Subject: [PATCH] [fix](publish) Pick Catch exception in genPublishTask to make one failed txn does not block the other txns (#37724) (#38042) Pick https://github.com/apache/doris/pull/37724 --- .../transaction/PublishVersionDaemon.java | 104 ++++++++++-------- 1 file changed, 57 insertions(+), 47 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/transaction/PublishVersionDaemon.java b/fe/fe-core/src/main/java/org/apache/doris/transaction/PublishVersionDaemon.java index a1861fb7f4..ac1cbe9154 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/transaction/PublishVersionDaemon.java +++ b/fe/fe-core/src/main/java/org/apache/doris/transaction/PublishVersionDaemon.java @@ -108,60 +108,70 @@ public class PublishVersionDaemon extends MasterDaemon { if (transactionState.hasSendTask()) { continue; } - List partitionCommitInfos = new ArrayList<>(); - for (TableCommitInfo tableCommitInfo : transactionState.getIdToTableCommitInfos().values()) { - partitionCommitInfos.addAll(tableCommitInfo.getIdToPartitionCommitInfo().values()); - - try { - beIdToBaseTabletIds.putAll(getBaseTabletIdsForEachBe(transactionState, tableCommitInfo)); - } catch (MetaNotFoundException e) { - LOG.warn("exception occur when trying to get rollup tablets info", e); - } + try { + genPublishTask(allBackends, transactionState, createPublishVersionTaskTime, beIdToBaseTabletIds, + batchTask); + } catch (Throwable t) { + LOG.error("errors while generate publish task for transaction: {}", transactionState, t); } - - List partitionVersionInfos = new ArrayList<>(partitionCommitInfos.size()); - for (PartitionCommitInfo commitInfo : partitionCommitInfos) { - TPartitionVersionInfo versionInfo = new TPartitionVersionInfo(commitInfo.getPartitionId(), - commitInfo.getVersion(), 0); - partitionVersionInfos.add(versionInfo); - if (LOG.isDebugEnabled()) { - LOG.debug("try to publish version info partitionid [{}], version [{}]", - commitInfo.getPartitionId(), - commitInfo.getVersion()); - } - } - - Set publishBackends = transactionState.getPublishVersionTasks().keySet(); - // public version tasks are not persisted in catalog, so publishBackends may be empty. - // so we have to try publish to all backends; - if (publishBackends.isEmpty()) { - // could not just add to it, should new a new object, or the back map will destroyed - publishBackends = Sets.newHashSet(); - publishBackends.addAll(allBackends); - } - - for (long backendId : publishBackends) { - PublishVersionTask task = new PublishVersionTask(backendId, - transactionState.getTransactionId(), - transactionState.getDbId(), - partitionVersionInfos, - createPublishVersionTaskTime); - task.setBaseTabletsIds(beIdToBaseTabletIds.getOrDefault(backendId, Collections.emptySet())); - // add to AgentTaskQueue for handling finish report. - // not check return value, because the add will success - AgentTaskQueue.addTask(task); - batchTask.addTask(task); - transactionState.addPublishVersionTask(backendId, task); - } - transactionState.setSendedTask(); - LOG.info("send publish tasks for transaction: {}, db: {}", transactionState.getTransactionId(), - transactionState.getDbId()); } if (!batchTask.getAllTasks().isEmpty()) { AgentTaskExecutor.submit(batchTask); } } + private void genPublishTask(List allBackends, TransactionState transactionState, + long createPublishVersionTaskTime, Map> beIdToBaseTabletIds, AgentBatchTask batchTask) { + List partitionCommitInfos = new ArrayList<>(); + for (TableCommitInfo tableCommitInfo : transactionState.getIdToTableCommitInfos().values()) { + partitionCommitInfos.addAll(tableCommitInfo.getIdToPartitionCommitInfo().values()); + + try { + beIdToBaseTabletIds.putAll(getBaseTabletIdsForEachBe(transactionState, tableCommitInfo)); + } catch (MetaNotFoundException e) { + LOG.warn("exception occur when trying to get rollup tablets info", e); + } + } + + List partitionVersionInfos = new ArrayList<>(partitionCommitInfos.size()); + for (PartitionCommitInfo commitInfo : partitionCommitInfos) { + TPartitionVersionInfo versionInfo = new TPartitionVersionInfo(commitInfo.getPartitionId(), + commitInfo.getVersion(), 0); + partitionVersionInfos.add(versionInfo); + if (LOG.isDebugEnabled()) { + LOG.debug("try to publish version info partitionid [{}], version [{}]", + commitInfo.getPartitionId(), + commitInfo.getVersion()); + } + } + + Set publishBackends = transactionState.getPublishVersionTasks().keySet(); + // public version tasks are not persisted in catalog, so publishBackends may be empty. + // so we have to try publish to all backends; + if (publishBackends.isEmpty()) { + // could not just add to it, should new a new object, or the back map will destroyed + publishBackends = Sets.newHashSet(); + publishBackends.addAll(allBackends); + } + + for (long backendId : publishBackends) { + PublishVersionTask task = new PublishVersionTask(backendId, + transactionState.getTransactionId(), + transactionState.getDbId(), + partitionVersionInfos, + createPublishVersionTaskTime); + task.setBaseTabletsIds(beIdToBaseTabletIds.getOrDefault(backendId, Collections.emptySet())); + // add to AgentTaskQueue for handling finish report. + // not check return value, because the add will success + AgentTaskQueue.addTask(task); + batchTask.addTask(task); + transactionState.addPublishVersionTask(backendId, task); + } + transactionState.setSendedTask(); + LOG.info("send publish tasks for transaction: {}, db: {}", transactionState.getTransactionId(), + transactionState.getDbId()); + } + private void tryFinishTxn(List readyTransactionStates, SystemInfoService infoService, GlobalTransactionMgr globalTransactionMgr, Map partitionVisibleVersions, Map> backendPartitions) {