This reverts commit 0d0f787d3e9901192a403d5eb61ea58c8ea17a8e.

Co-authored-by: stephen <hello-stephen@qq.com>
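This revert removes the per-task timeout back-off from routine load: the timeoutBackOffCount parameter disappears from the KafkaTaskInfo and RoutineLoadTaskInfo constructors, and KafkaTaskInfo no longer re-applies the (possibly backed-off) task timeout to the generated plan fragment. The back-off logic itself is not shown in this diff; as context, a minimal sketch of the kind of policy such a counter typically drives — the doubling rule and the cap are assumptions for illustration, not the actual reverted code:

// Hypothetical sketch only: how a timeoutBackOffCount could scale a task's
// timeout. The doubling policy and MAX_BACK_OFF cap are assumed, not taken
// from the reverted commit.
final class TimeoutBackOffSketch {
    private static final int MAX_BACK_OFF = 4; // assumed cap on doublings

    // baseTimeoutMs matches the job default (maxBatchIntervalS * 2 * 1000);
    // timeoutBackOffCount counts consecutive timeouts of this task.
    static long backedOffTimeoutMs(long baseTimeoutMs, int timeoutBackOffCount) {
        return baseTimeoutMs << Math.min(timeoutBackOffCount, MAX_BACK_OFF);
    }
}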
@@ -227,7 +227,7 @@ public class KafkaRoutineLoadJob extends RoutineLoadJob {
                         ((KafkaProgress) progress).getOffsetByPartition(kafkaPartition));
             }
             KafkaTaskInfo kafkaTaskInfo = new KafkaTaskInfo(UUID.randomUUID(), id,
-                    maxBatchIntervalS * 2 * 1000, 0, taskKafkaProgress, isMultiTable());
+                    maxBatchIntervalS * 2 * 1000, taskKafkaProgress, isMultiTable());
             routineLoadTaskInfoList.add(kafkaTaskInfo);
             result.add(kafkaTaskInfo);
         }
@@ -48,16 +48,14 @@ public class KafkaTaskInfo extends RoutineLoadTaskInfo {
     private Map<Integer, Long> partitionIdToOffset;
 
     public KafkaTaskInfo(UUID id, long jobId,
-            long timeoutMs, int timeoutBackOffCount,
-            Map<Integer, Long> partitionIdToOffset, boolean isMultiTable) {
-        super(id, jobId, timeoutMs, timeoutBackOffCount, isMultiTable);
+            long timeoutMs, Map<Integer, Long> partitionIdToOffset, boolean isMultiTable) {
+        super(id, jobId, timeoutMs, isMultiTable);
         this.partitionIdToOffset = partitionIdToOffset;
     }
 
     public KafkaTaskInfo(KafkaTaskInfo kafkaTaskInfo, Map<Integer, Long> partitionIdToOffset, boolean isMultiTable) {
         super(UUID.randomUUID(), kafkaTaskInfo.getJobId(),
-                kafkaTaskInfo.getTimeoutMs(), kafkaTaskInfo.getTimeoutBackOffCount(),
-                kafkaTaskInfo.getBeId(), isMultiTable);
+                kafkaTaskInfo.getTimeoutMs(), kafkaTaskInfo.getBeId(), isMultiTable);
         this.partitionIdToOffset = partitionIdToOffset;
     }
 
@@ -133,11 +131,6 @@ public class KafkaTaskInfo extends RoutineLoadTaskInfo {
         TExecPlanFragmentParams tExecPlanFragmentParams = routineLoadJob.plan(loadId, txnId);
         TPlanFragment tPlanFragment = tExecPlanFragmentParams.getFragment();
         tPlanFragment.getOutputSink().getOlapTableSink().setTxnId(txnId);
-        // it needs update timeout to make task timeout backoff work
-        long timeoutS = this.getTimeoutMs() / 1000;
-        tPlanFragment.getOutputSink().getOlapTableSink().setLoadChannelTimeoutS(timeoutS);
-        tExecPlanFragmentParams.getQueryOptions().setQueryTimeout((int) timeoutS);
-        tExecPlanFragmentParams.getQueryOptions().setExecutionTimeout((int) timeoutS);
 
         long wgId = routineLoadJob.getWorkloadId();
         List<TPipelineWorkloadGroup> tWgList = new ArrayList<>();
@@ -160,11 +153,6 @@ public class KafkaTaskInfo extends RoutineLoadTaskInfo {
         TPipelineFragmentParams tExecPlanFragmentParams = routineLoadJob.planForPipeline(loadId, txnId);
         TPlanFragment tPlanFragment = tExecPlanFragmentParams.getFragment();
         tPlanFragment.getOutputSink().getOlapTableSink().setTxnId(txnId);
-        // it needs update timeout to make task timeout backoff work
-        long timeoutS = this.getTimeoutMs() / 1000;
-        tPlanFragment.getOutputSink().getOlapTableSink().setLoadChannelTimeoutS(timeoutS);
-        tExecPlanFragmentParams.getQueryOptions().setQueryTimeout((int) timeoutS);
-        tExecPlanFragmentParams.getQueryOptions().setExecutionTimeout((int) timeoutS);
 
         long wgId = routineLoadJob.getWorkloadId();
         List<TPipelineWorkloadGroup> tWgList = new ArrayList<>();
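The two hunks above delete identical blocks from the plan() and planForPipeline() paths: converting the task's timeoutMs to seconds and pushing it into the OLAP table sink and the query options, which only mattered while back-off could change the timeout per task. Had the feature stayed, a shared helper could have removed the duplication; a sketch assuming both fragment-params types expose the same TQueryOptions, using only setters that appear in the removed lines:

// Hypothetical refactoring sketch (not in the source): apply a task timeout
// to a generated plan. All setters below are taken from the removed lines.
private static void applyTaskTimeout(TPlanFragment fragment, TQueryOptions queryOptions, long timeoutMs) {
    long timeoutS = timeoutMs / 1000; // sink and query options take seconds
    fragment.getOutputSink().getOlapTableSink().setLoadChannelTimeoutS(timeoutS);
    queryOptions.setQueryTimeout((int) timeoutS);      // narrowing cast, as in the original
    queryOptions.setExecutionTimeout((int) timeoutS);
}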
@@ -76,19 +76,17 @@ public abstract class RoutineLoadTaskInfo {
     // so that user or other logic can know the status of the corresponding txn.
     protected TransactionStatus txnStatus = TransactionStatus.UNKNOWN;
 
-    public RoutineLoadTaskInfo(UUID id, long jobId, long timeoutMs,
-            int timeoutBackOffCount, boolean isMultiTable) {
+    public RoutineLoadTaskInfo(UUID id, long jobId, long timeoutMs, boolean isMultiTable) {
         this.id = id;
         this.jobId = jobId;
         this.createTimeMs = System.currentTimeMillis();
         this.timeoutMs = timeoutMs;
-        this.timeoutBackOffCount = timeoutBackOffCount;
         this.isMultiTable = isMultiTable;
     }
 
-    public RoutineLoadTaskInfo(UUID id, long jobId, long timeoutMs, int timeoutBackOffCount,
-            long previousBeId, boolean isMultiTable) {
-        this(id, jobId, timeoutMs, timeoutBackOffCount, isMultiTable);
+    public RoutineLoadTaskInfo(UUID id, long jobId, long timeoutMs, long previousBeId,
+            boolean isMultiTable) {
+        this(id, jobId, timeoutMs, isMultiTable);
         this.previousBeId = previousBeId;
     }
 
@@ -151,8 +149,8 @@ public abstract class RoutineLoadTaskInfo {
         }
 
         if (isRunning() && System.currentTimeMillis() - executeStartTimeMs > timeoutMs) {
-            LOG.info("task {} is timeout. start: {}, timeout: {}, timeoutBackOffCount: {}", DebugUtil.printId(id),
-                    executeStartTimeMs, timeoutMs, timeoutBackOffCount);
+            LOG.info("task {} is timeout. start: {}, timeout: {}", DebugUtil.printId(id),
+                    executeStartTimeMs, timeoutMs);
             return true;
         }
         return false;
@@ -221,7 +221,7 @@ public class KafkaRoutineLoadJobTest {
         Map<Integer, Long> partitionIdsToOffset = Maps.newHashMap();
         partitionIdsToOffset.put(100, 0L);
         KafkaTaskInfo kafkaTaskInfo = new KafkaTaskInfo(new UUID(1, 1), 1L,
-                maxBatchIntervalS * 2 * 1000, 0, partitionIdsToOffset, false);
+                maxBatchIntervalS * 2 * 1000, partitionIdsToOffset, false);
         kafkaTaskInfo.setExecuteStartTimeMs(System.currentTimeMillis() - maxBatchIntervalS * 2 * 1000 - 1);
         routineLoadTaskInfoList.add(kafkaTaskInfo);
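The test above arranges a task that is already past its window: executeStartTimeMs is pushed back by timeoutMs plus one millisecond, so the isTimeout() check from the earlier RoutineLoadTaskInfo hunk fires. The same arithmetic as a standalone sketch (the maxBatchIntervalS value is assumed; the test pulls it from the job config):

// Standalone mirror of the test setup: a task whose execution started more
// than timeoutMs ago satisfies the isTimeout() condition.
long maxBatchIntervalS = 10;                    // assumed job setting
long timeoutMs = maxBatchIntervalS * 2 * 1000;  // 20000 ms, as in the test
long executeStartTimeMs = System.currentTimeMillis() - timeoutMs - 1;
boolean timedOut = System.currentTimeMillis() - executeStartTimeMs > timeoutMs;
System.out.println(timedOut); // true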
@@ -68,7 +68,7 @@ public class RoutineLoadTaskSchedulerTest {
         Deencapsulation.setField(kafkaProgress, "partitionIdToOffset", partitionIdToOffset);
 
         LinkedBlockingDeque<RoutineLoadTaskInfo> routineLoadTaskInfoQueue = new LinkedBlockingDeque<>();
-        KafkaTaskInfo routineLoadTaskInfo1 = new KafkaTaskInfo(new UUID(1, 1), 1L, 20000, 0,
+        KafkaTaskInfo routineLoadTaskInfo1 = new KafkaTaskInfo(new UUID(1, 1), 1L, 20000,
                 partitionIdToOffset, false);
         routineLoadTaskInfoQueue.addFirst(routineLoadTaskInfo1);
@@ -318,7 +318,7 @@ public class GlobalTransactionMgrTest {
         List<RoutineLoadTaskInfo> routineLoadTaskInfoList = Deencapsulation.getField(routineLoadJob, "routineLoadTaskInfoList");
         Map<Integer, Long> partitionIdToOffset = Maps.newHashMap();
         partitionIdToOffset.put(1, 0L);
-        KafkaTaskInfo routineLoadTaskInfo = new KafkaTaskInfo(UUID.randomUUID(), 1L, 20000, 0,
+        KafkaTaskInfo routineLoadTaskInfo = new KafkaTaskInfo(UUID.randomUUID(), 1L, 20000,
                 partitionIdToOffset, false);
         Deencapsulation.setField(routineLoadTaskInfo, "txnId", 1L);
         routineLoadTaskInfoList.add(routineLoadTaskInfo);

@@ -390,7 +390,7 @@ public class GlobalTransactionMgrTest {
         List<RoutineLoadTaskInfo> routineLoadTaskInfoList = Deencapsulation.getField(routineLoadJob, "routineLoadTaskInfoList");
         Map<Integer, Long> partitionIdToOffset = Maps.newHashMap();
         partitionIdToOffset.put(1, 0L);
-        KafkaTaskInfo routineLoadTaskInfo = new KafkaTaskInfo(UUID.randomUUID(), 1L, 20000, 0,
+        KafkaTaskInfo routineLoadTaskInfo = new KafkaTaskInfo(UUID.randomUUID(), 1L, 20000,
                 partitionIdToOffset, false);
         Deencapsulation.setField(routineLoadTaskInfo, "txnId", 1L);
         routineLoadTaskInfoList.add(routineLoadTaskInfo);