[Bug] Fix bug that routine load may lose some data (#5093)
In the previous implementation, whether a subtask was in the commit or abort state, we would try to update the job progress, such as the consumed offset of Kafka. Under normal circumstances, an aborted transaction does not consume any data and all progress is 0, so even if we update the progress, it remains unchanged. However, when the cluster load is high, a subtask may fail halfway through execution on the BE side. In this case, although the task is aborted, part of the progress is still updated. This causes the next subtask to skip that data during consumption, resulting in data loss.
This commit is contained in:
@ -28,11 +28,11 @@ import org.apache.doris.thrift.TNetworkAddress;
|
||||
import org.apache.doris.thrift.TTabletStat;
|
||||
import org.apache.doris.thrift.TTabletStatResult;
|
||||
|
||||
import com.google.common.collect.ImmutableMap;
|
||||
|
||||
import org.apache.logging.log4j.LogManager;
|
||||
import org.apache.logging.log4j.Logger;
|
||||
|
||||
import com.google.common.collect.ImmutableMap;
|
||||
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
@ -61,7 +61,7 @@ public class TabletStatMgr extends MasterDaemon {
|
||||
client = ClientPool.backendPool.borrowObject(address);
|
||||
TTabletStatResult result = client.getTabletStat();
|
||||
|
||||
LOG.info("get tablet stat from backend: {}, num: {}", backend.getId(), result.getTabletsStatsSize());
|
||||
LOG.debug("get tablet stat from backend: {}, num: {}", backend.getId(), result.getTabletsStatsSize());
|
||||
updateTabletStat(backend.getId(), result);
|
||||
|
||||
ok = true;
|
||||
@ -112,7 +112,7 @@ public class TabletStatMgr extends MasterDaemon {
|
||||
index.setRowCount(indexRowCount);
|
||||
} // end for indices
|
||||
} // end for partitions
|
||||
LOG.info("finished to set row num for table: {} in database: {}",
|
||||
LOG.debug("finished to set row num for table: {} in database: {}",
|
||||
table.getName(), db.getFullName());
|
||||
}
|
||||
} finally {
|
||||
|
||||
@ -68,7 +68,8 @@ public class Config extends ConfigBase {
|
||||
public static String sys_log_dir = PaloFe.DORIS_HOME_DIR + "/log";
|
||||
@ConfField public static String sys_log_level = "INFO";
|
||||
@ConfField public static int sys_log_roll_num = 10;
|
||||
@ConfField public static String[] sys_log_verbose_modules = {"org.apache.thrift", "org.apache.doris.thrift", "org.apache.doris.http", "org.apache.doris.service.FrontendServiceImpl"};
|
||||
@ConfField
|
||||
public static String[] sys_log_verbose_modules = {};
|
||||
@ConfField public static String sys_log_roll_interval = "DAY";
|
||||
@ConfField public static String sys_log_delete_age = "7d";
|
||||
@Deprecated
|
||||
|
||||
@ -19,8 +19,12 @@ package org.apache.doris.load.routineload;
|
||||
|
||||
import org.apache.doris.common.DdlException;
|
||||
import org.apache.doris.common.Pair;
|
||||
import org.apache.doris.common.util.DebugUtil;
|
||||
import org.apache.doris.thrift.TKafkaRLTaskProgress;
|
||||
|
||||
import org.apache.logging.log4j.LogManager;
|
||||
import org.apache.logging.log4j.Logger;
|
||||
|
||||
import com.google.common.base.Joiner;
|
||||
import com.google.common.collect.Maps;
|
||||
import com.google.gson.Gson;
|
||||
@ -38,6 +42,8 @@ import java.util.Map;
|
||||
*/
|
||||
// {"partitionIdToOffset": {}}
|
||||
public class KafkaProgress extends RoutineLoadProgress {
|
||||
private static final Logger LOG = LogManager.getLogger(KafkaProgress.class);
|
||||
|
||||
public static final String OFFSET_BEGINNING = "OFFSET_BEGINNING"; // -2
|
||||
public static final String OFFSET_END = "OFFSET_END"; // -1
|
||||
// OFFSET_ZERO is just for show info, if user specified offset is 0
|
||||
@ -47,7 +53,7 @@ public class KafkaProgress extends RoutineLoadProgress {
|
||||
public static final long OFFSET_END_VAL = -1;
|
||||
|
||||
// (partition id, begin offset)
|
||||
// the offset the next msg to be consumed
|
||||
// the offset saved here is the next offset need to be consumed
|
||||
private Map<Integer, Long> partitionIdToOffset = Maps.newConcurrentMap();
|
||||
|
||||
public KafkaProgress() {
|
||||
@ -101,12 +107,13 @@ public class KafkaProgress extends RoutineLoadProgress {
|
||||
} else if (entry.getValue() == -2) {
|
||||
showPartitionIdToOffset.put(entry.getKey(), OFFSET_BEGINNING);
|
||||
} else {
|
||||
// The offset saved in partitionIdToOffset is the next offset to be consumed.
|
||||
// So here we minus 1 to return the "already consumed" offset.
|
||||
showPartitionIdToOffset.put(entry.getKey(), "" + (entry.getValue() - 1));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
// modify the partition offset of this progress.
|
||||
// throw exception is the specified partition does not exist in progress.
|
||||
public void modifyOffset(List<Pair<Integer, Long>> kafkaPartitionOffsets) throws DdlException {
|
||||
@ -138,11 +145,13 @@ public class KafkaProgress extends RoutineLoadProgress {
|
||||
}
|
||||
|
||||
@Override
|
||||
public void update(RoutineLoadProgress progress) {
|
||||
KafkaProgress newProgress = (KafkaProgress) progress;
|
||||
public void update(RLTaskTxnCommitAttachment attachment) {
|
||||
KafkaProgress newProgress = (KafkaProgress) attachment.getProgress();
|
||||
// + 1 to point to the next msg offset to be consumed
|
||||
newProgress.partitionIdToOffset.entrySet().stream()
|
||||
.forEach(entity -> this.partitionIdToOffset.put(entity.getKey(), entity.getValue() + 1));
|
||||
LOG.debug("update kafka progress: {}, task: {}, job: {}",
|
||||
newProgress.toJsonString(), DebugUtil.printId(attachment.getTaskId()), attachment.getJobId());
|
||||
}
|
||||
|
||||
@Override
|
||||
|
||||
@ -42,17 +42,19 @@ import org.apache.doris.common.util.SmallFileMgr;
|
||||
import org.apache.doris.common.util.SmallFileMgr.SmallFile;
|
||||
import org.apache.doris.persist.AlterRoutineLoadJobOperationLog;
|
||||
import org.apache.doris.system.SystemInfoService;
|
||||
import org.apache.doris.transaction.TransactionState;
|
||||
import org.apache.doris.transaction.TransactionStatus;
|
||||
|
||||
import org.apache.logging.log4j.LogManager;
|
||||
import org.apache.logging.log4j.Logger;
|
||||
|
||||
import com.google.common.base.Joiner;
|
||||
import com.google.common.base.Preconditions;
|
||||
import com.google.common.collect.Lists;
|
||||
import com.google.common.collect.Maps;
|
||||
import com.google.gson.Gson;
|
||||
import com.google.gson.GsonBuilder;
|
||||
|
||||
import org.apache.logging.log4j.LogManager;
|
||||
import org.apache.logging.log4j.Logger;
|
||||
|
||||
import java.io.DataInput;
|
||||
import java.io.DataOutput;
|
||||
import java.io.IOException;
|
||||
@ -145,7 +147,6 @@ public class KafkaRoutineLoadJob extends RoutineLoadJob {
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@Override
|
||||
public void divideRoutineLoadJob(int currentConcurrentTaskNum) throws UserException {
|
||||
List<RoutineLoadTaskInfo> result = new ArrayList<>();
|
||||
@ -198,46 +199,45 @@ public class KafkaRoutineLoadJob extends RoutineLoadJob {
|
||||
return currentTaskConcurrentNum;
|
||||
}
|
||||
|
||||
// case1: BE execute the task successfully and commit it to FE, but failed on FE(such as db renamed, not found),
|
||||
// after commit failed, BE try to rollback this txn, and loaded rows in its attachment is larger than 0.
|
||||
// In this case, FE should not update the progress.
|
||||
//
|
||||
// case2: partitionIdToOffset must be not empty when loaded rows > 0
|
||||
// be commit txn but fe throw error when committing txn,
|
||||
// fe rollback txn without partitionIdToOffset by itself
|
||||
// this task should not be commit
|
||||
// otherwise currentErrorNum and currentTotalNum is updated when progress is not updated
|
||||
// Through the transaction status and attachment information, to determine whether the progress needs to be updated.
|
||||
@Override
|
||||
protected boolean checkCommitInfo(RLTaskTxnCommitAttachment rlTaskTxnCommitAttachment,
|
||||
TransactionStatus txnStatus) {
|
||||
if (rlTaskTxnCommitAttachment.getLoadedRows() > 0 && txnStatus == TransactionStatus.ABORTED) {
|
||||
// case 1
|
||||
return false;
|
||||
TransactionState txnState,
|
||||
TransactionState.TxnStatusChangeReason txnStatusChangeReason) {
|
||||
if (txnState.getTransactionStatus() == TransactionStatus.COMMITTED) {
|
||||
// For committed txn, update the progress.
|
||||
return true;
|
||||
}
|
||||
|
||||
if (rlTaskTxnCommitAttachment.getLoadedRows() > 0
|
||||
&& (!((KafkaProgress) rlTaskTxnCommitAttachment.getProgress()).hasPartition())) {
|
||||
// case 2
|
||||
LOG.warn(new LogBuilder(LogKey.ROUTINE_LOAD_TASK, DebugUtil.printId(rlTaskTxnCommitAttachment.getTaskId()))
|
||||
.add("job_id", id)
|
||||
.add("loaded_rows", rlTaskTxnCommitAttachment.getLoadedRows())
|
||||
.add("progress_partition_offset_size", 0)
|
||||
.add("msg", "commit attachment info is incorrect"));
|
||||
return false;
|
||||
if (txnStatusChangeReason != null && txnStatusChangeReason == TransactionState.TxnStatusChangeReason.NO_PARTITIONS) {
|
||||
// Because the max_filter_ratio of routine load task is always 1.
|
||||
// Therefore, under normal circumstances, routine load task will not return the error "too many filtered rows".
|
||||
// If no data is imported, the error "all partitions have no load data" may only be returned.
|
||||
// In this case, the status of the transaction is ABORTED,
|
||||
// but we still need to update the offset to skip these error lines.
|
||||
Preconditions.checkState(txnState.getTransactionStatus() == TransactionStatus.ABORTED, txnState.getTransactionStatus());
|
||||
return true;
|
||||
}
|
||||
return true;
|
||||
|
||||
// Running here, the status of the transaction should be ABORTED,
|
||||
// and it is caused by other errors. In this case, we should not update the offset.
|
||||
LOG.debug("no need to update the progress of kafka routine load. txn status: {}, " +
|
||||
"txnStatusChangeReason: {}, task: {}, job: {}",
|
||||
txnState.getTransactionStatus(), txnStatusChangeReason,
|
||||
DebugUtil.printId(rlTaskTxnCommitAttachment.getTaskId()), id);
|
||||
return false;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void updateProgress(RLTaskTxnCommitAttachment attachment) throws UserException {
|
||||
super.updateProgress(attachment);
|
||||
this.progress.update(attachment.getProgress());
|
||||
this.progress.update(attachment);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void replayUpdateProgress(RLTaskTxnCommitAttachment attachment) {
|
||||
super.replayUpdateProgress(attachment);
|
||||
this.progress.update(attachment.getProgress());
|
||||
this.progress.update(attachment);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
||||
@ -67,6 +67,10 @@ public class RLTaskTxnCommitAttachment extends TxnCommitAttachment {
|
||||
}
|
||||
}
|
||||
|
||||
public long getJobId() {
|
||||
return jobId;
|
||||
}
|
||||
|
||||
public TUniqueId getTaskId() {
|
||||
return taskId;
|
||||
}
|
||||
|
||||
@ -69,6 +69,10 @@ import org.apache.doris.transaction.AbstractTxnStateChangeCallback;
|
||||
import org.apache.doris.transaction.TransactionException;
|
||||
import org.apache.doris.transaction.TransactionState;
|
||||
import org.apache.doris.transaction.TransactionStatus;
|
||||
|
||||
import org.apache.logging.log4j.LogManager;
|
||||
import org.apache.logging.log4j.Logger;
|
||||
|
||||
import com.google.common.base.Joiner;
|
||||
import com.google.common.base.Preconditions;
|
||||
import com.google.common.base.Strings;
|
||||
@ -77,8 +81,6 @@ import com.google.common.collect.Lists;
|
||||
import com.google.common.collect.Maps;
|
||||
import com.google.gson.Gson;
|
||||
import com.google.gson.GsonBuilder;
|
||||
import org.apache.logging.log4j.LogManager;
|
||||
import org.apache.logging.log4j.Logger;
|
||||
|
||||
import java.io.DataInput;
|
||||
import java.io.DataOutput;
|
||||
@ -878,8 +880,9 @@ public abstract class RoutineLoadJob extends AbstractTxnStateChangeCallback impl
|
||||
entity -> entity.getTxnId() == txnState.getTransactionId()).findFirst();
|
||||
RoutineLoadTaskInfo routineLoadTaskInfo = routineLoadTaskInfoOptional.get();
|
||||
taskBeId = routineLoadTaskInfo.getBeId();
|
||||
executeTaskOnTxnStatusChanged(routineLoadTaskInfo, txnState, TransactionStatus.COMMITTED);
|
||||
executeTaskOnTxnStatusChanged(routineLoadTaskInfo, txnState, TransactionStatus.COMMITTED, null);
|
||||
++committedTaskNum;
|
||||
LOG.debug("routine load task committed. task id: {}, job id: {}", txnState.getLabel(), id);
|
||||
}
|
||||
} catch (Throwable e) {
|
||||
LOG.warn("after committed failed", e);
|
||||
@ -989,8 +992,9 @@ public abstract class RoutineLoadJob extends AbstractTxnStateChangeCallback impl
|
||||
.build());
|
||||
}
|
||||
++abortedTaskNum;
|
||||
TransactionState.TxnStatusChangeReason txnStatusChangeReason = null;
|
||||
if (txnStatusChangeReasonString != null) {
|
||||
TransactionState.TxnStatusChangeReason txnStatusChangeReason =
|
||||
txnStatusChangeReason =
|
||||
TransactionState.TxnStatusChangeReason.fromString(txnStatusChangeReasonString);
|
||||
if (txnStatusChangeReason != null) {
|
||||
switch (txnStatusChangeReason) {
|
||||
@ -1009,7 +1013,7 @@ public abstract class RoutineLoadJob extends AbstractTxnStateChangeCallback impl
|
||||
// TODO(ml): use previous be id depend on change reason
|
||||
}
|
||||
// step2: commit task , update progress, maybe create a new task
|
||||
executeTaskOnTxnStatusChanged(routineLoadTaskInfo, txnState, TransactionStatus.ABORTED);
|
||||
executeTaskOnTxnStatusChanged(routineLoadTaskInfo, txnState, TransactionStatus.ABORTED, txnStatusChangeReason);
|
||||
}
|
||||
} catch (Exception e) {
|
||||
String msg = "be " + taskBeId + " abort task " + txnState.getLabel() + " failed with error " + e.getMessage();
|
||||
@ -1037,7 +1041,7 @@ public abstract class RoutineLoadJob extends AbstractTxnStateChangeCallback impl
|
||||
|
||||
// check task exists or not before call method
|
||||
private void executeTaskOnTxnStatusChanged(RoutineLoadTaskInfo routineLoadTaskInfo, TransactionState txnState,
|
||||
TransactionStatus txnStatus) throws UserException {
|
||||
TransactionStatus txnStatus, TransactionState.TxnStatusChangeReason txnStatusChangeReason) throws UserException {
|
||||
// step0: get progress from transaction state
|
||||
RLTaskTxnCommitAttachment rlTaskTxnCommitAttachment = (RLTaskTxnCommitAttachment) txnState.getTxnCommitAttachment();
|
||||
if (rlTaskTxnCommitAttachment == null) {
|
||||
@ -1049,7 +1053,7 @@ public abstract class RoutineLoadJob extends AbstractTxnStateChangeCallback impl
|
||||
+ " maybe task was aborted by master when timeout")
|
||||
.build());
|
||||
}
|
||||
} else if (checkCommitInfo(rlTaskTxnCommitAttachment, txnState.getTransactionStatus())) {
|
||||
} else if (checkCommitInfo(rlTaskTxnCommitAttachment, txnState, txnStatusChangeReason)) {
|
||||
// step2: update job progress
|
||||
updateProgress(rlTaskTxnCommitAttachment);
|
||||
}
|
||||
@ -1256,7 +1260,8 @@ public abstract class RoutineLoadJob extends AbstractTxnStateChangeCallback impl
|
||||
|
||||
// check the correctness of commit info
|
||||
protected abstract boolean checkCommitInfo(RLTaskTxnCommitAttachment rlTaskTxnCommitAttachment,
|
||||
TransactionStatus txnStatus);
|
||||
TransactionState txnState,
|
||||
TransactionState.TxnStatusChangeReason txnStatusChangeReason);
|
||||
|
||||
protected abstract String getStatistic();
|
||||
|
||||
|
||||
@ -37,7 +37,7 @@ public abstract class RoutineLoadProgress implements Writable {
|
||||
this.loadDataSourceType = loadDataSourceType;
|
||||
}
|
||||
|
||||
abstract void update(RoutineLoadProgress progress);
|
||||
abstract void update(RLTaskTxnCommitAttachment attachment);
|
||||
|
||||
abstract String toJsonString();
|
||||
|
||||
|
||||
@ -36,13 +36,13 @@ import org.apache.doris.thrift.TRoutineLoadTask;
|
||||
import org.apache.doris.thrift.TStatus;
|
||||
import org.apache.doris.thrift.TStatusCode;
|
||||
|
||||
import org.apache.logging.log4j.LogManager;
|
||||
import org.apache.logging.log4j.Logger;
|
||||
|
||||
import com.google.common.annotations.VisibleForTesting;
|
||||
import com.google.common.collect.Lists;
|
||||
import com.google.common.collect.Queues;
|
||||
|
||||
import org.apache.logging.log4j.LogManager;
|
||||
import org.apache.logging.log4j.Logger;
|
||||
|
||||
import java.util.List;
|
||||
import java.util.concurrent.LinkedBlockingQueue;
|
||||
|
||||
@ -130,6 +130,7 @@ public class RoutineLoadTaskScheduler extends MasterDaemon {
|
||||
if (!allocateTaskToBe(routineLoadTaskInfo)) {
|
||||
// allocate failed, push it back to the queue to wait next scheduling
|
||||
needScheduleTasksQueue.put(routineLoadTaskInfo);
|
||||
return;
|
||||
}
|
||||
} catch (UserException e) {
|
||||
routineLoadManager.getJob(routineLoadTaskInfo.getJobId()).
|
||||
@ -152,6 +153,7 @@ public class RoutineLoadTaskScheduler extends MasterDaemon {
|
||||
// set BE id to -1 to release the BE slot
|
||||
routineLoadTaskInfo.setBeId(-1);
|
||||
needScheduleTasksQueue.put(routineLoadTaskInfo);
|
||||
return;
|
||||
}
|
||||
} catch (Exception e) {
|
||||
// exception happens, PAUSE the job
|
||||
@ -196,6 +198,10 @@ public class RoutineLoadTaskScheduler extends MasterDaemon {
|
||||
submitTask(routineLoadTaskInfo.getBeId(), tRoutineLoadTask);
|
||||
LOG.debug("send routine load task cost(ms): {}, job id: {}",
|
||||
(System.currentTimeMillis() - startTime), routineLoadTaskInfo.getJobId());
|
||||
if (tRoutineLoadTask.isSetKafkaLoadInfo()) {
|
||||
LOG.debug("send kafka routine load task {} with partition offset: {}, job: {}",
|
||||
tRoutineLoadTask.label, tRoutineLoadTask.kafka_load_info.partition_begin_offset, tRoutineLoadTask.getJobId());
|
||||
}
|
||||
} catch (LoadException e) {
|
||||
// submit task failed (such as TOO_MANY_TASKS error), but txn has already begun.
|
||||
// Here we will still set the ExecuteStartTime of this task, which means
|
||||
|
||||
@ -54,6 +54,10 @@ import org.apache.doris.task.PublishVersionTask;
|
||||
import org.apache.doris.thrift.TTaskType;
|
||||
import org.apache.doris.thrift.TUniqueId;
|
||||
|
||||
import org.apache.commons.collections.CollectionUtils;
|
||||
import org.apache.logging.log4j.LogManager;
|
||||
import org.apache.logging.log4j.Logger;
|
||||
|
||||
import com.google.common.annotations.VisibleForTesting;
|
||||
import com.google.common.base.Joiner;
|
||||
import com.google.common.base.Preconditions;
|
||||
@ -61,10 +65,6 @@ import com.google.common.collect.Lists;
|
||||
import com.google.common.collect.Maps;
|
||||
import com.google.common.collect.Sets;
|
||||
|
||||
import org.apache.commons.collections.CollectionUtils;
|
||||
import org.apache.logging.log4j.LogManager;
|
||||
import org.apache.logging.log4j.Logger;
|
||||
|
||||
import java.io.DataOutput;
|
||||
import java.io.IOException;
|
||||
import java.util.ArrayDeque;
|
||||
@ -291,7 +291,8 @@ public class DatabaseTransactionMgr {
|
||||
checkRunningTxnExceedLimit(sourceType);
|
||||
|
||||
long tid = idGenerator.getNextTransactionId();
|
||||
LOG.info("begin transaction: txn id {} with label {} from coordinator {}", tid, label, coordinator);
|
||||
LOG.info("begin transaction: txn id {} with label {} from coordinator {}, listner id: {}",
|
||||
tid, label, coordinator, listenerId);
|
||||
TransactionState transactionState = new TransactionState(dbId, tableIdList, tid, label, requestId, sourceType,
|
||||
coordinator, listenerId, timeoutSecond * 1000);
|
||||
transactionState.setPrepareTime(System.currentTimeMillis());
|
||||
|
||||
@ -100,7 +100,8 @@ public class TransactionState implements Writable {
|
||||
DB_DROPPED,
|
||||
TIMEOUT,
|
||||
OFFSET_OUT_OF_RANGE,
|
||||
PAUSE;
|
||||
PAUSE,
|
||||
NO_PARTITIONS;
|
||||
|
||||
public static TxnStatusChangeReason fromString(String reasonString) {
|
||||
for (TxnStatusChangeReason txnStatusChangeReason : TxnStatusChangeReason.values()) {
|
||||
@ -116,6 +117,8 @@ public class TransactionState implements Writable {
|
||||
switch (this) {
|
||||
case OFFSET_OUT_OF_RANGE:
|
||||
return "Offset out of range";
|
||||
case NO_PARTITIONS:
|
||||
return "all partitions have no load data";
|
||||
default:
|
||||
return this.name();
|
||||
}
|
||||
|
||||
@ -17,7 +17,6 @@
|
||||
|
||||
package org.apache.doris.load.routineload;
|
||||
|
||||
|
||||
import org.apache.doris.analysis.CreateRoutineLoadStmt;
|
||||
import org.apache.doris.analysis.SqlParser;
|
||||
import org.apache.doris.catalog.Catalog;
|
||||
@ -28,16 +27,18 @@ import org.apache.doris.common.UserException;
|
||||
import org.apache.doris.common.jmockit.Deencapsulation;
|
||||
import org.apache.doris.common.util.KafkaUtil;
|
||||
import org.apache.doris.persist.EditLog;
|
||||
import org.apache.doris.thrift.TKafkaRLTaskProgress;
|
||||
import org.apache.doris.transaction.TransactionException;
|
||||
import org.apache.doris.transaction.TransactionState;
|
||||
|
||||
import com.google.common.base.Strings;
|
||||
import com.google.common.collect.Lists;
|
||||
|
||||
import org.apache.kafka.common.PartitionInfo;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Test;
|
||||
|
||||
import com.google.common.base.Strings;
|
||||
import com.google.common.collect.Lists;
|
||||
import com.google.common.collect.Maps;
|
||||
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
@ -96,12 +97,19 @@ public class RoutineLoadJobTest {
|
||||
|
||||
@Test
|
||||
public void testAfterAborted(@Injectable TransactionState transactionState,
|
||||
@Injectable KafkaTaskInfo routineLoadTaskInfo,
|
||||
@Injectable KafkaProgress progress) throws UserException {
|
||||
@Injectable KafkaTaskInfo routineLoadTaskInfo) throws UserException {
|
||||
List<RoutineLoadTaskInfo> routineLoadTaskInfoList = Lists.newArrayList();
|
||||
routineLoadTaskInfoList.add(routineLoadTaskInfo);
|
||||
long txnId = 1L;
|
||||
|
||||
RLTaskTxnCommitAttachment attachment = new RLTaskTxnCommitAttachment();
|
||||
TKafkaRLTaskProgress tKafkaRLTaskProgress = new TKafkaRLTaskProgress();
|
||||
tKafkaRLTaskProgress.partitionCmtOffset = Maps.newHashMap();
|
||||
KafkaProgress kafkaProgress = new KafkaProgress(tKafkaRLTaskProgress);
|
||||
Deencapsulation.setField(attachment, "progress", kafkaProgress);
|
||||
|
||||
KafkaProgress currentProgress = new KafkaProgress(tKafkaRLTaskProgress);
|
||||
|
||||
new Expectations() {
|
||||
{
|
||||
transactionState.getTransactionId();
|
||||
@ -112,7 +120,7 @@ public class RoutineLoadJobTest {
|
||||
result = txnId;
|
||||
transactionState.getTxnCommitAttachment();
|
||||
minTimes = 0;
|
||||
result = new RLTaskTxnCommitAttachment();
|
||||
result = attachment;
|
||||
routineLoadTaskInfo.getPartitions();
|
||||
minTimes = 0;
|
||||
result = Lists.newArrayList();
|
||||
@ -129,7 +137,7 @@ public class RoutineLoadJobTest {
|
||||
RoutineLoadJob routineLoadJob = new KafkaRoutineLoadJob();
|
||||
Deencapsulation.setField(routineLoadJob, "state", RoutineLoadJob.JobState.RUNNING);
|
||||
Deencapsulation.setField(routineLoadJob, "routineLoadTaskInfoList", routineLoadTaskInfoList);
|
||||
Deencapsulation.setField(routineLoadJob, "progress", progress);
|
||||
Deencapsulation.setField(routineLoadJob, "progress", currentProgress);
|
||||
routineLoadJob.afterAborted(transactionState, true, txnStatusChangeReasonString);
|
||||
|
||||
Assert.assertEquals(RoutineLoadJob.JobState.RUNNING, routineLoadJob.getState());
|
||||
|
||||
@ -17,10 +17,6 @@
|
||||
|
||||
package org.apache.doris.transaction;
|
||||
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.assertNotNull;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
|
||||
import org.apache.doris.catalog.Catalog;
|
||||
import org.apache.doris.catalog.CatalogTestUtil;
|
||||
import org.apache.doris.catalog.FakeCatalog;
|
||||
@ -49,31 +45,31 @@ import org.apache.doris.thrift.TLoadSourceType;
|
||||
import org.apache.doris.thrift.TRLTaskTxnCommitAttachment;
|
||||
import org.apache.doris.thrift.TUniqueId;
|
||||
import org.apache.doris.transaction.TransactionState.LoadJobSourceType;
|
||||
import org.apache.doris.transaction.TransactionState.TxnSourceType;
|
||||
import org.apache.doris.transaction.TransactionState.TxnCoordinator;
|
||||
|
||||
import com.google.common.collect.Lists;
|
||||
import com.google.common.collect.Maps;
|
||||
import com.google.common.collect.Sets;
|
||||
import org.apache.doris.transaction.TransactionState.TxnSourceType;
|
||||
|
||||
import org.apache.kafka.clients.consumer.KafkaConsumer;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
|
||||
import com.google.common.collect.Lists;
|
||||
import com.google.common.collect.Maps;
|
||||
import com.google.common.collect.Sets;
|
||||
|
||||
import java.lang.reflect.InvocationTargetException;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
import java.util.UUID;
|
||||
|
||||
import mockit.Injectable;
|
||||
import mockit.Mocked;
|
||||
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.assertNotNull;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
|
||||
import mockit.Injectable;
|
||||
import mockit.Mocked;
|
||||
|
||||
public class GlobalTransactionMgrTest {
|
||||
|
||||
private static FakeEditLog fakeEditLog;
|
||||
@ -345,7 +341,7 @@ public class GlobalTransactionMgrTest {
|
||||
rlTaskTxnCommitAttachment.setLoadSourceType(TLoadSourceType.KAFKA);
|
||||
TKafkaRLTaskProgress tKafkaRLTaskProgress = new TKafkaRLTaskProgress();
|
||||
Map<Integer, Long> kafkaProgress = Maps.newHashMap();
|
||||
kafkaProgress.put(1, 10L);
|
||||
kafkaProgress.put(1, 100L); // start from 0, so rows number is 101, and consumed offset is 100
|
||||
tKafkaRLTaskProgress.setPartitionCmtOffset(kafkaProgress);
|
||||
rlTaskTxnCommitAttachment.setKafkaRLTaskProgress(tKafkaRLTaskProgress);
|
||||
TxnCommitAttachment txnCommitAttachment = new RLTaskTxnCommitAttachment(rlTaskTxnCommitAttachment);
|
||||
@ -358,7 +354,7 @@ public class GlobalTransactionMgrTest {
|
||||
|
||||
Assert.assertEquals(Long.valueOf(101), Deencapsulation.getField(routineLoadJob, "currentTotalRows"));
|
||||
Assert.assertEquals(Long.valueOf(1), Deencapsulation.getField(routineLoadJob, "currentErrorRows"));
|
||||
Assert.assertEquals(Long.valueOf(11L), ((KafkaProgress) routineLoadJob.getProgress()).getOffsetByPartition(1));
|
||||
Assert.assertEquals(Long.valueOf(101L), ((KafkaProgress) routineLoadJob.getProgress()).getOffsetByPartition(1));
|
||||
// todo(ml): change to assert queue
|
||||
// Assert.assertEquals(1, routineLoadManager.getNeedScheduleTasksQueue().size());
|
||||
// Assert.assertNotEquals("label", routineLoadManager.getNeedScheduleTasksQueue().peek().getId());
|
||||
@ -411,7 +407,7 @@ public class GlobalTransactionMgrTest {
|
||||
rlTaskTxnCommitAttachment.setLoadSourceType(TLoadSourceType.KAFKA);
|
||||
TKafkaRLTaskProgress tKafkaRLTaskProgress = new TKafkaRLTaskProgress();
|
||||
Map<Integer, Long> kafkaProgress = Maps.newHashMap();
|
||||
kafkaProgress.put(1, 10L);
|
||||
kafkaProgress.put(1, 110L); // start from 0, so rows number is 111, consumed offset is 110
|
||||
tKafkaRLTaskProgress.setPartitionCmtOffset(kafkaProgress);
|
||||
rlTaskTxnCommitAttachment.setKafkaRLTaskProgress(tKafkaRLTaskProgress);
|
||||
TxnCommitAttachment txnCommitAttachment = new RLTaskTxnCommitAttachment(rlTaskTxnCommitAttachment);
|
||||
@ -422,9 +418,10 @@ public class GlobalTransactionMgrTest {
|
||||
Deencapsulation.setField(masterTransMgr.getDatabaseTransactionMgr(CatalogTestUtil.testDbId1), "idToRunningTransactionState", idToTransactionState);
|
||||
masterTransMgr.commitTransaction(1L, 1L, transTablets, txnCommitAttachment);
|
||||
|
||||
// current total rows and error rows will be reset after job pause, so here they should be 0.
|
||||
Assert.assertEquals(Long.valueOf(0), Deencapsulation.getField(routineLoadJob, "currentTotalRows"));
|
||||
Assert.assertEquals(Long.valueOf(0), Deencapsulation.getField(routineLoadJob, "currentErrorRows"));
|
||||
Assert.assertEquals(Long.valueOf(11L),
|
||||
Assert.assertEquals(Long.valueOf(111L),
|
||||
((KafkaProgress) routineLoadJob.getProgress()).getOffsetByPartition(1));
|
||||
// todo(ml): change to assert queue
|
||||
// Assert.assertEquals(0, routineLoadManager.getNeedScheduleTasksQueue().size());
|
||||
|
||||
Reference in New Issue
Block a user