Fix routine load replay bugs (#770)

This commit is contained in:
Mingyu Chen
2019-03-19 18:32:43 +08:00
committed by ZHAO Chun
parent 95d0186e18
commit ff6844b7d6
11 changed files with 84 additions and 22 deletions

View File

@ -658,11 +658,6 @@ public class Catalog {
// the clear threads runs every min(transaction_clean_interval_second,stream_load_default_timeout_second)/10
txnCleaner.setInterval(Math.min(Config.transaction_clean_interval_second,
Config.stream_load_default_timeout_second) * 100L);
// 8. start routine load scheduler
routineLoadScheduler.start();
routineLoadTaskScheduler.start();
}
private void getClusterIdAndRole() throws IOException {
@ -1129,6 +1124,11 @@ public class Catalog {
domainResolver.start();
tabletStatMgr.start();
// start routine load scheduler
routineLoadScheduler.start();
routineLoadTaskScheduler.start();
MetricRepo.init();
}

View File

@ -418,7 +418,8 @@ public class JournalEntity implements Writable {
needRead = false;
break;
}
case OperationType.OP_CHANGE_ROUTINE_LOAD_JOB: {
case OperationType.OP_CHANGE_ROUTINE_LOAD_JOB:
case OperationType.OP_REMOVE_ROUTINE_LOAD_JOB: {
data = RoutineLoadOperation.read(in);
needRead = false;
break;

View File

@ -364,5 +364,7 @@ public class KafkaRoutineLoadJob extends RoutineLoadJob {
for (int i = 0; i < size; i++) {
customKafkaPartitions.add(in.readInt());
}
setConsumer();
}
}

View File

@ -26,6 +26,7 @@ import org.apache.doris.catalog.Database;
import org.apache.doris.catalog.OlapTable;
import org.apache.doris.catalog.Table;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.Config;
import org.apache.doris.common.DdlException;
import org.apache.doris.common.ErrorCode;
import org.apache.doris.common.ErrorReport;
@ -50,6 +51,7 @@ import org.apache.doris.transaction.TransactionState;
import org.apache.doris.transaction.TxnStateChangeListener;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
@ -143,7 +145,7 @@ public abstract class RoutineLoadJob implements TxnStateChangeListener, Writable
protected RoutineLoadProgress progress;
protected String pausedReason;
protected String cancelReason;
protected long endTimestamp;
protected long endTimestamp = -1;
/*
* currentErrorRows and currentTotalRows is used for check error rate
@ -185,7 +187,6 @@ public abstract class RoutineLoadJob implements TxnStateChangeListener, Writable
this.clusterName = clusterName;
this.dbId = dbId;
this.tableId = tableId;
this.endTimestamp = -1;
this.authCode = new StringBuilder().append(ConnectContext.get().getQualifiedUser())
.append(ConnectContext.get().getRemoteIP())
.append(id).append(System.currentTimeMillis()).toString().hashCode();
@ -205,7 +206,6 @@ public abstract class RoutineLoadJob implements TxnStateChangeListener, Writable
this.desireTaskConcurrentNum = desireTaskConcurrentNum;
this.dataSourceType = dataSourceType;
this.maxErrorNum = maxErrorNum;
this.endTimestamp = -1;
}
protected void setOptional(CreateRoutineLoadStmt stmt) throws UserException {
@ -602,6 +602,7 @@ public abstract class RoutineLoadJob implements TxnStateChangeListener, Writable
@Override
public void replayOnCommitted(TransactionState txnState) {
replayUpdateProgress((RLTaskTxnCommitAttachment) txnState.getTxnCommitAttachment());
LOG.debug("replay on committed: {}", txnState);
}
// the task is aborted when the correct number of rows is more than 0
@ -658,6 +659,7 @@ public abstract class RoutineLoadJob implements TxnStateChangeListener, Writable
@Override
public void replayOnAborted(TransactionState txnState) {
replayUpdateProgress((RLTaskTxnCommitAttachment) txnState.getTxnCommitAttachment());
LOG.debug("replay on aborted: {}", txnState);
}
// check task exists or not before call method
@ -789,6 +791,10 @@ public abstract class RoutineLoadJob implements TxnStateChangeListener, Writable
break;
}
if (state.isFinalState()) {
Catalog.getCurrentGlobalTransactionMgr().getListenerRegistry().unregister(id);
}
if (!isReplay) {
Catalog.getInstance().getEditLog().logOpRoutineLoadJob(new RoutineLoadOperation(id, jobState));
}
@ -894,6 +900,17 @@ public abstract class RoutineLoadJob implements TxnStateChangeListener, Writable
return job;
}
/**
 * Whether this job is old enough to be removed from memory.
 * Only jobs already in a terminal state (CANCELLED or STOPPED) qualify.
 */
public boolean needRemove() {
    // Non-terminal jobs are never removed.
    if (state != JobState.CANCELLED && state != JobState.STOPPED) {
        return false;
    }
    // A terminal job must have recorded its end time.
    Preconditions.checkState(endTimestamp != -1, endTimestamp);
    // Remove once the job has been terminal longer than the configured
    // label-clean interval (seconds -> milliseconds).
    return (System.currentTimeMillis() - endTimestamp) > Config.label_clean_interval_second * 1000;
}
@Override
public void write(DataOutput out) throws IOException {
// ATTN: must write type first

View File

@ -24,7 +24,6 @@ import org.apache.doris.analysis.StopRoutineLoadStmt;
import org.apache.doris.catalog.Catalog;
import org.apache.doris.catalog.Database;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.Config;
import org.apache.doris.common.DdlException;
import org.apache.doris.common.ErrorCode;
import org.apache.doris.common.ErrorReport;
@ -34,11 +33,11 @@ import org.apache.doris.common.UserException;
import org.apache.doris.common.io.Writable;
import org.apache.doris.common.util.LogBuilder;
import org.apache.doris.common.util.LogKey;
import org.apache.doris.load.routineload.RoutineLoadJob.JobState;
import org.apache.doris.mysql.privilege.PrivPredicate;
import org.apache.doris.persist.RoutineLoadOperation;
import org.apache.doris.qe.ConnectContext;
import com.google.common.base.Joiner;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
@ -110,8 +109,7 @@ public class RoutineLoadManager implements Writable {
}
}
}
LOG.debug("beIdToConcurrentTasks is {}", Joiner.on(",")
.withKeyValueSeparator(":").join(beIdToConcurrentTasks));
// LOG.debug("beIdToConcurrentTasks is {}", Joiner.on(",").withKeyValueSeparator(":").join(beIdToConcurrentTasks));
return beIdToConcurrentTasks;
}
@ -223,6 +221,7 @@ public class RoutineLoadManager implements Writable {
routineLoadJob.updateState(RoutineLoadJob.JobState.PAUSED,
"User " + ConnectContext.get().getQualifiedUser() + "pauses routine load job",
false /* not replay */);
LOG.info("pause routine load job: {}, {}", routineLoadJob.getId(), routineLoadJob.getName());
}
public void resumeRoutineLoadJob(ResumeRoutineLoadStmt resumeRoutineLoadStmt) throws DdlException,
@ -250,6 +249,7 @@ public class RoutineLoadManager implements Writable {
tableName);
}
routineLoadJob.updateState(RoutineLoadJob.JobState.NEED_SCHEDULE, "user operation", false /* not replay */);
LOG.info("resume routine load job: {}, {}", routineLoadJob.getId(), routineLoadJob.getName());
}
public void stopRoutineLoadJob(StopRoutineLoadStmt stopRoutineLoadStmt) throws DdlException, AnalysisException {
@ -276,6 +276,7 @@ public class RoutineLoadManager implements Writable {
tableName);
}
routineLoadJob.updateState(RoutineLoadJob.JobState.STOPPED, "user operation", false /* not replay */);
LOG.info("stop routine load job: {}, {}", routineLoadJob.getId(), routineLoadJob.getName());
}
public int getSizeOfIdToRoutineLoadTask() {
@ -388,7 +389,7 @@ public class RoutineLoadManager implements Writable {
if (routineLoadJobList == null) {
return null;
}
Optional<RoutineLoadJob> optional = routineLoadJobList.parallelStream()
Optional<RoutineLoadJob> optional = routineLoadJobList.stream()
.filter(entity -> !entity.getState().isFinalState()).findFirst();
if (!optional.isPresent()) {
return null;
@ -409,10 +410,10 @@ public class RoutineLoadManager implements Writable {
}
public List<RoutineLoadJob> getRoutineLoadJobByState(RoutineLoadJob.JobState jobState) {
LOG.debug("begin to get routine load job by state {}", jobState.name());
// LOG.debug("begin to get routine load job by state {}", jobState.name());
List<RoutineLoadJob> stateJobs = idToRoutineLoadJob.values().stream()
.filter(entity -> entity.getState() == jobState).collect(Collectors.toList());
LOG.debug("got {} routine load jobs by state {}", stateJobs.size(), jobState.name());
// LOG.debug("got {} routine load jobs by state {}", stateJobs.size(), jobState.name());
return stateJobs;
}
@ -432,11 +433,13 @@ public class RoutineLoadManager implements Writable {
long currentTimestamp = System.currentTimeMillis();
while (iterator.hasNext()) {
RoutineLoadJob routineLoadJob = iterator.next().getValue();
long jobEndTimestamp = routineLoadJob.getEndTimestamp();
if (jobEndTimestamp != -1L &&
((currentTimestamp - jobEndTimestamp) > Config.label_clean_interval_second * 1000)) {
if (routineLoadJob.needRemove()) {
dbToNameToRoutineLoadJob.get(routineLoadJob.getDbId()).get(routineLoadJob.getName()).remove(routineLoadJob);
iterator.remove();
RoutineLoadOperation operation = new RoutineLoadOperation(routineLoadJob.getId(),
JobState.CANCELLED);
Catalog.getInstance().getEditLog().logRemoveRoutineLoadJob(operation);
LOG.info(new LogBuilder(LogKey.ROUTINE_LOAD_JOB, routineLoadJob.getId())
.add("end_timestamp", routineLoadJob.getEndTimestamp())
.add("current_timestamp", currentTimestamp)
@ -450,6 +453,19 @@ public class RoutineLoadManager implements Writable {
}
}
/**
 * Replays the removal of an expired routine load job from the edit log.
 * Drops the job from both the id index and the db/name index under the write lock.
 */
public void replayRemoveOldRoutineLoad(RoutineLoadOperation operation) {
    long jobId = operation.getId();
    writeLock();
    try {
        RoutineLoadJob removedJob = idToRoutineLoadJob.remove(jobId);
        if (removedJob != null) {
            // Also drop it from the per-database name index.
            dbToNameToRoutineLoadJob.get(removedJob.getDbId()).get(removedJob.getName()).remove(removedJob);
        }
        LOG.info("replay remove routine load job: {}", jobId);
    } finally {
        writeUnlock();
    }
}
public void updateRoutineLoadJob() {
for (RoutineLoadJob routineLoadJob : idToRoutineLoadJob.values()) {
routineLoadJob.update();
@ -493,6 +509,9 @@ public class RoutineLoadManager implements Writable {
map.put(routineLoadJob.getName(), jobs);
}
jobs.add(routineLoadJob);
if (!routineLoadJob.getState().isFinalState()) {
Catalog.getCurrentGlobalTransactionMgr().getListenerRegistry().register(routineLoadJob);
}
}
}
}

View File

@ -93,6 +93,10 @@ public class RoutineLoadTaskScheduler extends Daemon {
int clusterIdleSlotNum = routineLoadManager.getClusterIdleSlotNum();
int needScheduleTaskNum = sizeOfTasksQueue < clusterIdleSlotNum ? sizeOfTasksQueue : clusterIdleSlotNum;
if (needScheduleTaskNum == 0) {
return;
}
LOG.info("There are {} tasks need to be scheduled in queue", needScheduleTasksQueue.size());
int scheduledTaskNum = 0;

View File

@ -674,6 +674,11 @@ public class EditLog {
Catalog.getCurrentCatalog().getRoutineLoadManager().replayChangeRoutineLoadJob(operation);
break;
}
case OperationType.OP_REMOVE_ROUTINE_LOAD_JOB: {
RoutineLoadOperation operation = (RoutineLoadOperation) journal.getData();
Catalog.getCurrentCatalog().getRoutineLoadManager().replayRemoveOldRoutineLoad(operation);
break;
}
default: {
IOException e = new IOException();
LOG.error("UNKNOWN Operation Type {}", opCode, e);
@ -1184,4 +1189,8 @@ public class EditLog {
/**
 * Writes a routine-load job state-change record to the edit log.
 */
public void logOpRoutineLoadJob(RoutineLoadOperation operation) {
    logEdit(OperationType.OP_CHANGE_ROUTINE_LOAD_JOB, operation);
}
/**
 * Writes a routine-load job removal record to the edit log.
 */
public void logRemoveRoutineLoadJob(RoutineLoadOperation removeOp) {
    logEdit(OperationType.OP_REMOVE_ROUTINE_LOAD_JOB, removeOp);
}
}

View File

@ -159,5 +159,5 @@ public class OperationType {
// routine load 200
public static final short OP_CREATE_ROUTINE_LOAD_JOB = 200;
public static final short OP_CHANGE_ROUTINE_LOAD_JOB = 201;
public static final short OP_REMOVE_ROUTINE_LOAD_JOB = 202;
}

View File

@ -64,7 +64,7 @@ public class RoutineLoadOperation implements Writable {
@Override
public void readFields(DataInput in) throws IOException {
in.readLong();
id = in.readLong();
jobState = JobState.valueOf(Text.readString(in));
}
}

View File

@ -135,7 +135,7 @@ public class TransactionState implements Writable {
private long publishVersionTime;
private TransactionStatus preStatus = null;
private long listenerId;
private long listenerId = -1;
// the result of calling txn state change listener.
// this is used for replaying
@ -393,6 +393,7 @@ public class TransactionState implements Writable {
if (txnCommitAttachment != null) {
sb.append(" attactment: ").append(txnCommitAttachment);
}
sb.append(", listen result: ").append(listenResult.name());
return sb.toString();
}
@ -440,6 +441,7 @@ public class TransactionState implements Writable {
txnCommitAttachment.write(out);
}
Text.writeString(out, listenResult.name());
out.writeLong(listenerId);
}
@Override
@ -470,6 +472,7 @@ public class TransactionState implements Writable {
txnCommitAttachment = TxnCommitAttachment.read(in);
}
listenResult = ListenResult.valueOf(Text.readString(in));
listenerId = in.readLong();
}
}
}

View File

@ -19,6 +19,9 @@ package org.apache.doris.transaction;
import com.google.common.collect.Maps;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.util.Map;
/*
@ -28,6 +31,8 @@ import java.util.Map;
// saves all TxnStateChangeListeners
public class TxnStateListenerRegistry {
private static final Logger LOG = LogManager.getLogger(TxnStateListenerRegistry.class);
private Map<Long, TxnStateChangeListener> listeners = Maps.newHashMap();
public synchronized boolean register(TxnStateChangeListener listener) {
@ -35,11 +40,13 @@ public class TxnStateListenerRegistry {
return false;
}
listeners.put(listener.getId(), listener);
LOG.info("register txn state listener: {}", listener.getId());
return true;
}
/**
 * Removes the listener registered under the given id, if any.
 */
public synchronized void unregister(long listenerId) {
    listeners.remove(listenerId);
    LOG.info("unregister txn state listener: {}", listenerId);
}
public synchronized TxnStateChangeListener getListener(long id) {