From e39e1571ecdbe907fc755c01c268784c64c2377b Mon Sep 17 00:00:00 2001 From: Henry2SS <45096548+Henry2SS@users.noreply.github.com> Date: Sat, 17 Jul 2021 10:43:59 +0800 Subject: [PATCH] =?UTF-8?q?[Feature]=20Add=20an=20indicator=20called=20err?= =?UTF-8?q?orRowsAfterResumed=20to=20distinguish=20between=20=E2=80=A6=20(?= =?UTF-8?q?#6092)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit 1. [enhancement] add an indicator called errorRowsAfterResumed to distinguish between totalErrorRows(called errorRows) and errorRowsAfterResumed. (#6092) 2. [Refactor] separate some indicators from RoutineLoadJob class to avoid changing FeMetaVersion while modifying indicators of RoutineLoadJob.(#6092) --- .../apache/doris/common/FeMetaVersion.java | 4 +- .../load/routineload/KafkaRoutineLoadJob.java | 12 +- .../load/routineload/RoutineLoadJob.java | 122 ++++++++---------- .../load/routineload/RoutineLoadManager.java | 1 + .../routineload/RoutineLoadStatistic.java | 92 +++++++++++++ .../load/routineload/RoutineLoadJobTest.java | 12 +- .../transaction/GlobalTransactionMgrTest.java | 11 +- 7 files changed, 163 insertions(+), 91 deletions(-) create mode 100644 fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadStatistic.java diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/FeMetaVersion.java b/fe/fe-core/src/main/java/org/apache/doris/common/FeMetaVersion.java index 02a7b3063d..17b3e3c37b 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/common/FeMetaVersion.java +++ b/fe/fe-core/src/main/java/org/apache/doris/common/FeMetaVersion.java @@ -212,6 +212,8 @@ public final class FeMetaVersion { public static final int VERSION_99 = 99; // for max query instance public static final int VERSION_100 = 100; + // add errorRowsAfterResumed to distinguish totalErrorRows and currentErrorRows even if the job is paused. + public static final int VERSION_101 = 101; // note: when increment meta version, should assign the latest version to VERSION_CURRENT - public static final int VERSION_CURRENT = VERSION_100; + public static final int VERSION_CURRENT = VERSION_101; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaRoutineLoadJob.java b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaRoutineLoadJob.java index a4e0d8beac..049617377f 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaRoutineLoadJob.java +++ b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaRoutineLoadJob.java @@ -342,17 +342,7 @@ public class KafkaRoutineLoadJob extends RoutineLoadJob { @Override protected String getStatistic() { - Map summary = Maps.newHashMap(); - summary.put("totalRows", Long.valueOf(totalRows)); - summary.put("loadedRows", Long.valueOf(totalRows - errorRows - unselectedRows)); - summary.put("errorRows", Long.valueOf(errorRows)); - summary.put("unselectedRows", Long.valueOf(unselectedRows)); - summary.put("receivedBytes", Long.valueOf(receivedBytes)); - summary.put("taskExecuteTimeMs", Long.valueOf(totalTaskExcutionTimeMs)); - summary.put("receivedBytesRate", Long.valueOf(receivedBytes / totalTaskExcutionTimeMs * 1000)); - summary.put("loadRowsRate", Long.valueOf((totalRows - errorRows - unselectedRows) / totalTaskExcutionTimeMs * 1000)); - summary.put("committedTaskNum", Long.valueOf(committedTaskNum)); - summary.put("abortedTaskNum", Long.valueOf(abortedTaskNum)); + Map summary = this.jobStatistic.summary(); Gson gson = new GsonBuilder().disableHtmlEscaping().create(); return gson.toJson(summary); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java index 8213514dcc..a9b7083216 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java +++ b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java @@ -215,24 +215,7 @@ public abstract class RoutineLoadJob extends AbstractTxnStateChangeCallback impl protected long pauseTimestamp = -1; protected long endTimestamp = -1; - /* - * The following variables are for statistics - * currentErrorRows/currentTotalRows: the row statistics of current sampling period - * errorRows/totalRows/receivedBytes: cumulative measurement - * totalTaskExcutorTimeMs: cumulative execution time of tasks - */ - /* - * Rows will be updated after txn state changed when txn state has been successfully changed. - */ - protected long currentErrorRows = 0; - protected long currentTotalRows = 0; - protected long errorRows = 0; - protected long totalRows = 0; - protected long unselectedRows = 0; - protected long receivedBytes = 0; - protected long totalTaskExcutionTimeMs = 1; // init as 1 to avoid division by zero - protected long committedTaskNum = 0; - protected long abortedTaskNum = 0; + protected RoutineLoadStatistic jobStatistic = new RoutineLoadStatistic(); // The tasks belong to this job protected List routineLoadTaskInfoList = Lists.newArrayList(); @@ -710,11 +693,11 @@ public abstract class RoutineLoadJob extends AbstractTxnStateChangeCallback impl private void updateNumOfData(long numOfTotalRows, long numOfErrorRows, long unselectedRows, long receivedBytes, long taskExecutionTime, boolean isReplay) throws UserException { - this.totalRows += numOfTotalRows; - this.errorRows += numOfErrorRows; - this.unselectedRows += unselectedRows; - this.receivedBytes += receivedBytes; - this.totalTaskExcutionTimeMs += taskExecutionTime; + this.jobStatistic.totalRows += numOfTotalRows; + this.jobStatistic.errorRows += numOfErrorRows; + this.jobStatistic.unselectedRows += unselectedRows; + this.jobStatistic.receivedBytes += receivedBytes; + this.jobStatistic.totalTaskExcutionTimeMs += taskExecutionTime; if (MetricRepo.isInit && !isReplay) { MetricRepo.COUNTER_ROUTINE_LOAD_ROWS.increase(numOfTotalRows); @@ -723,16 +706,17 @@ public abstract class RoutineLoadJob extends AbstractTxnStateChangeCallback impl } // check error rate - currentErrorRows += numOfErrorRows; - currentTotalRows += numOfTotalRows; - if (currentTotalRows > maxBatchRows * 10) { - if (currentErrorRows > maxErrorNum) { + this.jobStatistic.currentErrorRows += numOfErrorRows; + this.jobStatistic.currentTotalRows += numOfTotalRows; + this.jobStatistic.errorRowsAfterResumed = this.jobStatistic.currentErrorRows; + if (this.jobStatistic.currentTotalRows > maxBatchRows * 10) { + if (this.jobStatistic.currentErrorRows > maxErrorNum) { LOG.info(new LogBuilder(LogKey.ROUTINE_LOAD_JOB, id) - .add("current_total_rows", currentTotalRows) - .add("current_error_rows", currentErrorRows) - .add("max_error_num", maxErrorNum) - .add("msg", "current error rows is more than max error num, begin to pause job") - .build()); + .add("current_total_rows", this.jobStatistic.currentTotalRows) + .add("current_error_rows", this.jobStatistic.currentErrorRows) + .add("max_error_num", maxErrorNum) + .add("msg", "current error rows is more than max error num, begin to pause job") + .build()); // if this is a replay thread, the update state should already be replayed by OP_CHANGE_ROUTINE_LOAD_JOB if (!isReplay) { // remove all of task in jobs and change job state to paused @@ -744,23 +728,23 @@ public abstract class RoutineLoadJob extends AbstractTxnStateChangeCallback impl if (LOG.isDebugEnabled()) { LOG.debug(new LogBuilder(LogKey.ROUTINE_LOAD_JOB, id) - .add("current_total_rows", currentTotalRows) - .add("current_error_rows", currentErrorRows) - .add("max_error_num", maxErrorNum) - .add("msg", "reset current total rows and current error rows " - + "when current total rows is more than base") - .build()); + .add("current_total_rows", this.jobStatistic.currentTotalRows) + .add("current_error_rows", this.jobStatistic.currentErrorRows) + .add("max_error_num", maxErrorNum) + .add("msg", "reset current total rows and current error rows " + + "when current total rows is more than base") + .build()); } // reset currentTotalNum and currentErrorNum - currentErrorRows = 0; - currentTotalRows = 0; - } else if (currentErrorRows > maxErrorNum) { + this.jobStatistic.currentErrorRows = 0; + this.jobStatistic.currentTotalRows = 0; + } else if (this.jobStatistic.currentErrorRows > maxErrorNum) { LOG.info(new LogBuilder(LogKey.ROUTINE_LOAD_JOB, id) - .add("current_total_rows", currentTotalRows) - .add("current_error_rows", currentErrorRows) - .add("max_error_num", maxErrorNum) - .add("msg", "current error rows is more than max error rows, begin to pause job") - .build()); + .add("current_total_rows", this.jobStatistic.currentTotalRows) + .add("current_error_rows", this.jobStatistic.currentErrorRows) + .add("max_error_num", maxErrorNum) + .add("msg", "current error rows is more than max error rows, begin to pause job") + .build()); if (!isReplay) { // remove all of task in jobs and change job state to paused updateState(JobState.PAUSED, @@ -768,8 +752,8 @@ public abstract class RoutineLoadJob extends AbstractTxnStateChangeCallback impl isReplay); } // reset currentTotalNum and currentErrorNum - currentErrorRows = 0; - currentTotalRows = 0; + this.jobStatistic.currentErrorRows = 0; + this.jobStatistic.currentTotalRows = 0; } } @@ -899,7 +883,7 @@ public abstract class RoutineLoadJob extends AbstractTxnStateChangeCallback impl RoutineLoadTaskInfo routineLoadTaskInfo = routineLoadTaskInfoOptional.get(); taskBeId = routineLoadTaskInfo.getBeId(); executeTaskOnTxnStatusChanged(routineLoadTaskInfo, txnState, TransactionStatus.COMMITTED, null); - ++committedTaskNum; + ++this.jobStatistic.committedTaskNum; LOG.debug("routine load task committed. task id: {}, job id: {}", txnState.getLabel(), id); } } catch (Throwable e) { @@ -918,7 +902,7 @@ public abstract class RoutineLoadJob extends AbstractTxnStateChangeCallback impl public void replayOnCommitted(TransactionState txnState) { Preconditions.checkNotNull(txnState.getTxnCommitAttachment(), txnState); replayUpdateProgress((RLTaskTxnCommitAttachment) txnState.getTxnCommitAttachment()); - this.committedTaskNum++; + this.jobStatistic.committedTaskNum++; LOG.debug("replay on committed: {}", txnState); } @@ -1009,7 +993,7 @@ public abstract class RoutineLoadJob extends AbstractTxnStateChangeCallback impl .add("msg", "txn abort with reason " + txnStatusChangeReasonString) .build()); } - ++abortedTaskNum; + ++this.jobStatistic.abortedTaskNum; TransactionState.TxnStatusChangeReason txnStatusChangeReason = null; if (txnStatusChangeReasonString != null) { txnStatusChangeReason = @@ -1053,7 +1037,7 @@ public abstract class RoutineLoadJob extends AbstractTxnStateChangeCallback impl if (txnState.getTxnCommitAttachment() != null) { replayUpdateProgress((RLTaskTxnCommitAttachment) txnState.getTxnCommitAttachment()); } - this.abortedTaskNum++; + this.jobStatistic.abortedTaskNum++; LOG.debug("replay on aborted: {}, has attachment: {}", txnState, txnState.getTxnCommitAttachment() == null); } @@ -1513,15 +1497,8 @@ public abstract class RoutineLoadJob extends AbstractTxnStateChangeCallback impl out.writeLong(pauseTimestamp); out.writeLong(endTimestamp); - out.writeLong(currentErrorRows); - out.writeLong(currentTotalRows); - out.writeLong(errorRows); - out.writeLong(totalRows); - out.writeLong(unselectedRows); - out.writeLong(receivedBytes); - out.writeLong(totalTaskExcutionTimeMs); - out.writeLong(committedTaskNum); - out.writeLong(abortedTaskNum); + this.jobStatistic.write(out); + origStmt.write(out); out.writeInt(jobProperties.size()); for (Map.Entry entry : jobProperties.entrySet()) { @@ -1568,15 +1545,20 @@ public abstract class RoutineLoadJob extends AbstractTxnStateChangeCallback impl pauseTimestamp = in.readLong(); endTimestamp = in.readLong(); - currentErrorRows = in.readLong(); - currentTotalRows = in.readLong(); - errorRows = in.readLong(); - totalRows = in.readLong(); - unselectedRows = in.readLong(); - receivedBytes = in.readLong(); - totalTaskExcutionTimeMs = in.readLong(); - committedTaskNum = in.readLong(); - abortedTaskNum = in.readLong(); + if (Catalog.getCurrentCatalogJournalVersion() < FeMetaVersion.VERSION_101) { + this.jobStatistic.currentErrorRows = in.readLong(); + this.jobStatistic.currentTotalRows = in.readLong(); + this.jobStatistic.errorRows = in.readLong(); + this.jobStatistic.totalRows = in.readLong(); + this.jobStatistic.errorRowsAfterResumed = 0; + this.jobStatistic.unselectedRows = in.readLong(); + this.jobStatistic.receivedBytes = in.readLong(); + this.jobStatistic.totalTaskExcutionTimeMs = in.readLong(); + this.jobStatistic.committedTaskNum = in.readLong(); + this.jobStatistic.abortedTaskNum = in.readLong(); + } else { + this.jobStatistic = RoutineLoadStatistic.read(in); + } if (Catalog.getCurrentCatalogJournalVersion() < FeMetaVersion.VERSION_76) { String stmt = Text.readString(in); origStmt = new OriginStatement(stmt, 0); diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadManager.java b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadManager.java index 13386842de..05ebeb3519 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadManager.java +++ b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadManager.java @@ -250,6 +250,7 @@ public class RoutineLoadManager implements Writable { RoutineLoadJob routineLoadJob = checkPrivAndGetJob(resumeRoutineLoadStmt.getDbFullName(), resumeRoutineLoadStmt.getName()); + routineLoadJob.jobStatistic.errorRowsAfterResumed = 0; routineLoadJob.autoResumeCount = 0; routineLoadJob.firstResumeTimestamp = 0; routineLoadJob.autoResumeLock = false; diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadStatistic.java b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadStatistic.java new file mode 100644 index 0000000000..c0b3b06546 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadStatistic.java @@ -0,0 +1,92 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.load.routineload; + +import org.apache.doris.common.io.Text; +import org.apache.doris.common.io.Writable; +import org.apache.doris.persist.gson.GsonUtils; + +import com.google.common.collect.Maps; +import com.google.gson.annotations.SerializedName; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; +import java.util.Map; + +public class RoutineLoadStatistic implements Writable { + /* + * The following variables are for statistics + * currentErrorRows/currentTotalRows: the row statistics of current sampling period + * errorRowsAfterResumed: currentErrorRows that is showed to users in "show routine load;". + * errorRows/totalRows/receivedBytes: cumulative measurement + * totalTaskExcutorTimeMs: cumulative execution time of tasks + */ + /* + * Rows will be updated after txn state changed when txn state has been successfully changed. + */ + + @SerializedName(value = "currentErrorRows") + public long currentErrorRows = 0; + @SerializedName(value = "currentTotalRows") + public long currentTotalRows = 0; + @SerializedName(value = "errorRows") + public long errorRows = 0; + @SerializedName(value = "totalRows") + public long totalRows = 0; + @SerializedName(value = "errorRowsAfterResumed") + public long errorRowsAfterResumed = 0; + @SerializedName(value = "unselectedRows") + public long unselectedRows = 0; + @SerializedName(value = "receivedBytes") + public long receivedBytes = 0; + @SerializedName(value = "totalTaskExcutionTimeMs") + public long totalTaskExcutionTimeMs = 1; // init as 1 to avoid division by zero + @SerializedName(value = "committedTaskNum") + public long committedTaskNum = 0; + @SerializedName(value = "abortedTaskNum") + public long abortedTaskNum = 0; + + @Override + public void write(DataOutput out) throws IOException { + String json = GsonUtils.GSON.toJson(this); + Text.writeString(out, json); + } + + public static RoutineLoadStatistic read(DataInput in) throws IOException { + String json = Text.readString(in); + return GsonUtils.GSON.fromJson(json, RoutineLoadStatistic.class); + } + + public Map summary() { + Map summary = Maps.newHashMap(); + summary.put("totalRows", Long.valueOf(totalRows)); + summary.put("loadedRows", Long.valueOf(totalRows - this.errorRows - this.unselectedRows)); + summary.put("errorRows", Long.valueOf(this.errorRows)); + summary.put("errorRowsAfterResumed", Long.valueOf(this.errorRowsAfterResumed)); + summary.put("unselectedRows", Long.valueOf(this.unselectedRows)); + summary.put("receivedBytes", Long.valueOf(this.receivedBytes)); + summary.put("taskExecuteTimeMs", Long.valueOf(this.totalTaskExcutionTimeMs)); + summary.put("receivedBytesRate", Long.valueOf(this.receivedBytes / this.totalTaskExcutionTimeMs * 1000)); + summary.put("loadRowsRate", Long.valueOf((this.totalRows - this.errorRows - this.unselectedRows) + / this.totalTaskExcutionTimeMs * 1000)); + summary.put("committedTaskNum", Long.valueOf(this.committedTaskNum)); + summary.put("abortedTaskNum", Long.valueOf(this.abortedTaskNum)); + return summary; + } +} diff --git a/fe/fe-core/src/test/java/org/apache/doris/load/routineload/RoutineLoadJobTest.java b/fe/fe-core/src/test/java/org/apache/doris/load/routineload/RoutineLoadJobTest.java index 8f049ba9dc..23aeb17688 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/load/routineload/RoutineLoadJobTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/load/routineload/RoutineLoadJobTest.java @@ -139,9 +139,10 @@ public class RoutineLoadJobTest { Deencapsulation.setField(routineLoadJob, "routineLoadTaskInfoList", routineLoadTaskInfoList); Deencapsulation.setField(routineLoadJob, "progress", currentProgress); routineLoadJob.afterAborted(transactionState, true, txnStatusChangeReasonString); + RoutineLoadStatistic jobStatistic = Deencapsulation.getField(routineLoadJob, "jobStatistic"); Assert.assertEquals(RoutineLoadJob.JobState.RUNNING, routineLoadJob.getState()); - Assert.assertEquals(new Long(1), Deencapsulation.getField(routineLoadJob, "abortedTaskNum")); + Assert.assertEquals(new Long(1), Deencapsulation.getField(jobStatistic, "abortedTaskNum")); } @Test @@ -279,13 +280,14 @@ public class RoutineLoadJobTest { Deencapsulation.setField(routineLoadJob, "state", RoutineLoadJob.JobState.RUNNING); Deencapsulation.setField(routineLoadJob, "maxErrorNum", 10); Deencapsulation.setField(routineLoadJob, "maxBatchRows", 10); - Deencapsulation.setField(routineLoadJob, "currentErrorRows", 1); - Deencapsulation.setField(routineLoadJob, "currentTotalRows", 99); + RoutineLoadStatistic jobStatistic = Deencapsulation.getField(routineLoadJob, "jobStatistic"); + Deencapsulation.setField(jobStatistic, "currentErrorRows", 1); + Deencapsulation.setField(jobStatistic, "currentTotalRows", 99); Deencapsulation.invoke(routineLoadJob, "updateNumOfData", 2L, 0L, 0L, 1L, 1L, false); Assert.assertEquals(RoutineLoadJob.JobState.RUNNING, Deencapsulation.getField(routineLoadJob, "state")); - Assert.assertEquals(new Long(0), Deencapsulation.getField(routineLoadJob, "currentErrorRows")); - Assert.assertEquals(new Long(0), Deencapsulation.getField(routineLoadJob, "currentTotalRows")); + Assert.assertEquals(new Long(0), Deencapsulation.getField(jobStatistic, "currentErrorRows")); + Assert.assertEquals(new Long(0), Deencapsulation.getField(jobStatistic, "currentTotalRows")); } diff --git a/fe/fe-core/src/test/java/org/apache/doris/transaction/GlobalTransactionMgrTest.java b/fe/fe-core/src/test/java/org/apache/doris/transaction/GlobalTransactionMgrTest.java index d891f48c7b..6efa5b7d7e 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/transaction/GlobalTransactionMgrTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/transaction/GlobalTransactionMgrTest.java @@ -41,6 +41,7 @@ import org.apache.doris.load.routineload.RLTaskTxnCommitAttachment; import org.apache.doris.load.routineload.RoutineLoadJob; import org.apache.doris.load.routineload.RoutineLoadManager; import org.apache.doris.load.routineload.RoutineLoadTaskInfo; +import org.apache.doris.load.routineload.RoutineLoadStatistic; import org.apache.doris.meta.MetaContext; import org.apache.doris.persist.EditLog; import org.apache.doris.thrift.TKafkaRLTaskProgress; @@ -361,9 +362,10 @@ public class GlobalTransactionMgrTest { Deencapsulation.setField(masterTransMgr.getDatabaseTransactionMgr(CatalogTestUtil.testDbId1), "idToRunningTransactionState", idToTransactionState); Table testTable1 = masterCatalog.getDb(CatalogTestUtil.testDbId1).getTable(CatalogTestUtil.testTableId1); masterTransMgr.commitTransaction(1L, Lists.newArrayList(testTable1), 1L, transTablets, txnCommitAttachment); + RoutineLoadStatistic jobStatistic = Deencapsulation.getField(routineLoadJob,"jobStatistic"); - Assert.assertEquals(Long.valueOf(101), Deencapsulation.getField(routineLoadJob, "currentTotalRows")); - Assert.assertEquals(Long.valueOf(1), Deencapsulation.getField(routineLoadJob, "currentErrorRows")); + Assert.assertEquals(Long.valueOf(101), Deencapsulation.getField(jobStatistic, "currentTotalRows")); + Assert.assertEquals(Long.valueOf(1), Deencapsulation.getField(jobStatistic, "currentErrorRows")); Assert.assertEquals(Long.valueOf(101L), ((KafkaProgress) routineLoadJob.getProgress()).getOffsetByPartition(1)); // todo(ml): change to assert queue // Assert.assertEquals(1, routineLoadManager.getNeedScheduleTasksQueue().size()); @@ -430,8 +432,9 @@ public class GlobalTransactionMgrTest { masterTransMgr.commitTransaction(1L, Lists.newArrayList(testTable1), 1L, transTablets, txnCommitAttachment); // current total rows and error rows will be reset after job pause, so here they should be 0. - Assert.assertEquals(Long.valueOf(0), Deencapsulation.getField(routineLoadJob, "currentTotalRows")); - Assert.assertEquals(Long.valueOf(0), Deencapsulation.getField(routineLoadJob, "currentErrorRows")); + RoutineLoadStatistic jobStatistic = Deencapsulation.getField(routineLoadJob,"jobStatistic"); + Assert.assertEquals(Long.valueOf(0), Deencapsulation.getField(jobStatistic, "currentTotalRows")); + Assert.assertEquals(Long.valueOf(0), Deencapsulation.getField(jobStatistic, "currentErrorRows")); Assert.assertEquals(Long.valueOf(111L), ((KafkaProgress) routineLoadJob.getProgress()).getOffsetByPartition(1)); // todo(ml): change to assert queue