From 9f7a335d02e2d42ee5e4a0b58eaabf0253387c48 Mon Sep 17 00:00:00 2001 From: Mingyu Chen Date: Sat, 22 Jun 2019 14:29:16 +0800 Subject: [PATCH] Increase the timeout of publish version task when doing alter job (#1359) The previous timeout setting of a publish version task was a mess. I changed it to a configurable time, which defaults to 30 seconds. And when the table is under rollup or schema change, I double this timeout. This is a kind of best-effort optimization, because with a short timeout, a replica's publish version task is more likely to fail. And if quorum replicas of a tablet fail to publish, the alter job will fail. If the table is not under rollup or schema change, the failure of a replica's publish version task has a minor effect because the replica can be repaired by the tablet repair process very soon. But the tablet repair process will not repair rollup replicas. --- .../java/org/apache/doris/common/Config.java | 4 ++-- .../java/org/apache/doris/load/LoadJob.java | 7 ++---- .../transaction/GlobalTransactionMgr.java | 24 ++++++++++++++++--- .../transaction/PublishVersionDaemon.java | 1 + .../doris/transaction/TransactionState.java | 21 ++++++++++++---- 5 files changed, 43 insertions(+), 14 deletions(-) diff --git a/fe/src/main/java/org/apache/doris/common/Config.java b/fe/src/main/java/org/apache/doris/common/Config.java index 822b5eb67a..69d3e413df 100644 --- a/fe/src/main/java/org/apache/doris/common/Config.java +++ b/fe/src/main/java/org/apache/doris/common/Config.java @@ -250,10 +250,10 @@ public class Config extends ConfigBase { public static int tablet_create_timeout_second = 1; /* - * Maximal waiting time for publish version message to backend + * Maximal waiting time for all publish version tasks of one transaction to be finished */ @ConfField(mutable = true, masterOnly = true) - public static int publish_version_timeout_second = 3; + public static int publish_version_timeout_second = 30; // 30 seconds /* * minimal intervals between two publish version 
action diff --git a/fe/src/main/java/org/apache/doris/load/LoadJob.java b/fe/src/main/java/org/apache/doris/load/LoadJob.java index 1d2a0205fa..e49d3ebf27 100644 --- a/fe/src/main/java/org/apache/doris/load/LoadJob.java +++ b/fe/src/main/java/org/apache/doris/load/LoadJob.java @@ -639,11 +639,8 @@ public class LoadJob implements Writable { } public long getDeleteJobTimeout() { - long timeout = Math.max(idToTabletLoadInfo.size() - * Config.tablet_delete_timeout_second * 1000L, - 60000L); - timeout = Math.min(timeout, 300000L); - return timeout; + return Math.min(idToTabletLoadInfo.size() * Config.tablet_delete_timeout_second * 1000L, + Config.load_straggler_wait_second * 1000L); } @Override diff --git a/fe/src/main/java/org/apache/doris/transaction/GlobalTransactionMgr.java b/fe/src/main/java/org/apache/doris/transaction/GlobalTransactionMgr.java index dbd711089b..50f63c2046 100644 --- a/fe/src/main/java/org/apache/doris/transaction/GlobalTransactionMgr.java +++ b/fe/src/main/java/org/apache/doris/transaction/GlobalTransactionMgr.java @@ -323,6 +323,25 @@ public class GlobalTransactionMgr { rollupJob = (RollupJob) catalog.getRollupHandler().getAlterJob(tableId); rollingUpIndex = rollupJob.getRollupIndex(partition.getId()); } + + if (table.getState() == OlapTableState.ROLLUP || table.getState() == OlapTableState.SCHEMA_CHANGE) { + /* + * This is just an optimization that does its best to keep publish version tasks + * from timing out if the table is under rollup or schema change. Because with a short + * timeout, a replica's publish version task is more likely to fail. And if + * quorum replicas of a tablet fail to publish, the alter job will fail. + * + * If the table is not under rollup or schema change, the failure of a replica's + * publish version task has a minor effect because the replica can be repaired + * by the tablet repair process very soon. But the tablet repair process will not + * repair rollup replicas. 
+ * + * This is a kind of best-effort optimization: if the FE restarts after commit and + * before publish, this 'prolong' information will be lost. + */ + transactionState.prolongPublishTimeout(); + } + // the rolling up index should also be taken care // if the rollup index failed during load, then set its last failed version // if rollup task finished, it should compare version and last failed version, @@ -343,8 +362,7 @@ public class GlobalTransactionMgr { Replica replica = tabletInvertedIndex.getReplica(tabletId, tabletBackend); if (replica == null) { throw new TransactionCommitFailedException("could not find replica for tablet [" - + tabletId + "], backend [" - + tabletBackend + "]"); + + tabletId + "], backend [" + tabletBackend + "]"); } // if the tablet have no replica's to commit or the tablet is a rolling up tablet, the commit backends maybe null // if the commit backends is null, set all replicas as error replicas @@ -363,7 +381,7 @@ public class GlobalTransactionMgr { LOG.info("the base replica [{}] has error, remove the related rollup replica from rollupjob [{}]", replica, rollupJob); rollupJob.removeReplicaRelatedTask(partition.getId(), - tabletId, replica.getId(), replica.getBackendId()); + tabletId, replica.getId(), replica.getBackendId()); } } } else { diff --git a/fe/src/main/java/org/apache/doris/transaction/PublishVersionDaemon.java b/fe/src/main/java/org/apache/doris/transaction/PublishVersionDaemon.java index 24ee3d2758..f8dd13218b 100644 --- a/fe/src/main/java/org/apache/doris/transaction/PublishVersionDaemon.java +++ b/fe/src/main/java/org/apache/doris/transaction/PublishVersionDaemon.java @@ -118,6 +118,7 @@ public class PublishVersionDaemon extends Daemon { transactionState.addPublishVersionTask(backendId, task); } transactionState.setHasSendTask(true); + LOG.info("send publish tasks for transaction: {}", transactionState.getTransactionId()); } if (!batchTask.getAllTasks().isEmpty()) { AgentTaskExecutor.submit(batchTask); diff --git 
a/fe/src/main/java/org/apache/doris/transaction/TransactionState.java b/fe/src/main/java/org/apache/doris/transaction/TransactionState.java index 41e1832bf2..edc0d42676 100644 --- a/fe/src/main/java/org/apache/doris/transaction/TransactionState.java +++ b/fe/src/main/java/org/apache/doris/transaction/TransactionState.java @@ -31,6 +31,9 @@ import com.google.common.base.Strings; import com.google.common.collect.Maps; import com.google.common.collect.Sets; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; @@ -40,6 +43,7 @@ import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; public class TransactionState implements Writable { + private static final Logger LOG = LogManager.getLogger(TransactionState.class); public enum LoadJobSourceType { FRONTEND(1), // old dpp load, mini load, insert stmt(not streaming type) use this type @@ -130,6 +134,9 @@ public class TransactionState implements Writable { private long callbackId = -1; private long timeoutMs = Config.stream_load_default_timeout_second; + // if set to true, we will double the publish timeout + private boolean prolongPublishTimeout = false; + // optional private TxnCommitAttachment txnCommitAttachment; @@ -430,13 +437,19 @@ public class TransactionState implements Writable { } public boolean isPublishTimeout() { - // timeout is between 3 to Config.max_txn_publish_waiting_time_ms seconds. 
- long timeoutMillis = Math.min(Config.publish_version_timeout_second * publishVersionTasks.size() * 1000, - Config.load_straggler_wait_second * 1000); - timeoutMillis = Math.max(timeoutMillis, 3000); + // timeout is Config.publish_version_timeout_second seconds (in ms); the max is twice that, when prolongPublishTimeout is set + long timeoutMillis = Config.publish_version_timeout_second * 1000L; + if (prolongPublishTimeout) { + timeoutMillis *= 2; + } return System.currentTimeMillis() - publishVersionTime > timeoutMillis; } + public void prolongPublishTimeout() { + this.prolongPublishTimeout = true; + LOG.info("prolong the timeout of publish version task for transaction: {}", transactionId); + } + @Override public void write(DataOutput out) throws IOException { out.writeLong(transactionId);