[fix](publish) Pick Fix publish failed because task is null (#37546)
## Proposed changes Pick https://github.com/apache/doris/pull/37531 This PR catches the exception so that a failed txn does not block the other txns.
This commit is contained in:
@ -91,6 +91,13 @@ public class PublishVersionDaemon extends MasterDaemon {
|
||||
LOG.warn("some transaction state need to publish, but no backend exists");
|
||||
return;
|
||||
}
|
||||
traverseReadyTxnAndDispatchPublishVersionTask(readyTransactionStates, allBackends);
|
||||
tryFinishTxn(readyTransactionStates, infoService, globalTransactionMgr, partitionVisibleVersions,
|
||||
backendPartitions);
|
||||
}
|
||||
|
||||
private void traverseReadyTxnAndDispatchPublishVersionTask(List<TransactionState> readyTransactionStates,
|
||||
List<Long> allBackends) {
|
||||
long createPublishVersionTaskTime = System.currentTimeMillis();
|
||||
// every backend-transaction identified a single task
|
||||
AgentBatchTask batchTask = new AgentBatchTask();
|
||||
@ -153,62 +160,79 @@ public class PublishVersionDaemon extends MasterDaemon {
|
||||
if (!batchTask.getAllTasks().isEmpty()) {
|
||||
AgentTaskExecutor.submit(batchTask);
|
||||
}
|
||||
}
|
||||
|
||||
Map<Long, Long> tableIdToTotalDeltaNumRows = Maps.newHashMap();
|
||||
private void tryFinishTxn(List<TransactionState> readyTransactionStates,
|
||||
SystemInfoService infoService, GlobalTransactionMgr globalTransactionMgr,
|
||||
Map<Long, Long> partitionVisibleVersions, Map<Long, Set<Long>> backendPartitions) {
|
||||
// try to finish the transaction, if failed just retry in next loop
|
||||
for (TransactionState transactionState : readyTransactionStates) {
|
||||
Stream<PublishVersionTask> publishVersionTaskStream = transactionState
|
||||
.getPublishVersionTasks()
|
||||
.values()
|
||||
.stream()
|
||||
.peek(task -> {
|
||||
if (task.isFinished() && CollectionUtils.isEmpty(task.getErrorTablets())) {
|
||||
Map<Long, Long> tableIdToDeltaNumRows =
|
||||
task.getTableIdToDeltaNumRows();
|
||||
tableIdToDeltaNumRows.forEach((tableId, numRows) -> {
|
||||
tableIdToTotalDeltaNumRows
|
||||
.computeIfPresent(tableId, (id, orgNumRows) -> orgNumRows + numRows);
|
||||
tableIdToTotalDeltaNumRows.putIfAbsent(tableId, numRows);
|
||||
});
|
||||
}
|
||||
});
|
||||
boolean hasBackendAliveAndUnfinishedTask = publishVersionTaskStream
|
||||
.anyMatch(task -> !task.isFinished() && infoService.checkBackendAlive(task.getBackendId()));
|
||||
transactionState.setTableIdToTotalNumDeltaRows(tableIdToTotalDeltaNumRows);
|
||||
|
||||
boolean shouldFinishTxn = !hasBackendAliveAndUnfinishedTask || transactionState.isPublishTimeout()
|
||||
|| DebugPointUtil.isEnable("PublishVersionDaemon.not_wait_unfinished_tasks");
|
||||
if (shouldFinishTxn) {
|
||||
try {
|
||||
// one transaction exception should not affect other transaction
|
||||
globalTransactionMgr.finishTransaction(transactionState.getDbId(),
|
||||
transactionState.getTransactionId(), partitionVisibleVersions, backendPartitions);
|
||||
} catch (Exception e) {
|
||||
LOG.warn("error happens when finish transaction {}", transactionState.getTransactionId(), e);
|
||||
}
|
||||
if (transactionState.getTransactionStatus() != TransactionStatus.VISIBLE) {
|
||||
// if finish transaction state failed, then update publish version time, should check
|
||||
// to finish after some interval
|
||||
transactionState.updateSendTaskTime();
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("publish version for transaction {} failed", transactionState);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if (transactionState.getTransactionStatus() == TransactionStatus.VISIBLE) {
|
||||
for (PublishVersionTask task : transactionState.getPublishVersionTasks().values()) {
|
||||
AgentTaskQueue.removeTask(task.getBackendId(), TTaskType.PUBLISH_VERSION, task.getSignature());
|
||||
}
|
||||
transactionState.pruneAfterVisible();
|
||||
if (MetricRepo.isInit) {
|
||||
long publishTime = transactionState.getLastPublishVersionTime() - transactionState.getCommitTime();
|
||||
MetricRepo.HISTO_TXN_PUBLISH_LATENCY.update(publishTime);
|
||||
}
|
||||
try {
|
||||
// try to finish the transaction, if failed just retry in next loop
|
||||
tryFinishOneTxn(transactionState, infoService, globalTransactionMgr, partitionVisibleVersions,
|
||||
backendPartitions);
|
||||
} catch (Throwable t) {
|
||||
LOG.error("errors while finish transaction: {}, publish tasks: {}", transactionState,
|
||||
transactionState.getPublishVersionTasks(), t);
|
||||
}
|
||||
} // end for readyTransactionStates
|
||||
}
|
||||
|
||||
private void tryFinishOneTxn(TransactionState transactionState,
|
||||
SystemInfoService infoService, GlobalTransactionMgr globalTransactionMgr,
|
||||
Map<Long, Long> partitionVisibleVersions, Map<Long, Set<Long>> backendPartitions) {
|
||||
Map<Long, Long> tableIdToTotalDeltaNumRows = Maps.newHashMap();
|
||||
Stream<PublishVersionTask> publishVersionTaskStream = transactionState
|
||||
.getPublishVersionTasks()
|
||||
.values()
|
||||
.stream()
|
||||
.peek(task -> {
|
||||
if (task.isFinished() && CollectionUtils.isEmpty(task.getErrorTablets())) {
|
||||
Map<Long, Long> tableIdToDeltaNumRows =
|
||||
task.getTableIdToDeltaNumRows();
|
||||
tableIdToDeltaNumRows.forEach((tableId, numRows) -> {
|
||||
tableIdToTotalDeltaNumRows
|
||||
.computeIfPresent(tableId, (id, orgNumRows) -> orgNumRows + numRows);
|
||||
tableIdToTotalDeltaNumRows.putIfAbsent(tableId, numRows);
|
||||
});
|
||||
}
|
||||
});
|
||||
boolean hasBackendAliveAndUnfinishedTask = publishVersionTaskStream
|
||||
.anyMatch(task -> !task.isFinished() && infoService.checkBackendAlive(task.getBackendId()));
|
||||
transactionState.setTableIdToTotalNumDeltaRows(tableIdToTotalDeltaNumRows);
|
||||
|
||||
boolean shouldFinishTxn = !hasBackendAliveAndUnfinishedTask || transactionState.isPublishTimeout()
|
||||
|| DebugPointUtil.isEnable("PublishVersionDaemon.not_wait_unfinished_tasks");
|
||||
if (shouldFinishTxn) {
|
||||
try {
|
||||
// one transaction exception should not affect other transaction
|
||||
globalTransactionMgr.finishTransaction(transactionState.getDbId(),
|
||||
transactionState.getTransactionId(), partitionVisibleVersions, backendPartitions);
|
||||
} catch (Exception e) {
|
||||
LOG.warn("error happens when finish transaction {}", transactionState.getTransactionId(), e);
|
||||
}
|
||||
if (transactionState.getTransactionStatus() != TransactionStatus.VISIBLE) {
|
||||
// if finish transaction state failed, then update publish version time, should check
|
||||
// to finish after some interval
|
||||
transactionState.updateSendTaskTime();
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("publish version for transaction {} failed", transactionState);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if (transactionState.getTransactionStatus() == TransactionStatus.VISIBLE) {
|
||||
for (PublishVersionTask task : transactionState.getPublishVersionTasks().values()) {
|
||||
AgentTaskQueue.removeTask(task.getBackendId(), TTaskType.PUBLISH_VERSION, task.getSignature());
|
||||
}
|
||||
transactionState.pruneAfterVisible();
|
||||
if (MetricRepo.isInit) {
|
||||
long publishTime = transactionState.getLastPublishVersionTime() - transactionState.getCommitTime();
|
||||
MetricRepo.HISTO_TXN_PUBLISH_LATENCY.update(publishTime);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private Map<Long, Set<Long>> getBaseTabletIdsForEachBe(TransactionState transactionState,
|
||||
TableCommitInfo tableCommitInfo) throws MetaNotFoundException {
|
||||
|
||||
|
||||
Reference in New Issue
Block a user