[branch-2.1](routine-load) self-adaptive backoff timeout (#32734)

* [opt](routine-load) self-adaptive backoff timeout (#32227)

* [fix](routine-load) fix timeout backoff not taking effect (#32661)
HHoflittlefish777 authored on 2024-03-24 18:39:58 +08:00, committed by GitHub
parent 55ae41000f · commit 1b0cd4a4db
8 changed files with 73 additions and 15 deletions
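Taken together, this change implements a simple control loop: each time a routine load task times out, the FE doubles the task's timeout before renewing it, and once the backoff count exceeds MAX_TIMEOUT_BACK_OFF_COUNT the job is paused with the new TIMEOUT_TOO_MUCH error code; when a task finally commits, the timeout is shrunk back toward the observed execution time. A minimal, self-contained sketch of that loop (illustrative class and method names, not the actual Doris types):

    class BackoffSketch {
        static final int MAX_TIMEOUT_BACK_OFF_COUNT = 3;
        long timeoutMs = 20_000;      // e.g. maxBatchIntervalS * 2 * 1000
        int timeoutBackOffCount = 0;
        boolean paused = false;

        // mirrors the timeout branch added to RoutineLoadJob below
        void onTaskTimeout() {
            if (timeoutBackOffCount > MAX_TIMEOUT_BACK_OFF_COUNT) {
                paused = true;        // updateState(PAUSED, TIMEOUT_TOO_MUCH) in the diff
                return;
            }
            timeoutBackOffCount++;
            timeoutMs <<= 1;          // renew the task with a doubled timeout
        }

        // mirrors RoutineLoadTaskInfo.selfAdaptTimeout below
        void onTaskCommitted(long taskExecutionTimeMs) {
            long t = timeoutMs;
            while (timeoutBackOffCount > 0) {
                t >>= 1;
                if (t <= taskExecutionTimeMs) {
                    timeoutMs = t << 1;   // smallest backed-off timeout that still fits
                    return;
                }
                timeoutBackOffCount--;
            }
            timeoutMs = t;
        }
    }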

InternalErrorCode.java

@@ -35,7 +35,8 @@ public enum InternalErrorCode {
TOO_MANY_FAILURE_ROWS_ERR(102),
CREATE_TASKS_ERR(103),
TASKS_ABORT_ERR(104),
CANNOT_RESUME_ERR(105);
CANNOT_RESUME_ERR(105),
TIMEOUT_TOO_MUCH(106);
private long errCode;

KafkaRoutineLoadJob.java

@@ -227,7 +227,7 @@ public class KafkaRoutineLoadJob extends RoutineLoadJob {
((KafkaProgress) progress).getOffsetByPartition(kafkaPartition));
}
KafkaTaskInfo kafkaTaskInfo = new KafkaTaskInfo(UUID.randomUUID(), id,
maxBatchIntervalS * 2 * 1000, taskKafkaProgress, isMultiTable());
maxBatchIntervalS * 2 * 1000, 0, taskKafkaProgress, isMultiTable());
routineLoadTaskInfoList.add(kafkaTaskInfo);
result.add(kafkaTaskInfo);
}
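For context, a freshly scheduled task starts un-backed-off: the extra 0 passed above is the initial timeoutBackOffCount, and the timeout is twice the job's max_batch_interval. With the default interval (assumed to be 10s here) that works out to:

    // illustrative values only
    long maxBatchIntervalS = 10;                           // job's max_batch_interval
    long initialTimeoutMs = maxBatchIntervalS * 2 * 1000;  // 20000 ms = 20s
    int initialTimeoutBackOffCount = 0;                    // no backoff yet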

KafkaTaskInfo.java

@@ -48,14 +48,16 @@ public class KafkaTaskInfo extends RoutineLoadTaskInfo {
private Map<Integer, Long> partitionIdToOffset;
public KafkaTaskInfo(UUID id, long jobId,
long timeoutMs, Map<Integer, Long> partitionIdToOffset, boolean isMultiTable) {
super(id, jobId, timeoutMs, isMultiTable);
long timeoutMs, int timeoutBackOffCount,
Map<Integer, Long> partitionIdToOffset, boolean isMultiTable) {
super(id, jobId, timeoutMs, timeoutBackOffCount, isMultiTable);
this.partitionIdToOffset = partitionIdToOffset;
}
public KafkaTaskInfo(KafkaTaskInfo kafkaTaskInfo, Map<Integer, Long> partitionIdToOffset, boolean isMultiTable) {
super(UUID.randomUUID(), kafkaTaskInfo.getJobId(),
kafkaTaskInfo.getTimeoutMs(), kafkaTaskInfo.getBeId(), isMultiTable);
kafkaTaskInfo.getTimeoutMs(), kafkaTaskInfo.getTimeoutBackOffCount(),
kafkaTaskInfo.getBeId(), isMultiTable);
this.partitionIdToOffset = partitionIdToOffset;
}
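The copy constructor above is the one used when a timed-out task is renewed via unprotectRenewTask: the new task inherits both the (already doubled) timeout and the backoff count, which is what lets consecutive timeouts compound. An illustrative call, assuming timedOutTask, taskKafkaProgress, and isMultiTable are in scope:

    // the renewed task keeps timedOutTask's timeoutMs and timeoutBackOffCount
    KafkaTaskInfo renewed = new KafkaTaskInfo(timedOutTask, taskKafkaProgress, isMultiTable);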
@@ -131,6 +133,11 @@ public class KafkaTaskInfo extends RoutineLoadTaskInfo {
TExecPlanFragmentParams tExecPlanFragmentParams = routineLoadJob.plan(loadId, txnId);
TPlanFragment tPlanFragment = tExecPlanFragmentParams.getFragment();
tPlanFragment.getOutputSink().getOlapTableSink().setTxnId(txnId);
// the timeout must be updated here so that task timeout backoff takes effect
long timeoutS = this.getTimeoutMs() / 1000;
tPlanFragment.getOutputSink().getOlapTableSink().setLoadChannelTimeoutS(timeoutS);
tExecPlanFragmentParams.getQueryOptions().setQueryTimeout((int) timeoutS);
tExecPlanFragmentParams.getQueryOptions().setExecutionTimeout((int) timeoutS);
long wgId = routineLoadJob.getWorkloadId();
List<TPipelineWorkloadGroup> tWgList = new ArrayList<>();
@@ -153,6 +160,11 @@ public class KafkaTaskInfo extends RoutineLoadTaskInfo {
TPipelineFragmentParams tExecPlanFragmentParams = routineLoadJob.planForPipeline(loadId, txnId);
TPlanFragment tPlanFragment = tExecPlanFragmentParams.getFragment();
tPlanFragment.getOutputSink().getOlapTableSink().setTxnId(txnId);
// the timeout must be updated here so that task timeout backoff takes effect
long timeoutS = this.getTimeoutMs() / 1000;
tPlanFragment.getOutputSink().getOlapTableSink().setLoadChannelTimeoutS(timeoutS);
tExecPlanFragmentParams.getQueryOptions().setQueryTimeout((int) timeoutS);
tExecPlanFragmentParams.getQueryOptions().setExecutionTimeout((int) timeoutS);
long wgId = routineLoadJob.getWorkloadId();
List<TPipelineWorkloadGroup> tWgList = new ArrayList<>();
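The two hunks above are identical except for the plan params type (single-fragment vs. pipeline). Both are needed because the backed-off FE-side timeout only takes effect if the BE-side load channel, query, and execution timeouts are derived from it as well; leaving them at their old values is exactly the bug fixed by #32661. A hedged sketch of the shared idea (the helper name is hypothetical; the setters are the ones visible in the diff):

    // every BE-side timeout is recomputed from the task's current, possibly
    // backed-off, timeout; stale values here would silently defeat the backoff
    private void applyTaskTimeout(TPlanFragment fragment, TQueryOptions queryOptions, long timeoutMs) {
        long timeoutS = timeoutMs / 1000;
        fragment.getOutputSink().getOlapTableSink().setLoadChannelTimeoutS(timeoutS);
        queryOptions.setQueryTimeout((int) timeoutS);
        queryOptions.setExecutionTimeout((int) timeoutS);
    }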

RoutineLoadJob.java

@@ -741,6 +741,18 @@ public abstract class RoutineLoadJob extends AbstractTxnStateChangeCallback impl
// and after renew, the previous task is removed from routineLoadTaskInfoList,
// so the task can no longer be committed successfully.
// an already committed task will not be handled here.
int timeoutBackOffCount = routineLoadTaskInfo.getTimeoutBackOffCount();
if (timeoutBackOffCount > RoutineLoadTaskInfo.MAX_TIMEOUT_BACK_OFF_COUNT) {
try {
updateState(JobState.PAUSED, new ErrorReason(InternalErrorCode.TIMEOUT_TOO_MUCH,
"task " + routineLoadTaskInfo.getId() + " timeout too much"), false);
} catch (UserException e) {
LOG.warn("update job state to pause failed", e);
}
return;
}
routineLoadTaskInfo.setTimeoutBackOffCount(timeoutBackOffCount + 1);
routineLoadTaskInfo.setTimeoutMs((routineLoadTaskInfo.getTimeoutMs() << 1));
RoutineLoadTaskInfo newTask = unprotectRenewTask(routineLoadTaskInfo);
Env.getCurrentEnv().getRoutineLoadTaskScheduler().addTaskInQueue(newTask);
}
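Traced with hypothetical numbers, starting from a 20s initial timeout:

    // timeout event | count before | action taken by the branch above
    //       1       |      0       | count -> 1, timeout 20s  -> 40s,  renew
    //       2       |      1       | count -> 2, timeout 40s  -> 80s,  renew
    //       3       |      2       | count -> 3, timeout 80s  -> 160s, renew
    //       4       |      3       | count -> 4, timeout 160s -> 320s, renew
    //       5       |      4       | 4 > MAX_TIMEOUT_BACK_OFF_COUNT(3): pause the job

Note that because the check is strictly greater-than, the job survives four consecutive timeouts (up to 16x the original timeout) and is only paused with TIMEOUT_TOO_MUCH on the fifth.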
@@ -1267,6 +1279,7 @@ public abstract class RoutineLoadJob extends AbstractTxnStateChangeCallback impl
} else if (checkCommitInfo(rlTaskTxnCommitAttachment, txnState, txnStatusChangeReason)) {
// step2: update job progress
updateProgress(rlTaskTxnCommitAttachment);
routineLoadTaskInfo.selfAdaptTimeout(rlTaskTxnCommitAttachment);
}
if (rlTaskTxnCommitAttachment != null && !Strings.isNullOrEmpty(rlTaskTxnCommitAttachment.getErrorLogUrl())) {

RoutineLoadTaskInfo.java

@@ -72,21 +72,26 @@ public abstract class RoutineLoadTaskInfo {
protected boolean isMultiTable = false;
protected static final int MAX_TIMEOUT_BACK_OFF_COUNT = 3;
protected int timeoutBackOffCount = 0;
// this status will be set when corresponding transaction's status is changed.
// so that user or other logic can know the status of the corresponding txn.
protected TransactionStatus txnStatus = TransactionStatus.UNKNOWN;
public RoutineLoadTaskInfo(UUID id, long jobId, long timeoutMs, boolean isMultiTable) {
public RoutineLoadTaskInfo(UUID id, long jobId, long timeoutMs,
int timeoutBackOffCount, boolean isMultiTable) {
this.id = id;
this.jobId = jobId;
this.createTimeMs = System.currentTimeMillis();
this.timeoutMs = timeoutMs;
this.timeoutBackOffCount = timeoutBackOffCount;
this.isMultiTable = isMultiTable;
}
public RoutineLoadTaskInfo(UUID id, long jobId, long timeoutMs, long previousBeId,
boolean isMultiTable) {
this(id, jobId, timeoutMs, isMultiTable);
public RoutineLoadTaskInfo(UUID id, long jobId, long timeoutMs, int timeoutBackOffCount,
long previousBeId, boolean isMultiTable) {
this(id, jobId, timeoutMs, timeoutBackOffCount, isMultiTable);
this.previousBeId = previousBeId;
}
@@ -130,6 +135,10 @@ public abstract class RoutineLoadTaskInfo {
this.lastScheduledTime = lastScheduledTime;
}
public void setTimeoutMs(long timeoutMs) {
this.timeoutMs = timeoutMs;
}
public long getTimeoutMs() {
return timeoutMs;
}
@@ -142,6 +151,14 @@ public abstract class RoutineLoadTaskInfo {
return txnStatus;
}
public void setTimeoutBackOffCount(int timeoutBackOffCount) {
this.timeoutBackOffCount = timeoutBackOffCount;
}
public int getTimeoutBackOffCount() {
return timeoutBackOffCount;
}
public boolean isTimeout() {
if (txnStatus == TransactionStatus.COMMITTED || txnStatus == TransactionStatus.VISIBLE) {
// the corresponding txn is already finished, this task can not be treated as timeout.
@@ -149,13 +166,28 @@ public abstract class RoutineLoadTaskInfo {
}
if (isRunning() && System.currentTimeMillis() - executeStartTimeMs > timeoutMs) {
LOG.info("task {} is timeout. start: {}, timeout: {}", DebugUtil.printId(id),
executeStartTimeMs, timeoutMs);
LOG.info("task {} is timeout. start: {}, timeout: {}, timeoutBackOffCount: {}", DebugUtil.printId(id),
executeStartTimeMs, timeoutMs, timeoutBackOffCount);
return true;
}
return false;
}
public void selfAdaptTimeout(RLTaskTxnCommitAttachment rlTaskTxnCommitAttachment) {
long taskExecutionTime = rlTaskTxnCommitAttachment.getTaskExecutionTimeMs();
long timeoutMs = this.timeoutMs;
while (this.timeoutBackOffCount > 0) {
timeoutMs = timeoutMs >> 1;
if (timeoutMs <= taskExecutionTime) {
this.timeoutMs = timeoutMs << 1;
return;
}
this.timeoutBackOffCount--;
}
this.timeoutMs = timeoutMs;
}
abstract TRoutineLoadTask createRoutineLoadTask() throws UserException;
// begin the txn of this task
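Working selfAdaptTimeout through with hypothetical numbers: suppose three backoffs have grown a 20s timeout to 160s (timeoutBackOffCount == 3) and the committed task reports a 30s execution time via getTaskExecutionTimeMs():

    // step 1: 160_000 >> 1 = 80_000  > 30_000  -> still roomy, count -> 2
    // step 2:  80_000 >> 1 = 40_000  > 30_000  -> still roomy, count -> 1
    // step 3:  40_000 >> 1 = 20_000 <= 30_000  -> too tight, settle on 20_000 << 1
    // result: timeoutMs = 40_000, timeoutBackOffCount = 1

That is, the loop unwinds the backoff to the smallest doubled timeout that still leaves headroom above the observed execution time.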

KafkaRoutineLoadJobTest.java

@@ -221,7 +221,7 @@ public class KafkaRoutineLoadJobTest {
Map<Integer, Long> partitionIdsToOffset = Maps.newHashMap();
partitionIdsToOffset.put(100, 0L);
KafkaTaskInfo kafkaTaskInfo = new KafkaTaskInfo(new UUID(1, 1), 1L,
maxBatchIntervalS * 2 * 1000, partitionIdsToOffset, false);
maxBatchIntervalS * 2 * 1000, 0, partitionIdsToOffset, false);
kafkaTaskInfo.setExecuteStartTimeMs(System.currentTimeMillis() - maxBatchIntervalS * 2 * 1000 - 1);
routineLoadTaskInfoList.add(kafkaTaskInfo);

RoutineLoadTaskSchedulerTest.java

@@ -68,7 +68,7 @@ public class RoutineLoadTaskSchedulerTest {
Deencapsulation.setField(kafkaProgress, "partitionIdToOffset", partitionIdToOffset);
LinkedBlockingDeque<RoutineLoadTaskInfo> routineLoadTaskInfoQueue = new LinkedBlockingDeque<>();
KafkaTaskInfo routineLoadTaskInfo1 = new KafkaTaskInfo(new UUID(1, 1), 1L, 20000,
KafkaTaskInfo routineLoadTaskInfo1 = new KafkaTaskInfo(new UUID(1, 1), 1L, 20000, 0,
partitionIdToOffset, false);
routineLoadTaskInfoQueue.addFirst(routineLoadTaskInfo1);

GlobalTransactionMgrTest.java

@@ -318,7 +318,7 @@ public class GlobalTransactionMgrTest {
List<RoutineLoadTaskInfo> routineLoadTaskInfoList = Deencapsulation.getField(routineLoadJob, "routineLoadTaskInfoList");
Map<Integer, Long> partitionIdToOffset = Maps.newHashMap();
partitionIdToOffset.put(1, 0L);
KafkaTaskInfo routineLoadTaskInfo = new KafkaTaskInfo(UUID.randomUUID(), 1L, 20000,
KafkaTaskInfo routineLoadTaskInfo = new KafkaTaskInfo(UUID.randomUUID(), 1L, 20000, 0,
partitionIdToOffset, false);
Deencapsulation.setField(routineLoadTaskInfo, "txnId", 1L);
routineLoadTaskInfoList.add(routineLoadTaskInfo);
@@ -390,7 +390,7 @@ public class GlobalTransactionMgrTest {
List<RoutineLoadTaskInfo> routineLoadTaskInfoList = Deencapsulation.getField(routineLoadJob, "routineLoadTaskInfoList");
Map<Integer, Long> partitionIdToOffset = Maps.newHashMap();
partitionIdToOffset.put(1, 0L);
KafkaTaskInfo routineLoadTaskInfo = new KafkaTaskInfo(UUID.randomUUID(), 1L, 20000,
KafkaTaskInfo routineLoadTaskInfo = new KafkaTaskInfo(UUID.randomUUID(), 1L, 20000, 0,
partitionIdToOffset, false);
Deencapsulation.setField(routineLoadTaskInfo, "txnId", 1L);
routineLoadTaskInfoList.add(routineLoadTaskInfo);