[Feature] Add an indicator called errorRowsAfterResumed to distinguish between totalErrorRows and errorRowsAfterResumed (#6092)

1. [Enhancement] Add an indicator called errorRowsAfterResumed to distinguish the cumulative error count (errorRows, also called totalErrorRows) from the errors counted after the job was last resumed. (#6092)
2. [Refactor] Move the statistics out of the RoutineLoadJob class into a separate class, so that modifying the indicators of RoutineLoadJob no longer requires changing FeMetaVersion. (#6092)
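
For background on item 2: the statistics are persisted as a single GSON-serialized JSON blob, so a field added in a later release simply falls back to its default value when older metadata is read, and no further FeMetaVersion bump is needed. Below is a minimal sketch of that pattern, reusing the helper classes that appear later in this diff; the class name StatsBlob is hypothetical, the real class added by this commit is RoutineLoadStatistic.

    import org.apache.doris.common.io.Text;
    import org.apache.doris.common.io.Writable;
    import org.apache.doris.persist.gson.GsonUtils;

    import com.google.gson.annotations.SerializedName;

    import java.io.DataInput;
    import java.io.DataOutput;
    import java.io.IOException;

    // Hypothetical example, not part of this commit: a Writable that stores its
    // state as one JSON string instead of a fixed sequence of longs.
    public class StatsBlob implements Writable {
        @SerializedName(value = "errorRows")
        public long errorRows = 0;

        // A field introduced later deserializes to its default value when the
        // JSON written by an older FE does not contain it.
        @SerializedName(value = "errorRowsAfterResumed")
        public long errorRowsAfterResumed = 0;

        @Override
        public void write(DataOutput out) throws IOException {
            Text.writeString(out, GsonUtils.GSON.toJson(this));
        }

        public static StatsBlob read(DataInput in) throws IOException {
            return GsonUtils.GSON.fromJson(Text.readString(in), StatsBlob.class);
        }
    }
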
Henry2SS
2021-07-17 10:43:59 +08:00
committed by GitHub
parent 8de09cbd21
commit e39e1571ec
7 changed files with 163 additions and 91 deletions

View File

@@ -212,6 +212,8 @@ public final class FeMetaVersion {
public static final int VERSION_99 = 99;
// for max query instance
public static final int VERSION_100 = 100;
// add errorRowsAfterResumed to distinguish totalErrorRows and currentErrorRows even if the job is paused.
public static final int VERSION_101 = 101;
// note: when increment meta version, should assign the latest version to VERSION_CURRENT
public static final int VERSION_CURRENT = VERSION_100;
public static final int VERSION_CURRENT = VERSION_101;
}
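
The format switch itself still needs this one last bump; readers then gate on the journal version, exactly as the RoutineLoadJob.readFields() change further down in this diff does. A condensed sketch of that gate, with the field order following the old write path:

    // Images written before VERSION_101 carry the statistics as raw longs in a
    // fixed order; newer images carry them as one RoutineLoadStatistic blob.
    if (Catalog.getCurrentCatalogJournalVersion() < FeMetaVersion.VERSION_101) {
        this.jobStatistic.currentErrorRows = in.readLong();
        // ... the remaining longs, in the same order they were written ...
    } else {
        this.jobStatistic = RoutineLoadStatistic.read(in);
    }
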

View File

@@ -342,17 +342,7 @@ public class KafkaRoutineLoadJob extends RoutineLoadJob {
@Override
protected String getStatistic() {
Map<String, Object> summary = Maps.newHashMap();
summary.put("totalRows", Long.valueOf(totalRows));
summary.put("loadedRows", Long.valueOf(totalRows - errorRows - unselectedRows));
summary.put("errorRows", Long.valueOf(errorRows));
summary.put("unselectedRows", Long.valueOf(unselectedRows));
summary.put("receivedBytes", Long.valueOf(receivedBytes));
summary.put("taskExecuteTimeMs", Long.valueOf(totalTaskExcutionTimeMs));
summary.put("receivedBytesRate", Long.valueOf(receivedBytes / totalTaskExcutionTimeMs * 1000));
summary.put("loadRowsRate", Long.valueOf((totalRows - errorRows - unselectedRows) / totalTaskExcutionTimeMs * 1000));
summary.put("committedTaskNum", Long.valueOf(committedTaskNum));
summary.put("abortedTaskNum", Long.valueOf(abortedTaskNum));
Map<String, Object> summary = this.jobStatistic.summary();
Gson gson = new GsonBuilder().disableHtmlEscaping().create();
return gson.toJson(summary);
}
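
After the refactor, getStatistic() just runs GSON over jobStatistic.summary(), and the resulting JSON gains the errorRowsAfterResumed key. An illustration of how that map reads (the values here are made up):

    // Illustrative only; summary() backs the Statistic column that
    // SHOW ROUTINE LOAD prints for the job.
    Map<String, Object> summary = this.jobStatistic.summary();
    // e.g. totalRows=1000, loadedRows=990, errorRows=10,
    //      errorRowsAfterResumed=2, unselectedRows=0, receivedBytes=...,
    //      committedTaskNum=..., abortedTaskNum=...
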

View File

@@ -215,24 +215,7 @@ public abstract class RoutineLoadJob extends AbstractTxnStateChangeCallback impl
protected long pauseTimestamp = -1;
protected long endTimestamp = -1;
/*
* The following variables are for statistics
* currentErrorRows/currentTotalRows: the row statistics of current sampling period
* errorRows/totalRows/receivedBytes: cumulative measurement
* totalTaskExcutorTimeMs: cumulative execution time of tasks
*/
/*
* Rows will be updated after txn state changed when txn state has been successfully changed.
*/
protected long currentErrorRows = 0;
protected long currentTotalRows = 0;
protected long errorRows = 0;
protected long totalRows = 0;
protected long unselectedRows = 0;
protected long receivedBytes = 0;
protected long totalTaskExcutionTimeMs = 1; // init as 1 to avoid division by zero
protected long committedTaskNum = 0;
protected long abortedTaskNum = 0;
protected RoutineLoadStatistic jobStatistic = new RoutineLoadStatistic();
// The tasks belong to this job
protected List<RoutineLoadTaskInfo> routineLoadTaskInfoList = Lists.newArrayList();
@@ -710,11 +693,11 @@ public abstract class RoutineLoadJob extends AbstractTxnStateChangeCallback impl
private void updateNumOfData(long numOfTotalRows, long numOfErrorRows, long unselectedRows, long receivedBytes,
long taskExecutionTime, boolean isReplay) throws UserException {
this.totalRows += numOfTotalRows;
this.errorRows += numOfErrorRows;
this.unselectedRows += unselectedRows;
this.receivedBytes += receivedBytes;
this.totalTaskExcutionTimeMs += taskExecutionTime;
this.jobStatistic.totalRows += numOfTotalRows;
this.jobStatistic.errorRows += numOfErrorRows;
this.jobStatistic.unselectedRows += unselectedRows;
this.jobStatistic.receivedBytes += receivedBytes;
this.jobStatistic.totalTaskExcutionTimeMs += taskExecutionTime;
if (MetricRepo.isInit && !isReplay) {
MetricRepo.COUNTER_ROUTINE_LOAD_ROWS.increase(numOfTotalRows);
@@ -723,16 +706,17 @@ public abstract class RoutineLoadJob extends AbstractTxnStateChangeCallback impl
}
// check error rate
currentErrorRows += numOfErrorRows;
currentTotalRows += numOfTotalRows;
if (currentTotalRows > maxBatchRows * 10) {
if (currentErrorRows > maxErrorNum) {
this.jobStatistic.currentErrorRows += numOfErrorRows;
this.jobStatistic.currentTotalRows += numOfTotalRows;
this.jobStatistic.errorRowsAfterResumed = this.jobStatistic.currentErrorRows;
if (this.jobStatistic.currentTotalRows > maxBatchRows * 10) {
if (this.jobStatistic.currentErrorRows > maxErrorNum) {
LOG.info(new LogBuilder(LogKey.ROUTINE_LOAD_JOB, id)
.add("current_total_rows", currentTotalRows)
.add("current_error_rows", currentErrorRows)
.add("max_error_num", maxErrorNum)
.add("msg", "current error rows is more than max error num, begin to pause job")
.build());
.add("current_total_rows", this.jobStatistic.currentTotalRows)
.add("current_error_rows", this.jobStatistic.currentErrorRows)
.add("max_error_num", maxErrorNum)
.add("msg", "current error rows is more than max error num, begin to pause job")
.build());
// if this is a replay thread, the update state should already be replayed by OP_CHANGE_ROUTINE_LOAD_JOB
if (!isReplay) {
// remove all of task in jobs and change job state to paused
@@ -744,23 +728,23 @@ public abstract class RoutineLoadJob extends AbstractTxnStateChangeCallback impl
if (LOG.isDebugEnabled()) {
LOG.debug(new LogBuilder(LogKey.ROUTINE_LOAD_JOB, id)
.add("current_total_rows", currentTotalRows)
.add("current_error_rows", currentErrorRows)
.add("max_error_num", maxErrorNum)
.add("msg", "reset current total rows and current error rows "
+ "when current total rows is more than base")
.build());
.add("current_total_rows", this.jobStatistic.currentTotalRows)
.add("current_error_rows", this.jobStatistic.currentErrorRows)
.add("max_error_num", maxErrorNum)
.add("msg", "reset current total rows and current error rows "
+ "when current total rows is more than base")
.build());
}
// reset currentTotalNum and currentErrorNum
currentErrorRows = 0;
currentTotalRows = 0;
} else if (currentErrorRows > maxErrorNum) {
this.jobStatistic.currentErrorRows = 0;
this.jobStatistic.currentTotalRows = 0;
} else if (this.jobStatistic.currentErrorRows > maxErrorNum) {
LOG.info(new LogBuilder(LogKey.ROUTINE_LOAD_JOB, id)
.add("current_total_rows", currentTotalRows)
.add("current_error_rows", currentErrorRows)
.add("max_error_num", maxErrorNum)
.add("msg", "current error rows is more than max error rows, begin to pause job")
.build());
.add("current_total_rows", this.jobStatistic.currentTotalRows)
.add("current_error_rows", this.jobStatistic.currentErrorRows)
.add("max_error_num", maxErrorNum)
.add("msg", "current error rows is more than max error rows, begin to pause job")
.build());
if (!isReplay) {
// remove all of task in jobs and change job state to paused
updateState(JobState.PAUSED,
@@ -768,8 +752,8 @@ public abstract class RoutineLoadJob extends AbstractTxnStateChangeCallback impl
isReplay);
}
// reset currentTotalNum and currentErrorNum
currentErrorRows = 0;
currentTotalRows = 0;
this.jobStatistic.currentErrorRows = 0;
this.jobStatistic.currentTotalRows = 0;
}
}
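
Read as a whole, the window check works like this: errorRowsAfterResumed is refreshed from currentErrorRows just before the check, and once currentTotalRows exceeds maxBatchRows * 10 the job either pauses (error count over maxErrorNum) or simply starts a new sampling window; before the window fills, only blowing the error budget pauses it. A standalone sketch of that decision, using the same numbers the unit test further below feeds in (maxBatchRows = 10 and maxErrorNum = 10, so one window covers 100 rows):

    // Hypothetical illustration, not the real method body.
    long maxBatchRows = 10, maxErrorNum = 10;
    long currentTotalRows = 99 + 2;   // 101: the 100-row window is now full
    long currentErrorRows = 1 + 0;    // 1: well under the error budget

    if (currentTotalRows > maxBatchRows * 10) {
        if (currentErrorRows > maxErrorNum) {
            // too many errors inside this window: pause the job
        }
        // in either case, start a new sampling window
        currentErrorRows = 0;
        currentTotalRows = 0;
    } else if (currentErrorRows > maxErrorNum) {
        // error budget blown before the window filled: pause the job and reset
        currentErrorRows = 0;
        currentTotalRows = 0;
    }
    // Here the job stays RUNNING and both counters are back to 0, which is
    // exactly what the RoutineLoadJobTest case below asserts.
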
@@ -899,7 +883,7 @@ public abstract class RoutineLoadJob extends AbstractTxnStateChangeCallback impl
RoutineLoadTaskInfo routineLoadTaskInfo = routineLoadTaskInfoOptional.get();
taskBeId = routineLoadTaskInfo.getBeId();
executeTaskOnTxnStatusChanged(routineLoadTaskInfo, txnState, TransactionStatus.COMMITTED, null);
++committedTaskNum;
++this.jobStatistic.committedTaskNum;
LOG.debug("routine load task committed. task id: {}, job id: {}", txnState.getLabel(), id);
}
} catch (Throwable e) {
@@ -918,7 +902,7 @@ public abstract class RoutineLoadJob extends AbstractTxnStateChangeCallback impl
public void replayOnCommitted(TransactionState txnState) {
Preconditions.checkNotNull(txnState.getTxnCommitAttachment(), txnState);
replayUpdateProgress((RLTaskTxnCommitAttachment) txnState.getTxnCommitAttachment());
this.committedTaskNum++;
this.jobStatistic.committedTaskNum++;
LOG.debug("replay on committed: {}", txnState);
}
@@ -1009,7 +993,7 @@ public abstract class RoutineLoadJob extends AbstractTxnStateChangeCallback impl
.add("msg", "txn abort with reason " + txnStatusChangeReasonString)
.build());
}
++abortedTaskNum;
++this.jobStatistic.abortedTaskNum;
TransactionState.TxnStatusChangeReason txnStatusChangeReason = null;
if (txnStatusChangeReasonString != null) {
txnStatusChangeReason =
@@ -1053,7 +1037,7 @@ public abstract class RoutineLoadJob extends AbstractTxnStateChangeCallback impl
if (txnState.getTxnCommitAttachment() != null) {
replayUpdateProgress((RLTaskTxnCommitAttachment) txnState.getTxnCommitAttachment());
}
this.abortedTaskNum++;
this.jobStatistic.abortedTaskNum++;
LOG.debug("replay on aborted: {}, has attachment: {}", txnState, txnState.getTxnCommitAttachment() == null);
}
@@ -1513,15 +1497,8 @@ public abstract class RoutineLoadJob extends AbstractTxnStateChangeCallback impl
out.writeLong(pauseTimestamp);
out.writeLong(endTimestamp);
out.writeLong(currentErrorRows);
out.writeLong(currentTotalRows);
out.writeLong(errorRows);
out.writeLong(totalRows);
out.writeLong(unselectedRows);
out.writeLong(receivedBytes);
out.writeLong(totalTaskExcutionTimeMs);
out.writeLong(committedTaskNum);
out.writeLong(abortedTaskNum);
this.jobStatistic.write(out);
origStmt.write(out);
out.writeInt(jobProperties.size());
for (Map.Entry<String, String> entry : jobProperties.entrySet()) {
@@ -1568,15 +1545,20 @@ public abstract class RoutineLoadJob extends AbstractTxnStateChangeCallback impl
pauseTimestamp = in.readLong();
endTimestamp = in.readLong();
currentErrorRows = in.readLong();
currentTotalRows = in.readLong();
errorRows = in.readLong();
totalRows = in.readLong();
unselectedRows = in.readLong();
receivedBytes = in.readLong();
totalTaskExcutionTimeMs = in.readLong();
committedTaskNum = in.readLong();
abortedTaskNum = in.readLong();
if (Catalog.getCurrentCatalogJournalVersion() < FeMetaVersion.VERSION_101) {
this.jobStatistic.currentErrorRows = in.readLong();
this.jobStatistic.currentTotalRows = in.readLong();
this.jobStatistic.errorRows = in.readLong();
this.jobStatistic.totalRows = in.readLong();
this.jobStatistic.errorRowsAfterResumed = 0;
this.jobStatistic.unselectedRows = in.readLong();
this.jobStatistic.receivedBytes = in.readLong();
this.jobStatistic.totalTaskExcutionTimeMs = in.readLong();
this.jobStatistic.committedTaskNum = in.readLong();
this.jobStatistic.abortedTaskNum = in.readLong();
} else {
this.jobStatistic = RoutineLoadStatistic.read(in);
}
if (Catalog.getCurrentCatalogJournalVersion() < FeMetaVersion.VERSION_76) {
String stmt = Text.readString(in);
origStmt = new OriginStatement(stmt, 0);

View File

@@ -250,6 +250,7 @@ public class RoutineLoadManager implements Writable {
RoutineLoadJob routineLoadJob = checkPrivAndGetJob(resumeRoutineLoadStmt.getDbFullName(),
resumeRoutineLoadStmt.getName());
routineLoadJob.jobStatistic.errorRowsAfterResumed = 0;
routineLoadJob.autoResumeCount = 0;
routineLoadJob.firstResumeTimestamp = 0;
routineLoadJob.autoResumeLock = false;
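
This is the user-facing half of the feature: a manual RESUME ROUTINE LOAD clears errorRowsAfterResumed while leaving the cumulative errorRows untouched, so SHOW ROUTINE LOAD can report errors seen since the last resume separately. A small illustration with made-up numbers:

    // Illustrative values only.
    long errorRows = 250;             // cumulative errors since the job was created
    long errorRowsAfterResumed = 250; // the error count surfaced to SHOW ROUTINE LOAD

    // RESUME ROUTINE LOAD -> resumeRoutineLoadJob() clears only one of them:
    errorRowsAfterResumed = 0;
    // errorRows stays at 250; a resume never resets the cumulative counter.
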

View File

@@ -0,0 +1,92 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.load.routineload;
import org.apache.doris.common.io.Text;
import org.apache.doris.common.io.Writable;
import org.apache.doris.persist.gson.GsonUtils;
import com.google.common.collect.Maps;
import com.google.gson.annotations.SerializedName;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.Map;
public class RoutineLoadStatistic implements Writable {
/*
* The following variables are for statistics
* currentErrorRows/currentTotalRows: the row statistics of current sampling period
* errorRowsAfterResumed: the currentErrorRows value that is shown to users in "show routine load;".
* errorRows/totalRows/receivedBytes: cumulative measurement
* totalTaskExcutionTimeMs: cumulative execution time of tasks
*/
/*
* Rows will be updated after txn state changed when txn state has been successfully changed.
*/
@SerializedName(value = "currentErrorRows")
public long currentErrorRows = 0;
@SerializedName(value = "currentTotalRows")
public long currentTotalRows = 0;
@SerializedName(value = "errorRows")
public long errorRows = 0;
@SerializedName(value = "totalRows")
public long totalRows = 0;
@SerializedName(value = "errorRowsAfterResumed")
public long errorRowsAfterResumed = 0;
@SerializedName(value = "unselectedRows")
public long unselectedRows = 0;
@SerializedName(value = "receivedBytes")
public long receivedBytes = 0;
@SerializedName(value = "totalTaskExcutionTimeMs")
public long totalTaskExcutionTimeMs = 1; // init as 1 to avoid division by zero
@SerializedName(value = "committedTaskNum")
public long committedTaskNum = 0;
@SerializedName(value = "abortedTaskNum")
public long abortedTaskNum = 0;
@Override
public void write(DataOutput out) throws IOException {
String json = GsonUtils.GSON.toJson(this);
Text.writeString(out, json);
}
public static RoutineLoadStatistic read(DataInput in) throws IOException {
String json = Text.readString(in);
return GsonUtils.GSON.fromJson(json, RoutineLoadStatistic.class);
}
public Map<String, Object> summary() {
Map<String, Object> summary = Maps.newHashMap();
summary.put("totalRows", Long.valueOf(totalRows));
summary.put("loadedRows", Long.valueOf(totalRows - this.errorRows - this.unselectedRows));
summary.put("errorRows", Long.valueOf(this.errorRows));
summary.put("errorRowsAfterResumed", Long.valueOf(this.errorRowsAfterResumed));
summary.put("unselectedRows", Long.valueOf(this.unselectedRows));
summary.put("receivedBytes", Long.valueOf(this.receivedBytes));
summary.put("taskExecuteTimeMs", Long.valueOf(this.totalTaskExcutionTimeMs));
summary.put("receivedBytesRate", Long.valueOf(this.receivedBytes / this.totalTaskExcutionTimeMs * 1000));
summary.put("loadRowsRate", Long.valueOf((this.totalRows - this.errorRows - this.unselectedRows)
/ this.totalTaskExcutionTimeMs * 1000));
summary.put("committedTaskNum", Long.valueOf(this.committedTaskNum));
summary.put("abortedTaskNum", Long.valueOf(this.abortedTaskNum));
return summary;
}
}
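
A quick round-trip sketch of the new class, assuming plain in-memory streams; this snippet is hypothetical and not part of the commit, and would live alongside the FE tests:

    // Hypothetical round-trip check: serialize the statistic into a byte buffer
    // and read it back, the same way the FE image write/read path round-trips it.
    // Assumes java.io.ByteArrayOutputStream, ByteArrayInputStream,
    // DataOutputStream and DataInputStream are imported.
    private void roundTrip() throws IOException {
        RoutineLoadStatistic stats = new RoutineLoadStatistic();
        stats.errorRows = 3;
        stats.errorRowsAfterResumed = 1;

        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        stats.write(new DataOutputStream(buffer));

        RoutineLoadStatistic copy = RoutineLoadStatistic.read(
                new DataInputStream(new ByteArrayInputStream(buffer.toByteArray())));
        // copy.errorRows == 3 and copy.errorRowsAfterResumed == 1
    }
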

View File

@@ -139,9 +139,10 @@ public class RoutineLoadJobTest {
Deencapsulation.setField(routineLoadJob, "routineLoadTaskInfoList", routineLoadTaskInfoList);
Deencapsulation.setField(routineLoadJob, "progress", currentProgress);
routineLoadJob.afterAborted(transactionState, true, txnStatusChangeReasonString);
RoutineLoadStatistic jobStatistic = Deencapsulation.getField(routineLoadJob, "jobStatistic");
Assert.assertEquals(RoutineLoadJob.JobState.RUNNING, routineLoadJob.getState());
Assert.assertEquals(new Long(1), Deencapsulation.getField(routineLoadJob, "abortedTaskNum"));
Assert.assertEquals(new Long(1), Deencapsulation.getField(jobStatistic, "abortedTaskNum"));
}
@Test
@@ -279,13 +280,14 @@ public class RoutineLoadJobTest {
Deencapsulation.setField(routineLoadJob, "state", RoutineLoadJob.JobState.RUNNING);
Deencapsulation.setField(routineLoadJob, "maxErrorNum", 10);
Deencapsulation.setField(routineLoadJob, "maxBatchRows", 10);
Deencapsulation.setField(routineLoadJob, "currentErrorRows", 1);
Deencapsulation.setField(routineLoadJob, "currentTotalRows", 99);
RoutineLoadStatistic jobStatistic = Deencapsulation.getField(routineLoadJob, "jobStatistic");
Deencapsulation.setField(jobStatistic, "currentErrorRows", 1);
Deencapsulation.setField(jobStatistic, "currentTotalRows", 99);
Deencapsulation.invoke(routineLoadJob, "updateNumOfData", 2L, 0L, 0L, 1L, 1L, false);
Assert.assertEquals(RoutineLoadJob.JobState.RUNNING, Deencapsulation.getField(routineLoadJob, "state"));
Assert.assertEquals(new Long(0), Deencapsulation.getField(routineLoadJob, "currentErrorRows"));
Assert.assertEquals(new Long(0), Deencapsulation.getField(routineLoadJob, "currentTotalRows"));
Assert.assertEquals(new Long(0), Deencapsulation.getField(jobStatistic, "currentErrorRows"));
Assert.assertEquals(new Long(0), Deencapsulation.getField(jobStatistic, "currentTotalRows"));
}

View File

@@ -41,6 +41,7 @@ import org.apache.doris.load.routineload.RLTaskTxnCommitAttachment;
import org.apache.doris.load.routineload.RoutineLoadJob;
import org.apache.doris.load.routineload.RoutineLoadManager;
import org.apache.doris.load.routineload.RoutineLoadTaskInfo;
import org.apache.doris.load.routineload.RoutineLoadStatistic;
import org.apache.doris.meta.MetaContext;
import org.apache.doris.persist.EditLog;
import org.apache.doris.thrift.TKafkaRLTaskProgress;
@@ -361,9 +362,10 @@ public class GlobalTransactionMgrTest {
Deencapsulation.setField(masterTransMgr.getDatabaseTransactionMgr(CatalogTestUtil.testDbId1), "idToRunningTransactionState", idToTransactionState);
Table testTable1 = masterCatalog.getDb(CatalogTestUtil.testDbId1).getTable(CatalogTestUtil.testTableId1);
masterTransMgr.commitTransaction(1L, Lists.newArrayList(testTable1), 1L, transTablets, txnCommitAttachment);
RoutineLoadStatistic jobStatistic = Deencapsulation.getField(routineLoadJob,"jobStatistic");
Assert.assertEquals(Long.valueOf(101), Deencapsulation.getField(routineLoadJob, "currentTotalRows"));
Assert.assertEquals(Long.valueOf(1), Deencapsulation.getField(routineLoadJob, "currentErrorRows"));
Assert.assertEquals(Long.valueOf(101), Deencapsulation.getField(jobStatistic, "currentTotalRows"));
Assert.assertEquals(Long.valueOf(1), Deencapsulation.getField(jobStatistic, "currentErrorRows"));
Assert.assertEquals(Long.valueOf(101L), ((KafkaProgress) routineLoadJob.getProgress()).getOffsetByPartition(1));
// todo(ml): change to assert queue
// Assert.assertEquals(1, routineLoadManager.getNeedScheduleTasksQueue().size());
@@ -430,8 +432,9 @@
masterTransMgr.commitTransaction(1L, Lists.newArrayList(testTable1), 1L, transTablets, txnCommitAttachment);
// current total rows and error rows will be reset after job pause, so here they should be 0.
Assert.assertEquals(Long.valueOf(0), Deencapsulation.getField(routineLoadJob, "currentTotalRows"));
Assert.assertEquals(Long.valueOf(0), Deencapsulation.getField(routineLoadJob, "currentErrorRows"));
RoutineLoadStatistic jobStatistic = Deencapsulation.getField(routineLoadJob,"jobStatistic");
Assert.assertEquals(Long.valueOf(0), Deencapsulation.getField(jobStatistic, "currentTotalRows"));
Assert.assertEquals(Long.valueOf(0), Deencapsulation.getField(jobStatistic, "currentErrorRows"));
Assert.assertEquals(Long.valueOf(111L),
((KafkaProgress) routineLoadJob.getProgress()).getOffsetByPartition(1));
// todo(ml): change to assert queue