diff --git a/fe/fe-core/src/main/java/org/apache/doris/transaction/PublishVersionDaemon.java b/fe/fe-core/src/main/java/org/apache/doris/transaction/PublishVersionDaemon.java index 22ca57f239..a1861fb7f4 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/transaction/PublishVersionDaemon.java +++ b/fe/fe-core/src/main/java/org/apache/doris/transaction/PublishVersionDaemon.java @@ -91,6 +91,13 @@ public class PublishVersionDaemon extends MasterDaemon { LOG.warn("some transaction state need to publish, but no backend exists"); return; } + traverseReadyTxnAndDispatchPublishVersionTask(readyTransactionStates, allBackends); + tryFinishTxn(readyTransactionStates, infoService, globalTransactionMgr, partitionVisibleVersions, + backendPartitions); + } + + private void traverseReadyTxnAndDispatchPublishVersionTask(List readyTransactionStates, + List allBackends) { long createPublishVersionTaskTime = System.currentTimeMillis(); // every backend-transaction identified a single task AgentBatchTask batchTask = new AgentBatchTask(); @@ -153,62 +160,79 @@ public class PublishVersionDaemon extends MasterDaemon { if (!batchTask.getAllTasks().isEmpty()) { AgentTaskExecutor.submit(batchTask); } + } - Map tableIdToTotalDeltaNumRows = Maps.newHashMap(); + private void tryFinishTxn(List readyTransactionStates, + SystemInfoService infoService, GlobalTransactionMgr globalTransactionMgr, + Map partitionVisibleVersions, Map> backendPartitions) { // try to finish the transaction, if failed just retry in next loop for (TransactionState transactionState : readyTransactionStates) { - Stream publishVersionTaskStream = transactionState - .getPublishVersionTasks() - .values() - .stream() - .peek(task -> { - if (task.isFinished() && CollectionUtils.isEmpty(task.getErrorTablets())) { - Map tableIdToDeltaNumRows = - task.getTableIdToDeltaNumRows(); - tableIdToDeltaNumRows.forEach((tableId, numRows) -> { - tableIdToTotalDeltaNumRows - .computeIfPresent(tableId, (id, orgNumRows) -> orgNumRows + numRows); - tableIdToTotalDeltaNumRows.putIfAbsent(tableId, numRows); - }); - } - }); - boolean hasBackendAliveAndUnfinishedTask = publishVersionTaskStream - .anyMatch(task -> !task.isFinished() && infoService.checkBackendAlive(task.getBackendId())); - transactionState.setTableIdToTotalNumDeltaRows(tableIdToTotalDeltaNumRows); - - boolean shouldFinishTxn = !hasBackendAliveAndUnfinishedTask || transactionState.isPublishTimeout() - || DebugPointUtil.isEnable("PublishVersionDaemon.not_wait_unfinished_tasks"); - if (shouldFinishTxn) { - try { - // one transaction exception should not affect other transaction - globalTransactionMgr.finishTransaction(transactionState.getDbId(), - transactionState.getTransactionId(), partitionVisibleVersions, backendPartitions); - } catch (Exception e) { - LOG.warn("error happens when finish transaction {}", transactionState.getTransactionId(), e); - } - if (transactionState.getTransactionStatus() != TransactionStatus.VISIBLE) { - // if finish transaction state failed, then update publish version time, should check - // to finish after some interval - transactionState.updateSendTaskTime(); - if (LOG.isDebugEnabled()) { - LOG.debug("publish version for transaction {} failed", transactionState); - } - } - } - - if (transactionState.getTransactionStatus() == TransactionStatus.VISIBLE) { - for (PublishVersionTask task : transactionState.getPublishVersionTasks().values()) { - AgentTaskQueue.removeTask(task.getBackendId(), TTaskType.PUBLISH_VERSION, task.getSignature()); - } - transactionState.pruneAfterVisible(); - if (MetricRepo.isInit) { - long publishTime = transactionState.getLastPublishVersionTime() - transactionState.getCommitTime(); - MetricRepo.HISTO_TXN_PUBLISH_LATENCY.update(publishTime); - } + try { + // try to finish the transaction, if failed just retry in next loop + tryFinishOneTxn(transactionState, infoService, globalTransactionMgr, partitionVisibleVersions, + backendPartitions); + } catch (Throwable t) { + LOG.error("errors while finish transaction: {}, publish tasks: {}", transactionState, + transactionState.getPublishVersionTasks(), t); } } // end for readyTransactionStates } + private void tryFinishOneTxn(TransactionState transactionState, + SystemInfoService infoService, GlobalTransactionMgr globalTransactionMgr, + Map partitionVisibleVersions, Map> backendPartitions) { + Map tableIdToTotalDeltaNumRows = Maps.newHashMap(); + Stream publishVersionTaskStream = transactionState + .getPublishVersionTasks() + .values() + .stream() + .peek(task -> { + if (task.isFinished() && CollectionUtils.isEmpty(task.getErrorTablets())) { + Map tableIdToDeltaNumRows = + task.getTableIdToDeltaNumRows(); + tableIdToDeltaNumRows.forEach((tableId, numRows) -> { + tableIdToTotalDeltaNumRows + .computeIfPresent(tableId, (id, orgNumRows) -> orgNumRows + numRows); + tableIdToTotalDeltaNumRows.putIfAbsent(tableId, numRows); + }); + } + }); + boolean hasBackendAliveAndUnfinishedTask = publishVersionTaskStream + .anyMatch(task -> !task.isFinished() && infoService.checkBackendAlive(task.getBackendId())); + transactionState.setTableIdToTotalNumDeltaRows(tableIdToTotalDeltaNumRows); + + boolean shouldFinishTxn = !hasBackendAliveAndUnfinishedTask || transactionState.isPublishTimeout() + || DebugPointUtil.isEnable("PublishVersionDaemon.not_wait_unfinished_tasks"); + if (shouldFinishTxn) { + try { + // one transaction exception should not affect other transaction + globalTransactionMgr.finishTransaction(transactionState.getDbId(), + transactionState.getTransactionId(), partitionVisibleVersions, backendPartitions); + } catch (Exception e) { + LOG.warn("error happens when finish transaction {}", transactionState.getTransactionId(), e); + } + if (transactionState.getTransactionStatus() != TransactionStatus.VISIBLE) { + // if finish transaction state failed, then update publish version time, should check + // to finish after some interval + transactionState.updateSendTaskTime(); + if (LOG.isDebugEnabled()) { + LOG.debug("publish version for transaction {} failed", transactionState); + } + } + } + + if (transactionState.getTransactionStatus() == TransactionStatus.VISIBLE) { + for (PublishVersionTask task : transactionState.getPublishVersionTasks().values()) { + AgentTaskQueue.removeTask(task.getBackendId(), TTaskType.PUBLISH_VERSION, task.getSignature()); + } + transactionState.pruneAfterVisible(); + if (MetricRepo.isInit) { + long publishTime = transactionState.getLastPublishVersionTime() - transactionState.getCommitTime(); + MetricRepo.HISTO_TXN_PUBLISH_LATENCY.update(publishTime); + } + } + } + private Map> getBaseTabletIdsForEachBe(TransactionState transactionState, TableCommitInfo tableCommitInfo) throws MetaNotFoundException {