@ -68,7 +68,6 @@ import java.io.DataOutput;
|
||||
import java.io.IOException;
|
||||
import java.io.StringReader;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Optional;
|
||||
@ -481,7 +480,7 @@ public abstract class RoutineLoadJob implements TxnStateChangeCallback, Writable
|
||||
this.receivedBytes += receivedBytes;
|
||||
this.totalTaskExcutionTimeMs += taskExecutionTime;
|
||||
|
||||
if (MetricRepo.isInit.get()) {
|
||||
if (MetricRepo.isInit.get() && !isReplay) {
|
||||
MetricRepo.COUNTER_ROUTINE_LOAD_ROWS.increase(numOfTotalRows);
|
||||
MetricRepo.COUNTER_ROUTINE_LOAD_ERROR_ROWS.increase(numOfErrorRows);
|
||||
MetricRepo.COUNTER_ROUTINE_LOAD_RECEIVED_BYTES.increase(receivedBytes);
|
||||
|
||||
@ -109,7 +109,7 @@ public class RoutineLoadJobTest {
|
||||
routineLoadTaskInfo.getTxnId();
|
||||
result = txnId;
|
||||
transactionState.getTxnCommitAttachment();
|
||||
result = null;
|
||||
result = new RLTaskTxnCommitAttachment();
|
||||
routineLoadTaskInfo.getPartitions();
|
||||
result = Lists.newArrayList();
|
||||
}
|
||||
|
||||
@ -97,10 +97,13 @@ public class RoutineLoadSchedulerTest {
|
||||
result = beIds;
|
||||
routineLoadManager.getSizeOfIdToRoutineLoadTask();
|
||||
result = 1;
|
||||
minTimes = 0;
|
||||
routineLoadManager.getTotalMaxConcurrentTaskNum();
|
||||
result = 10;
|
||||
minTimes = 0;
|
||||
catalog.getRoutineLoadTaskScheduler();
|
||||
result = routineLoadTaskScheduler;
|
||||
minTimes = 0;
|
||||
}
|
||||
};
|
||||
|
||||
|
||||
Reference in New Issue
Block a user