Increase the timeout of publish version task when doing alter job (#1359)
The previous timeout setting for a publish version task was a mess. I changed it to a configurable time that defaults to 30 seconds. When the table is under rollup or schema change, I double this timeout. This is a kind of best-effort optimization: with a short timeout, a replica's publish version task is more likely to fail, and if a quorum of a tablet's replicas fail to publish, the alter job will fail. If the table is not under rollup or schema change, the failure of a replica's publish version task has only a minor effect, because the replica can be repaired by the tablet repair process very soon. But the tablet repair process will not repair rollup replicas.
This commit is contained in:
@ -250,10 +250,10 @@ public class Config extends ConfigBase {
|
||||
public static int tablet_create_timeout_second = 1;
|
||||
|
||||
/*
|
||||
* Maximal waiting time for publish version message to backend
|
||||
* Maximal waiting time for all publish version tasks of one transaction to be finished
|
||||
*/
|
||||
@ConfField(mutable = true, masterOnly = true)
|
||||
public static int publish_version_timeout_second = 3;
|
||||
public static int publish_version_timeout_second = 30; // 30 seconds
|
||||
|
||||
/*
|
||||
* minimal intervals between two publish version action
|
||||
|
||||
@ -639,11 +639,8 @@ public class LoadJob implements Writable {
|
||||
}
|
||||
|
||||
public long getDeleteJobTimeout() {
|
||||
long timeout = Math.max(idToTabletLoadInfo.size()
|
||||
* Config.tablet_delete_timeout_second * 1000L,
|
||||
60000L);
|
||||
timeout = Math.min(timeout, 300000L);
|
||||
return timeout;
|
||||
return Math.min(idToTabletLoadInfo.size() * Config.tablet_delete_timeout_second * 1000L,
|
||||
Config.load_straggler_wait_second * 1000L);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
||||
@ -323,6 +323,25 @@ public class GlobalTransactionMgr {
|
||||
rollupJob = (RollupJob) catalog.getRollupHandler().getAlterJob(tableId);
|
||||
rollingUpIndex = rollupJob.getRollupIndex(partition.getId());
|
||||
}
|
||||
|
||||
if (table.getState() == OlapTableState.ROLLUP || table.getState() == OlapTableState.SCHEMA_CHANGE) {
|
||||
/*
|
||||
* This is just an optimization that does its best to not let publish version tasks
|
||||
* timeout if table is under rollup or schema change. Because with a short
|
||||
* timeout, a replica's publish version task is more likely to fail. And if
|
||||
* quorum replicas of a tablet fail to publish, the alter job will fail.
|
||||
*
|
||||
* If the table is not under rollup or schema change, the failure of a replica's
|
||||
* publish version task has a minor effect because the replica can be repaired
|
||||
* by tablet repair process very soon. But the tablet repair process will not
|
||||
* repair rollup replicas.
|
||||
*
|
||||
* This is a kind of best-effort optimization: if the FE restarts after commit and
|
||||
* before publish, this 'prolong' information will be lost.
|
||||
*/
|
||||
transactionState.prolongPublishTimeout();
|
||||
}
|
||||
|
||||
// the rolling up index should also be taken care
|
||||
// if the rollup index failed during load, then set its last failed version
|
||||
// if rollup task finished, it should compare version and last failed version,
|
||||
@ -343,8 +362,7 @@ public class GlobalTransactionMgr {
|
||||
Replica replica = tabletInvertedIndex.getReplica(tabletId, tabletBackend);
|
||||
if (replica == null) {
|
||||
throw new TransactionCommitFailedException("could not find replica for tablet ["
|
||||
+ tabletId + "], backend ["
|
||||
+ tabletBackend + "]");
|
||||
+ tabletId + "], backend [" + tabletBackend + "]");
|
||||
}
|
||||
// if the tablet have no replica's to commit or the tablet is a rolling up tablet, the commit backends maybe null
|
||||
// if the commit backends is null, set all replicas as error replicas
|
||||
@ -363,7 +381,7 @@ public class GlobalTransactionMgr {
|
||||
LOG.info("the base replica [{}] has error, remove the related rollup replica from rollupjob [{}]",
|
||||
replica, rollupJob);
|
||||
rollupJob.removeReplicaRelatedTask(partition.getId(),
|
||||
tabletId, replica.getId(), replica.getBackendId());
|
||||
tabletId, replica.getId(), replica.getBackendId());
|
||||
}
|
||||
}
|
||||
} else {
|
||||
|
||||
@ -118,6 +118,7 @@ public class PublishVersionDaemon extends Daemon {
|
||||
transactionState.addPublishVersionTask(backendId, task);
|
||||
}
|
||||
transactionState.setHasSendTask(true);
|
||||
LOG.info("send publish tasks for transaction: {}", transactionState.getTransactionId());
|
||||
}
|
||||
if (!batchTask.getAllTasks().isEmpty()) {
|
||||
AgentTaskExecutor.submit(batchTask);
|
||||
|
||||
@ -31,6 +31,9 @@ import com.google.common.base.Strings;
|
||||
import com.google.common.collect.Maps;
|
||||
import com.google.common.collect.Sets;
|
||||
|
||||
import org.apache.logging.log4j.LogManager;
|
||||
import org.apache.logging.log4j.Logger;
|
||||
|
||||
import java.io.DataInput;
|
||||
import java.io.DataOutput;
|
||||
import java.io.IOException;
|
||||
@ -40,6 +43,7 @@ import java.util.concurrent.CountDownLatch;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
public class TransactionState implements Writable {
|
||||
private static final Logger LOG = LogManager.getLogger(TransactionState.class);
|
||||
|
||||
public enum LoadJobSourceType {
|
||||
FRONTEND(1), // old dpp load, mini load, insert stmt(not streaming type) use this type
|
||||
@ -130,6 +134,9 @@ public class TransactionState implements Writable {
|
||||
private long callbackId = -1;
|
||||
private long timeoutMs = Config.stream_load_default_timeout_second;
|
||||
|
||||
// if set to true, we will double the publish timeout
|
||||
private boolean prolongPublishTimeout = false;
|
||||
|
||||
// optional
|
||||
private TxnCommitAttachment txnCommitAttachment;
|
||||
|
||||
@ -430,13 +437,19 @@ public class TransactionState implements Writable {
|
||||
}
|
||||
|
||||
public boolean isPublishTimeout() {
|
||||
// timeout is between 3 to Config.max_txn_publish_waiting_time_ms seconds.
|
||||
long timeoutMillis = Math.min(Config.publish_version_timeout_second * publishVersionTasks.size() * 1000,
|
||||
Config.load_straggler_wait_second * 1000);
|
||||
timeoutMillis = Math.max(timeoutMillis, 3000);
|
||||
// the max timeout is Config.publish_version_timeout_second * 2;
|
||||
long timeoutMillis = Config.publish_version_interval_ms;
|
||||
if (prolongPublishTimeout) {
|
||||
timeoutMillis *= 2;
|
||||
}
|
||||
return System.currentTimeMillis() - publishVersionTime > timeoutMillis;
|
||||
}
|
||||
|
||||
public void prolongPublishTimeout() {
|
||||
this.prolongPublishTimeout = true;
|
||||
LOG.info("prolong the timeout of publish version task for transaction: {}", transactionId);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void write(DataOutput out) throws IOException {
|
||||
out.writeLong(transactionId);
|
||||
|
||||
Reference in New Issue
Block a user