Put begin txn into task scheduler (#687)

1. Fix the nested locking of db and txn.
2. The txn of a task is now initialized in the task scheduler before the task is taken from the queue.
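As a rough sketch of point 2, the minimal example below (plain Java, not the actual Doris classes; TaskInfo, beginTxn() and getMinTaskBeId() here are simplified stand-ins for RoutineLoadTaskInfo, RoutineLoadTaskInfo.beginTxn() and RoutineLoadManager.getMinTaskBeId()) illustrates the order the task scheduler now follows: allocate a backend and begin the transaction while the task is still at the head of needScheduleTasksQueue, and only remove the task from the queue once both succeed; on failure the task is rotated to the rear and retried in a later cycle.

import java.util.concurrent.LinkedBlockingQueue;

// Minimal sketch of the new scheduling order; NOT the real Doris implementation.
public class TaskSchedulerSketch {

    static class TaskInfo {
        long txnId = -1L;
        void beginTxn() throws Exception {
            // begin a txn for this task; in Doris this goes through the global transaction manager
            txnId = 1L;
        }
    }

    private final LinkedBlockingQueue<TaskInfo> needScheduleTasksQueue = new LinkedBlockingQueue<>();

    void processOnce() throws InterruptedException {
        int needScheduleTaskNum = needScheduleTasksQueue.size();
        while (needScheduleTaskNum > 0) {
            // look at the head of the queue without removing the task yet
            TaskInfo task = needScheduleTasksQueue.peek();
            try {
                long beId = getMinTaskBeId("default_cluster"); // allocate a backend first
                task.beginTxn();                               // then begin the txn, still before take()
            } catch (Exception e) {
                // on failure, rotate the task to the rear of the queue and retry in a later cycle
                needScheduleTasksQueue.take();
                needScheduleTasksQueue.put(task);
                needScheduleTaskNum--;
                continue;
            }
            // only now is the task removed from the queue, turned into a thrift task and sent to the BE
            needScheduleTasksQueue.take();
            needScheduleTaskNum--;
        }
    }

    private long getMinTaskBeId(String clusterName) {
        return 1L; // placeholder for the real BE-selection logic
    }
}

Compared with beginning the txn inside the RoutineLoadTaskInfo constructor, this keeps a failed beginTransaction from cancelling the whole job: the task simply waits at the rear of the queue for the next scheduling round.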
EmmyMiao87
2019-03-05 16:37:19 +08:00
committed by ZHAO Chun
parent 20b2b2c37f
commit 2314a3ecd4
19 changed files with 444 additions and 317 deletions

View File

@ -78,7 +78,7 @@ public class CreateRoutineLoadStmt extends DdlStmt {
public static final String MAX_ERROR_NUMBER_PROPERTY = "max_error_number";
// kafka type properties
public static final String KAFKA_ENDPOINT_PROPERTY = "kafka_endpoint";
public static final String KAFKA_BROKER_LIST_PROPERTY = "kafka_broker_list";
public static final String KAFKA_TOPIC_PROPERTY = "kafka_topic";
// optional
public static final String KAFKA_PARTITIONS_PROPERTY = "kafka_partitions";
@ -93,7 +93,7 @@ public class CreateRoutineLoadStmt extends DdlStmt {
.build();
private static final ImmutableSet<String> KAFKA_PROPERTIES_SET = new ImmutableSet.Builder<String>()
.add(KAFKA_ENDPOINT_PROPERTY)
.add(KAFKA_BROKER_LIST_PROPERTY)
.add(KAFKA_TOPIC_PROPERTY)
.add(KAFKA_PARTITIONS_PROPERTY)
.build();
@ -110,7 +110,7 @@ public class CreateRoutineLoadStmt extends DdlStmt {
private RoutineLoadDesc routineLoadDesc;
private int desiredConcurrentNum;
private int maxErrorNum;
private String kafkaEndpoint;
private String kafkaBrokerList;
private String kafkaTopic;
private List<Integer> kafkaPartitions;
@ -121,7 +121,7 @@ public class CreateRoutineLoadStmt extends DdlStmt {
this.dbTableName = dbTableName;
this.loadPropertyList = loadPropertyList;
this.properties = properties;
this.typeName = typeName;
this.typeName = typeName.toUpperCase();
this.customProperties = customProperties;
}
@ -145,6 +145,7 @@ public class CreateRoutineLoadStmt extends DdlStmt {
return customProperties;
}
// nullable
public RoutineLoadDesc getRoutineLoadDesc() {
return routineLoadDesc;
}
@ -157,8 +158,8 @@ public class CreateRoutineLoadStmt extends DdlStmt {
return maxErrorNum;
}
public String getKafkaEndpoint() {
return kafkaEndpoint;
public String getKafkaBrokerList() {
return kafkaBrokerList;
}
public String getKafkaTopic() {
@ -176,6 +177,7 @@ public class CreateRoutineLoadStmt extends DdlStmt {
FeNameFormat.checkCommonName(NAME_TYPE, name);
// check dbName and tableName
checkDBTableName();
dbTableName.analyze(analyzer);
// check load properties include column separator etc.
checkLoadProperties(analyzer);
// check routine load properties include desired concurrent number etc.
@ -291,12 +293,16 @@ public class CreateRoutineLoadStmt extends DdlStmt {
throw new AnalysisException(optional.get() + " is invalid kafka custom property");
}
// check endpoint
kafkaEndpoint = customProperties.get(KAFKA_ENDPOINT_PROPERTY);
if (Strings.isNullOrEmpty(kafkaEndpoint)) {
throw new AnalysisException(KAFKA_ENDPOINT_PROPERTY + " is required property");
kafkaBrokerList = customProperties.get(KAFKA_BROKER_LIST_PROPERTY);
if (Strings.isNullOrEmpty(kafkaBrokerList)) {
throw new AnalysisException(KAFKA_BROKER_LIST_PROPERTY + " is required property");
}
if (!Pattern.matches(ENDPOINT_REGEX, kafkaEndpoint)) {
throw new AnalysisException(KAFKA_ENDPOINT_PROPERTY + " not match pattern " + ENDPOINT_REGEX);
String[] kafkaBrokerList = this.kafkaBrokerList.split(",");
for (String broker : kafkaBrokerList) {
if (!Pattern.matches(ENDPOINT_REGEX, broker)) {
throw new AnalysisException(KAFKA_BROKER_LIST_PROPERTY + ":" + broker
+ " not match pattern " + ENDPOINT_REGEX);
}
}
// check topic
kafkaTopic = customProperties.get(KAFKA_TOPIC_PROPERTY);

View File

@ -100,6 +100,7 @@ import org.apache.doris.common.ErrorReport;
import org.apache.doris.common.FeConstants;
import org.apache.doris.common.FeMetaVersion;
import org.apache.doris.common.MarkedCountDownLatch;
import org.apache.doris.common.MetaNotFoundException;
import org.apache.doris.common.Pair;
import org.apache.doris.common.UserException;
import org.apache.doris.common.io.Text;
@ -133,6 +134,8 @@ import org.apache.doris.load.LoadErrorHub;
import org.apache.doris.load.LoadJob;
import org.apache.doris.load.LoadJob.JobState;
import org.apache.doris.load.routineload.RoutineLoadManager;
import org.apache.doris.load.routineload.RoutineLoadScheduler;
import org.apache.doris.load.routineload.RoutineLoadTaskScheduler;
import org.apache.doris.master.Checkpoint;
import org.apache.doris.master.MetaHelper;
import org.apache.doris.meta.MetaContext;
@ -349,6 +352,10 @@ public class Catalog {
private TabletChecker tabletChecker;
private RoutineLoadScheduler routineLoadScheduler;
private RoutineLoadTaskScheduler routineLoadTaskScheduler;
public List<Frontend> getFrontends(FrontendNodeType nodeType) {
if (nodeType == null) {
// get all
@ -466,6 +473,9 @@ public class Catalog {
this.stat = new TabletSchedulerStat();
this.tabletScheduler = new TabletScheduler(this, systemInfo, tabletInvertedIndex, stat);
this.tabletChecker = new TabletChecker(this, systemInfo, tabletScheduler, stat);
this.routineLoadScheduler = new RoutineLoadScheduler(routineLoadManager);
this.routineLoadTaskScheduler = new RoutineLoadTaskScheduler(routineLoadManager);
}
public static void destroyCheckpoint() {
@ -650,6 +660,10 @@ public class Catalog {
txnCleaner.setInterval(Math.min(Config.transaction_clean_interval_second,
Config.stream_load_default_timeout_second) * 100L);
// 8. start routine load scheduler
routineLoadScheduler.start();
routineLoadTaskScheduler.start();
}
private void getClusterIdAndRole() throws IOException {

View File

@ -28,6 +28,7 @@ public class RoutineLoadDesc {
private final ColumnSeparator columnSeparator;
private final LoadColumnsInfo columnsInfo;
private final Expr wherePredicate;
// nullable
private final List<String> partitionNames;
public RoutineLoadDesc(ColumnSeparator columnSeparator, LoadColumnsInfo columnsInfo,
@ -50,6 +51,7 @@ public class RoutineLoadDesc {
return wherePredicate;
}
// nullable
public List<String> getPartitionNames() {
return partitionNames;
}

View File

@ -53,7 +53,7 @@ public class KafkaRoutineLoadJob extends RoutineLoadJob {
private static final String FE_GROUP_ID = "fe_fetch_partitions";
private static final int FETCH_PARTITIONS_TIMEOUT = 10;
private String serverAddress;
private String brokerList;
private String topic;
// optional, user want to load partitions.
private List<Integer> customKafkaPartitions;
@ -63,9 +63,9 @@ public class KafkaRoutineLoadJob extends RoutineLoadJob {
// this is the kafka consumer which is used to fetch the number of partitions
private KafkaConsumer consumer;
public KafkaRoutineLoadJob(String name, long dbId, long tableId, String serverAddress, String topic) {
public KafkaRoutineLoadJob(String name, long dbId, long tableId, String brokerList, String topic) {
super(name, dbId, tableId, LoadDataSourceType.KAFKA);
this.serverAddress = serverAddress;
this.brokerList = brokerList;
this.topic = topic;
this.progress = new KafkaProgress();
this.customKafkaPartitions = new ArrayList<>();
@ -78,11 +78,11 @@ public class KafkaRoutineLoadJob extends RoutineLoadJob {
public KafkaRoutineLoadJob(long id, String name, long dbId, long tableId,
RoutineLoadDesc routineLoadDesc,
int desireTaskConcurrentNum, int maxErrorNum,
String serverAddress, String topic, KafkaProgress kafkaProgress) {
String brokerList, String topic, KafkaProgress kafkaProgress) {
super(id, name, dbId, tableId, routineLoadDesc,
desireTaskConcurrentNum, LoadDataSourceType.KAFKA,
maxErrorNum);
this.serverAddress = serverAddress;
this.brokerList = brokerList;
this.topic = topic;
this.progress = kafkaProgress;
this.customKafkaPartitions = new ArrayList<>();
@ -94,32 +94,28 @@ public class KafkaRoutineLoadJob extends RoutineLoadJob {
return topic;
}
public String getServerAddress() {
return serverAddress;
public String getBrokerList() {
return brokerList;
}
// this is an unprotected method which is called in the initialization function
private void setCustomKafkaPartitions(List<Integer> kafkaPartitions) throws LoadException {
writeLock();
try {
if (this.customKafkaPartitions.size() != 0) {
throw new LoadException("Kafka partitions have been initialized");
}
// check if custom kafka partition is valid
List<Integer> allKafkaPartitions = getAllKafkaPartitions();
outter:
for (Integer customkafkaPartition : kafkaPartitions) {
for (Integer kafkaPartition : allKafkaPartitions) {
if (kafkaPartition.equals(customkafkaPartition)) {
continue outter;
}
}
throw new LoadException("there is a custom kafka partition " + customkafkaPartition
+ " which is invalid for topic " + topic);
}
this.customKafkaPartitions = kafkaPartitions;
} finally {
writeUnlock();
if (this.customKafkaPartitions.size() != 0) {
throw new LoadException("Kafka partitions have been initialized");
}
// check if custom kafka partition is valid
List<Integer> allKafkaPartitions = getAllKafkaPartitions();
outter:
for (Integer customkafkaPartition : kafkaPartitions) {
for (Integer kafkaPartition : allKafkaPartitions) {
if (kafkaPartition.equals(customkafkaPartition)) {
continue outter;
}
}
throw new LoadException("there is a custom kafka partition " + customkafkaPartition
+ " which is invalid for topic " + topic);
}
this.customKafkaPartitions = kafkaPartitions;
}
@Override
@ -130,17 +126,10 @@ public class KafkaRoutineLoadJob extends RoutineLoadJob {
if (state == JobState.NEED_SCHEDULE) {
// divide kafkaPartitions into tasks
for (int i = 0; i < currentConcurrentTaskNum; i++) {
try {
KafkaTaskInfo kafkaTaskInfo = new KafkaTaskInfo(UUID.randomUUID(), id);
routineLoadTaskInfoList.add(kafkaTaskInfo);
needScheduleTaskInfoList.add(kafkaTaskInfo);
result.add(kafkaTaskInfo);
} catch (UserException e) {
LOG.error("failed to begin txn for kafka routine load task, change job state to failed");
state = JobState.CANCELLED;
// TODO(ml): edit log
break;
}
KafkaTaskInfo kafkaTaskInfo = new KafkaTaskInfo(UUID.randomUUID(), id);
routineLoadTaskInfoList.add(kafkaTaskInfo);
needScheduleTaskInfoList.add(kafkaTaskInfo);
result.add(kafkaTaskInfo);
}
if (result.size() != 0) {
for (int i = 0; i < currentKafkaPartitions.size(); i++) {
@ -154,6 +143,8 @@ public class KafkaRoutineLoadJob extends RoutineLoadJob {
} else {
LOG.debug("Ignore to divide routine load job while job state {}", state);
}
// save task into queue of needScheduleTasks
Catalog.getCurrentCatalog().getRoutineLoadManager().addTasksToNeedScheduleQueue(result);
} finally {
writeUnlock();
}
@ -162,14 +153,8 @@ public class KafkaRoutineLoadJob extends RoutineLoadJob {
@Override
public int calculateCurrentConcurrentTaskNum() throws MetaNotFoundException {
updateCurrentKafkaPartitions();
SystemInfoService systemInfoService = Catalog.getCurrentSystemInfo();
Database db = Catalog.getCurrentCatalog().getDb(dbId);
if (db == null) {
LOG.warn("db {} is not exists from job {}", dbId, id);
throw new MetaNotFoundException("db " + dbId + " is not exists from job " + id);
}
int aliveBeNum = systemInfoService.getBackendIds(true).size();
int aliveBeNum = systemInfoService.getClusterBackendIds(getClusterName(), true).size();
int partitionNum = currentKafkaPartitions.size();
if (desireTaskConcurrentNum == 0) {
desireTaskConcurrentNum = partitionNum;
@ -178,16 +163,17 @@ public class KafkaRoutineLoadJob extends RoutineLoadJob {
LOG.info("current concurrent task number is min "
+ "(current size of partition {}, desire task concurrent num {}, alive be num {})",
partitionNum, desireTaskConcurrentNum, aliveBeNum);
return Math.min(partitionNum, Math.min(desireTaskConcurrentNum, aliveBeNum));
return Math.min(Math.min(partitionNum, Math.min(desireTaskConcurrentNum, aliveBeNum)), DEFAULT_TASK_MAX_CONCURRENT_NUM);
}
@Override
protected void updateProgress(RoutineLoadProgress progress) {
this.progress.update(progress);
protected void updateProgress(RLTaskTxnCommitAttachment attachment) {
super.updateProgress(attachment);
this.progress.update(attachment.getProgress());
}
@Override
protected RoutineLoadTaskInfo reNewTask(RoutineLoadTaskInfo routineLoadTaskInfo) throws AnalysisException,
protected RoutineLoadTaskInfo unprotectRenewTask(RoutineLoadTaskInfo routineLoadTaskInfo) throws AnalysisException,
LabelAlreadyUsedException, BeginTransactionException {
// remove old task
routineLoadTaskInfoList.remove(routineLoadTaskInfo);
@ -198,22 +184,38 @@ public class KafkaRoutineLoadJob extends RoutineLoadJob {
return kafkaTaskInfo;
}
@Override
protected void executeUpdate() {
updateNewPartitionProgress();
}
// if customKafkaPartition is not null, then return false immediately
// else if kafka partitions of topic has been changed, return true.
// else return false
// update current kafka partition at the same time
// current kafka partitions = customKafkaPartitions == 0 ? all of partition of kafka topic : customKafkaPartitions
@Override
protected boolean needReschedule() {
if (customKafkaPartitions != null && customKafkaPartitions.size() != 0) {
currentKafkaPartitions = customKafkaPartitions;
return false;
} else {
List<Integer> newCurrentKafkaPartition = getAllKafkaPartitions();
List<Integer> newCurrentKafkaPartition;
try {
newCurrentKafkaPartition = getAllKafkaPartitions();
} catch (Exception e) {
LOG.warn("Job {} failed to fetch all current partition", id);
return false;
}
if (currentKafkaPartitions.containsAll(newCurrentKafkaPartition)) {
if (currentKafkaPartitions.size() > newCurrentKafkaPartition.size()) {
currentKafkaPartitions = newCurrentKafkaPartition;
return true;
} else {
return false;
}
} else {
currentKafkaPartitions = newCurrentKafkaPartition;
return true;
}
@ -232,12 +234,15 @@ public class KafkaRoutineLoadJob extends RoutineLoadJob {
public static KafkaRoutineLoadJob fromCreateStmt(CreateRoutineLoadStmt stmt) throws AnalysisException,
LoadException {
checkCreate(stmt);
// find dbId
// check db and table
Database database = Catalog.getCurrentCatalog().getDb(stmt.getDBTableName().getDb());
Table table;
if (database == null) {
throw new AnalysisException("There is no database named " + stmt.getDBTableName().getDb());
}
database.readLock();
Table table;
try {
unprotectCheckCreate(stmt);
table = database.getTable(stmt.getDBTableName().getTbl());
} finally {
database.readUnlock();
@ -246,27 +251,19 @@ public class KafkaRoutineLoadJob extends RoutineLoadJob {
// init kafka routine load job
KafkaRoutineLoadJob kafkaRoutineLoadJob =
new KafkaRoutineLoadJob(stmt.getName(), database.getId(), table.getId(),
stmt.getKafkaEndpoint(),
stmt.getKafkaBrokerList(),
stmt.getKafkaTopic());
kafkaRoutineLoadJob.setOptional(stmt);
return kafkaRoutineLoadJob;
}
// current kafka partitions = customKafkaPartitions == 0 ? all of partition of kafka topic : customKafkaPartitions
private void updateCurrentKafkaPartitions() {
if (customKafkaPartitions == null || customKafkaPartitions.size() == 0) {
LOG.debug("All of partitions which belong to topic will be loaded for {} routine load job", name);
// fetch all of kafkaPartitions in topic
currentKafkaPartitions.addAll(getAllKafkaPartitions());
} else {
currentKafkaPartitions = customKafkaPartitions;
}
private void updateNewPartitionProgress() {
// update the progress of new partitions
for (Integer kafkaPartition : currentKafkaPartitions) {
try {
if (((KafkaProgress) progress).getPartitionIdToOffset().containsKey(kafkaPartition)) {
((KafkaProgress) progress).getPartitionIdToOffset().get(kafkaPartition);
} catch (NullPointerException e) {
} else {
((KafkaProgress) progress).getPartitionIdToOffset().put(kafkaPartition, 0L);
}
}
@ -274,7 +271,7 @@ public class KafkaRoutineLoadJob extends RoutineLoadJob {
private void setConsumer() {
Properties props = new Properties();
props.put("bootstrap.servers", this.serverAddress);
props.put("bootstrap.servers", this.brokerList);
props.put("group.id", FE_GROUP_ID);
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

View File

@ -20,22 +20,15 @@ package org.apache.doris.load.routineload;
import com.google.common.base.Joiner;
import org.apache.doris.catalog.Catalog;
import org.apache.doris.catalog.Database;
import org.apache.doris.catalog.OlapTable;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.LabelAlreadyUsedException;
import org.apache.doris.common.LoadException;
import org.apache.doris.common.UserException;
import org.apache.doris.planner.StreamLoadPlanner;
import org.apache.doris.task.KafkaRoutineLoadTask;
import org.apache.doris.task.RoutineLoadTask;
import org.apache.doris.task.StreamLoadTask;
import org.apache.doris.thrift.TExecPlanFragmentParams;
import org.apache.doris.thrift.TFileFormatType;
import org.apache.doris.thrift.TFileType;
import org.apache.doris.thrift.TKafkaLoadInfo;
import org.apache.doris.thrift.TLoadSourceType;
import org.apache.doris.thrift.TPlanFragment;
import org.apache.doris.thrift.TRoutineLoadTask;
import org.apache.doris.thrift.TStreamLoadPutRequest;
import org.apache.doris.thrift.TUniqueId;
import org.apache.doris.transaction.BeginTransactionException;
@ -52,8 +45,7 @@ public class KafkaTaskInfo extends RoutineLoadTaskInfo {
private List<Integer> partitions;
public KafkaTaskInfo(UUID id, long jobId) throws LabelAlreadyUsedException,
BeginTransactionException, AnalysisException {
public KafkaTaskInfo(UUID id, long jobId) {
super(id, jobId);
this.partitions = new ArrayList<>();
}
@ -74,7 +66,7 @@ public class KafkaTaskInfo extends RoutineLoadTaskInfo {
// todo: reuse plan fragment of stream load
@Override
public TRoutineLoadTask createRoutineLoadTask(long beId) throws LoadException, UserException {
public TRoutineLoadTask createRoutineLoadTask() throws LoadException, UserException {
KafkaRoutineLoadJob routineLoadJob = (KafkaRoutineLoadJob) routineLoadManager.getJob(jobId);
Map<Integer, Long> partitionIdToOffset = Maps.newHashMap();
for (Integer partitionId : partitions) {
@ -96,7 +88,7 @@ public class KafkaTaskInfo extends RoutineLoadTaskInfo {
tRoutineLoadTask.setTbl(database.getTable(routineLoadJob.getTableId()).getName());
StringBuilder stringBuilder = new StringBuilder();
// label = (serviceAddress_topic_partition1:offset_partition2:offset).hashcode()
String label = String.valueOf(stringBuilder.append(routineLoadJob.getServerAddress()).append("_")
String label = String.valueOf(stringBuilder.append(routineLoadJob.getBrokerList()).append("_")
.append(routineLoadJob.getTopic()).append("_")
.append(Joiner.on("_").withKeyValueSeparator(":")
.join(partitionIdToOffset)).toString().hashCode());
@ -104,21 +96,19 @@ public class KafkaTaskInfo extends RoutineLoadTaskInfo {
tRoutineLoadTask.setAuth_code(routineLoadJob.getAuthCode());
TKafkaLoadInfo tKafkaLoadInfo = new TKafkaLoadInfo();
tKafkaLoadInfo.setTopic((routineLoadJob).getTopic());
tKafkaLoadInfo.setBrokers((routineLoadJob).getServerAddress());
tKafkaLoadInfo.setBrokers((routineLoadJob).getBrokerList());
tKafkaLoadInfo.setPartition_begin_offset(partitionIdToOffset);
tRoutineLoadTask.setKafka_load_info(tKafkaLoadInfo);
tRoutineLoadTask.setType(TLoadSourceType.KAFKA);
tRoutineLoadTask.setParams(createTExecPlanFragmentParams(routineLoadJob));
tRoutineLoadTask.setParams(updateTExecPlanFragmentParams(routineLoadJob));
return tRoutineLoadTask;
}
private TExecPlanFragmentParams createTExecPlanFragmentParams(RoutineLoadJob routineLoadJob) throws UserException {
StreamLoadTask streamLoadTask = StreamLoadTask.fromRoutineLoadTaskInfo(this);
Database database = Catalog.getCurrentCatalog().getDb(routineLoadJob.getDbId());
StreamLoadPlanner planner = new StreamLoadPlanner(database,
(OlapTable) database.getTable(routineLoadJob.getTableId()),
streamLoadTask);
return planner.plan();
private TExecPlanFragmentParams updateTExecPlanFragmentParams(RoutineLoadJob routineLoadJob) throws UserException {
TExecPlanFragmentParams tExecPlanFragmentParams = routineLoadJob.gettExecPlanFragmentParams();
TPlanFragment tPlanFragment = tExecPlanFragmentParams.getFragment();
tPlanFragment.getOutput_sink().getOlap_table_sink().setTxn_id(this.txnId);
return tExecPlanFragmentParams;
}
}

View File

@ -24,7 +24,6 @@ import org.apache.doris.catalog.Database;
import org.apache.doris.catalog.OlapTable;
import org.apache.doris.catalog.Table;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.IdGenerator;
import org.apache.doris.common.LabelAlreadyUsedException;
import org.apache.doris.common.LoadException;
import org.apache.doris.common.MetaNotFoundException;
@ -32,16 +31,14 @@ import org.apache.doris.common.UserException;
import org.apache.doris.common.io.Writable;
import org.apache.doris.load.RoutineLoadDesc;
import org.apache.doris.load.TxnStateChangeListener;
import org.apache.doris.planner.PlanFragment;
import org.apache.doris.planner.StreamLoadPlanner;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.service.ExecuteEnv;
import org.apache.doris.service.FrontendServiceImpl;
import org.apache.doris.task.StreamLoadTask;
import org.apache.doris.thrift.TExecPlanFragmentParams;
import org.apache.doris.thrift.TFileFormatType;
import org.apache.doris.thrift.TFileType;
import org.apache.doris.thrift.TLoadTxnCommitRequest;
import org.apache.doris.thrift.TResourceInfo;
import org.apache.doris.thrift.TStreamLoadPutRequest;
import org.apache.doris.transaction.AbortTransactionException;
import org.apache.doris.transaction.BeginTransactionException;
import org.apache.doris.transaction.TransactionState;
@ -57,9 +54,7 @@ import java.io.DataOutput;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.locks.ReentrantReadWriteLock;
/**
@ -76,6 +71,7 @@ public abstract class RoutineLoadJob implements Writable, TxnStateChangeListener
private static final int DEFAULT_TASK_TIMEOUT_SECONDS = 10;
private static final int BASE_OF_ERROR_RATE = 10000;
private static final String STAR_STRING = "*";
protected static final int DEFAULT_TASK_MAX_CONCURRENT_NUM = 3;
/**
* +-----------------+
@ -132,6 +128,7 @@ public abstract class RoutineLoadJob implements Writable, TxnStateChangeListener
protected RoutineLoadProgress progress;
protected String pausedReason;
protected String cancelReason;
// currentErrorNum and currentTotalNum will be updated
// when currentTotalNum is more than ten thousand or currentErrorNum is more than maxErrorNum
@ -213,17 +210,28 @@ public abstract class RoutineLoadJob implements Writable, TxnStateChangeListener
return dbId;
}
public String getDbFullName() {
public String getDbFullName() throws MetaNotFoundException {
Database database = Catalog.getCurrentCatalog().getDb(dbId);
return database.getFullName();
if (database == null) {
throw new MetaNotFoundException("Database " + dbId + "has been deleted");
}
database.readLock();
try {
return database.getFullName();
} finally {
database.readUnlock();
}
}
public long getTableId() {
return tableId;
}
public String getTableName() {
public String getTableName() throws MetaNotFoundException {
Database database = Catalog.getCurrentCatalog().getDb(dbId);
if (database == null) {
throw new MetaNotFoundException("Database " + dbId + "has been deleted");
}
database.readLock();
try {
Table table = database.getTable(tableId);
@ -237,24 +245,16 @@ public abstract class RoutineLoadJob implements Writable, TxnStateChangeListener
return state;
}
public void setState(JobState state) {
this.state = state;
}
public long getAuthCode() {
return authCode;
}
// this is an unprotected method which is called in the initialization function
protected void setRoutineLoadDesc(RoutineLoadDesc routineLoadDesc) throws LoadException {
writeLock();
try {
if (this.routineLoadDesc != null) {
throw new LoadException("Routine load desc has been initialized");
}
this.routineLoadDesc = routineLoadDesc;
} finally {
writeUnlock();
if (this.routineLoadDesc != null) {
throw new LoadException("Routine load desc has been initialized");
}
this.routineLoadDesc = routineLoadDesc;
}
public RoutineLoadDesc getRoutineLoadDesc() {
@ -270,13 +270,28 @@ public abstract class RoutineLoadJob implements Writable, TxnStateChangeListener
}
public String getPartitions() {
if (routineLoadDesc.getPartitionNames() == null || routineLoadDesc.getPartitionNames().size() == 0) {
if (routineLoadDesc == null
|| routineLoadDesc.getPartitionNames() == null
|| routineLoadDesc.getPartitionNames().size() == 0) {
return STAR_STRING;
} else {
return String.join(",", routineLoadDesc.getPartitionNames());
}
}
public String getClusterName() throws MetaNotFoundException {
Database database = Catalog.getCurrentCatalog().getDb(dbId);
if (database == null) {
throw new MetaNotFoundException("Database " + dbId + " has been deleted");
}
database.readLock();
try {
return database.getClusterName();
} finally {
database.readUnlock();
}
}
protected void setDesireTaskConcurrentNum(int desireTaskConcurrentNum) throws LoadException {
writeLock();
try {
@ -323,13 +338,8 @@ public abstract class RoutineLoadJob implements Writable, TxnStateChangeListener
return needScheduleTaskInfoList;
}
public void updateState(JobState jobState) {
writeLock();
try {
state = jobState;
} finally {
writeUnlock();
}
public TExecPlanFragmentParams gettExecPlanFragmentParams() {
return tExecPlanFragmentParams;
}
public List<RoutineLoadTaskInfo> processTimeoutTasks() {
@ -355,7 +365,7 @@ public abstract class RoutineLoadJob implements Writable, TxnStateChangeListener
}
try {
result.add(reNewTask(routineLoadTaskInfo));
result.add(unprotectRenewTask(routineLoadTaskInfo));
LOG.debug("Task {} was ran more then {} minutes. It was removed and rescheduled",
oldSignature, DEFAULT_TASK_TIMEOUT_SECONDS);
} catch (UserException e) {
@ -397,7 +407,10 @@ public abstract class RoutineLoadJob implements Writable, TxnStateChangeListener
}
}
abstract void updateProgress(RoutineLoadProgress progress);
// if rate of error data is more than max_filter_ratio, pause job
protected void updateProgress(RLTaskTxnCommitAttachment attachment) {
updateNumOfData(attachment.getFilteredRows(), attachment.getLoadedRows());
}
public boolean containsTask(String taskId) {
readLock();
@ -413,11 +426,6 @@ public abstract class RoutineLoadJob implements Writable, TxnStateChangeListener
private void checkStateTransform(RoutineLoadJob.JobState desireState)
throws UnsupportedOperationException {
switch (state) {
case RUNNING:
if (desireState == JobState.NEED_SCHEDULE) {
throw new UnsupportedOperationException("Could not transform " + state + " to " + desireState);
}
break;
case PAUSED:
if (desireState == JobState.PAUSED) {
throw new UnsupportedOperationException("Could not transform " + state + " to " + desireState);
@ -461,9 +469,24 @@ public abstract class RoutineLoadJob implements Writable, TxnStateChangeListener
}
}
abstract RoutineLoadTaskInfo reNewTask(RoutineLoadTaskInfo routineLoadTaskInfo) throws AnalysisException,
abstract RoutineLoadTaskInfo unprotectRenewTask(RoutineLoadTaskInfo routineLoadTaskInfo) throws AnalysisException,
LabelAlreadyUsedException, BeginTransactionException;
public void plan() throws UserException {
StreamLoadTask streamLoadTask = StreamLoadTask.fromRoutineLoadJob(this);
Database database = Catalog.getCurrentCatalog().getDb(this.getDbId());
database.readLock();
try {
StreamLoadPlanner planner = new StreamLoadPlanner(database,
(OlapTable) database.getTable(this.tableId),
streamLoadTask);
tExecPlanFragmentParams = planner.plan();
} finally {
database.readUnlock();
}
}
@Override
public void beforeAborted(TransactionState txnState, TransactionState.TxnStatusChangeReason txnStatusChangeReason)
throws AbortTransactionException {
@ -472,6 +495,7 @@ public abstract class RoutineLoadJob implements Writable, TxnStateChangeListener
if (txnStatusChangeReason != null) {
switch (txnStatusChangeReason) {
case TIMEOUT:
default:
String taskId = txnState.getLabel();
if (routineLoadTaskInfoList.parallelStream().anyMatch(entity -> entity.getId().equals(taskId))) {
throw new AbortTransactionException(
@ -497,29 +521,28 @@ public abstract class RoutineLoadJob implements Writable, TxnStateChangeListener
Optional<RoutineLoadTaskInfo> routineLoadTaskInfoOptional =
routineLoadTaskInfoList.parallelStream()
.filter(entity -> entity.getId().equals(txnState.getLabel())).findFirst();
RoutineLoadTaskInfo routineLoadTaskInfo = routineLoadTaskInfoOptional.get();
if (routineLoadTaskInfoOptional.isPresent()) {
RoutineLoadTaskInfo routineLoadTaskInfo = routineLoadTaskInfoOptional.get();
// step2: update job progress
updateProgress(rlTaskTxnCommitAttachment.getProgress());
// step2: update job progress
updateProgress(rlTaskTxnCommitAttachment);
// step4: if rate of error data is more then max_filter_ratio, pause job
updateNumOfData(rlTaskTxnCommitAttachment.getFilteredRows(), rlTaskTxnCommitAttachment.getLoadedRows());
if (state == JobState.RUNNING) {
// step5: create a new task for partitions
RoutineLoadTaskInfo newRoutineLoadTaskInfo = reNewTask(routineLoadTaskInfo);
Catalog.getCurrentCatalog().getRoutineLoadManager()
.getNeedScheduleTasksQueue().add(newRoutineLoadTaskInfo);
if (state == JobState.RUNNING) {
// step3: create a new task for partitions
RoutineLoadTaskInfo newRoutineLoadTaskInfo = unprotectRenewTask(routineLoadTaskInfo);
Catalog.getCurrentCatalog().getRoutineLoadManager()
.getNeedScheduleTasksQueue().add(newRoutineLoadTaskInfo);
}
} else {
LOG.debug("There is no {} task in task info list. Maybe task has been renew or job state has changed. "
+ " Transaction {} will not be committed",
txnState.getLabel(), txnState.getTransactionId());
}
} catch (NoSuchElementException e) {
LOG.debug("There is no {} task in task info list. Maybe task has been renew or job state has changed. "
+ " Transaction {} will not be committed",
txnState.getLabel(), txnState.getTransactionId());
} catch (Throwable e) {
LOG.error("failed to update offset in routine load task {} when transaction {} has been committed. "
+ "change job to paused",
rlTaskTxnCommitAttachment.getTaskId(), txnState.getTransactionId(), e);
executePause("failed to update offset when transaction "
updateState(JobState.PAUSED, "failed to update offset when transaction "
+ txnState.getTransactionId() + " has been committed");
} finally {
writeUnlock();
@ -528,12 +551,12 @@ public abstract class RoutineLoadJob implements Writable, TxnStateChangeListener
@Override
public void onAborted(TransactionState txnState, TransactionState.TxnStatusChangeReason txnStatusChangeReason) {
pause(txnStatusChangeReason.name());
updateState(JobState.PAUSED, txnStatusChangeReason.name());
LOG.debug("job {} need to be pause while txn {} abort with reason {}",
id, txnState.getTransactionId(), txnStatusChangeReason.name());
}
protected static void checkCreate(CreateRoutineLoadStmt stmt) throws AnalysisException {
protected static void unprotectCheckCreate(CreateRoutineLoadStmt stmt) throws AnalysisException {
// check table belong to db, partitions belong to table
if (stmt.getRoutineLoadDesc() == null) {
checkDBSemantics(stmt.getDBTableName(), null);
@ -546,44 +569,54 @@ public abstract class RoutineLoadJob implements Writable, TxnStateChangeListener
throws AnalysisException {
String tableName = dbTableName.getTbl();
String dbName = dbTableName.getDb();
// check database
// check table belong to database
Database database = Catalog.getCurrentCatalog().getDb(dbName);
if (database == null) {
throw new AnalysisException("There is no database named " + dbName);
Table table = database.getTable(tableName);
if (table == null) {
throw new AnalysisException("There is no table named " + tableName + " in " + dbName);
}
// check table type
if (table.getType() != Table.TableType.OLAP) {
throw new AnalysisException("Only doris table support routine load");
}
database.readLock();
try {
Table table = database.getTable(tableName);
// check table belong to database
if (table == null) {
throw new AnalysisException("There is no table named " + tableName + " in " + dbName);
}
// check table type
if (table.getType() != Table.TableType.OLAP) {
throw new AnalysisException("Only doris table support routine load");
}
if (partitionNames == null || partitionNames.size() == 0) {
return;
}
// check partitions belong to table
Optional<String> partitionNotInTable = partitionNames.parallelStream()
.filter(entity -> ((OlapTable) table).getPartition(entity) == null).findFirst();
if (partitionNotInTable != null && partitionNotInTable.isPresent()) {
throw new AnalysisException("Partition " + partitionNotInTable.get()
+ " does not belong to table " + tableName);
}
} finally {
database.readUnlock();
if (partitionNames == null || partitionNames.size() == 0) {
return;
}
// check partitions belong to table
Optional<String> partitionNotInTable = partitionNames.parallelStream()
.filter(entity -> ((OlapTable) table).getPartition(entity) == null).findFirst();
if (partitionNotInTable != null && partitionNotInTable.isPresent()) {
throw new AnalysisException("Partition " + partitionNotInTable.get()
+ " does not belong to table " + tableName);
}
}
public void pause(String reason) {
public void updateState(JobState jobState) {
updateState(jobState, null);
}
public void updateState(JobState jobState, String reason) {
writeLock();
try {
checkStateTransform(JobState.PAUSED);
executePause(reason);
checkStateTransform(jobState);
switch (jobState) {
case PAUSED:
executePause(reason);
break;
case NEED_SCHEDULE:
executeNeedSchedule();
break;
case STOPPED:
executeStop();
break;
case CANCELLED:
executeCancel(reason);
break;
default:
break;
}
} finally {
writeUnlock();
}
@ -598,43 +631,54 @@ public abstract class RoutineLoadJob implements Writable, TxnStateChangeListener
needScheduleTaskInfoList.clear();
}
public void resume() {
private void executeNeedSchedule() {
// TODO(ml): edit log
writeLock();
try {
checkStateTransform(JobState.NEED_SCHEDULE);
state = JobState.NEED_SCHEDULE;
} finally {
writeUnlock();
}
state = JobState.NEED_SCHEDULE;
routineLoadTaskInfoList.clear();
needScheduleTaskInfoList.clear();
}
public void stop() {
private void executeStop() {
// TODO(ml): edit log
writeLock();
try {
checkStateTransform(JobState.STOPPED);
state = JobState.STOPPED;
routineLoadTaskInfoList.clear();
needScheduleTaskInfoList.clear();
} finally {
writeUnlock();
}
state = JobState.STOPPED;
routineLoadTaskInfoList.clear();
needScheduleTaskInfoList.clear();
}
public void reschedule() {
if (needReschedule()) {
writeLock();
try {
if (state == JobState.RUNNING) {
state = JobState.NEED_SCHEDULE;
routineLoadTaskInfoList.clear();
needScheduleTaskInfoList.clear();
}
} finally {
writeUnlock();
private void executeCancel(String reason) {
cancelReason = reason;
state = JobState.CANCELLED;
routineLoadTaskInfoList.clear();
needScheduleTaskInfoList.clear();
}
public void update() {
// check if db and table exist
Database database = Catalog.getCurrentCatalog().getDb(dbId);
if (database == null) {
LOG.info("The database {} has been deleted. Change {} job state to stopped", dbId, id);
updateState(JobState.STOPPED);
}
database.readLock();
try {
Table table = database.getTable(tableId);
// check table belong to database
if (table == null) {
LOG.info("The table {} has been deleted. Change {} job state to stopeed", tableId, id);
updateState(JobState.STOPPED);
}
} finally {
database.readUnlock();
}
// check if partition has been changed
if (needReschedule()) {
executeUpdate();
updateState(JobState.NEED_SCHEDULE);
}
}
protected void executeUpdate() {
}
protected boolean needReschedule() {

View File

@ -92,6 +92,10 @@ public class RoutineLoadManager {
return needScheduleTasksQueue;
}
public void addTasksToNeedScheduleQueue(List<RoutineLoadTaskInfo> routineLoadTaskInfoList) {
needScheduleTasksQueue.addAll(routineLoadTaskInfoList);
}
private void updateBeIdToMaxConcurrentTasks() {
beIdToMaxConcurrentTasks = Catalog.getCurrentSystemInfo().getBackendIds(true)
.parallelStream().collect(Collectors.toMap(beId -> beId, beId -> DEFAULT_BE_CONCURRENT_TASK_NUM));
@ -172,7 +176,7 @@ public class RoutineLoadManager {
writeLock();
try {
// check if db.routineLoadName has been used
if (isNameUsed(routineLoadJob.dbId, routineLoadJob.getName())) {
if (isNameUsed(routineLoadJob.getDbId(), routineLoadJob.getName())) {
throw new DdlException("Name " + routineLoadJob.getName() + " already used in db "
+ routineLoadJob.getDbId());
}
@ -228,17 +232,26 @@ public class RoutineLoadManager {
throw new DdlException("There is not routine load job with name " + pauseRoutineLoadStmt.getName());
}
// check auth
String dbFullName;
String tableName;
try {
dbFullName = routineLoadJob.getDbFullName();
tableName = routineLoadJob.getTableName();
} catch (MetaNotFoundException e) {
throw new DdlException("The metadata of job has been changed. The job will be cancelled automatically", e);
}
if (!Catalog.getCurrentCatalog().getAuth().checkTblPriv(ConnectContext.get(),
routineLoadJob.getDbFullName(),
routineLoadJob.getTableName(),
dbFullName,
tableName,
PrivPredicate.LOAD)) {
ErrorReport.reportAnalysisException(ErrorCode.ERR_TABLEACCESS_DENIED_ERROR, "LOAD",
ConnectContext.get().getQualifiedUser(),
ConnectContext.get().getRemoteIP(),
routineLoadJob.getTableName());
tableName);
}
routineLoadJob.pause("User " + ConnectContext.get().getQualifiedUser() + "pauses routine load job");
routineLoadJob.updateState(RoutineLoadJob.JobState.PAUSED,
"User " + ConnectContext.get().getQualifiedUser() + "pauses routine load job");
}
public void resumeRoutineLoadJob(ResumeRoutineLoadStmt resumeRoutineLoadStmt) throws DdlException,
@ -248,16 +261,24 @@ public class RoutineLoadManager {
throw new DdlException("There is not routine load job with name " + resumeRoutineLoadStmt.getName());
}
// check auth
String dbFullName;
String tableName;
try {
dbFullName = routineLoadJob.getDbFullName();
tableName = routineLoadJob.getTableName();
} catch (MetaNotFoundException e) {
throw new DdlException("The metadata of job has been changed. The job will be cancelled automatically", e);
}
if (!Catalog.getCurrentCatalog().getAuth().checkTblPriv(ConnectContext.get(),
routineLoadJob.getDbFullName(),
routineLoadJob.getTableName(),
dbFullName,
tableName,
PrivPredicate.LOAD)) {
ErrorReport.reportAnalysisException(ErrorCode.ERR_TABLEACCESS_DENIED_ERROR, "LOAD",
ConnectContext.get().getQualifiedUser(),
ConnectContext.get().getRemoteIP(),
routineLoadJob.getTableName());
tableName);
}
routineLoadJob.resume();
routineLoadJob.updateState(RoutineLoadJob.JobState.NEED_SCHEDULE);
}
public void stopRoutineLoadJob(StopRoutineLoadStmt stopRoutineLoadStmt) throws DdlException, AnalysisException {
@ -266,16 +287,24 @@ public class RoutineLoadManager {
throw new DdlException("There is not routine load job with name " + stopRoutineLoadStmt.getName());
}
// check auth
String dbFullName;
String tableName;
try {
dbFullName = routineLoadJob.getDbFullName();
tableName = routineLoadJob.getTableName();
} catch (MetaNotFoundException e) {
throw new DdlException("The metadata of job has been changed. The job will be cancelled automatically", e);
}
if (!Catalog.getCurrentCatalog().getAuth().checkTblPriv(ConnectContext.get(),
routineLoadJob.getDbFullName(),
routineLoadJob.getTableName(),
dbFullName,
tableName,
PrivPredicate.LOAD)) {
ErrorReport.reportAnalysisException(ErrorCode.ERR_TABLEACCESS_DENIED_ERROR, "LOAD",
ConnectContext.get().getQualifiedUser(),
ConnectContext.get().getRemoteIP(),
routineLoadJob.getTableName());
tableName);
}
routineLoadJob.stop();
routineLoadJob.updateState(RoutineLoadJob.JobState.STOPPED);
}
public int getSizeOfIdToRoutineLoadTask() {
@ -304,21 +333,26 @@ public class RoutineLoadManager {
}
}
public long getMinTaskBeId() throws LoadException {
public long getMinTaskBeId(String clusterName) throws LoadException {
List<Long> beIdsInCluster = Catalog.getCurrentSystemInfo().getClusterBackendIds(clusterName);
if (beIdsInCluster == null) {
throw new LoadException("The " + clusterName + " has been deleted");
}
readLock();
try {
long result = -1L;
int maxIdleSlotNum = 0;
updateBeIdToMaxConcurrentTasks();
for (Map.Entry<Long, Integer> entry : beIdToMaxConcurrentTasks.entrySet()) {
if (beIdToConcurrentTasks.get(entry.getKey()) == null) {
result = maxIdleSlotNum < entry.getValue() ? entry.getKey() : result;
maxIdleSlotNum = Math.max(maxIdleSlotNum, entry.getValue());
} else {
int idelTaskNum = entry.getValue() - beIdToConcurrentTasks.get(entry.getKey());
result = maxIdleSlotNum < idelTaskNum ? entry.getKey() : result;
maxIdleSlotNum = Math.max(maxIdleSlotNum, idelTaskNum);
}
for (Long beId : beIdsInCluster) {
int idleTaskNum = 0;
if (beIdToConcurrentTasks.containsKey(beId)) {
idleTaskNum = beIdToMaxConcurrentTasks.get(beId) - beIdToConcurrentTasks.get(beId);
} else {
idleTaskNum = DEFAULT_BE_CONCURRENT_TASK_NUM;
}
result = maxIdleSlotNum < idleTaskNum ? beId : result;
maxIdleSlotNum = Math.max(maxIdleSlotNum, idleTaskNum);
}
if (result < 0) {
throw new LoadException("There is no empty slot in cluster");
@ -369,17 +403,12 @@ public class RoutineLoadManager {
throw new MetaNotFoundException("could not found task by id " + taskId);
}
public List<RoutineLoadJob> getRoutineLoadJobByState(RoutineLoadJob.JobState jobState) throws LoadException {
List<RoutineLoadJob> jobs = new ArrayList<>();
Collection<RoutineLoadJob> stateJobs = null;
public List<RoutineLoadJob> getRoutineLoadJobByState(RoutineLoadJob.JobState jobState) {
LOG.debug("begin to get routine load job by state {}", jobState.name());
stateJobs = idToRoutineLoadJob.values().stream()
List<RoutineLoadJob> stateJobs = idToRoutineLoadJob.values().stream()
.filter(entity -> entity.getState() == jobState).collect(Collectors.toList());
if (stateJobs != null) {
jobs.addAll(stateJobs);
LOG.info("got {} routine load jobs by state {}", jobs.size(), jobState.name());
}
return jobs;
LOG.debug("got {} routine load jobs by state {}", stateJobs.size(), jobState.name());
return stateJobs;
}
public List<RoutineLoadTaskInfo> processTimeoutTasks() {
@ -397,9 +426,9 @@ public class RoutineLoadManager {
// TODO(ml): remove old routine load job
}
public void rescheduleRoutineLoadJob() {
public void updateRoutineLoadJob() {
for (RoutineLoadJob routineLoadJob : idToRoutineLoadJob.values()) {
routineLoadJob.reschedule();
routineLoadJob.update();
}
}

View File

@ -17,6 +17,7 @@
package org.apache.doris.load.routineload;
import com.google.common.annotations.VisibleForTesting;
import org.apache.doris.catalog.Catalog;
import org.apache.doris.common.LoadException;
import org.apache.doris.common.MetaNotFoundException;
@ -30,8 +31,20 @@ import java.util.List;
public class RoutineLoadScheduler extends Daemon {
private static final Logger LOG = LogManager.getLogger(RoutineLoadScheduler.class);
private static final int DEFAULT_INTERVAL_SECONDS = 10;
private RoutineLoadManager routineLoadManager = Catalog.getInstance().getRoutineLoadManager();
private RoutineLoadManager routineLoadManager;
@VisibleForTesting
public RoutineLoadScheduler() {
super();
routineLoadManager = Catalog.getInstance().getRoutineLoadManager();
}
public RoutineLoadScheduler(RoutineLoadManager routineLoadManager) {
super("Routine load", DEFAULT_INTERVAL_SECONDS * 1000);
this.routineLoadManager = routineLoadManager;
}
@Override
protected void runOneCycle() {
@ -44,7 +57,7 @@ public class RoutineLoadScheduler extends Daemon {
private void process() {
// update
routineLoadManager.rescheduleRoutineLoadJob();
routineLoadManager.updateRoutineLoadJob();
// get need schedule routine jobs
List<RoutineLoadJob> routineLoadJobList = null;
try {
@ -53,9 +66,11 @@ public class RoutineLoadScheduler extends Daemon {
LOG.error("failed to get need schedule routine jobs");
}
LOG.debug("there are {} job need schedule", routineLoadJobList.size());
LOG.info("there are {} job need schedule", routineLoadJobList.size());
for (RoutineLoadJob routineLoadJob : routineLoadJobList) {
try {
// create plan of routine load job
routineLoadJob.plan();
// judge nums of tasks more then max concurrent tasks of cluster
int currentConcurrentTaskNum = routineLoadJob.calculateCurrentConcurrentTaskNum();
int totalTaskNum = currentConcurrentTaskNum + routineLoadManager.getSizeOfIdToRoutineLoadTask();
@ -68,20 +83,21 @@ public class RoutineLoadScheduler extends Daemon {
totalTaskNum, routineLoadManager.getTotalMaxConcurrentTaskNum());
break;
}
// divide job into tasks
List<RoutineLoadTaskInfo> needScheduleTasksList =
routineLoadJob.divideRoutineLoadJob(currentConcurrentTaskNum);
// save task into queue of needScheduleTasks
routineLoadManager.getNeedScheduleTasksQueue().addAll(needScheduleTasksList);
// check state and divide job into tasks
routineLoadJob.divideRoutineLoadJob(currentConcurrentTaskNum);
} catch (MetaNotFoundException e) {
routineLoadJob.updateState(RoutineLoadJob.JobState.CANCELLED);
routineLoadJob.updateState(RoutineLoadJob.JobState.CANCELLED, e.getMessage());
} catch (Throwable e) {
LOG.warn("failed to scheduler job, change job state to paused", e);
routineLoadJob.updateState(RoutineLoadJob.JobState.PAUSED, e.getMessage());
continue;
}
}
LOG.debug("begin to check timeout tasks");
// check timeout tasks
List<RoutineLoadTaskInfo> rescheduleTasksList = routineLoadManager.processTimeoutTasks();
routineLoadManager.getNeedScheduleTasksQueue().addAll(rescheduleTasksList);
routineLoadManager.addTasksToNeedScheduleQueue(rescheduleTasksList);
}
private List<RoutineLoadJob> getNeedScheduleRoutineJobs() throws LoadException {

View File

@ -47,17 +47,11 @@ public abstract class RoutineLoadTaskInfo {
private long createTimeMs;
private long loadStartTimeMs;
private TExecPlanFragmentParams tExecPlanFragmentParams;
public RoutineLoadTaskInfo(UUID id, long jobId) throws BeginTransactionException,
LabelAlreadyUsedException, AnalysisException {
public RoutineLoadTaskInfo(UUID id, long jobId) {
this.id = id;
this.jobId = jobId;
this.createTimeMs = System.currentTimeMillis();
// begin a txn for task
RoutineLoadJob routineLoadJob = routineLoadManager.getJob(jobId);
txnId = Catalog.getCurrentGlobalTransactionMgr().beginTransaction(
routineLoadJob.getDbId(), id.toString(), -1, "streamLoad",
TransactionState.LoadJobSourceType.ROUTINE_LOAD_TASK, routineLoadJob);
}
public UUID getId() {
@ -79,8 +73,16 @@ public abstract class RoutineLoadTaskInfo {
public long getTxnId() {
return txnId;
}
abstract TRoutineLoadTask createRoutineLoadTask(long beId) throws LoadException, UserException;
abstract TRoutineLoadTask createRoutineLoadTask() throws LoadException, UserException;
public void beginTxn() throws LabelAlreadyUsedException, BeginTransactionException, AnalysisException {
// begin a txn for task
RoutineLoadJob routineLoadJob = routineLoadManager.getJob(jobId);
txnId = Catalog.getCurrentGlobalTransactionMgr().beginTransaction(
routineLoadJob.getDbId(), id.toString(), -1, "streamLoad",
TransactionState.LoadJobSourceType.ROUTINE_LOAD_TASK, routineLoadJob);
}
@Override
public boolean equals(Object obj) {

View File

@ -17,6 +17,9 @@
package org.apache.doris.load.routineload;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import org.apache.doris.catalog.Catalog;
import org.apache.doris.common.ClientPool;
import org.apache.doris.common.LoadException;
@ -28,9 +31,6 @@ import org.apache.doris.thrift.BackendService;
import org.apache.doris.thrift.TNetworkAddress;
import org.apache.doris.thrift.TRoutineLoadTask;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
@ -52,12 +52,16 @@ public class RoutineLoadTaskScheduler extends Daemon {
private static final Logger LOG = LogManager.getLogger(RoutineLoadTaskScheduler.class);
private RoutineLoadManager routineLoadManager;
private LinkedBlockingQueue<RoutineLoadTaskInfo> needScheduleTasksQueue;
@VisibleForTesting
public RoutineLoadTaskScheduler() {
super("routine load task", 0);
routineLoadManager = Catalog.getInstance().getRoutineLoadManager();
needScheduleTasksQueue = (LinkedBlockingQueue) routineLoadManager.getNeedScheduleTasksQueue();
this.routineLoadManager = Catalog.getInstance().getRoutineLoadManager();
}
public RoutineLoadTaskScheduler(RoutineLoadManager routineLoadManager) {
super("routine load task", 0);
this.routineLoadManager = routineLoadManager;
}
@Override
@ -70,7 +74,9 @@ public class RoutineLoadTaskScheduler extends Daemon {
}
}
private void process() throws LoadException, UserException {
private void process() throws LoadException, UserException, InterruptedException {
LinkedBlockingQueue<RoutineLoadTaskInfo> needScheduleTasksQueue =
(LinkedBlockingQueue) routineLoadManager.getNeedScheduleTasksQueue();
// update current beIdMaps for tasks
routineLoadManager.updateBeIdTaskMaps();
@ -83,17 +89,13 @@ public class RoutineLoadTaskScheduler extends Daemon {
int scheduledTaskNum = 0;
// get idle be task num
// allocate task to be
if (needScheduleTaskNum == 0) {
Thread.sleep(1000);
return;
}
while (needScheduleTaskNum > 0) {
RoutineLoadTaskInfo routineLoadTaskInfo = null;
try {
routineLoadTaskInfo = needScheduleTasksQueue.take();
} catch (InterruptedException e) {
LOG.warn("Taking routine load task from queue has been interrupted with error msg {}",
e.getMessage());
return;
}
long beId = routineLoadManager.getMinTaskBeId();
// allocate be to task and begin transaction for task
RoutineLoadTaskInfo routineLoadTaskInfo = needScheduleTasksQueue.peek();
RoutineLoadJob routineLoadJob = null;
try {
routineLoadJob = routineLoadManager.getJobByTaskId(routineLoadTaskInfo.getId().toString());
@ -101,7 +103,27 @@ public class RoutineLoadTaskScheduler extends Daemon {
LOG.warn("task {} has been abandoned", routineLoadTaskInfo.getId());
return;
}
TRoutineLoadTask tRoutineLoadTask = routineLoadTaskInfo.createRoutineLoadTask(beId);
long beId;
try {
beId = routineLoadManager.getMinTaskBeId(routineLoadJob.getClusterName());
routineLoadTaskInfo.beginTxn();
} catch (Exception e) {
LOG.warn("put task to the rear of queue with error " + e.getMessage());
needScheduleTasksQueue.take();
needScheduleTasksQueue.put(routineLoadTaskInfo);
needScheduleTaskNum--;
continue;
}
// task to thrift
try {
routineLoadTaskInfo = needScheduleTasksQueue.take();
} catch (InterruptedException e) {
LOG.warn("Taking routine load task from queue has been interrupted with error msg {}",
e.getMessage());
return;
}
TRoutineLoadTask tRoutineLoadTask = routineLoadTaskInfo.createRoutineLoadTask();
// remove task for needScheduleTasksList in job
routineLoadJob.removeNeedScheduleTask(routineLoadTaskInfo);
routineLoadTaskInfo.setLoadStartTimeMs(System.currentTimeMillis());
@ -123,7 +145,6 @@ public class RoutineLoadTaskScheduler extends Daemon {
LOG.info("{} tasks have bean allocated to be.", scheduledTaskNum);
}
// todo: change to batch submit and reuse client
private void submitBatchTask(Map<Long, List<TRoutineLoadTask>> beIdToRoutineLoadTask) {
for (Map.Entry<Long, List<TRoutineLoadTask>> entry : beIdToRoutineLoadTask.entrySet()) {
Backend backend = Catalog.getCurrentSystemInfo().getBackend(entry.getKey());
@ -133,7 +154,6 @@ public class RoutineLoadTaskScheduler extends Daemon {
try {
client = ClientPool.backendPool.borrowObject(address);
client.submit_routine_load_task(entry.getValue());
ok = true;
} catch (Exception e) {
LOG.warn("task exec error. backend[{}]", backend.getId(), e);

View File

@ -80,6 +80,7 @@ import org.apache.doris.common.ConfigBase;
import org.apache.doris.common.DdlException;
import org.apache.doris.common.ErrorCode;
import org.apache.doris.common.ErrorReport;
import org.apache.doris.common.MetaNotFoundException;
import org.apache.doris.common.PatternMatcher;
import org.apache.doris.common.proc.BackendsProcDir;
import org.apache.doris.common.proc.FrontendsProcNode;
@ -801,14 +802,22 @@ public class ShowExecutor {
}
// check auth
String dbFullName;
String tableName;
try {
dbFullName = routineLoadJob.getDbFullName();
tableName = routineLoadJob.getTableName();
} catch (MetaNotFoundException e) {
throw new AnalysisException("The metadata of job has been changed. The job will be cancelled automatically", e);
}
if (!Catalog.getCurrentCatalog().getAuth().checkTblPriv(ConnectContext.get(),
routineLoadJob.getDbFullName(),
routineLoadJob.getTableName(),
dbFullName,
tableName,
PrivPredicate.LOAD)) {
ErrorReport.reportAnalysisException(ErrorCode.ERR_TABLEACCESS_DENIED_ERROR, "LOAD",
ConnectContext.get().getQualifiedUser(),
ConnectContext.get().getRemoteIP(),
routineLoadJob.getTableName());
tableName);
}
// get routine load info

View File

@ -45,6 +45,7 @@ import org.apache.logging.log4j.Logger;
import java.io.StringReader;
import java.util.Map;
import java.util.UUID;
public class StreamLoadTask {
@ -131,19 +132,18 @@ public class StreamLoadTask {
}
}
public static StreamLoadTask fromRoutineLoadTaskInfo(RoutineLoadTaskInfo routineLoadTaskInfo) {
TUniqueId queryId = new TUniqueId(routineLoadTaskInfo.getId().getMostSignificantBits(),
routineLoadTaskInfo.getId().getLeastSignificantBits());
StreamLoadTask streamLoadTask = new StreamLoadTask(queryId, routineLoadTaskInfo.getTxnId(),
// the taskId and txnId are faked
public static StreamLoadTask fromRoutineLoadJob(RoutineLoadJob routineLoadJob) {
UUID taskId = UUID.randomUUID();
TUniqueId queryId = new TUniqueId(taskId.getMostSignificantBits(),
taskId.getLeastSignificantBits());
StreamLoadTask streamLoadTask = new StreamLoadTask(queryId, -1L,
TFileType.FILE_STREAM, TFileFormatType.FORMAT_CSV_PLAIN);
RoutineLoadManager routineLoadManager = Catalog.getCurrentCatalog().getRoutineLoadManager();
streamLoadTask.setOptionalFromRoutineLoadTaskInfo(routineLoadTaskInfo,
routineLoadManager.getJob(routineLoadTaskInfo.getJobId()));
streamLoadTask.setOptionalFromRoutineLoadJob(routineLoadJob);
return streamLoadTask;
}
private void setOptionalFromRoutineLoadTaskInfo(RoutineLoadTaskInfo routineLoadTaskInfo,
RoutineLoadJob routineLoadJob) {
private void setOptionalFromRoutineLoadJob(RoutineLoadJob routineLoadJob) {
if (routineLoadJob.getRoutineLoadDesc() != null) {
RoutineLoadDesc routineLoadDesc = routineLoadJob.getRoutineLoadDesc();
if (routineLoadDesc.getColumnsInfo() != null) {

View File

@ -317,6 +317,8 @@ public class TransactionState implements Writable {
case ABORTED:
txnStateChangeListener.beforeAborted(this, txnStatusChangeReason);
break;
default:
break;
}
}
@ -426,10 +428,6 @@ public class TransactionState implements Writable {
return System.currentTimeMillis() - publishVersionTime > timeoutMillis;
}
public void setTxnStateChangeListener(TxnStateChangeListener txnStateChangeListener) {
this.txnStateChangeListener = txnStateChangeListener;
}
public TxnStateChangeListener getTxnStateChangeListener() {
return txnStateChangeListener;
}

View File

@ -63,7 +63,7 @@ public class CreateRoutineLoadStmtTest {
Map<String, String> customProperties = Maps.newHashMap();
customProperties.put(CreateRoutineLoadStmt.KAFKA_TOPIC_PROPERTY, topicName);
customProperties.put(CreateRoutineLoadStmt.KAFKA_ENDPOINT_PROPERTY, serverAddress);
customProperties.put(CreateRoutineLoadStmt.KAFKA_BROKER_LIST_PROPERTY, serverAddress);
customProperties.put(CreateRoutineLoadStmt.KAFKA_PARTITIONS_PROPERTY, kafkaPartitionString);
CreateRoutineLoadStmt createRoutineLoadStmt = new CreateRoutineLoadStmt(jobName, tableName,
@ -109,7 +109,7 @@ public class CreateRoutineLoadStmtTest {
Map<String, String> customProperties = Maps.newHashMap();
customProperties.put(CreateRoutineLoadStmt.KAFKA_TOPIC_PROPERTY, topicName);
customProperties.put(CreateRoutineLoadStmt.KAFKA_ENDPOINT_PROPERTY, serverAddress);
customProperties.put(CreateRoutineLoadStmt.KAFKA_BROKER_LIST_PROPERTY, serverAddress);
customProperties.put(CreateRoutineLoadStmt.KAFKA_PARTITIONS_PROPERTY, kafkaPartitionString);
CreateRoutineLoadStmt createRoutineLoadStmt = new CreateRoutineLoadStmt(jobName, tableName,
@ -129,7 +129,7 @@ public class CreateRoutineLoadStmtTest {
Assert.assertEquals(partitionNames.getPartitionNames(), createRoutineLoadStmt.getRoutineLoadDesc().getPartitionNames());
Assert.assertEquals(2, createRoutineLoadStmt.getDesiredConcurrentNum());
Assert.assertEquals(0, createRoutineLoadStmt.getMaxErrorNum());
Assert.assertEquals(serverAddress, createRoutineLoadStmt.getKafkaEndpoint());
Assert.assertEquals(serverAddress, createRoutineLoadStmt.getKafkaBrokerList());
Assert.assertEquals(topicName, createRoutineLoadStmt.getKafkaTopic());
Assert.assertEquals(kafkaPartitionString, Joiner.on(",").join(createRoutineLoadStmt.getKafkaPartitions()));
}

View File

@ -367,7 +367,7 @@ public class KafkaRoutineLoadJobTest {
Map<String, String> customProperties = Maps.newHashMap();
customProperties.put(CreateRoutineLoadStmt.KAFKA_TOPIC_PROPERTY, topicName);
customProperties.put(CreateRoutineLoadStmt.KAFKA_ENDPOINT_PROPERTY, serverAddress);
customProperties.put(CreateRoutineLoadStmt.KAFKA_BROKER_LIST_PROPERTY, serverAddress);
customProperties.put(CreateRoutineLoadStmt.KAFKA_PARTITIONS_PROPERTY, kafkaPartitionString);
CreateRoutineLoadStmt createRoutineLoadStmt = new CreateRoutineLoadStmt(jobName, tableName,

View File

@ -76,7 +76,7 @@ public class RoutineLoadManagerTest {
String topicName = "topic1";
customProperties.put(CreateRoutineLoadStmt.KAFKA_TOPIC_PROPERTY, topicName);
String serverAddress = "http://127.0.0.1:8080";
customProperties.put(CreateRoutineLoadStmt.KAFKA_ENDPOINT_PROPERTY, serverAddress);
customProperties.put(CreateRoutineLoadStmt.KAFKA_BROKER_LIST_PROPERTY, serverAddress);
CreateRoutineLoadStmt createRoutineLoadStmt = new CreateRoutineLoadStmt(jobName, tableName,
loadPropertyList, properties,
typeName, customProperties);
@ -142,7 +142,7 @@ public class RoutineLoadManagerTest {
String topicName = "topic1";
customProperties.put(CreateRoutineLoadStmt.KAFKA_TOPIC_PROPERTY, topicName);
String serverAddress = "http://127.0.0.1:8080";
customProperties.put(CreateRoutineLoadStmt.KAFKA_ENDPOINT_PROPERTY, serverAddress);
customProperties.put(CreateRoutineLoadStmt.KAFKA_BROKER_LIST_PROPERTY, serverAddress);
CreateRoutineLoadStmt createRoutineLoadStmt = new CreateRoutineLoadStmt(jobName, tableName,
loadPropertyList, properties,
typeName, customProperties);
@ -236,7 +236,7 @@ public class RoutineLoadManagerTest {
new Expectations() {
{
systemInfoService.getBackendIds(true);
systemInfoService.getClusterBackendIds(anyString, true);
result = beIds;
Catalog.getCurrentSystemInfo();
result = systemInfoService;
@ -245,7 +245,7 @@ public class RoutineLoadManagerTest {
RoutineLoadManager routineLoadManager = new RoutineLoadManager();
routineLoadManager.addNumOfConcurrentTasksByBeId(1L);
Assert.assertEquals(2L, routineLoadManager.getMinTaskBeId());
Assert.assertEquals(2L, routineLoadManager.getMinTaskBeId("default"));
}
@Test

View File

@ -77,7 +77,7 @@ public class RoutineLoadSchedulerTest {
new KafkaRoutineLoadJob(1L, "kafka_routine_load_job", 1L,
1L, routineLoadDesc ,3, 0,
"", "", new KafkaProgress());
routineLoadJob.setState(RoutineLoadJob.JobState.NEED_SCHEDULE);
Deencapsulation.setField(routineLoadJob,"state", RoutineLoadJob.JobState.NEED_SCHEDULE);
List<RoutineLoadJob> routineLoadJobList = new ArrayList<>();
routineLoadJobList.add(routineLoadJob);

View File

@ -113,7 +113,7 @@ public class RoutineLoadTaskSchedulerTest {
routineLoadManager.getNeedScheduleTasksQueue();
result = routineLoadTaskInfoQueue;
routineLoadManager.getMinTaskBeId();
routineLoadManager.getMinTaskBeId(anyString);
result = beId;
routineLoadManager.getJobByTaskId(anyString);
result = kafkaRoutineLoadJob1;

View File

@ -321,7 +321,7 @@ public class GlobalTransactionMgrTest {
routineLoadTaskInfoList.add(routineLoadTaskInfo);
TransactionState transactionState = new TransactionState(1L, 1L, "label", 1L, LoadJobSourceType.ROUTINE_LOAD_TASK, "be1");
transactionState.setTransactionStatus(TransactionStatus.PREPARE);
transactionState.setTxnStateChangeListener(routineLoadJob);
Deencapsulation.setField(transactionState, "txnStateChangeListener", routineLoadJob);
Map<Long, TransactionState> idToTransactionState = Maps.newHashMap();
idToTransactionState.put(1L, transactionState);
Deencapsulation.setField(routineLoadJob, "maxErrorNum", 10);
@ -330,7 +330,7 @@ public class GlobalTransactionMgrTest {
KafkaProgress oldkafkaProgress = new KafkaProgress();
oldkafkaProgress.setPartitionIdToOffset(oldKafkaProgressMap);
Deencapsulation.setField(routineLoadJob, "progress", oldkafkaProgress);
routineLoadJob.setState(RoutineLoadJob.JobState.RUNNING);
Deencapsulation.setField(routineLoadJob, "state", RoutineLoadJob.JobState.RUNNING);
TRLTaskTxnCommitAttachment rlTaskTxnCommitAttachment = new TRLTaskTxnCommitAttachment();
rlTaskTxnCommitAttachment.setId(new TUniqueId());
@ -395,7 +395,7 @@ public class GlobalTransactionMgrTest {
routineLoadTaskInfoList.add(routineLoadTaskInfo);
TransactionState transactionState = new TransactionState(1L, 1L, "label", 1L, LoadJobSourceType.ROUTINE_LOAD_TASK, "be1");
transactionState.setTransactionStatus(TransactionStatus.PREPARE);
transactionState.setTxnStateChangeListener(routineLoadJob);
Deencapsulation.setField(transactionState, "txnStateChangeListener", routineLoadJob);
Map<Long, TransactionState> idToTransactionState = Maps.newHashMap();
idToTransactionState.put(1L, transactionState);
Deencapsulation.setField(routineLoadJob, "maxErrorNum", 10);
@ -404,7 +404,7 @@ public class GlobalTransactionMgrTest {
KafkaProgress oldkafkaProgress = new KafkaProgress();
oldkafkaProgress.setPartitionIdToOffset(oldKafkaProgressMap);
Deencapsulation.setField(routineLoadJob, "progress", oldkafkaProgress);
routineLoadJob.setState(RoutineLoadJob.JobState.RUNNING);
Deencapsulation.setField(routineLoadJob, "state", RoutineLoadJob.JobState.RUNNING);
TRLTaskTxnCommitAttachment rlTaskTxnCommitAttachment = new TRLTaskTxnCommitAttachment();
rlTaskTxnCommitAttachment.setId(new TUniqueId());